Struct xi::Stream [−][src]
A stream of events, values in time.
Streams have combinators to build “execution trees” working over events.
Memory
Some streams have “memory”. Streams with memory keep a copy of the last value they produced so that any new subscriber will synchronously receive that value.
Streams with memory are explicitly created using .remember(), but also by other combinators such as .fold() and .start_with().
Implementations
impl<T> Stream<T>
[src]
pub fn sink() -> Sink<T>
[src]
Create a sink that is used to push values into a stream.
let sink = xi::Stream::sink(); // collect values going into the sink let coll = sink.stream().collect(); sink.update(0); sink.update(1); sink.update(2); sink.end(); assert_eq!(coll.wait(), vec![0, 1, 2]);
pub fn of(value: T) -> Stream<T> where
T: Clone,
[src]
T: Clone,
Create a stream with memory that only emits one single value to anyone subscribing.
let value = xi::Stream::of(42); // both collectors will receive the value let coll1 = value.collect(); let coll2 = value.collect(); // use .take() since stream doesn't end assert_eq!(coll1.take(), [42]); assert_eq!(coll2.take(), [42]);
pub fn never() -> Stream<T>
[src]
Create a stream that never emits any value and never ends.
use xi::Stream; let never: Stream<u32> = Stream::never(); let coll = never.collect(); assert_eq!(coll.take(), vec![]);
pub fn has_memory(&self) -> bool
[src]
Check if this stream has “memory”.
Streams with memory keep a copy of the last value they produced so that any new subscriber will synchronously receive that value.
Streams with memory are explicitly created using .remember(), but also by other combinators such as .fold() and .start_with().
Memory is not inherited by child combinators. For example:
let sink = xi::Stream::sink(); sink.update(0); // This stream has memory. let rem = sink.stream().remember(); // This filtered stream has _NO_ memory. let filt = rem.filter(|t| *t > 10); assert!(rem.has_memory()); assert!(!filt.has_memory());
pub fn imitator() -> Imitator<T> where
T: Clone,
[src]
T: Clone,
Creates an imitator. Imitators are used to make cyclic streams.
pub fn subscribe<F>(&self, f: F) -> Subscription where
F: FnMut(Option<&T>) + 'static,
[src]
F: FnMut(Option<&T>) + 'static,
Subscribe to events from this stream. The returned subscription can be used to unsubscribe at a future time.
Each value is wrapped in an Option; there will be exactly one None event when the stream ends.
let sink = xi::Stream::sink(); let stream = sink.stream(); let handle = std::thread::spawn(move || { // values are Some(0), Some(1), Some(2), None stream.subscribe(|v| if let Some(v) = v { println!("Got value: {}", v); }); // stall thread until stream ends. stream.wait(); }); sink.update(0); sink.update(1); sink.update(2); sink.end(); handle.join();
pub fn collect(&self) -> Collector<T> where
T: Clone,
[src]
T: Clone,
Collect events into a Collector. This is mostly interesting for testing.
let sink = xi::Stream::sink(); // collect all values emitted into the sink let coll = sink.stream().collect(); std::thread::spawn(move || { sink.update(0); sink.update(1); sink.update(2); sink.end(); }); let result = coll.wait(); // wait for stream to end assert_eq!(result, vec![0, 1, 2]);
pub fn dedupe(&self) -> Stream<T> where
T: Clone + PartialEq,
[src]
T: Clone + PartialEq,
Dedupe stream by the event itself.
This clones every event to compare with the next.
let sink = xi::Stream::sink(); let deduped = sink.stream().dedupe(); let coll = deduped.collect(); sink.update(0); sink.update(0); sink.update(1); sink.update(1); sink.end(); assert_eq!(coll.wait(), vec![0, 1]);
pub fn dedupe_by<U, F>(&self, f: F) -> Stream<T> where
U: PartialEq + 'static,
F: FnMut(&T) -> U + 'static,
[src]
U: PartialEq + 'static,
F: FnMut(&T) -> U + 'static,
Dedupe stream by some extracted value.
use xi::{Stream, Sink}; #[derive(Clone, Debug)] struct Foo(&'static str, usize); let sink: Sink<Foo> = Stream::sink(); // dedupe this stream of Foo on the contained usize let deduped = sink.stream().dedupe_by(|v| v.1); let coll = deduped.collect(); sink.update(Foo("yo", 1)); sink.update(Foo("bro", 1)); sink.update(Foo("lo", 2)); sink.update(Foo("lo", 2)); sink.end(); assert_eq!(format!("{:?}", coll.wait()), "[Foo(\"yo\", 1), Foo(\"lo\", 2)]");
pub fn drop(&self, amount: usize) -> Stream<T>
[src]
Drop an amount of initial values.
let sink = xi::Stream::sink(); // drop 2 initial values let dropped = sink.stream().drop(2); let coll = dropped.collect(); sink.update(0); sink.update(1); sink.update(2); sink.update(3); sink.end(); assert_eq!(coll.wait(), vec![2, 3]);
pub fn drop_while<F>(&self, f: F) -> Stream<T> where
F: FnMut(&T) -> bool + 'static,
[src]
F: FnMut(&T) -> bool + 'static,
Drop values while some condition holds true. Once the condition becomes false, the resulting stream emits all subsequent events.
let sink = xi::Stream::sink(); // drop initial odd values let dropped = sink.stream().drop_while(|v| v % 2 == 1); let coll = dropped.collect(); sink.update(1); sink.update(3); sink.update(4); sink.update(5); // not dropped sink.end(); assert_eq!(coll.wait(), vec![4, 5]);
pub fn end_when<U>(&self, other: &Stream<U>) -> Stream<T>
[src]
Produce a stream that ends when some other stream ends.
use xi::Stream; let sink1 = Stream::sink(); let sink2 = Stream::sink(); // ending shows values of sink1, but ends when sink2 does. let ending = sink1.stream().end_when(&sink2.stream()); let coll = ending.collect(); sink1.update(0); sink2.update("yo"); sink1.update(1); sink2.end(); sink1.update(2); // collector never sees this value assert_eq!(coll.wait(), [0, 1]);
pub fn filter<F>(&self, f: F) -> Stream<T> where
F: FnMut(&T) -> bool + 'static,
[src]
F: FnMut(&T) -> bool + 'static,
Filter out a subset of the events in the stream.
let sink = xi::Stream::sink(); // keep even numbers let filtered = sink.stream().filter(|v| v % 2 == 0); let coll = filtered.collect(); sink.update(0); sink.update(1); sink.update(2); sink.end(); assert_eq!(coll.wait(), vec![0, 2]);
pub fn fold<U, F>(&self, seed: U, f: F) -> Stream<U> where
U: 'static,
F: FnMut(U, &T) -> U + 'static,
[src]
U: 'static,
F: FnMut(U, &T) -> U + 'static,
Combine events from the past, with new events to produce an output.
This is roughly equivalent to a “fold” or “reduce” over an array. For each event we emit the latest state out. The seed value is emitted straight away.
The result is always a “memory” stream.
let sink = xi::Stream::sink(); let folded = sink.stream() .fold(40.5, |prev, next| prev + (*next as f32) / 2.0); let coll = folded.collect(); sink.update(0); sink.update(1); sink.update(2); sink.end(); assert_eq!(coll.wait(), vec![40.5, 40.5, 41.0, 42.0]);
pub fn last(&self) -> Stream<T> where
T: Clone,
[src]
T: Clone,
Emits the last seen event when the stream closes.
let sink = xi::Stream::sink(); let coll = sink.stream().last().collect(); sink.update(0); sink.update(1); sink.update(2); sink.end(); assert_eq!(coll.wait(), vec![2]);
pub fn map<U, F>(&self, f: F) -> Stream<U> where
U: 'static,
F: FnMut(&T) -> U + 'static,
[src]
U: 'static,
F: FnMut(&T) -> U + 'static,
Transform events.
let sink = xi::Stream::sink(); let mapped = sink.stream().map(|v| format!("yo {}", v)); let coll = mapped.collect(); sink.update(41); sink.update(42); sink.end(); assert_eq!(coll.wait(), vec!["yo 41".to_string(), "yo 42".to_string()]);
pub fn map_to<U>(&self, u: U) -> Stream<U> where
U: Clone + 'static,
[src]
U: Clone + 'static,
For every event, emit a static value.
let sink = xi::Stream::sink(); let mapped = sink.stream().map_to(42.0); let coll = mapped.collect(); sink.update(0); sink.update(1); sink.end(); assert_eq!(coll.wait(), vec![42.0, 42.0]);
pub fn merge(streams: Vec<Stream<T>>) -> Stream<T>
[src]
Merge events from a bunch of streams to one stream.
use xi::Stream; let sink1 = Stream::sink(); let sink2 = Stream::sink(); let merged = Stream::merge(vec![ sink1.stream(), sink2.stream() ]); let coll = merged.collect(); sink1.update(0); sink2.update(10); sink1.update(1); sink2.update(11); sink1.end(); sink2.end(); assert_eq!(coll.wait(), vec![0, 10, 1, 11]);
pub fn remember(&self) -> Stream<T> where
T: Clone,
[src]
T: Clone,
Make a stream in memory mode. Each value is remembered for future subscribers.
let sink = xi::Stream::sink(); let rem = sink.stream().remember(); sink.update(0); sink.update(1); // receives last "remembered" value let coll = rem.collect(); sink.update(2); sink.end(); assert_eq!(coll.wait(), vec![1, 2]);
pub fn sample_combine<U>(&self, other: &Stream<U>) -> Stream<(T, U)> where
T: Clone,
U: Clone,
[src]
T: Clone,
U: Clone,
On every event in this stream, combine with the last value of the other stream.
use xi::Stream; let sink1 = Stream::sink(); let sink2 = Stream::sink(); let comb = sink1.stream().sample_combine(&sink2.stream()); let coll = comb.collect(); sink1.update(0); // lost, because no value in sink2 sink2.update("foo"); // doesn't trigger combine sink1.update(1); sink1.update(2); sink2.update("bar"); sink2.end(); // sink2 is "bar" forever sink1.update(3); sink1.end(); assert_eq!(coll.wait(), vec![(1, "foo"), (2, "foo"), (3, "bar")])
pub fn start_with(&self, start: T) -> Stream<T>
[src]
Prepend a start value to the stream. The result is a memory stream.
let sink = xi::Stream::sink(); sink.update(0); // lost let started = sink.stream().start_with(1); let coll = started.collect(); // receives 1 and following sink.update(2); sink.end(); assert_eq!(coll.wait(), vec![1, 2]);
pub fn take(&self, amount: usize) -> Stream<T>
[src]
Take a number of events, then end the stream.
let sink = xi::Stream::sink(); let take2 = sink.stream().take(2); let coll = take2.collect(); sink.update(0); sink.update(1); // take2 ends here sink.update(2); assert_eq!(coll.wait(), vec![0, 1]);
pub fn take_while<F>(&self, f: F) -> Stream<T> where
F: FnMut(&T) -> bool + 'static,
[src]
F: FnMut(&T) -> bool + 'static,
Take events from the stream as long as a condition holds true.
let sink = xi::Stream::sink(); // take events as long as they are even let take = sink.stream().take_while(|v| *v % 2 == 0); let coll = take.collect(); sink.update(0); sink.update(2); sink.update(3); // take ends here sink.update(4); assert_eq!(coll.wait(), vec![0, 2]);
pub fn wait(&self)
[src]
Stalls calling thread until the stream ends.
let sink = xi::Stream::sink(); let stream = sink.stream(); std::thread::spawn(move || { sink.update(0); sink.update(1); sink.update(2); sink.end(); // this releases the wait }); stream.wait(); // wait for other thread
impl<T> Stream<Stream<T>>
[src]
pub fn flatten(&self) -> Stream<T>
[src]
Flatten out a stream of streams, sequentially.
For each new stream, unsubscribe from the previous, and subscribe to the new. The new stream “interrupts” the previous stream.
use xi::{Stream, Sink}; let sink1: Sink<Stream<u32>> = Stream::sink(); let sink2: Sink<u32> = Stream::sink(); let sink3: Sink<u32> = Stream::sink(); let flat = sink1.stream().flatten(); let coll = flat.collect(); sink2.update(0); // lost sink1.update(sink2.stream()); sink2.update(1); sink2.update(2); sink2.end(); // does not end sink1 sink3.update(10); // lost sink1.update(sink3.stream()); sink3.update(11); sink1.end(); assert_eq!(coll.wait(), vec![1, 2, 11]);
pub fn flatten_concurrently(&self) -> Stream<T>
[src]
Flatten out a stream of streams, concurrently.
For each new stream, keep the previous, and subscribe to the new.
use xi::{Stream, Sink}; let sink1: Sink<Stream<u32>> = Stream::sink(); let sink2: Sink<u32> = Stream::sink(); let sink3: Sink<u32> = Stream::sink(); let flat = sink1.stream().flatten_concurrently(); let coll = flat.collect(); sink2.update(0); // lost sink1.update(sink2.stream()); sink2.update(1); sink2.update(2); sink3.update(10); // lost sink1.update(sink3.stream()); sink3.update(11); sink2.update(3); sink3.update(12); sink1.end(); assert_eq!(coll.wait(), vec![1, 2, 11, 3, 12]);
impl<A, B> Stream<(A, B)> where
A: Clone,
B: Clone,
A: Clone,
B: Clone,
pub fn combine2(a: &Stream<A>, b: &Stream<B>) -> Stream<(A, B)>
Combine a number of streams into one.
The resulting stream emits whenever any of the incoming streams emits, but only once all incoming streams have emitted an initial value.
impl<A, B, C> Stream<(A, B, C)> where
A: Clone,
B: Clone,
C: Clone,
A: Clone,
B: Clone,
C: Clone,
pub fn combine3(
a: &Stream<A>,
b: &Stream<B>,
c: &Stream<C>
) -> Stream<(A, B, C)>
a: &Stream<A>,
b: &Stream<B>,
c: &Stream<C>
) -> Stream<(A, B, C)>
Combine a number of streams into one.
The resulting stream emits whenever any of the incoming streams emits, but only once all incoming streams have emitted an initial value.
impl<A, B, C, D> Stream<(A, B, C, D)> where
A: Clone,
B: Clone,
C: Clone,
D: Clone,
A: Clone,
B: Clone,
C: Clone,
D: Clone,
pub fn combine4(
a: &Stream<A>,
b: &Stream<B>,
c: &Stream<C>,
d: &Stream<D>
) -> Stream<(A, B, C, D)>
a: &Stream<A>,
b: &Stream<B>,
c: &Stream<C>,
d: &Stream<D>
) -> Stream<(A, B, C, D)>
Combine a number of streams into one.
The resulting stream emits whenever any of the incoming streams emits, but only once all incoming streams have emitted an initial value.
impl<A, B, C, D, E> Stream<(A, B, C, D, E)> where
A: Clone,
B: Clone,
C: Clone,
D: Clone,
E: Clone,
A: Clone,
B: Clone,
C: Clone,
D: Clone,
E: Clone,
pub fn combine5(
a: &Stream<A>,
b: &Stream<B>,
c: &Stream<C>,
d: &Stream<D>,
e: &Stream<E>
) -> Stream<(A, B, C, D, E)>
a: &Stream<A>,
b: &Stream<B>,
c: &Stream<C>,
d: &Stream<D>,
e: &Stream<E>
) -> Stream<(A, B, C, D, E)>
Combine a number of streams into one.
The resulting stream emits whenever any of the incoming streams emits, but only once all incoming streams have emitted an initial value.
impl<A, B, C, D, E, F> Stream<(A, B, C, D, E, F)> where
A: Clone,
B: Clone,
C: Clone,
D: Clone,
E: Clone,
F: Clone,
A: Clone,
B: Clone,
C: Clone,
D: Clone,
E: Clone,
F: Clone,
pub fn combine6(
a: &Stream<A>,
b: &Stream<B>,
c: &Stream<C>,
d: &Stream<D>,
e: &Stream<E>,
f: &Stream<F>
) -> Stream<(A, B, C, D, E, F)>
a: &Stream<A>,
b: &Stream<B>,
c: &Stream<C>,
d: &Stream<D>,
e: &Stream<E>,
f: &Stream<F>
) -> Stream<(A, B, C, D, E, F)>
Combine a number of streams into one.
The resulting stream emits whenever any of the incoming streams emits, but only once all incoming streams have emitted an initial value.
impl<A, B, C, D, E, F, G> Stream<(A, B, C, D, E, F, G)> where
A: Clone,
B: Clone,
C: Clone,
D: Clone,
E: Clone,
F: Clone,
G: Clone,
A: Clone,
B: Clone,
C: Clone,
D: Clone,
E: Clone,
F: Clone,
G: Clone,
pub fn combine7(
a: &Stream<A>,
b: &Stream<B>,
c: &Stream<C>,
d: &Stream<D>,
e: &Stream<E>,
f: &Stream<F>,
g: &Stream<G>
) -> Stream<(A, B, C, D, E, F, G)>
a: &Stream<A>,
b: &Stream<B>,
c: &Stream<C>,
d: &Stream<D>,
e: &Stream<E>,
f: &Stream<F>,
g: &Stream<G>
) -> Stream<(A, B, C, D, E, F, G)>
Combine a number of streams into one.
The resulting stream emits whenever any of the incoming streams emits, but only once all incoming streams have emitted an initial value.
Trait Implementations
impl<T> Clone for Stream<T>
[src]
fn clone(&self) -> Self
[src]
pub fn clone_from(&mut self, source: &Self)
1.0.0[src]
Auto Trait Implementations
impl<T> RefUnwindSafe for Stream<T>
impl<T> Send for Stream<T> where
T: Send,
T: Send,
impl<T> Sync for Stream<T> where
T: Send,
T: Send,
impl<T> Unpin for Stream<T>
impl<T> UnwindSafe for Stream<T>
Blanket Implementations
impl<T> Any for T where
T: 'static + ?Sized,
[src]
T: 'static + ?Sized,
impl<T> Borrow<T> for T where
T: ?Sized,
[src]
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
[src]
T: ?Sized,
pub fn borrow_mut(&mut self) -> &mut T
[src]
impl<T> From<T> for T
[src]
impl<T, U> Into<U> for T where
U: From<T>,
[src]
U: From<T>,
impl<T> ToOwned for T where
T: Clone,
[src]
T: Clone,
type Owned = T
The resulting type after obtaining ownership.
pub fn to_owned(&self) -> T
[src]
pub fn clone_into(&self, target: &mut T)
[src]
impl<T, U> TryFrom<U> for T where
U: Into<T>,
[src]
U: Into<T>,
type Error = Infallible
The type returned in the event of a conversion error.
pub fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>
[src]
impl<T, U> TryInto<U> for T where
U: TryFrom<T>,
[src]
U: TryFrom<T>,