Struct Stream

pub struct Stream<T: 'static> { /* private fields */ }

A stream of events, values in time.

Streams provide combinators for building “execution trees” that operate on events.

§Memory

Some streams have “memory”. Streams with memory keep a copy of the last value they produced so that any new subscriber synchronously receives that value.

Streams with memory are explicitly created using .remember(), but also by other combinators such as .fold() and .start_with().
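
The replay behaviour described above can be illustrated with a small std-only model. This is a sketch only and not how froop is implemented: the `Remembered` type, its `update` and `subscribe` methods, and the `late_subscriber_sees` helper are all hypothetical names introduced for illustration.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical std-only model of a "memory" stream; not froop's implementation.
struct Remembered<T> {
    last: Option<T>,
    subscribers: Vec<Box<dyn FnMut(&T)>>,
}

impl<T> Remembered<T> {
    fn new() -> Self {
        Remembered { last: None, subscribers: Vec::new() }
    }

    /// Deliver a value to the current subscribers, then remember it.
    fn update(&mut self, value: T) {
        for sub in self.subscribers.iter_mut() {
            sub(&value);
        }
        self.last = Some(value);
    }

    /// Register a subscriber, synchronously replaying the remembered value.
    fn subscribe<F: FnMut(&T) + 'static>(&mut self, mut f: F) {
        if let Some(v) = &self.last {
            f(v);
        }
        self.subscribers.push(Box::new(f));
    }
}

/// A late subscriber first sees the remembered value, then later values.
fn late_subscriber_sees() -> Vec<i32> {
    let seen = Rc::new(RefCell::new(Vec::new()));
    let mut stream = Remembered::new();
    stream.update(0);
    stream.update(1);

    let sink = Rc::clone(&seen);
    stream.subscribe(move |v| sink.borrow_mut().push(*v));

    stream.update(2);
    let out = seen.borrow().clone();
    out
}
```

In this model, a subscriber that arrives after the values 0 and 1 were pushed receives 1 immediately and 2 afterwards, which mirrors the `.remember()` example further down.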

Implementations§

impl<T> Stream<T>

pub fn sink() -> Sink<T>

Create a sink that is used to push values into a stream.

let sink = froop::Stream::sink();

// collect values going into the sink
let coll = sink.stream().collect();

sink.update(0);
sink.update(1);
sink.update(2);
sink.end();

assert_eq!(coll.wait(), vec![0, 1, 2]);

pub fn of(value: T) -> Stream<T>
where T: Clone,

Create a stream with memory that emits a single value to every subscriber.

let value = froop::Stream::of(42);

// both collectors will receive the value
let coll1 = value.collect();
let coll2 = value.collect();

// use .take() since stream doesn't end
assert_eq!(coll1.take(), [42]);
assert_eq!(coll2.take(), [42]);

pub fn never() -> Stream<T>

Create a stream that never emits any value and never ends.

use froop::Stream;

let never: Stream<u32> = Stream::never();
let coll = never.collect();
assert_eq!(coll.take(), vec![]);

pub fn has_memory(&self) -> bool

Check if this stream has “memory”.

Streams with memory keep a copy of the last value they produced so that any new subscriber synchronously receives that value.

Streams with memory are explicitly created using .remember(), but also by other combinators such as .fold() and .start_with().

Memory is not inherited by child combinators. For example:

let sink = froop::Stream::sink();
sink.update(0);

// This stream has memory.
let rem = sink.stream().remember();

// This filtered stream has _NO_ memory.
let filt = rem.filter(|t| *t > 10);

assert!(rem.has_memory());
assert!(!filt.has_memory());

pub fn imitator() -> Imitator<T>
where T: Clone,

Creates an imitator. Imitators are used to make cyclic streams.

pub fn subscribe<F>(&self, f: F) -> Subscription
where F: FnMut(Option<&T>) + 'static,

Subscribe to events from this stream. The returned subscription can be used to unsubscribe at a future time.

Each value is wrapped in an Option; exactly one None event is delivered when the stream ends.

let sink = froop::Stream::sink();
let stream = sink.stream();

let handle = std::thread::spawn(move || {

  // values are Some(0), Some(1), Some(2), None
  stream.subscribe(|v| if let Some(v) = v {
      println!("Got value: {}", v);
  });

  // stall thread until stream ends.
  stream.wait();
});

sink.update(0);
sink.update(1);
sink.update(2);
sink.end();

handle.join().unwrap();

pub fn collect(&self) -> Collector<T>
where T: Clone,

Collect events into a Collector. This is mostly useful for testing.

let sink = froop::Stream::sink();

// collect all values emitted into the sink
let coll = sink.stream().collect();

std::thread::spawn(move || {
  sink.update(0);
  sink.update(1);
  sink.update(2);
  sink.end();
});

let result = coll.wait(); // wait for stream to end
assert_eq!(result, vec![0, 1, 2]);

pub fn dedupe(&self) -> Stream<T>
where T: Clone + PartialEq,

Dedupe stream by the event itself.

This clones every event to compare with the next.

let sink = froop::Stream::sink();

let deduped = sink.stream().dedupe();

let coll = deduped.collect();

sink.update(0);
sink.update(0);
sink.update(1);
sink.update(1);
sink.end();

assert_eq!(coll.wait(), vec![0, 1]);

pub fn dedupe_by<U, F>(&self, f: F) -> Stream<T>
where U: PartialEq + 'static, F: FnMut(&T) -> U + 'static,

Dedupe stream by some extracted value.

use froop::{Stream, Sink};

#[derive(Clone, Debug)]
struct Foo(&'static str, usize);

let sink: Sink<Foo> = Stream::sink();

// dedupe this stream of Foo on the contained usize
let deduped = sink.stream().dedupe_by(|v| v.1);

let coll = deduped.collect();

sink.update(Foo("yo", 1));
sink.update(Foo("bro", 1));
sink.update(Foo("lo", 2));
sink.update(Foo("lo", 2));
sink.end();

assert_eq!(format!("{:?}", coll.wait()),
    "[Foo(\"yo\", 1), Foo(\"lo\", 2)]");

pub fn drop(&self, amount: usize) -> Stream<T>

Drop an amount of initial values.

let sink = froop::Stream::sink();

// drop 2 initial values
let dropped = sink.stream().drop(2);

let coll = dropped.collect();

sink.update(0);
sink.update(1);
sink.update(2);
sink.update(3);
sink.end();

assert_eq!(coll.wait(), vec![2, 3]);

pub fn drop_while<F>(&self, f: F) -> Stream<T>
where F: FnMut(&T) -> bool + 'static,

Drop values while a condition holds true. Once the condition first evaluates to false, the resulting stream emits all subsequent events.

let sink = froop::Stream::sink();

// drop initial odd values
let dropped = sink.stream().drop_while(|v| v % 2 == 1);

let coll = dropped.collect();

sink.update(1);
sink.update(3);
sink.update(4);
sink.update(5); // not dropped
sink.end();

assert_eq!(coll.wait(), vec![4, 5]);

pub fn end_when<U>(&self, other: &Stream<U>) -> Stream<T>

Produce a stream that ends when some other stream ends.

use froop::Stream;

let sink1 = Stream::sink();
let sink2 = Stream::sink();

// ending shows values of sink1, but ends when sink2 does.
let ending = sink1.stream().end_when(&sink2.stream());

let coll = ending.collect();

sink1.update(0);
sink2.update("yo");
sink1.update(1);
sink2.end();
sink1.update(2); // collector never sees this value

assert_eq!(coll.wait(), [0, 1]);

pub fn filter<F>(&self, f: F) -> Stream<T>
where F: FnMut(&T) -> bool + 'static,

Filter out a subset of the events in the stream.

let sink = froop::Stream::sink();

// keep even numbers
let filtered = sink.stream().filter(|v| v % 2 == 0);

let coll = filtered.collect();

sink.update(0);
sink.update(1);
sink.update(2);
sink.end();

assert_eq!(coll.wait(), vec![0, 2]);

pub fn fold<U, F>(&self, seed: U, f: F) -> Stream<U>
where U: 'static, F: FnMut(U, &T) -> U + 'static,

Combine past events with new events to produce an output.

This is roughly equivalent to a “fold” or “reduce” over an array. For each event, the latest state is emitted. The seed value is emitted straight away.

The result is always a “memory” stream.

let sink = froop::Stream::sink();

let folded = sink.stream()
    .fold(40.5, |prev, next| prev + (*next as f32) / 2.0);

let coll = folded.collect();

sink.update(0);
sink.update(1);
sink.update(2);
sink.end();

assert_eq!(coll.wait(), vec![40.5, 40.5, 41.0, 42.0]);

pub fn last(&self) -> Stream<T>
where T: Clone,

Emits the last seen event when the stream closes.

let sink = froop::Stream::sink();

let coll = sink.stream().last().collect();

sink.update(0);
sink.update(1);
sink.update(2);
sink.end();

assert_eq!(coll.wait(), vec![2]);

pub fn map<U, F>(&self, f: F) -> Stream<U>
where U: 'static, F: FnMut(&T) -> U + 'static,

Transform events.

let sink = froop::Stream::sink();

let mapped = sink.stream().map(|v| format!("yo {}", v));

let coll = mapped.collect();

sink.update(41);
sink.update(42);
sink.end();

assert_eq!(coll.wait(),
    vec!["yo 41".to_string(), "yo 42".to_string()]);

pub fn map_to<U>(&self, u: U) -> Stream<U>
where U: Clone + 'static,

For every event, emit a static value.

let sink = froop::Stream::sink();

let mapped = sink.stream().map_to(42.0);

let coll = mapped.collect();

sink.update(0);
sink.update(1);
sink.end();

assert_eq!(coll.wait(), vec![42.0, 42.0]);

pub fn merge(streams: Vec<Stream<T>>) -> Stream<T>

Merge events from several streams into one stream.

use froop::Stream;

let sink1 = Stream::sink();
let sink2 = Stream::sink();

let merged = Stream::merge(vec![
    sink1.stream(),
    sink2.stream()
]);

let coll = merged.collect();

sink1.update(0);
sink2.update(10);
sink1.update(1);
sink2.update(11);
sink1.end();
sink2.end();

assert_eq!(coll.wait(), vec![0, 10, 1, 11]);

pub fn remember(&self) -> Stream<T>
where T: Clone,

Turn this stream into a memory stream. The latest value is remembered and delivered to future subscribers.

let sink = froop::Stream::sink();

let rem = sink.stream().remember();

sink.update(0);
sink.update(1);

// receives last "remembered" value
let coll = rem.collect();

sink.update(2);
sink.end();

assert_eq!(coll.wait(), vec![1, 2]);

pub fn sample_combine<U>(&self, other: &Stream<U>) -> Stream<(T, U)>
where T: Clone, U: Clone,

On every event in this stream, combine with the last value of the other stream.

use froop::Stream;

let sink1 = Stream::sink();
let sink2 = Stream::sink();

let comb = sink1.stream().sample_combine(&sink2.stream());

let coll = comb.collect();

sink1.update(0);     // lost, because no value in sink2
sink2.update("foo"); // doesn't trigger combine
sink1.update(1);
sink1.update(2);
sink2.update("bar");
sink2.end();         // sink2 is "bar" forever
sink1.update(3);
sink1.end();

assert_eq!(coll.wait(),
  vec![(1, "foo"), (2, "foo"), (3, "bar")]);

pub fn start_with(&self, start: T) -> Stream<T>

Prepend a start value to the stream. The result is a memory stream.

let sink = froop::Stream::sink();

sink.update(0); // lost

let started = sink.stream().start_with(1);

let coll = started.collect(); // receives 1 and following

sink.update(2);
sink.end();

assert_eq!(coll.wait(), vec![1, 2]);

pub fn take(&self, amount: usize) -> Stream<T>

Take a number of events, then end the stream.

let sink = froop::Stream::sink();

let take2 = sink.stream().take(2);

let coll = take2.collect();

sink.update(0);
sink.update(1); // take2 ends here
sink.update(2);

assert_eq!(coll.wait(), vec![0, 1]);

pub fn take_while<F>(&self, f: F) -> Stream<T>
where F: FnMut(&T) -> bool + 'static,

Take events from the stream as long as a condition holds true.

let sink = froop::Stream::sink();

// take events as long as they are even
let take = sink.stream().take_while(|v| *v % 2 == 0);

let coll = take.collect();

sink.update(0);
sink.update(2);
sink.update(3); // take ends here
sink.update(4);

assert_eq!(coll.wait(), vec![0, 2]);

pub fn wait(&self)

Stalls the calling thread until the stream ends.

let sink = froop::Stream::sink();
let stream = sink.stream();

std::thread::spawn(move || {
  sink.update(0);
  sink.update(1);
  sink.update(2);
  sink.end(); // this releases the wait
});

stream.wait(); // wait for other thread

impl<T> Stream<Stream<T>>

pub fn flatten(&self) -> Stream<T>

Flatten out a stream of streams, sequentially.

For each new stream, unsubscribe from the previous, and subscribe to the new. The new stream “interrupts” the previous stream.

use froop::{Stream, Sink};

let sink1: Sink<Stream<u32>> = Stream::sink();
let sink2: Sink<u32> = Stream::sink();
let sink3: Sink<u32> = Stream::sink();

let flat = sink1.stream().flatten();

let coll = flat.collect();

sink2.update(0); // lost

sink1.update(sink2.stream());
sink2.update(1);
sink2.update(2);
sink2.end(); // does not end sink1

sink3.update(10); // lost

sink1.update(sink3.stream());
sink3.update(11);

sink1.end();

assert_eq!(coll.wait(), vec![1, 2, 11]);

pub fn flatten_concurrently(&self) -> Stream<T>

Flatten out a stream of streams, concurrently.

For each new stream, keep the previous, and subscribe to the new.

use froop::{Stream, Sink};

let sink1: Sink<Stream<u32>> = Stream::sink();
let sink2: Sink<u32> = Stream::sink();
let sink3: Sink<u32> = Stream::sink();

let flat = sink1.stream().flatten_concurrently();

let coll = flat.collect();

sink2.update(0); // lost

sink1.update(sink2.stream());
sink2.update(1);
sink2.update(2);

sink3.update(10); // lost

sink1.update(sink3.stream());
sink3.update(11);
sink2.update(3);
sink3.update(12);

sink1.end();

assert_eq!(coll.wait(), vec![1, 2, 11, 3, 12]);

impl<A, B> Stream<(A, B)>
where A: Clone, B: Clone,

pub fn combine2(a: &Stream<A>, b: &Stream<B>) -> Stream<(A, B)>

Combine two streams into one stream of pairs.

The resulting stream emits whenever any incoming stream emits, but only once every incoming stream has produced an initial value.
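
The emission rule can be illustrated with a small std-only model. This is a sketch under stated assumptions, not froop's implementation: the `Combine2` struct, its `update_a`, `update_b` and `latest` methods, and the `combine2_demo` helper are hypothetical names used only to show when a combined value would be produced.

```rust
/// Hypothetical std-only model of the `combine2` emission rule.
/// It is not froop's implementation.
struct Combine2<A, B> {
    a: Option<A>,
    b: Option<B>,
}

impl<A: Clone, B: Clone> Combine2<A, B> {
    fn new() -> Self {
        Combine2 { a: None, b: None }
    }

    /// Record a value from the first input, then emit if both sides are set.
    fn update_a(&mut self, value: A) -> Option<(A, B)> {
        self.a = Some(value);
        self.latest()
    }

    /// Record a value from the second input, then emit if both sides are set.
    fn update_b(&mut self, value: B) -> Option<(A, B)> {
        self.b = Some(value);
        self.latest()
    }

    /// Pair of latest values, or `None` while either input is still silent.
    fn latest(&self) -> Option<(A, B)> {
        match (&self.a, &self.b) {
            (Some(a), Some(b)) => Some((a.clone(), b.clone())),
            _ => None,
        }
    }
}

/// Feed an interleaved sequence and collect what the model emits.
fn combine2_demo() -> Vec<(u32, &'static str)> {
    let mut comb = Combine2::new();
    let mut out = Vec::new();
    out.extend(comb.update_a(0));     // nothing yet: second input is silent
    out.extend(comb.update_b("foo")); // emits (0, "foo")
    out.extend(comb.update_a(1));     // emits (1, "foo")
    out.extend(comb.update_b("bar")); // emits (1, "bar")
    out
}
```

Unlike `sample_combine`, where only the first stream triggers output, both inputs trigger an emission in this model once each has produced a value.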

impl<A, B, C> Stream<(A, B, C)>
where A: Clone, B: Clone, C: Clone,

pub fn combine3( a: &Stream<A>, b: &Stream<B>, c: &Stream<C>, ) -> Stream<(A, B, C)>

Combine three streams into one stream of tuples.

The resulting stream emits whenever any incoming stream emits, but only once every incoming stream has produced an initial value.

impl<A, B, C, D> Stream<(A, B, C, D)>
where A: Clone, B: Clone, C: Clone, D: Clone,

pub fn combine4( a: &Stream<A>, b: &Stream<B>, c: &Stream<C>, d: &Stream<D>, ) -> Stream<(A, B, C, D)>

Combine four streams into one stream of tuples.

The resulting stream emits whenever any incoming stream emits, but only once every incoming stream has produced an initial value.

impl<A, B, C, D, E> Stream<(A, B, C, D, E)>
where A: Clone, B: Clone, C: Clone, D: Clone, E: Clone,

pub fn combine5( a: &Stream<A>, b: &Stream<B>, c: &Stream<C>, d: &Stream<D>, e: &Stream<E>, ) -> Stream<(A, B, C, D, E)>

Combine five streams into one stream of tuples.

The resulting stream emits whenever any incoming stream emits, but only once every incoming stream has produced an initial value.

impl<A, B, C, D, E, F> Stream<(A, B, C, D, E, F)>
where A: Clone, B: Clone, C: Clone, D: Clone, E: Clone, F: Clone,

pub fn combine6( a: &Stream<A>, b: &Stream<B>, c: &Stream<C>, d: &Stream<D>, e: &Stream<E>, f: &Stream<F>, ) -> Stream<(A, B, C, D, E, F)>

Combine six streams into one stream of tuples.

The resulting stream emits whenever any incoming stream emits, but only once every incoming stream has produced an initial value.

impl<A, B, C, D, E, F, G> Stream<(A, B, C, D, E, F, G)>
where A: Clone, B: Clone, C: Clone, D: Clone, E: Clone, F: Clone, G: Clone,

pub fn combine7( a: &Stream<A>, b: &Stream<B>, c: &Stream<C>, d: &Stream<D>, e: &Stream<E>, f: &Stream<F>, g: &Stream<G>, ) -> Stream<(A, B, C, D, E, F, G)>

Combine seven streams into one stream of tuples.

The resulting stream emits whenever any incoming stream emits, but only once every incoming stream has produced an initial value.

Trait Implementations§

impl<T> Clone for Stream<T>

fn clone(&self) -> Self

Returns a duplicate of the value.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.

Auto Trait Implementations§

impl<T> Freeze for Stream<T>

impl<T> RefUnwindSafe for Stream<T>

impl<T> Send for Stream<T>
where T: Send,

impl<T> Sync for Stream<T>
where T: Send,

impl<T> Unpin for Stream<T>

impl<T> UnwindSafe for Stream<T>
