ReceiverExt

Trait ReceiverExt 

Source
pub trait ReceiverExt<T: Message>: Sized {
    // Required methods
    fn stream(self) -> StreamBuilder<T>;
    fn filter<F>(self, predicate: F) -> StreamBuilder<T>
       where F: Fn(&T) -> bool + Send + Sync + 'static;
    fn map<F>(self, mapper: F) -> StreamBuilder<T>
       where F: Fn(T) -> T + Send + Sync + 'static;
    fn debounce(self, duration: Duration) -> StreamBuilder<T>;
    fn throttle(self, duration: Duration) -> StreamBuilder<T>;
    fn latest(self) -> StreamBuilder<T>;
    fn batch(self, size: usize, timeout: Duration) -> BatchBuilder<T>
       where T: Clone;
    fn zip<U>(self, other: Receiver<U>) -> Receiver<(T, U)>
       where U: Message + Send + 'static;
    fn window(self, window_duration: Duration) -> Receiver<Vec<T>>
       where T: Clone;
}
Expand description

Extension trait to add stream combinator methods to Receiver

Required Methods§

Source

fn stream(self) -> StreamBuilder<T>

Start building a stream pipeline

Source

fn filter<F>(self, predicate: F) -> StreamBuilder<T>
where F: Fn(&T) -> bool + Send + Sync + 'static,

Filter messages based on a predicate

Source

fn map<F>(self, mapper: F) -> StreamBuilder<T>
where F: Fn(T) -> T + Send + Sync + 'static,

Transform messages using a mapping function

Source

fn debounce(self, duration: Duration) -> StreamBuilder<T>

Debounce messages - ignore duplicates within a time window

Source

fn throttle(self, duration: Duration) -> StreamBuilder<T>

Throttle messages to a maximum rate

Source

fn latest(self) -> StreamBuilder<T>

Keep only the latest message

Source

fn batch(self, size: usize, timeout: Duration) -> BatchBuilder<T>
where T: Clone,

Batch messages together

Collects messages into batches of size size, or until timeout elapses, whichever comes first. Returns a BatchBuilder which can be built into a Receiver<Vec>.

§Arguments
  • size - Maximum number of messages per batch
  • timeout - Maximum time to wait before emitting a partial batch
§Example
// Collect messages into batches of 10, or every 1 second
let mut batched = ctx.subscribe(sensor::IMU)
    .await?
    .batch(10, Duration::from_secs(1))
    .build();

while let Some(batch) = batched.recv().await {
    println!("Received batch of {} messages", batch.len());
    // Process batch efficiently
}
Source

fn zip<U>(self, other: Receiver<U>) -> Receiver<(T, U)>
where U: Message + Send + 'static,

Zip this stream with another stream

Source

fn window(self, window_duration: Duration) -> Receiver<Vec<T>>
where T: Clone,

Window messages over a time period

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.

Implementors§

Source§

impl<T: Message + Send + 'static> ReceiverExt<T> for Receiver<T>