mecha10_core/stream.rs
1//! Stream combinators for declarative message processing
2//!
3//! This module provides a fluent API for transforming and filtering message streams.
4//!
5//! # Examples
6//!
7//! ## Filtering
8//!
9//! ```rust
10//! use mecha10::prelude::*;
11//! use mecha10::topics::perception;
12//!
13//! # async fn example(ctx: &Context) -> Result<()> {
14//! // Only process high-confidence detections
//! let mut detections = ctx.subscribe(perception::DETECTIONS)
//!     .await?
//!     .filter(|d| d.iter().any(|det| det.confidence > 0.8))
//!     .build();
18//!
19//! while let Some(dets) = detections.recv().await {
//!     // At least one detection in `dets` has confidence > 0.8
21//! }
22//! # Ok(())
23//! # }
24//! ```
25//!
26//! ## Throttling
27//!
28//! ```rust
29//! use mecha10::prelude::*;
30//! use mecha10::topics::sensor;
31//!
32//! # async fn example(ctx: &Context) -> Result<()> {
33//! // Limit to 10 Hz
//! let mut images = ctx.subscribe(sensor::CAMERA_RGB)
//!     .await?
//!     .throttle(Duration::from_millis(100))
//!     .build();
37//! # Ok(())
38//! # }
39//! ```
40//!
41//! ## Latest Only
42//!
43//! ```rust
44//! use mecha10::prelude::*;
45//! use mecha10::topics::sensor;
46//!
47//! # async fn example(ctx: &Context) -> Result<()> {
48//! // Always get the latest, drop old messages
//! let mut odom = ctx.subscribe(sensor::ODOMETRY)
//!     .await?
//!     .latest()
//!     .build();
52//! # Ok(())
53//! # }
54//! ```
55//!
56//! ## Zipping Streams
57//!
58//! ```rust
59//! use mecha10::prelude::*;
60//! use mecha10::topics::sensor;
61//!
62//! # async fn example(ctx: &Context) -> Result<()> {
63//! // Combine camera images with LiDAR scans
64//! let mut images = ctx.subscribe(sensor::CAMERA_RGB).await?;
65//! let mut scans = ctx.subscribe(sensor::LIDAR_SCAN).await?;
66//!
67//! let mut zipped = images.zip(scans);
68//!
69//! while let Some((image, scan)) = zipped.recv().await {
70//! // Process synchronized sensor data
71//! println!("Image: {}x{}, LiDAR: {} points",
72//! image.width, image.height, scan.ranges.len());
73//! }
74//! # Ok(())
75//! # }
76//! ```
77//!
78//! ## Windowing
79//!
80//! ```rust
81//! use mecha10::prelude::*;
82//! use mecha10::topics::perception;
83//!
84//! # async fn example(ctx: &Context) -> Result<()> {
85//! // Collect detections over 1-second windows for rate analysis
//! let mut windowed = ctx.subscribe(perception::DETECTIONS)
//!     .await?
//!     .window(Duration::from_secs(1));
88//!
89//! while let Some(detections) = windowed.recv().await {
90//! let rate = detections.len() as f32; // detections per second
91//! println!("Detection rate: {} Hz", rate);
92//!
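//!     // Calculate average confidence over the window. Empty windows are
//!     // emitted too, so skip them to avoid dividing by zero.
//!     if detections.is_empty() {
//!         continue;
//!     }
//!     let avg_conf = detections.iter()
//!         .map(|d| d.confidence)
//!         .sum::<f32>() / detections.len() as f32;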
97//! println!("Average confidence: {:.2}", avg_conf);
98//! }
99//! # Ok(())
100//! # }
101//! ```
102
103use crate::context::Receiver;
104use crate::messages::Message;
105use std::time::{Duration, Instant};
106use tokio::sync::mpsc;
107
108/// Builder for creating filtered/transformed message streams
109///
110/// This builder allows chaining operations to create declarative
111/// message processing pipelines.
112pub struct StreamBuilder<T: Message> {
113 receiver: Receiver<T>,
114 #[allow(clippy::type_complexity)]
115 filter_fn: Option<Box<dyn Fn(&T) -> bool + Send + Sync>>,
116 #[allow(clippy::type_complexity)]
117 map_fn: Option<Box<dyn Fn(T) -> T + Send + Sync>>,
118 throttle_duration: Option<Duration>,
119 debounce_duration: Option<Duration>,
120 latest_only: bool,
121 #[allow(dead_code)]
122 batch_size: Option<usize>,
123 #[allow(dead_code)]
124 batch_timeout: Option<Duration>,
125}
126
127impl<T: Message + Send + 'static> StreamBuilder<T> {
128 /// Create a new stream builder from a receiver
129 pub fn new(receiver: Receiver<T>) -> Self {
130 Self {
131 receiver,
132 filter_fn: None,
133 map_fn: None,
134 throttle_duration: None,
135 debounce_duration: None,
136 latest_only: false,
137 batch_size: None,
138 batch_timeout: None,
139 }
140 }
141
142 /// Filter messages based on a predicate
143 ///
144 /// Only messages where the predicate returns `true` will be passed through.
145 ///
146 /// # Example
147 ///
148 /// ```rust
149 /// # use mecha10::prelude::*;
150 /// # async fn example(mut receiver: Receiver<Image>) -> Result<()> {
151 /// let filtered = receiver
152 /// .filter(|img| img.width >= 640)
153 /// .build();
154 /// # Ok(())
155 /// # }
156 /// ```
157 pub fn filter<F>(mut self, predicate: F) -> Self
158 where
159 F: Fn(&T) -> bool + Send + Sync + 'static,
160 {
161 self.filter_fn = Some(Box::new(predicate));
162 self
163 }
164
165 /// Throttle messages to a maximum rate
166 ///
167 /// Messages will be rate-limited to at most one per `duration`.
168 /// If multiple messages arrive during the throttle period, only the
169 /// first one is kept and the rest are dropped.
170 ///
171 /// # Example
172 ///
173 /// ```rust
174 /// # use mecha10::prelude::*;
175 /// # async fn example(mut receiver: Receiver<Image>) -> Result<()> {
176 /// // Max 10 Hz
177 /// let throttled = receiver
178 /// .throttle(Duration::from_millis(100))
179 /// .build();
180 /// # Ok(())
181 /// # }
182 /// ```
183 pub fn throttle(mut self, duration: Duration) -> Self {
184 self.throttle_duration = Some(duration);
185 self
186 }
187
188 /// Keep only the latest message, dropping older ones
189 ///
190 /// Useful for scenarios where you only care about the most recent
191 /// state and processing old messages is wasteful.
192 ///
193 /// # Example
194 ///
195 /// ```rust
196 /// # use mecha10::prelude::*;
197 /// # async fn example(mut receiver: Receiver<Odometry>) -> Result<()> {
198 /// let latest = receiver.latest().build();
199 /// # Ok(())
200 /// # }
201 /// ```
202 pub fn latest(mut self) -> Self {
203 self.latest_only = true;
204 self
205 }
206
207 /// Transform messages using a mapping function
208 ///
209 /// Applies the given function to each message. The function must return
210 /// the same type (for type-changing transformations, process messages manually).
211 ///
212 /// # Example
213 ///
214 /// ```rust
215 /// # use mecha10::prelude::*;
216 /// # async fn example(mut receiver: Receiver<Image>) -> Result<()> {
217 /// // Resize images to thumbnails (returns Image)
218 /// let thumbnails = receiver
219 /// .map(|img| resize_to_thumbnail(img))
220 /// .build();
221 ///
222 /// // Normalize image brightness
223 /// let normalized = receiver
224 /// .map(|mut img| {
225 /// normalize_brightness(&mut img);
226 /// img
227 /// })
228 /// .build();
229 /// # Ok(())
230 /// # }
231 /// ```
232 pub fn map<F>(mut self, mapper: F) -> Self
233 where
234 F: Fn(T) -> T + Send + Sync + 'static,
235 {
236 self.map_fn = Some(Box::new(mapper));
237 self
238 }
239
240 /// Debounce messages - ignore duplicates within a time window
241 ///
242 /// Only the first message is kept; subsequent messages within the debounce
243 /// period are dropped. This is different from throttle - debounce resets
244 /// the timer on each new message.
245 ///
246 /// Useful for:
247 /// - Button press handling (ignore bounces)
248 /// - Noisy sensor data
249 /// - Rapid user input events
250 ///
251 /// # Example
252 ///
253 /// ```rust
254 /// # use mecha10::prelude::*;
255 /// # async fn example(mut receiver: Receiver<ButtonPress>) -> Result<()> {
256 /// // Ignore button bounces within 300ms
257 /// let debounced = receiver
258 /// .debounce(Duration::from_millis(300))
259 /// .build();
260 ///
261 /// // Only get one event per 300ms window
262 /// while let Some(press) = debounced.recv().await {
263 /// handle_button_press(press);
264 /// }
265 /// # Ok(())
266 /// # }
267 /// ```
268 pub fn debounce(mut self, duration: Duration) -> Self {
269 self.debounce_duration = Some(duration);
270 self
271 }
272
273 /// Zip this stream with another stream
274 ///
275 /// Combines two streams element-by-element, producing tuples of messages.
276 /// The resulting stream will emit `(T, U)` tuples when both streams have
277 /// messages available.
278 ///
279 /// If one stream ends before the other, the zip stream will also end.
280 ///
281 /// # Arguments
282 ///
283 /// * `other` - Another receiver to zip with this stream
284 ///
285 /// # Returns
286 ///
287 /// A `Receiver<(T, U)>` that yields tuples of messages from both streams
288 ///
289 /// # Example
290 ///
291 /// ```rust
292 /// # use mecha10::prelude::*;
293 /// # async fn example(ctx: &Context) -> Result<()> {
294 /// // Zip camera images with LiDAR scans
295 /// let mut images = ctx.subscribe(sensor::CAMERA_RGB).await?;
296 /// let mut scans = ctx.subscribe(sensor::LIDAR_SCAN).await?;
297 ///
298 /// let mut zipped = images.zip(scans).build();
299 ///
300 /// while let Some((image, scan)) = zipped.recv().await {
301 /// // Process synchronized image + LiDAR data
302 /// println!("Image {}x{}, LiDAR {} points",
303 /// image.width, image.height, scan.ranges.len());
304 /// }
305 /// # Ok(())
306 /// # }
307 /// ```
308 pub fn zip<U>(self, other: Receiver<U>) -> Receiver<(T, U)>
309 where
310 U: Message + Send + 'static,
311 {
312 let (tx, rx) = mpsc::unbounded_channel();
313
314 tokio::spawn(async move {
315 Self::run_zip_pipeline(self.receiver, other, tx).await;
316 });
317
318 Receiver::from(rx)
319 }
320
321 /// Internal method to run the zip pipeline
322 async fn run_zip_pipeline<U>(
323 mut receiver1: Receiver<T>,
324 mut receiver2: Receiver<U>,
325 tx: mpsc::UnboundedSender<(T, U)>,
326 ) where
327 U: Message + Send + 'static,
328 {
329 loop {
330 // Wait for messages from both streams
331 let msg1 = receiver1.recv().await;
332 let msg2 = receiver2.recv().await;
333
334 match (msg1, msg2) {
335 (Some(m1), Some(m2)) => {
336 // Both messages available, send tuple
337 if tx.send((m1, m2)).is_err() {
338 break; // Receiver dropped
339 }
340 }
341 _ => {
342 // One or both streams ended
343 break;
344 }
345 }
346 }
347 }
348
349 /// Window messages over a time period
350 ///
351 /// Collects messages over a fixed time window and emits them as a vector.
352 /// This is useful for time-based aggregation and analysis.
353 ///
354 /// # Arguments
355 ///
356 /// * `window_duration` - Duration of the time window
357 ///
358 /// # Returns
359 ///
360 /// A `Receiver<Vec<T>>` that yields vectors of messages collected in each window
361 ///
362 /// # Example
363 ///
364 /// ```rust
365 /// # use mecha10::prelude::*;
366 /// # async fn example(ctx: &Context) -> Result<()> {
367 /// // Collect detections over 1-second windows
368 /// let mut windowed = ctx.subscribe(perception::DETECTIONS)
369 /// .window(Duration::from_secs(1));
370 ///
371 /// while let Some(detections) = windowed.recv().await {
372 /// // Process all detections from the last second
373 /// println!("Received {} detections in 1 second window", detections.len());
374 ///
375 /// // Example: Calculate average confidence
376 /// let avg_confidence = detections.iter()
377 /// .map(|d| d.confidence)
378 /// .sum::<f32>() / detections.len() as f32;
379 /// println!("Average confidence: {:.2}", avg_confidence);
380 /// }
381 /// # Ok(())
382 /// # }
383 /// ```
384 ///
385 /// # Use Cases
386 ///
387 /// - **Rate calculation**: Count messages per time window
388 /// - **Moving averages**: Compute statistics over windows
389 /// - **Burst detection**: Identify spikes in message frequency
390 /// - **Time-series analysis**: Analyze temporal patterns
391 pub fn window(self, window_duration: Duration) -> Receiver<Vec<T>>
392 where
393 T: Clone,
394 {
395 let (tx, rx) = mpsc::unbounded_channel();
396
397 tokio::spawn(async move {
398 Self::run_window_pipeline(self.receiver, tx, window_duration).await;
399 });
400
401 Receiver::from(rx)
402 }
403
404 /// Internal method to run the window pipeline
405 async fn run_window_pipeline(
406 mut receiver: Receiver<T>,
407 tx: mpsc::UnboundedSender<Vec<T>>,
408 window_duration: Duration,
409 ) where
410 T: Clone,
411 {
412 let mut window = Vec::new();
413 let mut deadline = tokio::time::Instant::now() + window_duration;
414
415 loop {
416 tokio::select! {
417 // Receive next message
418 msg_opt = receiver.recv() => {
419 match msg_opt {
420 Some(msg) => {
421 window.push(msg);
422 }
423 None => {
424 // Stream ended - send remaining window if any
425 if !window.is_empty() {
426 let _ = tx.send(window);
427 }
428 break;
429 }
430 }
431 }
432
433 // Window timeout - emit collected messages
434 _ = tokio::time::sleep_until(deadline) => {
435 // Send window (even if empty for consistent timing)
436 if tx.send(window.clone()).is_err() {
437 break; // Receiver dropped
438 }
439 window.clear();
440 deadline = tokio::time::Instant::now() + window_duration;
441 }
442 }
443 }
444 }
445
446 /// Build the transformed receiver
447 ///
448 /// Applies all the configured transformations and returns a new receiver.
449 pub fn build(self) -> Receiver<T> {
450 let (tx, rx) = mpsc::unbounded_channel();
451
452 tokio::spawn(async move {
453 self.run_pipeline(tx).await;
454 });
455
456 Receiver::from(rx)
457 }
458
459 /// Internal method to run the processing pipeline
460 async fn run_pipeline(mut self, tx: mpsc::UnboundedSender<T>) {
461 let mut last_send_time = None::<Instant>;
462 let mut last_message_time = None::<Instant>;
463
464 while let Some(mut msg) = self.receiver.recv().await {
465 let now = Instant::now();
466
467 // Apply filter
468 if let Some(ref filter) = self.filter_fn {
469 if !filter(&msg) {
470 continue;
471 }
472 }
473
474 // Apply map transformation
475 if let Some(ref mapper) = self.map_fn {
476 msg = mapper(msg);
477 }
478
479 // Apply debounce - drop messages within debounce window
480 if let Some(debounce_dur) = self.debounce_duration {
481 if let Some(last_time) = last_message_time {
482 if now.duration_since(last_time) < debounce_dur {
483 // Message arrived too soon, drop it
484 last_message_time = Some(now); // Reset timer
485 continue;
486 }
487 }
488 last_message_time = Some(now);
489 }
490
491 // Apply throttle
492 if let Some(throttle_dur) = self.throttle_duration {
493 if let Some(last_time) = last_send_time {
494 let elapsed = last_time.elapsed();
495 if elapsed < throttle_dur {
496 continue;
497 }
498 }
499 last_send_time = Some(Instant::now());
500 }
501
502 // Handle latest-only mode
503 if self.latest_only {
504 // Drain any pending messages
505 while self.receiver.try_recv().is_ok() {
506 // Discard old messages
507 }
508 }
509
510 // Send the message
511 if tx.send(msg).is_err() {
512 // Receiver dropped, stop processing
513 break;
514 }
515 }
516 }
517}
518
519/// Builder for batched message streams
520///
521/// This builder collects messages into batches based on size or timeout,
522/// whichever comes first. This solves the type transformation challenge by
523/// using a separate builder type.
524pub struct BatchBuilder<T: Message> {
525 receiver: Receiver<T>,
526 batch_size: usize,
527 batch_timeout: Duration,
528}
529
530impl<T: Message + Send + Clone + 'static> BatchBuilder<T> {
531 /// Create a new batch builder
532 pub fn new(receiver: Receiver<T>, batch_size: usize, batch_timeout: Duration) -> Self {
533 Self {
534 receiver,
535 batch_size,
536 batch_timeout,
537 }
538 }
539
540 /// Build the batched receiver
541 ///
542 /// Spawns a task that collects messages into batches and emits them
543 /// when either the batch size is reached or the timeout expires.
544 pub fn build(self) -> Receiver<Vec<T>>
545 where
546 T: Clone,
547 {
548 let (tx, rx) = mpsc::unbounded_channel();
549
550 tokio::spawn(async move {
551 self.run_batch_pipeline(tx).await;
552 });
553
554 Receiver::from(rx)
555 }
556
557 /// Internal method to run the batch collection pipeline
558 async fn run_batch_pipeline(mut self, tx: mpsc::UnboundedSender<Vec<T>>) {
559 let mut batch = Vec::with_capacity(self.batch_size);
560 let mut deadline = tokio::time::Instant::now() + self.batch_timeout;
561
562 loop {
563 tokio::select! {
564 // Receive next message
565 msg_opt = self.receiver.recv() => {
566 match msg_opt {
567 Some(msg) => {
568 batch.push(msg);
569
570 // If batch is full, emit immediately
571 if batch.len() >= self.batch_size {
572 if tx.send(batch.clone()).is_err() {
573 break; // Receiver dropped
574 }
575 batch.clear();
576 deadline = tokio::time::Instant::now() + self.batch_timeout;
577 }
578 }
579 None => {
580 // Stream ended - send remaining batch if any
581 if !batch.is_empty() {
582 let _ = tx.send(batch);
583 }
584 break;
585 }
586 }
587 }
588
589 // Batch timeout - emit collected messages
590 _ = tokio::time::sleep_until(deadline) => {
591 if !batch.is_empty() {
592 if tx.send(batch.clone()).is_err() {
593 break; // Receiver dropped
594 }
595 batch.clear();
596 }
597 deadline = tokio::time::Instant::now() + self.batch_timeout;
598 }
599 }
600 }
601 }
602}
603
604/// Extension trait to add stream combinator methods to Receiver
605pub trait ReceiverExt<T: Message>: Sized {
606 /// Start building a stream pipeline
607 fn stream(self) -> StreamBuilder<T>;
608
609 /// Filter messages based on a predicate
610 fn filter<F>(self, predicate: F) -> StreamBuilder<T>
611 where
612 F: Fn(&T) -> bool + Send + Sync + 'static;
613
614 /// Transform messages using a mapping function
615 fn map<F>(self, mapper: F) -> StreamBuilder<T>
616 where
617 F: Fn(T) -> T + Send + Sync + 'static;
618
619 /// Debounce messages - ignore duplicates within a time window
620 fn debounce(self, duration: Duration) -> StreamBuilder<T>;
621
622 /// Throttle messages to a maximum rate
623 fn throttle(self, duration: Duration) -> StreamBuilder<T>;
624
625 /// Keep only the latest message
626 fn latest(self) -> StreamBuilder<T>;
627
628 /// Batch messages together
629 ///
630 /// Collects messages into batches of size `size`, or until `timeout`
631 /// elapses, whichever comes first. Returns a BatchBuilder which can
632 /// be built into a Receiver<Vec<T>>.
633 ///
634 /// # Arguments
635 ///
636 /// * `size` - Maximum number of messages per batch
637 /// * `timeout` - Maximum time to wait before emitting a partial batch
638 ///
639 /// # Example
640 ///
641 /// ```rust
642 /// # use mecha10::prelude::*;
643 /// # use std::time::Duration;
644 /// # async fn example(ctx: &Context) -> Result<()> {
645 /// // Collect messages into batches of 10, or every 1 second
646 /// let mut batched = ctx.subscribe(sensor::IMU)
647 /// .await?
648 /// .batch(10, Duration::from_secs(1))
649 /// .build();
650 ///
651 /// while let Some(batch) = batched.recv().await {
652 /// println!("Received batch of {} messages", batch.len());
653 /// // Process batch efficiently
654 /// }
655 /// # Ok(())
656 /// # }
657 /// ```
658 fn batch(self, size: usize, timeout: Duration) -> BatchBuilder<T>
659 where
660 T: Clone;
661
662 /// Zip this stream with another stream
663 fn zip<U>(self, other: Receiver<U>) -> Receiver<(T, U)>
664 where
665 U: Message + Send + 'static;
666
667 /// Window messages over a time period
668 fn window(self, window_duration: Duration) -> Receiver<Vec<T>>
669 where
670 T: Clone;
671}
672
673impl<T: Message + Send + 'static> ReceiverExt<T> for Receiver<T> {
674 fn stream(self) -> StreamBuilder<T> {
675 StreamBuilder::new(self)
676 }
677
678 fn filter<F>(self, predicate: F) -> StreamBuilder<T>
679 where
680 F: Fn(&T) -> bool + Send + Sync + 'static,
681 {
682 StreamBuilder::new(self).filter(predicate)
683 }
684
685 fn map<F>(self, mapper: F) -> StreamBuilder<T>
686 where
687 F: Fn(T) -> T + Send + Sync + 'static,
688 {
689 StreamBuilder::new(self).map(mapper)
690 }
691
692 fn debounce(self, duration: Duration) -> StreamBuilder<T> {
693 StreamBuilder::new(self).debounce(duration)
694 }
695
696 fn throttle(self, duration: Duration) -> StreamBuilder<T> {
697 StreamBuilder::new(self).throttle(duration)
698 }
699
700 fn latest(self) -> StreamBuilder<T> {
701 StreamBuilder::new(self).latest()
702 }
703
704 fn batch(self, size: usize, timeout: Duration) -> BatchBuilder<T>
705 where
706 T: Clone,
707 {
708 BatchBuilder::new(self, size, timeout)
709 }
710
711 fn zip<U>(self, other: Receiver<U>) -> Receiver<(T, U)>
712 where
713 U: Message + Send + 'static,
714 {
715 StreamBuilder::new(self).zip(other)
716 }
717
718 fn window(self, window_duration: Duration) -> Receiver<Vec<T>>
719 where
720 T: Clone,
721 {
722 StreamBuilder::new(self).window(window_duration)
723 }
724}