pub struct StreamBuilder<T: Message> { /* private fields */ }
Builder for creating filtered or transformed message streams.
Chain operations on this builder to create declarative message-processing pipelines.
Implementations§
impl<T: Message + Send + 'static> StreamBuilder<T>
pub fn filter<F>(self, predicate: F) -> Self
Filter messages based on a predicate
Only messages where the predicate returns true will be passed through.
§Example
let filtered = receiver
    .filter(|img| img.width >= 640)
    .build();
pub fn throttle(self, duration: Duration) -> Self
Throttle messages to a maximum rate
Messages will be rate-limited to at most one per duration.
If multiple messages arrive during the throttle period, only the
first one is kept and the rest are dropped.
§Example
// Max 10 Hz
let throttled = receiver
    .throttle(Duration::from_millis(100))
    .build();
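To make the keep-first rule concrete, here is a minimal, self-contained model of the throttle semantics described above. It is a sketch under assumptions, not the library's implementation: `throttle_model` is a hypothetical helper that works on pre-recorded (arrival offset, payload) pairs instead of a live stream, and it assumes the period is measured from the last message that was passed through.

```rust
use std::time::Duration;

/// Hypothetical model of throttling (not part of StreamBuilder).
/// `events` holds (arrival offset, payload) pairs sorted by arrival time.
/// A message passes only if at least `period` has elapsed since the
/// previously passed message; everything else is dropped.
fn throttle_model<T: Clone>(events: &[(Duration, T)], period: Duration) -> Vec<T> {
    let mut passed = Vec::new();
    let mut last_passed: Option<Duration> = None;
    for (at, msg) in events {
        let at = *at;
        let allowed = match last_passed {
            None => true, // the very first message always passes
            Some(prev) => at.saturating_sub(prev) >= period,
        };
        if allowed {
            last_passed = Some(at);
            passed.push(msg.clone());
        }
    }
    passed
}
```

With a 100 ms period, arrivals at 0, 30, 99, 100, 150, and 210 ms let through only the messages at 0, 100, and 210 ms.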
pub fn latest(self) -> Self
Keep only the latest message, dropping older ones
Useful for scenarios where you only care about the most recent state and processing old messages is wasteful.
§Example
let latest = receiver.latest().build();
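The "keep only the latest" idea can be illustrated with the standard library alone. The sketch below is an assumption-laden stand-in, not how `latest` is implemented: it uses `std::sync::mpsc::Receiver` in place of this crate's receiver, and `take_latest` is a hypothetical helper that drains whatever is queued and returns only the newest item.

```rust
use std::sync::mpsc::Receiver;

/// Hypothetical helper (not part of StreamBuilder): drains every message
/// currently queued on a std channel and returns only the newest one.
/// Older queued messages are discarded; returns None if nothing is queued.
fn take_latest<T>(rx: &Receiver<T>) -> Option<T> {
    // try_iter yields queued messages without blocking.
    rx.try_iter().last()
}
```

A slow consumer that calls `take_latest` once per processing cycle always works on the most recent state and never builds up a backlog.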
pub fn map<F>(self, mapper: F) -> Self
Transform messages using a mapping function
Applies the given function to each message. The function must return the same type (for type-changing transformations, process messages manually).
§Example
// Resize images to thumbnails (returns Image)
let thumbnails = receiver
    .map(|img| resize_to_thumbnail(img))
    .build();
// Normalize image brightness
let normalized = receiver
    .map(|mut img| {
        normalize_brightness(&mut img);
        img
    })
    .build();
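One way to picture how `filter` and `map` chain is a list of stages that each take a message and either pass it on or drop it. The sketch below is purely illustrative: `PipelineSketch` is a hypothetical type, and the source does not say how `StreamBuilder` stores its operations. Under this assumed design every stage has the shape `T -> Option<T>`, which is one plausible reason a mapper would have to return the same type.

```rust
/// Hypothetical stand-in for a chaining builder; not the library's StreamBuilder.
/// Every stage maps a message to Some(message) to keep it or None to drop it.
struct PipelineSketch<T> {
    stages: Vec<Box<dyn Fn(T) -> Option<T>>>,
}

impl<T: 'static> PipelineSketch<T> {
    fn new() -> Self {
        Self { stages: Vec::new() }
    }

    /// Keeps only messages for which `predicate` returns true.
    fn filter<F>(mut self, predicate: F) -> Self
    where
        F: Fn(&T) -> bool + 'static,
    {
        self.stages
            .push(Box::new(move |msg: T| if predicate(&msg) { Some(msg) } else { None }));
        self
    }

    /// Transforms each message; the result has the same type as the input.
    fn map<F>(mut self, mapper: F) -> Self
    where
        F: Fn(T) -> T + 'static,
    {
        self.stages.push(Box::new(move |msg: T| Some(mapper(msg))));
        self
    }

    /// Runs one message through all stages in order, stopping at the first drop.
    fn apply(&self, msg: T) -> Option<T> {
        self.stages.iter().try_fold(msg, |m, stage| stage(m))
    }
}
```

For example, `PipelineSketch::<u32>::new().filter(|w| *w >= 640).map(|w| w / 2)` turns 1280 into 640 and drops 320 entirely.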
pub fn debounce(self, duration: Duration) -> Self
Debounce messages - ignore duplicates within a time window
Only the first message is kept; subsequent messages that arrive within the debounce period are dropped. Unlike throttle, debounce resets its timer on each new message, so an uninterrupted burst yields a single message until the input stays quiet for the full period.
Useful for:
- Button press handling (ignore bounces)
- Noisy sensor data
- Rapid user input events
§Example
// Ignore button bounces within 300ms
let mut debounced = receiver
    .debounce(Duration::from_millis(300))
    .build();
// Only get one event per 300ms window
while let Some(press) = debounced.recv().await {
    handle_button_press(press);
}
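The timer-reset behaviour is easiest to see on recorded arrival times. The following is a minimal model of the description above, not the library's code: `debounce_model` is a hypothetical helper over (arrival offset, payload) pairs, and it assumes "resets the timer" means the quiet period is measured from the most recent arrival, whether or not that arrival was passed through.

```rust
use std::time::Duration;

/// Hypothetical model of debouncing (not part of StreamBuilder).
/// `events` holds (arrival offset, payload) pairs sorted by arrival time.
/// A message passes only if the input was quiet for at least `period`
/// before it; every arrival, passed or dropped, restarts the timer.
fn debounce_model<T: Clone>(events: &[(Duration, T)], period: Duration) -> Vec<T> {
    let mut passed = Vec::new();
    let mut last_seen: Option<Duration> = None;
    for (at, msg) in events {
        let at = *at;
        let quiet = match last_seen {
            None => true, // the first message starts a burst and is kept
            Some(prev) => at.saturating_sub(prev) >= period,
        };
        if quiet {
            passed.push(msg.clone());
        }
        last_seen = Some(at); // reset the timer on every arrival
    }
    passed
}
```

With a 300 ms period, arrivals at 0, 200, 400, 600, and 1000 ms pass only the messages at 0 and 1000 ms, because each intermediate arrival restarts the quiet period. A keep-first throttle with the same period would be expected to pass the message at 400 ms as well.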
pub fn zip<U>(self, other: Receiver<U>) -> Receiver<(T, U)>
Zip this stream with another stream
Combines two streams element-by-element, producing tuples of messages.
The resulting stream will emit (T, U) tuples when both streams have
messages available.
If one stream ends before the other, the zip stream will also end.
§Arguments
other - Another receiver to zip with this stream
§Returns
A Receiver<(T, U)> that yields tuples of messages from both streams
§Example
// Zip camera images with LiDAR scans
let images = ctx.subscribe(sensor::CAMERA_RGB).await?;
let scans = ctx.subscribe(sensor::LIDAR_SCAN).await?;
// zip returns a Receiver<(T, U)> directly, so no build() call is needed
let mut zipped = images.zip(scans);
while let Some((image, scan)) = zipped.recv().await {
    // Process synchronized image + LiDAR data
    println!("Image {}x{}, LiDAR {} points",
        image.width, image.height, scan.ranges.len());
}
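The element-by-element pairing and the end-of-stream rule for zip can be modelled with two standard-library channels. This is a sketch under assumptions rather than the crate's implementation: `zip_model` is a hypothetical helper, it uses blocking `std::sync::mpsc` receivers instead of async ones, and it collects the pairs into a `Vec` instead of returning a new receiver.

```rust
use std::sync::mpsc::Receiver;

/// Hypothetical model of zip (not part of StreamBuilder).
/// Pairs the n-th message of `left` with the n-th message of `right`
/// and stops as soon as either channel is closed and drained.
fn zip_model<A, B>(left: Receiver<A>, right: Receiver<B>) -> Vec<(A, B)> {
    let mut pairs = Vec::new();
    // recv() blocks until a message arrives or all senders are dropped.
    while let (Ok(a), Ok(b)) = (left.recv(), right.recv()) {
        pairs.push((a, b));
    }
    pairs
}
```

Note that in this model pairing is purely positional: the first image is matched with the first scan regardless of when each arrived, and a leftover message on the longer stream is discarded when the shorter one ends.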
pub fn window(self, window_duration: Duration) -> Receiver<Vec<T>>
where
    T: Clone,
Window messages over a time period
Collects messages over a fixed time window and emits them as a vector. This is useful for time-based aggregation and analysis.
§Arguments
window_duration - Duration of the time window
§Returns
A Receiver<Vec<T>> that yields vectors of messages collected in each window
§Example
// Collect detections over 1-second windows
let mut windowed = ctx.subscribe(perception::DETECTIONS).await?
    .window(Duration::from_secs(1));
while let Some(detections) = windowed.recv().await {
    // Process all detections from the last second
    println!("Received {} detections in 1 second window", detections.len());
    // Example: Calculate average confidence
    let avg_confidence = detections.iter()
        .map(|d| d.confidence)
        .sum::<f32>() / detections.len() as f32;
    println!("Average confidence: {:.2}", avg_confidence);
}
§Use Cases
- Rate calculation: Count messages per time window
- Moving averages: Compute statistics over windows
- Burst detection: Identify spikes in message frequency
- Time-series analysis: Analyze temporal patterns
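The use cases above can be tried out on recorded data with a small model of fixed (tumbling) windows. This is a sketch, not the library's implementation: `window_model` and `mean` are hypothetical helpers, windows are assumed to be aligned to offset zero, and the model emits an empty vector for a window with no messages, which the source does not confirm for the real `window`.

```rust
use std::time::Duration;

/// Hypothetical model of fixed time windows (not part of StreamBuilder).
/// `events` holds (arrival offset, payload) pairs; each message lands in
/// the window with index floor(offset / window). Gaps yield empty vectors.
fn window_model<T: Clone>(events: &[(Duration, T)], window: Duration) -> Vec<Vec<T>> {
    assert!(!window.is_zero(), "window duration must be non-zero");
    let mut windows: Vec<Vec<T>> = Vec::new();
    for (at, msg) in events {
        let index = (at.as_nanos() / window.as_nanos()) as usize;
        if windows.len() <= index {
            windows.resize_with(index + 1, Vec::new);
        }
        windows[index].push(msg.clone());
    }
    windows
}

/// Mean of a window's values; None for an empty window, which avoids
/// the NaN that a plain sum / len would produce for f32.
fn mean(values: &[f32]) -> Option<f32> {
    if values.is_empty() {
        None
    } else {
        Some(values.iter().sum::<f32>() / values.len() as f32)
    }
}
```

Rate calculation is then the length of each inner vector, and burst detection amounts to comparing those lengths against a threshold. If the real stream can deliver an empty window, guarding the average as `mean` does keeps downstream statistics well defined.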
Auto Trait Implementations§
impl<T> Freeze for StreamBuilder<T>
impl<T> !RefUnwindSafe for StreamBuilder<T>
impl<T> Send for StreamBuilder<T>
impl<T> Sync for StreamBuilder<T>
impl<T> Unpin for StreamBuilder<T>
impl<T> !UnwindSafe for StreamBuilder<T>
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.