pub struct Receiver<T> { /* private fields */ }Expand description
Receiver for subscribing to messages
This is a wrapper around tokio channels that provides a unified interface for both bounded and unbounded channels. Bounded channels provide backpressure to prevent memory exhaustion from fast publishers.
§Backpressure
When using bounded channels (via with_capacity()), the receiver applies
backpressure when the channel is full. This prevents memory exhaustion but
may cause publishers to block or drop messages depending on the strategy.
§Example
use mecha10::prelude::*;
// Unbounded receiver (default, no backpressure)
let mut images = ctx.subscribe(sensor::CAMERA_RGB).await?;
// Bounded receiver with capacity limit (applies backpressure)
let mut images = ctx.subscribe(sensor::CAMERA_RGB)
.await?
.with_capacity(100); // Hold max 100 messages
while let Some(image) = images.recv().await {
// Process image
}Implementations§
Source§impl<T> Receiver<T>
impl<T> Receiver<T>
Sourcepub async fn recv(&mut self) -> Option<T>
pub async fn recv(&mut self) -> Option<T>
Receive the next message
Returns None when all senders have been dropped.
Sourcepub fn try_recv(&mut self) -> Result<T, TryRecvError>
pub fn try_recv(&mut self) -> Result<T, TryRecvError>
Try to receive a message without blocking
Returns an error if the channel is empty or all senders have been dropped.
Sourcepub fn is_empty(&self) -> bool
pub fn is_empty(&self) -> bool
Check if the channel is currently empty
Note: This is a point-in-time check and may change immediately after.
Sourcepub fn len(&self) -> usize
pub fn len(&self) -> usize
Get the number of messages currently in the channel
This is useful for monitoring backpressure and queue depth.
§Example
let depth = rx.len();
if depth > 50 {
warn!("Channel backlog growing: {} messages", depth);
}Sourcepub fn with_capacity(self, capacity: usize) -> Selfwhere
T: 'static + Send,
pub fn with_capacity(self, capacity: usize) -> Selfwhere
T: 'static + Send,
Convert an unbounded receiver to a bounded one with the specified capacity
This creates a new bounded channel and spawns a task to forward messages. Messages are dropped if the new channel fills up, applying backpressure.
§Arguments
capacity- Maximum number of messages to buffer
§Backpressure Strategy
When the bounded channel is full, the oldest message is dropped to make room for new messages. This “drop oldest” strategy ensures recent data is always available.
§Example
let mut images = ctx.subscribe(sensor::CAMERA_RGB)
.await?
.with_capacity(50); // Limit to 50 images in queue
// If publisher sends faster than we process, old images are dropped
while let Some(image) = images.recv().await {
// Always get relatively recent images
}Sourcepub fn with_drop_oldest(self, capacity: usize) -> Selfwhere
T: 'static + Send,
pub fn with_drop_oldest(self, capacity: usize) -> Selfwhere
T: 'static + Send,
Convert an unbounded receiver to a bounded one that drops oldest messages
Similar to with_capacity(), but explicitly uses a “drop oldest” strategy
when the buffer is full. This is useful for real-time data where the most
recent value is more important than historical data.
§Arguments
capacity- Maximum number of messages to buffer
§Example
// For sensor data, we want the latest reading
let mut lidar = ctx.subscribe(sensor::LIDAR_SCAN)
.await?
.with_drop_oldest(10); // Keep only 10 most recent scans
while let Some(scan) = lidar.recv().await {
// Always process recent data
}Sourcepub fn capacity(&self) -> Option<usize>
pub fn capacity(&self) -> Option<usize>
Get a reference to the maximum capacity if this is a bounded channel
Returns None for unbounded channels or the capacity for bounded ones.
§Example
if let Some(cap) = rx.capacity() {
println!("Channel capacity: {}", cap);
println!("Current usage: {}/{}", rx.len(), cap);
println!("Utilization: {:.1}%", (rx.len() as f32 / cap as f32) * 100.0);
} else {
println!("Unbounded channel, {} messages queued", rx.len());
}Sourcepub fn utilization(&self) -> Option<f32>
pub fn utilization(&self) -> Option<f32>
Get the current utilization ratio (0.0 to 1.0) for bounded channels
Returns None for unbounded channels.
§Example
if let Some(utilization) = rx.utilization() {
if utilization > 0.8 {
warn!("Channel is {}% full, backpressure may occur", utilization * 100.0);
}
}Sourcepub fn map<U, F>(self, f: F) -> Receiver<U>
pub fn map<U, F>(self, f: F) -> Receiver<U>
Map (transform) messages as they are received
Creates a new receiver that applies a transformation function to each message. This is useful for data transformations like resizing images, converting units, or extracting fields from complex messages.
§Arguments
f- Transformation function to apply to each message
§Example
// Resize camera images on the fly
let mut resized_images = ctx.subscribe(sensor::CAMERA_RGB)
.await?
.map(|img| resize_image(img, 320, 240));
while let Some(small_image) = resized_images.recv().await {
// Process resized image
}
// Extract specific fields from complex messages
let mut speeds = ctx.subscribe(motor::STATUS)
.await?
.map(|status| status.current_speed);
while let Some(speed) = speeds.recv().await {
println!("Current speed: {}", speed);
}Sourcepub fn filter<F>(self, predicate: F) -> Receiver<T>
pub fn filter<F>(self, predicate: F) -> Receiver<T>
Filter messages based on a predicate
Creates a new receiver that only passes through messages that satisfy the predicate. This is useful for filtering sensor data, ignoring invalid readings, or implementing conditional processing.
§Arguments
predicate- Function that returnstruefor messages to keep,falseto drop
§Example
// Only process high-speed motor data
let mut high_speeds = ctx.subscribe(motor::STATUS)
.await?
.filter(|status| status.speed > 0.5);
while let Some(status) = high_speeds.recv().await {
println!("High speed detected: {}", status.speed);
}
// Filter out invalid lidar readings
let mut valid_scans = ctx.subscribe(sensor::LIDAR_SCAN)
.await?
.filter(|scan| scan.ranges.iter().all(|&r| r > 0.0 && r < 100.0));
while let Some(scan) = valid_scans.recv().await {
// All ranges are valid
}Sourcepub fn filter_map<U, F>(self, f: F) -> Receiver<U>
pub fn filter_map<U, F>(self, f: F) -> Receiver<U>
Filter and map messages in one operation
Combines filtering and mapping into a single efficient operation. The function
returns Some(U) for messages to keep (and transform), or None to drop.
This is more efficient than chaining .filter() and .map() separately.
§Arguments
f- Function that returnsSome(transformed)to keep,Noneto drop
§Example
// Extract speed only when robot is moving
let mut active_speeds = ctx.subscribe(motor::STATUS)
.await?
.filter_map(|status| {
if status.speed > 0.1 {
Some(status.speed)
} else {
None
}
});
while let Some(speed) = active_speeds.recv().await {
println!("Robot moving at: {}", speed);
}
// Parse valid sensor readings
let mut parsed_data = ctx.subscribe(sensor::RAW_DATA)
.await?
.filter_map(|raw| parse_sensor_data(&raw).ok());
while let Some(data) = parsed_data.recv().await {
// Process successfully parsed data
}Sourcepub fn debounce(self, duration: Duration) -> Receiver<T>where
T: Send + 'static,
pub fn debounce(self, duration: Duration) -> Receiver<T>where
T: Send + 'static,
Debounce (rate-limit) messages to prevent rapid bursts
Creates a new receiver that only emits a message if a certain duration has passed since the last emission. This is useful for reducing update frequency of high-rate sensors or preventing UI update storms.
Strategy: “Emit latest after quiet period”
- Messages are buffered during the debounce period
- Only the most recent message is emitted after the period expires
- Earlier messages within the period are dropped
§Arguments
duration- Minimum time between emissions
§Example
// Reduce camera updates to max 10 Hz (100ms debounce)
let mut debounced_images = ctx.subscribe(sensor::CAMERA_RGB)
.await?
.debounce(Duration::from_millis(100));
while let Some(image) = debounced_images.recv().await {
// Process at most 10 images/sec
}
// Prevent rapid motor status updates
let mut stable_status = ctx.subscribe(motor::STATUS)
.await?
.debounce(Duration::from_millis(50));
while let Some(status) = stable_status.recv().await {
// Only get updates every 50ms
}Sourcepub fn window(self, window_duration: Duration) -> Receiver<Vec<T>>where
T: Send + 'static,
pub fn window(self, window_duration: Duration) -> Receiver<Vec<T>>where
T: Send + 'static,
Window messages over a time period
Creates a new receiver that collects messages over a fixed time window and emits them as a batch (Vec) at the end of each window. This is useful for:
- Aggregating sensor readings (averaging, min/max)
- Batch processing for efficiency
- Time-series analysis
- Rate calculation (count messages per window)
§Behavior
- Fixed Windows: Each window has a fixed duration
- Non-overlapping: Windows don’t overlap (tumbling window)
- Batch Emission: All messages in window emitted as Vec at window end
- Empty Windows: Empty Vec emitted if no messages received
- Termination: Emits final partial window when source ends
§Arguments
window_duration- Duration of each time window
§Example
// Average IMU readings over 100ms windows
let mut windowed = ctx.subscribe(sensor::IMU)
.await?
.window(Duration::from_millis(100));
while let Some(batch) = windowed.recv().await {
if !batch.is_empty() {
let avg = batch.iter().map(|imu| imu.accel_x).sum::<f32>() / batch.len() as f32;
println!("Average accel: {} (n={})", avg, batch.len());
}
}
// Count messages per second
let mut counts = ctx.subscribe("/events")
.await?
.window(Duration::from_secs(1))
.map(|batch| batch.len());
while let Some(count) = counts.recv().await {
println!("Events/sec: {}", count);
}
// Batch process for efficiency
let mut batches = ctx.subscribe("/data")
.await?
.window(Duration::from_millis(50));
while let Some(batch) = batches.recv().await {
// Process entire batch at once (more efficient than one-by-one)
process_batch(&batch).await?;
}§Performance
- Memory: Buffers all messages within window (use reasonable window sizes)
- Latency: Adds latency equal to window duration
- CPU: Minimal overhead for window management
§Caveats
- Window size affects memory usage (smaller windows = less memory)
- All messages are delivered at window end (batch latency)
- For sliding windows, consider multiple overlapping
.window()subscriptions
Sourcepub fn zip<U>(self, other: Receiver<U>) -> Receiver<(T, U)>
pub fn zip<U>(self, other: Receiver<U>) -> Receiver<(T, U)>
Combine two streams by pairing their messages
Creates a new stream that emits pairs of messages from two input streams.
The combined stream emits (T, U) tuples when both streams have messages available.
§Behavior
- Paired Emission: Waits for both streams to have messages, then emits the pair
- Buffering: Buffers messages from faster stream until slower stream catches up
- Termination: Stops when either stream ends
- Capacity: Uses the smaller capacity of the two input streams
§Use Cases
- Combining sensor readings (e.g., camera + LiDAR)
- Synchronizing multiple data sources
- Correlating events from different streams
- Sensor fusion
§Example
use mecha10::prelude::*;
// Subscribe to two sensor streams
let camera = ctx.subscribe("/sensor/camera").await?;
let lidar = ctx.subscribe("/sensor/lidar").await?;
// Combine them
let mut fused = camera.zip(lidar);
// Process paired messages
while let Some((img, scan)) = fused.recv().await {
// Both sensors have data - process together
process_sensor_fusion(&img, &scan).await?;
}§Performance
- Memory: Buffers messages from faster stream (bounded by capacity)
- Latency: Adds latency equal to slower stream’s inter-message delay
- CPU: Minimal overhead for pairing logic
§Caveats
- If streams have very different rates, slower stream will be the bottleneck
- Buffer can fill up if rate difference is sustained
- Consider
.debounce()on faster stream if rate mismatch is severe
Trait Implementations§
Source§impl<T> From<UnboundedReceiver<T>> for Receiver<T>
impl<T> From<UnboundedReceiver<T>> for Receiver<T>
Source§fn from(inner: UnboundedReceiver<T>) -> Self
fn from(inner: UnboundedReceiver<T>) -> Self
Source§impl<T: Message + Send + 'static> ReceiverExt<T> for Receiver<T>
impl<T: Message + Send + 'static> ReceiverExt<T> for Receiver<T>
Source§fn stream(self) -> StreamBuilder<T>
fn stream(self) -> StreamBuilder<T>
Source§fn filter<F>(self, predicate: F) -> StreamBuilder<T>
fn filter<F>(self, predicate: F) -> StreamBuilder<T>
Source§fn map<F>(self, mapper: F) -> StreamBuilder<T>
fn map<F>(self, mapper: F) -> StreamBuilder<T>
Source§fn debounce(self, duration: Duration) -> StreamBuilder<T>
fn debounce(self, duration: Duration) -> StreamBuilder<T>
Source§fn throttle(self, duration: Duration) -> StreamBuilder<T>
fn throttle(self, duration: Duration) -> StreamBuilder<T>
Source§fn latest(self) -> StreamBuilder<T>
fn latest(self) -> StreamBuilder<T>
Source§fn batch(self, size: usize, timeout: Duration) -> BatchBuilder<T>where
T: Clone,
fn batch(self, size: usize, timeout: Duration) -> BatchBuilder<T>where
T: Clone,
Auto Trait Implementations§
impl<T> Freeze for Receiver<T>
impl<T> RefUnwindSafe for Receiver<T>
impl<T> Send for Receiver<T>where
T: Send,
impl<T> Sync for Receiver<T>where
T: Send,
impl<T> Unpin for Receiver<T>
impl<T> UnwindSafe for Receiver<T>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self> ⓘ
fn instrument(self, span: Span) -> Instrumented<Self> ⓘ
Source§fn in_current_span(self) -> Instrumented<Self> ⓘ
fn in_current_span(self) -> Instrumented<Self> ⓘ
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more