Struct xi::Stream[][src]

pub struct Stream<T: 'static> { /* fields omitted */ }

A stream of events, values in time.

Streams have combinators to build “execution trees” working over events.

Memory

Some streams have “memory”. Streams with memory keeps a copy of the last value they produced so that any new subscriber will syncronously receive the value.

Streams with memory are explicitly created using .remember(), but also by other combinators such as .fold() and .start_with().

Implementations

impl<T> Stream<T>[src]

pub fn sink() -> Sink<T>[src]

Create a sink that is used to push values into a stream.

let sink = xi::Stream::sink();

// collect values going into the sink
let coll = sink.stream().collect();

sink.update(0);
sink.update(1);
sink.update(2);
sink.end();

assert_eq!(coll.wait(), vec![0, 1, 2]);

pub fn of(value: T) -> Stream<T> where
    T: Clone
[src]

Create a stream with memory that only emits one single value to anyone subscribing.

let value = xi::Stream::of(42);

// both collectors will receive the value
let coll1 = value.collect();
let coll2 = value.collect();

// use .take() since stream doesn't end
assert_eq!(coll1.take(), [42]);
assert_eq!(coll2.take(), [42]);

pub fn never() -> Stream<T>[src]

Create a stream that never emits any value and never ends.

use xi::Stream;

let never: Stream<u32> = Stream::never();
let coll = never.collect();
assert_eq!(coll.take(), vec![]);

pub fn has_memory(&self) -> bool[src]

Check if this stream has “memory”.

Streams with memory keeps a copy of the last value they produced so that any new subscriber will syncronously receive the value.

Streams with memory are explicitly created using .remember(), but also by other combinators such as .fold() and .start_with().

The memory is not inherited to child combinators. I.e.

let sink = xi::Stream::sink();
sink.update(0);

// This stream has memory.
let rem = sink.stream().remember();

// This filtered stream has _NO_ memory.
let filt = rem.filter(|t| *t > 10);

assert!(rem.has_memory());
assert!(!filt.has_memory());

pub fn imitator() -> Imitator<T> where
    T: Clone
[src]

Creates an imitator. Imitators are used to make cyclic streams.

pub fn subscribe<F>(&self, f: F) -> Subscription where
    F: FnMut(Option<&T>) + 'static, 
[src]

Subscribe to events from this stream. The returned subscription can be used to unsubscribe at a future time.

Each value is wrapped in an Option, there will be exactly one None event when the stream ends.

let sink = xi::Stream::sink();
let stream = sink.stream();

let handle = std::thread::spawn(move || {

  // values are Some(0), Some(1), Some(2), None
  stream.subscribe(|v| if let Some(v) = v {
      println!("Got value: {}", v);
  });

  // stall thread until stream ends.
  stream.wait();
});

sink.update(0);
sink.update(1);
sink.update(2);
sink.end();

handle.join();

pub fn collect(&self) -> Collector<T> where
    T: Clone
[src]

Collect events into a Collector. This is mostly interesting for testing.

let sink = xi::Stream::sink();

// collect all values emitted into the sink
let coll = sink.stream().collect();

std::thread::spawn(move || {
  sink.update(0);
  sink.update(1);
  sink.update(2);
  sink.end();
});

let result = coll.wait(); // wait for stream to end
assert_eq!(result, vec![0, 1, 2]);

pub fn dedupe(&self) -> Stream<T> where
    T: Clone + PartialEq
[src]

Dedupe stream by the event itself.

This clones every event to compare with the next.

let sink = xi::Stream::sink();

let deduped = sink.stream().dedupe();

let coll = deduped.collect();

sink.update(0);
sink.update(0);
sink.update(1);
sink.update(1);
sink.end();

assert_eq!(coll.wait(), vec![0, 1]);

pub fn dedupe_by<U, F>(&self, f: F) -> Stream<T> where
    U: PartialEq + 'static,
    F: FnMut(&T) -> U + 'static, 
[src]

Dedupe stream by some extracted value.

use xi::{Stream, Sink};

