pub struct Stream<T: 'static> { /* private fields */ }Expand description
A stream of events, values in time.
Streams have combinators to build “execution trees” working over events.
§Memory
Some streams have “memory”. Streams with memory keep a copy of the last value they produced so that any new subscriber will synchronously receive that value.
Streams with memory are explicitly created using
.remember(), but also by other combinators
such as .fold() and
.start_with().
Implementations§
Source§impl<T> Stream<T>
impl<T> Stream<T>
Sourcepub fn sink() -> Sink<T>
pub fn sink() -> Sink<T>
Create a sink that is used to push values into a stream.
let sink = froop::Stream::sink();
// collect values going into the sink
let coll = sink.stream().collect();
sink.update(0);
sink.update(1);
sink.update(2);
sink.end();
assert_eq!(coll.wait(), vec![0, 1, 2]);Sourcepub fn of(value: T) -> Stream<T>where
T: Clone,
pub fn of(value: T) -> Stream<T>where
T: Clone,
Create a stream with memory that only emits one single value to anyone subscribing.
let value = froop::Stream::of(42);
// both collectors will receive the value
let coll1 = value.collect();
let coll2 = value.collect();
// use .take() since stream doesn't end
assert_eq!(coll1.take(), [42]);
assert_eq!(coll2.take(), [42]);Sourcepub fn never() -> Stream<T>
pub fn never() -> Stream<T>
Create a stream that never emits any value and never ends.
use froop::Stream;
let never: Stream<u32> = Stream::never();
let coll = never.collect();
assert_eq!(coll.take(), vec![]);Sourcepub fn has_memory(&self) -> bool
pub fn has_memory(&self) -> bool
Check if this stream has “memory”.
Streams with memory keep a copy of the last value they produced so that any new subscriber will synchronously receive that value.
Streams with memory are explicitly created using .remember(), but also by
other combinators such as .fold() and .start_with().
The memory is not inherited by child combinators. For example:
let sink = froop::Stream::sink();
sink.update(0);
// This stream has memory.
let rem = sink.stream().remember();
// This filtered stream has _NO_ memory.
let filt = rem.filter(|t| *t > 10);
assert!(rem.has_memory());
assert!(!filt.has_memory());Sourcepub fn imitator() -> Imitator<T>where
T: Clone,
pub fn imitator() -> Imitator<T>where
T: Clone,
Creates an imitator. Imitators are used to make cyclic streams.
Sourcepub fn subscribe<F>(&self, f: F) -> Subscription
pub fn subscribe<F>(&self, f: F) -> Subscription
Subscribe to events from this stream. The returned subscription can be used to unsubscribe at a future time.
Each value is wrapped in an Option; there will be exactly one None event when
the stream ends.
let sink = froop::Stream::sink();
let stream = sink.stream();
let handle = std::thread::spawn(move || {
// values are Some(0), Some(1), Some(2), None
stream.subscribe(|v| if let Some(v) = v {
println!("Got value: {}", v);
});
// stall thread until stream ends.
stream.wait();
});
sink.update(0);
sink.update(1);
sink.update(2);
sink.end();
handle.join();Sourcepub fn collect(&self) -> Collector<T>where
T: Clone,
pub fn collect(&self) -> Collector<T>where
T: Clone,
Collect events into a Collector. This is mostly interesting for testing.
let sink = froop::Stream::sink();
// collect all values emitted into the sink
let coll = sink.stream().collect();
std::thread::spawn(move || {
sink.update(0);
sink.update(1);
sink.update(2);
sink.end();
});
let result = coll.wait(); // wait for stream to end
assert_eq!(result, vec![0, 1, 2]);Sourcepub fn dedupe(&self) -> Stream<T>
pub fn dedupe(&self) -> Stream<T>
Dedupe stream by the event itself.
This clones every event to compare with the next.
let sink = froop::Stream::sink();
let deduped = sink.stream().dedupe();
let coll = deduped.collect();
sink.update(0);
sink.update(0);
sink.update(1);
sink.update(1);
sink.end();
assert_eq!(coll.wait(), vec![0, 1]);Sourcepub fn dedupe_by<U, F>(&self, f: F) -> Stream<T>
pub fn dedupe_by<U, F>(&self, f: F) -> Stream<T>
Dedupe stream by some extracted value.
use froop::{Stream, Sink};
#[derive(Clone, Debug)]
struct Foo(&'static str, usize);
let sink: Sink<Foo> = Stream::sink();
// dedupe this stream of Foo on the contained usize
let deduped = sink.stream().dedupe_by(|v| v.1);
let coll = deduped.collect();
sink.update(Foo("yo", 1));
sink.update(Foo("bro", 1));
sink.update(Foo("lo", 2));
sink.update(Foo("lo", 2));
sink.end();
assert_eq!(format!("{:?}", coll.wait()),
"[Foo(\"yo\", 1), Foo(\"lo\", 2)]");Sourcepub fn drop(&self, amount: usize) -> Stream<T>
pub fn drop(&self, amount: usize) -> Stream<T>
Drop an amount of initial values.
let sink = froop::Stream::sink();
// drop 2 initial values
let dropped = sink.stream().drop(2);
let coll = dropped.collect();
sink.update(0);
sink.update(1);
sink.update(2);
sink.update(3);
sink.end();
assert_eq!(coll.wait(), vec![2, 3]);Sourcepub fn drop_while<F>(&self, f: F) -> Stream<T>
pub fn drop_while<F>(&self, f: F) -> Stream<T>
Skip values while some condition holds true. Once the condition becomes false, the resulting stream emits all subsequent events.
let sink = froop::Stream::sink();
// drop initial odd values
let dropped = sink.stream().drop_while(|v| v % 2 == 1);
let coll = dropped.collect();
sink.update(1);
sink.update(3);
sink.update(4);
sink.update(5); // not dropped
sink.end();
assert_eq!(coll.wait(), vec![4, 5]);Sourcepub fn end_when<U>(&self, other: &Stream<U>) -> Stream<T>
pub fn end_when<U>(&self, other: &Stream<U>) -> Stream<T>
Produce a stream that ends when some other stream ends.
use froop::Stream;
let sink1 = Stream::sink();
let sink2 = Stream::sink();
// ending shows values of sink1, but ends when sink2 does.
let ending = sink1.stream().end_when(&sink2.stream());
let coll = ending.collect();
sink1.update(0);
sink2.update("yo");
sink1.update(1);
sink2.end();
sink1.update(2); // collector never sees this value
assert_eq!(coll.wait(), [0, 1]);Sourcepub fn filter<F>(&self, f: F) -> Stream<T>
pub fn filter<F>(&self, f: F) -> Stream<T>
Filter out a subset of the events in the stream.
let sink = froop::Stream::sink();
// keep even numbers
let filtered = sink.stream().filter(|v| v % 2 == 0);
let coll = filtered.collect();
sink.update(0);
sink.update(1);
sink.update(2);
sink.end();
assert_eq!(coll.wait(), vec![0, 2]);Sourcepub fn fold<U, F>(&self, seed: U, f: F) -> Stream<U>
pub fn fold<U, F>(&self, seed: U, f: F) -> Stream<U>
Combine events from the past with new events to produce an output.
This is roughly equivalent to a “fold” or “reduce” over an array. For each event, the latest state is emitted. The seed value is emitted straight away.
The result is always a “memory” stream.
let sink = froop::Stream::sink();
let folded = sink.stream()
.fold(40.5, |prev, next| prev + (*next as f32) / 2.0);
let coll = folded.collect();
sink.update(0);
sink.update(1);
sink.update(2);
sink.end();
assert_eq!(coll.wait(), vec![40.5, 40.5, 41.0, 42.0]);Sourcepub fn last(&self) -> Stream<T>where
T: Clone,
pub fn last(&self) -> Stream<T>where
T: Clone,
Emits the last seen event when the stream closes.
let sink = froop::Stream::sink();
let coll = sink.stream().last().collect();
sink.update(0);
sink.update(1);
sink.update(2);
sink.end();
assert_eq!(coll.wait(), vec![2]);Sourcepub fn map<U, F>(&self, f: F) -> Stream<U>
pub fn map<U, F>(&self, f: F) -> Stream<U>
Transform events.
let sink = froop::Stream::sink();
let mapped = sink.stream().map(|v| format!("yo {}", v));
let coll = mapped.collect();
sink.update(41);
sink.update(42);
sink.end();
assert_eq!(coll.wait(),
vec!["yo 41".to_string(), "yo 42".to_string()]);Sourcepub fn map_to<U>(&self, u: U) -> Stream<U>where
U: Clone + 'static,
pub fn map_to<U>(&self, u: U) -> Stream<U>where
U: Clone + 'static,
For every event, emit a static value.
let sink = froop::Stream::sink();
let mapped = sink.stream().map_to(42.0);
let coll = mapped.collect();
sink.update(0);
sink.update(1);
sink.end();
assert_eq!(coll.wait(), vec![42.0, 42.0]);Sourcepub fn merge(streams: Vec<Stream<T>>) -> Stream<T>
pub fn merge(streams: Vec<Stream<T>>) -> Stream<T>
Merge events from a bunch of streams to one stream.
use froop::Stream;
let sink1 = Stream::sink();
let sink2 = Stream::sink();
let merged = Stream::merge(vec![
sink1.stream(),
sink2.stream()
]);
let coll = merged.collect();
sink1.update(0);
sink2.update(10);
sink1.update(1);
sink2.update(11);
sink1.end();
sink2.end();
assert_eq!(coll.wait(), vec![0, 10, 1, 11]);Sourcepub fn remember(&self) -> Stream<T>where
T: Clone,
pub fn remember(&self) -> Stream<T>where
T: Clone,
Make a stream in memory mode. Each value is remembered for future subscribers.
let sink = froop::Stream::sink();
let rem = sink.stream().remember();
sink.update(0);
sink.update(1);
// receives last "remembered" value
let coll = rem.collect();
sink.update(2);
sink.end();
assert_eq!(coll.wait(), vec![1, 2]);Sourcepub fn sample_combine<U>(&self, other: &Stream<U>) -> Stream<(T, U)>
pub fn sample_combine<U>(&self, other: &Stream<U>) -> Stream<(T, U)>
On every event in this stream, combine with the last value of the other stream.
use froop::Stream;
let sink1 = Stream::sink();
let sink2 = Stream::sink();
let comb = sink1.stream().sample_combine(&sink2.stream());
let coll = comb.collect();
sink1.update(0); // lost, because no value in sink2
sink2.update("foo"); // doesn't trigger combine
sink1.update(1);
sink1.update(2);
sink2.update("bar");
sink2.end(); // sink2 is "bar" forever
sink1.update(3);
sink1.end();
assert_eq!(coll.wait(),
vec![(1, "foo"), (2, "foo"), (3, "bar")])Sourcepub fn start_with(&self, start: T) -> Stream<T>
pub fn start_with(&self, start: T) -> Stream<T>
Prepend a start value to the stream. The result is a memory stream.
let sink = froop::Stream::sink();
sink.update(0); // lost
let started = sink.stream().start_with(1);
let coll = started.collect(); // receives 1 and following
sink.update(2);
sink.end();
assert_eq!(coll.wait(), vec![1, 2]);Sourcepub fn take(&self, amount: usize) -> Stream<T>
pub fn take(&self, amount: usize) -> Stream<T>
Take a number of events, then end the stream.
let sink = froop::Stream::sink();
let take2 = sink.stream().take(2);
let coll = take2.collect();
sink.update(0);
sink.update(1); // take2 ends here
sink.update(2);
assert_eq!(coll.wait(), vec![0, 1]);Sourcepub fn take_while<F>(&self, f: F) -> Stream<T>
pub fn take_while<F>(&self, f: F) -> Stream<T>
Take events from the stream as long as a condition holds true.
let sink = froop::Stream::sink();
// take events as long as they are even
let take = sink.stream().take_while(|v| *v % 2 == 0);
let coll = take.collect();
sink.update(0);
sink.update(2);
sink.update(3); // take ends here
sink.update(4);
assert_eq!(coll.wait(), vec![0, 2]);Source§impl<T> Stream<Stream<T>>
impl<T> Stream<Stream<T>>
Sourcepub fn flatten(&self) -> Stream<T>
pub fn flatten(&self) -> Stream<T>
Flatten out a stream of streams, sequentially.
For each new stream, unsubscribe from the previous, and subscribe to the new. The new stream “interrupts” the previous stream.
use froop::{Stream, Sink};
let sink1: Sink<Stream<u32>> = Stream::sink();
let sink2: Sink<u32> = Stream::sink();
let sink3: Sink<u32> = Stream::sink();
let flat = sink1.stream().flatten();
let coll = flat.collect();
sink2.update(0); // lost
sink1.update(sink2.stream());
sink2.update(1);
sink2.update(2);
sink2.end(); // does not end sink1
sink3.update(10); // lost
sink1.update(sink3.stream());
sink3.update(11);
sink1.end();
assert_eq!(coll.wait(), vec![1, 2, 11]);Sourcepub fn flatten_concurrently(&self) -> Stream<T>
pub fn flatten_concurrently(&self) -> Stream<T>
Flatten out a stream of streams, concurrently.
For each new stream, keep the previous, and subscribe to the new.
use froop::{Stream, Sink};
let sink1: Sink<Stream<u32>> = Stream::sink();
let sink2: Sink<u32> = Stream::sink();
let sink3: Sink<u32> = Stream::sink();
let flat = sink1.stream().flatten_concurrently();
let coll = flat.collect();
sink2.update(0); // lost
sink1.update(sink2.stream());
sink2.update(1);
sink2.update(2);
sink3.update(10); // lost
sink1.update(sink3.stream());
sink3.update(11);
sink2.update(3);
sink3.update(12);
sink1.end();
assert_eq!(coll.wait(), vec![1, 2, 11, 3, 12]);§impl<A, B, C, D> Stream<(A, B, C, D)>
impl<A, B, C, D> Stream<(A, B, C, D)>
§impl<A, B, C, D, E> Stream<(A, B, C, D, E)>
impl<A, B, C, D, E> Stream<(A, B, C, D, E)>
§impl<A, B, C, D, E, F> Stream<(A, B, C, D, E, F)>
impl<A, B, C, D, E, F> Stream<(A, B, C, D, E, F)>
§impl<A, B, C, D, E, F, G> Stream<(A, B, C, D, E, F, G)>
impl<A, B, C, D, E, F, G> Stream<(A, B, C, D, E, F, G)>
pub fn combine7(
a: &Stream<A>,
b: &Stream<B>,
c: &Stream<C>,
d: &Stream<D>,
e: &Stream<E>,
f: &Stream<F>,
g: &Stream<G>,
) -> Stream<(A, B, C, D, E, F, G)>
pub fn combine7( a: &Stream<A>, b: &Stream<B>, c: &Stream<C>, d: &Stream<D>, e: &Stream<E>, f: &Stream<F>, g: &Stream<G>, ) -> Stream<(A, B, C, D, E, F, G)>
Combine a number of streams into one.
The resulting stream emits when any of the incoming streams emits, but only once every incoming stream has emitted an initial value.