[][src]Struct epoxy::Stream

pub struct Stream<T> { /* fields omitted */ }

Streams are objects that emit events in sequence as they are created. Streams are similar to Iterators in Rust in that both represent a sequence of values and both can be modified by 'pipe' functions like map and filter. The difference is that all values of an iterator are known immediately (or, at least, execution will block while the next item is retrieved), whereas it would not be uncommon for a stream to live for the entire duration of a program, emitting new values from time to time.

Examples

let stream_host: epoxy_streams::Sink<i32> = epoxy_streams::Sink::new();
let stream = stream_host.get_stream();
{
    let _sub = stream.subscribe(|val| {val;});
    assert_eq!(stream.count_subscribers(), 1);
}
assert_eq!(stream.count_subscribers(), 0);
use std::sync::{Arc, Mutex};

let stream_host: epoxy_streams::Sink<i32> = epoxy_streams::Sink::new();
let stream = stream_host.get_stream();

let last_value = Arc::new(Mutex::new(0_i32));
let last_value_write = last_value.clone();

let subscription = stream.subscribe(move |val| {
    *last_value_write.lock().unwrap() = *val;
});

stream_host.emit(1);
assert_eq!(*last_value.lock().unwrap(), 1);

stream_host.emit(100);
assert_eq!(*last_value.lock().unwrap(), 100);

Methods

impl<T> Stream<T> where
    T: 'static, 
[src]

pub fn to_reactive_value(self) -> ReadonlyReactiveValue<T> where
    T: Default
[src]

Creates a ReactiveValue from the stream, using the default value for type T (T::default()) as the initial value until the stream first emits.

Use to_reactive_value_with_default or to_reactive_value_with_default_rc if you want a more reasonable default value, or if your type T does not implement Default.

Examples

use epoxy_streams::ReactiveValue;

let stream_host: epoxy_streams::Sink<i32> = epoxy_streams::Sink::new();
let stream = stream_host.get_stream();

let reactive_value = stream.map(|val| val * 100).to_reactive_value();
assert_eq!(*reactive_value.get(), 0);

stream_host.emit(100);
assert_eq!(*reactive_value.get(), 10000);

pub fn to_reactive_value_with_default(
    self,
    default: T
) -> ReadonlyReactiveValue<T>
[src]

See to_reactive_value.

pub fn to_reactive_value_with_default_rc(
    self,
    default: Arc<T>
) -> ReadonlyReactiveValue<T>
[src]

See to_reactive_value.

impl<T> Stream<T> where
    T: 'static, 
[src]

pub fn scan<U, F>(&self, scan_fn: F, initial_value: U) -> Stream<U> where
    F: Fn(&U, Arc<T>) -> U + 'static,
    U: 'static, 
[src]

Similar to the reduce method on iterators, but works iteratively and creates a stream that emits with the latest 'reduced' value whenever the original stream emits.

Note that because streams never officially 'close', there is no actual reduce method.

Examples

use std::sync::{Arc, Mutex};

let stream_host: epoxy_streams::Sink<i32> = epoxy_streams::Sink::new();
let stream = stream_host.get_stream();

let last_value = Arc::new(Mutex::new(vec![]));
let last_value_write = last_value.clone();

let subscription = stream
    .scan(|acc, val| {
        let mut extended = vec![];
        extended.extend(acc);
        extended.push(*val);
        extended
    }, vec![])
    .subscribe(move |val| {
         *last_value_write.lock().unwrap() = (*val).clone();
    });

stream_host.emit(2);
assert_eq!(*last_value.lock().unwrap(), vec![2]);

stream_host.emit(3);
assert_eq!(*last_value.lock().unwrap(), vec![2, 3]);

pub fn count_values(&self) -> Stream<u32>[src]

Creates a stream that counts the number of times the original stream emits.

Examples

use std::sync::{Arc, Mutex};

let stream_host: epoxy_streams::Sink<i32> = epoxy_streams::Sink::new();
let stream = stream_host.get_stream();

let last_value = Arc::new(Mutex::new(0_u32));
let last_value_write = last_value.clone();

let subscription = stream.count_values().subscribe(move |val| {
    *last_value_write.lock().unwrap() = (*val).clone();
});

stream_host.emit(2);
assert_eq!(*last_value.lock().unwrap(), 1);

stream_host.emit(3);
assert_eq!(*last_value.lock().unwrap(), 2);

pub fn buffer(&self, max_buffer_size: usize) -> Stream<Vec<T>> where
    T: Clone
[src]

Creates a stream that emits a vector of the most recent emissions of the original stream, holding at most max_buffer_size values.

Examples

use std::sync::{Arc, Mutex};

let stream_host: epoxy_streams::Sink<i32> = epoxy_streams::Sink::new();
let stream = stream_host.get_stream();

let last_value = Arc::new(Mutex::new(vec![]));
let last_value_write = last_value.clone();

let subscription = stream.buffer(2).subscribe(move |val| {
    *last_value_write.lock().unwrap() = (*val).clone();
});

stream_host.emit(2);
assert_eq!(*last_value.lock().unwrap(), vec![2]);

stream_host.emit(3);
assert_eq!(*last_value.lock().unwrap(), vec![2, 3]);

stream_host.emit(4);
assert_eq!(*last_value.lock().unwrap(), vec![3, 4]);

impl<T> Stream<T> where
    T: 'static, 
[src]

pub fn filter<F>(&self, filter_function: F) -> Stream<T> where
    F: Fn(&T) -> bool + 'static, 
[src]

Returns a stream that emits only those values from the original stream that pass a test.

Examples

use std::sync::{Arc, Mutex};

let stream_host: epoxy_streams::Sink<i32> = epoxy_streams::Sink::new();
let stream = stream_host.get_stream();

let last_value = Arc::new(Mutex::new(0_i32));
let last_value_write = last_value.clone();

let subscription = stream
    .filter(|val| val % 2 == 0 /* Allow only even numbers through */)
    .subscribe(move |val| {
         *last_value_write.lock().unwrap() = *val;
    });

stream_host.emit(2);
assert_eq!(*last_value.lock().unwrap(), 2);

stream_host.emit(3);
assert_eq!(*last_value.lock().unwrap(), 2); // Note that '3' was not emitted

stream_host.emit(4);
assert_eq!(*last_value.lock().unwrap(), 4);

pub fn map<U, F>(&self, map_function: F) -> Stream<U> where
    F: Fn(&T) -> U + 'static,
    U: 'static, 
[src]

Returns a stream containing modified values from the original stream.

Examples

use std::sync::{Arc, Mutex};

let stream_host: epoxy_streams::Sink<i32> = epoxy_streams::Sink::new();
let stream = stream_host.get_stream();

let last_value = Arc::new(Mutex::new(0_i32));
let last_value_write = last_value.clone();

let subscription = stream
    .map(|val| val * 100)
    .subscribe(move |val| {
         *last_value_write.lock().unwrap() = *val;
    });

stream_host.emit(2);
assert_eq!(*last_value.lock().unwrap(), 200);

stream_host.emit(3);
assert_eq!(*last_value.lock().unwrap(), 300);

pub fn map_rc<U, F>(&self, map_function: F) -> Stream<U> where
    F: Fn(Arc<T>) -> Arc<U> + 'static,
    U: 'static, 
[src]

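Returns a stream containing modified values from the original stream. Unlike map, the mapping function receives each value as an Arc<T> and returns an Arc<U>, so it can return an existing Arc (such as the input itself or a shared fallback value, as in the example below).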

Examples

use std::sync::{Arc, Mutex};
use std::rc::Rc;

let stream_host: epoxy_streams::Sink<i32> = epoxy_streams::Sink::new();
let stream = stream_host.get_stream();

let last_value = Arc::new(Mutex::new(0_i32));
let last_value_write = last_value.clone();
 
let fallback_value = Arc::new(10);

let subscription = stream
    .map_rc(move |val| if (*val < 0) { fallback_value.clone() } else { val })
    .subscribe(move |val| {
         *last_value_write.lock().unwrap() = *val;
    });

stream_host.emit(-2);
assert_eq!(*last_value.lock().unwrap(), 10);

stream_host.emit(12);
assert_eq!(*last_value.lock().unwrap(), 12);
 
stream_host.emit(-10);
assert_eq!(*last_value.lock().unwrap(), 10);

pub fn flat_map<U, F, C>(&self, iter_map_function: F) -> Stream<U> where
    C: IntoIterator<Item = U>,
    F: Fn(&T) -> C + 'static,
    U: 'static, 
[src]

Returns a stream that can emit multiple values for each value from the original stream.

Examples

use std::sync::{Arc, Mutex};

let stream_host: epoxy_streams::Sink<i32> = epoxy_streams::Sink::new();
let stream = stream_host.get_stream();

let last_value = Arc::new(Mutex::new(0_i32));
let last_value_write = last_value.clone();

let subscription = stream
    .flat_map(|val| vec![*val, val + 1, val + 2])
    .subscribe(move |val| {
         *last_value_write.lock().unwrap() = *val;
    });

stream_host.emit(2);
assert_eq!(*last_value.lock().unwrap(), 4); /* Emitted 2, 3, then 4 */

stream_host.emit(3);
assert_eq!(*last_value.lock().unwrap(), 5); /* Emitted 3, 4, then 5 */

pub fn inspect<F>(&self, inspect_function: F) -> Stream<T> where
    F: Fn(&T) + 'static, 
[src]

Similar to subscribing to a stream in that inspect_function runs whenever the stream emits, but returns a derived stream matching the original stream instead of a Subscription.

impl<T> Stream<T>[src]

pub fn subscribe<F>(&self, listener: F) -> Subscription<T> where
    F: Fn(Arc<T>) + 'static, 
[src]

Subscribing to a stream will cause the given 'listener' function to be executed whenever a new value is emitted by the stream. This listener function has a static lifetime because it lives as long as the returned Subscription object, which means that in most cases, if the given function needs to capture any variables from its environment, it will need to be declared with Rust's move keyword.

pub fn unsubscribe(&self, _subscription: Subscription<T>)[src]

Usually subscriptions are removed by simply letting the Subscription object fall out of scope, but this explicit API is provided as well, as it may be more readable in some situations.

pub fn count_subscribers(&self) -> usize[src]

Returns the total number of subscribers listening to this stream, including any derived streams (ones created with a pipe operation like map or filter).

Trait Implementations

impl<T> Clone for Stream<T>[src]

fn clone_from(&mut self, source: &Self)
1.0.0
[src]

Performs copy-assignment from source. Read more

Auto Trait Implementations

impl<T> !Send for Stream<T>

impl<T> !Sync for Stream<T>

Blanket Implementations

impl<T, U> Into<U> for T where
    U: From<T>, 
[src]

impl<T> From<T> for T[src]

impl<T> ToOwned for T where
    T: Clone
[src]

type Owned = T

The resulting type after obtaining ownership.

impl<T, U> TryFrom<U> for T where
    U: Into<T>, 
[src]

type Error = Infallible

The type returned in the event of a conversion error.

impl<T, U> TryInto<U> for T where
    U: TryFrom<T>, 
[src]

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

impl<T> Borrow<T> for T where
    T: ?Sized
[src]

impl<T> BorrowMut<T> for T where
    T: ?Sized
[src]

impl<T> Any for T where
    T: 'static + ?Sized
[src]