Struct Receiver 

pub struct Receiver<T> { /* private fields */ }

Receiver for subscribing to messages

This is a wrapper around tokio channels that provides a unified interface for both bounded and unbounded channels. Bounded channels provide backpressure to prevent memory exhaustion from fast publishers.

§Backpressure

When using bounded channels (via with_capacity()), the receiver buffers at most the configured number of messages. This bounds memory use, but once the buffer is full, publishers may block or messages may be dropped, depending on the strategy.
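To make the idea of a full bounded queue concrete, here is a minimal sketch that uses the standard library's `sync_channel` as a stand-in for the tokio channel this type wraps. The helper `fill_bounded` is hypothetical and not part of mecha10; it only shows that once `capacity` messages are queued, further non-blocking sends are rejected, which is the point where a publisher must either wait or drop.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Tries to push `count` messages into a bounded channel of size `capacity`
/// without ever receiving, and returns (accepted, rejected).
/// Hypothetical helper for illustration only.
fn fill_bounded(capacity: usize, count: usize) -> (usize, usize) {
    // Keep the receiver alive so failed sends report `Full`, not `Disconnected`.
    let (tx, _rx) = sync_channel::<usize>(capacity);
    let (mut accepted, mut rejected) = (0, 0);
    for i in 0..count {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            // The queue is full: a publisher must now wait or drop the message.
            Err(TrySendError::Full(_)) => rejected += 1,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    (accepted, rejected)
}
```

With a capacity of 3 and 5 attempted sends, the first 3 are accepted and the remaining 2 are rejected.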

§Example

use mecha10::prelude::*;

// Unbounded receiver (default, no backpressure)
let mut images = ctx.subscribe(sensor::CAMERA_RGB).await?;

// Bounded receiver with capacity limit (applies backpressure)
let mut images = ctx.subscribe(sensor::CAMERA_RGB)
    .await?
    .with_capacity(100);  // Hold max 100 messages

while let Some(image) = images.recv().await {
    // Process image
}

Implementations§

impl<T> Receiver<T>


pub async fn recv(&mut self) -> Option<T>

Receive the next message

Returns None when all senders have been dropped.


pub fn try_recv(&mut self) -> Result<T, TryRecvError>

Try to receive a message without blocking

Returns an error if the channel is empty or all senders have been dropped.
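The two error cases usually need different handling: an empty channel means "try again later", while a disconnected channel means no more messages will ever arrive. The sketch below shows that distinction with the standard library's `std::sync::mpsc::TryRecvError`, used here as a stand-in; which `TryRecvError` type this crate re-exports is not stated in this page, so treat the variant names as an assumption. `drain_now` is a hypothetical helper.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

/// Drains everything currently queued without blocking.
/// Returns the drained messages and whether all senders are gone.
/// Hypothetical helper built on std channels, for illustration only.
fn drain_now<T>(rx: &Receiver<T>) -> (Vec<T>, bool) {
    let mut drained = Vec::new();
    loop {
        match rx.try_recv() {
            Ok(msg) => drained.push(msg),
            // Nothing queued right now, but senders still exist.
            Err(TryRecvError::Empty) => return (drained, false),
            // Queue is empty and every sender has been dropped.
            Err(TryRecvError::Disconnected) => return (drained, true),
        }
    }
}
```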


pub fn is_empty(&self) -> bool

Check if the channel is currently empty

Note: This is a point-in-time check and may change immediately after.


pub fn len(&self) -> usize

Get the number of messages currently in the channel

This is useful for monitoring backpressure and queue depth.

§Example
let depth = rx.len();
if depth > 50 {
    warn!("Channel backlog growing: {} messages", depth);
}

pub fn with_capacity(self, capacity: usize) -> Self
where T: 'static + Send,

Convert an unbounded receiver to a bounded one with the specified capacity

This creates a new bounded channel and spawns a task that forwards messages into it. If the bounded channel fills up, messages are dropped (see Backpressure Strategy) so the queue never grows beyond capacity.

§Arguments
  • capacity - Maximum number of messages to buffer
§Backpressure Strategy

When the bounded channel is full, the oldest message is dropped to make room for new messages. This “drop oldest” strategy ensures recent data is always available.

§Example
let mut images = ctx.subscribe(sensor::CAMERA_RGB)
    .await?
    .with_capacity(50);  // Limit to 50 images in queue

// If publisher sends faster than we process, old images are dropped
while let Some(image) = images.recv().await {
    // Always get relatively recent images
}
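The "drop oldest" policy described above can be sketched with a plain `VecDeque`. This is an illustration of the policy only, not the crate's implementation; the type `DropOldest` and its fields are hypothetical.

```rust
use std::collections::VecDeque;

/// A fixed-capacity queue that evicts the oldest item when full.
/// Hypothetical type for illustration only.
struct DropOldest<T> {
    items: VecDeque<T>,
    capacity: usize,
    dropped: usize,
}

impl<T> DropOldest<T> {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        Self { items: VecDeque::with_capacity(capacity), capacity, dropped: 0 }
    }

    /// Pushes a message, evicting the oldest one if the queue is full.
    fn push(&mut self, item: T) {
        if self.items.len() == self.capacity {
            self.items.pop_front();
            self.dropped += 1;
        }
        self.items.push_back(item);
    }

    /// Pops the oldest message still in the queue.
    fn pop(&mut self) -> Option<T> {
        self.items.pop_front()
    }
}
```

Pushing the values 1 through 5 into a queue of capacity 3 evicts 1 and 2, so the consumer sees 3, 4, 5 in that order.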

pub fn with_drop_oldest(self, capacity: usize) -> Self
where T: 'static + Send,

Convert an unbounded receiver to a bounded one that drops oldest messages

Like with_capacity(), but makes the “drop oldest” strategy explicit: when the buffer is full, the oldest message is discarded to make room for the new one. This is useful for real-time data where the most recent value matters more than historical data.

§Arguments
  • capacity - Maximum number of messages to buffer
§Example
// For sensor data, we want the latest reading
let mut lidar = ctx.subscribe(sensor::LIDAR_SCAN)
    .await?
    .with_drop_oldest(10);  // Keep only 10 most recent scans

while let Some(scan) = lidar.recv().await {
    // Always process recent data
}

pub fn capacity(&self) -> Option<usize>

Get the maximum capacity if this is a bounded channel

Returns None for unbounded channels or the capacity for bounded ones.

§Example
if let Some(cap) = rx.capacity() {
    println!("Channel capacity: {}", cap);
    println!("Current usage: {}/{}", rx.len(), cap);
    println!("Utilization: {:.1}%", (rx.len() as f32 / cap as f32) * 100.0);
} else {
    println!("Unbounded channel, {} messages queued", rx.len());
}

pub fn utilization(&self) -> Option<f32>

Get the current utilization ratio (0.0 to 1.0) for bounded channels

Returns None for unbounded channels.

§Example
if let Some(utilization) = rx.utilization() {
    if utilization > 0.8 {
        warn!("Channel is {:.0}% full, backpressure may occur", utilization * 100.0);
    }
}
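The ratio itself is presumably just queue length divided by capacity. A minimal sketch of that arithmetic as a free function follows; the function takes the two values as arguments, which is an assumption made for illustration, since the real method reads them from the receiver. Treating a zero capacity as "no ratio" is also a choice made here, not documented behavior.

```rust
/// Ratio of queued messages to capacity, or `None` when there is no limit.
/// A zero capacity is also treated as `None` to avoid dividing by zero.
/// Hypothetical free function for illustration only.
fn utilization(len: usize, capacity: Option<usize>) -> Option<f32> {
    match capacity {
        Some(cap) if cap > 0 => Some(len as f32 / cap as f32),
        _ => None,
    }
}
```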

pub fn map<U, F>(self, f: F) -> Receiver<U>
where F: FnMut(T) -> U + Send + 'static, T: Send + 'static, U: Send + 'static,

Map (transform) messages as they are received

Creates a new receiver that applies a transformation function to each message. This is useful for data transformations like resizing images, converting units, or extracting fields from complex messages.

§Arguments
  • f - Transformation function to apply to each message
§Example
// Resize camera images on the fly
let mut resized_images = ctx.subscribe(sensor::CAMERA_RGB)
    .await?
    .map(|img| resize_image(img, 320, 240));

while let Some(small_image) = resized_images.recv().await {
    // Process resized image
}

// Extract specific fields from complex messages
let mut speeds = ctx.subscribe(motor::STATUS)
    .await?
    .map(|status| status.current_speed);

while let Some(speed) = speeds.recv().await {
    println!("Current speed: {}", speed);
}
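A combinator that returns a new receiver is commonly built by forwarding: a background task reads from the original receiver, applies the function, and sends the result into a fresh channel. The sketch below shows that pattern with standard-library channels and a thread. It is an assumption about the general technique, not a description of how this crate implements map; `map_receiver` is a hypothetical name.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

/// Returns a new receiver that yields `f(msg)` for every message from `rx`.
/// Hypothetical helper built on std channels, for illustration only.
fn map_receiver<T, U, F>(rx: Receiver<T>, mut f: F) -> Receiver<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: FnMut(T) -> U + Send + 'static,
{
    let (tx, mapped) = channel();
    thread::spawn(move || {
        // Iteration ends once every upstream sender has been dropped.
        for msg in rx {
            // Stop early if the downstream receiver is gone.
            if tx.send(f(msg)).is_err() {
                break;
            }
        }
    });
    mapped
}
```

The same bounds appear in the signature above (`FnMut`, `Send`, `'static`) because the closure and the messages move into the forwarding task.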

pub fn filter<F>(self, predicate: F) -> Receiver<T>
where F: FnMut(&T) -> bool + Send + 'static, T: Send + 'static,

Filter messages based on a predicate

Creates a new receiver that only passes through messages that satisfy the predicate. This is useful for filtering sensor data, ignoring invalid readings, or implementing conditional processing.

§Arguments
  • predicate - Function that returns true for messages to keep, false to drop
§Example
// Only process high-speed motor data
let mut high_speeds = ctx.subscribe(motor::STATUS)
    .await?
    .filter(|status| status.speed > 0.5);

while let Some(status) = high_speeds.recv().await {
    println!("High speed detected: {}", status.speed);
}

// Filter out invalid lidar readings
let mut valid_scans = ctx.subscribe(sensor::LIDAR_SCAN)
    .await?
    .filter(|scan| scan.ranges.iter().all(|&r| r > 0.0 && r < 100.0));

while let Some(scan) = valid_scans.recv().await {
    // All ranges are valid
}

pub fn filter_map<U, F>(self, f: F) -> Receiver<U>
where F: FnMut(T) -> Option<U> + Send + 'static, T: Send + 'static, U: Send + 'static,

Filter and map messages in one operation

Combines filtering and mapping into a single efficient operation. The function returns Some(U) for messages to keep (and transform), or None to drop. This is more efficient than chaining .filter() and .map() separately.

§Arguments
  • f - Function that returns Some(transformed) to keep, None to drop
§Example
// Extract speed only when robot is moving
let mut active_speeds = ctx.subscribe(motor::STATUS)
    .await?
    .filter_map(|status| {
        if status.speed > 0.1 {
            Some(status.speed)
        } else {
            None
        }
    });

while let Some(speed) = active_speeds.recv().await {
    println!("Robot moving at: {}", speed);
}

// Parse valid sensor readings
let mut parsed_data = ctx.subscribe(sensor::RAW_DATA)
    .await?
    .filter_map(|raw| parse_sensor_data(&raw).ok());

while let Some(data) = parsed_data.recv().await {
    // Process successfully parsed data
}
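The relationship between filter_map and a filter followed by a map is the same as for standard iterators. The sketch below uses plain slices rather than a Receiver to show that both forms give the same result, with filter_map doing it in a single step. The function names and the m/s to km/h conversion are illustrative assumptions.

```rust
/// Keeps readings above `threshold` and converts them from m/s to km/h,
/// using a single `filter_map` pass.
fn moving_kmh(speeds_ms: &[f32], threshold: f32) -> Vec<f32> {
    speeds_ms
        .iter()
        .filter_map(|&s| if s > threshold { Some(s * 3.6) } else { None })
        .collect()
}

/// The same result written as a filter step followed by a map step.
fn moving_kmh_two_steps(speeds_ms: &[f32], threshold: f32) -> Vec<f32> {
    speeds_ms
        .iter()
        .filter(|&&s| s > threshold)
        .map(|&s| s * 3.6)
        .collect()
}
```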

pub fn debounce(self, duration: Duration) -> Receiver<T>
where T: Send + 'static,

Debounce (rate-limit) messages to prevent rapid bursts

Creates a new receiver that emits at most one message per duration: messages that arrive within the period are coalesced, and only the most recent one is passed on. This is useful for reducing the update frequency of high-rate sensors or preventing UI update storms.

Strategy: “Emit latest after quiet period”

  • Messages are buffered during the debounce period
  • Only the most recent message is emitted after the period expires
  • Earlier messages within the period are dropped
§Arguments
  • duration - Minimum time between emissions
§Example
// Reduce camera updates to max 10 Hz (100ms debounce)
let mut debounced_images = ctx.subscribe(sensor::CAMERA_RGB)
    .await?
    .debounce(Duration::from_millis(100));

while let Some(image) = debounced_images.recv().await {
    // Process at most 10 images/sec
}

// Prevent rapid motor status updates
let mut stable_status = ctx.subscribe(motor::STATUS)
    .await?
    .debounce(Duration::from_millis(50));

while let Some(status) = stable_status.recv().await {
    // At most one update every 50 ms
}
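The strategy bullets above can be read as "keep only the newest message in each period". The offline sketch below applies that reading to a list of timestamped samples instead of a live channel, so it is deterministic. It assumes a period starts at the first message after the previous period expired, which this page does not specify; `debounce_latest` is a hypothetical function.

```rust
/// Offline sketch of "emit latest per period" over `(timestamp_ms, value)`
/// samples sorted by timestamp. A period starts at the first sample that
/// arrives after the previous period has expired (an assumption).
fn debounce_latest<T: Clone>(samples: &[(u64, T)], period_ms: u64) -> Vec<T> {
    let mut out = Vec::new();
    let mut period_start = 0u64;
    // Newest value seen in the current period, if any.
    let mut latest: Option<T> = None;
    for (ts, value) in samples {
        // The current period has expired: emit its newest value.
        if latest.is_some() && *ts >= period_start + period_ms {
            out.extend(latest.take());
        }
        // No open period: this sample starts a new one.
        if latest.is_none() {
            period_start = *ts;
        }
        // Earlier values in the same period are overwritten (dropped).
        latest = Some(value.clone());
    }
    // Source ended: emit whatever is still buffered.
    out.extend(latest);
    out
}
```

For samples at 0, 30, 90, 100 and 250 ms with a 100 ms period, the first three fall into one period and only the last of them survives, followed by the samples at 100 ms and 250 ms.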

pub fn window(self, window_duration: Duration) -> Receiver<Vec<T>>
where T: Send + 'static,

Window messages over a time period

Creates a new receiver that collects messages over a fixed time window and emits them as a batch (Vec) at the end of each window. This is useful for:

  • Aggregating sensor readings (averaging, min/max)
  • Batch processing for efficiency
  • Time-series analysis
  • Rate calculation (count messages per window)
§Behavior
  • Fixed Windows: Each window has a fixed duration
  • Non-overlapping: Windows don’t overlap (tumbling window)
  • Batch Emission: All messages in window emitted as Vec at window end
  • Empty Windows: Empty Vec emitted if no messages received
  • Termination: Emits final partial window when source ends
§Arguments
  • window_duration - Duration of each time window
§Example
// Average IMU readings over 100ms windows
let mut windowed = ctx.subscribe(sensor::IMU)
    .await?
    .window(Duration::from_millis(100));

while let Some(batch) = windowed.recv().await {
    if !batch.is_empty() {
        let avg = batch.iter().map(|imu| imu.accel_x).sum::<f32>() / batch.len() as f32;
        println!("Average accel: {} (n={})", avg, batch.len());
    }
}

// Count messages per second
let mut counts = ctx.subscribe("/events")
    .await?
    .window(Duration::from_secs(1))
    .map(|batch| batch.len());

while let Some(count) = counts.recv().await {
    println!("Events/sec: {}", count);
}

// Batch process for efficiency
let mut batches = ctx.subscribe("/data")
    .await?
    .window(Duration::from_millis(50));

while let Some(batch) = batches.recv().await {
    // Process entire batch at once (more efficient than one-by-one)
    process_batch(&batch).await?;
}
§Performance
  • Memory: Buffers all messages within window (use reasonable window sizes)
  • Latency: Adds latency equal to window duration
  • CPU: Minimal overhead for window management
§Caveats
  • Window size affects memory usage (smaller windows = less memory)
  • All messages are delivered at window end (batch latency)
  • For sliding windows, consider multiple overlapping .window() subscriptions
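The tumbling-window behavior listed above (fixed, non-overlapping windows, empty windows emitted, final partial window flushed) can be sketched offline over timestamped samples. The function below is a hypothetical illustration that assumes windows are aligned to time 0; the real method is driven by a timer on a live channel.

```rust
/// Splits `(timestamp_ms, value)` samples, sorted by timestamp, into
/// consecutive non-overlapping windows of `window_ms`, starting at time 0.
/// Windows without samples yield an empty `Vec`; the last one may be partial.
fn tumbling_windows<T: Clone>(samples: &[(u64, T)], window_ms: u64) -> Vec<Vec<T>> {
    assert!(window_ms > 0, "window must be non-zero");
    let mut windows = Vec::new();
    let mut current = Vec::new();
    let mut window_end = window_ms;
    for (ts, value) in samples {
        // Close every window that ended before this sample arrived.
        while *ts >= window_end {
            windows.push(std::mem::take(&mut current));
            window_end += window_ms;
        }
        current.push(value.clone());
    }
    // Source ended: emit the final partial window.
    windows.push(current);
    windows
}
```

With 100 ms windows, samples at 10, 40, 120 and 310 ms produce four batches: two samples, one sample, an empty batch for the silent 200 to 300 ms window, and a final partial batch.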

pub fn zip<U>(self, other: Receiver<U>) -> Receiver<(T, U)>
where T: Send + 'static, U: Send + 'static,

Combine two streams by pairing their messages

Creates a new stream that emits pairs of messages from two input streams. The combined stream emits (T, U) tuples when both streams have messages available.

§Behavior
  • Paired Emission: Waits for both streams to have messages, then emits the pair
  • Buffering: Buffers messages from faster stream until slower stream catches up
  • Termination: Stops when either stream ends
  • Capacity: Uses the smaller capacity of the two input streams
§Use Cases
  • Combining sensor readings (e.g., camera + LiDAR)
  • Synchronizing multiple data sources
  • Correlating events from different streams
  • Sensor fusion
§Example
use mecha10::prelude::*;

// Subscribe to two sensor streams
let camera = ctx.subscribe("/sensor/camera").await?;
let lidar = ctx.subscribe("/sensor/lidar").await?;

// Combine them
let mut fused = camera.zip(lidar);

// Process paired messages
while let Some((img, scan)) = fused.recv().await {
    // Both sensors have data - process together
    process_sensor_fusion(&img, &scan).await?;
}
§Performance
  • Memory: Buffers messages from faster stream (bounded by capacity)
  • Latency: Adds latency equal to slower stream’s inter-message delay
  • CPU: Minimal overhead for pairing logic
§Caveats
  • If streams have very different rates, slower stream will be the bottleneck
  • Buffer can fill up if rate difference is sustained
  • Consider .debounce() on faster stream if rate mismatch is severe
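The pairing and buffering rules above can be sketched with two queues: each arriving message is buffered on its side, and a pair is emitted as soon as both sides have something waiting. The sketch replays a recorded arrival order instead of reading live channels; `Arrival` and `zip_arrivals` are hypothetical names, and the capacity limit is left out for brevity.

```rust
use std::collections::VecDeque;

/// Which input a message arrived on. Hypothetical type for illustration.
enum Arrival<T, U> {
    Left(T),
    Right(U),
}

/// Pairs messages in arrival order: the n-th left message with the n-th right.
fn zip_arrivals<T, U>(arrivals: Vec<Arrival<T, U>>) -> Vec<(T, U)> {
    let mut left: VecDeque<T> = VecDeque::new();
    let mut right: VecDeque<U> = VecDeque::new();
    let mut pairs = Vec::new();
    for arrival in arrivals {
        // Buffer the message on the side it arrived on.
        match arrival {
            Arrival::Left(msg) => left.push_back(msg),
            Arrival::Right(msg) => right.push_back(msg),
        }
        // Emit a pair as soon as both sides have a message waiting.
        if !left.is_empty() && !right.is_empty() {
            if let (Some(l), Some(r)) = (left.pop_front(), right.pop_front()) {
                pairs.push((l, r));
            }
        }
    }
    // Unpaired leftovers are discarded when the input ends.
    pairs
}
```

If the left side delivers three messages and the right side only two, two pairs are emitted and the third left message is never delivered, which matches the "stops when either stream ends" rule.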

Trait Implementations§

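impl<T> From<Receiver<T>> for Receiver<T>

fn from(inner: Receiver<T>) -> Self

Converts to this type from the input type. The source Receiver is a different type whose module path is not shown here (a conversion from a type to itself is already covered by the blanket From<T> for T impl); given the UnboundedReceiver conversion that follows, it is presumably the bounded tokio channel receiver.

impl<T> From<UnboundedReceiver<T>> for Receiver<T>

fn from(inner: UnboundedReceiver<T>) -> Self

Converts to this type from the input type.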
impl<T: Message + Send + 'static> ReceiverExt<T> for Receiver<T>

fn stream(self) -> StreamBuilder<T>

Start building a stream pipeline

fn filter<F>(self, predicate: F) -> StreamBuilder<T>
where F: Fn(&T) -> bool + Send + Sync + 'static,

Filter messages based on a predicate

fn map<F>(self, mapper: F) -> StreamBuilder<T>
where F: Fn(T) -> T + Send + Sync + 'static,

Transform messages using a mapping function

fn debounce(self, duration: Duration) -> StreamBuilder<T>

Debounce messages - ignore duplicates within a time window

fn throttle(self, duration: Duration) -> StreamBuilder<T>

Throttle messages to a maximum rate

fn latest(self) -> StreamBuilder<T>

Keep only the latest message

fn batch(self, size: usize, timeout: Duration) -> BatchBuilder<T>
where T: Clone,

Batch messages together

fn zip<U>(self, other: Receiver<U>) -> Receiver<(T, U)>
where U: Message + Send + 'static,

Zip this stream with another stream

fn window(self, window_duration: Duration) -> Receiver<Vec<T>>
where T: Clone,

Window messages over a time period
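Several ReceiverExt methods (filter, map, debounce, zip, window) share names with the inherent Receiver methods listed under Implementations. In Rust, method-call syntax picks an inherent method over a trait method of the same name, so reaching the trait version requires fully qualified syntax. The sketch below demonstrates that language rule with hypothetical stand-in types `Rx` and `RxExt`; it does not use mecha10 itself.

```rust
/// Stand-in for `Receiver`. Hypothetical type for illustration only.
struct Rx;

impl Rx {
    /// Inherent method, analogous to `Receiver::filter`.
    fn filter(&self) -> &'static str {
        "inherent"
    }
}

/// Stand-in for `ReceiverExt`.
trait RxExt {
    fn filter(&self) -> &'static str;
}

impl RxExt for Rx {
    /// Trait method with the same name, analogous to `ReceiverExt::filter`.
    fn filter(&self) -> &'static str {
        "trait"
    }
}

/// Returns what method-call syntax and fully qualified syntax resolve to.
fn resolve() -> (&'static str, &'static str) {
    let rx = Rx;
    (rx.filter(), RxExt::filter(&rx))
}
```

Applied to this type, that suggests `rx.filter(...)` yields a `Receiver<T>`, while `ReceiverExt::filter(rx, ...)` yields a `StreamBuilder<T>`.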

Auto Trait Implementations§

impl<T> Freeze for Receiver<T>

impl<T> RefUnwindSafe for Receiver<T>

impl<T> Send for Receiver<T>
where T: Send,

impl<T> Sync for Receiver<T>
where T: Send,

impl<T> Unpin for Receiver<T>

impl<T> UnwindSafe for Receiver<T>

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoEither for T

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.

impl<T> PolicyExt for T
where T: ?Sized,

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow.

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.