rs2_stream/rs2_stream_ext.rs
1use async_stream::stream;
2use async_trait;
3use futures_core::Stream;
4use futures_util::future;
5use futures_util::pin_mut;
6use futures_util::stream::{BoxStream, StreamExt};
7use log;
8use num_cpus;
9use serde;
10use serde::Serialize;
11use serde_json;
12use std::future::Future;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::Mutex;
17
18use crate::error::StreamResult;
19use crate::schema_validation::SchemaError;
20use crate::schema_validation::SchemaValidator;
21use crate::stream_configuration::{BufferConfig, GrowthStrategy};
22use crate::stream_performance_metrics::{HealthThresholds, StreamMetrics};
23use crate::{
24 auto_backpressure, batch_process, bracket, chunk, debounce, distinct_until_changed,
25 distinct_until_changed_by, drop, drop_while, either, fold, group_adjacent_by, group_by,
26 interleave, interrupt_when, merge, par_eval_map, par_eval_map_unordered, par_join, prefetch,
27 sample, scan, sliding_window, take, take_while, throttle, tick, timeout, with_metrics,
28 zip_with, BackpressureConfig, RS2Stream,
29};
30
31/// Extension trait providing RS2-like combinators on Streams
32pub trait RS2StreamExt: Stream + Sized + Unpin + Send + 'static {
33 /// Apply automatic backpressure with default configuration
34 fn auto_backpressure_rs2(self) -> RS2Stream<Self::Item>
35 where
36 Self::Item: Send + 'static,
37 {
38 auto_backpressure(self.boxed(), BackpressureConfig::default())
39 }
40
41 /// Apply automatic backpressure with custom configuration
42 fn auto_backpressure_with_rs2(self, config: BackpressureConfig) -> RS2Stream<Self::Item>
43 where
44 Self::Item: Send + 'static,
45 {
46 auto_backpressure(self.boxed(), config)
47 }
48
49 /// Map elements of the rs2_stream with a function
50 fn map_rs2<U, F>(self, f: F) -> RS2Stream<U>
51 where
52 F: FnMut(Self::Item) -> U + Send + 'static,
53 U: Send + 'static,
54 {
55 self.map(f).boxed()
56 }
57
58 /// Transforms each element of the stream in parallel using all available CPU cores.
59 ///
60 /// This method applies the given synchronous function to each element concurrently,
61 /// automatically detecting the number of CPU cores and using that as the concurrency limit.
62 /// Perfect for CPU-bound operations that benefit from parallelization.
63 ///
64 /// # Arguments
65 ///
66 /// * `f` - A synchronous function that transforms each stream element. Must be `Send + Sync + Clone`.
67 ///
68 /// # Returns
69 ///
70 /// A new `RS2Stream` containing the transformed elements. Order may not be preserved.
71 ///
72 /// # Performance
73 ///
74 /// - **Concurrency**: Automatically uses `num_cpus::get()` concurrent tasks
75 /// - **Best for**: CPU-intensive computations (math, parsing, compression)
76 /// - **Memory**: Uses one task per CPU core, moderate memory overhead
77 /// - **Backpressure**: Inherits from underlying `par_eval_map_rs2`
78 ///
79 /// # When to Use
80 ///
81 /// - ✅ **CPU-bound work**: Mathematical calculations, data parsing, compression
82 /// - ✅ **Simple parallelization**: Don't want to think about optimal concurrency
83 /// - ✅ **Balanced workloads**: Each task takes roughly the same time
84 /// - ❌ **I/O-bound work**: Use `par_eval_map_rs2` with higher concurrency instead
85 /// - ❌ **Memory-intensive**: May overwhelm system with too many concurrent tasks
86 ///
87 /// # See Also
88 ///
89 /// * [`map_parallel_with_concurrency_rs2`] - For custom concurrency control
90 /// * [`par_eval_map_rs2`] - For async functions and fine-tuned concurrency
91 fn map_parallel_rs2<O, F>(self, f: F) -> RS2Stream<O>
92 where
93 F: Fn(Self::Item) -> O + Send + Sync + Clone + 'static,
94 Self::Item: Send + 'static,
95 O: Send + 'static,
96 {
97 let concurrency = num_cpus::get();
98 self.par_eval_map_rs2(concurrency, move |x| {
99 let f = f.clone();
100 async move { f(x) }
101 })
102 }
103
104 /// Transforms each element of the stream in parallel with custom concurrency control.
105 ///
106 /// This method applies the given synchronous function to each element concurrently,
107 /// using exactly the specified number of concurrent tasks. Ideal when you need precise
108 /// control over resource usage or when the optimal concurrency differs from CPU count.
109 ///
110 /// # Arguments
111 ///
112 /// * `concurrency` - Maximum number of concurrent tasks (must be > 0)
113 /// * `f` - A synchronous function that transforms each stream element. Must be `Send + Sync + Clone`.
114 ///
115 /// # Returns
116 ///
117 /// A new `RS2Stream` containing the transformed elements. Order may not be preserved.
118 ///
119 /// # Performance
120 ///
121 /// - **Concurrency**: Uses exactly `concurrency` concurrent tasks
122 /// - **Best for**: I/O-bound operations, memory-constrained environments, fine-tuning
123 /// - **Memory**: Scales with concurrency parameter
124 /// - **Backpressure**: Inherits from underlying `par_eval_map_rs2`
125 ///
126 /// # Concurrency Guidelines
127 ///
128 /// | **Workload Type** | **Recommended Concurrency** | **Reasoning** |
129 /// |-------------------|------------------------------|---------------|
130 /// | **CPU-bound** | `num_cpus::get()` | Match CPU cores |
131 /// | **I/O-bound** | `50-200` | Network can handle many concurrent requests |
132 /// | **Memory-heavy** | `1-4` | Prevent out-of-memory errors |
133 /// | **Database queries** | `10-50` | Respect connection pool limits |
134 /// | **File I/O** | `4-16` | Balance throughput vs file handle limits |
135 ///
136 /// # When to Use
137 ///
138 /// - ✅ **I/O-bound operations**: Network requests, file operations, database queries
139 /// - ✅ **Resource constraints**: Limited memory, connection pools, rate limits
140 /// - ✅ **Performance tuning**: Benchmarked optimal concurrency for your workload
141 /// - ✅ **Mixed workloads**: Some tasks much slower/faster than others
142 /// - ❌ **Simple CPU-bound work**: Use `map_parallel_rs2` for automatic optimization
143 ///
144 /// # Panics
145 ///
146 /// This function will panic if `concurrency` is 0. Always use a positive value.
147 ///
148 /// # See Also
149 ///
150 /// * [`map_parallel_rs2`] - For automatic concurrency based on CPU cores
151 /// * [`par_eval_map_rs2`] - For async functions with the same concurrency control
152 fn map_parallel_with_concurrency_rs2<O, F>(self, concurrency: usize, f: F) -> RS2Stream<O>
153 where
154 F: Fn(Self::Item) -> O + Send + Sync + Clone + 'static,
155 Self::Item: Send + 'static,
156 O: Send + 'static,
157 {
158 self.par_eval_map_rs2(concurrency, move |x| {
159 let f = f.clone();
160 async move { f(x) }
161 })
162 }
163
164 /// Filter elements of the rs2_stream with a predicate
165 fn filter_rs2<F>(self, mut f: F) -> RS2Stream<Self::Item>
166 where
167 F: FnMut(&Self::Item) -> bool + Send + 'static,
168 Self::Item: Send + 'static,
169 {
170 self.filter(move |item| future::ready(f(item))).boxed()
171 }
172
173 /// Flat map elements of the rs2_stream with a function that returns a rs2_stream
174 fn flat_map_rs2<U, St, F>(self, f: F) -> RS2Stream<U>
175 where
176 F: FnMut(Self::Item) -> St + Send + 'static,
177 St: Stream<Item = U> + Send + 'static,
178 U: Send + 'static,
179 {
180 self.flat_map(f).boxed()
181 }
182
183 /// Map elements of the rs2_stream with an async function
184 fn eval_map_rs2<U, Fut, F>(self, f: F) -> RS2Stream<U>
185 where
186 F: FnMut(Self::Item) -> Fut + Send + 'static,
187 Fut: Future<Output = U> + Send + 'static,
188 U: Send + 'static,
189 {
190 self.then(f).boxed()
191 }
192
193 /// Merge this rs2_stream with another rs2_stream
194 fn merge_rs2(self, other: RS2Stream<Self::Item>) -> RS2Stream<Self::Item>
195 where
196 Self::Item: Send + 'static,
197 {
198 merge(self, other)
199 }
200
201 /// Zip this rs2_stream with another rs2_stream
202 fn zip_rs2<U>(self, other: RS2Stream<U>) -> RS2Stream<(Self::Item, U)>
203 where
204 Self::Item: Send + 'static,
205 U: Send + 'static,
206 {
207 self.zip(other).boxed()
208 }
209
210 /// Zip this rs2_stream with another rs2_stream, applying a function to each pair
211 fn zip_with_rs2<U, O, F>(self, other: RS2Stream<U>, f: F) -> RS2Stream<O>
212 where
213 Self::Item: Send + 'static,
214 U: Send + 'static,
215 O: Send + 'static,
216 F: FnMut(Self::Item, U) -> O + Send + 'static,
217 {
218 zip_with(self, other, f)
219 }
220
221 /// Throttle this rs2_stream to emit at most one element per duration
222 fn throttle_rs2(self, duration: Duration) -> RS2Stream<Self::Item>
223 where
224 Self::Item: Send + 'static,
225 {
226 throttle(self.boxed(), duration)
227 }
228
229 /// Debounce this rs2_stream, only emitting an element after a specified quiet period has passed
230 /// without receiving another element
231 fn debounce_rs2(self, duration: Duration) -> RS2Stream<Self::Item>
232 where
233 Self::Item: Send + 'static,
234 {
235 debounce(self.boxed(), duration)
236 }
237
238 /// Sample this rs2_stream at regular intervals, emitting the most recent value
239 ///
240 /// This combinator samples the most recent value from a rs2_stream at a regular interval.
241 /// It only emits a value if at least one new value has arrived since the last emission.
242 /// If no new value has arrived during an interval, that interval is skipped.
243 fn sample_rs2(self, interval: Duration) -> RS2Stream<Self::Item>
244 where
245 Self::Item: Clone + Send + 'static,
246 {
247 sample(self.boxed(), interval)
248 }
249
250 /// Process elements in parallel with bounded concurrency, preserving order
251 /// ### Use when: `par_eval_map_rs2`
252 /// - Already have async functions**
253 /// - Need custom concurrency control**
254 /// - Want maximum control/performance**
255 fn par_eval_map_rs2<U, Fut, F>(self, concurrency: usize, f: F) -> RS2Stream<U>
256 where
257 F: FnMut(Self::Item) -> Fut + Send + 'static,
258 Fut: Future<Output = U> + Send + 'static,
259 U: Send + 'static,
260 Self::Item: Send + 'static,
261 {
262 par_eval_map(self.boxed(), concurrency, f)
263 }
264
265 /// Process elements in parallel with bounded concurrency, without preserving order
266 fn par_eval_map_unordered_rs2<U, Fut, F>(self, concurrency: usize, f: F) -> RS2Stream<U>
267 where
268 F: FnMut(Self::Item) -> Fut + Send + 'static,
269 Fut: Future<Output = U> + Send + 'static,
270 U: Send + 'static,
271 Self::Item: Send + 'static,
272 {
273 par_eval_map_unordered(self.boxed(), concurrency, f)
274 }
275
276 /// Run multiple streams concurrently and combine their outputs
277 ///
278 /// This combinator takes a rs2_stream of streams and a concurrency limit, and runs
279 /// up to n inner streams concurrently. It emits all elements from the inner streams,
280 /// and starts new inner streams as others complete.
281 fn par_join_rs2<S, O>(self, concurrency: usize) -> RS2Stream<O>
282 where
283 Self: Stream<Item = S>,
284 S: Stream<Item = O> + Send + 'static + Unpin,
285 O: Send + 'static,
286 {
287 par_join(self.boxed(), concurrency)
288 }
289
290 /// Add timeout to rs2_stream operations
291 fn timeout_rs2(self, duration: Duration) -> RS2Stream<StreamResult<Self::Item>>
292 where
293 Self::Item: Send + 'static,
294 {
295 timeout(self.boxed(), duration)
296 }
297
298 /// Prefetch a specified number of elements ahead of consumption
299 ///
300 /// This can improve performance by starting to process the next elements
301 /// before they're actually needed.
302 fn prefetch_rs2(self, prefetch_count: usize) -> RS2Stream<Self::Item>
303 where
304 Self::Item: Send + 'static,
305 {
306 prefetch(self.boxed(), prefetch_count)
307 }
308
309 /// Filter out consecutive duplicate elements from this rs2_stream
310 ///
311 /// This combinator only emits elements that are different from the previous element.
312 /// It uses the default equality operator (`==`) to compare elements.
313 /// The first element is always emitted.
314 fn distinct_until_changed_rs2(self) -> RS2Stream<Self::Item>
315 where
316 Self::Item: Clone + Send + PartialEq + 'static,
317 {
318 distinct_until_changed(self.boxed())
319 }
320
321 /// Filter out consecutive duplicate elements from this rs2_stream using a custom equality function
322 ///
323 /// This combinator only emits elements that are different from the previous element.
324 /// It uses the provided equality function to compare elements.
325 /// The first element is always emitted.
326 fn distinct_until_changed_by_rs2<F>(self, eq: F) -> RS2Stream<Self::Item>
327 where
328 Self::Item: Clone + Send + 'static,
329 F: FnMut(&Self::Item, &Self::Item) -> bool + Send + 'static,
330 {
331 distinct_until_changed_by(self.boxed(), eq)
332 }
333
334 /// Interrupt this rs2_stream when a signal is received
335 ///
336 /// This combinator stops processing the rs2_stream when the signal future completes.
337 /// Resources are properly cleaned up when the rs2_stream is interrupted.
338 fn interrupt_when_rs2<F>(self, signal: F) -> RS2Stream<Self::Item>
339 where
340 Self::Item: Send + 'static,
341 F: Future<Output = ()> + Send + 'static,
342 {
343 interrupt_when(self.boxed(), signal)
344 }
345
346 /// Take elements from this rs2_stream while a predicate returns true
347 ///
348 /// This combinator yields elements from the stream as long as the predicate returns true.
349 /// It stops (and does not yield) the first element where the predicate returns false.
350 fn take_while_rs2<F, Fut>(self, predicate: F) -> RS2Stream<Self::Item>
351 where
352 F: FnMut(&Self::Item) -> Fut + Send + 'static,
353 Fut: Future<Output = bool> + Send + 'static,
354 Self::Item: Send + 'static,
355 {
356 take_while(self.boxed(), predicate)
357 }
358
359 /// Skip elements from this rs2_stream while a predicate returns true
360 ///
361 /// This combinator skips elements from the stream as long as the predicate returns true.
362 /// Once the predicate returns false, it yields that element and all remaining elements.
363 fn drop_while_rs2<F, Fut>(self, predicate: F) -> RS2Stream<Self::Item>
364 where
365 F: FnMut(&Self::Item) -> Fut + Send + 'static,
366 Fut: Future<Output = bool> + Send + 'static,
367 Self::Item: Send + 'static,
368 {
369 drop_while(self.boxed(), predicate)
370 }
371
372 /// Group adjacent elements that share a common key
373 ///
374 /// This combinator groups consecutive elements that produce the same key.
375 /// It emits groups as they complete (when the key changes or the rs2_stream ends).
376 /// Each emitted item is a tuple containing the key and a vector of elements.
377 fn group_adjacent_by_rs2<K, F>(self, key_fn: F) -> RS2Stream<(K, Vec<Self::Item>)>
378 where
379 Self::Item: Clone + Send + 'static,
380 K: Eq + Clone + Send + 'static,
381 F: FnMut(&Self::Item) -> K + Send + 'static,
382 {
383 group_adjacent_by(self.boxed(), key_fn)
384 }
385
386 /// Group consecutive elements that share a common key
387 ///
388 /// This combinator groups consecutive elements that produce the same key.
389 /// It emits groups as they complete (when the key changes or the rs2_stream ends).
390 /// Each emitted item is a tuple containing the key and a vector of elements.
391 fn group_by_rs2<K, F>(self, key_fn: F) -> RS2Stream<(K, Vec<Self::Item>)>
392 where
393 Self::Item: Clone + Send + 'static,
394 K: Eq + Clone + Send + 'static,
395 F: FnMut(&Self::Item) -> K + Send + 'static,
396 {
397 group_by(self.boxed(), key_fn)
398 }
399
400 /// Fold operation that accumulates a value over a stream
401 ///
402 /// This combinator applies a function to each element in the stream, accumulating a single result.
403 /// It returns a Future that resolves to the final accumulated value.
404 fn fold_rs2<A, F, Fut>(self, init: A, f: F) -> impl Future<Output = A>
405 where
406 F: FnMut(A, Self::Item) -> Fut + Send + 'static,
407 Fut: Future<Output = A> + Send + 'static,
408 Self::Item: Send + 'static,
409 A: Send + 'static,
410 {
411 fold(self.boxed(), init, f)
412 }
413
414 /// Scan operation that applies a function to each element and emits intermediate accumulated values
415 ///
416 /// This combinator is similar to fold but emits each intermediate accumulated value.
417 /// It applies a function to each element in the stream, accumulating a result and yielding
418 /// each intermediate accumulated value.
419 fn scan_rs2<U, F>(self, init: U, f: F) -> RS2Stream<U>
420 where
421 F: FnMut(U, Self::Item) -> U + Send + 'static,
422 Self::Item: Send + 'static,
423 U: Clone + Send + 'static,
424 {
425 scan(self.boxed(), init, f)
426 }
427
428 /// Apply a function to each element in the stream
429 ///
430 /// This combinator applies a function to each element in the stream without accumulating a result.
431 /// It returns a Future that completes when the stream is exhausted.
432 fn for_each_rs2<F, Fut>(self, mut f: F) -> impl Future<Output = ()>
433 where
434 F: FnMut(Self::Item) -> Fut + Send + 'static,
435 Fut: Future<Output = ()> + Send + 'static,
436 Self::Item: Send + 'static,
437 {
438 let mut stream = self.boxed();
439 async move {
440 while let Some(item) = stream.next().await {
441 f(item).await;
442 }
443 }
444 }
445
446 /// Take the first n elements from the stream
447 ///
448 /// This combinator yields the first n elements from the stream and then stops.
449 fn take_rs2(self, n: usize) -> RS2Stream<Self::Item>
450 where
451 Self::Item: Send + 'static,
452 {
453 take(self.boxed(), n)
454 }
455
456 /// Drop the first n elements from the stream
457 ///
458 /// This combinator skips the first n elements from the stream and yields all remaining elements.
459 fn drop_rs2(self, n: usize) -> RS2Stream<Self::Item>
460 where
461 Self::Item: Send + 'static,
462 {
463 drop(self.boxed(), n)
464 }
465
466 /// Skip the first n elements from the stream
467 ///
468 /// This combinator skips the first n elements from the stream and yields all remaining elements.
469 fn skip_rs2(self, n: usize) -> RS2Stream<Self::Item>
470 where
471 Self::Item: Send + 'static,
472 {
473 drop(self.boxed(), n)
474 }
475
476 /// Select between this rs2_stream and another rs2_stream based on which one produces a value first
477 ///
478 /// This combinator emits values from whichever rs2_stream produces a value first.
479 /// Once a value is received from one rs2_stream, the other rs2_stream is cancelled.
480 /// If either rs2_stream completes (returns None), the combinator switches to the other rs2_stream exclusively.
481 fn either_rs2(self, other: RS2Stream<Self::Item>) -> RS2Stream<Self::Item>
482 where
483 Self::Item: Send + 'static,
484 {
485 either(self, other)
486 }
487
488 /// Collect all items from the stream into a collection
489 ///
490 /// This combinator collects all items from the stream into a collection of type B.
491 /// It returns a Future that resolves to the collection.
492 ///
493 /// # Examples
494 /// ```
495 /// use rs2_stream::rs2::*;
496 /// use futures_util::stream::StreamExt;
497 ///
498 /// # async fn example() {
499 /// let stream = from_iter(vec![1, 2, 3, 4, 5]);
500 /// let result = stream.collect_rs2::<Vec<_>>().await;
501 /// assert_eq!(result, vec![1, 2, 3, 4, 5]);
502 /// # }
503 /// ```
504 fn collect_rs2<B>(self) -> impl Future<Output = B>
505 where
506 B: Default + Extend<Self::Item> + Send + 'static,
507 Self::Item: Send + 'static,
508 {
509 self.collect_with_config_rs2(BufferConfig::default())
510 }
511
512 /// Collect all items from the stream into a collection with custom buffer configuration
513 ///
514 /// This combinator collects all items from the stream into a collection of type B.
515 /// It returns a Future that resolves to the collection.
516 /// The buffer configuration allows for optimized memory allocation and growth strategies.
517 // Enhanced version that uses all BufferConfig fields
518 fn collect_with_config_rs2<B>(self, config: BufferConfig) -> impl Future<Output = B>
519 where
520 B: Default + Extend<Self::Item> + Send + 'static,
521 Self::Item: Send + 'static,
522 {
523 let mut stream = self.boxed();
524 async move {
525 if std::any::TypeId::of::<B>() == std::any::TypeId::of::<Vec<Self::Item>>() {
526 // Create Vec with smart capacity management
527 let mut vec = Vec::with_capacity(config.initial_capacity);
528 let mut items_collected = 0;
529
530 while let Some(item) = stream.next().await {
531 // Check max_capacity limit
532 if let Some(max_cap) = config.max_capacity {
533 if items_collected >= max_cap {
534 break; // Respect size limit
535 }
536 }
537
538 // Apply growth strategy when needed
539 if vec.len() == vec.capacity() {
540 let new_capacity = match config.growth_strategy {
541 GrowthStrategy::Linear(step) => vec.capacity() + step,
542 GrowthStrategy::Exponential(factor) => {
543 (vec.capacity() as f64 * factor) as usize
544 }
545 GrowthStrategy::Fixed => vec.capacity(), // No growth
546 };
547
548 let capped_capacity = if let Some(max_cap) = config.max_capacity {
549 new_capacity.min(max_cap)
550 } else {
551 new_capacity
552 };
553
554 vec.reserve(capped_capacity - vec.capacity());
555 }
556
557 vec.push(item);
558 items_collected += 1;
559 }
560
561 // Safe transmute back to B
562 let result = unsafe {
563 let ptr = &vec as *const Vec<Self::Item> as *const B;
564 let result = std::ptr::read(ptr);
565 std::mem::forget(vec);
566 result
567 };
568 result
569 } else {
570 // Fallback for other collection types
571 let mut collection = B::default();
572 while let Some(item) = stream.next().await {
573 collection.extend(std::iter::once(item));
574 }
575 collection
576 }
577 }
578 }
579
580 /// Create a sliding window of elements from the stream
581 ///
582 /// This combinator creates a sliding window of the specified size over the stream.
583 /// It yields a vector of items for each window position.
584 fn sliding_window_rs2(self, size: usize) -> RS2Stream<Vec<Self::Item>>
585 where
586 Self::Item: Clone + Send + 'static,
587 {
588 sliding_window(self.boxed(), size)
589 }
590
591 /// Process items in batches for better throughput
592 ///
593 /// This combinator processes items in batches of the specified size,
594 /// applying the processor function to each batch.
595 fn batch_process_rs2<U, F>(self, batch_size: usize, processor: F) -> RS2Stream<U>
596 where
597 F: FnMut(Vec<Self::Item>) -> Vec<U> + Send + 'static,
598 Self::Item: Send + 'static,
599 U: Send + 'static,
600 {
601 batch_process(self.boxed(), batch_size, processor)
602 }
603
604 /// Collect metrics while processing the stream
605 ///
606 /// This combinator collects metrics while processing the stream,
607 /// returning both the stream and the metrics.
608 fn with_metrics_rs2(
609 self,
610 name: String,
611 health_thresholds: HealthThresholds,
612 ) -> (RS2Stream<Self::Item>, Arc<Mutex<StreamMetrics>>)
613 where
614 Self::Item: Send + 'static,
615 {
616 with_metrics(self.boxed(), name, health_thresholds)
617 }
618
619 /// Interleave multiple streams in a round-robin fashion
620 ///
621 /// This combinator takes a vector of streams and interleaves their elements
622 /// in a round-robin fashion.
623 fn interleave_rs2<S>(self, streams: Vec<S>) -> RS2Stream<Self::Item>
624 where
625 S: Stream<Item = Self::Item> + Send + 'static + Unpin,
626 Self::Item: Send + 'static,
627 {
628 let mut all_streams = vec![self.boxed()];
629 all_streams.extend(streams.into_iter().map(|s| s.boxed()));
630 interleave(all_streams)
631 }
632
633 /// Chunk the stream into vectors of the specified size
634 ///
635 /// This combinator collects elements from the stream into vectors of the specified size.
636 /// If the stream ends before a chunk is filled, the final chunk may contain fewer elements.
637 fn chunk_rs2(self, size: usize) -> RS2Stream<Vec<Self::Item>>
638 where
639 Self::Item: Send + 'static,
640 {
641 chunk(self.boxed(), size)
642 }
643
644 /// Create a stream that emits values at a fixed rate
645 ///
646 /// This combinator creates a stream that emits the provided item at a fixed rate.
647 fn tick_rs<O>(self, period: Duration, item: O) -> RS2Stream<O>
648 where
649 O: Clone + Send + 'static,
650 {
651 tick(period, item)
652 }
653
654 /// Bracket for resource management
655 ///
656 /// This combinator ensures that a resource is properly released after use.
657 /// It takes three parameters:
658 /// 1. A future that acquires a resource
659 /// 2. A function that uses the resource and returns a stream
660 /// 3. A function that releases the resource
661 fn bracket_rs<A, O, St, FAcq, FUse, FRel, R>(
662 self,
663 acquire: FAcq,
664 use_fn: FUse,
665 release: FRel,
666 ) -> RS2Stream<O>
667 where
668 FAcq: Future<Output = A> + Send + 'static,
669 FUse: FnOnce(A) -> St + Send + 'static,
670 St: Stream<Item = O> + Send + 'static,
671 FRel: FnOnce(A) -> R + Send + 'static,
672 R: Future<Output = ()> + Send + 'static,
673 O: Send + 'static,
674 A: Clone + Send + 'static,
675 {
676 bracket(acquire, use_fn, release)
677 }
678
679 fn with_schema_validation_rs2<V, T>(
680 self,
681 validator: V,
682 ) -> Pin<Box<dyn futures_util::Stream<Item = T> + Send>>
683 where
684 V: SchemaValidator + 'static,
685 T: serde::de::DeserializeOwned + Serialize + Send + 'static,
686 Self: futures_util::Stream<Item = T> + Send + 'static,
687 {
688 use futures_util::StreamExt;
689 let validator = std::sync::Arc::new(validator);
690 self.filter_map(move |item| {
691 let validator = validator.clone();
692 async move {
693 let bytes = match serde_json::to_vec(&item) {
694 Ok(b) => b,
695 Err(e) => {
696 log::error!("Schema validation: failed to serialize item: {}", e);
697 return None;
698 }
699 };
700 match validator.validate(&bytes).await {
701 Ok(()) => Some(item),
702 Err(e) => {
703 log::warn!("Schema validation failed: {}", e);
704 None
705 }
706 }
707 }
708 })
709 .boxed()
710 }
711}
712
713impl<S> RS2StreamExt for S where S: Stream + Sized + Unpin + Send + 'static {}