#[derive(Clone, Debug)]
struct Foo(&'static str, usize);

let sink: Sink<Foo> = Stream::sink();

// dedupe this stream of Foo on the contained usize
let deduped = sink.stream().dedupe_by(|v| v.1);

let coll = deduped.collect();

sink.update(Foo("yo", 1));
sink.update(Foo("bro", 1));
sink.update(Foo("lo", 2));
sink.update(Foo("lo", 2));
sink.end();

assert_eq!(format!("{:?}", coll.wait()),
    "[Foo(\"yo\", 1), Foo(\"lo\", 2)]");

pub fn drop(&self, amount: usize) -> Stream<T>[src]

Drop an amount of initial values.

let sink = xi::Stream::sink();

// drop 2 initial values
let dropped = sink.stream().drop(2);

let coll = dropped.collect();

sink.update(0);
sink.update(1);
sink.update(2);
sink.update(3);
sink.end();

assert_eq!(coll.wait(), vec![2, 3]);

pub fn drop_while<F>(&self, f: F) -> Stream<T> where
    F: FnMut(&T) -> bool + 'static, 
[src]

Don’t take values while some condition holds true. Once the condition is false, the resulting stream emits all events.

let sink = xi::Stream::sink();

// drop initial odd values
let dropped = sink.stream().drop_while(|v| v % 2 == 1);

let coll = dropped.collect();

sink.update(1);
sink.update(3);
sink.update(4);
sink.update(5); // not dropped
sink.end();

assert_eq!(coll.wait(), vec![4, 5]);

pub fn end_when<U>(&self, other: &Stream<U>) -> Stream<T>[src]

Produce a stream that ends when some other stream ends.

use xi::Stream;

let sink1 = Stream::sink();
let sink2 = Stream::sink();

// ending shows values of sink1, but ends when sink2 does.
let ending = sink1.stream().end_when(&sink2.stream());

let coll = ending.collect();

sink1.update(0);
sink2.update("yo");
sink1.update(1);
sink2.end();
sink1.update(2); // collector never sees this value

assert_eq!(coll.wait(), [0, 1]);

pub fn filter<F>(&self, f: F) -> Stream<T> where
    F: FnMut(&T) -> bool + 'static, 
[src]

Filter out a subset of the events in the stream.

let sink = xi::Stream::sink();

// keep even numbers
let filtered = sink.stream().filter(|v| v % 2 == 0);

let coll = filtered.collect();

sink.update(0);
sink.update(1);
sink.update(2);
sink.end();

assert_eq!(coll.wait(), vec![0, 2]);

pub fn fold<U, F>(&self, seed: U, f: F) -> Stream<U> where
    U: 'static,
    F: FnMut(U, &T) -> U + 'static, 
[src]

Combine events from the past, with new events to produce an output.

This is roughly equivalent to a “fold” or “reduce” over an array. For each event we emit the latest state out. The seed value is emitted straight away.

The result is always a “memory” stream.

let sink = xi::Stream::sink();

let folded = sink.stream()
    .fold(40.5, |prev, next| prev + (*next as f32) / 2.0);

let coll = folded.collect();

sink.update(0);
sink.update(1);
sink.update(2);
sink.end();

assert_eq!(coll.wait(), vec![40.5, 40.5, 41.0, 42.0]);

pub fn last(&self) -> Stream<T> where
    T: Clone
[src]

Emits the last seen event when the stream closes.

let sink = xi::Stream::sink();

let coll = sink.stream().last().collect();

sink.update(0);
sink.update(1);
sink.update(2);
sink.end();

assert_eq!(coll.wait(), vec![2]);

pub fn map<U, F>(&self, f: F) -> Stream<U> where
    U: 'static,
    F: FnMut(&T) -> U + 'static, 
[src]

Transform events.

let sink = xi::Stream::sink();

let mapped = sink.stream().map(|v| format!("yo {}", v));

let coll = mapped.collect();

sink.update(41);
sink.update(42);
sink.end();

assert_eq!(coll.wait(),
    vec!["yo 41".to_string(), "yo 42".to_string()]);

pub fn map_to<U>(&self, u: U) -> Stream<U> where
    U: Clone + 'static, 
[src]

For every event, emit a static value.

let sink = xi::Stream::sink();

let mapped = sink.stream().map_to(42.0);

let coll = mapped.collect();

sink.update(0);
sink.update(1);
sink.end();

assert_eq!(coll.wait(), vec![42.0, 42.0]);

pub fn merge(streams: Vec<Stream<T>>) -> Stream<T>[src]

Merge events from a bunch of streams to one stream.

use xi::Stream;

let sink1 = Stream::sink();
let sink2 = Stream::sink();

let merged = Stream::merge(vec![
    sink1.stream(),
    sink2.stream()
]);

let coll = merged.collect();

sink1.update(0);
sink2.update(10);
sink1.update(1);
sink2.update(11);
sink1.end();
sink2.end();

assert_eq!(coll.wait(), vec![0, 10, 1, 11]);

pub fn remember(&self) -> Stream<T> where
    T: Clone
[src]

Make a stream in memory mode. Each value is remembered for future subscribers.

let sink = xi::Stream::sink();

let rem = sink.stream().remember();

sink.update(0);
sink.update(1);

// receives last "remembered" value
let coll = rem.collect();

sink.update(2);
sink.end();

assert_eq!(coll.wait(), vec![1, 2]);

pub fn sample_combine<U>(&self, other: &Stream<U>) -> Stream<(T, U)> where
    T: Clone,
    U: Clone
[src]

On every event in this stream, combine with the last value of the other stream.

use xi::Stream;

let sink1 = Stream::sink();
let sink2 = Stream::sink();

let comb = sink1.stream().sample_combine(&sink2.stream());

let coll = comb.collect();

sink1.update(0);     // lost, because no value in sink2
sink2.update("foo"); // doesn't trigger combine
sink1.update(1);
sink1.update(2);
sink2.update("bar");
sink2.end();         // sink2 is "bar" forever
sink1.update(3);
sink1.end();

assert_eq!(coll.wait(),
  vec![(1, "foo"), (2, "foo"), (3, "bar")])

pub fn start_with(&self, start: T) -> Stream<T>[src]

Prepend a start value to the stream. The result is a memory stream.

let sink = xi::Stream::sink();

sink.update(0); // lost

let started = sink.stream().start_with(1);

let coll = started.collect(); // receives 1 and following

sink.update(2);
sink.end();

assert_eq!(coll.wait(), vec![1, 2]);

pub fn take(&self, amount: usize) -> Stream<T>[src]

Take a number of events, then end the stream.

let sink = xi::Stream::sink();

let take2 = sink.stream().take(2);

let coll = take2.collect();

sink.update(0);
sink.update(1); // take2 ends here
sink.update(2);

assert_eq!(coll.wait(), vec![0, 1]);

pub fn take_while<F>(&self, f: F) -> Stream<T> where
    F: FnMut(&T) -> bool + 'static, 
[src]

Take events from the stream as long as a condition holds true.

let sink = xi::Stream::sink();

// take events as long as they are even
let take = sink.stream().take_while(|v| *v % 2 == 0);

let coll = take.collect();

sink.update(0);
sink.update(2);
sink.update(3); // take ends here
sink.update(4);

assert_eq!(coll.wait(), vec![0, 2]);

pub fn wait(&self)[src]

Stalls calling thread until the stream ends.

let sink = xi::Stream::sink();
let stream = sink.stream();

std::thread::spawn(move || {
  sink.update(0);
  sink.update(1);
  sink.update(2);
  sink.end(); // this releases the wait
});

stream.wait(); // wait for other thread

impl<T> Stream<Stream<T>>[src]

pub fn flatten(&self) -> Stream<T>[src]

Flatten out a stream of streams, sequentially.

For each new stream, unsubscribe from the previous, and subscribe to the new. The new stream “interrupts” the previous stream.

use xi::{Stream, Sink};

let sink1: Sink<Stream<u32>> = Stream::sink();
let sink2: Sink<u32> = Stream::sink();
let sink3: Sink<u32> = Stream::sink();

let flat = sink1.stream().flatten();

let coll = flat.collect();

sink2.update(0); // lost

sink1.update(sink2.stream());
sink2.update(1);
sink2.update(2);
sink2.end(); // does not end sink1

sink3.update(10); // lost

sink1.update(sink3.stream());
sink3.update(11);

sink1.end();

assert_eq!(coll.wait(), vec![1, 2, 11]);

pub fn flatten_concurrently(&self) -> Stream<T>[src]

Flatten out a stream of streams, concurrently.

For each new stream, keep the previous, and subscribe to the new.

use xi::{Stream, Sink};

let sink1: Sink<Stream<u32>> = Stream::sink();
let sink2: Sink<u32> = Stream::sink();
let sink3: Sink<u32> = Stream::sink();

let flat = sink1.stream().flatten_concurrently();

let coll = flat.collect();

sink2.update(0); // lost

sink1.update(sink2.stream());
sink2.update(1);
sink2.update(2);

sink3.update(10); // lost

sink1.update(sink3.stream());
sink3.update(11);
sink2.update(3);
sink3.update(12);

sink1.end();

assert_eq!(coll.wait(), vec![1, 2, 11, 3, 12]);

impl<A, B> Stream<(A, B)> where
    A: Clone,
    B: Clone

pub fn combine2(a: &Stream<A>, b: &Stream<B>) -> Stream<(A, B)>

Combine a number of streams into one.

The resulting stream emits when any of the incoming streams emit, but only when all incoming have had an initial value.

impl<A, B, C> Stream<(A, B, C)> where
    A: Clone,
    B: Clone,
    C: Clone

pub fn combine3(
    a: &Stream<A>,
    b: &Stream<B>,
    c: &Stream<C>
) -> Stream<(A, B, C)>

Combine a number of streams into one.

The resulting stream emits when any of the incoming streams emit, but only when all incoming have had an initial value.

impl<A, B, C, D> Stream<(A, B, C, D)> where
    A: Clone,
    B: Clone,
    C: Clone,
    D: Clone

pub fn combine4(
    a: &Stream<A>,
    b: &Stream<B>,
    c: &Stream<C>,
    d: &Stream<D>
) -> Stream<(A, B, C, D)>

Combine a number of streams into one.

The resulting stream emits when any of the incoming streams emit, but only when all incoming have had an initial value.

impl<A, B, C, D, E> Stream<(A, B, C, D, E)> where
    A: Clone,
    B: Clone,
    C: Clone,
    D: Clone,
    E: Clone

pub fn combine5(
    a: &Stream<A>,
    b: &Stream<B>,
    c: &Stream<C>,
    d: &Stream<D>,
    e: &Stream<E>
) -> Stream<(A, B, C, D, E)>

Combine a number of streams into one.

The resulting stream emits when any of the incoming streams emit, but only when all incoming have had an initial value.

impl<A, B, C, D, E, F> Stream<(A, B, C, D, E, F)> where
    A: Clone,
    B: Clone,
    C: Clone,
    D: Clone,
    E: Clone,
    F: Clone

pub fn combine6(
    a: &Stream<A>,
    b: &Stream<B>,
    c: &Stream<C>,
    d: &Stream<D>,
    e: &Stream<E>,
    f: &Stream<F>
) -> Stream<(A, B, C, D, E, F)>

Combine a number of streams into one.

The resulting stream emits when any of the incoming streams emit, but only when all incoming have had an initial value.

impl<A, B, C, D, E, F, G> Stream<(A, B, C, D, E, F, G)> where
    A: Clone,
    B: Clone,
    C: Clone,
    D: Clone,
    E: Clone,
    F: Clone,
    G: Clone

pub fn combine7(
    a: &Stream<A>,
    b: &Stream<B>,
    c: &Stream<C>,
    d: &Stream<D>,
    e: &Stream<E>,
    f: &Stream<F>,
    g: &Stream<G>
) -> Stream<(A, B, C, D, E, F, G)>

Combine a number of streams into one.

The resulting stream emits when any of the incoming streams emit, but only when all incoming have had an initial value.

Trait Implementations

impl<T> Clone for Stream<T>[src]

Auto Trait Implementations

impl<T> RefUnwindSafe for Stream<T>

impl<T> Send for Stream<T> where
    T: Send

impl<T> Sync for Stream<T> where
    T: Send

impl<T> Unpin for Stream<T>

impl<T> UnwindSafe for Stream<T>

Blanket Implementations

impl<T> Any for T where
    T: 'static + ?Sized
[src]

impl<T> Borrow<T> for T where
    T: ?Sized
[src]

impl<T> BorrowMut<T> for T where
    T: ?Sized
[src]

impl<T> From<T> for T[src]

impl<T, U> Into<U> for T where
    U: From<T>, 
[src]

impl<T> ToOwned for T where
    T: Clone
[src]

type Owned = T

The resulting type after obtaining ownership.

impl<T, U> TryFrom<U> for T where
    U: Into<T>, 
[src]

type Error = Infallible

The type returned in the event of a conversion error.

impl<T, U> TryInto<U> for T where
    U: TryFrom<T>, 
[src]

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.