rs2_stream/
rs2_stream_ext.rs

1use async_stream::stream;
2use async_trait;
3use futures_core::Stream;
4use futures_util::future;
5use futures_util::pin_mut;
6use futures_util::stream::{BoxStream, StreamExt};
7use log;
8use num_cpus;
9use serde;
10use serde::Serialize;
11use serde_json;
12use std::future::Future;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::Mutex;
17
18use crate::error::StreamResult;
19use crate::schema_validation::SchemaError;
20use crate::schema_validation::SchemaValidator;
21use crate::stream_configuration::{BufferConfig, GrowthStrategy};
22use crate::stream_performance_metrics::{HealthThresholds, StreamMetrics};
23use crate::{
24    auto_backpressure, batch_process, bracket, chunk, debounce, distinct_until_changed,
25    distinct_until_changed_by, drop, drop_while, either, fold, group_adjacent_by, group_by,
26    interleave, interrupt_when, merge, par_eval_map, par_eval_map_unordered, par_join, prefetch,
27    sample, scan, sliding_window, take, take_while, throttle, tick, timeout, with_metrics,
28    zip_with, BackpressureConfig, RS2Stream,
29};
30
31/// Extension trait providing RS2-like combinators on Streams
32pub trait RS2StreamExt: Stream + Sized + Unpin + Send + 'static {
33    /// Apply automatic backpressure with default configuration
34    fn auto_backpressure_rs2(self) -> RS2Stream<Self::Item>
35    where
36        Self::Item: Send + 'static,
37    {
38        auto_backpressure(self.boxed(), BackpressureConfig::default())
39    }
40
41    /// Apply automatic backpressure with custom configuration
42    fn auto_backpressure_with_rs2(self, config: BackpressureConfig) -> RS2Stream<Self::Item>
43    where
44        Self::Item: Send + 'static,
45    {
46        auto_backpressure(self.boxed(), config)
47    }
48
49    /// Map elements of the rs2_stream with a function
50    fn map_rs2<U, F>(self, f: F) -> RS2Stream<U>
51    where
52        F: FnMut(Self::Item) -> U + Send + 'static,
53        U: Send + 'static,
54    {
55        self.map(f).boxed()
56    }
57
58    /// Transforms each element of the stream in parallel using all available CPU cores.
59    ///
60    /// This method applies the given synchronous function to each element concurrently,
61    /// automatically detecting the number of CPU cores and using that as the concurrency limit.
62    /// Perfect for CPU-bound operations that benefit from parallelization.
63    ///
64    /// # Arguments
65    ///
66    /// * `f` - A synchronous function that transforms each stream element. Must be `Send + Sync + Clone`.
67    ///
68    /// # Returns
69    ///
70    /// A new `RS2Stream` containing the transformed elements. Order may not be preserved.
71    ///
72    /// # Performance
73    ///
74    /// - **Concurrency**: Automatically uses `num_cpus::get()` concurrent tasks
75    /// - **Best for**: CPU-intensive computations (math, parsing, compression)
76    /// - **Memory**: Uses one task per CPU core, moderate memory overhead
77    /// - **Backpressure**: Inherits from underlying `par_eval_map_rs2`
78    ///
79    /// # When to Use
80    ///
81    /// - ✅ **CPU-bound work**: Mathematical calculations, data parsing, compression
82    /// - ✅ **Simple parallelization**: Don't want to think about optimal concurrency
83    /// - ✅ **Balanced workloads**: Each task takes roughly the same time
84    /// - ❌ **I/O-bound work**: Use `par_eval_map_rs2` with higher concurrency instead
85    /// - ❌ **Memory-intensive**: May overwhelm system with too many concurrent tasks
86    ///
87    /// # See Also
88    ///
89    /// * [`map_parallel_with_concurrency_rs2`] - For custom concurrency control
90    /// * [`par_eval_map_rs2`] - For async functions and fine-tuned concurrency
91    fn map_parallel_rs2<O, F>(self, f: F) -> RS2Stream<O>
92    where
93        F: Fn(Self::Item) -> O + Send + Sync + Clone + 'static,
94        Self::Item: Send + 'static,
95        O: Send + 'static,
96    {
97        let concurrency = num_cpus::get();
98        self.par_eval_map_rs2(concurrency, move |x| {
99            let f = f.clone();
100            async move { f(x) }
101        })
102    }
103
104    /// Transforms each element of the stream in parallel with custom concurrency control.
105    ///
106    /// This method applies the given synchronous function to each element concurrently,
107    /// using exactly the specified number of concurrent tasks. Ideal when you need precise
108    /// control over resource usage or when the optimal concurrency differs from CPU count.
109    ///
110    /// # Arguments
111    ///
112    /// * `concurrency` - Maximum number of concurrent tasks (must be > 0)
113    /// * `f` - A synchronous function that transforms each stream element. Must be `Send + Sync + Clone`.
114    ///
115    /// # Returns
116    ///
117    /// A new `RS2Stream` containing the transformed elements. Order may not be preserved.
118    ///
119    /// # Performance
120    ///
121    /// - **Concurrency**: Uses exactly `concurrency` concurrent tasks
122    /// - **Best for**: I/O-bound operations, memory-constrained environments, fine-tuning
123    /// - **Memory**: Scales with concurrency parameter
124    /// - **Backpressure**: Inherits from underlying `par_eval_map_rs2`
125    ///
126    /// # Concurrency Guidelines
127    ///
128    /// | **Workload Type** | **Recommended Concurrency** | **Reasoning** |
129    /// |-------------------|------------------------------|---------------|
130    /// | **CPU-bound** | `num_cpus::get()` | Match CPU cores |
131    /// | **I/O-bound** | `50-200` | Network can handle many concurrent requests |
132    /// | **Memory-heavy** | `1-4` | Prevent out-of-memory errors |
133    /// | **Database queries** | `10-50` | Respect connection pool limits |
134    /// | **File I/O** | `4-16` | Balance throughput vs file handle limits |
135    ///
136    /// # When to Use
137    ///
138    /// - ✅ **I/O-bound operations**: Network requests, file operations, database queries
139    /// - ✅ **Resource constraints**: Limited memory, connection pools, rate limits
140    /// - ✅ **Performance tuning**: Benchmarked optimal concurrency for your workload
141    /// - ✅ **Mixed workloads**: Some tasks much slower/faster than others
142    /// - ❌ **Simple CPU-bound work**: Use `map_parallel_rs2` for automatic optimization
143    ///
144    /// # Panics
145    ///
146    /// This function will panic if `concurrency` is 0. Always use a positive value.
147    ///
148    /// # See Also
149    ///
150    /// * [`map_parallel_rs2`] - For automatic concurrency based on CPU cores
151    /// * [`par_eval_map_rs2`] - For async functions with the same concurrency control
152    fn map_parallel_with_concurrency_rs2<O, F>(self, concurrency: usize, f: F) -> RS2Stream<O>
153    where
154        F: Fn(Self::Item) -> O + Send + Sync + Clone + 'static,
155        Self::Item: Send + 'static,
156        O: Send + 'static,
157    {
158        self.par_eval_map_rs2(concurrency, move |x| {
159            let f = f.clone();
160            async move { f(x) }
161        })
162    }
163
164    /// Filter elements of the rs2_stream with a predicate
165    fn filter_rs2<F>(self, mut f: F) -> RS2Stream<Self::Item>
166    where
167        F: FnMut(&Self::Item) -> bool + Send + 'static,
168        Self::Item: Send + 'static,
169    {
170        self.filter(move |item| future::ready(f(item))).boxed()
171    }
172
173    /// Flat map elements of the rs2_stream with a function that returns a rs2_stream
174    fn flat_map_rs2<U, St, F>(self, f: F) -> RS2Stream<U>
175    where
176        F: FnMut(Self::Item) -> St + Send + 'static,
177        St: Stream<Item = U> + Send + 'static,
178        U: Send + 'static,
179    {
180        self.flat_map(f).boxed()
181    }
182
183    /// Map elements of the rs2_stream with an async function
184    fn eval_map_rs2<U, Fut, F>(self, f: F) -> RS2Stream<U>
185    where
186        F: FnMut(Self::Item) -> Fut + Send + 'static,
187        Fut: Future<Output = U> + Send + 'static,
188        U: Send + 'static,
189    {
190        self.then(f).boxed()
191    }
192
193    /// Merge this rs2_stream with another rs2_stream
194    fn merge_rs2(self, other: RS2Stream<Self::Item>) -> RS2Stream<Self::Item>
195    where
196        Self::Item: Send + 'static,
197    {
198        merge(self, other)
199    }
200
201    /// Zip this rs2_stream with another rs2_stream
202    fn zip_rs2<U>(self, other: RS2Stream<U>) -> RS2Stream<(Self::Item, U)>
203    where
204        Self::Item: Send + 'static,
205        U: Send + 'static,
206    {
207        self.zip(other).boxed()
208    }
209
210    /// Zip this rs2_stream with another rs2_stream, applying a function to each pair
211    fn zip_with_rs2<U, O, F>(self, other: RS2Stream<U>, f: F) -> RS2Stream<O>
212    where
213        Self::Item: Send + 'static,
214        U: Send + 'static,
215        O: Send + 'static,
216        F: FnMut(Self::Item, U) -> O + Send + 'static,
217    {
218        zip_with(self, other, f)
219    }
220
221    /// Throttle this rs2_stream to emit at most one element per duration
222    fn throttle_rs2(self, duration: Duration) -> RS2Stream<Self::Item>
223    where
224        Self::Item: Send + 'static,
225    {
226        throttle(self.boxed(), duration)
227    }
228
229    /// Debounce this rs2_stream, only emitting an element after a specified quiet period has passed
230    /// without receiving another element
231    fn debounce_rs2(self, duration: Duration) -> RS2Stream<Self::Item>
232    where
233        Self::Item: Send + 'static,
234    {
235        debounce(self.boxed(), duration)
236    }
237
238    /// Sample this rs2_stream at regular intervals, emitting the most recent value
239    ///
240    /// This combinator samples the most recent value from a rs2_stream at a regular interval.
241    /// It only emits a value if at least one new value has arrived since the last emission.
242    /// If no new value has arrived during an interval, that interval is skipped.
243    fn sample_rs2(self, interval: Duration) -> RS2Stream<Self::Item>
244    where
245        Self::Item: Clone + Send + 'static,
246    {
247        sample(self.boxed(), interval)
248    }
249
250    /// Process elements in parallel with bounded concurrency, preserving order
251    /// ### Use when: `par_eval_map_rs2`
252    /// - Already have async functions**
253    /// - Need custom concurrency control**
254    /// - Want maximum control/performance**
255    fn par_eval_map_rs2<U, Fut, F>(self, concurrency: usize, f: F) -> RS2Stream<U>
256    where
257        F: FnMut(Self::Item) -> Fut + Send + 'static,
258        Fut: Future<Output = U> + Send + 'static,
259        U: Send + 'static,
260        Self::Item: Send + 'static,
261    {
262        par_eval_map(self.boxed(), concurrency, f)
263    }
264
265    /// Process elements in parallel with bounded concurrency, without preserving order
266    fn par_eval_map_unordered_rs2<U, Fut, F>(self, concurrency: usize, f: F) -> RS2Stream<U>
267    where
268        F: FnMut(Self::Item) -> Fut + Send + 'static,
269        Fut: Future<Output = U> + Send + 'static,
270        U: Send + 'static,
271        Self::Item: Send + 'static,
272    {
273        par_eval_map_unordered(self.boxed(), concurrency, f)
274    }
275
276    /// Run multiple streams concurrently and combine their outputs
277    ///
278    /// This combinator takes a rs2_stream of streams and a concurrency limit, and runs
279    /// up to n inner streams concurrently. It emits all elements from the inner streams,
280    /// and starts new inner streams as others complete.
281    fn par_join_rs2<S, O>(self, concurrency: usize) -> RS2Stream<O>
282    where
283        Self: Stream<Item = S>,
284        S: Stream<Item = O> + Send + 'static + Unpin,
285        O: Send + 'static,
286    {
287        par_join(self.boxed(), concurrency)
288    }
289
290    /// Add timeout to rs2_stream operations
291    fn timeout_rs2(self, duration: Duration) -> RS2Stream<StreamResult<Self::Item>>
292    where
293        Self::Item: Send + 'static,
294    {
295        timeout(self.boxed(), duration)
296    }
297
298    /// Prefetch a specified number of elements ahead of consumption
299    ///
300    /// This can improve performance by starting to process the next elements
301    /// before they're actually needed.
302    fn prefetch_rs2(self, prefetch_count: usize) -> RS2Stream<Self::Item>
303    where
304        Self::Item: Send + 'static,
305    {
306        prefetch(self.boxed(), prefetch_count)
307    }
308
309    /// Filter out consecutive duplicate elements from this rs2_stream
310    ///
311    /// This combinator only emits elements that are different from the previous element.
312    /// It uses the default equality operator (`==`) to compare elements.
313    /// The first element is always emitted.
314    fn distinct_until_changed_rs2(self) -> RS2Stream<Self::Item>
315    where
316        Self::Item: Clone + Send + PartialEq + 'static,
317    {
318        distinct_until_changed(self.boxed())
319    }
320
321    /// Filter out consecutive duplicate elements from this rs2_stream using a custom equality function
322    ///
323    /// This combinator only emits elements that are different from the previous element.
324    /// It uses the provided equality function to compare elements.
325    /// The first element is always emitted.
326    fn distinct_until_changed_by_rs2<F>(self, eq: F) -> RS2Stream<Self::Item>
327    where
328        Self::Item: Clone + Send + 'static,
329        F: FnMut(&Self::Item, &Self::Item) -> bool + Send + 'static,
330    {
331        distinct_until_changed_by(self.boxed(), eq)
332    }
333
334    /// Interrupt this rs2_stream when a signal is received
335    ///
336    /// This combinator stops processing the rs2_stream when the signal future completes.
337    /// Resources are properly cleaned up when the rs2_stream is interrupted.
338    fn interrupt_when_rs2<F>(self, signal: F) -> RS2Stream<Self::Item>
339    where
340        Self::Item: Send + 'static,
341        F: Future<Output = ()> + Send + 'static,
342    {
343        interrupt_when(self.boxed(), signal)
344    }
345
346    /// Take elements from this rs2_stream while a predicate returns true
347    ///
348    /// This combinator yields elements from the stream as long as the predicate returns true.
349    /// It stops (and does not yield) the first element where the predicate returns false.
350    fn take_while_rs2<F, Fut>(self, predicate: F) -> RS2Stream<Self::Item>
351    where
352        F: FnMut(&Self::Item) -> Fut + Send + 'static,
353        Fut: Future<Output = bool> + Send + 'static,
354        Self::Item: Send + 'static,
355    {
356        take_while(self.boxed(), predicate)
357    }
358
359    /// Skip elements from this rs2_stream while a predicate returns true
360    ///
361    /// This combinator skips elements from the stream as long as the predicate returns true.
362    /// Once the predicate returns false, it yields that element and all remaining elements.
363    fn drop_while_rs2<F, Fut>(self, predicate: F) -> RS2Stream<Self::Item>
364    where
365        F: FnMut(&Self::Item) -> Fut + Send + 'static,
366        Fut: Future<Output = bool> + Send + 'static,
367        Self::Item: Send + 'static,
368    {
369        drop_while(self.boxed(), predicate)
370    }
371
372    /// Group adjacent elements that share a common key
373    ///
374    /// This combinator groups consecutive elements that produce the same key.
375    /// It emits groups as they complete (when the key changes or the rs2_stream ends).
376    /// Each emitted item is a tuple containing the key and a vector of elements.
377    fn group_adjacent_by_rs2<K, F>(self, key_fn: F) -> RS2Stream<(K, Vec<Self::Item>)>
378    where
379        Self::Item: Clone + Send + 'static,
380        K: Eq + Clone + Send + 'static,
381        F: FnMut(&Self::Item) -> K + Send + 'static,
382    {
383        group_adjacent_by(self.boxed(), key_fn)
384    }
385
386    /// Group consecutive elements that share a common key
387    ///
388    /// This combinator groups consecutive elements that produce the same key.
389    /// It emits groups as they complete (when the key changes or the rs2_stream ends).
390    /// Each emitted item is a tuple containing the key and a vector of elements.
391    fn group_by_rs2<K, F>(self, key_fn: F) -> RS2Stream<(K, Vec<Self::Item>)>
392    where
393        Self::Item: Clone + Send + 'static,
394        K: Eq + Clone + Send + 'static,
395        F: FnMut(&Self::Item) -> K + Send + 'static,
396    {
397        group_by(self.boxed(), key_fn)
398    }
399
400    /// Fold operation that accumulates a value over a stream
401    ///
402    /// This combinator applies a function to each element in the stream, accumulating a single result.
403    /// It returns a Future that resolves to the final accumulated value.
404    fn fold_rs2<A, F, Fut>(self, init: A, f: F) -> impl Future<Output = A>
405    where
406        F: FnMut(A, Self::Item) -> Fut + Send + 'static,
407        Fut: Future<Output = A> + Send + 'static,
408        Self::Item: Send + 'static,
409        A: Send + 'static,
410    {
411        fold(self.boxed(), init, f)
412    }
413
414    /// Scan operation that applies a function to each element and emits intermediate accumulated values
415    ///
416    /// This combinator is similar to fold but emits each intermediate accumulated value.
417    /// It applies a function to each element in the stream, accumulating a result and yielding
418    /// each intermediate accumulated value.
419    fn scan_rs2<U, F>(self, init: U, f: F) -> RS2Stream<U>
420    where
421        F: FnMut(U, Self::Item) -> U + Send + 'static,
422        Self::Item: Send + 'static,
423        U: Clone + Send + 'static,
424    {
425        scan(self.boxed(), init, f)
426    }
427
428    /// Apply a function to each element in the stream
429    ///
430    /// This combinator applies a function to each element in the stream without accumulating a result.
431    /// It returns a Future that completes when the stream is exhausted.
432    fn for_each_rs2<F, Fut>(self, mut f: F) -> impl Future<Output = ()>
433    where
434        F: FnMut(Self::Item) -> Fut + Send + 'static,
435        Fut: Future<Output = ()> + Send + 'static,
436        Self::Item: Send + 'static,
437    {
438        let mut stream = self.boxed();
439        async move {
440            while let Some(item) = stream.next().await {
441                f(item).await;
442            }
443        }
444    }
445
446    /// Take the first n elements from the stream
447    ///
448    /// This combinator yields the first n elements from the stream and then stops.
449    fn take_rs2(self, n: usize) -> RS2Stream<Self::Item>
450    where
451        Self::Item: Send + 'static,
452    {
453        take(self.boxed(), n)
454    }
455
456    /// Drop the first n elements from the stream
457    ///
458    /// This combinator skips the first n elements from the stream and yields all remaining elements.
459    fn drop_rs2(self, n: usize) -> RS2Stream<Self::Item>
460    where
461        Self::Item: Send + 'static,
462    {
463        drop(self.boxed(), n)
464    }
465
466    /// Skip the first n elements from the stream
467    ///
468    /// This combinator skips the first n elements from the stream and yields all remaining elements.
469    fn skip_rs2(self, n: usize) -> RS2Stream<Self::Item>
470    where
471        Self::Item: Send + 'static,
472    {
473        drop(self.boxed(), n)
474    }
475
476    /// Select between this rs2_stream and another rs2_stream based on which one produces a value first
477    ///
478    /// This combinator emits values from whichever rs2_stream produces a value first.
479    /// Once a value is received from one rs2_stream, the other rs2_stream is cancelled.
480    /// If either rs2_stream completes (returns None), the combinator switches to the other rs2_stream exclusively.
481    fn either_rs2(self, other: RS2Stream<Self::Item>) -> RS2Stream<Self::Item>
482    where
483        Self::Item: Send + 'static,
484    {
485        either(self, other)
486    }
487
488    /// Collect all items from the stream into a collection
489    ///
490    /// This combinator collects all items from the stream into a collection of type B.
491    /// It returns a Future that resolves to the collection.
492    ///
493    /// # Examples
494    /// ```
495    /// use rs2_stream::rs2::*;
496    /// use futures_util::stream::StreamExt;
497    ///
498    /// # async fn example() {
499    /// let stream = from_iter(vec![1, 2, 3, 4, 5]);
500    /// let result = stream.collect_rs2::<Vec<_>>().await;
501    /// assert_eq!(result, vec![1, 2, 3, 4, 5]);
502    /// # }
503    /// ```
504    fn collect_rs2<B>(self) -> impl Future<Output = B>
505    where
506        B: Default + Extend<Self::Item> + Send + 'static,
507        Self::Item: Send + 'static,
508    {
509        self.collect_with_config_rs2(BufferConfig::default())
510    }
511
512    /// Collect all items from the stream into a collection with custom buffer configuration
513    ///
514    /// This combinator collects all items from the stream into a collection of type B.
515    /// It returns a Future that resolves to the collection.
516    /// The buffer configuration allows for optimized memory allocation and growth strategies.
517    // Enhanced version that uses all BufferConfig fields
518    fn collect_with_config_rs2<B>(self, config: BufferConfig) -> impl Future<Output = B>
519    where
520        B: Default + Extend<Self::Item> + Send + 'static,
521        Self::Item: Send + 'static,
522    {
523        let mut stream = self.boxed();
524        async move {
525            if std::any::TypeId::of::<B>() == std::any::TypeId::of::<Vec<Self::Item>>() {
526                // Create Vec with smart capacity management
527                let mut vec = Vec::with_capacity(config.initial_capacity);
528                let mut items_collected = 0;
529
530                while let Some(item) = stream.next().await {
531                    // Check max_capacity limit
532                    if let Some(max_cap) = config.max_capacity {
533                        if items_collected >= max_cap {
534                            break; // Respect size limit
535                        }
536                    }
537
538                    // Apply growth strategy when needed
539                    if vec.len() == vec.capacity() {
540                        let new_capacity = match config.growth_strategy {
541                            GrowthStrategy::Linear(step) => vec.capacity() + step,
542                            GrowthStrategy::Exponential(factor) => {
543                                (vec.capacity() as f64 * factor) as usize
544                            }
545                            GrowthStrategy::Fixed => vec.capacity(), // No growth
546                        };
547
548                        let capped_capacity = if let Some(max_cap) = config.max_capacity {
549                            new_capacity.min(max_cap)
550                        } else {
551                            new_capacity
552                        };
553
554                        vec.reserve(capped_capacity - vec.capacity());
555                    }
556
557                    vec.push(item);
558                    items_collected += 1;
559                }
560
561                // Safe transmute back to B
562                let result = unsafe {
563                    let ptr = &vec as *const Vec<Self::Item> as *const B;
564                    let result = std::ptr::read(ptr);
565                    std::mem::forget(vec);
566                    result
567                };
568                result
569            } else {
570                // Fallback for other collection types
571                let mut collection = B::default();
572                while let Some(item) = stream.next().await {
573                    collection.extend(std::iter::once(item));
574                }
575                collection
576            }
577        }
578    }
579
580    /// Create a sliding window of elements from the stream
581    ///
582    /// This combinator creates a sliding window of the specified size over the stream.
583    /// It yields a vector of items for each window position.
584    fn sliding_window_rs2(self, size: usize) -> RS2Stream<Vec<Self::Item>>
585    where
586        Self::Item: Clone + Send + 'static,
587    {
588        sliding_window(self.boxed(), size)
589    }
590
591    /// Process items in batches for better throughput
592    ///
593    /// This combinator processes items in batches of the specified size,
594    /// applying the processor function to each batch.
595    fn batch_process_rs2<U, F>(self, batch_size: usize, processor: F) -> RS2Stream<U>
596    where
597        F: FnMut(Vec<Self::Item>) -> Vec<U> + Send + 'static,
598        Self::Item: Send + 'static,
599        U: Send + 'static,
600    {
601        batch_process(self.boxed(), batch_size, processor)
602    }
603
604    /// Collect metrics while processing the stream
605    ///
606    /// This combinator collects metrics while processing the stream,
607    /// returning both the stream and the metrics.
608    fn with_metrics_rs2(
609        self,
610        name: String,
611        health_thresholds: HealthThresholds,
612    ) -> (RS2Stream<Self::Item>, Arc<Mutex<StreamMetrics>>)
613    where
614        Self::Item: Send + 'static,
615    {
616        with_metrics(self.boxed(), name, health_thresholds)
617    }
618
619    /// Interleave multiple streams in a round-robin fashion
620    ///
621    /// This combinator takes a vector of streams and interleaves their elements
622    /// in a round-robin fashion.
623    fn interleave_rs2<S>(self, streams: Vec<S>) -> RS2Stream<Self::Item>
624    where
625        S: Stream<Item = Self::Item> + Send + 'static + Unpin,
626        Self::Item: Send + 'static,
627    {
628        let mut all_streams = vec![self.boxed()];
629        all_streams.extend(streams.into_iter().map(|s| s.boxed()));
630        interleave(all_streams)
631    }
632
633    /// Chunk the stream into vectors of the specified size
634    ///
635    /// This combinator collects elements from the stream into vectors of the specified size.
636    /// If the stream ends before a chunk is filled, the final chunk may contain fewer elements.
637    fn chunk_rs2(self, size: usize) -> RS2Stream<Vec<Self::Item>>
638    where
639        Self::Item: Send + 'static,
640    {
641        chunk(self.boxed(), size)
642    }
643
644    /// Create a stream that emits values at a fixed rate
645    ///
646    /// This combinator creates a stream that emits the provided item at a fixed rate.
647    fn tick_rs<O>(self, period: Duration, item: O) -> RS2Stream<O>
648    where
649        O: Clone + Send + 'static,
650    {
651        tick(period, item)
652    }
653
654    /// Bracket for resource management
655    ///
656    /// This combinator ensures that a resource is properly released after use.
657    /// It takes three parameters:
658    /// 1. A future that acquires a resource
659    /// 2. A function that uses the resource and returns a stream
660    /// 3. A function that releases the resource
661    fn bracket_rs<A, O, St, FAcq, FUse, FRel, R>(
662        self,
663        acquire: FAcq,
664        use_fn: FUse,
665        release: FRel,
666    ) -> RS2Stream<O>
667    where
668        FAcq: Future<Output = A> + Send + 'static,
669        FUse: FnOnce(A) -> St + Send + 'static,
670        St: Stream<Item = O> + Send + 'static,
671        FRel: FnOnce(A) -> R + Send + 'static,
672        R: Future<Output = ()> + Send + 'static,
673        O: Send + 'static,
674        A: Clone + Send + 'static,
675    {
676        bracket(acquire, use_fn, release)
677    }
678
679    fn with_schema_validation_rs2<V, T>(
680        self,
681        validator: V,
682    ) -> Pin<Box<dyn futures_util::Stream<Item = T> + Send>>
683    where
684        V: SchemaValidator + 'static,
685        T: serde::de::DeserializeOwned + Serialize + Send + 'static,
686        Self: futures_util::Stream<Item = T> + Send + 'static,
687    {
688        use futures_util::StreamExt;
689        let validator = std::sync::Arc::new(validator);
690        self.filter_map(move |item| {
691            let validator = validator.clone();
692            async move {
693                let bytes = match serde_json::to_vec(&item) {
694                    Ok(b) => b,
695                    Err(e) => {
696                        log::error!("Schema validation: failed to serialize item: {}", e);
697                        return None;
698                    }
699                };
700                match validator.validate(&bytes).await {
701                    Ok(()) => Some(item),
702                    Err(e) => {
703                        log::warn!("Schema validation failed: {}", e);
704                        None
705                    }
706                }
707            }
708        })
709        .boxed()
710    }
711}
712
713impl<S> RS2StreamExt for S where S: Stream + Sized + Unpin + Send + 'static {}