Struct carboxyl::Stream

pub struct Stream<A> { /* fields omitted */ }

A stream of events.

Conceptually, a stream can be thought of as a series of discrete events that occur at specific times. The events are ordered by a transaction system, which means that firings of disjoint events cannot interfere with each other. The consequences of one event are atomically reflected in dependent quantities.

Streams provide a number of primitive operations. These can be used to compose streams and combine them with signals. For instance, streams can be mapped over with a function, merged with another stream of the same type or filtered by some predicate.

Algebraic laws

Furthermore, streams satisfy certain algebraic properties that are useful for reasoning about them.

Monoid

For one thing, streams of the same type form a monoid under merging. The neutral element in this context is Stream::never(). So the following laws always hold for streams a, b and c of the same type:

  • Left identity: Stream::never().merge(&a) == a,
  • Right identity: a.merge(&Stream::never()) == a,
  • Associativity: a.merge(&b).merge(&c) == a.merge(&b.merge(&c)).

Note that equality in this context is not actually implemented as such, since comparing two (potentially infinite) streams is a prohibitive operation. Instead, the expressions above can be used interchangeably and behave identically.
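Since equality on real streams is not implemented, the laws can only be checked on a stand-in. The following is a minimal sketch that models a stream as a finite, time-ordered list of (time, value) events. The Events alias, the never and merge functions, and the tie-breaking rule for equal timestamps are assumptions made for illustration; they are not carboxyl's API or its internal representation.

```rust
// Illustrative model only: a finite list of (time, value) events,
// sorted by time. This is not how carboxyl represents a stream.
type Events = Vec<(u64, i32)>;

/// Stand-in for `Stream::never()`: a stream without any events.
fn never() -> Events {
    Vec::new()
}

/// Stand-in for `merge`: interleave two time-ordered lists by timestamp.
/// Assumption of this model: on equal timestamps, events of `a` come first.
fn merge(a: &Events, b: &Events) -> Events {
    let mut out = Vec::with_capacity(a.len() + b.len());
    let (mut i, mut j) = (0, 0);
    while i < a.len() && j < b.len() {
        if a[i].0 <= b[j].0 {
            out.push(a[i]);
            i += 1;
        } else {
            out.push(b[j]);
            j += 1;
        }
    }
    // Append whatever is left in either input.
    out.extend_from_slice(&a[i..]);
    out.extend_from_slice(&b[j..]);
    out
}
```

In this model, merge(&never(), &a) and merge(&a, &never()) both return a copy of a, and nesting merges to the left or to the right yields the same event list.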

Functor

Under the mapping operation streams also become a functor. A functor is a generic type like Stream with some mapping operation that takes a function Fn(A) -> B to map a Stream<A> to a Stream<B>. Algebraically it satisfies the following laws:

  • The identity function is preserved: a.map(|x| x) == a,
  • Function composition is respected: a.map(f).map(g) == a.map(|x| g(f(x))).
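The functor laws can be illustrated on the same kind of stand-in. This sketch models the event values of a stream as a finite slice; map_events is a hypothetical helper written for this example, not part of carboxyl.

```rust
/// Stand-in for `Stream::map`: apply `f` to every event value of a
/// finite list. Hypothetical helper for illustration only.
fn map_events<A, B, F>(events: &[A], f: F) -> Vec<B>
where
    A: Clone,
    F: Fn(A) -> B,
{
    events.iter().cloned().map(f).collect()
}
```

Mapping with the identity closure returns the same values, and mapping with f and then g gives the same result as a single pass with |x| g(f(x)).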

Methods

impl<A: Clone + Send + Sync + 'static> Stream<A>

Create a stream that never fires. This can be useful in certain situations, where a stream is logically required, but no events are expected.

Map the stream to another stream using a function.

map applies a function to every event fired in this stream to create a new stream of type B.

use carboxyl::Sink;
let sink: Sink<i32> = Sink::new();
let mut events = sink.stream().map(|x| x + 4).events();
sink.send(3);
assert_eq!(events.next(), Some(7));

Filter a stream according to a predicate.

filter creates a new stream that only fires those events from the original stream that satisfy the predicate.

use carboxyl::Sink;
let sink: Sink<i32> = Sink::new();
let mut events = sink.stream()
    .filter(|&x| (x >= 4) && (x <= 10))
    .events();
sink.send(2); // won't arrive
sink.send(5); // will arrive
assert_eq!(events.next(), Some(5));

Both filter and map a stream.

This is equivalent to .map(f).filter_some().

use carboxyl::Sink;
let sink = Sink::new();
let mut events = sink.stream()
    .filter_map(|i| if i > 3 { Some(i + 2) } else { None })
    .events();
sink.send(2);
sink.send(4);
assert_eq!(events.next(), Some(6));
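The stated equivalence with .map(f).filter_some() can be sketched on finite lists of event values. Both functions below are hypothetical stand-ins written for this example and do not use carboxyl.

```rust
/// Stand-in for `filter_map`: keep only the values for which `f`
/// returns `Some`, unwrapped.
fn filter_map_events<A, B, F>(events: &[A], f: F) -> Vec<B>
where
    A: Clone,
    F: Fn(A) -> Option<B>,
{
    events.iter().cloned().filter_map(f).collect()
}

/// Stand-in for `.map(f).filter_some()`: map every value to an
/// `Option` first, then drop the `None`s and unwrap the rest.
fn map_then_filter_some<A, B, F>(events: &[A], f: F) -> Vec<B>
where
    A: Clone,
    F: Fn(A) -> Option<B>,
{
    let mapped: Vec<Option<B>> = events.iter().cloned().map(f).collect();
    mapped.into_iter().flatten().collect()
}
```

For the same input and the same function, both stand-ins produce the same list of values.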

Merge with another stream.

merge takes two streams and creates a new stream that fires events from both input streams.

use carboxyl::Sink;
let sink_1 = Sink::<i32>::new();
let sink_2 = Sink::<i32>::new();
let mut events = sink_1.stream().merge(&sink_2.stream()).events();
sink_1.send(2);
assert_eq!(events.next(), Some(2));
sink_2.send(4);
assert_eq!(events.next(), Some(4));

Coalesce multiple event firings within the same transaction into a single event.

The function should ideally commute, as the order of events within a transaction is not well-defined.
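To show why a commutative function is preferable, here is a minimal sketch that models each event as a (transaction id, value) pair and combines events of the same transaction. The coalesce function below is a stand-in for illustration, and the representation (an explicit transaction id, with events of one transaction adjacent in the list) is an assumption of this model, not carboxyl's implementation.

```rust
/// Illustrative model: combine all values that share a transaction id
/// into one event, using `f` to merge them pairwise.
/// Assumption: events of the same transaction are adjacent in `events`.
fn coalesce<A, F>(events: &[(u32, A)], f: F) -> Vec<(u32, A)>
where
    A: Clone,
    F: Fn(A, A) -> A,
{
    let mut out: Vec<(u32, A)> = Vec::new();
    for (tx, value) in events.iter().cloned() {
        // Does the most recent output event belong to the same transaction?
        let same_tx = out.last().map_or(false, |(last_tx, _)| *last_tx == tx);
        if same_tx {
            let (_, acc) = out.pop().expect("checked non-empty above");
            out.push((tx, f(acc, value)));
        } else {
            out.push((tx, value));
        }
    }
    out
}
```

With a commutative function such as addition, reordering the events inside one transaction does not change the result. With a non-commutative function such as subtraction, it does.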

Hold an event in a signal.

The resulting signal holds the value of the last event fired by the stream.

use carboxyl::Sink;
let sink = Sink::new();
let signal = sink.stream().hold(0);
assert_eq!(signal.sample(), 0);
sink.send(2);
assert_eq!(signal.sample(), 2);

A blocking iterator over the stream.

events returns an iterator whose next() call waits until the stream fires its next event.
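What "blocking" means for such an iterator can be sketched with a standard library channel. This is an analogy only: blocking_events is a hypothetical helper, and nothing here claims to mirror how carboxyl implements its iterator.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: send `values` from a producer thread and return
/// an iterator over them. Each `next()` blocks until a value arrives,
/// and the iterator ends once the sending side has been dropped.
fn blocking_events(values: Vec<i32>) -> mpsc::IntoIter<i32> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        for v in values {
            // Stop early if the receiving side has gone away.
            if tx.send(v).is_err() {
                break;
            }
        }
        // `tx` is dropped here, which terminates the iterator.
    });
    rx.into_iter()
}
```

Collecting blocking_events(vec![1, 2, 3]) yields the values in the order they were sent. An iterator over a source that never stops firing would never terminate, so such iterators are usually consumed one event at a time.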

Scan a stream and accumulate its event firings in a signal.

Starting at some initial value, each new event changes the value of the resulting signal as prescribed by the supplied function.

use carboxyl::Sink;
let sink = Sink::new();
let sum = sink.stream().fold(0, |a, b| a + b);
assert_eq!(sum.sample(), 0);
sink.send(2);
assert_eq!(sum.sample(), 2);
sink.send(4);
assert_eq!(sum.sample(), 6);

impl<A: Clone + Send + Sync + 'static> Stream<Option<A>>

Filter a stream of options.

filter_some creates a new stream that only fires the unwrapped Some(…) events from the original stream, omitting any None events.

use carboxyl::Sink;
let sink = Sink::new();
let mut events = sink.stream().filter_some().events();
sink.send(None); // won't arrive
sink.send(Some(5)); // will arrive
assert_eq!(events.next(), Some(5));

impl<A: Send + Sync + Clone + 'static> Stream<Stream<A>>

Switch between streams.

This takes a stream of streams and maps it to a new stream, which fires all events from the most recent stream fired into it.

Example

use carboxyl::{Sink, Stream};

// Create sinks
let stream_sink: Sink<Stream<i32>> = Sink::new();
let sink1: Sink<i32> = Sink::new();
let sink2: Sink<i32> = Sink::new();

// Switch and listen
let switched = stream_sink.stream().switch();
let mut events = switched.events();

// Should not receive events from either sink
sink1.send(1); sink2.send(2);

// Now switch to sink 2
stream_sink.send(sink2.stream());
sink1.send(3); sink2.send(4);
assert_eq!(events.next(), Some(4));

// And then to sink 1
stream_sink.send(sink1.stream());
sink1.send(5); sink2.send(6);
assert_eq!(events.next(), Some(5));

Trait Implementations

impl<A> Clone for Stream<A>

Returns a copy of the value.

Performs copy-assignment from source.