Stream

Struct Stream 

Source
pub struct Stream<T> { /* private fields */ }
Expand description

Streams are objects that emit events in sequence as those events are created. Streams are similar to Iterators in Rust in that both represent a sequence of values and both can be modified by ‘pipe’ functions like map and filter. The difference is that all values of an iterator are known immediately (or, at least, execution will block while the next item is retrieved), whereas it would not be uncommon for a stream to live for the entire duration of a program, emitting new values from time to time.

§Examples

let stream_host: epoxy_streams::Sink<i32> = epoxy_streams::Sink::new();
let stream = stream_host.get_stream();
{
    let _sub = stream.subscribe(|val| {val;});
    assert_eq!(stream.count_subscribers(), 1);
}
assert_eq!(stream.count_subscribers(), 0);
use std::sync::{Arc, Mutex};

let stream_host: epoxy_streams::Sink<i32> = epoxy_streams::Sink::new();
let stream = stream_host.get_stream();

let last_value = Arc::new(Mutex::new(0_i32));
let last_value_write = last_value.clone();

let subscription = stream.subscribe(move |val| {
    *last_value_write.lock().unwrap() = *val;
});

stream_host.emit(1);
assert_eq!(*last_value.lock().unwrap(), 1);

stream_host.emit(100);
assert_eq!(*last_value.lock().unwrap(), 100);

Implementations§

Source§

impl<T> Stream<T>
where T: 'static,

Source

pub fn to_reactive_value(self) -> ReadonlyReactiveValue<T>
where T: Default,

Creates a ReactiveValue from the stream, using the default value of type T (T::default()) as the initial value until the stream emits.

Use to_reactive_value_with_default or to_reactive_value_with_default_rc if you want a more reasonable default value, or if your type T does not implement Default.

§Examples
use epoxy_streams::ReactiveValue;

let stream_host: epoxy_streams::Sink<i32> = epoxy_streams::Sink::new();
let stream = stream_host.get_stream();

let reactive_value = stream.map(|val| val * 100).to_reactive_value();
assert_eq!(*reactive_value.get(), 0);

stream_host.emit(100);
assert_eq!(*reactive_value.get(), 10000);
Source

pub fn to_reactive_value_with_default( self, default: T, ) -> ReadonlyReactiveValue<T>

See to_reactive_value.

Source

pub fn to_reactive_value_with_default_rc( self, default: Arc<T>, ) -> ReadonlyReactiveValue<T>

See to_reactive_value.

Source§

impl<T> Stream<T>
where T: 'static,

Source

pub fn scan<U, F>(&self, scan_fn: F, initial_value: U) -> Stream<U>
where F: Fn(&U, Arc<T>) -> U + 'static, U: 'static,

Similar to the reduce method on iterators, but works incrementally: it creates a stream that emits the latest ‘reduced’ value whenever the original stream emits.

Note that because streams never officially ‘close’, there is no actual reduce method.

§Examples
use std::sync::{Arc, Mutex};

let stream_host: epoxy_streams::Sink<i32> = epoxy_streams::Sink::new();
let stream = stream_host.get_stream();

let last_value = Arc::new(Mutex::new(vec![]));
let last_value_write = last_value.clone();

let subscription = stream
    .scan(|acc, val| {
        let mut extended = vec![];
        extended.extend(acc);
        extended.push(*val);
        extended
    }, vec![])
    .subscribe(move |val| {
         *last_value_write.lock().unwrap() = (*val).clone();
    });

stream_host.emit(2);
assert_eq!(*last_value.lock().unwrap(), vec![2]);

stream_host.emit(3);
assert_eq!(*last_value.lock().unwrap(), vec![2, 3]);
Source

pub fn count_values(&self) -> Stream<u32>

Creates a stream that counts the number of times the original stream emits.

§Examples
use std::sync::{Arc, Mutex};

let stream_host: epoxy_streams::Sink<i32> = epoxy_streams::Sink::new();
let stream = stream_host.get_stream();

let last_value = Arc::new(Mutex::new(0_u32));
let last_value_write = last_value.clone();

let subscription = stream.count_values().subscribe(move |val| {
    *last_value_write.lock().unwrap() = (*val).clone();
});

stream_host.emit(2);
assert_eq!(*last_value.lock().unwrap(), 1);

stream_host.emit(3);
assert_eq!(*last_value.lock().unwrap(), 2);
Examples found in repository?
examples/event_stream.rs (line 30)
28fn main() {
29    let button = MyButton::new();
30    let all_clicks_count = button.get_clicks().count_values();
31    let left_clicks_count = button
32        .get_clicks()
33        .filter(|click| *click == MouseButton::Left)
34        .count_values();
35
36    button.click(MouseButton::Left);
37    button.click(MouseButton::Right);
38    button.click(MouseButton::Left);
39    button.click(MouseButton::Middle);
40    button.click(MouseButton::Left);
41
42    {
43        let _sub_all = all_clicks_count.subscribe(|count| {
44            assert_eq!(*count, 6);
45            println!("Counted 6 clicks");
46        });
47        let _sub_left = left_clicks_count.subscribe(|count| {
48            assert_eq!(*count, 4);
49            println!("Counted 4 left clicks");
50        });
51        button.click(MouseButton::Left);
52    }
53}
Source

pub fn buffer(&self, max_buffer_size: usize) -> Stream<Vec<T>>
where T: Clone,

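Creates a stream that emits a vector of the most recent emissions of the original stream, containing at most max_buffer_size values. Once the buffer is full, the oldest value is dropped for each new emission.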

§Examples
use std::sync::{Arc, Mutex};

let stream_host: epoxy_streams::Sink<i32> = epoxy_streams::Sink::new();
let stream = stream_host.get_stream();

let last_value = Arc::new(Mutex::new(vec![]));
let last_value_write = last_value.clone();

let subscription = stream.buffer(2).subscribe(move |val| {
    *last_value_write.lock().unwrap() = (*val).clone();
});

stream_host.emit(2);
assert_eq!(*last_value.lock().unwrap(), vec![2]);

stream_host.emit(3);
assert_eq!(*last_value.lock().unwrap(), vec![2, 3]);

stream_host.emit(4);
assert_eq!(*last_value.lock().unwrap(), vec![3, 4]);
Source§

impl<T> Stream<T>
where T: 'static,

Source

pub fn filter<F>(&self, filter_function: F) -> Stream<T>
where F: Fn(&T) -> bool + 'static,

Returns a stream that emits only those values from the original stream that pass a test.

§Examples
use std::sync::{Arc, Mutex};

let stream_host: epoxy_streams::Sink<i32> = epoxy_streams::Sink::new();
let stream = stream_host.get_stream();

let last_value = Arc::new(Mutex::new(0_i32));
let last_value_write = last_value.clone();

let subscription = stream
    .filter(|val| val % 2 == 0 /* Allow only even numbers through */)
    .subscribe(move |val| {
         *last_value_write.lock().unwrap() = *val;
    });

stream_host.emit(2);
assert_eq!(*last_value.lock().unwrap(), 2);

stream_host.emit(3);
assert_eq!(*last_value.lock().unwrap(), 2); // Note that '3' was not emitted

stream_host.emit(4);
assert_eq!(*last_value.lock().unwrap(), 4);
Examples found in repository?
examples/event_stream.rs (line 33)
28fn main() {
29    let button = MyButton::new();
30    let all_clicks_count = button.get_clicks().count_values();
31    let left_clicks_count = button
32        .get_clicks()
33        .filter(|click| *click == MouseButton::Left)
34        .count_values();
35
36    button.click(MouseButton::Left);
37    button.click(MouseButton::Right);
38    button.click(MouseButton::Left);
39    button.click(MouseButton::Middle);
40    button.click(MouseButton::Left);
41
42    {
43        let _sub_all = all_clicks_count.subscribe(|count| {
44            assert_eq!(*count, 6);
45            println!("Counted 6 clicks");
46        });
47        let _sub_left = left_clicks_count.subscribe(|count| {
48            assert_eq!(*count, 4);
49            println!("Counted 4 left clicks");
50        });
51        button.click(MouseButton::Left);
52    }
53}
Source

pub fn map<U, F>(&self, map_function: F) -> Stream<U>
where U: 'static, F: Fn(&T) -> U + 'static,

Returns a stream containing modified values from the original stream.

§Examples
use std::sync::{Arc, Mutex};

let stream_host: epoxy_streams::Sink<i32> = epoxy_streams::Sink::new();
let stream = stream_host.get_stream();

let last_value = Arc::new(Mutex::new(0_i32));
let last_value_write = last_value.clone();

let subscription = stream
    .map(|val| val * 100)
    .subscribe(move |val| {
         *last_value_write.lock().unwrap() = *val;
    });

stream_host.emit(2);
assert_eq!(*last_value.lock().unwrap(), 200);

stream_host.emit(3);
assert_eq!(*last_value.lock().unwrap(), 300);
Source

pub fn map_rc<U, F>(&self, map_function: F) -> Stream<U>
where U: 'static, F: Fn(Arc<T>) -> Arc<U> + 'static,

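Returns a stream containing modified values from the original stream. Unlike map, the mapping function receives each value as an Arc<T> and returns an Arc<U>, so it can pass through an existing Arc (including the input itself) instead of constructing a new value.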

§Examples
use std::sync::{Arc, Mutex};
use std::rc::Rc;

let stream_host: epoxy_streams::Sink<i32> = epoxy_streams::Sink::new();
let stream = stream_host.get_stream();

let last_value = Arc::new(Mutex::new(0_i32));
let last_value_write = last_value.clone();
 
let fallback_value = Arc::new(10);

let subscription = stream
    .map_rc(move |val| if (*val < 0) { fallback_value.clone() } else { val })
    .subscribe(move |val| {
         *last_value_write.lock().unwrap() = *val;
    });

stream_host.emit(-2);
assert_eq!(*last_value.lock().unwrap(), 10);

stream_host.emit(12);
assert_eq!(*last_value.lock().unwrap(), 12);
 
stream_host.emit(-10);
assert_eq!(*last_value.lock().unwrap(), 10);
Source

pub fn flat_map<U, F, C>(&self, iter_map_function: F) -> Stream<U>
where U: 'static, C: IntoIterator<Item = U>, F: Fn(&T) -> C + 'static,

Returns a stream that can emit multiple values for each value from the original stream.

§Examples
use std::sync::{Arc, Mutex};

let stream_host: epoxy_streams::Sink<i32> = epoxy_streams::Sink::new();
let stream = stream_host.get_stream();

let last_value = Arc::new(Mutex::new(0_i32));
let last_value_write = last_value.clone();

let subscription = stream
    .flat_map(|val| vec![*val, val + 1, val + 2])
    .subscribe(move |val| {
         *last_value_write.lock().unwrap() = *val;
    });

stream_host.emit(2);
assert_eq!(*last_value.lock().unwrap(), 4); /* Emitted 2, 3, then 4 */

stream_host.emit(3);
assert_eq!(*last_value.lock().unwrap(), 5); /* Emitted 3, 4, then 5 */
Source

pub fn inspect<F>(&self, inspect_function: F) -> Stream<T>
where F: Fn(&T) + 'static,

Similar to subscribing to a stream in that inspect_function runs whenever the stream emits, but returns a derived stream matching the original stream instead of a Subscription.

Source§

impl<T> Stream<T>

Source

pub fn subscribe<F>(&self, listener: F) -> Subscription<T>
where F: Fn(Arc<T>) + 'static,

Subscribing to a stream will cause the given ‘listener’ function to be executed whenever a new value is emitted by the stream. This listener function has a static lifetime because it lives as long as the returned Subscription object, which means that in most cases if the given function needs to capture any variables from its environment it will need to be used with Rust’s move keyword.

Examples found in repository?
examples/event_stream.rs (lines 43-46)
28fn main() {
29    let button = MyButton::new();
30    let all_clicks_count = button.get_clicks().count_values();
31    let left_clicks_count = button
32        .get_clicks()
33        .filter(|click| *click == MouseButton::Left)
34        .count_values();
35
36    button.click(MouseButton::Left);
37    button.click(MouseButton::Right);
38    button.click(MouseButton::Left);
39    button.click(MouseButton::Middle);
40    button.click(MouseButton::Left);
41
42    {
43        let _sub_all = all_clicks_count.subscribe(|count| {
44            assert_eq!(*count, 6);
45            println!("Counted 6 clicks");
46        });
47        let _sub_left = left_clicks_count.subscribe(|count| {
48            assert_eq!(*count, 4);
49            println!("Counted 4 left clicks");
50        });
51        button.click(MouseButton::Left);
52    }
53}
Source

pub fn unsubscribe(&self, _subscription: Subscription<T>)

Usually subscriptions are removed by simply letting the Subscription object fall out of scope, but this explicit API is provided as well, as it may be more readable in some situations.

Source

pub fn count_subscribers(&self) -> usize

Returns the total number of subscribers listening to this stream, including any derived streams (ones created with a pipe operation like map or filter).

Trait Implementations§

Source§

impl<T> Clone for Stream<T>

Source§

fn clone(&self) -> Stream<T>

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more

Auto Trait Implementations§

§

impl<T> Freeze for Stream<T>

§

impl<T> RefUnwindSafe for Stream<T>

§

impl<T> !Send for Stream<T>

§

impl<T> !Sync for Stream<T>

§

impl<T> Unpin for Stream<T>

§

impl<T> UnwindSafe for Stream<T>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.