mecha10_core/context/receiver.rs
1//! Receiver for subscribing to messages
2//!
3//! Provides a type-safe wrapper around tokio channels for receiving messages
4//! from subscribed topics with support for both bounded and unbounded channels.
5//!
6//! # Backpressure Strategies
7//!
8//! The receiver supports multiple backpressure strategies to prevent memory exhaustion
9//! from fast publishers. Choose based on your use case:
10//!
11//! ## 1. Unbounded (Default)
12//! No backpressure - messages queue indefinitely. Use for low-rate topics.
13//!
14//! ```rust,no_run
15//! # use mecha10::prelude::*;
16//! # async fn example(ctx: &Context) -> Result<()> {
17//! let mut rx = ctx.subscribe(sensor::CAMERA_RGB).await?;
18//! // Unlimited queue - risk of memory growth
19//! # Ok(())
20//! # }
21//! ```
22//!
23//! ## 2. Bounded with Blocking (`.with_capacity()`)
24//! Publisher waits when buffer full. Use when all messages are important.
25//!
26//! ```rust,no_run
27//! # use mecha10::prelude::*;
28//! # async fn example(ctx: &Context) -> Result<()> {
29//! let mut rx = ctx.subscribe(sensor::CAMERA_RGB)
30//! .await?
31//! .with_capacity(50); // Block publisher when 50 messages queued
32//! # Ok(())
33//! # }
34//! ```
35//!
36//! ## 3. Bounded with Drop Oldest (`.with_drop_oldest()`)
37//! Drops old messages when buffer full. Use for real-time sensor data.
38//!
39//! ```rust,no_run
40//! # use mecha10::prelude::*;
41//! # async fn example(ctx: &Context) -> Result<()> {
42//! let mut rx = ctx.subscribe(sensor::LIDAR_SCAN)
43//! .await?
44//! .with_drop_oldest(10); // Keep only 10 newest scans
45//! # Ok(())
46//! # }
47//! ```
48//!
49//! ## 4. Rate Limiting (`.debounce()`)
50//! Limit emission rate. Use to reduce update frequency.
51//!
52//! ```rust,no_run
53//! # use mecha10::prelude::*;
54//! # use std::time::Duration;
55//! # async fn example(ctx: &Context) -> Result<()> {
56//! let mut rx = ctx.subscribe(sensor::CAMERA_RGB)
57//! .await?
58//! .debounce(Duration::from_millis(100)); // Max 10 Hz
59//! # Ok(())
60//! # }
61//! ```
62//!
63//! ## Stream Combinators
64//!
65//! Transform and combine streams with powerful combinators:
66//!
67//! ```rust,no_run
68//! # use mecha10::prelude::*;
69//! # use std::time::Duration;
70//! # async fn example(ctx: &Context) -> Result<()> {
71//! // Transform messages
72//! let mut scaled = ctx.subscribe(sensor::CAMERA_RGB)
73//! .await?
74//! .map(|img| scale_down(&img));
75//!
76//! // Filter messages
77//! let mut valid = ctx.subscribe(sensor::LIDAR)
78//! .await?
79//! .filter(|scan| scan.range > 0.1);
80//!
81//! // Combine streams (sensor fusion)
82//! let camera = ctx.subscribe(sensor::CAMERA_RGB).await?;
83//! let lidar = ctx.subscribe(sensor::LIDAR).await?;
84//! let mut fused = camera.zip(lidar);
85//!
86//! // Window messages over time (for aggregation)
87//! let mut windowed = ctx.subscribe(sensor::IMU)
88//! .await?
89//! .window(Duration::from_millis(100)) // Collect 100ms batches
90//! .map(|batch| calculate_average(&batch)); // Aggregate
91//!
92//! // Chain combinators
93//! let mut processed = ctx.subscribe(sensor::CAMERA_RGB)
94//! .await?
95//! .with_drop_oldest(20) // Buffer 20 latest
96//! .debounce(Duration::from_millis(100)) // Max 10 Hz
97//! .filter(|img| img.width == 640) // Only 640px wide
98//! .map(|img| preprocess(&img)); // Transform
99//! # Ok(())
100//! # }
101//! ```
102//!
103//! # Monitoring Backpressure
104//!
105//! Use `.len()`, `.capacity()`, and `.utilization()` to monitor queue health:
106//!
107//! ```rust,no_run
108//! # use mecha10::prelude::*;
109//! # async fn example(rx: &mut Receiver<String>) {
110//! if let Some(util) = rx.utilization() {
111//! if util > 0.8 {
112//! println!("WARNING: Queue is {}% full!", util * 100.0);
113//! }
114//! }
115//! # }
116//! ```
117
118use std::time::Duration;
119use tokio::sync::mpsc;
120use tokio::time::{sleep, Instant};
121
122/// Internal channel type for the receiver
123pub(crate) enum ReceiverInner<T> {
124 /// Unbounded channel (no backpressure, risk of memory exhaustion)
125 Unbounded(mpsc::UnboundedReceiver<T>),
126 /// Bounded channel (with backpressure, prevents memory exhaustion)
127 Bounded(mpsc::Receiver<T>),
128}
129
130/// Receiver for subscribing to messages
131///
132/// This is a wrapper around tokio channels that provides a unified interface
133/// for both bounded and unbounded channels. Bounded channels provide backpressure
134/// to prevent memory exhaustion from fast publishers.
135///
136/// # Backpressure
137///
138/// When using bounded channels (via `with_capacity()`), the receiver applies
139/// backpressure when the channel is full. This prevents memory exhaustion but
140/// may cause publishers to block or drop messages depending on the strategy.
141///
142/// # Example
143///
144/// ```rust
145/// use mecha10::prelude::*;
146///
147/// # async fn example(ctx: &Context) -> Result<()> {
148/// // Unbounded receiver (default, no backpressure)
149/// let mut images = ctx.subscribe(sensor::CAMERA_RGB).await?;
150///
151/// // Bounded receiver with capacity limit (applies backpressure)
152/// let mut images = ctx.subscribe(sensor::CAMERA_RGB)
153/// .await?
154/// .with_capacity(100); // Hold max 100 messages
155///
156/// while let Some(image) = images.recv().await {
157/// // Process image
158/// }
159/// # Ok(())
160/// # }
161/// ```
162pub struct Receiver<T> {
163 pub(crate) inner: ReceiverInner<T>,
164}
165
166impl<T> Receiver<T> {
167 /// Receive the next message
168 ///
169 /// Returns `None` when all senders have been dropped.
170 pub async fn recv(&mut self) -> Option<T> {
171 match &mut self.inner {
172 ReceiverInner::Unbounded(rx) => rx.recv().await,
173 ReceiverInner::Bounded(rx) => rx.recv().await,
174 }
175 }
176
177 /// Try to receive a message without blocking
178 ///
179 /// Returns an error if the channel is empty or all senders have been dropped.
180 pub fn try_recv(&mut self) -> std::result::Result<T, mpsc::error::TryRecvError> {
181 match &mut self.inner {
182 ReceiverInner::Unbounded(rx) => rx.try_recv(),
183 ReceiverInner::Bounded(rx) => rx.try_recv(),
184 }
185 }
186
187 /// Check if the channel is currently empty
188 ///
189 /// Note: This is a point-in-time check and may change immediately after.
190 pub fn is_empty(&self) -> bool {
191 match &self.inner {
192 ReceiverInner::Unbounded(rx) => rx.is_empty(),
193 ReceiverInner::Bounded(rx) => rx.is_empty(),
194 }
195 }
196
197 /// Get the number of messages currently in the channel
198 ///
199 /// This is useful for monitoring backpressure and queue depth.
200 ///
201 /// # Example
202 ///
203 /// ```rust
204 /// # use mecha10::prelude::*;
205 /// # async fn example(mut rx: Receiver<String>) {
206 /// let depth = rx.len();
207 /// if depth > 50 {
208 /// warn!("Channel backlog growing: {} messages", depth);
209 /// }
210 /// # }
211 /// ```
212 pub fn len(&self) -> usize {
213 match &self.inner {
214 ReceiverInner::Unbounded(rx) => rx.len(),
215 ReceiverInner::Bounded(rx) => rx.len(),
216 }
217 }
218
219 /// Convert an unbounded receiver to a bounded one with the specified capacity
220 ///
221 /// This creates a new bounded channel and spawns a task to forward messages.
222 /// Messages are dropped if the new channel fills up, applying backpressure.
223 ///
224 /// # Arguments
225 ///
226 /// * `capacity` - Maximum number of messages to buffer
227 ///
228 /// # Backpressure Strategy
229 ///
230 /// When the bounded channel is full, the oldest message is dropped to make
231 /// room for new messages. This "drop oldest" strategy ensures recent data
232 /// is always available.
233 ///
234 /// # Example
235 ///
236 /// ```rust
237 /// # use mecha10::prelude::*;
238 /// # async fn example(ctx: &Context) -> Result<()> {
239 /// let mut images = ctx.subscribe(sensor::CAMERA_RGB)
240 /// .await?
241 /// .with_capacity(50); // Limit to 50 images in queue
242 ///
243 /// // If publisher sends faster than we process, old images are dropped
244 /// while let Some(image) = images.recv().await {
245 /// // Always get relatively recent images
246 /// }
247 /// # Ok(())
248 /// # }
249 /// ```
250 pub fn with_capacity(self, capacity: usize) -> Self
251 where
252 T: 'static + Send,
253 {
254 match self.inner {
255 ReceiverInner::Bounded(_) => {
256 // Already bounded, no conversion needed
257 self
258 }
259 ReceiverInner::Unbounded(mut unbounded_rx) => {
260 let (tx, rx) = mpsc::channel(capacity);
261
262 // Spawn task to forward messages with backpressure
263 tokio::spawn(async move {
264 while let Some(msg) = unbounded_rx.recv().await {
265 // Try to send with backpressure
266 // If channel is full, this will wait until space is available
267 if tx.send(msg).await.is_err() {
268 // Receiver dropped, stop forwarding
269 break;
270 }
271 }
272 });
273
274 Self {
275 inner: ReceiverInner::Bounded(rx),
276 }
277 }
278 }
279 }
280
281 /// Convert an unbounded receiver to a bounded one that drops oldest messages
282 ///
283 /// Similar to `with_capacity()`, but explicitly uses a "drop oldest" strategy
284 /// when the buffer is full. This is useful for real-time data where the most
285 /// recent value is more important than historical data.
286 ///
287 /// # Arguments
288 ///
289 /// * `capacity` - Maximum number of messages to buffer
290 ///
291 /// # Example
292 ///
293 /// ```rust
294 /// # use mecha10::prelude::*;
295 /// # async fn example(ctx: &Context) -> Result<()> {
296 /// // For sensor data, we want the latest reading
297 /// let mut lidar = ctx.subscribe(sensor::LIDAR_SCAN)
298 /// .await?
299 /// .with_drop_oldest(10); // Keep only 10 most recent scans
300 ///
301 /// while let Some(scan) = lidar.recv().await {
302 /// // Always process recent data
303 /// }
304 /// # Ok(())
305 /// # }
306 /// ```
307 pub fn with_drop_oldest(self, capacity: usize) -> Self
308 where
309 T: 'static + Send,
310 {
311 match self.inner {
312 ReceiverInner::Bounded(_) => self,
313 ReceiverInner::Unbounded(mut unbounded_rx) => {
314 let (tx, rx) = mpsc::channel(capacity);
315
316 tokio::spawn(async move {
317 while let Some(msg) = unbounded_rx.recv().await {
318 // Try to send without waiting
319 if tx.try_send(msg).is_err() {
320 // Channel full or closed
321 // For try_send on bounded channel, this means it's full
322 // Drop the message and continue (drop oldest strategy)
323 continue;
324 }
325 }
326 });
327
328 Self {
329 inner: ReceiverInner::Bounded(rx),
330 }
331 }
332 }
333 }
334
335 /// Get a reference to the maximum capacity if this is a bounded channel
336 ///
337 /// Returns `None` for unbounded channels or the capacity for bounded ones.
338 ///
339 /// # Example
340 ///
341 /// ```rust
342 /// # use mecha10::prelude::*;
343 /// # async fn example(rx: Receiver<String>) {
344 /// if let Some(cap) = rx.capacity() {
345 /// println!("Channel capacity: {}", cap);
346 /// println!("Current usage: {}/{}", rx.len(), cap);
347 /// println!("Utilization: {:.1}%", (rx.len() as f32 / cap as f32) * 100.0);
348 /// } else {
349 /// println!("Unbounded channel, {} messages queued", rx.len());
350 /// }
351 /// # }
352 /// ```
353 pub fn capacity(&self) -> Option<usize> {
354 match &self.inner {
355 ReceiverInner::Unbounded(_) => None,
356 ReceiverInner::Bounded(rx) => Some(rx.max_capacity()),
357 }
358 }
359
360 /// Get the current utilization ratio (0.0 to 1.0) for bounded channels
361 ///
362 /// Returns `None` for unbounded channels.
363 ///
364 /// # Example
365 ///
366 /// ```rust
367 /// # use mecha10::prelude::*;
368 /// # async fn example(rx: Receiver<String>) {
369 /// if let Some(utilization) = rx.utilization() {
370 /// if utilization > 0.8 {
371 /// warn!("Channel is {}% full, backpressure may occur", utilization * 100.0);
372 /// }
373 /// }
374 /// # }
375 /// ```
376 pub fn utilization(&self) -> Option<f32> {
377 self.capacity()
378 .map(|cap| if cap == 0 { 1.0 } else { self.len() as f32 / cap as f32 })
379 }
380
381 /// Map (transform) messages as they are received
382 ///
383 /// Creates a new receiver that applies a transformation function to each message.
384 /// This is useful for data transformations like resizing images, converting units,
385 /// or extracting fields from complex messages.
386 ///
387 /// # Arguments
388 ///
389 /// * `f` - Transformation function to apply to each message
390 ///
391 /// # Example
392 ///
393 /// ```rust
394 /// # use mecha10::prelude::*;
395 /// # async fn example(ctx: &Context) -> Result<()> {
396 /// // Resize camera images on the fly
397 /// let mut resized_images = ctx.subscribe(sensor::CAMERA_RGB)
398 /// .await?
399 /// .map(|img| resize_image(img, 320, 240));
400 ///
401 /// while let Some(small_image) = resized_images.recv().await {
402 /// // Process resized image
403 /// }
404 ///
405 /// // Extract specific fields from complex messages
406 /// let mut speeds = ctx.subscribe(motor::STATUS)
407 /// .await?
408 /// .map(|status| status.current_speed);
409 ///
410 /// while let Some(speed) = speeds.recv().await {
411 /// println!("Current speed: {}", speed);
412 /// }
413 /// # Ok(())
414 /// # }
415 /// # fn resize_image(img: String, w: u32, h: u32) -> String { img }
416 /// ```
417 pub fn map<U, F>(self, f: F) -> Receiver<U>
418 where
419 F: FnMut(T) -> U + Send + 'static,
420 T: Send + 'static,
421 U: Send + 'static,
422 {
423 let (tx, rx) = match &self.inner {
424 ReceiverInner::Unbounded(_) => {
425 let (tx, rx) = mpsc::unbounded_channel();
426 (MappedSender::Unbounded(tx), ReceiverInner::Unbounded(rx))
427 }
428 ReceiverInner::Bounded(bounded_rx) => {
429 let capacity = bounded_rx.max_capacity();
430 let (tx, rx) = mpsc::channel(capacity);
431 (MappedSender::Bounded(tx), ReceiverInner::Bounded(rx))
432 }
433 };
434
435 // Spawn task to apply transformation
436 let mut receiver = self;
437 let mut transform = f;
438 tokio::spawn(async move {
439 while let Some(msg) = receiver.recv().await {
440 let transformed = transform(msg);
441
442 // Send transformed message
443 let send_result = match tx {
444 MappedSender::Unbounded(ref tx) => tx.send(transformed).map_err(|_| ()),
445 MappedSender::Bounded(ref tx) => tx.send(transformed).await.map_err(|_| ()),
446 };
447
448 if send_result.is_err() {
449 // Receiver dropped, stop processing
450 break;
451 }
452 }
453 });
454
455 Receiver { inner: rx }
456 }
457
458 /// Filter messages based on a predicate
459 ///
460 /// Creates a new receiver that only passes through messages that satisfy the predicate.
461 /// This is useful for filtering sensor data, ignoring invalid readings, or implementing
462 /// conditional processing.
463 ///
464 /// # Arguments
465 ///
466 /// * `predicate` - Function that returns `true` for messages to keep, `false` to drop
467 ///
468 /// # Example
469 ///
470 /// ```rust
471 /// # use mecha10::prelude::*;
472 /// # async fn example(ctx: &Context) -> Result<()> {
473 /// // Only process high-speed motor data
474 /// let mut high_speeds = ctx.subscribe(motor::STATUS)
475 /// .await?
476 /// .filter(|status| status.speed > 0.5);
477 ///
478 /// while let Some(status) = high_speeds.recv().await {
479 /// println!("High speed detected: {}", status.speed);
480 /// }
481 ///
482 /// // Filter out invalid lidar readings
483 /// let mut valid_scans = ctx.subscribe(sensor::LIDAR_SCAN)
484 /// .await?
485 /// .filter(|scan| scan.ranges.iter().all(|&r| r > 0.0 && r < 100.0));
486 ///
487 /// while let Some(scan) = valid_scans.recv().await {
488 /// // All ranges are valid
489 /// }
490 /// # Ok(())
491 /// # }
492 /// ```
493 pub fn filter<F>(self, mut predicate: F) -> Receiver<T>
494 where
495 F: FnMut(&T) -> bool + Send + 'static,
496 T: Send + 'static,
497 {
498 let (tx, rx) = match &self.inner {
499 ReceiverInner::Unbounded(_) => {
500 let (tx, rx) = mpsc::unbounded_channel();
501 (FilteredSender::Unbounded(tx), ReceiverInner::Unbounded(rx))
502 }
503 ReceiverInner::Bounded(bounded_rx) => {
504 let capacity = bounded_rx.max_capacity();
505 let (tx, rx) = mpsc::channel(capacity);
506 (FilteredSender::Bounded(tx), ReceiverInner::Bounded(rx))
507 }
508 };
509
510 // Spawn task to filter messages
511 let mut receiver = self;
512 tokio::spawn(async move {
513 while let Some(msg) = receiver.recv().await {
514 // Apply predicate
515 if !predicate(&msg) {
516 continue; // Skip messages that don't match
517 }
518
519 // Send filtered message
520 let send_result = match &tx {
521 FilteredSender::Unbounded(tx) => tx.send(msg).map_err(|_| ()),
522 FilteredSender::Bounded(tx) => tx.send(msg).await.map_err(|_| ()),
523 };
524
525 if send_result.is_err() {
526 // Receiver dropped, stop processing
527 break;
528 }
529 }
530 });
531
532 Receiver { inner: rx }
533 }
534
535 /// Filter and map messages in one operation
536 ///
537 /// Combines filtering and mapping into a single efficient operation. The function
538 /// returns `Some(U)` for messages to keep (and transform), or `None` to drop.
539 /// This is more efficient than chaining `.filter()` and `.map()` separately.
540 ///
541 /// # Arguments
542 ///
543 /// * `f` - Function that returns `Some(transformed)` to keep, `None` to drop
544 ///
545 /// # Example
546 ///
547 /// ```rust
548 /// # use mecha10::prelude::*;
549 /// # async fn example(ctx: &Context) -> Result<()> {
550 /// // Extract speed only when robot is moving
551 /// let mut active_speeds = ctx.subscribe(motor::STATUS)
552 /// .await?
553 /// .filter_map(|status| {
554 /// if status.speed > 0.1 {
555 /// Some(status.speed)
556 /// } else {
557 /// None
558 /// }
559 /// });
560 ///
561 /// while let Some(speed) = active_speeds.recv().await {
562 /// println!("Robot moving at: {}", speed);
563 /// }
564 ///
565 /// // Parse valid sensor readings
566 /// let mut parsed_data = ctx.subscribe(sensor::RAW_DATA)
567 /// .await?
568 /// .filter_map(|raw| parse_sensor_data(&raw).ok());
569 ///
570 /// while let Some(data) = parsed_data.recv().await {
571 /// // Process successfully parsed data
572 /// }
573 /// # Ok(())
574 /// # }
575 /// # fn parse_sensor_data(raw: &String) -> Result<String> { Ok(raw.clone()) }
576 /// ```
577 pub fn filter_map<U, F>(self, mut f: F) -> Receiver<U>
578 where
579 F: FnMut(T) -> Option<U> + Send + 'static,
580 T: Send + 'static,
581 U: Send + 'static,
582 {
583 let (tx, rx) = match &self.inner {
584 ReceiverInner::Unbounded(_) => {
585 let (tx, rx) = mpsc::unbounded_channel();
586 (FilterMapSender::Unbounded(tx), ReceiverInner::Unbounded(rx))
587 }
588 ReceiverInner::Bounded(bounded_rx) => {
589 let capacity = bounded_rx.max_capacity();
590 let (tx, rx) = mpsc::channel(capacity);
591 (FilterMapSender::Bounded(tx), ReceiverInner::Bounded(rx))
592 }
593 };
594
595 // Spawn task to filter and map
596 let mut receiver = self;
597 tokio::spawn(async move {
598 while let Some(msg) = receiver.recv().await {
599 // Apply filter_map function
600 if let Some(transformed) = f(msg) {
601 // Send transformed message
602 let send_result = match &tx {
603 FilterMapSender::Unbounded(tx) => tx.send(transformed).map_err(|_| ()),
604 FilterMapSender::Bounded(tx) => tx.send(transformed).await.map_err(|_| ()),
605 };
606
607 if send_result.is_err() {
608 // Receiver dropped, stop processing
609 break;
610 }
611 }
612 // If None, message is dropped (filtered out)
613 }
614 });
615
616 Receiver { inner: rx }
617 }
618
619 /// Debounce (rate-limit) messages to prevent rapid bursts
620 ///
621 /// Creates a new receiver that only emits a message if a certain duration has passed
622 /// since the last emission. This is useful for reducing update frequency of high-rate
623 /// sensors or preventing UI update storms.
624 ///
625 /// **Strategy:** "Emit latest after quiet period"
626 /// - Messages are buffered during the debounce period
627 /// - Only the most recent message is emitted after the period expires
628 /// - Earlier messages within the period are dropped
629 ///
630 /// # Arguments
631 ///
632 /// * `duration` - Minimum time between emissions
633 ///
634 /// # Example
635 ///
636 /// ```rust
637 /// # use mecha10::prelude::*;
638 /// # use std::time::Duration;
639 /// # async fn example(ctx: &Context) -> Result<()> {
640 /// // Reduce camera updates to max 10 Hz (100ms debounce)
641 /// let mut debounced_images = ctx.subscribe(sensor::CAMERA_RGB)
642 /// .await?
643 /// .debounce(Duration::from_millis(100));
644 ///
645 /// while let Some(image) = debounced_images.recv().await {
646 /// // Process at most 10 images/sec
647 /// }
648 ///
649 /// // Prevent rapid motor status updates
650 /// let mut stable_status = ctx.subscribe(motor::STATUS)
651 /// .await?
652 /// .debounce(Duration::from_millis(50));
653 ///
654 /// while let Some(status) = stable_status.recv().await {
655 /// // Only get updates every 50ms
656 /// }
657 /// # Ok(())
658 /// # }
659 /// ```
660 pub fn debounce(self, duration: Duration) -> Receiver<T>
661 where
662 T: Send + 'static,
663 {
664 let (tx, rx) = match &self.inner {
665 ReceiverInner::Unbounded(_) => {
666 let (tx, rx) = mpsc::unbounded_channel();
667 (DebouncedSender::Unbounded(tx), ReceiverInner::Unbounded(rx))
668 }
669 ReceiverInner::Bounded(bounded_rx) => {
670 let capacity = bounded_rx.max_capacity();
671 let (tx, rx) = mpsc::channel(capacity);
672 (DebouncedSender::Bounded(tx), ReceiverInner::Bounded(rx))
673 }
674 };
675
676 // Spawn task to debounce messages
677 let mut receiver = self;
678 tokio::spawn(async move {
679 let mut last_emit = Instant::now();
680 let mut pending_msg: Option<T> = None;
681
682 loop {
683 // Calculate time until next allowed emission
684 let elapsed = last_emit.elapsed();
685 let remaining = if elapsed >= duration {
686 Duration::from_millis(0)
687 } else {
688 duration - elapsed
689 };
690
691 // Wait for either a message or debounce timeout
692 tokio::select! {
693 // New message received
694 msg = receiver.recv() => {
695 match msg {
696 Some(new_msg) => {
697 // Store as pending (replaces any previous pending)
698 pending_msg = Some(new_msg);
699
700 // If debounce period passed, emit immediately
701 if remaining.is_zero() {
702 if let Some(msg_to_send) = pending_msg.take() {
703 let send_result = match &tx {
704 DebouncedSender::Unbounded(tx) => tx.send(msg_to_send).map_err(|_| ()),
705 DebouncedSender::Bounded(tx) => tx.send(msg_to_send).await.map_err(|_| ()),
706 };
707
708 if send_result.is_err() {
709 break; // Receiver dropped
710 }
711
712 last_emit = Instant::now();
713 }
714 }
715 // Otherwise, message stays pending until timeout
716 }
717 None => {
718 // Source closed, send any pending message and exit
719 if let Some(msg_to_send) = pending_msg.take() {
720 let _ = match &tx {
721 DebouncedSender::Unbounded(tx) => tx.send(msg_to_send).map_err(|_| ()),
722 DebouncedSender::Bounded(tx) => tx.send(msg_to_send).await.map_err(|_| ()),
723 };
724 }
725 break;
726 }
727 }
728 }
729
730 // Debounce timeout - emit pending message if any
731 _ = sleep(remaining), if pending_msg.is_some() && !remaining.is_zero() => {
732 if let Some(msg_to_send) = pending_msg.take() {
733 let send_result = match &tx {
734 DebouncedSender::Unbounded(tx) => tx.send(msg_to_send).map_err(|_| ()),
735 DebouncedSender::Bounded(tx) => tx.send(msg_to_send).await.map_err(|_| ()),
736 };
737
738 if send_result.is_err() {
739 break; // Receiver dropped
740 }
741
742 last_emit = Instant::now();
743 }
744 }
745 }
746 }
747 });
748
749 Receiver { inner: rx }
750 }
751
752 /// Window messages over a time period
753 ///
754 /// Creates a new receiver that collects messages over a fixed time window and emits
755 /// them as a batch (Vec) at the end of each window. This is useful for:
756 /// - Aggregating sensor readings (averaging, min/max)
757 /// - Batch processing for efficiency
758 /// - Time-series analysis
759 /// - Rate calculation (count messages per window)
760 ///
761 /// # Behavior
762 ///
763 /// - **Fixed Windows**: Each window has a fixed duration
764 /// - **Non-overlapping**: Windows don't overlap (tumbling window)
765 /// - **Batch Emission**: All messages in window emitted as Vec at window end
766 /// - **Empty Windows**: Empty Vec emitted if no messages received
767 /// - **Termination**: Emits final partial window when source ends
768 ///
769 /// # Arguments
770 ///
771 /// * `window_duration` - Duration of each time window
772 ///
773 /// # Example
774 ///
775 /// ```rust
776 /// # use mecha10::prelude::*;
777 /// # use std::time::Duration;
778 /// # async fn example(ctx: &Context) -> Result<()> {
779 /// // Average IMU readings over 100ms windows
780 /// let mut windowed = ctx.subscribe(sensor::IMU)
781 /// .await?
782 /// .window(Duration::from_millis(100));
783 ///
784 /// while let Some(batch) = windowed.recv().await {
785 /// if !batch.is_empty() {
786 /// let avg = batch.iter().map(|imu| imu.accel_x).sum::<f32>() / batch.len() as f32;
787 /// println!("Average accel: {} (n={})", avg, batch.len());
788 /// }
789 /// }
790 ///
791 /// // Count messages per second
792 /// let mut counts = ctx.subscribe("/events")
793 /// .await?
794 /// .window(Duration::from_secs(1))
795 /// .map(|batch| batch.len());
796 ///
797 /// while let Some(count) = counts.recv().await {
798 /// println!("Events/sec: {}", count);
799 /// }
800 ///
801 /// // Batch process for efficiency
802 /// let mut batches = ctx.subscribe("/data")
803 /// .await?
804 /// .window(Duration::from_millis(50));
805 ///
806 /// while let Some(batch) = batches.recv().await {
807 /// // Process entire batch at once (more efficient than one-by-one)
808 /// process_batch(&batch).await?;
809 /// }
810 /// # Ok(())
811 /// # }
812 /// ```
813 ///
814 /// # Performance
815 ///
816 /// - **Memory**: Buffers all messages within window (use reasonable window sizes)
817 /// - **Latency**: Adds latency equal to window duration
818 /// - **CPU**: Minimal overhead for window management
819 ///
820 /// # Caveats
821 ///
822 /// - Window size affects memory usage (smaller windows = less memory)
823 /// - All messages are delivered at window end (batch latency)
824 /// - For sliding windows, consider multiple overlapping `.window()` subscriptions
825 pub fn window(self, window_duration: Duration) -> Receiver<Vec<T>>
826 where
827 T: Send + 'static,
828 {
829 let (tx, rx) = match &self.inner {
830 ReceiverInner::Unbounded(_) => {
831 let (tx, rx) = mpsc::unbounded_channel();
832 (WindowedSender::Unbounded(tx), ReceiverInner::Unbounded(rx))
833 }
834 ReceiverInner::Bounded(bounded_rx) => {
835 let capacity = bounded_rx.max_capacity();
836 let (tx, rx) = mpsc::channel(capacity);
837 (WindowedSender::Bounded(tx), ReceiverInner::Bounded(rx))
838 }
839 };
840
841 // Spawn task to window messages
842 let mut receiver = self;
843 tokio::spawn(async move {
844 let mut buffer: Vec<T> = Vec::new();
845 let mut interval = tokio::time::interval(window_duration);
846 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
847
848 loop {
849 tokio::select! {
850 // New message received
851 msg = receiver.recv() => {
852 match msg {
853 Some(new_msg) => {
854 buffer.push(new_msg);
855 }
856 None => {
857 // Source ended - emit final window and exit
858 if !buffer.is_empty() {
859 let _ = match &tx {
860 WindowedSender::Unbounded(tx) => tx.send(buffer).map_err(|_| ()),
861 WindowedSender::Bounded(tx) => tx.send(buffer).await.map_err(|_| ()),
862 };
863 }
864 break;
865 }
866 }
867 }
868
869 // Window duration elapsed
870 _ = interval.tick() => {
871 // Emit current window (even if empty)
872 let batch = std::mem::take(&mut buffer);
873 let send_result = match &tx {
874 WindowedSender::Unbounded(tx) => tx.send(batch).map_err(|_| ()),
875 WindowedSender::Bounded(tx) => tx.send(batch).await.map_err(|_| ()),
876 };
877
878 if send_result.is_err() {
879 // Receiver dropped, stop windowing
880 break;
881 }
882 }
883 }
884 }
885 });
886
887 Receiver { inner: rx }
888 }
889
890 /// Combine two streams by pairing their messages
891 ///
892 /// Creates a new stream that emits pairs of messages from two input streams.
893 /// The combined stream emits `(T, U)` tuples when both streams have messages available.
894 ///
895 /// # Behavior
896 ///
897 /// - **Paired Emission**: Waits for both streams to have messages, then emits the pair
898 /// - **Buffering**: Buffers messages from faster stream until slower stream catches up
899 /// - **Termination**: Stops when either stream ends
900 /// - **Capacity**: Uses the smaller capacity of the two input streams
901 ///
902 /// # Use Cases
903 ///
904 /// - Combining sensor readings (e.g., camera + LiDAR)
905 /// - Synchronizing multiple data sources
906 /// - Correlating events from different streams
907 /// - Sensor fusion
908 ///
909 /// # Example
910 ///
911 /// ```rust
912 /// use mecha10::prelude::*;
913 ///
914 /// # async fn example(ctx: &Context) -> Result<()> {
915 /// // Subscribe to two sensor streams
916 /// let camera = ctx.subscribe("/sensor/camera").await?;
917 /// let lidar = ctx.subscribe("/sensor/lidar").await?;
918 ///
919 /// // Combine them
920 /// let mut fused = camera.zip(lidar);
921 ///
922 /// // Process paired messages
923 /// while let Some((img, scan)) = fused.recv().await {
924 /// // Both sensors have data - process together
925 /// process_sensor_fusion(&img, &scan).await?;
926 /// }
927 /// # Ok(())
928 /// # }
929 /// ```
930 ///
931 /// # Performance
932 ///
933 /// - **Memory**: Buffers messages from faster stream (bounded by capacity)
934 /// - **Latency**: Adds latency equal to slower stream's inter-message delay
935 /// - **CPU**: Minimal overhead for pairing logic
936 ///
937 /// # Caveats
938 ///
939 /// - If streams have very different rates, slower stream will be the bottleneck
940 /// - Buffer can fill up if rate difference is sustained
941 /// - Consider `.debounce()` on faster stream if rate mismatch is severe
942 pub fn zip<U>(self, other: Receiver<U>) -> Receiver<(T, U)>
943 where
944 T: Send + 'static,
945 U: Send + 'static,
946 {
947 // Determine capacity based on both receivers
948 let capacity = match (&self.inner, &other.inner) {
949 (ReceiverInner::Unbounded(_), ReceiverInner::Unbounded(_)) => None,
950 (ReceiverInner::Bounded(a), ReceiverInner::Unbounded(_)) => Some(a.max_capacity()),
951 (ReceiverInner::Unbounded(_), ReceiverInner::Bounded(b)) => Some(b.max_capacity()),
952 (ReceiverInner::Bounded(a), ReceiverInner::Bounded(b)) => Some(a.max_capacity().min(b.max_capacity())),
953 };
954
955 // Create output channel
956 let (tx, rx) = if let Some(cap) = capacity {
957 let (tx, rx) = mpsc::channel(cap);
958 (ZippedSender::Bounded(tx), ReceiverInner::Bounded(rx))
959 } else {
960 let (tx, rx) = mpsc::unbounded_channel();
961 (ZippedSender::Unbounded(tx), ReceiverInner::Unbounded(rx))
962 };
963
964 // Spawn task to zip messages
965 let mut receiver_a = self;
966 let mut receiver_b = other;
967
968 tokio::spawn(async move {
969 loop {
970 // Wait for both streams to have messages
971 let msg_a = receiver_a.recv().await;
972 let msg_b = receiver_b.recv().await;
973
974 match (msg_a, msg_b) {
975 (Some(a), Some(b)) => {
976 // Both messages available, send pair
977 let send_result = match &tx {
978 ZippedSender::Unbounded(tx) => tx.send((a, b)).map_err(|_| ()),
979 ZippedSender::Bounded(tx) => tx.send((a, b)).await.map_err(|_| ()),
980 };
981
982 if send_result.is_err() {
983 // Receiver dropped, stop zipping
984 break;
985 }
986 }
987 _ => {
988 // One or both streams ended
989 break;
990 }
991 }
992 }
993 });
994
995 Receiver { inner: rx }
996 }
997}
998
999// Helper enums for sender types in combinators
1000enum MappedSender<T> {
1001 Unbounded(mpsc::UnboundedSender<T>),
1002 Bounded(mpsc::Sender<T>),
1003}
1004
1005enum FilteredSender<T> {
1006 Unbounded(mpsc::UnboundedSender<T>),
1007 Bounded(mpsc::Sender<T>),
1008}
1009
1010enum FilterMapSender<T> {
1011 Unbounded(mpsc::UnboundedSender<T>),
1012 Bounded(mpsc::Sender<T>),
1013}
1014
1015enum DebouncedSender<T> {
1016 Unbounded(mpsc::UnboundedSender<T>),
1017 Bounded(mpsc::Sender<T>),
1018}
1019
1020enum ZippedSender<T> {
1021 Unbounded(mpsc::UnboundedSender<T>),
1022 Bounded(mpsc::Sender<T>),
1023}
1024
1025enum WindowedSender<T> {
1026 Unbounded(mpsc::UnboundedSender<T>),
1027 Bounded(mpsc::Sender<T>),
1028}
1029
1030impl<T> From<mpsc::UnboundedReceiver<T>> for Receiver<T> {
1031 fn from(inner: mpsc::UnboundedReceiver<T>) -> Self {
1032 Self {
1033 inner: ReceiverInner::Unbounded(inner),
1034 }
1035 }
1036}
1037
1038impl<T> From<mpsc::Receiver<T>> for Receiver<T> {
1039 fn from(inner: mpsc::Receiver<T>) -> Self {
1040 Self {
1041 inner: ReceiverInner::Bounded(inner),
1042 }
1043 }
1044}