pub struct Stream<T: 'static> { /* private fields */ }
Expand description
A stream of events, values in time.
Streams have combinators to build “execution trees” working over events.
§Memory
Some streams have “memory”. Streams with memory keep a copy of the last value they produced so that any new subscriber will synchronously receive that value.
Streams with memory are explicitly created using .remember(), but also by other combinators such as .fold() and .start_with().
Implementations§
Source§impl<T> Stream<T>
impl<T> Stream<T>
Sourcepub fn sink() -> Sink<T>
pub fn sink() -> Sink<T>
Create a sink that is used to push values into a stream.
let sink = froop::Stream::sink();
// collect values going into the sink
let coll = sink.stream().collect();
sink.update(0);
sink.update(1);
sink.update(2);
sink.end();
assert_eq!(coll.wait(), vec![0, 1, 2]);
Sourcepub fn of(value: T) -> Stream<T>where
T: Clone,
pub fn of(value: T) -> Stream<T>where
T: Clone,
Create a stream with memory that only emits one single value to anyone subscribing.
let value = froop::Stream::of(42);
// both collectors will receive the value
let coll1 = value.collect();
let coll2 = value.collect();
// use .take() since stream doesn't end
assert_eq!(coll1.take(), [42]);
assert_eq!(coll2.take(), [42]);
Sourcepub fn never() -> Stream<T>
pub fn never() -> Stream<T>
Create a stream that never emits any value and never ends.
use froop::Stream;
let never: Stream<u32> = Stream::never();
let coll = never.collect();
assert_eq!(coll.take(), vec![]);
Sourcepub fn has_memory(&self) -> bool
pub fn has_memory(&self) -> bool
Check if this stream has “memory”.
Streams with memory keep a copy of the last value they produced so that any new subscriber will synchronously receive that value.
Streams with memory are explicitly created using .remember(), but also by other combinators such as .fold() and .start_with().
Memory is not inherited by child combinators. For example:
let sink = froop::Stream::sink();
sink.update(0);
// This stream has memory.
let rem = sink.stream().remember();
// This filtered stream has _NO_ memory.
let filt = rem.filter(|t| *t > 10);
assert!(rem.has_memory());
assert!(!filt.has_memory());
Sourcepub fn imitator() -> Imitator<T>where
T: Clone,
pub fn imitator() -> Imitator<T>where
T: Clone,
Creates an imitator. Imitators are used to make cyclic streams.
Sourcepub fn subscribe<F>(&self, f: F) -> Subscription
pub fn subscribe<F>(&self, f: F) -> Subscription
Subscribe to events from this stream. The returned subscription can be used to unsubscribe at a future time.
Each value is wrapped in an Option; there will be exactly one None event when the stream ends.
let sink = froop::Stream::sink();
let stream = sink.stream();
let handle = std::thread::spawn(move || {
// values are Some(0), Some(1), Some(2), None
stream.subscribe(|v| if let Some(v) = v {
println!("Got value: {}", v);
});
// stall thread until stream ends.
stream.wait();
});
sink.update(0);
sink.update(1);
sink.update(2);
sink.end();
handle.join();
Sourcepub fn collect(&self) -> Collector<T>where
T: Clone,
pub fn collect(&self) -> Collector<T>where
T: Clone,
Collect events into a Collector. This is mostly interesting for testing.
let sink = froop::Stream::sink();
// collect all values emitted into the sink
let coll = sink.stream().collect();
std::thread::spawn(move || {
sink.update(0);
sink.update(1);
sink.update(2);
sink.end();
});
let result = coll.wait(); // wait for stream to end
assert_eq!(result, vec![0, 1, 2]);
Sourcepub fn dedupe(&self) -> Stream<T>
pub fn dedupe(&self) -> Stream<T>
Dedupe stream by the event itself.
This clones every event to compare with the next.
let sink = froop::Stream::sink();
let deduped = sink.stream().dedupe();
let coll = deduped.collect();
sink.update(0);
sink.update(0);
sink.update(1);
sink.update(1);
sink.end();
assert_eq!(coll.wait(), vec![0, 1]);
Sourcepub fn dedupe_by<U, F>(&self, f: F) -> Stream<T>
pub fn dedupe_by<U, F>(&self, f: F) -> Stream<T>
Dedupe stream by some extracted value.
use froop::{Stream, Sink};
#[derive(Clone, Debug)]
struct Foo(&'static str, usize);
let sink: Sink<Foo> = Stream::sink();
// dedupe this stream of Foo on the contained usize
let deduped = sink.stream().dedupe_by(|v| v.1);
let coll = deduped.collect();
sink.update(Foo("yo", 1));
sink.update(Foo("bro", 1));
sink.update(Foo("lo", 2));
sink.update(Foo("lo", 2));
sink.end();
assert_eq!(format!("{:?}", coll.wait()),
"[Foo(\"yo\", 1), Foo(\"lo\", 2)]");
Sourcepub fn drop(&self, amount: usize) -> Stream<T>
pub fn drop(&self, amount: usize) -> Stream<T>
Drop an amount of initial values.
let sink = froop::Stream::sink();
// drop 2 initial values
let dropped = sink.stream().drop(2);
let coll = dropped.collect();
sink.update(0);
sink.update(1);
sink.update(2);
sink.update(3);
sink.end();
assert_eq!(coll.wait(), vec![2, 3]);
Sourcepub fn drop_while<F>(&self, f: F) -> Stream<T>
pub fn drop_while<F>(&self, f: F) -> Stream<T>
Drop values for as long as some condition holds true. Once the condition is false for an event, the resulting stream emits that event and all subsequent events.
let sink = froop::Stream::sink();
// drop initial odd values
let dropped = sink.stream().drop_while(|v| v % 2 == 1);
let coll = dropped.collect();
sink.update(1);
sink.update(3);
sink.update(4);
sink.update(5); // not dropped
sink.end();
assert_eq!(coll.wait(), vec![4, 5]);
Sourcepub fn end_when<U>(&self, other: &Stream<U>) -> Stream<T>
pub fn end_when<U>(&self, other: &Stream<U>) -> Stream<T>
Produce a stream that ends when some other stream ends.
use froop::Stream;
let sink1 = Stream::sink();
let sink2 = Stream::sink();
// ending shows values of sink1, but ends when sink2 does.
let ending = sink1.stream().end_when(&sink2.stream());
let coll = ending.collect();
sink1.update(0);
sink2.update("yo");
sink1.update(1);
sink2.end();
sink1.update(2); // collector never sees this value
assert_eq!(coll.wait(), [0, 1]);
Sourcepub fn filter<F>(&self, f: F) -> Stream<T>
pub fn filter<F>(&self, f: F) -> Stream<T>
Filter out a subset of the events in the stream.
let sink = froop::Stream::sink();
// keep even numbers
let filtered = sink.stream().filter(|v| v % 2 == 0);
let coll = filtered.collect();
sink.update(0);
sink.update(1);
sink.update(2);
sink.end();
assert_eq!(coll.wait(), vec![0, 2]);
Sourcepub fn fold<U, F>(&self, seed: U, f: F) -> Stream<U>
pub fn fold<U, F>(&self, seed: U, f: F) -> Stream<U>
Combine events from the past, with new events to produce an output.
This is roughly equivalent to a “fold” or “reduce” over an array. For each event we emit the latest state out. The seed value is emitted straight away.
The result is always a “memory” stream.
let sink = froop::Stream::sink();
let folded = sink.stream()
.fold(40.5, |prev, next| prev + (*next as f32) / 2.0);
let coll = folded.collect();
sink.update(0);
sink.update(1);
sink.update(2);
sink.end();
assert_eq!(coll.wait(), vec![40.5, 40.5, 41.0, 42.0]);
Sourcepub fn last(&self) -> Stream<T>where
T: Clone,
pub fn last(&self) -> Stream<T>where
T: Clone,
Emits the last seen event when the stream closes.
let sink = froop::Stream::sink();
let coll = sink.stream().last().collect();
sink.update(0);
sink.update(1);
sink.update(2);
sink.end();
assert_eq!(coll.wait(), vec![2]);
Sourcepub fn map<U, F>(&self, f: F) -> Stream<U>
pub fn map<U, F>(&self, f: F) -> Stream<U>
Transform events.
let sink = froop::Stream::sink();
let mapped = sink.stream().map(|v| format!("yo {}", v));
let coll = mapped.collect();
sink.update(41);
sink.update(42);
sink.end();
assert_eq!(coll.wait(),
vec!["yo 41".to_string(), "yo 42".to_string()]);
Sourcepub fn map_to<U>(&self, u: U) -> Stream<U>where
U: Clone + 'static,
pub fn map_to<U>(&self, u: U) -> Stream<U>where
U: Clone + 'static,
For every event, emit a static value.
let sink = froop::Stream::sink();
let mapped = sink.stream().map_to(42.0);
let coll = mapped.collect();
sink.update(0);
sink.update(1);
sink.end();
assert_eq!(coll.wait(), vec![42.0, 42.0]);
Sourcepub fn merge(streams: Vec<Stream<T>>) -> Stream<T>
pub fn merge(streams: Vec<Stream<T>>) -> Stream<T>
Merge events from a bunch of streams to one stream.
use froop::Stream;
let sink1 = Stream::sink();
let sink2 = Stream::sink();
let merged = Stream::merge(vec![
sink1.stream(),
sink2.stream()
]);
let coll = merged.collect();
sink1.update(0);
sink2.update(10);
sink1.update(1);
sink2.update(11);
sink1.end();
sink2.end();
assert_eq!(coll.wait(), vec![0, 10, 1, 11]);
Sourcepub fn remember(&self) -> Stream<T>where
T: Clone,
pub fn remember(&self) -> Stream<T>where
T: Clone,
Put a stream into memory mode. The most recent value is remembered and delivered to future subscribers.
let sink = froop::Stream::sink();
let rem = sink.stream().remember();
sink.update(0);
sink.update(1);
// receives last "remembered" value
let coll = rem.collect();
sink.update(2);
sink.end();
assert_eq!(coll.wait(), vec![1, 2]);
Sourcepub fn sample_combine<U>(&self, other: &Stream<U>) -> Stream<(T, U)>
pub fn sample_combine<U>(&self, other: &Stream<U>) -> Stream<(T, U)>
On every event in this stream, combine with the last value of the other stream.
use froop::Stream;
let sink1 = Stream::sink();
let sink2 = Stream::sink();
let comb = sink1.stream().sample_combine(&sink2.stream());
let coll = comb.collect();
sink1.update(0); // lost, because no value in sink2
sink2.update("foo"); // doesn't trigger combine
sink1.update(1);
sink1.update(2);
sink2.update("bar");
sink2.end(); // sink2 is "bar" forever
sink1.update(3);
sink1.end();
assert_eq!(coll.wait(),
vec![(1, "foo"), (2, "foo"), (3, "bar")])
Sourcepub fn start_with(&self, start: T) -> Stream<T>
pub fn start_with(&self, start: T) -> Stream<T>
Prepend a start value to the stream. The result is a memory stream.
let sink = froop::Stream::sink();
sink.update(0); // lost
let started = sink.stream().start_with(1);
let coll = started.collect(); // receives 1 and following
sink.update(2);
sink.end();
assert_eq!(coll.wait(), vec![1, 2]);
Sourcepub fn take(&self, amount: usize) -> Stream<T>
pub fn take(&self, amount: usize) -> Stream<T>
Take a number of events, then end the stream.
let sink = froop::Stream::sink();
let take2 = sink.stream().take(2);
let coll = take2.collect();
sink.update(0);
sink.update(1); // take2 ends here
sink.update(2);
assert_eq!(coll.wait(), vec![0, 1]);
Sourcepub fn take_while<F>(&self, f: F) -> Stream<T>
pub fn take_while<F>(&self, f: F) -> Stream<T>
Take events from the stream as long as a condition holds true.
let sink = froop::Stream::sink();
// take events as long as they are even
let take = sink.stream().take_while(|v| *v % 2 == 0);
let coll = take.collect();
sink.update(0);
sink.update(2);
sink.update(3); // take ends here
sink.update(4);
assert_eq!(coll.wait(), vec![0, 2]);
Source§impl<T> Stream<Stream<T>>
impl<T> Stream<Stream<T>>
Sourcepub fn flatten(&self) -> Stream<T>
pub fn flatten(&self) -> Stream<T>
Flatten out a stream of streams, sequentially.
For each new stream, unsubscribe from the previous, and subscribe to the new. The new stream “interrupts” the previous stream.
use froop::{Stream, Sink};
let sink1: Sink<Stream<u32>> = Stream::sink();
let sink2: Sink<u32> = Stream::sink();
let sink3: Sink<u32> = Stream::sink();
let flat = sink1.stream().flatten();
let coll = flat.collect();
sink2.update(0); // lost
sink1.update(sink2.stream());
sink2.update(1);
sink2.update(2);
sink2.end(); // does not end sink1
sink3.update(10); // lost
sink1.update(sink3.stream());
sink3.update(11);
sink1.end();
assert_eq!(coll.wait(), vec![1, 2, 11]);
Sourcepub fn flatten_concurrently(&self) -> Stream<T>
pub fn flatten_concurrently(&self) -> Stream<T>
Flatten out a stream of streams, concurrently.
For each new stream, keep the previous, and subscribe to the new.
use froop::{Stream, Sink};
let sink1: Sink<Stream<u32>> = Stream::sink();
let sink2: Sink<u32> = Stream::sink();
let sink3: Sink<u32> = Stream::sink();
let flat = sink1.stream().flatten_concurrently();
let coll = flat.collect();
sink2.update(0); // lost
sink1.update(sink2.stream());
sink2.update(1);
sink2.update(2);
sink3.update(10); // lost
sink1.update(sink3.stream());
sink3.update(11);
sink2.update(3);
sink3.update(12);
sink1.end();
assert_eq!(coll.wait(), vec![1, 2, 11, 3, 12]);
§impl<A, B, C, D> Stream<(A, B, C, D)>
impl<A, B, C, D> Stream<(A, B, C, D)>
§impl<A, B, C, D, E> Stream<(A, B, C, D, E)>
impl<A, B, C, D, E> Stream<(A, B, C, D, E)>
§impl<A, B, C, D, E, F> Stream<(A, B, C, D, E, F)>
impl<A, B, C, D, E, F> Stream<(A, B, C, D, E, F)>
§impl<A, B, C, D, E, F, G> Stream<(A, B, C, D, E, F, G)>
impl<A, B, C, D, E, F, G> Stream<(A, B, C, D, E, F, G)>
pub fn combine7(
a: &Stream<A>,
b: &Stream<B>,
c: &Stream<C>,
d: &Stream<D>,
e: &Stream<E>,
f: &Stream<F>,
g: &Stream<G>,
) -> Stream<(A, B, C, D, E, F, G)>
pub fn combine7( a: &Stream<A>, b: &Stream<B>, c: &Stream<C>, d: &Stream<D>, e: &Stream<E>, f: &Stream<F>, g: &Stream<G>, ) -> Stream<(A, B, C, D, E, F, G)>
Combine a number of streams into one.
The resulting stream emits whenever any of the incoming streams emits, but only once every incoming stream has emitted an initial value.