mecha10_core/context/
receiver.rs

1//! Receiver for subscribing to messages
2//!
3//! Provides a type-safe wrapper around tokio channels for receiving messages
4//! from subscribed topics with support for both bounded and unbounded channels.
5//!
6//! # Backpressure Strategies
7//!
8//! The receiver supports multiple backpressure strategies to prevent memory exhaustion
9//! from fast publishers. Choose based on your use case:
10//!
11//! ## 1. Unbounded (Default)
12//! No backpressure - messages queue indefinitely. Use for low-rate topics.
13//!
14//! ```rust,no_run
15//! # use mecha10::prelude::*;
16//! # async fn example(ctx: &Context) -> Result<()> {
17//! let mut rx = ctx.subscribe(sensor::CAMERA_RGB).await?;
18//! // Unlimited queue - risk of memory growth
19//! # Ok(())
20//! # }
21//! ```
22//!
23//! ## 2. Bounded with Blocking (`.with_capacity()`)
24//! Publisher waits when buffer full. Use when all messages are important.
25//!
26//! ```rust,no_run
27//! # use mecha10::prelude::*;
28//! # async fn example(ctx: &Context) -> Result<()> {
29//! let mut rx = ctx.subscribe(sensor::CAMERA_RGB)
30//!     .await?
31//!     .with_capacity(50); // Block publisher when 50 messages queued
32//! # Ok(())
33//! # }
34//! ```
35//!
36//! ## 3. Bounded with Drop Oldest (`.with_drop_oldest()`)
37//! Drops old messages when buffer full. Use for real-time sensor data.
38//!
39//! ```rust,no_run
40//! # use mecha10::prelude::*;
41//! # async fn example(ctx: &Context) -> Result<()> {
42//! let mut rx = ctx.subscribe(sensor::LIDAR_SCAN)
43//!     .await?
44//!     .with_drop_oldest(10); // Keep only 10 newest scans
45//! # Ok(())
46//! # }
47//! ```
48//!
49//! ## 4. Rate Limiting (`.debounce()`)
50//! Limit emission rate. Use to reduce update frequency.
51//!
52//! ```rust,no_run
53//! # use mecha10::prelude::*;
54//! # use std::time::Duration;
55//! # async fn example(ctx: &Context) -> Result<()> {
56//! let mut rx = ctx.subscribe(sensor::CAMERA_RGB)
57//!     .await?
58//!     .debounce(Duration::from_millis(100)); // Max 10 Hz
59//! # Ok(())
60//! # }
61//! ```
62//!
63//! ## Stream Combinators
64//!
65//! Transform and combine streams with powerful combinators:
66//!
67//! ```rust,no_run
68//! # use mecha10::prelude::*;
69//! # use std::time::Duration;
70//! # async fn example(ctx: &Context) -> Result<()> {
71//! // Transform messages
72//! let mut scaled = ctx.subscribe(sensor::CAMERA_RGB)
73//!     .await?
74//!     .map(|img| scale_down(&img));
75//!
76//! // Filter messages
77//! let mut valid = ctx.subscribe(sensor::LIDAR)
78//!     .await?
79//!     .filter(|scan| scan.range > 0.1);
80//!
81//! // Combine streams (sensor fusion)
82//! let camera = ctx.subscribe(sensor::CAMERA_RGB).await?;
83//! let lidar = ctx.subscribe(sensor::LIDAR).await?;
84//! let mut fused = camera.zip(lidar);
85//!
86//! // Window messages over time (for aggregation)
87//! let mut windowed = ctx.subscribe(sensor::IMU)
88//!     .await?
89//!     .window(Duration::from_millis(100))      // Collect 100ms batches
90//!     .map(|batch| calculate_average(&batch)); // Aggregate
91//!
92//! // Chain combinators
93//! let mut processed = ctx.subscribe(sensor::CAMERA_RGB)
94//!     .await?
95//!     .with_drop_oldest(20)                    // Buffer 20 latest
96//!     .debounce(Duration::from_millis(100))    // Max 10 Hz
97//!     .filter(|img| img.width == 640)          // Only 640px wide
98//!     .map(|img| preprocess(&img));            // Transform
99//! # Ok(())
100//! # }
101//! ```
102//!
103//! # Monitoring Backpressure
104//!
105//! Use `.len()`, `.capacity()`, and `.utilization()` to monitor queue health:
106//!
107//! ```rust,no_run
108//! # use mecha10::prelude::*;
109//! # async fn example(rx: &mut Receiver<String>) {
110//! if let Some(util) = rx.utilization() {
111//!     if util > 0.8 {
112//!         println!("WARNING: Queue is {}% full!", util * 100.0);
113//!     }
114//! }
115//! # }
116//! ```
117
118use std::time::Duration;
119use tokio::sync::mpsc;
120use tokio::time::{sleep, Instant};
121
122/// Internal channel type for the receiver
123pub(crate) enum ReceiverInner<T> {
124    /// Unbounded channel (no backpressure, risk of memory exhaustion)
125    Unbounded(mpsc::UnboundedReceiver<T>),
126    /// Bounded channel (with backpressure, prevents memory exhaustion)
127    Bounded(mpsc::Receiver<T>),
128}
129
130/// Receiver for subscribing to messages
131///
132/// This is a wrapper around tokio channels that provides a unified interface
133/// for both bounded and unbounded channels. Bounded channels provide backpressure
134/// to prevent memory exhaustion from fast publishers.
135///
136/// # Backpressure
137///
138/// When using bounded channels (via `with_capacity()`), the receiver applies
139/// backpressure when the channel is full. This prevents memory exhaustion but
140/// may cause publishers to block or drop messages depending on the strategy.
141///
142/// # Example
143///
144/// ```rust
145/// use mecha10::prelude::*;
146///
147/// # async fn example(ctx: &Context) -> Result<()> {
148/// // Unbounded receiver (default, no backpressure)
149/// let mut images = ctx.subscribe(sensor::CAMERA_RGB).await?;
150///
151/// // Bounded receiver with capacity limit (applies backpressure)
152/// let mut images = ctx.subscribe(sensor::CAMERA_RGB)
153///     .await?
154///     .with_capacity(100);  // Hold max 100 messages
155///
156/// while let Some(image) = images.recv().await {
157///     // Process image
158/// }
159/// # Ok(())
160/// # }
161/// ```
162pub struct Receiver<T> {
163    pub(crate) inner: ReceiverInner<T>,
164}
165
166impl<T> Receiver<T> {
167    /// Receive the next message
168    ///
169    /// Returns `None` when all senders have been dropped.
170    pub async fn recv(&mut self) -> Option<T> {
171        match &mut self.inner {
172            ReceiverInner::Unbounded(rx) => rx.recv().await,
173            ReceiverInner::Bounded(rx) => rx.recv().await,
174        }
175    }
176
177    /// Try to receive a message without blocking
178    ///
179    /// Returns an error if the channel is empty or all senders have been dropped.
180    pub fn try_recv(&mut self) -> std::result::Result<T, mpsc::error::TryRecvError> {
181        match &mut self.inner {
182            ReceiverInner::Unbounded(rx) => rx.try_recv(),
183            ReceiverInner::Bounded(rx) => rx.try_recv(),
184        }
185    }
186
187    /// Check if the channel is currently empty
188    ///
189    /// Note: This is a point-in-time check and may change immediately after.
190    pub fn is_empty(&self) -> bool {
191        match &self.inner {
192            ReceiverInner::Unbounded(rx) => rx.is_empty(),
193            ReceiverInner::Bounded(rx) => rx.is_empty(),
194        }
195    }
196
197    /// Get the number of messages currently in the channel
198    ///
199    /// This is useful for monitoring backpressure and queue depth.
200    ///
201    /// # Example
202    ///
203    /// ```rust
204    /// # use mecha10::prelude::*;
205    /// # async fn example(mut rx: Receiver<String>) {
206    /// let depth = rx.len();
207    /// if depth > 50 {
208    ///     warn!("Channel backlog growing: {} messages", depth);
209    /// }
210    /// # }
211    /// ```
212    pub fn len(&self) -> usize {
213        match &self.inner {
214            ReceiverInner::Unbounded(rx) => rx.len(),
215            ReceiverInner::Bounded(rx) => rx.len(),
216        }
217    }
218
219    /// Convert an unbounded receiver to a bounded one with the specified capacity
220    ///
221    /// This creates a new bounded channel and spawns a task to forward messages.
222    /// Messages are dropped if the new channel fills up, applying backpressure.
223    ///
224    /// # Arguments
225    ///
226    /// * `capacity` - Maximum number of messages to buffer
227    ///
228    /// # Backpressure Strategy
229    ///
230    /// When the bounded channel is full, the oldest message is dropped to make
231    /// room for new messages. This "drop oldest" strategy ensures recent data
232    /// is always available.
233    ///
234    /// # Example
235    ///
236    /// ```rust
237    /// # use mecha10::prelude::*;
238    /// # async fn example(ctx: &Context) -> Result<()> {
239    /// let mut images = ctx.subscribe(sensor::CAMERA_RGB)
240    ///     .await?
241    ///     .with_capacity(50);  // Limit to 50 images in queue
242    ///
243    /// // If publisher sends faster than we process, old images are dropped
244    /// while let Some(image) = images.recv().await {
245    ///     // Always get relatively recent images
246    /// }
247    /// # Ok(())
248    /// # }
249    /// ```
250    pub fn with_capacity(self, capacity: usize) -> Self
251    where
252        T: 'static + Send,
253    {
254        match self.inner {
255            ReceiverInner::Bounded(_) => {
256                // Already bounded, no conversion needed
257                self
258            }
259            ReceiverInner::Unbounded(mut unbounded_rx) => {
260                let (tx, rx) = mpsc::channel(capacity);
261
262                // Spawn task to forward messages with backpressure
263                tokio::spawn(async move {
264                    while let Some(msg) = unbounded_rx.recv().await {
265                        // Try to send with backpressure
266                        // If channel is full, this will wait until space is available
267                        if tx.send(msg).await.is_err() {
268                            // Receiver dropped, stop forwarding
269                            break;
270                        }
271                    }
272                });
273
274                Self {
275                    inner: ReceiverInner::Bounded(rx),
276                }
277            }
278        }
279    }
280
281    /// Convert an unbounded receiver to a bounded one that drops oldest messages
282    ///
283    /// Similar to `with_capacity()`, but explicitly uses a "drop oldest" strategy
284    /// when the buffer is full. This is useful for real-time data where the most
285    /// recent value is more important than historical data.
286    ///
287    /// # Arguments
288    ///
289    /// * `capacity` - Maximum number of messages to buffer
290    ///
291    /// # Example
292    ///
293    /// ```rust
294    /// # use mecha10::prelude::*;
295    /// # async fn example(ctx: &Context) -> Result<()> {
296    /// // For sensor data, we want the latest reading
297    /// let mut lidar = ctx.subscribe(sensor::LIDAR_SCAN)
298    ///     .await?
299    ///     .with_drop_oldest(10);  // Keep only 10 most recent scans
300    ///
301    /// while let Some(scan) = lidar.recv().await {
302    ///     // Always process recent data
303    /// }
304    /// # Ok(())
305    /// # }
306    /// ```
307    pub fn with_drop_oldest(self, capacity: usize) -> Self
308    where
309        T: 'static + Send,
310    {
311        match self.inner {
312            ReceiverInner::Bounded(_) => self,
313            ReceiverInner::Unbounded(mut unbounded_rx) => {
314                let (tx, rx) = mpsc::channel(capacity);
315
316                tokio::spawn(async move {
317                    while let Some(msg) = unbounded_rx.recv().await {
318                        // Try to send without waiting
319                        if tx.try_send(msg).is_err() {
320                            // Channel full or closed
321                            // For try_send on bounded channel, this means it's full
322                            // Drop the message and continue (drop oldest strategy)
323                            continue;
324                        }
325                    }
326                });
327
328                Self {
329                    inner: ReceiverInner::Bounded(rx),
330                }
331            }
332        }
333    }
334
335    /// Get a reference to the maximum capacity if this is a bounded channel
336    ///
337    /// Returns `None` for unbounded channels or the capacity for bounded ones.
338    ///
339    /// # Example
340    ///
341    /// ```rust
342    /// # use mecha10::prelude::*;
343    /// # async fn example(rx: Receiver<String>) {
344    /// if let Some(cap) = rx.capacity() {
345    ///     println!("Channel capacity: {}", cap);
346    ///     println!("Current usage: {}/{}", rx.len(), cap);
347    ///     println!("Utilization: {:.1}%", (rx.len() as f32 / cap as f32) * 100.0);
348    /// } else {
349    ///     println!("Unbounded channel, {} messages queued", rx.len());
350    /// }
351    /// # }
352    /// ```
353    pub fn capacity(&self) -> Option<usize> {
354        match &self.inner {
355            ReceiverInner::Unbounded(_) => None,
356            ReceiverInner::Bounded(rx) => Some(rx.max_capacity()),
357        }
358    }
359
360    /// Get the current utilization ratio (0.0 to 1.0) for bounded channels
361    ///
362    /// Returns `None` for unbounded channels.
363    ///
364    /// # Example
365    ///
366    /// ```rust
367    /// # use mecha10::prelude::*;
368    /// # async fn example(rx: Receiver<String>) {
369    /// if let Some(utilization) = rx.utilization() {
370    ///     if utilization > 0.8 {
371    ///         warn!("Channel is {}% full, backpressure may occur", utilization * 100.0);
372    ///     }
373    /// }
374    /// # }
375    /// ```
376    pub fn utilization(&self) -> Option<f32> {
377        self.capacity()
378            .map(|cap| if cap == 0 { 1.0 } else { self.len() as f32 / cap as f32 })
379    }
380
381    /// Map (transform) messages as they are received
382    ///
383    /// Creates a new receiver that applies a transformation function to each message.
384    /// This is useful for data transformations like resizing images, converting units,
385    /// or extracting fields from complex messages.
386    ///
387    /// # Arguments
388    ///
389    /// * `f` - Transformation function to apply to each message
390    ///
391    /// # Example
392    ///
393    /// ```rust
394    /// # use mecha10::prelude::*;
395    /// # async fn example(ctx: &Context) -> Result<()> {
396    /// // Resize camera images on the fly
397    /// let mut resized_images = ctx.subscribe(sensor::CAMERA_RGB)
398    ///     .await?
399    ///     .map(|img| resize_image(img, 320, 240));
400    ///
401    /// while let Some(small_image) = resized_images.recv().await {
402    ///     // Process resized image
403    /// }
404    ///
405    /// // Extract specific fields from complex messages
406    /// let mut speeds = ctx.subscribe(motor::STATUS)
407    ///     .await?
408    ///     .map(|status| status.current_speed);
409    ///
410    /// while let Some(speed) = speeds.recv().await {
411    ///     println!("Current speed: {}", speed);
412    /// }
413    /// # Ok(())
414    /// # }
415    /// # fn resize_image(img: String, w: u32, h: u32) -> String { img }
416    /// ```
417    pub fn map<U, F>(self, f: F) -> Receiver<U>
418    where
419        F: FnMut(T) -> U + Send + 'static,
420        T: Send + 'static,
421        U: Send + 'static,
422    {
423        let (tx, rx) = match &self.inner {
424            ReceiverInner::Unbounded(_) => {
425                let (tx, rx) = mpsc::unbounded_channel();
426                (MappedSender::Unbounded(tx), ReceiverInner::Unbounded(rx))
427            }
428            ReceiverInner::Bounded(bounded_rx) => {
429                let capacity = bounded_rx.max_capacity();
430                let (tx, rx) = mpsc::channel(capacity);
431                (MappedSender::Bounded(tx), ReceiverInner::Bounded(rx))
432            }
433        };
434
435        // Spawn task to apply transformation
436        let mut receiver = self;
437        let mut transform = f;
438        tokio::spawn(async move {
439            while let Some(msg) = receiver.recv().await {
440                let transformed = transform(msg);
441
442                // Send transformed message
443                let send_result = match tx {
444                    MappedSender::Unbounded(ref tx) => tx.send(transformed).map_err(|_| ()),
445                    MappedSender::Bounded(ref tx) => tx.send(transformed).await.map_err(|_| ()),
446                };
447
448                if send_result.is_err() {
449                    // Receiver dropped, stop processing
450                    break;
451                }
452            }
453        });
454
455        Receiver { inner: rx }
456    }
457
458    /// Filter messages based on a predicate
459    ///
460    /// Creates a new receiver that only passes through messages that satisfy the predicate.
461    /// This is useful for filtering sensor data, ignoring invalid readings, or implementing
462    /// conditional processing.
463    ///
464    /// # Arguments
465    ///
466    /// * `predicate` - Function that returns `true` for messages to keep, `false` to drop
467    ///
468    /// # Example
469    ///
470    /// ```rust
471    /// # use mecha10::prelude::*;
472    /// # async fn example(ctx: &Context) -> Result<()> {
473    /// // Only process high-speed motor data
474    /// let mut high_speeds = ctx.subscribe(motor::STATUS)
475    ///     .await?
476    ///     .filter(|status| status.speed > 0.5);
477    ///
478    /// while let Some(status) = high_speeds.recv().await {
479    ///     println!("High speed detected: {}", status.speed);
480    /// }
481    ///
482    /// // Filter out invalid lidar readings
483    /// let mut valid_scans = ctx.subscribe(sensor::LIDAR_SCAN)
484    ///     .await?
485    ///     .filter(|scan| scan.ranges.iter().all(|&r| r > 0.0 && r < 100.0));
486    ///
487    /// while let Some(scan) = valid_scans.recv().await {
488    ///     // All ranges are valid
489    /// }
490    /// # Ok(())
491    /// # }
492    /// ```
493    pub fn filter<F>(self, mut predicate: F) -> Receiver<T>
494    where
495        F: FnMut(&T) -> bool + Send + 'static,
496        T: Send + 'static,
497    {
498        let (tx, rx) = match &self.inner {
499            ReceiverInner::Unbounded(_) => {
500                let (tx, rx) = mpsc::unbounded_channel();
501                (FilteredSender::Unbounded(tx), ReceiverInner::Unbounded(rx))
502            }
503            ReceiverInner::Bounded(bounded_rx) => {
504                let capacity = bounded_rx.max_capacity();
505                let (tx, rx) = mpsc::channel(capacity);
506                (FilteredSender::Bounded(tx), ReceiverInner::Bounded(rx))
507            }
508        };
509
510        // Spawn task to filter messages
511        let mut receiver = self;
512        tokio::spawn(async move {
513            while let Some(msg) = receiver.recv().await {
514                // Apply predicate
515                if !predicate(&msg) {
516                    continue; // Skip messages that don't match
517                }
518
519                // Send filtered message
520                let send_result = match &tx {
521                    FilteredSender::Unbounded(tx) => tx.send(msg).map_err(|_| ()),
522                    FilteredSender::Bounded(tx) => tx.send(msg).await.map_err(|_| ()),
523                };
524
525                if send_result.is_err() {
526                    // Receiver dropped, stop processing
527                    break;
528                }
529            }
530        });
531
532        Receiver { inner: rx }
533    }
534
535    /// Filter and map messages in one operation
536    ///
537    /// Combines filtering and mapping into a single efficient operation. The function
538    /// returns `Some(U)` for messages to keep (and transform), or `None` to drop.
539    /// This is more efficient than chaining `.filter()` and `.map()` separately.
540    ///
541    /// # Arguments
542    ///
543    /// * `f` - Function that returns `Some(transformed)` to keep, `None` to drop
544    ///
545    /// # Example
546    ///
547    /// ```rust
548    /// # use mecha10::prelude::*;
549    /// # async fn example(ctx: &Context) -> Result<()> {
550    /// // Extract speed only when robot is moving
551    /// let mut active_speeds = ctx.subscribe(motor::STATUS)
552    ///     .await?
553    ///     .filter_map(|status| {
554    ///         if status.speed > 0.1 {
555    ///             Some(status.speed)
556    ///         } else {
557    ///             None
558    ///         }
559    ///     });
560    ///
561    /// while let Some(speed) = active_speeds.recv().await {
562    ///     println!("Robot moving at: {}", speed);
563    /// }
564    ///
565    /// // Parse valid sensor readings
566    /// let mut parsed_data = ctx.subscribe(sensor::RAW_DATA)
567    ///     .await?
568    ///     .filter_map(|raw| parse_sensor_data(&raw).ok());
569    ///
570    /// while let Some(data) = parsed_data.recv().await {
571    ///     // Process successfully parsed data
572    /// }
573    /// # Ok(())
574    /// # }
575    /// # fn parse_sensor_data(raw: &String) -> Result<String> { Ok(raw.clone()) }
576    /// ```
577    pub fn filter_map<U, F>(self, mut f: F) -> Receiver<U>
578    where
579        F: FnMut(T) -> Option<U> + Send + 'static,
580        T: Send + 'static,
581        U: Send + 'static,
582    {
583        let (tx, rx) = match &self.inner {
584            ReceiverInner::Unbounded(_) => {
585                let (tx, rx) = mpsc::unbounded_channel();
586                (FilterMapSender::Unbounded(tx), ReceiverInner::Unbounded(rx))
587            }
588            ReceiverInner::Bounded(bounded_rx) => {
589                let capacity = bounded_rx.max_capacity();
590                let (tx, rx) = mpsc::channel(capacity);
591                (FilterMapSender::Bounded(tx), ReceiverInner::Bounded(rx))
592            }
593        };
594
595        // Spawn task to filter and map
596        let mut receiver = self;
597        tokio::spawn(async move {
598            while let Some(msg) = receiver.recv().await {
599                // Apply filter_map function
600                if let Some(transformed) = f(msg) {
601                    // Send transformed message
602                    let send_result = match &tx {
603                        FilterMapSender::Unbounded(tx) => tx.send(transformed).map_err(|_| ()),
604                        FilterMapSender::Bounded(tx) => tx.send(transformed).await.map_err(|_| ()),
605                    };
606
607                    if send_result.is_err() {
608                        // Receiver dropped, stop processing
609                        break;
610                    }
611                }
612                // If None, message is dropped (filtered out)
613            }
614        });
615
616        Receiver { inner: rx }
617    }
618
619    /// Debounce (rate-limit) messages to prevent rapid bursts
620    ///
621    /// Creates a new receiver that only emits a message if a certain duration has passed
622    /// since the last emission. This is useful for reducing update frequency of high-rate
623    /// sensors or preventing UI update storms.
624    ///
625    /// **Strategy:** "Emit latest after quiet period"
626    /// - Messages are buffered during the debounce period
627    /// - Only the most recent message is emitted after the period expires
628    /// - Earlier messages within the period are dropped
629    ///
630    /// # Arguments
631    ///
632    /// * `duration` - Minimum time between emissions
633    ///
634    /// # Example
635    ///
636    /// ```rust
637    /// # use mecha10::prelude::*;
638    /// # use std::time::Duration;
639    /// # async fn example(ctx: &Context) -> Result<()> {
640    /// // Reduce camera updates to max 10 Hz (100ms debounce)
641    /// let mut debounced_images = ctx.subscribe(sensor::CAMERA_RGB)
642    ///     .await?
643    ///     .debounce(Duration::from_millis(100));
644    ///
645    /// while let Some(image) = debounced_images.recv().await {
646    ///     // Process at most 10 images/sec
647    /// }
648    ///
649    /// // Prevent rapid motor status updates
650    /// let mut stable_status = ctx.subscribe(motor::STATUS)
651    ///     .await?
652    ///     .debounce(Duration::from_millis(50));
653    ///
654    /// while let Some(status) = stable_status.recv().await {
655    ///     // Only get updates every 50ms
656    /// }
657    /// # Ok(())
658    /// # }
659    /// ```
660    pub fn debounce(self, duration: Duration) -> Receiver<T>
661    where
662        T: Send + 'static,
663    {
664        let (tx, rx) = match &self.inner {
665            ReceiverInner::Unbounded(_) => {
666                let (tx, rx) = mpsc::unbounded_channel();
667                (DebouncedSender::Unbounded(tx), ReceiverInner::Unbounded(rx))
668            }
669            ReceiverInner::Bounded(bounded_rx) => {
670                let capacity = bounded_rx.max_capacity();
671                let (tx, rx) = mpsc::channel(capacity);
672                (DebouncedSender::Bounded(tx), ReceiverInner::Bounded(rx))
673            }
674        };
675
676        // Spawn task to debounce messages
677        let mut receiver = self;
678        tokio::spawn(async move {
679            let mut last_emit = Instant::now();
680            let mut pending_msg: Option<T> = None;
681
682            loop {
683                // Calculate time until next allowed emission
684                let elapsed = last_emit.elapsed();
685                let remaining = if elapsed >= duration {
686                    Duration::from_millis(0)
687                } else {
688                    duration - elapsed
689                };
690
691                // Wait for either a message or debounce timeout
692                tokio::select! {
693                    // New message received
694                    msg = receiver.recv() => {
695                        match msg {
696                            Some(new_msg) => {
697                                // Store as pending (replaces any previous pending)
698                                pending_msg = Some(new_msg);
699
700                                // If debounce period passed, emit immediately
701                                if remaining.is_zero() {
702                                    if let Some(msg_to_send) = pending_msg.take() {
703                                        let send_result = match &tx {
704                                            DebouncedSender::Unbounded(tx) => tx.send(msg_to_send).map_err(|_| ()),
705                                            DebouncedSender::Bounded(tx) => tx.send(msg_to_send).await.map_err(|_| ()),
706                                        };
707
708                                        if send_result.is_err() {
709                                            break; // Receiver dropped
710                                        }
711
712                                        last_emit = Instant::now();
713                                    }
714                                }
715                                // Otherwise, message stays pending until timeout
716                            }
717                            None => {
718                                // Source closed, send any pending message and exit
719                                if let Some(msg_to_send) = pending_msg.take() {
720                                    let _ = match &tx {
721                                        DebouncedSender::Unbounded(tx) => tx.send(msg_to_send).map_err(|_| ()),
722                                        DebouncedSender::Bounded(tx) => tx.send(msg_to_send).await.map_err(|_| ()),
723                                    };
724                                }
725                                break;
726                            }
727                        }
728                    }
729
730                    // Debounce timeout - emit pending message if any
731                    _ = sleep(remaining), if pending_msg.is_some() && !remaining.is_zero() => {
732                        if let Some(msg_to_send) = pending_msg.take() {
733                            let send_result = match &tx {
734                                DebouncedSender::Unbounded(tx) => tx.send(msg_to_send).map_err(|_| ()),
735                                DebouncedSender::Bounded(tx) => tx.send(msg_to_send).await.map_err(|_| ()),
736                            };
737
738                            if send_result.is_err() {
739                                break; // Receiver dropped
740                            }
741
742                            last_emit = Instant::now();
743                        }
744                    }
745                }
746            }
747        });
748
749        Receiver { inner: rx }
750    }
751
752    /// Window messages over a time period
753    ///
754    /// Creates a new receiver that collects messages over a fixed time window and emits
755    /// them as a batch (Vec) at the end of each window. This is useful for:
756    /// - Aggregating sensor readings (averaging, min/max)
757    /// - Batch processing for efficiency
758    /// - Time-series analysis
759    /// - Rate calculation (count messages per window)
760    ///
761    /// # Behavior
762    ///
763    /// - **Fixed Windows**: Each window has a fixed duration
764    /// - **Non-overlapping**: Windows don't overlap (tumbling window)
765    /// - **Batch Emission**: All messages in window emitted as Vec at window end
766    /// - **Empty Windows**: Empty Vec emitted if no messages received
767    /// - **Termination**: Emits final partial window when source ends
768    ///
769    /// # Arguments
770    ///
771    /// * `window_duration` - Duration of each time window
772    ///
773    /// # Example
774    ///
775    /// ```rust
776    /// # use mecha10::prelude::*;
777    /// # use std::time::Duration;
778    /// # async fn example(ctx: &Context) -> Result<()> {
779    /// // Average IMU readings over 100ms windows
780    /// let mut windowed = ctx.subscribe(sensor::IMU)
781    ///     .await?
782    ///     .window(Duration::from_millis(100));
783    ///
784    /// while let Some(batch) = windowed.recv().await {
785    ///     if !batch.is_empty() {
786    ///         let avg = batch.iter().map(|imu| imu.accel_x).sum::<f32>() / batch.len() as f32;
787    ///         println!("Average accel: {} (n={})", avg, batch.len());
788    ///     }
789    /// }
790    ///
791    /// // Count messages per second
792    /// let mut counts = ctx.subscribe("/events")
793    ///     .await?
794    ///     .window(Duration::from_secs(1))
795    ///     .map(|batch| batch.len());
796    ///
797    /// while let Some(count) = counts.recv().await {
798    ///     println!("Events/sec: {}", count);
799    /// }
800    ///
801    /// // Batch process for efficiency
802    /// let mut batches = ctx.subscribe("/data")
803    ///     .await?
804    ///     .window(Duration::from_millis(50));
805    ///
806    /// while let Some(batch) = batches.recv().await {
807    ///     // Process entire batch at once (more efficient than one-by-one)
808    ///     process_batch(&batch).await?;
809    /// }
810    /// # Ok(())
811    /// # }
812    /// ```
813    ///
814    /// # Performance
815    ///
816    /// - **Memory**: Buffers all messages within window (use reasonable window sizes)
817    /// - **Latency**: Adds latency equal to window duration
818    /// - **CPU**: Minimal overhead for window management
819    ///
820    /// # Caveats
821    ///
822    /// - Window size affects memory usage (smaller windows = less memory)
823    /// - All messages are delivered at window end (batch latency)
824    /// - For sliding windows, consider multiple overlapping `.window()` subscriptions
825    pub fn window(self, window_duration: Duration) -> Receiver<Vec<T>>
826    where
827        T: Send + 'static,
828    {
829        let (tx, rx) = match &self.inner {
830            ReceiverInner::Unbounded(_) => {
831                let (tx, rx) = mpsc::unbounded_channel();
832                (WindowedSender::Unbounded(tx), ReceiverInner::Unbounded(rx))
833            }
834            ReceiverInner::Bounded(bounded_rx) => {
835                let capacity = bounded_rx.max_capacity();
836                let (tx, rx) = mpsc::channel(capacity);
837                (WindowedSender::Bounded(tx), ReceiverInner::Bounded(rx))
838            }
839        };
840
841        // Spawn task to window messages
842        let mut receiver = self;
843        tokio::spawn(async move {
844            let mut buffer: Vec<T> = Vec::new();
845            let mut interval = tokio::time::interval(window_duration);
846            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
847
848            loop {
849                tokio::select! {
850                    // New message received
851                    msg = receiver.recv() => {
852                        match msg {
853                            Some(new_msg) => {
854                                buffer.push(new_msg);
855                            }
856                            None => {
857                                // Source ended - emit final window and exit
858                                if !buffer.is_empty() {
859                                    let _ = match &tx {
860                                        WindowedSender::Unbounded(tx) => tx.send(buffer).map_err(|_| ()),
861                                        WindowedSender::Bounded(tx) => tx.send(buffer).await.map_err(|_| ()),
862                                    };
863                                }
864                                break;
865                            }
866                        }
867                    }
868
869                    // Window duration elapsed
870                    _ = interval.tick() => {
871                        // Emit current window (even if empty)
872                        let batch = std::mem::take(&mut buffer);
873                        let send_result = match &tx {
874                            WindowedSender::Unbounded(tx) => tx.send(batch).map_err(|_| ()),
875                            WindowedSender::Bounded(tx) => tx.send(batch).await.map_err(|_| ()),
876                        };
877
878                        if send_result.is_err() {
879                            // Receiver dropped, stop windowing
880                            break;
881                        }
882                    }
883                }
884            }
885        });
886
887        Receiver { inner: rx }
888    }
889
890    /// Combine two streams by pairing their messages
891    ///
892    /// Creates a new stream that emits pairs of messages from two input streams.
893    /// The combined stream emits `(T, U)` tuples when both streams have messages available.
894    ///
895    /// # Behavior
896    ///
897    /// - **Paired Emission**: Waits for both streams to have messages, then emits the pair
898    /// - **Buffering**: Buffers messages from faster stream until slower stream catches up
899    /// - **Termination**: Stops when either stream ends
900    /// - **Capacity**: Uses the smaller capacity of the two input streams
901    ///
902    /// # Use Cases
903    ///
904    /// - Combining sensor readings (e.g., camera + LiDAR)
905    /// - Synchronizing multiple data sources
906    /// - Correlating events from different streams
907    /// - Sensor fusion
908    ///
909    /// # Example
910    ///
911    /// ```rust
912    /// use mecha10::prelude::*;
913    ///
914    /// # async fn example(ctx: &Context) -> Result<()> {
915    /// // Subscribe to two sensor streams
916    /// let camera = ctx.subscribe("/sensor/camera").await?;
917    /// let lidar = ctx.subscribe("/sensor/lidar").await?;
918    ///
919    /// // Combine them
920    /// let mut fused = camera.zip(lidar);
921    ///
922    /// // Process paired messages
923    /// while let Some((img, scan)) = fused.recv().await {
924    ///     // Both sensors have data - process together
925    ///     process_sensor_fusion(&img, &scan).await?;
926    /// }
927    /// # Ok(())
928    /// # }
929    /// ```
930    ///
931    /// # Performance
932    ///
933    /// - **Memory**: Buffers messages from faster stream (bounded by capacity)
934    /// - **Latency**: Adds latency equal to slower stream's inter-message delay
935    /// - **CPU**: Minimal overhead for pairing logic
936    ///
937    /// # Caveats
938    ///
939    /// - If streams have very different rates, slower stream will be the bottleneck
940    /// - Buffer can fill up if rate difference is sustained
941    /// - Consider `.debounce()` on faster stream if rate mismatch is severe
942    pub fn zip<U>(self, other: Receiver<U>) -> Receiver<(T, U)>
943    where
944        T: Send + 'static,
945        U: Send + 'static,
946    {
947        // Determine capacity based on both receivers
948        let capacity = match (&self.inner, &other.inner) {
949            (ReceiverInner::Unbounded(_), ReceiverInner::Unbounded(_)) => None,
950            (ReceiverInner::Bounded(a), ReceiverInner::Unbounded(_)) => Some(a.max_capacity()),
951            (ReceiverInner::Unbounded(_), ReceiverInner::Bounded(b)) => Some(b.max_capacity()),
952            (ReceiverInner::Bounded(a), ReceiverInner::Bounded(b)) => Some(a.max_capacity().min(b.max_capacity())),
953        };
954
955        // Create output channel
956        let (tx, rx) = if let Some(cap) = capacity {
957            let (tx, rx) = mpsc::channel(cap);
958            (ZippedSender::Bounded(tx), ReceiverInner::Bounded(rx))
959        } else {
960            let (tx, rx) = mpsc::unbounded_channel();
961            (ZippedSender::Unbounded(tx), ReceiverInner::Unbounded(rx))
962        };
963
964        // Spawn task to zip messages
965        let mut receiver_a = self;
966        let mut receiver_b = other;
967
968        tokio::spawn(async move {
969            loop {
970                // Wait for both streams to have messages
971                let msg_a = receiver_a.recv().await;
972                let msg_b = receiver_b.recv().await;
973
974                match (msg_a, msg_b) {
975                    (Some(a), Some(b)) => {
976                        // Both messages available, send pair
977                        let send_result = match &tx {
978                            ZippedSender::Unbounded(tx) => tx.send((a, b)).map_err(|_| ()),
979                            ZippedSender::Bounded(tx) => tx.send((a, b)).await.map_err(|_| ()),
980                        };
981
982                        if send_result.is_err() {
983                            // Receiver dropped, stop zipping
984                            break;
985                        }
986                    }
987                    _ => {
988                        // One or both streams ended
989                        break;
990                    }
991                }
992            }
993        });
994
995        Receiver { inner: rx }
996    }
997}
998
999// Helper enums for sender types in combinators
1000enum MappedSender<T> {
1001    Unbounded(mpsc::UnboundedSender<T>),
1002    Bounded(mpsc::Sender<T>),
1003}
1004
1005enum FilteredSender<T> {
1006    Unbounded(mpsc::UnboundedSender<T>),
1007    Bounded(mpsc::Sender<T>),
1008}
1009
1010enum FilterMapSender<T> {
1011    Unbounded(mpsc::UnboundedSender<T>),
1012    Bounded(mpsc::Sender<T>),
1013}
1014
1015enum DebouncedSender<T> {
1016    Unbounded(mpsc::UnboundedSender<T>),
1017    Bounded(mpsc::Sender<T>),
1018}
1019
1020enum ZippedSender<T> {
1021    Unbounded(mpsc::UnboundedSender<T>),
1022    Bounded(mpsc::Sender<T>),
1023}
1024
1025enum WindowedSender<T> {
1026    Unbounded(mpsc::UnboundedSender<T>),
1027    Bounded(mpsc::Sender<T>),
1028}
1029
1030impl<T> From<mpsc::UnboundedReceiver<T>> for Receiver<T> {
1031    fn from(inner: mpsc::UnboundedReceiver<T>) -> Self {
1032        Self {
1033            inner: ReceiverInner::Unbounded(inner),
1034        }
1035    }
1036}
1037
1038impl<T> From<mpsc::Receiver<T>> for Receiver<T> {
1039    fn from(inner: mpsc::Receiver<T>) -> Self {
1040        Self {
1041            inner: ReceiverInner::Bounded(inner),
1042        }
1043    }
1044}