rs2_stream/
rs2.rs

1//! RStream - A Rust streaming library inspired by FS2/RS2
2//!
3//! This module provides the core streaming functionality with functional
4//! programming patterns, backpressure handling, and resource management.
5
6use async_stream::stream;
7use futures::channel::mpsc::{channel, Receiver, Sender};
8use futures_core::Stream;
9use futures_util::pin_mut;
10use futures_util::{
11    future,
12    stream::{self, BoxStream, FuturesUnordered, StreamExt},
13    SinkExt,
14};
15use std::future::Future;
16use std::time::Duration;
17use std::sync::Arc;
18use tokio::{spawn, time::sleep};
19use tokio::sync::Mutex;
20
21use crate::error::{StreamError, StreamResult, RetryPolicy};
22use crate::stream_performance_metrics::StreamMetrics;
23
24/// A boxed, heap-allocated Rust Stream analogous to RS2's Stream[F, O]
25pub type RS2Stream<O> = BoxStream<'static, O>;
26
27/// Backpressure strategy for automatic flow control
28#[derive(Debug, Clone, Copy)]
29pub enum BackpressureStrategy {
30    /// Drop oldest items when buffer is full
31    DropOldest,
32    /// Drop newest items when buffer is full  
33    DropNewest,
34    /// Block producer until consumer catches up
35    Block,
36    /// Fail fast when buffer is full
37    Error,
38}
39
40/// Configuration for automatic backpressure
41#[derive(Debug, Clone)]
42pub struct BackpressureConfig {
43    pub strategy: BackpressureStrategy,
44    pub buffer_size: usize,
45    pub low_watermark: Option<usize>,  // Resume at this level
46    pub high_watermark: Option<usize>, // Pause at this level
47}
48
49impl Default for BackpressureConfig {
50    fn default() -> Self {
51        Self {
52            strategy: BackpressureStrategy::Block,
53            buffer_size: 100,
54            low_watermark: Some(25),
55            high_watermark: Some(75),
56        }
57    }
58}
59
60/// ExitCase for bracketCase semantics
61#[derive(Debug, Clone)]
62pub enum ExitCase<E> {
63    Completed,
64    Errored(E),
65}
66
67// ================================
68// Core Stream Constructors
69// ================================
70
71/// Emit a single element as a rs2_stream
72pub fn emit<O>(item: O) -> RS2Stream<O>
73where
74    O: Send + 'static,
75{
76    stream::once(future::ready(item)).boxed()
77}
78
79/// Create an empty rs2_stream that completes immediately
80pub fn empty<O>() -> RS2Stream<O>
81where
82    O: Send + 'static,
83{
84    stream::empty().boxed()
85}
86
87/// Create a rs2_stream from an iterator
88pub fn from_iter<I, O>(iter: I) -> RS2Stream<O>
89where
90    I: IntoIterator<Item = O> + Send + 'static,
91    <I as IntoIterator>::IntoIter: Send,
92    O: Send + 'static,
93{
94    stream::iter(iter).boxed()
95}
96
97/// Evaluate a Future and emit its output
98pub fn eval<O, F>(fut: F) -> RS2Stream<O>
99where
100    F: Future<Output = O> + Send + 'static,
101    O: Send + 'static,
102{
103    stream::once(fut).boxed()
104}
105
106/// Repeat a value indefinitely
107pub fn repeat<O>(item: O) -> RS2Stream<O>
108where
109    O: Clone + Send + 'static,
110{
111    stream::repeat(item).boxed()
112}
113
114/// Create a rs2_stream that emits a single value after a delay
115pub fn emit_after<O>(item: O, duration: Duration) -> RS2Stream<O>
116where
117    O: Send + 'static,
118{
119    stream::once(async move {
120        sleep(duration).await;
121        item
122    }).boxed()
123}
124
125/// Generate a rs2_stream from a seed value and a function
126///
127/// This combinator takes an initial state and a function that produces an element and the next state.
128/// It continues until the function returns None.
129///
130/// # Examples
131/// ```
132/// use rs2_stream::rs2::*;
133/// use futures_util::stream::StreamExt;
134/// 
135/// # async fn example() {
136/// // Create a rs2_stream of Fibonacci numbers
137/// let fibonacci = unfold(
138///     (0, 1),
139///     |state| async move {
140///         let (a, b) = state;
141///         Some((a, (b, a + b)))
142///     }
143/// );
144///
145/// // Take the first 10 Fibonacci numbers
146/// let result = fibonacci.take(10).collect::<Vec<_>>().await;
147/// assert_eq!(result, vec![0, 1, 1, 2, 3, 5, 8, 13, 21, 34]);
148/// # }
149/// ```
150pub fn unfold<S, O, F, Fut>(init: S, mut f: F) -> RS2Stream<O>
151where
152    S: Send + 'static,
153    O: Send + 'static,
154    F: FnMut(S) -> Fut + Send + 'static,
155    Fut: Future<Output = Option<(O, S)>> + Send + 'static,
156{
157    stream! {
158        let mut state_opt = Some(init);
159
160        loop {
161            let state = state_opt.take().expect("State should be available");
162            let fut = f(state);
163            match fut.await {
164                Some((item, next_state)) => {
165                    yield item;
166                    state_opt = Some(next_state);
167                },
168                None => break,
169            }
170        }
171    }
172    .boxed()
173}
174
175// ================================
176// Stream Transformations
177// ================================
178
179/// Group adjacent elements that share a common key
180///
181/// This combinator groups consecutive elements that produce the same key.
182/// It emits groups as they complete (when the key changes or the rs2_stream ends).
183/// Each emitted item is a tuple containing the key and a vector of elements.
184///
185/// # Examples
186/// ```
187/// use rs2_stream::rs2::*;
188/// use futures_util::stream::StreamExt;
189///
190/// # async fn example() {
191/// let rs2_stream = from_iter(vec![1, 1, 2, 2, 3, 3, 2, 1]);
192/// let result = group_adjacent_by(rs2_stream, |&x| x % 2).collect::<Vec<_>>().await;
193/// assert_eq!(result, vec![(1, vec![1, 1]), (0, vec![2, 2]), (1, vec![3, 3]), (0, vec![2]), (1, vec![1])]);
194/// # }
195/// ```
196pub fn group_adjacent_by<O, K, F>(s: RS2Stream<O>, mut key_fn: F) -> RS2Stream<(K, Vec<O>)>
197where
198    O: Clone + Send + 'static,
199    K: Eq + Clone + Send + 'static,
200    F: FnMut(&O) -> K + Send + 'static,
201{
202    stream! {
203        pin_mut!(s);
204        let mut current_key: Option<K> = None;
205        let mut current_group: Vec<O> = Vec::new();
206
207        while let Some(item) = s.next().await {
208            let key = key_fn(&item);
209
210            match &current_key {
211                Some(k) if *k == key => {
212                    current_group.push(item);
213                },
214                _ => {
215                    if !current_group.is_empty() {
216                        yield (current_key.clone().unwrap(), std::mem::take(&mut current_group));
217                    }
218                    current_key = Some(key);
219                    current_group.push(item);
220                }
221            }
222        }
223
224        if !current_group.is_empty() {
225            yield (current_key.clone().unwrap(), std::mem::take(&mut current_group));
226        }
227    }
228    .boxed()
229}
230
231/// Slice: take first n items
232pub fn take<O>(s: RS2Stream<O>, n: usize) -> RS2Stream<O>
233where
234    O: Send + 'static,
235{
236    s.take(n).boxed()
237}
238
239/// Slice: drop first n items
240pub fn drop<O>(s: RS2Stream<O>, n: usize) -> RS2Stream<O>
241where
242    O: Send + 'static,
243{
244    s.skip(n).boxed()
245}
246
247/// Chunk the rs2_stream into Vecs of size n
248pub fn chunk<O>(s: RS2Stream<O>, size: usize) -> RS2Stream<Vec<O>>
249where
250    O: Send + 'static,
251{
252    stream! {
253        let mut buf = Vec::with_capacity(size);
254        pin_mut!(s);
255        while let Some(item) = s.next().await {
256            buf.push(item);
257            if buf.len() == size {
258                yield std::mem::take(&mut buf);
259            }
260        }
261        if !buf.is_empty() {
262            yield std::mem::take(&mut buf);
263        }
264    }
265        .boxed()
266}
267
268/// Add timeout support to any rs2_stream
269pub fn timeout<T>(s: RS2Stream<T>, duration: Duration) -> RS2Stream<StreamResult<T>>
270where
271    T: Send + 'static,
272{
273    stream! {
274        pin_mut!(s);
275        loop {
276            match tokio::time::timeout(duration, s.next()).await {
277                Ok(Some(value)) => yield Ok(value),
278                Ok(None) => break,
279                Err(_) => yield Err(StreamError::Timeout),
280            }
281        }
282    }.boxed()
283}
284
285/// Scan operation (like fold but emits intermediate results)
286pub fn scan<T, U, F>(s: RS2Stream<T>, init: U, mut f: F) -> RS2Stream<U>
287where
288    F: FnMut(U, T) -> U + Send + 'static,
289    T: Send + 'static,
290    U: Clone + Send + 'static,
291{
292    stream! {
293        let mut acc = init;
294        pin_mut!(s);
295        while let Some(item) = s.next().await {
296            acc = f(acc.clone(), item);
297            yield acc.clone();
298        }
299    }.boxed()
300}
301
302/// Fold operation that accumulates a value over a stream
303pub fn fold<T, A, F, Fut>(s: RS2Stream<T>, init: A, mut f: F) -> impl Future<Output = A>
304where
305    F: FnMut(A, T) -> Fut + Send + 'static,
306    Fut: Future<Output = A> + Send + 'static,
307    T: Send + 'static,
308    A: Send + 'static,
309{
310    async move {
311        let mut acc = init;
312        pin_mut!(s);
313        while let Some(item) = s.next().await {
314            acc = f(acc, item).await;
315        }
316        acc
317    }
318}
319
320/// Reduce operation that combines all elements in a stream using a binary operation
321pub fn reduce<T, F, Fut>(s: RS2Stream<T>, mut f: F) -> impl Future<Output = Option<T>>
322where
323    F: FnMut(T, T) -> Fut + Send + 'static,
324    Fut: Future<Output = T> + Send + 'static,
325    T: Send + 'static,
326{
327    async move {
328        pin_mut!(s);
329        let first = match s.next().await {
330            Some(item) => item,
331            None => return None, // Return None for empty streams
332        };
333
334        let mut acc = first;
335        while let Some(item) = s.next().await {
336            acc = f(acc, item).await;
337        }
338
339        Some(acc)
340    }
341}
342
343/// Filter and map elements of a stream in one operation
344pub fn filter_map<T, U, F, Fut>(s: RS2Stream<T>, f: F) -> RS2Stream<U>
345where
346    F: FnMut(T) -> Fut + Send + 'static,
347    Fut: Future<Output = Option<U>> + Send + 'static,
348    T: Send + 'static,
349    U: Send + 'static,
350{
351    s.filter_map(f).boxed()
352}
353
354/// Take elements from a stream while a predicate returns true
355///
356/// This combinator yields elements from the stream as long as the predicate returns true.
357/// It stops (and does not yield) the first element where the predicate returns false.
358///
359/// # Examples
360/// ```
361/// use rs2_stream::rs2::*;
362/// use futures_util::stream::StreamExt;
363///
364/// # async fn example() {
365/// let stream = from_iter(vec![1, 2, 3, 4, 5]);
366/// let result = take_while(stream, |&x| async move { x < 4 }).collect::<Vec<_>>().await;
367/// assert_eq!(result, vec![1, 2, 3]);
368/// # }
369/// ```
370pub fn take_while<T, F, Fut>(s: RS2Stream<T>, mut predicate: F) -> RS2Stream<T>
371where
372    F: FnMut(&T) -> Fut + Send + 'static,
373    Fut: Future<Output = bool> + Send + 'static,
374    T: Send + 'static,
375{
376    stream! {
377        pin_mut!(s);
378        while let Some(item) = s.next().await {
379            if predicate(&item).await {
380                yield item;
381            } else {
382                break;
383            }
384        }
385    }.boxed()
386}
387
388/// Skip elements from a stream while a predicate returns true
389///
390/// This combinator skips elements from the stream as long as the predicate returns true.
391/// Once the predicate returns false, it yields that element and all remaining elements.
392///
393/// # Examples
394/// ```
395/// use rs2_stream::rs2::*;
396/// use futures_util::stream::StreamExt;
397///
398/// # async fn example() {
399/// let stream = from_iter(vec![1, 2, 3, 4, 5]);
400/// let result = drop_while(stream, |&x| async move { x < 4 }).collect::<Vec<_>>().await;
401/// assert_eq!(result, vec![4, 5]);
402/// # }
403/// ```
404pub fn drop_while<T, F, Fut>(s: RS2Stream<T>, mut predicate: F) -> RS2Stream<T>
405where
406    F: FnMut(&T) -> Fut + Send + 'static,
407    Fut: Future<Output = bool> + Send + 'static,
408    T: Send + 'static,
409{
410    stream! {
411        pin_mut!(s);
412
413        let mut found_false = false;
414        while let Some(item) = s.next().await {
415            if !found_false && predicate(&item).await {
416                continue;
417            } else {
418                found_false = true;
419                yield item;
420            }
421        }
422    }.boxed()
423}
424
425/// Group consecutive elements that share a common key
426///
427/// This combinator groups consecutive elements that produce the same key.
428/// It emits groups as they complete (when the key changes or the stream ends).
429/// Each emitted item is a tuple containing the key and a vector of elements.
430///
431/// # Examples
432/// ```
433/// use rs2_stream::rs2::*;
434/// use futures_util::stream::StreamExt;
435///
436/// # async fn example() {
437/// let stream = from_iter(vec![1, 1, 2, 2, 3, 3, 2, 1]);
438/// let result = group_by(stream, |&x| x % 2).collect::<Vec<_>>().await;
439/// assert_eq!(result, vec![(1, vec![1, 1]), (0, vec![2, 2]), (1, vec![3, 3]), (0, vec![2]), (1, vec![1])]);
440/// # }
441/// ```
442pub fn group_by<T, K, F>(s: RS2Stream<T>, mut key_fn: F) -> RS2Stream<(K, Vec<T>)>
443where
444    T: Clone + Send + 'static,
445    K: Eq + Clone + Send + 'static,
446    F: FnMut(&T) -> K + Send + 'static,
447{
448    stream! {
449        pin_mut!(s);
450        let mut current_key: Option<K> = None;
451        let mut current_group: Vec<T> = Vec::new();
452
453        while let Some(item) = s.next().await {
454            let key = key_fn(&item);
455
456            match &current_key {
457                Some(k) if *k == key => {
458                    current_group.push(item);
459                },
460                _ => {
461                    if !current_group.is_empty() {
462                        yield (current_key.clone().unwrap(), std::mem::take(&mut current_group));
463                    }
464                    current_key = Some(key);
465                    current_group.push(item);
466                }
467            }
468        }
469
470        if !current_group.is_empty() {
471            yield (current_key.clone().unwrap(), std::mem::take(&mut current_group));
472        }
473    }
474    .boxed()
475}
476
477/// Sliding window operation
478pub fn sliding_window<T>(s: RS2Stream<T>, size: usize) -> RS2Stream<Vec<T>>
479where
480    T: Clone + Send + 'static,
481{
482    if size == 0 {
483        return empty();
484    }
485
486    stream! {
487        let mut window = Vec::with_capacity(size);
488        pin_mut!(s);
489
490        while let Some(item) = s.next().await {
491            window.push(item);
492
493            if window.len() > size {
494                window.remove(0);
495            }
496
497            if window.len() == size {
498                yield window.clone();
499            }
500        }
501    }.boxed()
502}
503
504/// Batch processing for better throughput
505pub fn batch_process<T, U, F>(
506    s: RS2Stream<T>,
507    batch_size: usize,
508    mut processor: F
509) -> RS2Stream<U>
510where
511    F: FnMut(Vec<T>) -> Vec<U> + Send + 'static,
512    T: Send + 'static,
513    U: Send + 'static,
514{
515    stream! {
516        let chunked = chunk(s, batch_size);
517        pin_mut!(chunked);
518        while let Some(batch) = chunked.next().await {
519            for item in processor(batch) {
520                yield item;
521            }
522        }
523    }.boxed()
524}
525
526/// Collect metrics while processing rs2_stream
527pub fn with_metrics<T>(s: RS2Stream<T>, _name: String) -> (RS2Stream<T>, Arc<Mutex<StreamMetrics>>)
528where
529    T: Send + 'static,
530{
531    let metrics = Arc::new(Mutex::new(StreamMetrics::new()));
532    let metrics_clone = Arc::clone(&metrics);
533
534    let monitored_stream = stream! {
535        pin_mut!(s);
536        while let Some(item) = s.next().await {
537            {
538                let mut m = metrics_clone.lock().await;
539                m.record_item(std::mem::size_of_val(&item) as u64);
540            }
541            yield item;
542        }
543
544        {
545            let mut m = metrics_clone.lock().await;
546            m.finalize();
547        }
548    }.boxed();
549
550    (monitored_stream, metrics)
551}
552
553// ================================
554// Backpressure Management
555// ================================
556
557/// Automatic backpressure with configurable strategy
558pub fn auto_backpressure<O>(s: RS2Stream<O>, config: BackpressureConfig) -> RS2Stream<O>
559where
560    O: Send + 'static,
561{
562    match config.strategy {
563        BackpressureStrategy::Block => auto_backpressure_block(s, config.buffer_size),
564        BackpressureStrategy::DropOldest => auto_backpressure_drop_oldest(s, config.buffer_size),
565        BackpressureStrategy::DropNewest => auto_backpressure_drop_newest(s, config.buffer_size),
566        BackpressureStrategy::Error => auto_backpressure_error(s, config.buffer_size),
567    }
568}
569
570/// Automatic backpressure with blocking strategy
571pub fn auto_backpressure_block<O>(s: RS2Stream<O>, buffer_size: usize) -> RS2Stream<O>
572where
573    O: Send + 'static,
574{
575    let (mut tx, rx): (Sender<O>, Receiver<O>) = channel(buffer_size);
576
577    spawn(async move {
578        pin_mut!(s);
579        while let Some(item) = s.next().await {
580            if tx.send(item).await.is_err() {
581                break;
582            }
583        }
584    });
585
586    stream! {
587        let mut rx = rx;
588        while let Some(item) = rx.next().await {
589            yield item;
590        }
591    }
592        .boxed()
593}
594
595/// Automatic backpressure that drops oldest items when buffer is full
596pub fn auto_backpressure_drop_oldest<O>(s: RS2Stream<O>, buffer_size: usize) -> RS2Stream<O>
597where
598    O: Send + 'static,
599{
600    use std::collections::VecDeque;
601
602    let buffer = Arc::new(Mutex::new(VecDeque::<O>::new()));
603    let buffer_clone = Arc::clone(&buffer);
604    let (done_tx, mut done_rx) = tokio::sync::mpsc::channel(1);
605
606    spawn(async move {
607        pin_mut!(s);
608        while let Some(item) = s.next().await {
609            let mut buf = buffer_clone.lock().await;
610
611            if buf.len() >= buffer_size {
612                buf.pop_front();
613            }
614
615            buf.push_back(item);
616        }
617
618        let _ = done_tx.send(()).await;
619    });
620
621    stream! {
622        let mut source_done = false;
623
624        loop {
625            if let Ok(_) = done_rx.try_recv() {
626                source_done = true;
627            }
628
629            let item = {
630                let mut buf = buffer.lock().await;
631                buf.pop_front()
632            };
633
634            match item {
635                Some(item) => yield item,
636                None => {
637                    if source_done {
638                        break;
639                    }
640                    tokio::time::sleep(Duration::from_millis(1)).await;
641                }
642            }
643        }
644    }
645        .boxed()
646}
647
648/// Automatic backpressure that drops newest items when buffer is full
649pub fn auto_backpressure_drop_newest<O>(s: RS2Stream<O>, buffer_size: usize) -> RS2Stream<O>
650where
651    O: Send + 'static,
652{
653    use std::collections::VecDeque;
654
655    let buffer = Arc::new(Mutex::new(VecDeque::<O>::new()));
656    let buffer_clone = Arc::clone(&buffer);
657    let (done_tx, mut done_rx) = tokio::sync::mpsc::channel(1);
658
659    spawn(async move {
660        pin_mut!(s);
661        while let Some(item) = s.next().await {
662            let mut buf = buffer_clone.lock().await;
663
664            if buf.len() < buffer_size {
665                buf.push_back(item);
666            }
667        }
668
669        let _ = done_tx.send(()).await;
670    });
671
672    stream! {
673        let mut source_done = false;
674
675        loop {
676            if let Ok(_) = done_rx.try_recv() {
677                source_done = true;
678            }
679
680            let item = {
681                let mut buf = buffer.lock().await;
682                buf.pop_front()
683            };
684
685            match item {
686                Some(item) => yield item,
687                None => {
688                    if source_done {
689                        break;
690                    }
691                    tokio::time::sleep(Duration::from_millis(1)).await;
692                }
693            }
694        }
695    }
696        .boxed()
697}
698
699/// Automatic backpressure that errors when buffer is full
700pub fn auto_backpressure_error<O>(s: RS2Stream<O>, buffer_size: usize) -> RS2Stream<O>
701where
702    O: Send + 'static,
703{
704    use tokio::sync::mpsc;
705
706    let (tx, mut rx) = mpsc::channel(buffer_size);
707
708    spawn(async move {
709        pin_mut!(s);
710        while let Some(item) = s.next().await {
711            if tx.send(item).await.is_err() {
712                break;
713            }
714        }
715    });
716
717    stream! {
718        while let Some(item) = rx.recv().await {
719            yield item;
720        }
721    }
722        .boxed()
723}
724
725// ================================
726// Stream Combinators
727// ================================
728
729/// Interrupt a rs2_stream when a signal is received
730///
731/// This combinator takes a rs2_stream and a future that signals interruption.
732/// It stops processing the rs2_stream when the signal future completes.
733/// Resources are properly cleaned up when the rs2_stream is interrupted.
734///
735/// # Examples
736/// ```
737/// use rs2_stream::rs2::*;
738/// use std::time::Duration;
739/// use tokio::time::sleep;
740/// use async_stream::stream;
741/// use futures_util::stream::StreamExt;
742///
743/// # async fn example() {
744/// // Create a rs2_stream that emits numbers every 100ms
745/// let rs2_stream = from_iter(0..100)
746///     .throttle_rs2(Duration::from_millis(100));
747///
748/// // Create a future that completes after 250ms
749/// let interrupt_signal = sleep(Duration::from_millis(250));
750///
751/// // The rs2_stream will be interrupted after about 250ms,
752/// // so we should get approximately 2-3 items
753/// let result = interrupt_when(rs2_stream, interrupt_signal)
754///     .collect::<Vec<_>>()
755///     .await;
756///
757/// assert!(result.len() >= 2 && result.len() <= 3);
758/// # }
759/// ```
760pub fn interrupt_when<O, F>(s: RS2Stream<O>, signal: F) -> RS2Stream<O>
761where
762    O: Send + 'static,
763    F: Future<Output = ()> + Send + 'static,
764{
765    stream! {
766        pin_mut!(s);
767        pin_mut!(signal);
768
769        loop {
770            tokio::select! {
771                biased;
772                _ = &mut signal => {
773                    break;
774                },
775
776                maybe_item = s.next() => {
777                    match maybe_item {
778                        Some(item) => yield item,
779                        None => break,
780                    }
781                },
782            }
783        }
784    }
785    .boxed()
786}
787
788/// Concatenate multiple streams sequentially
789pub fn concat<O, S>(streams: Vec<S>) -> RS2Stream<O>
790where
791    S: Stream<Item = O> + Send + 'static,
792    O: Send + 'static,
793{
794    stream! {
795        for s in streams {
796            pin_mut!(s);
797            while let Some(item) = s.next().await {
798                yield item;
799            }
800        }
801    }
802        .boxed()
803}
804
805/// Merge two streams into one interleaved output
806pub fn merge<O, S1, S2>(s1: S1, mut s2: S2) -> RS2Stream<O>
807where
808    S1: Stream<Item = O> + Send + 'static,
809    S2: Stream<Item = O> + Send + 'static + Unpin,
810    O: Send + 'static,
811{
812    let chained = s1
813        .map(Some)
814        .chain(stream! { while let Some(x) = s2.next().await { yield Some(x) } });
815    stream! {
816        pin_mut!(chained);
817        while let Some(item) = chained.next().await {
818            if let Some(x) = item {
819                yield x;
820            }
821        }
822    }
823        .boxed()
824}
825
826
827/// Interleave multiple streams in a round-robin fashion
828pub fn interleave<O, S>(streams: Vec<S>) -> RS2Stream<O>
829where
830    S: Stream<Item = O> + Send + 'static + Unpin,
831    O: Send + 'static,
832{
833    if streams.is_empty() {
834        return empty();
835    }
836
837    stream! {
838        let mut streams: Vec<_> = streams.into_iter().map(|s| Box::pin(s)).collect();
839        let mut index = 0;
840
841        while !streams.is_empty() {
842            if index >= streams.len() {
843                index = 0;
844            }
845
846            match streams[index].next().await {
847                Some(item) => {
848                    yield item;
849                    index += 1;
850                }
851                None => {
852                    streams.remove(index);
853                }
854            }
855        }
856    }
857        .boxed()
858}
859
860/// Combine two streams element-by-element using a provided function
861/// Returns a new rs2_stream with the combined elements
862/// Stops when either input rs2_stream ends
863pub fn zip_with<A, B, O, F, S1, S2>(s1: S1, s2: S2, mut f: F) -> RS2Stream<O>
864where
865    S1: Stream<Item = A> + Send + 'static,
866    S2: Stream<Item = B> + Send + 'static,
867    F: FnMut(A, B) -> O + Send + 'static,
868    A: Send + 'static,
869    B: Send + 'static,
870    O: Send + 'static,
871{
872    stream! {
873        pin_mut!(s1);
874        pin_mut!(s2);
875
876        loop {
877            match futures_util::future::join(s1.next(), s2.next()).await {
878                (Some(a), Some(b)) => yield f(a, b),
879                _ => break, // Stop when either rs2_stream ends
880            }
881        }
882    }
883    .boxed()
884}
885
886/// Select between two streams based on which one produces a value first
887///
888/// This combinator takes two streams and emits values from whichever rs2_stream
889/// produces a value first. Once a value is received from one rs2_stream, the other
890/// rs2_stream is cancelled. If either rs2_stream completes (returns None), the combinator
891/// switches to the other rs2_stream exclusively.
892///
893/// # Examples
894/// ```
895/// use rs2_stream::rs2::*;
896/// use std::time::Duration;
897/// use async_stream::stream;
898/// use tokio::time::sleep;
899/// use futures_util::stream::StreamExt;
900///
901/// # async fn example() {
902/// // Create two streams with different timing
903/// let fast_stream = stream! {
904///     yield 1;
905///     sleep(Duration::from_millis(10)).await;
906///     yield 2;
907///     sleep(Duration::from_millis(100)).await;
908///     yield 3;
909/// };
910///
911/// let slow_stream = stream! {
912///     sleep(Duration::from_millis(50)).await;
913///     yield 10;
914///     sleep(Duration::from_millis(10)).await;
915///     yield 20;
916/// };
917///
918/// // The either combinator will select values from whichever rs2_stream produces first
919/// let result = either(fast_stream.boxed(), slow_stream.boxed())
920///     .collect::<Vec<_>>()
921///     .await;
922///
923/// // We expect to get values from the fast rs2_stream first, then from the slow rs2_stream
924/// // when the fast rs2_stream is waiting longer
925/// assert_eq!(result, vec![1, 2, 10, 20]);
926/// # }
927/// ```
928pub fn either<O, S1, S2>(s1: S1, s2: S2) -> RS2Stream<O>
929where
930    S1: Stream<Item = O> + Send + 'static,
931    S2: Stream<Item = O> + Send + 'static,
932    O: Send + 'static,
933{
934    stream! {
935        pin_mut!(s1);
936        pin_mut!(s2);
937
938        let mut s1_done = false;
939        let mut s2_done = false;
940
941        let mut using_s1 = true;
942
943        loop {
944            if s1_done {
945                match s2.next().await {
946                    Some(item) => yield item,
947                    None => break,
948                }
949                continue;
950            }
951
952            if s2_done {
953                match s1.next().await {
954                    Some(item) => yield item,
955                    None => break,
956                }
957                continue;
958            }
959
960            if using_s1 {
961                match s1.next().await {
962                    Some(item) => {
963                        yield item;
964                    },
965                    None => {
966                        s1_done = true;
967                    }
968                }
969            } else {
970                match s2.next().await {
971                    Some(item) => {
972                        yield item;
973                    },
974                    None => {
975                        s2_done = true;
976                    }
977                }
978            }
979
980            tokio::select! {
981                biased;
982
983                maybe_item = s1.next() => {
984                    match maybe_item {
985                        Some(item) => {
986                            yield item;
987                            using_s1 = true;
988                        },
989                        None => {
990                            s1_done = true;
991                        }
992                    }
993                },
994                maybe_item = s2.next() => {
995                    match maybe_item {
996                        Some(item) => {
997                            yield item;
998                            using_s1 = false;
999                        },
1000                        None => {
1001                            s2_done = true;
1002                        }
1003                    }
1004                }
1005            }
1006        }
1007    }
1008    .boxed()
1009}
1010
1011// ================================
1012// Timing and Rate Control
1013// ================================
1014
1015/// Debounce a rs2_stream, only emitting an element after a specified quiet period has passed
1016/// without receiving another element
1017///
1018/// This combinator waits for a quiet period (specified by `duration`) after receiving an element
1019/// before emitting it. If another element arrives during the quiet period, the timer is reset
1020/// and the new element replaces the previous one.
1021///
1022/// This is useful for handling rapidly updating sources where you only want to process
1023/// the most recent value after the source has settled.
1024pub fn debounce<O>(s: RS2Stream<O>, duration: Duration) -> RS2Stream<O>
1025where
1026    O: Send + 'static,
1027{
1028    stream! {
1029        pin_mut!(s);
1030
1031        let mut latest_item: Option<O> = None;
1032        let mut timer_handle: Option<tokio::task::JoinHandle<()>> = None;
1033
1034        let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(1);
1035
1036        loop {
1037            tokio::select! {
1038                maybe_item = s.next() => {
1039                    match maybe_item {
1040                        Some(item) => {
1041                            if let Some(handle) = timer_handle.take() {
1042                                handle.abort();
1043                            }
1044
1045                            latest_item = Some(item);
1046
1047                            let tx_clone = tx.clone();
1048                            timer_handle = Some(tokio::spawn(async move {
1049                                tokio::time::sleep(duration).await;
1050                                let _ = tx_clone.send(()).await;
1051                            }));
1052                        },
1053                        None => {
1054                            if let Some(item) = latest_item.take() {
1055                                yield item;
1056                            }
1057                            break;
1058                        }
1059                    }
1060                },
1061                _ = rx.recv() => {
1062                    if let Some(item) = latest_item.take() {
1063                        yield item;
1064                    }
1065                }
1066            }
1067        }
1068    }
1069    .boxed()
1070}
1071
1072/// Filter out consecutive duplicate elements from a rs2_stream
1073/// 
1074/// This combinator only emits elements that are different from the previous element.
1075/// It uses the default equality operator (`==`) to compare elements.
1076/// The first element is always emitted.
1077///
1078/// # Examples
1079/// ```
1080/// use rs2_stream::rs2::*;
1081/// use futures_util::stream::StreamExt;
1082///
1083/// # async fn example() {
1084/// let rs2_stream = from_iter(vec![1, 1, 2, 2, 3, 3, 2, 1]);
1085/// let result = distinct_until_changed(rs2_stream).collect::<Vec<_>>().await;
1086/// assert_eq!(result, vec![1, 2, 3, 2, 1]);
1087/// # }
1088/// ```
1089pub fn distinct_until_changed<O>(s: RS2Stream<O>) -> RS2Stream<O>
1090where
1091    O: Clone + Send + PartialEq + 'static,
1092{
1093    stream! {
1094        pin_mut!(s);
1095        let mut prev: Option<O> = None;
1096
1097        while let Some(item) = s.next().await {
1098            match &prev {
1099                Some(p) if p == &item => {
1100                },
1101                _ => {
1102                    yield item.clone();
1103                    prev = Some(item);
1104                }
1105            }
1106        }
1107    }
1108    .boxed()
1109}
1110
1111/// Sample a rs2_stream at regular intervals, emitting the most recent value
1112///
1113/// This combinator samples the most recent value from a rs2_stream at a regular interval.
1114/// It only emits a value if at least one new value has arrived since the last emission.
1115/// If no new value has arrived during an interval, that interval is skipped.
1116///
1117/// # Examples
1118/// ```
1119/// use rs2_stream::rs2::*;
1120/// use futures_util::stream::StreamExt;
1121/// use std::time::Duration;
1122/// use tokio::time::sleep;
1123/// use async_stream::stream;
1124///
1125/// # async fn example() {
1126/// // Create a rs2_stream that emits values faster than the sample interval
1127/// let rs2_stream = stream! {
1128///     yield 1;
1129///     sleep(Duration::from_millis(10)).await;
1130///     yield 2;
1131///     sleep(Duration::from_millis(10)).await;
1132///     yield 3;
1133///     sleep(Duration::from_millis(100)).await;
1134///     yield 4;
1135/// };
1136///
1137/// // Sample the rs2_stream every 50ms
1138/// let result = sample(rs2_stream.boxed(), Duration::from_millis(50))
1139///     .collect::<Vec<_>>()
1140///     .await;
1141///
1142/// // We expect to get the most recent value at each interval:
1143/// // - 3 (the most recent value after the first 50ms)
1144/// // - 4 (the most recent value after the next 50ms)
1145/// assert_eq!(result, vec![3, 4]);
1146/// # }
1147/// ```
1148pub fn sample<O>(s: RS2Stream<O>, interval: Duration) -> RS2Stream<O>
1149where
1150    O: Clone + Send + 'static,
1151{
1152    stream! {
1153        pin_mut!(s);
1154
1155        let mut latest_item: Option<O> = None;
1156        let mut has_new_value = false;
1157
1158        let mut timer = tokio::time::interval(interval);
1159        timer.tick().await;
1160
1161        loop {
1162            tokio::select! {
1163                maybe_item = s.next() => {
1164                    match maybe_item {
1165                        Some(item) => {
1166                            latest_item = Some(item);
1167                            has_new_value = true;
1168                        },
1169                        None => {
1170                            if has_new_value {
1171                                if let Some(item) = latest_item.take() {
1172                                    yield item;
1173                                }
1174                            }
1175                            break;
1176                        }
1177                    }
1178                },
1179                _ = timer.tick() => {
1180                    if has_new_value {
1181                        if let Some(ref item) = latest_item {
1182                            yield item.clone();
1183                            has_new_value = false;
1184                        }
1185                    }
1186                }
1187            }
1188        }
1189    }
1190    .boxed()
1191}
1192
1193/// Filter out consecutive duplicate elements from a rs2_stream using a custom equality function
1194/// 
1195/// This combinator only emits elements that are different from the previous element.
1196/// It uses the provided equality function to compare elements.
1197/// The first element is always emitted.
1198///
1199/// # Examples
1200/// ```
1201/// use rs2_stream::rs2::*;
1202/// use futures_util::stream::StreamExt;
1203/// 
1204/// # async fn example() {
1205/// let rs2_stream = from_iter(vec![1, 1, 2, 2, 3, 3, 2, 1]);
1206/// // Use a custom equality function that considers two numbers equal if they have the same parity
1207/// let result = distinct_until_changed_by(rs2_stream, |a, b| a % 2 == b % 2).collect::<Vec<_>>().await;
1208/// assert_eq!(result, vec![1, 2]);
1209/// # }
1210/// ```
1211pub fn distinct_until_changed_by<O, F>(s: RS2Stream<O>, mut eq: F) -> RS2Stream<O>
1212where
1213    O: Clone + Send + 'static,
1214    F: FnMut(&O, &O) -> bool + Send + 'static,
1215{
1216    stream! {
1217        pin_mut!(s);
1218        let mut prev: Option<O> = None;
1219
1220        while let Some(item) = s.next().await {
1221            match &prev {
1222                Some(p) if eq(p, &item) => {
1223                },
1224                _ => {
1225                    yield item.clone();
1226                    prev = Some(item);
1227                }
1228            }
1229        }
1230    }
1231    .boxed()
1232}
1233
1234/// Prefetch a specified number of elements ahead of consumption
1235/// This combinator eagerly evaluates a specified number of elements ahead of what's been requested,
1236/// storing them in a buffer. This can improve performance by starting to process the next elements
1237/// before they're actually needed.
1238///
1239/// Backpressure is maintained by using a bounded channel with capacity equal to the prefetch count.
1240pub fn prefetch<O>(s: RS2Stream<O>, prefetch_count: usize) -> RS2Stream<O>
1241where
1242    O: Send + 'static,
1243{
1244    if prefetch_count == 0 {
1245        return s;
1246    }
1247
1248    let (mut tx, rx): (Sender<O>, Receiver<O>) = channel(prefetch_count);
1249
1250    spawn(async move {
1251        pin_mut!(s);
1252        while let Some(item) = s.next().await {
1253            if tx.send(item).await.is_err() {
1254                break;
1255            }
1256        }
1257    });
1258
1259    stream! {
1260        let mut rx = rx;
1261        while let Some(item) = rx.next().await {
1262            yield item;
1263        }
1264    }
1265    .boxed()
1266}
1267
1268/// Back-pressure-aware rate limiting via bounded channel (legacy)
1269pub fn rate_limit_backpressure<O>(s: RS2Stream<O>, capacity: usize) -> RS2Stream<O>
1270where
1271    O: Send + 'static,
1272{
1273    auto_backpressure_block(s, capacity)
1274}
1275
1276/// Throttle rs2_stream to emit one element per `duration`
1277pub fn throttle<O>(s: RS2Stream<O>, duration: Duration) -> RS2Stream<O>
1278where
1279    O: Send + 'static,
1280{
1281    stream! {
1282        pin_mut!(s);
1283        while let Some(item) = s.next().await {
1284            yield item;
1285            sleep(duration).await;
1286        }
1287    }
1288        .boxed()
1289}
1290
1291/// Create a rs2_stream that emits values at a fixed rate
1292pub fn tick<O>(period: Duration, item: O) -> RS2Stream<O>
1293where
1294    O: Clone + Send + 'static,
1295{
1296    stream! {
1297        loop {
1298            yield item.clone();
1299            sleep(period).await;
1300        }
1301    }
1302        .boxed()
1303}
1304
1305// ================================
1306// Parallel Processing
1307// ================================
1308
1309/// Parallel evaluation preserving order (parEvalMap) with automatic backpressure
1310pub fn par_eval_map<I, O, Fut, F>(s: RS2Stream<I>, concurrency: usize, mut f: F) -> RS2Stream<O>
1311where
1312    F: FnMut(I) -> Fut + Send + 'static,
1313    Fut: Future<Output = O> + Send + 'static,
1314    O: Send + 'static,
1315    I: Send + 'static,
1316{
1317    let buffered_stream = auto_backpressure_block(s, concurrency * 2);
1318
1319    stream! {
1320        let mut in_flight = FuturesUnordered::new();
1321        pin_mut!(buffered_stream);
1322
1323        while let Some(item) = buffered_stream.next().await {
1324            in_flight.push(f(item));
1325            if in_flight.len() >= concurrency {
1326                if let Some(res) = in_flight.next().await {
1327                    yield res;
1328                }
1329            }
1330        }
1331        while let Some(res) = in_flight.next().await {
1332            yield res;
1333        }
1334    }
1335        .boxed()
1336}
1337
1338/// Parallel evaluation unordered (parEvalMapUnordered) with automatic backpressure
1339pub fn par_eval_map_unordered<I, O, Fut, F>(
1340    s: RS2Stream<I>,
1341    concurrency: usize,
1342    f: F,
1343) -> RS2Stream<O>
1344where
1345    F: FnMut(I) -> Fut + Send + 'static,
1346    Fut: Future<Output = O> + Send + 'static,
1347    O: Send + 'static,
1348    I: Send + 'static,
1349{
1350    let buffered_stream = auto_backpressure_block(s, concurrency * 2);
1351    buffered_stream.map(f).buffer_unordered(concurrency).boxed()
1352}
1353
1354/// Parallel join of streams (parJoin) with automatic backpressure
1355///
1356/// This combinator takes a rs2_stream of streams and a concurrency limit, and runs
1357/// up to n inner streams concurrently. It emits all elements from the inner streams,
1358/// and starts new inner streams as others complete.
1359///
1360/// Backpressure is maintained by using a bounded buffer for the outer rs2_stream.
1361pub fn par_join<O, S>(
1362    s: RS2Stream<S>,
1363    concurrency: usize,
1364) -> RS2Stream<O>
1365where
1366    S: Stream<Item = O> + Send + 'static + Unpin,
1367    O: Send + 'static,
1368{
1369    let buffered_stream = auto_backpressure_block(s, concurrency * 2);
1370
1371    stream! {
1372        pin_mut!(buffered_stream);
1373
1374        let mut active_streams: Vec<S> = Vec::with_capacity(concurrency);
1375
1376        let mut outer_stream_done = false;
1377
1378        loop {
1379            while active_streams.len() < concurrency && !outer_stream_done {
1380                match buffered_stream.next().await {
1381                    Some(inner_stream) => {
1382                        active_streams.push(inner_stream);
1383                    },
1384                    None => {
1385                        outer_stream_done = true;
1386                        break;
1387                    }
1388                }
1389            }
1390            if active_streams.is_empty() && outer_stream_done {
1391                break;
1392            }
1393
1394            let mut i = 0;
1395            while i < active_streams.len() {
1396                match active_streams[i].next().await {
1397                    Some(item) => {
1398                        yield item;
1399                        i += 1;
1400                    },
1401                    None => {
1402                        active_streams.swap_remove(i);
1403                    }
1404                }
1405            }
1406        }
1407    }
1408    .boxed()
1409}
1410
1411// ================================
1412// Resource Management
1413// ================================
1414
1415/// Bracket for simple resource handling
1416pub fn bracket<A, O, St, FAcq, FUse, FRel, R>(
1417    acquire: FAcq,
1418    use_fn: FUse,
1419    release: FRel,
1420) -> RS2Stream<O>
1421where
1422    FAcq: Future<Output = A> + Send + 'static,
1423    FUse: FnOnce(A) -> St + Send + 'static,
1424    St: Stream<Item = O> + Send + 'static,
1425    FRel: FnOnce(A) -> R + Send + 'static,
1426    R: Future<Output = ()> + Send + 'static,
1427    O: Send + 'static,
1428    A: Clone + Send + 'static,
1429{
1430    stream! {
1431        let resource = acquire.await;
1432        let stream = use_fn(resource.clone());
1433        pin_mut!(stream);
1434        while let Some(item) = stream.next().await {
1435            yield item;
1436        }
1437        release(resource).await;
1438    }
1439        .boxed()
1440}
1441
1442/// BracketCase with exit case semantics for streams of Result<O,E>
1443pub fn bracket_case<A, O, E, St, FAcq, FUse, FRel, R>(
1444    acquire: FAcq,
1445    use_fn: FUse,
1446    release: FRel,
1447) -> RS2Stream<Result<O, E>>
1448where
1449    FAcq: Future<Output = A> + Send + 'static,
1450    FUse: FnOnce(A) -> St + Send + 'static,
1451    St: Stream<Item = Result<O, E>> + Send + 'static,
1452    FRel: FnOnce(A, ExitCase<E>) -> R + Send + 'static,
1453    R: Future<Output = ()> + Send + 'static,
1454    O: Send + 'static,
1455    E: Clone + Send + 'static,
1456    A: Clone + Send + 'static,
1457{
1458    stream! {
1459        let resource = acquire.await;
1460        let stream = use_fn(resource.clone());
1461        pin_mut!(stream);
1462        while let Some(item) = stream.next().await {
1463            yield item;
1464        }
1465        release(resource, ExitCase::Completed).await;
1466    }
1467        .boxed()
1468}
1469
1470// ================================
1471// Stream Extensions
1472// ================================
1473
1474// Re-export the extension traits from their respective modules
1475pub use crate::rs2_result_stream_ext::RS2ResultStreamExt;
1476pub use crate::rs2_stream_ext::RS2StreamExt;