mecha10_core/stream.rs

//! Stream combinators for declarative message processing
//!
//! This module provides a fluent API for transforming and filtering message streams.
//!
//! # Examples
//!
//! ## Filtering
//!
//! ```rust
//! use mecha10::prelude::*;
//! use mecha10::topics::perception;
//!
//! # async fn example(ctx: &Context) -> Result<()> {
//! // Only process high-confidence detections
//! let mut detections = ctx.subscribe(perception::DETECTIONS)
//!     .filter(|d| d.iter().any(|det| det.confidence > 0.8))
//!     .await?;
//!
//! while let Some(dets) = detections.recv().await {
//!     // Each batch here contains at least one detection with confidence > 0.8
//! }
//! # Ok(())
//! # }
//! ```
//!
//! ## Throttling
//!
//! ```rust
//! use mecha10::prelude::*;
//! use mecha10::topics::sensor;
//!
//! # async fn example(ctx: &Context) -> Result<()> {
//! // Limit to 10 Hz
//! let mut images = ctx.subscribe(sensor::CAMERA_RGB)
//!     .throttle(Duration::from_millis(100))
//!     .await?;
//! # Ok(())
//! # }
//! ```
//!
//! ## Latest Only
//!
//! ```rust
//! use mecha10::prelude::*;
//! use mecha10::topics::sensor;
//!
//! # async fn example(ctx: &Context) -> Result<()> {
//! // Always get the latest, drop old messages
//! let mut odom = ctx.subscribe(sensor::ODOMETRY)
//!     .latest()
//!     .await?;
//! # Ok(())
//! # }
//! ```
//!
//! ## Zipping Streams
//!
//! ```rust
//! use mecha10::prelude::*;
//! use mecha10::topics::sensor;
//!
//! # async fn example(ctx: &Context) -> Result<()> {
//! // Combine camera images with LiDAR scans
//! let images = ctx.subscribe(sensor::CAMERA_RGB).await?;
//! let scans = ctx.subscribe(sensor::LIDAR_SCAN).await?;
//!
//! let mut zipped = images.zip(scans);
//!
//! while let Some((image, scan)) = zipped.recv().await {
//!     // Process synchronized sensor data
//!     println!("Image: {}x{}, LiDAR: {} points",
//!         image.width, image.height, scan.ranges.len());
//! }
//! # Ok(())
//! # }
//! ```
//!
//! ## Windowing
//!
//! ```rust
//! use mecha10::prelude::*;
//! use mecha10::topics::perception;
//!
//! # async fn example(ctx: &Context) -> Result<()> {
//! // Collect detections over 1-second windows for rate analysis
//! let mut windowed = ctx.subscribe(perception::DETECTIONS)
//!     .await?
//!     .window(Duration::from_secs(1));
//!
//! while let Some(detections) = windowed.recv().await {
//!     let rate = detections.len() as f32; // detections per second
//!     println!("Detection rate: {} Hz", rate);
//!
//!     // Calculate average confidence over the window
//!     let avg_conf = detections.iter()
//!         .map(|d| d.confidence)
//!         .sum::<f32>() / detections.len() as f32;
//!     println!("Average confidence: {:.2}", avg_conf);
//! }
//! # Ok(())
//! # }
//! ```

use crate::context::Receiver;
use crate::messages::Message;
use std::time::{Duration, Instant};
use tokio::sync::mpsc;

/// Builder for creating filtered/transformed message streams
///
/// This builder allows chaining operations to create declarative
/// message processing pipelines.
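///
/// For example, a filter-plus-throttle pipeline can be assembled and then
/// consumed like any other receiver (a minimal sketch, assuming the `Image`
/// message type from the prelude):
///
/// ```rust
/// # use mecha10::prelude::*;
/// # use std::time::Duration;
/// # async fn example(receiver: Receiver<Image>) -> Result<()> {
/// let mut pipeline = receiver
///     .filter(|img| img.width >= 640)
///     .throttle(Duration::from_millis(100))
///     .build();
///
/// while let Some(img) = pipeline.recv().await {
///     // At most one wide frame per 100 ms reaches this point
///     println!("got a {}x{} frame", img.width, img.height);
/// }
/// # Ok(())
/// # }
/// ```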
pub struct StreamBuilder<T: Message> {
    receiver: Receiver<T>,
    #[allow(clippy::type_complexity)]
    filter_fn: Option<Box<dyn Fn(&T) -> bool + Send + Sync>>,
    #[allow(clippy::type_complexity)]
    map_fn: Option<Box<dyn Fn(T) -> T + Send + Sync>>,
    throttle_duration: Option<Duration>,
    debounce_duration: Option<Duration>,
    latest_only: bool,
    #[allow(dead_code)]
    batch_size: Option<usize>,
    #[allow(dead_code)]
    batch_timeout: Option<Duration>,
}

impl<T: Message + Send + 'static> StreamBuilder<T> {
    /// Create a new stream builder from a receiver
    pub fn new(receiver: Receiver<T>) -> Self {
        Self {
            receiver,
            filter_fn: None,
            map_fn: None,
            throttle_duration: None,
            debounce_duration: None,
            latest_only: false,
            batch_size: None,
            batch_timeout: None,
        }
    }

    /// Filter messages based on a predicate
    ///
    /// Only messages where the predicate returns `true` will be passed through.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use mecha10::prelude::*;
    /// # async fn example(receiver: Receiver<Image>) -> Result<()> {
    /// let filtered = receiver
    ///     .filter(|img| img.width >= 640)
    ///     .build();
    /// # Ok(())
    /// # }
    /// ```
    pub fn filter<F>(mut self, predicate: F) -> Self
    where
        F: Fn(&T) -> bool + Send + Sync + 'static,
    {
        self.filter_fn = Some(Box::new(predicate));
        self
    }

    /// Throttle messages to a maximum rate
    ///
    /// Messages will be rate-limited to at most one per `duration`.
    /// If multiple messages arrive during the throttle period, only the
    /// first one is kept and the rest are dropped.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use mecha10::prelude::*;
    /// # async fn example(receiver: Receiver<Image>) -> Result<()> {
    /// // Max 10 Hz
    /// let throttled = receiver
    ///     .throttle(Duration::from_millis(100))
    ///     .build();
    /// # Ok(())
    /// # }
    /// ```
    pub fn throttle(mut self, duration: Duration) -> Self {
        self.throttle_duration = Some(duration);
        self
    }

    /// Keep only the latest message, dropping older ones
    ///
    /// Useful for scenarios where you only care about the most recent
    /// state and processing old messages is wasteful.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use mecha10::prelude::*;
    /// # async fn example(receiver: Receiver<Odometry>) -> Result<()> {
    /// let latest = receiver.latest().build();
    /// # Ok(())
    /// # }
    /// ```
    pub fn latest(mut self) -> Self {
        self.latest_only = true;
        self
    }

    /// Transform messages using a mapping function
    ///
    /// Applies the given function to each message. The function must return
    /// the same type (for type-changing transformations, process messages manually).
    ///
    /// # Example
    ///
    /// ```rust
    /// # use mecha10::prelude::*;
    /// # async fn example(camera_a: Receiver<Image>, camera_b: Receiver<Image>) -> Result<()> {
    /// // Resize images to thumbnails (returns Image)
    /// let thumbnails = camera_a
    ///     .map(|img| resize_to_thumbnail(img))
    ///     .build();
    ///
    /// // Normalize image brightness
    /// let normalized = camera_b
    ///     .map(|mut img| {
    ///         normalize_brightness(&mut img);
    ///         img
    ///     })
    ///     .build();
    /// # Ok(())
    /// # }
    /// ```
    pub fn map<F>(mut self, mapper: F) -> Self
    where
        F: Fn(T) -> T + Send + Sync + 'static,
    {
        self.map_fn = Some(Box::new(mapper));
        self
    }

    /// Debounce messages - ignore duplicates within a time window
    ///
    /// Only the first message is kept; subsequent messages within the debounce
    /// period are dropped. Unlike throttle, debounce resets the timer on each
    /// new message.
    ///
    /// Useful for:
    /// - Button press handling (ignore bounces)
    /// - Noisy sensor data
    /// - Rapid user input events
    ///
    /// # Example
    ///
    /// ```rust
    /// # use mecha10::prelude::*;
    /// # async fn example(receiver: Receiver<ButtonPress>) -> Result<()> {
    /// // Ignore button bounces within 300ms
    /// let mut debounced = receiver
    ///     .debounce(Duration::from_millis(300))
    ///     .build();
    ///
    /// // Only get one event per 300ms window
    /// while let Some(press) = debounced.recv().await {
    ///     handle_button_press(press);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn debounce(mut self, duration: Duration) -> Self {
        self.debounce_duration = Some(duration);
        self
    }

    /// Zip this stream with another stream
    ///
    /// Combines two streams element-by-element, producing tuples of messages.
    /// The resulting stream will emit `(T, U)` tuples when both streams have
    /// messages available.
    ///
    /// If one stream ends before the other, the zip stream will also end.
    ///
    /// # Arguments
    ///
    /// * `other` - Another receiver to zip with this stream
    ///
    /// # Returns
    ///
    /// A `Receiver<(T, U)>` that yields tuples of messages from both streams
    ///
    /// # Example
    ///
    /// ```rust
    /// # use mecha10::prelude::*;
    /// # use mecha10::topics::sensor;
    /// # async fn example(ctx: &Context) -> Result<()> {
    /// // Zip camera images with LiDAR scans
    /// let images = ctx.subscribe(sensor::CAMERA_RGB).await?;
    /// let scans = ctx.subscribe(sensor::LIDAR_SCAN).await?;
    ///
    /// let mut zipped = images.zip(scans);
    ///
    /// while let Some((image, scan)) = zipped.recv().await {
    ///     // Process synchronized image + LiDAR data
    ///     println!("Image {}x{}, LiDAR {} points",
    ///         image.width, image.height, scan.ranges.len());
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn zip<U>(self, other: Receiver<U>) -> Receiver<(T, U)>
    where
        U: Message + Send + 'static,
    {
        let (tx, rx) = mpsc::unbounded_channel();

        tokio::spawn(async move {
            Self::run_zip_pipeline(self.receiver, other, tx).await;
        });

        Receiver::from(rx)
    }

    /// Internal method to run the zip pipeline
    async fn run_zip_pipeline<U>(
        mut receiver1: Receiver<T>,
        mut receiver2: Receiver<U>,
        tx: mpsc::UnboundedSender<(T, U)>,
    ) where
        U: Message + Send + 'static,
    {
        loop {
            // Wait for messages from both streams
            let msg1 = receiver1.recv().await;
            let msg2 = receiver2.recv().await;

            match (msg1, msg2) {
                (Some(m1), Some(m2)) => {
                    // Both messages available, send tuple
                    if tx.send((m1, m2)).is_err() {
                        break; // Receiver dropped
                    }
                }
                _ => {
                    // One or both streams ended
                    break;
                }
            }
        }
    }

    /// Window messages over a time period
    ///
    /// Collects messages over a fixed time window and emits them as a vector.
    /// This is useful for time-based aggregation and analysis.
    ///
    /// # Arguments
    ///
    /// * `window_duration` - Duration of the time window
    ///
    /// # Returns
    ///
    /// A `Receiver<Vec<T>>` that yields vectors of messages collected in each window
    ///
    /// # Example
    ///
    /// ```rust
    /// # use mecha10::prelude::*;
    /// # use mecha10::topics::perception;
    /// # async fn example(ctx: &Context) -> Result<()> {
    /// // Collect detections over 1-second windows
    /// let mut windowed = ctx.subscribe(perception::DETECTIONS)
    ///     .await?
    ///     .window(Duration::from_secs(1));
    ///
    /// while let Some(detections) = windowed.recv().await {
    ///     // Process all detections from the last second
    ///     println!("Received {} detections in 1-second window", detections.len());
    ///
    ///     // Example: calculate average confidence
    ///     let avg_confidence = detections.iter()
    ///         .map(|d| d.confidence)
    ///         .sum::<f32>() / detections.len() as f32;
    ///     println!("Average confidence: {:.2}", avg_confidence);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Use Cases
    ///
    /// - **Rate calculation**: Count messages per time window
    /// - **Moving averages**: Compute statistics over windows
    /// - **Burst detection**: Identify spikes in message frequency
    /// - **Time-series analysis**: Analyze temporal patterns
    pub fn window(self, window_duration: Duration) -> Receiver<Vec<T>>
    where
        T: Clone,
    {
        let (tx, rx) = mpsc::unbounded_channel();

        tokio::spawn(async move {
            Self::run_window_pipeline(self.receiver, tx, window_duration).await;
        });

        Receiver::from(rx)
    }

    /// Internal method to run the window pipeline
    async fn run_window_pipeline(
        mut receiver: Receiver<T>,
        tx: mpsc::UnboundedSender<Vec<T>>,
        window_duration: Duration,
    ) where
        T: Clone,
    {
        let mut window = Vec::new();
        let mut deadline = tokio::time::Instant::now() + window_duration;

        loop {
            tokio::select! {
                // Receive next message
                msg_opt = receiver.recv() => {
                    match msg_opt {
                        Some(msg) => {
                            window.push(msg);
                        }
                        None => {
                            // Stream ended - send remaining window if any
                            if !window.is_empty() {
                                let _ = tx.send(window);
                            }
                            break;
                        }
                    }
                }

                // Window timeout - emit collected messages
                _ = tokio::time::sleep_until(deadline) => {
                    // Send window (even if empty for consistent timing)
                    if tx.send(window.clone()).is_err() {
                        break; // Receiver dropped
                    }
                    window.clear();
                    deadline = tokio::time::Instant::now() + window_duration;
                }
            }
        }
    }

    /// Build the transformed receiver
    ///
    /// Applies all the configured transformations and returns a new receiver.
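    ///
    /// Each message passes through filter, map, debounce, and throttle,
    /// in that order, before being forwarded downstream.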
    pub fn build(self) -> Receiver<T> {
        let (tx, rx) = mpsc::unbounded_channel();

        tokio::spawn(async move {
            self.run_pipeline(tx).await;
        });

        Receiver::from(rx)
    }

    /// Internal method to run the processing pipeline
    async fn run_pipeline(mut self, tx: mpsc::UnboundedSender<T>) {
        let mut last_send_time = None::<Instant>;
        let mut last_message_time = None::<Instant>;

        while let Some(mut msg) = self.receiver.recv().await {
            // Latest-only mode: drain the backlog so only the newest
            // queued message continues through the pipeline
            if self.latest_only {
                while let Ok(newer) = self.receiver.try_recv() {
                    msg = newer;
                }
            }

            let now = Instant::now();

            // Apply filter
            if let Some(ref filter) = self.filter_fn {
                if !filter(&msg) {
                    continue;
                }
            }

            // Apply map transformation
            if let Some(ref mapper) = self.map_fn {
                msg = mapper(msg);
            }

            // Apply debounce - drop messages within debounce window
            if let Some(debounce_dur) = self.debounce_duration {
                if let Some(last_time) = last_message_time {
                    if now.duration_since(last_time) < debounce_dur {
                        // Message arrived too soon, drop it
                        last_message_time = Some(now); // Reset timer
                        continue;
                    }
                }
                last_message_time = Some(now);
            }

            // Apply throttle
            if let Some(throttle_dur) = self.throttle_duration {
                if let Some(last_time) = last_send_time {
                    if last_time.elapsed() < throttle_dur {
                        continue;
                    }
                }
                last_send_time = Some(Instant::now());
            }

            // Send the message
            if tx.send(msg).is_err() {
                // Receiver dropped, stop processing
                break;
            }
        }
    }
}

/// Builder for batched message streams
///
/// This builder collects messages into batches based on size or timeout,
/// whichever comes first. Because batching changes the item type from `T`
/// to `Vec<T>`, it uses a separate builder type instead of the same-type
/// combinators on `StreamBuilder`.
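///
/// It is usually reached through `ReceiverExt::batch`; a minimal sketch,
/// assuming the `Image` message type from the prelude:
///
/// ```rust
/// # use mecha10::prelude::*;
/// # use std::time::Duration;
/// # async fn example(receiver: Receiver<Image>) -> Result<()> {
/// // Emit a Vec<Image> every 10 frames, or after 1 s if fewer arrived
/// let mut batches = receiver.batch(10, Duration::from_secs(1)).build();
///
/// while let Some(batch) = batches.recv().await {
///     println!("batch of {} frames", batch.len());
/// }
/// # Ok(())
/// # }
/// ```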
pub struct BatchBuilder<T: Message> {
    receiver: Receiver<T>,
    batch_size: usize,
    batch_timeout: Duration,
}

impl<T: Message + Send + Clone + 'static> BatchBuilder<T> {
    /// Create a new batch builder
    pub fn new(receiver: Receiver<T>, batch_size: usize, batch_timeout: Duration) -> Self {
        Self {
            receiver,
            batch_size,
            batch_timeout,
        }
    }

    /// Build the batched receiver
    ///
    /// Spawns a task that collects messages into batches and emits them
    /// when either the batch size is reached or the timeout expires.
    pub fn build(self) -> Receiver<Vec<T>>
    where
        T: Clone,
    {
        let (tx, rx) = mpsc::unbounded_channel();

        tokio::spawn(async move {
            self.run_batch_pipeline(tx).await;
        });

        Receiver::from(rx)
    }

    /// Internal method to run the batch collection pipeline
    async fn run_batch_pipeline(mut self, tx: mpsc::UnboundedSender<Vec<T>>) {
        let mut batch = Vec::with_capacity(self.batch_size);
        let mut deadline = tokio::time::Instant::now() + self.batch_timeout;

        loop {
            tokio::select! {
                // Receive next message
                msg_opt = self.receiver.recv() => {
                    match msg_opt {
                        Some(msg) => {
                            batch.push(msg);

                            // If batch is full, emit immediately
                            if batch.len() >= self.batch_size {
                                if tx.send(batch.clone()).is_err() {
                                    break; // Receiver dropped
                                }
                                batch.clear();
                                deadline = tokio::time::Instant::now() + self.batch_timeout;
                            }
                        }
                        None => {
                            // Stream ended - send remaining batch if any
                            if !batch.is_empty() {
                                let _ = tx.send(batch);
                            }
                            break;
                        }
                    }
                }

                // Batch timeout - emit collected messages
                _ = tokio::time::sleep_until(deadline) => {
                    if !batch.is_empty() {
                        if tx.send(batch.clone()).is_err() {
                            break; // Receiver dropped
                        }
                        batch.clear();
                    }
                    deadline = tokio::time::Instant::now() + self.batch_timeout;
                }
            }
        }
    }
}

/// Extension trait to add stream combinator methods to `Receiver`
pub trait ReceiverExt<T: Message>: Sized {
    /// Start building a stream pipeline
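    ///
    /// The filtering and rate-limiting methods below are shorthand for
    /// `stream()` followed by the matching `StreamBuilder` call. A minimal
    /// sketch, assuming the `Image` message type from the prelude:
    ///
    /// ```rust
    /// # use mecha10::prelude::*;
    /// # use std::time::Duration;
    /// # async fn example(receiver: Receiver<Image>) -> Result<()> {
    /// let mut stream = receiver
    ///     .stream()
    ///     .debounce(Duration::from_millis(50))
    ///     .build();
    /// # Ok(())
    /// # }
    /// ```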
    fn stream(self) -> StreamBuilder<T>;

    /// Filter messages based on a predicate
    fn filter<F>(self, predicate: F) -> StreamBuilder<T>
    where
        F: Fn(&T) -> bool + Send + Sync + 'static;

    /// Transform messages using a mapping function
    fn map<F>(self, mapper: F) -> StreamBuilder<T>
    where
        F: Fn(T) -> T + Send + Sync + 'static;

    /// Debounce messages - ignore duplicates within a time window
    fn debounce(self, duration: Duration) -> StreamBuilder<T>;

    /// Throttle messages to a maximum rate
    fn throttle(self, duration: Duration) -> StreamBuilder<T>;

    /// Keep only the latest message
    fn latest(self) -> StreamBuilder<T>;

    /// Batch messages together
    ///
    /// Collects messages into batches of size `size`, or until `timeout`
    /// elapses, whichever comes first. Returns a `BatchBuilder` which can
    /// be built into a `Receiver<Vec<T>>`.
    ///
    /// # Arguments
    ///
    /// * `size` - Maximum number of messages per batch
    /// * `timeout` - Maximum time to wait before emitting a partial batch
    ///
    /// # Example
    ///
    /// ```rust
    /// # use mecha10::prelude::*;
    /// # use mecha10::topics::sensor;
    /// # use std::time::Duration;
    /// # async fn example(ctx: &Context) -> Result<()> {
    /// // Collect messages into batches of 10, or every 1 second
    /// let mut batched = ctx.subscribe(sensor::IMU)
    ///     .await?
    ///     .batch(10, Duration::from_secs(1))
    ///     .build();
    ///
    /// while let Some(batch) = batched.recv().await {
    ///     println!("Received batch of {} messages", batch.len());
    ///     // Process batch efficiently
    /// }
    /// # Ok(())
    /// # }
    /// ```
    fn batch(self, size: usize, timeout: Duration) -> BatchBuilder<T>
    where
        T: Clone;

    /// Zip this stream with another stream
    fn zip<U>(self, other: Receiver<U>) -> Receiver<(T, U)>
    where
        U: Message + Send + 'static;

    /// Window messages over a time period
    fn window(self, window_duration: Duration) -> Receiver<Vec<T>>
    where
        T: Clone;
}

impl<T: Message + Send + 'static> ReceiverExt<T> for Receiver<T> {
    fn stream(self) -> StreamBuilder<T> {
        StreamBuilder::new(self)
    }

    fn filter<F>(self, predicate: F) -> StreamBuilder<T>
    where
        F: Fn(&T) -> bool + Send + Sync + 'static,
    {
        StreamBuilder::new(self).filter(predicate)
    }

    fn map<F>(self, mapper: F) -> StreamBuilder<T>
    where
        F: Fn(T) -> T + Send + Sync + 'static,
    {
        StreamBuilder::new(self).map(mapper)
    }

    fn debounce(self, duration: Duration) -> StreamBuilder<T> {
        StreamBuilder::new(self).debounce(duration)
    }

    fn throttle(self, duration: Duration) -> StreamBuilder<T> {
        StreamBuilder::new(self).throttle(duration)
    }

    fn latest(self) -> StreamBuilder<T> {
        StreamBuilder::new(self).latest()
    }

    fn batch(self, size: usize, timeout: Duration) -> BatchBuilder<T>
    where
        T: Clone,
    {
        BatchBuilder::new(self, size, timeout)
    }

    fn zip<U>(self, other: Receiver<U>) -> Receiver<(T, U)>
    where
        U: Message + Send + 'static,
    {
        StreamBuilder::new(self).zip(other)
    }

    fn window(self, window_duration: Duration) -> Receiver<Vec<T>>
    where
        T: Clone,
    {
        StreamBuilder::new(self).window(window_duration)
    }
}