Trait reactive_rs::Stream

pub trait Stream<'a>: Sized {
    type Context: ?Sized;
    type Item: ?Sized;

    fn subscribe_ctx<O>(self, observer: O)
    where
        O: 'a + FnMut(&Self::Context, &Self::Item);

    fn subscribe<O>(self, observer: O)
    where
        O: 'a + FnMut(&Self::Item),
    { ... }

    fn broadcast(self) -> Broadcast<'a, Self::Context, Self::Item>
    where
        Self: 'a,
    { ... }

    fn ctx(self) -> Context<Self> { ... }

    fn with_ctx<T>(self, ctx: T) -> WithContext<Self, T> { ... }

    fn with_ctx_map<F, T>(self, func: F) -> WithContextMap<Self, F>
    where
        F: 'a + FnMut(&Self::Context, &Self::Item) -> T,
    { ... }

    fn map<F, T>(self, func: F) -> Map<Self, NoContext<F>>
    where
        F: 'a + FnMut(&Self::Item) -> T,
    { ... }

    fn map_ctx<F, T>(self, func: F) -> Map<Self, F>
    where
        F: 'a + FnMut(&Self::Context, &Self::Item) -> T,
    { ... }

    fn map_both<F, C, T>(self, func: F) -> MapBoth<Self, NoContext<F>>
    where
        F: 'a + FnMut(&Self::Item) -> (C, T),
    { ... }

    fn map_both_ctx<F, C, T>(self, func: F) -> MapBoth<Self, F>
    where
        F: 'a + FnMut(&Self::Context, &Self::Item) -> (C, T),
    { ... }

    fn filter<F>(self, func: F) -> Filter<Self, NoContext<F>>
    where
        F: 'a + FnMut(&Self::Item) -> bool,
    { ... }

    fn filter_ctx<F>(self, func: F) -> Filter<Self, F>
    where
        F: 'a + FnMut(&Self::Context, &Self::Item) -> bool,
    { ... }

    fn filter_map<F, T>(self, func: F) -> FilterMap<Self, NoContext<F>>
    where
        F: 'a + FnMut(&Self::Item) -> Option<T>,
    { ... }

    fn filter_map_ctx<F, T>(self, func: F) -> FilterMap<Self, F>
    where
        F: 'a + FnMut(&Self::Context, &Self::Item) -> Option<T>,
    { ... }

    fn fold<F, T: 'a>(self, init: T, func: F) -> Fold<Self, NoContext<F>, T>
    where
        F: 'a + FnMut(&T, &Self::Item) -> T,
    { ... }

    fn fold_ctx<F, T: 'a>(self, init: T, func: F) -> Fold<Self, F, T>
    where
        F: 'a + FnMut(&Self::Context, &T, &Self::Item) -> T,
    { ... }

    fn inspect<F>(self, func: F) -> Inspect<Self, NoContext<F>>
    where
        F: 'a + FnMut(&Self::Item),
    { ... }

    fn inspect_ctx<F>(self, func: F) -> Inspect<Self, F>
    where
        F: 'a + FnMut(&Self::Context, &Self::Item),
    { ... }

    fn last_n(self, count: usize) -> LastN<Self, Self::Item>
    where
        Self::Item: 'a + Clone + Sized,
    { ... }
}

A stream of context/value pairs that can be subscribed to.

Note: to use the stream trait methods, this trait must be imported into the current scope.

Required Associated Types

type Context: ?Sized

The type of the context attached to emitted elements.

Can be set to () to ignore the context part of the stream.

type Item: ?Sized

The type of the elements being emitted.

Required Methods

fn subscribe_ctx<O>(self, observer: O) where O: 'a + FnMut(&Self::Context, &Self::Item)

Same as subscribe(), but the closure receives two arguments (context/value), by reference.

Examples
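use reactive_rs::*;
use std::cell::Cell;

let result = Cell::new((0, 0.));
let stream = Broadcast::<i32, f64>::new();
stream
    .clone()
    // The new value is context + value; the context itself is passed on unchanged.
    .map_ctx(|c, x| (*c as f64) + *x)
    .subscribe_ctx(|c, x| result.set((*c, *x)));
stream.send_ctx(3, 7.5);
assert_eq!(result.get(), (3, 10.5));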

Provided Methods

fn subscribe<O>(self, observer: O) where O: 'a + FnMut(&Self::Item)

Attaches an observer (a user-provided mutable closure) to the stream, which consumes the stream object.

A stream with no observer attached is essentially just a function of an observer; it will not react to incoming events until it is subscribed to.

Examples
use reactive_rs::*;
use std::cell::RefCell;

let out = RefCell::new(Vec::new());
let stream = SimpleBroadcast::<i64>::new();
stream
    .clone()
    .filter(|x| x % 2 != 0) // keep odd values only
    .subscribe(|x| out.borrow_mut().push(*x));
stream.feed(0..=5);
assert_eq!(&*out.borrow(), &[1, 3, 5]);
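
The idea that an unsubscribed stream is "just a function of an observer" can be illustrated without the crate. The sketch below is a toy model built on the standard library only, not how reactive_rs is implemented; `filtered` and `demo_filtered` are hypothetical names. A stage is modelled as a function that takes the downstream observer and returns the closure that incoming values are pushed into, so nothing can run before an observer is supplied.

```rust
use std::cell::RefCell;

/// Toy stand-in for a filtering stage: given a downstream observer,
/// returns an upstream observer that forwards only accepted values.
fn filtered<T>(
    mut pred: impl FnMut(&T) -> bool,
    mut observer: impl FnMut(&T),
) -> impl FnMut(&T) {
    move |x: &T| {
        if pred(x) {
            observer(x);
        }
    }
}

/// Pushes 0..=5 through the stage and collects what reaches the observer.
fn demo_filtered() -> Vec<i64> {
    let out = RefCell::new(Vec::new());
    {
        // "Subscribing": supplying the observer yields the callable pipeline.
        let mut pipeline = filtered(
            |x: &i64| x % 2 != 0,
            |x: &i64| out.borrow_mut().push(*x),
        );
        for x in 0..=5 {
            pipeline(&x);
        }
    }
    out.into_inner()
}
```

In this model, `demo_filtered()` returns `[1, 3, 5]`, matching the crate example above.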

fn broadcast(self) -> Broadcast<'a, Self::Context, Self::Item> where Self: 'a

Creates a broadcast from a stream, enabling multiple observers. This is the only Stream trait method that incurs a slight runtime cost, because the broadcast object stores its observers as boxed trait objects in a reference-counted container; all other methods can be inlined.

Note: this is equivalent to creating a broadcast via the from_stream() constructor.
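
To make the cost model concrete, here is a rough standard-library sketch of "boxed trait objects in a reference-counted container". It is an assumption-laden toy, not the crate's Broadcast: the type `ToyBroadcast` and its methods `handle`, `subscribe` and `send` are hypothetical, and contexts are left out. Each observer is called through dynamic dispatch, which is where the slight runtime cost comes from.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Toy broadcast: observers are boxed trait objects in a shared,
/// reference-counted list, so every handle feeds the same observers.
struct ToyBroadcast<'a, T> {
    observers: Rc<RefCell<Vec<Box<dyn FnMut(&T) + 'a>>>>,
}

impl<'a, T> ToyBroadcast<'a, T> {
    fn new() -> Self {
        ToyBroadcast { observers: Rc::new(RefCell::new(Vec::new())) }
    }

    /// Cheap second handle to the same observer list.
    fn handle(&self) -> Self {
        ToyBroadcast { observers: Rc::clone(&self.observers) }
    }

    /// Consumes this handle and registers the observer.
    fn subscribe(self, observer: impl FnMut(&T) + 'a) {
        self.observers.borrow_mut().push(Box::new(observer));
    }

    /// Calls every registered observer through dynamic dispatch.
    fn send(&self, value: T) {
        for observer in self.observers.borrow_mut().iter_mut() {
            observer(&value);
        }
    }
}

/// Attaches two observers to one source and sends two values.
fn demo_broadcast() -> (i32, Vec<i32>) {
    let sum = RefCell::new(0);
    let seen = RefCell::new(Vec::new());
    {
        let source = ToyBroadcast::new();
        source.handle().subscribe(|x: &i32| { *sum.borrow_mut() += *x; });
        source.handle().subscribe(|x: &i32| { seen.borrow_mut().push(*x); });
        source.send(2);
        source.send(5);
    }
    (sum.into_inner(), seen.into_inner())
}
```

Both observers see both values, so `demo_broadcast()` returns `(7, [2, 5])`.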

fn ctx(self) -> Context<Self>

Convenience method to extract the context into a separate stream.

Examples
let stream = Broadcast::<i32, String>::new();
let double_ctx = stream.ctx().map(|x| x * 2);
Notes
  • The resulting stream’s context and value both reference the same object (the original stream’s context).
  • The return value is a Stream object (both context/value types are the original stream’s context type).

fn with_ctx<T>(self, ctx: T) -> WithContext<Self, T>

Sets the context to a fixed constant value.

Examples
let stream = SimpleBroadcast::<String>::new();
let stream_with_ctx = stream.with_ctx(42);
Notes
  • The value is passed down unchanged.
  • The return value is a Stream object (context type is the type of the provided value; value type is unchanged).

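fn with_ctx_map<F, T>(self, func: F) -> WithContextMap<Self, F> where F: 'a + FnMut(&Self::Context, &Self::Item) -> T

Creates a new stream which calls a closure on each context/value pair and uses the result as the context.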

Examples
let stream = SimpleBroadcast::<String>::new();
let string_and_len = stream.with_ctx_map(|_, s| s.len());
Notes
  • The value is passed down unchanged.
  • The closure receives all of its arguments by reference.
  • The return value is a Stream object (context type is the return type of the closure; value type is unchanged).

fn map<F, T>(self, func: F) -> Map<Self, NoContext<F>> where F: 'a + FnMut(&Self::Item) -> T

Creates a new stream which calls a closure on each element and uses the result as the value.

Examples
let stream = SimpleBroadcast::<String>::new();
let contains_foo = stream.map(|s| s.contains("foo"));
Notes
  • The context is passed down unchanged.
  • The closure receives its argument by reference.
  • The return value is a Stream object (context type is unchanged; value type is the return type of the closure).
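
The notes above (context unchanged, closure argument by reference, value type replaced by the closure's return type) can be mirrored in a small standard-library sketch. This is a toy model and not the crate's Map adapter; `mapped` and `demo_mapped` are hypothetical names.

```rust
/// Toy stand-in for `map`: transforms the value and forwards the context as is.
fn mapped<C, T, U>(
    mut func: impl FnMut(&T) -> U,
    mut observer: impl FnMut(&C, &U),
) -> impl FnMut(&C, &T) {
    move |ctx: &C, x: &T| {
        let y = func(x);
        observer(ctx, &y);
    }
}

/// Maps strings to "contains foo" flags while keeping a u8 context.
fn demo_mapped() -> Vec<(u8, bool)> {
    let mut out = Vec::new();
    {
        let mut pipeline = mapped(
            |s: &String| s.contains("foo"),
            |ctx: &u8, hit: &bool| out.push((*ctx, *hit)),
        );
        pipeline(&1, &"foobar".to_string());
        pipeline(&2, &"baz".to_string());
    }
    out
}
```

Here `demo_mapped()` returns `[(1, true), (2, false)]`: the contexts 1 and 2 arrive untouched, while each String value has become a bool.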

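fn map_ctx<F, T>(self, func: F) -> Map<Self, F> where F: 'a + FnMut(&Self::Context, &Self::Item) -> T

Same as map(), but the closure receives two arguments (context/value), by reference.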

Examples
let stream = Broadcast::<bool, i32>::new();
let div2 = stream.map_ctx(|c, x| (*x % 2 == 0) == *c);

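fn map_both<F, C, T>(self, func: F) -> MapBoth<Self, NoContext<F>> where F: 'a + FnMut(&Self::Item) -> (C, T)

Same as map(), but the closure is expected to return a (context, value) tuple, so that both the context and the value can be changed at the same time.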

Examples
let stream = SimpleBroadcast::<i32>::new();
let string_context = stream.map_both(|x| (x.to_string(), *x));
Notes
  • The context and the value are changed simultaneously.
  • The closure receives its argument by reference.
  • The return value is a Stream object (context type and value type depend on the return type of the closure).
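
As a rough illustration of replacing context and value together, the following standard-library sketch models the operation on plain closures. It is not the crate's MapBoth adapter; `mapped_both` and `demo_mapped_both` are hypothetical names, and the incoming context is fixed to () here to stand for a stream whose context is ignored.

```rust
/// Toy stand-in for `map_both`: the closure returns a (context, value)
/// tuple that replaces both parts of the incoming pair.
fn mapped_both<T, D, U>(
    mut func: impl FnMut(&T) -> (D, U),
    mut observer: impl FnMut(&D, &U),
) -> impl FnMut(&(), &T) {
    move |_old_ctx: &(), x: &T| {
        let (new_ctx, new_value) = func(x);
        observer(&new_ctx, &new_value);
    }
}

/// Uses the string form of each integer as its new context.
fn demo_mapped_both() -> Vec<(String, i32)> {
    let mut out = Vec::new();
    {
        let mut pipeline = mapped_both(
            |x: &i32| (x.to_string(), *x),
            |ctx: &String, x: &i32| out.push((ctx.clone(), *x)),
        );
        pipeline(&(), &7);
        pipeline(&(), &42);
    }
    out
}
```

In this model `demo_mapped_both()` returns `[("7", 7), ("42", 42)]`, with the () context discarded in favour of the String produced by the closure.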

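fn map_both_ctx<F, C, T>(self, func: F) -> MapBoth<Self, F> where F: 'a + FnMut(&Self::Context, &Self::Item) -> (C, T)

Same as map_both(), but the closure receives two arguments (context/value), by reference.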

Examples
// A and B are placeholders for any two Copy types.
let stream = Broadcast::<A, B>::new();
let swapped = stream.map_both_ctx(|a, b| (*b, *a));

fn filter<F>(self, func: F) -> Filter<Self, NoContext<F>> where F: 'a + FnMut(&Self::Item) -> bool

Creates a stream which uses a closure to determine if an element should be yielded.

The closure must return true or false and is called on each element of the original stream. If true is returned, the element is passed downstream.

Examples
let stream = SimpleBroadcast::<Vec<i32>>::new();
let non_empty = stream.filter(|v| !v.is_empty());
Notes
  • The context is passed down unchanged.
  • The closure receives its argument by reference.
  • The return value is a Stream object (same context type and value type as the original stream).

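fn filter_ctx<F>(self, func: F) -> Filter<Self, F> where F: 'a + FnMut(&Self::Context, &Self::Item) -> bool

Same as filter(), but the closure receives two arguments (context/value), by reference.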

Examples
let stream = Broadcast::<usize, Vec<i32>>::new();
let filter_len = stream.filter_ctx(|ctx, v| v.len() == *ctx);

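fn filter_map<F, T>(self, func: F) -> FilterMap<Self, NoContext<F>> where F: 'a + FnMut(&Self::Item) -> Option<T>

Creates a stream that both filters and maps.

The closure must return an Option<T>. If it returns Some(element), that element is passed downstream; if it returns None, the input is skipped.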

Examples
let stream = SimpleBroadcast::<String>::new();
let valid_ints = stream.filter_map(|s| s.parse::<i64>().ok());
Notes
  • The context is passed down unchanged.
  • The closure receives its argument by reference.
  • The return value is a Stream object (context type is unchanged; value type is T, where the return type of the closure is Option<T>).
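
The Some/None behaviour can be sketched on plain closures as follows. This is a standard-library toy, not the crate's FilterMap adapter; `filter_mapped` and `demo_filter_mapped` are hypothetical names, and contexts are omitted for brevity.

```rust
/// Toy stand-in for `filter_map`: forwards the payload of `Some`,
/// drops the input when the closure returns `None`.
fn filter_mapped<T, U>(
    mut func: impl FnMut(&T) -> Option<U>,
    mut observer: impl FnMut(&U),
) -> impl FnMut(&T) {
    move |x: &T| {
        if let Some(y) = func(x) {
            observer(&y);
        }
    }
}

/// Keeps only the strings that parse as integers.
fn demo_filter_mapped() -> Vec<i64> {
    let mut out = Vec::new();
    {
        let mut pipeline = filter_mapped(
            |s: &String| s.parse::<i64>().ok(),
            |n: &i64| out.push(*n),
        );
        for s in ["12", "abc", "-3"] {
            pipeline(&s.to_string());
        }
    }
    out
}
```

`demo_filter_mapped()` returns `[12, -3]`; the unparsable "abc" never reaches the observer.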

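fn filter_map_ctx<F, T>(self, func: F) -> FilterMap<Self, F> where F: 'a + FnMut(&Self::Context, &Self::Item) -> Option<T>

Same as filter_map(), but the closure receives two arguments (context/value), by reference.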

Examples
let stream = Broadcast::<Option<i64>, String>::new();
let int_or_ctx = stream.filter_map_ctx(|c, s| s.parse().ok().or(*c));

fn fold<F, T: 'a>(self, init: T, func: F) -> Fold<Self, NoContext<F>, T> where F: 'a + FnMut(&T, &Self::Item) -> T

‘Reduce’ operation on streams.

This method takes two arguments: an initial value and a closure. The closure itself takes two arguments, an accumulator and an element, and returns the value that the accumulator should have for the next iteration; the initial value is the value the accumulator has on the first call.

Examples
let stream = SimpleBroadcast::<i32>::new();
let cum_sum = stream.fold(0, |acc, x| acc + x);
Notes
  • The context is passed down unchanged.
  • The closure receives all of its arguments by reference.
  • The return value is a Stream object (context type is unchanged; value type is the accumulator type).
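
A minimal standard-library sketch of this accumulator logic is shown below. It assumes, as the name cum_sum in the example suggests, that the updated accumulator is emitted after every incoming element; it is a toy model rather than the crate's Fold adapter, and `folded` and `demo_folded` are hypothetical names.

```rust
/// Toy stand-in for `fold`: keeps an accumulator, updates it on every
/// element, and forwards the new accumulator value downstream.
fn folded<T, A>(
    init: A,
    mut func: impl FnMut(&A, &T) -> A,
    mut observer: impl FnMut(&A),
) -> impl FnMut(&T) {
    let mut acc = init;
    move |x: &T| {
        acc = func(&acc, x);
        observer(&acc);
    }
}

/// Running sum of 1, 2, 3, 4 starting from 0.
fn demo_folded() -> Vec<i32> {
    let mut out = Vec::new();
    {
        let mut pipeline = folded(
            0,
            |acc: &i32, x: &i32| acc + x,
            |s: &i32| out.push(*s),
        );
        for x in [1, 2, 3, 4] {
            pipeline(&x);
        }
    }
    out
}
```

Under that assumption `demo_folded()` returns `[1, 3, 6, 10]`: the first call sees the initial value 0 as its accumulator, and each later call sees the previous result.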

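fn fold_ctx<F, T: 'a>(self, init: T, func: F) -> Fold<Self, F, T> where F: 'a + FnMut(&Self::Context, &T, &Self::Item) -> T

Same as fold(), but the closure receives three arguments (context/accumulator/value), by reference.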

Examples
let stream = Broadcast::<i32, i32>::new();
let bnd_sum = stream.fold_ctx(0, |c, acc, x| *c.min(&(acc + x)));

fn inspect<F>(self, func: F) -> Inspect<Self, NoContext<F>> where F: 'a + FnMut(&Self::Item)

Does something with each element of a stream, passing the value on.

The closure will only be called if the stream is actually subscribed to (just calling inspect() does nothing on its own).

Examples
let stream = SimpleBroadcast::<String>::new();
let stream = stream.inspect(|x| println!("{:?}", x));
Notes
  • Both context/value are passed down unchanged.
  • The closure receives its argument by reference.
  • The return value is a Stream object (same context type and value type as the original stream).
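
The point that the closure only runs once values actually flow can be checked in a toy model. The sketch below uses only the standard library and is not the crate's Inspect adapter; `inspected` and `demo_inspected` are hypothetical names. It counts how often the inspecting closure has been called right after the pipeline is built and again after two values are pushed through.

```rust
use std::cell::Cell;

/// Toy stand-in for `inspect`: calls `func` on each value, then
/// forwards the same value unchanged.
fn inspected<T>(
    mut func: impl FnMut(&T),
    mut observer: impl FnMut(&T),
) -> impl FnMut(&T) {
    move |x: &T| {
        func(x);
        observer(x);
    }
}

/// Returns (calls before any value, calls after two values, forwarded values).
fn demo_inspected() -> (usize, usize, Vec<i32>) {
    let calls = Cell::new(0);
    let mut out = Vec::new();
    let before;
    {
        let mut pipeline = inspected(
            |_: &i32| calls.set(calls.get() + 1),
            |x: &i32| out.push(*x),
        );
        // Building the pipeline has not invoked the inspecting closure.
        before = calls.get();
        pipeline(&10);
        pipeline(&20);
    }
    (before, calls.get(), out)
}
```

`demo_inspected()` returns `(0, 2, [10, 20])`: no calls at construction time, one call per value afterwards, and the values themselves pass through unchanged.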

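fn inspect_ctx<F>(self, func: F) -> Inspect<Self, F> where F: 'a + FnMut(&Self::Context, &Self::Item)

Same as inspect(), but the closure receives two arguments (context/value), by reference.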

Examples
let stream = Broadcast::<i32, String>::new();
let stream = stream.inspect_ctx(|c, x| println!("{} {}", c, x));

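fn last_n(self, count: usize) -> LastN<Self, Self::Item> where Self::Item: 'a + Clone + Sized

Creates a stream that caches up to the last n elements.

The elements are stored in a contiguous double-ended queue provided by the slice-deque crate. The output stream yields slice views into this queue.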

Examples
let stream = SimpleBroadcast::<i64>::new();
let last_3 = stream.last_n(3);
Notes
  • The context is passed down unchanged (only values are cached).
  • The return value is a Stream object (context type is unchanged; value type is [T] where T is the original value type).
  • Slices may contain fewer than n elements (while the queue is being filled up initially).
  • The value type of the original stream must implement Clone.
  • This method is only present if feature = "slice-deque" is enabled (on by default).
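
The windowing behaviour described in these notes can be approximated with the standard library. The sketch below swaps slice-deque for `std::collections::VecDeque` and its `make_contiguous` method, so it illustrates the semantics only and is not the crate's LastN adapter; `last_n_of` and `demo_last_n` are hypothetical names, and `count` is assumed to be at least 1.

```rust
use std::collections::VecDeque;

/// Toy stand-in for `last_n`: caches clones of the most recent `count`
/// values (assumed >= 1) and hands the observer a slice view of them.
fn last_n_of<T: Clone>(
    count: usize,
    mut observer: impl FnMut(&[T]),
) -> impl FnMut(&T) {
    let mut queue: VecDeque<T> = VecDeque::with_capacity(count);
    move |x: &T| {
        if queue.len() == count {
            queue.pop_front(); // evict the oldest element
        }
        queue.push_back(x.clone());
        // Rearrange the ring buffer so it can be viewed as one slice.
        let window: &[T] = queue.make_contiguous();
        observer(window);
    }
}

/// Feeds 1..=5 through a window of size 3 and records every slice seen.
fn demo_last_n() -> Vec<Vec<i64>> {
    let mut out = Vec::new();
    {
        let mut pipeline = last_n_of(3, |w: &[i64]| out.push(w.to_vec()));
        for x in 1..=5 {
            pipeline(&x);
        }
    }
    out
}
```

`demo_last_n()` returns `[[1], [1, 2], [1, 2, 3], [2, 3, 4], [3, 4, 5]]`, which shows the shorter slices produced while the queue is still filling up.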

Implementors