StreamBuilder

Struct StreamBuilder 

Source
pub struct StreamBuilder<T: Message> { /* private fields */ }
Expand description

Builder for creating filtered/transformed message streams

This builder allows chaining operations to create declarative message processing pipelines.

Implementations§

Source§

impl<T: Message + Send + 'static> StreamBuilder<T>

Source

pub fn new(receiver: Receiver<T>) -> Self

Create a new stream builder from a receiver

Source

pub fn filter<F>(self, predicate: F) -> Self
where F: Fn(&T) -> bool + Send + Sync + 'static,

Filter messages based on a predicate

Only messages where the predicate returns true will be passed through.

§Example
let filtered = receiver
    .filter(|img| img.width >= 640)
    .build();
Source

pub fn throttle(self, duration: Duration) -> Self

Throttle messages to a maximum rate

Messages will be rate-limited to at most one per duration. If multiple messages arrive during the throttle period, only the first one is kept and the rest are dropped.

§Example
// Max 10 Hz
let throttled = receiver
    .throttle(Duration::from_millis(100))
    .build();
Source

pub fn latest(self) -> Self

Keep only the latest message, dropping older ones

Useful for scenarios where you only care about the most recent state and processing old messages is wasteful.

§Example
let latest = receiver.latest().build();
Source

pub fn map<F>(self, mapper: F) -> Self
where F: Fn(T) -> T + Send + Sync + 'static,

Transform messages using a mapping function

Applies the given function to each message. The function must return the same type (for type-changing transformations, process messages manually).

§Example
// Resize images to thumbnails (returns Image)
let thumbnails = receiver
    .map(|img| resize_to_thumbnail(img))
    .build();

// Normalize image brightness
let normalized = receiver
    .map(|mut img| {
        normalize_brightness(&mut img);
        img
    })
    .build();
Source

pub fn debounce(self, duration: Duration) -> Self

Debounce messages - ignore duplicates within a time window

Only the first message is kept; subsequent messages within the debounce period are dropped. This is different from throttle - debounce resets the timer on each new message.

Useful for:

  • Button press handling (ignore bounces)
  • Noisy sensor data
  • Rapid user input events
§Example
// Ignore button bounces within 300ms
let debounced = receiver
    .debounce(Duration::from_millis(300))
    .build();

// Only get one event per 300ms window
while let Some(press) = debounced.recv().await {
    handle_button_press(press);
}
Source

pub fn zip<U>(self, other: Receiver<U>) -> Receiver<(T, U)>
where U: Message + Send + 'static,

Zip this stream with another stream

Combines two streams element-by-element, producing tuples of messages. The resulting stream will emit (T, U) tuples when both streams have messages available.

If one stream ends before the other, the zip stream will also end.

§Arguments
  • other - Another receiver to zip with this stream
§Returns

A Receiver<(T, U)> that yields tuples of messages from both streams

§Example
// Zip camera images with LiDAR scans
let mut images = ctx.subscribe(sensor::CAMERA_RGB).await?;
let mut scans = ctx.subscribe(sensor::LIDAR_SCAN).await?;

let mut zipped = images.zip(scans).build();

while let Some((image, scan)) = zipped.recv().await {
    // Process synchronized image + LiDAR data
    println!("Image {}x{}, LiDAR {} points",
        image.width, image.height, scan.ranges.len());
}
Source

pub fn window(self, window_duration: Duration) -> Receiver<Vec<T>>
where T: Clone,

Window messages over a time period

Collects messages over a fixed time window and emits them as a vector. This is useful for time-based aggregation and analysis.

§Arguments
  • window_duration - Duration of the time window
§Returns

A Receiver<Vec<T>> that yields vectors of messages collected in each window

§Example
// Collect detections over 1-second windows
let mut windowed = ctx.subscribe(perception::DETECTIONS)
    .window(Duration::from_secs(1));

while let Some(detections) = windowed.recv().await {
    // Process all detections from the last second
    println!("Received {} detections in 1 second window", detections.len());

    // Example: Calculate average confidence
    let avg_confidence = detections.iter()
        .map(|d| d.confidence)
        .sum::<f32>() / detections.len() as f32;
    println!("Average confidence: {:.2}", avg_confidence);
}
§Use Cases
  • Rate calculation: Count messages per time window
  • Moving averages: Compute statistics over windows
  • Burst detection: Identify spikes in message frequency
  • Time-series analysis: Analyze temporal patterns
Source

pub fn build(self) -> Receiver<T>

Build the transformed receiver

Applies all the configured transformations and returns a new receiver.

Auto Trait Implementations§

§

impl<T> Freeze for StreamBuilder<T>

§

impl<T> !RefUnwindSafe for StreamBuilder<T>

§

impl<T> Send for StreamBuilder<T>

§

impl<T> Sync for StreamBuilder<T>

§

impl<T> Unpin for StreamBuilder<T>

§

impl<T> !UnwindSafe for StreamBuilder<T>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow. Read more
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more