pub struct Stream<T> { /* private fields */ }Expand description
Streams are objects that emit events in sequence as they are created. Streams are
similar to Iterators in Rust in that both represent a sequence of values and both
can be modified by ‘pipe’ functions like map and filter. The difference is that
all values of an iterator are known immediately (or, at least, execution will block
while the next item is retrieved), whereas it would not be uncommon for a stream to
live for the entire duration of a program, emitting new values from time-to-time.
§Examples
let stream_host: epoxy_streams::Sink<i32> = epoxy_streams::Sink::new();
let stream = stream_host.get_stream();
{
let _sub = stream.subscribe(|val| {val;});
assert_eq!(stream.count_subscribers(), 1);
}
assert_eq!(stream.count_subscribers(), 0);use std::sync::{Arc, Mutex};
let stream_host: epoxy_streams::Sink<i32> = epoxy_streams::Sink::new();
let stream = stream_host.get_stream();
let last_value = Arc::new(Mutex::new(0_i32));
let last_value_write = last_value.clone();
let subscription = stream.subscribe(move |val| {
*last_value_write.lock().unwrap() = *val;
});
stream_host.emit(1);
assert_eq!(*last_value.lock().unwrap(), 1);
stream_host.emit(100);
assert_eq!(*last_value.lock().unwrap(), 100);Implementations§
Source§impl<T> Stream<T>where
T: 'static,
impl<T> Stream<T>where
T: 'static,
Sourcepub fn to_reactive_value(self) -> ReadonlyReactiveValue<T>where
T: Default,
pub fn to_reactive_value(self) -> ReadonlyReactiveValue<T>where
T: Default,
Creates a ReactiveValue from the stream, using the empty state value for type T as the default.
Use to_reactive_value_with_default or to_reactive_value_with_default_rc if you want
a more reasonable default value, or if your type T does not implement Default.
§Examples
use epoxy_streams::ReactiveValue;
let stream_host: epoxy_streams::Sink<i32> = epoxy_streams::Sink::new();
let stream = stream_host.get_stream();
let reactive_value = stream.map(|val| val * 100).to_reactive_value();
assert_eq!(*reactive_value.get(), 0);
stream_host.emit(100);
assert_eq!(*reactive_value.get(), 10000);Sourcepub fn to_reactive_value_with_default(
self,
default: T,
) -> ReadonlyReactiveValue<T>
pub fn to_reactive_value_with_default( self, default: T, ) -> ReadonlyReactiveValue<T>
See to_reactive_value.
Sourcepub fn to_reactive_value_with_default_rc(
self,
default: Arc<T>,
) -> ReadonlyReactiveValue<T>
pub fn to_reactive_value_with_default_rc( self, default: Arc<T>, ) -> ReadonlyReactiveValue<T>
See to_reactive_value.
Source§impl<T> Stream<T>where
T: 'static,
impl<T> Stream<T>where
T: 'static,
Sourcepub fn scan<U, F>(&self, scan_fn: F, initial_value: U) -> Stream<U>
pub fn scan<U, F>(&self, scan_fn: F, initial_value: U) -> Stream<U>
Similar to the reduce method on iterators, but works iteratively and creates a stream
that emits with the latest ‘reduced’ value whenever the original stream emits.
Note that because streams never officially ‘close’, there is no actual reduce method.
§Examples
use std::sync::{Arc, Mutex};
let stream_host: epoxy_streams::Sink<i32> = epoxy_streams::Sink::new();
let stream = stream_host.get_stream();
let last_value = Arc::new(Mutex::new(vec![]));
let last_value_write = last_value.clone();
let subscription = stream
.scan(|acc, val| {
let mut extended = vec![];
extended.extend(acc);
extended.push(*val);
extended
}, vec![])
.subscribe(move |val| {
*last_value_write.lock().unwrap() = (*val).clone();
});
stream_host.emit(2);
assert_eq!(*last_value.lock().unwrap(), vec![2]);
stream_host.emit(3);
assert_eq!(*last_value.lock().unwrap(), vec![2, 3]);Sourcepub fn count_values(&self) -> Stream<u32>
pub fn count_values(&self) -> Stream<u32>
Creates a stream that counts the number of times the original stream emits.
§Examples
use std::sync::{Arc, Mutex};
let stream_host: epoxy_streams::Sink<i32> = epoxy_streams::Sink::new();
let stream = stream_host.get_stream();
let last_value = Arc::new(Mutex::new(0_u32));
let last_value_write = last_value.clone();
let subscription = stream.count_values().subscribe(move |val| {
*last_value_write.lock().unwrap() = (*val).clone();
});
stream_host.emit(2);
assert_eq!(*last_value.lock().unwrap(), 1);
stream_host.emit(3);
assert_eq!(*last_value.lock().unwrap(), 2);Examples found in repository?
28fn main() {
29 let button = MyButton::new();
30 let all_clicks_count = button.get_clicks().count_values();
31 let left_clicks_count = button
32 .get_clicks()
33 .filter(|click| *click == MouseButton::Left)
34 .count_values();
35
36 button.click(MouseButton::Left);
37 button.click(MouseButton::Right);
38 button.click(MouseButton::Left);
39 button.click(MouseButton::Middle);
40 button.click(MouseButton::Left);
41
42 {
43 let _sub_all = all_clicks_count.subscribe(|count| {
44 assert_eq!(*count, 6);
45 println!("Counted 6 clicks");
46 });
47 let _sub_left = left_clicks_count.subscribe(|count| {
48 assert_eq!(*count, 4);
49 println!("Counted 4 left clicks");
50 });
51 button.click(MouseButton::Left);
52 }
53}Sourcepub fn buffer(&self, max_buffer_size: usize) -> Stream<Vec<T>>where
T: Clone,
pub fn buffer(&self, max_buffer_size: usize) -> Stream<Vec<T>>where
T: Clone,
Creates a stream that returns a vector of the last n emissions of the original stream.
§Examples
use std::sync::{Arc, Mutex};
let stream_host: epoxy_streams::Sink<i32> = epoxy_streams::Sink::new();
let stream = stream_host.get_stream();
let last_value = Arc::new(Mutex::new(vec![]));
let last_value_write = last_value.clone();
let subscription = stream.buffer(2).subscribe(move |val| {
*last_value_write.lock().unwrap() = (*val).clone();
});
stream_host.emit(2);
assert_eq!(*last_value.lock().unwrap(), vec![2]);
stream_host.emit(3);
assert_eq!(*last_value.lock().unwrap(), vec![2, 3]);
stream_host.emit(4);
assert_eq!(*last_value.lock().unwrap(), vec![3, 4]);Source§impl<T> Stream<T>where
T: 'static,
impl<T> Stream<T>where
T: 'static,
Sourcepub fn filter<F>(&self, filter_function: F) -> Stream<T>
pub fn filter<F>(&self, filter_function: F) -> Stream<T>
Returns a stream that emits only those values from the original stream that pass a test.
§Examples
use std::sync::{Arc, Mutex};
let stream_host: epoxy_streams::Sink<i32> = epoxy_streams::Sink::new();
let stream = stream_host.get_stream();
let last_value = Arc::new(Mutex::new(0_i32));
let last_value_write = last_value.clone();
let subscription = stream
.filter(|val| val % 2 == 0 /* Allow only even numbers through */)
.subscribe(move |val| {
*last_value_write.lock().unwrap() = *val;
});
stream_host.emit(2);
assert_eq!(*last_value.lock().unwrap(), 2);
stream_host.emit(3);
assert_eq!(*last_value.lock().unwrap(), 2); // Note that '3' was not emitted
stream_host.emit(4);
assert_eq!(*last_value.lock().unwrap(), 4);Examples found in repository?
28fn main() {
29 let button = MyButton::new();
30 let all_clicks_count = button.get_clicks().count_values();
31 let left_clicks_count = button
32 .get_clicks()
33 .filter(|click| *click == MouseButton::Left)
34 .count_values();
35
36 button.click(MouseButton::Left);
37 button.click(MouseButton::Right);
38 button.click(MouseButton::Left);
39 button.click(MouseButton::Middle);
40 button.click(MouseButton::Left);
41
42 {
43 let _sub_all = all_clicks_count.subscribe(|count| {
44 assert_eq!(*count, 6);
45 println!("Counted 6 clicks");
46 });
47 let _sub_left = left_clicks_count.subscribe(|count| {
48 assert_eq!(*count, 4);
49 println!("Counted 4 left clicks");
50 });
51 button.click(MouseButton::Left);
52 }
53}Sourcepub fn map<U, F>(&self, map_function: F) -> Stream<U>
pub fn map<U, F>(&self, map_function: F) -> Stream<U>
Returns a stream containing modified values from the original stream.
§Examples
use std::sync::{Arc, Mutex};
let stream_host: epoxy_streams::Sink<i32> = epoxy_streams::Sink::new();
let stream = stream_host.get_stream();
let last_value = Arc::new(Mutex::new(0_i32));
let last_value_write = last_value.clone();
let subscription = stream
.map(|val| val * 100)
.subscribe(move |val| {
*last_value_write.lock().unwrap() = *val;
});
stream_host.emit(2);
assert_eq!(*last_value.lock().unwrap(), 200);
stream_host.emit(3);
assert_eq!(*last_value.lock().unwrap(), 300);Sourcepub fn map_rc<U, F>(&self, map_function: F) -> Stream<U>
pub fn map_rc<U, F>(&self, map_function: F) -> Stream<U>
Returns a stream containing modified values from the original stream.
§Examples
use std::sync::{Arc, Mutex};
use std::rc::Rc;
let stream_host: epoxy_streams::Sink<i32> = epoxy_streams::Sink::new();
let stream = stream_host.get_stream();
let last_value = Arc::new(Mutex::new(0_i32));
let last_value_write = last_value.clone();
let fallback_value = Arc::new(10);
let subscription = stream
.map_rc(move |val| if (*val < 0) { fallback_value.clone() } else { val })
.subscribe(move |val| {
*last_value_write.lock().unwrap() = *val;
});
stream_host.emit(-2);
assert_eq!(*last_value.lock().unwrap(), 10);
stream_host.emit(12);
assert_eq!(*last_value.lock().unwrap(), 12);
stream_host.emit(-10);
assert_eq!(*last_value.lock().unwrap(), 10);Sourcepub fn flat_map<U, F, C>(&self, iter_map_function: F) -> Stream<U>
pub fn flat_map<U, F, C>(&self, iter_map_function: F) -> Stream<U>
Returns a stream that can emit multiple values for each value from the original stream.
§Examples
use std::sync::{Arc, Mutex};
let stream_host: epoxy_streams::Sink<i32> = epoxy_streams::Sink::new();
let stream = stream_host.get_stream();
let last_value = Arc::new(Mutex::new(0_i32));
let last_value_write = last_value.clone();
let subscription = stream
.flat_map(|val| vec![*val, val + 1, val + 2])
.subscribe(move |val| {
*last_value_write.lock().unwrap() = *val;
});
stream_host.emit(2);
assert_eq!(*last_value.lock().unwrap(), 4); /* Emitted 2, 3, then 4 */
stream_host.emit(3);
assert_eq!(*last_value.lock().unwrap(), 5); /* Emitted 3, 4, then 5 */Source§impl<T> Stream<T>
impl<T> Stream<T>
Sourcepub fn subscribe<F>(&self, listener: F) -> Subscription<T>
pub fn subscribe<F>(&self, listener: F) -> Subscription<T>
Subscribing to a stream will cause the given ‘listener’ function to be executed whenever
a new object is added to the stream. This listener function has a static lifetime because
it lives as long as the returned Subscription object, which means that in most cases if the
given function needs to capture any scope from its environment it will need to be used with
Rust’s move annotation.
Examples found in repository?
28fn main() {
29 let button = MyButton::new();
30 let all_clicks_count = button.get_clicks().count_values();
31 let left_clicks_count = button
32 .get_clicks()
33 .filter(|click| *click == MouseButton::Left)
34 .count_values();
35
36 button.click(MouseButton::Left);
37 button.click(MouseButton::Right);
38 button.click(MouseButton::Left);
39 button.click(MouseButton::Middle);
40 button.click(MouseButton::Left);
41
42 {
43 let _sub_all = all_clicks_count.subscribe(|count| {
44 assert_eq!(*count, 6);
45 println!("Counted 6 clicks");
46 });
47 let _sub_left = left_clicks_count.subscribe(|count| {
48 assert_eq!(*count, 4);
49 println!("Counted 4 left clicks");
50 });
51 button.click(MouseButton::Left);
52 }
53}Sourcepub fn unsubscribe(&self, _subscription: Subscription<T>)
pub fn unsubscribe(&self, _subscription: Subscription<T>)
Usually subscriptions are removed by simply letting the Subscription object fall out of scope, but this declarative API is provided as well as it may be more readable in some situations.
Sourcepub fn count_subscribers(&self) -> usize
pub fn count_subscribers(&self) -> usize
Returns the total number of subscribers listening to this stream, includes any derived
streams (ones created with a pipe operation like map or filter).