rs2_stream/
rs2.rs

1//! RStream - A Rust streaming library inspired by FS2/RS2
2//!
3//! This module provides the core streaming functionality with functional
4//! programming patterns, backpressure handling, and resource management.
5
6use async_stream::stream;
7use futures::channel::mpsc::{channel, Receiver, Sender};
8use futures_core::Stream;
9use futures_util::pin_mut;
10use futures_util::{
11    future,
12    stream::{self, BoxStream, FuturesUnordered, StreamExt},
13    SinkExt,
14};
15use std::future::Future;
16use std::time::Duration;
17use std::sync::Arc;
18use tokio::{spawn, time::sleep};
19use tokio::sync::Mutex;
20
21use crate::error::{StreamError, StreamResult, RetryPolicy};
22use crate::stream_performance_metrics::{HealthThresholds, StreamMetrics};
23
/// A boxed, heap-allocated stream of `O` values with a `'static` lifetime.
///
/// This is the common currency of every combinator in this module; it is analogous to
/// FS2's `Stream[F, O]`.
pub type RS2Stream<O> = BoxStream<'static, O>;
26
/// Backpressure strategy for automatic flow control.
///
/// The strategy is carried in [`BackpressureConfig::strategy`] and dispatched by
/// `auto_backpressure`.
#[derive(Debug, Clone, Copy)]
pub enum BackpressureStrategy {
    /// Drop oldest items when buffer is full
    DropOldest,
    /// Drop newest items when buffer is full
    DropNewest,
    /// Block producer until consumer catches up
    Block,
    /// Fail fast when buffer is full
    // NOTE(review): `auto_backpressure_error` in this file awaits `send` on a bounded
    // channel and its output stream carries plain items, so no error is surfaced to the
    // consumer — confirm the intended semantics of this variant.
    Error,
}
39
/// Configuration for automatic backpressure.
///
/// `auto_backpressure` in this file reads only `strategy` and `buffer_size`; the two
/// watermark fields are not consulted there.
#[derive(Debug, Clone)]
pub struct BackpressureConfig {
    /// Which strategy to apply when the buffer fills up.
    pub strategy: BackpressureStrategy,
    /// Capacity of the intermediate buffer, in items.
    pub buffer_size: usize,
    pub low_watermark: Option<usize>,  // Resume at this level
    pub high_watermark: Option<usize>, // Pause at this level
}
48
49impl Default for BackpressureConfig {
50    fn default() -> Self {
51        Self {
52            strategy: BackpressureStrategy::Block,
53            buffer_size: 100,
54            low_watermark: Some(25),
55            high_watermark: Some(75),
56        }
57    }
58}
59
/// ExitCase for bracketCase semantics: how a bracketed section finished.
///
/// NOTE(review): not referenced in the visible part of this file; presumably handed to
/// a release/finalizer callback — confirm against the bracket combinators.
#[derive(Debug, Clone)]
pub enum ExitCase<E> {
    /// The section ran to completion.
    Completed,
    /// The section ended with the contained error value.
    Errored(E),
}
66
67// ================================
68// Core Stream Constructors
69// ================================
70
71/// Emit a single element as a rs2_stream
72pub fn emit<O>(item: O) -> RS2Stream<O>
73where
74    O: Send + 'static,
75{
76    stream::once(future::ready(item)).boxed()
77}
78
79/// Create an empty rs2_stream that completes immediately
80pub fn empty<O>() -> RS2Stream<O>
81where
82    O: Send + 'static,
83{
84    stream::empty().boxed()
85}
86
87/// Create a rs2_stream from an iterator
88pub fn from_iter<I, O>(iter: I) -> RS2Stream<O>
89where
90    I: IntoIterator<Item = O> + Send + 'static,
91    <I as IntoIterator>::IntoIter: Send,
92    O: Send + 'static,
93{
94    stream::iter(iter).boxed()
95}
96
97/// Evaluate a Future and emit its output
98pub fn eval<O, F>(fut: F) -> RS2Stream<O>
99where
100    F: Future<Output = O> + Send + 'static,
101    O: Send + 'static,
102{
103    stream::once(fut).boxed()
104}
105
106/// Repeat a value indefinitely
107pub fn repeat<O>(item: O) -> RS2Stream<O>
108where
109    O: Clone + Send + 'static,
110{
111    stream::repeat(item).boxed()
112}
113
114/// Create a rs2_stream that emits a single value after a delay
115pub fn emit_after<O>(item: O, duration: Duration) -> RS2Stream<O>
116where
117    O: Send + 'static,
118{
119    stream::once(async move {
120        sleep(duration).await;
121        item
122    }).boxed()
123}
124
125/// Generate a rs2_stream from a seed value and a function
126///
127/// This combinator takes an initial state and a function that produces an element and the next state.
128/// It continues until the function returns None.
129///
130/// # Examples
131/// ```
132/// use rs2_stream::rs2::*;
133/// use futures_util::stream::StreamExt;
134/// 
135/// # async fn example() {
136/// // Create a rs2_stream of Fibonacci numbers
137/// let fibonacci = unfold(
138///     (0, 1),
139///     |state| async move {
140///         let (a, b) = state;
141///         Some((a, (b, a + b)))
142///     }
143/// );
144///
145/// // Take the first 10 Fibonacci numbers
146/// let result = fibonacci.take(10).collect::<Vec<_>>().await;
147/// assert_eq!(result, vec![0, 1, 1, 2, 3, 5, 8, 13, 21, 34]);
148/// # }
149/// ```
150pub fn unfold<S, O, F, Fut>(init: S, mut f: F) -> RS2Stream<O>
151where
152    S: Send + 'static,
153    O: Send + 'static,
154    F: FnMut(S) -> Fut + Send + 'static,
155    Fut: Future<Output = Option<(O, S)>> + Send + 'static,
156{
157    stream! {
158        let mut state_opt = Some(init);
159
160        loop {
161            let state = state_opt.take().expect("State should be available");
162            let fut = f(state);
163            match fut.await {
164                Some((item, next_state)) => {
165                    yield item;
166                    state_opt = Some(next_state);
167                },
168                None => break,
169            }
170        }
171    }
172    .boxed()
173}
174
175// ================================
176// Stream Transformations
177// ================================
178
179/// Group adjacent elements that share a common key
180///
181/// This combinator groups consecutive elements that produce the same key.
182/// It emits groups as they complete (when the key changes or the rs2_stream ends).
183/// Each emitted item is a tuple containing the key and a vector of elements.
184///
185/// # Examples
186/// ```
187/// use rs2_stream::rs2::*;
188/// use futures_util::stream::StreamExt;
189///
190/// # async fn example() {
191/// let rs2_stream = from_iter(vec![1, 1, 2, 2, 3, 3, 2, 1]);
192/// let result = group_adjacent_by(rs2_stream, |&x| x % 2).collect::<Vec<_>>().await;
193/// assert_eq!(result, vec![(1, vec![1, 1]), (0, vec![2, 2]), (1, vec![3, 3]), (0, vec![2]), (1, vec![1])]);
194/// # }
195/// ```
196pub fn group_adjacent_by<O, K, F>(s: RS2Stream<O>, mut key_fn: F) -> RS2Stream<(K, Vec<O>)>
197where
198    O: Clone + Send + 'static,
199    K: Eq + Clone + Send + 'static,
200    F: FnMut(&O) -> K + Send + 'static,
201{
202    stream! {
203        pin_mut!(s);
204        let mut current_key: Option<K> = None;
205        let mut current_group: Vec<O> = Vec::new();
206
207        while let Some(item) = s.next().await {
208            let key = key_fn(&item);
209
210            match &current_key {
211                Some(k) if *k == key => {
212                    current_group.push(item);
213                },
214                _ => {
215                    if !current_group.is_empty() {
216                        yield (current_key.clone().unwrap(), std::mem::take(&mut current_group));
217                    }
218                    current_key = Some(key);
219                    current_group.push(item);
220                }
221            }
222        }
223
224        if !current_group.is_empty() {
225            yield (current_key.clone().unwrap(), std::mem::take(&mut current_group));
226        }
227    }
228    .boxed()
229}
230
231/// Slice: take first n items
232pub fn take<O>(s: RS2Stream<O>, n: usize) -> RS2Stream<O>
233where
234    O: Send + 'static,
235{
236    s.take(n).boxed()
237}
238
239/// Slice: drop first n items
240pub fn drop<O>(s: RS2Stream<O>, n: usize) -> RS2Stream<O>
241where
242    O: Send + 'static,
243{
244    s.skip(n).boxed()
245}
246
247/// Chunk the rs2_stream into Vecs of size n
248pub fn chunk<O>(s: RS2Stream<O>, size: usize) -> RS2Stream<Vec<O>>
249where
250    O: Send + 'static,
251{
252    stream! {
253        let mut buf = Vec::with_capacity(size);
254        pin_mut!(s);
255        while let Some(item) = s.next().await {
256            buf.push(item);
257            if buf.len() == size {
258                yield std::mem::take(&mut buf);
259            }
260        }
261        if !buf.is_empty() {
262            yield std::mem::take(&mut buf);
263        }
264    }
265        .boxed()
266}
267
268/// Add timeout support to any rs2_stream
269pub fn timeout<T>(s: RS2Stream<T>, duration: Duration) -> RS2Stream<StreamResult<T>>
270where
271    T: Send + 'static,
272{
273    stream! {
274        pin_mut!(s);
275        loop {
276            match tokio::time::timeout(duration, s.next()).await {
277                Ok(Some(value)) => yield Ok(value),
278                Ok(None) => break,
279                Err(_) => yield Err(StreamError::Timeout),
280            }
281        }
282    }.boxed()
283}
284
285/// Scan operation (like fold but emits intermediate results)
286pub fn scan<T, U, F>(s: RS2Stream<T>, init: U, mut f: F) -> RS2Stream<U>
287where
288    F: FnMut(U, T) -> U + Send + 'static,
289    T: Send + 'static,
290    U: Clone + Send + 'static,
291{
292    stream! {
293        let mut acc = init;
294        pin_mut!(s);
295        while let Some(item) = s.next().await {
296            acc = f(acc.clone(), item);
297            yield acc.clone();
298        }
299    }.boxed()
300}
301
302/// Fold operation that accumulates a value over a stream
303pub fn fold<T, A, F, Fut>(s: RS2Stream<T>, init: A, mut f: F) -> impl Future<Output = A>
304where
305    F: FnMut(A, T) -> Fut + Send + 'static,
306    Fut: Future<Output = A> + Send + 'static,
307    T: Send + 'static,
308    A: Send + 'static,
309{
310    async move {
311        let mut acc = init;
312        pin_mut!(s);
313        while let Some(item) = s.next().await {
314            acc = f(acc, item).await;
315        }
316        acc
317    }
318}
319
320/// Reduce operation that combines all elements in a stream using a binary operation
321pub fn reduce<T, F, Fut>(s: RS2Stream<T>, mut f: F) -> impl Future<Output = Option<T>>
322where
323    F: FnMut(T, T) -> Fut + Send + 'static,
324    Fut: Future<Output = T> + Send + 'static,
325    T: Send + 'static,
326{
327    async move {
328        pin_mut!(s);
329        let first = match s.next().await {
330            Some(item) => item,
331            None => return None, // Return None for empty streams
332        };
333
334        let mut acc = first;
335        while let Some(item) = s.next().await {
336            acc = f(acc, item).await;
337        }
338
339        Some(acc)
340    }
341}
342
343/// Filter and map elements of a stream in one operation
344pub fn filter_map<T, U, F, Fut>(s: RS2Stream<T>, f: F) -> RS2Stream<U>
345where
346    F: FnMut(T) -> Fut + Send + 'static,
347    Fut: Future<Output = Option<U>> + Send + 'static,
348    T: Send + 'static,
349    U: Send + 'static,
350{
351    s.filter_map(f).boxed()
352}
353
354/// Take elements from a stream while a predicate returns true
355///
356/// This combinator yields elements from the stream as long as the predicate returns true.
357/// It stops (and does not yield) the first element where the predicate returns false.
358///
359/// # Examples
360/// ```
361/// use rs2_stream::rs2::*;
362/// use futures_util::stream::StreamExt;
363///
364/// # async fn example() {
365/// let stream = from_iter(vec![1, 2, 3, 4, 5]);
366/// let result = take_while(stream, |&x| async move { x < 4 }).collect::<Vec<_>>().await;
367/// assert_eq!(result, vec![1, 2, 3]);
368/// # }
369/// ```
370pub fn take_while<T, F, Fut>(s: RS2Stream<T>, mut predicate: F) -> RS2Stream<T>
371where
372    F: FnMut(&T) -> Fut + Send + 'static,
373    Fut: Future<Output = bool> + Send + 'static,
374    T: Send + 'static,
375{
376    stream! {
377        pin_mut!(s);
378        while let Some(item) = s.next().await {
379            if predicate(&item).await {
380                yield item;
381            } else {
382                break;
383            }
384        }
385    }.boxed()
386}
387
388/// Skip elements from a stream while a predicate returns true
389///
390/// This combinator skips elements from the stream as long as the predicate returns true.
391/// Once the predicate returns false, it yields that element and all remaining elements.
392///
393/// # Examples
394/// ```
395/// use rs2_stream::rs2::*;
396/// use futures_util::stream::StreamExt;
397///
398/// # async fn example() {
399/// let stream = from_iter(vec![1, 2, 3, 4, 5]);
400/// let result = drop_while(stream, |&x| async move { x < 4 }).collect::<Vec<_>>().await;
401/// assert_eq!(result, vec![4, 5]);
402/// # }
403/// ```
404pub fn drop_while<T, F, Fut>(s: RS2Stream<T>, mut predicate: F) -> RS2Stream<T>
405where
406    F: FnMut(&T) -> Fut + Send + 'static,
407    Fut: Future<Output = bool> + Send + 'static,
408    T: Send + 'static,
409{
410    stream! {
411        pin_mut!(s);
412
413        let mut found_false = false;
414        while let Some(item) = s.next().await {
415            if !found_false && predicate(&item).await {
416                continue;
417            } else {
418                found_false = true;
419                yield item;
420            }
421        }
422    }.boxed()
423}
424
425/// Group consecutive elements that share a common key
426///
427/// This combinator groups consecutive elements that produce the same key.
428/// It emits groups as they complete (when the key changes or the stream ends).
429/// Each emitted item is a tuple containing the key and a vector of elements.
430///
431/// # Examples
432/// ```
433/// use rs2_stream::rs2::*;
434/// use futures_util::stream::StreamExt;
435///
436/// # async fn example() {
437/// let stream = from_iter(vec![1, 1, 2, 2, 3, 3, 2, 1]);
438/// let result = group_by(stream, |&x| x % 2).collect::<Vec<_>>().await;
439/// assert_eq!(result, vec![(1, vec![1, 1]), (0, vec![2, 2]), (1, vec![3, 3]), (0, vec![2]), (1, vec![1])]);
440/// # }
441/// ```
442pub fn group_by<T, K, F>(s: RS2Stream<T>, mut key_fn: F) -> RS2Stream<(K, Vec<T>)>
443where
444    T: Clone + Send + 'static,
445    K: Eq + Clone + Send + 'static,
446    F: FnMut(&T) -> K + Send + 'static,
447{
448    stream! {
449        pin_mut!(s);
450        let mut current_key: Option<K> = None;
451        let mut current_group: Vec<T> = Vec::new();
452
453        while let Some(item) = s.next().await {
454            let key = key_fn(&item);
455
456            match &current_key {
457                Some(k) if *k == key => {
458                    current_group.push(item);
459                },
460                _ => {
461                    if !current_group.is_empty() {
462                        yield (current_key.clone().unwrap(), std::mem::take(&mut current_group));
463                    }
464                    current_key = Some(key);
465                    current_group.push(item);
466                }
467            }
468        }
469
470        if !current_group.is_empty() {
471            yield (current_key.clone().unwrap(), std::mem::take(&mut current_group));
472        }
473    }
474    .boxed()
475}
476
477/// Sliding window operation
478pub fn sliding_window<T>(s: RS2Stream<T>, size: usize) -> RS2Stream<Vec<T>>
479where
480    T: Clone + Send + 'static,
481{
482    if size == 0 {
483        return empty();
484    }
485
486    stream! {
487        let mut window = Vec::with_capacity(size);
488        pin_mut!(s);
489
490        while let Some(item) = s.next().await {
491            window.push(item);
492
493            if window.len() > size {
494                window.remove(0);
495            }
496
497            if window.len() == size {
498                yield window.clone();
499            }
500        }
501    }.boxed()
502}
503
504/// Batch processing for better throughput
505pub fn batch_process<T, U, F>(
506    s: RS2Stream<T>,
507    batch_size: usize,
508    mut processor: F
509) -> RS2Stream<U>
510where
511    F: FnMut(Vec<T>) -> Vec<U> + Send + 'static,
512    T: Send + 'static,
513    U: Send + 'static,
514{
515    stream! {
516        let chunked = chunk(s, batch_size);
517        pin_mut!(chunked);
518        while let Some(batch) = chunked.next().await {
519            for item in processor(batch) {
520                yield item;
521            }
522        }
523    }.boxed()
524}
525
526/// Collect metrics while processing rs2_stream
527pub fn with_metrics<T>(
528    s: RS2Stream<T>,
529    name: String,
530    thresholds: HealthThresholds
531) -> (RS2Stream<T>, Arc<Mutex<StreamMetrics>>)
532where
533    T: Send + 'static,
534{
535    let metrics = Arc::new(Mutex::new(
536        StreamMetrics::new()
537            .with_name(name)
538            .with_health_thresholds(thresholds)
539    ));
540    
541    let metrics_clone = Arc::clone(&metrics);
542
543    let monitored_stream = stream! {
544        pin_mut!(s);
545        while let Some(item) = s.next().await {
546            {
547                let mut m = metrics_clone.lock().await;
548                m.record_item(size_of_val(&item) as u64);
549            }
550            yield item;
551        }
552
553        {
554            let mut m = metrics_clone.lock().await;
555            m.finalize();
556        }
557    }.boxed();
558
559    (monitored_stream, metrics)
560}
561
562
563// ================================
564// Backpressure Management
565// ================================
566
567/// Automatic backpressure with configurable strategy
568pub fn auto_backpressure<O>(s: RS2Stream<O>, config: BackpressureConfig) -> RS2Stream<O>
569where
570    O: Send + 'static,
571{
572    match config.strategy {
573        BackpressureStrategy::Block => auto_backpressure_block(s, config.buffer_size),
574        BackpressureStrategy::DropOldest => auto_backpressure_drop_oldest(s, config.buffer_size),
575        BackpressureStrategy::DropNewest => auto_backpressure_drop_newest(s, config.buffer_size),
576        BackpressureStrategy::Error => auto_backpressure_error(s, config.buffer_size),
577    }
578}
579
580/// Automatic backpressure with blocking strategy
581pub fn auto_backpressure_block<O>(s: RS2Stream<O>, buffer_size: usize) -> RS2Stream<O>
582where
583    O: Send + 'static,
584{
585    let (mut tx, rx): (Sender<O>, Receiver<O>) = channel(buffer_size);
586
587    spawn(async move {
588        pin_mut!(s);
589        while let Some(item) = s.next().await {
590            if tx.send(item).await.is_err() {
591                break;
592            }
593        }
594    });
595
596    stream! {
597        let mut rx = rx;
598        while let Some(item) = rx.next().await {
599            yield item;
600        }
601    }
602        .boxed()
603}
604
605/// Automatic backpressure that drops oldest items when buffer is full
606pub fn auto_backpressure_drop_oldest<O>(s: RS2Stream<O>, buffer_size: usize) -> RS2Stream<O>
607where
608    O: Send + 'static,
609{
610    use std::collections::VecDeque;
611
612    let buffer = Arc::new(Mutex::new(VecDeque::<O>::new()));
613    let buffer_clone = Arc::clone(&buffer);
614    let (done_tx, mut done_rx) = tokio::sync::mpsc::channel(1);
615
616    spawn(async move {
617        pin_mut!(s);
618        while let Some(item) = s.next().await {
619            let mut buf = buffer_clone.lock().await;
620
621            if buf.len() >= buffer_size {
622                buf.pop_front();
623            }
624
625            buf.push_back(item);
626        }
627
628        let _ = done_tx.send(()).await;
629    });
630
631    stream! {
632        let mut source_done = false;
633
634        loop {
635            if let Ok(_) = done_rx.try_recv() {
636                source_done = true;
637            }
638
639            let item = {
640                let mut buf = buffer.lock().await;
641                buf.pop_front()
642            };
643
644            match item {
645                Some(item) => yield item,
646                None => {
647                    if source_done {
648                        break;
649                    }
650                    tokio::time::sleep(Duration::from_millis(1)).await;
651                }
652            }
653        }
654    }
655        .boxed()
656}
657
658/// Automatic backpressure that drops newest items when buffer is full
659pub fn auto_backpressure_drop_newest<O>(s: RS2Stream<O>, buffer_size: usize) -> RS2Stream<O>
660where
661    O: Send + 'static,
662{
663    use std::collections::VecDeque;
664
665    let buffer = Arc::new(Mutex::new(VecDeque::<O>::new()));
666    let buffer_clone = Arc::clone(&buffer);
667    let (done_tx, mut done_rx) = tokio::sync::mpsc::channel(1);
668
669    spawn(async move {
670        pin_mut!(s);
671        while let Some(item) = s.next().await {
672            let mut buf = buffer_clone.lock().await;
673
674            if buf.len() < buffer_size {
675                buf.push_back(item);
676            }
677        }
678
679        let _ = done_tx.send(()).await;
680    });
681
682    stream! {
683        let mut source_done = false;
684
685        loop {
686            if let Ok(_) = done_rx.try_recv() {
687                source_done = true;
688            }
689
690            let item = {
691                let mut buf = buffer.lock().await;
692                buf.pop_front()
693            };
694
695            match item {
696                Some(item) => yield item,
697                None => {
698                    if source_done {
699                        break;
700                    }
701                    tokio::time::sleep(Duration::from_millis(1)).await;
702                }
703            }
704        }
705    }
706        .boxed()
707}
708
709/// Automatic backpressure that errors when buffer is full
710pub fn auto_backpressure_error<O>(s: RS2Stream<O>, buffer_size: usize) -> RS2Stream<O>
711where
712    O: Send + 'static,
713{
714    use tokio::sync::mpsc;
715
716    let (tx, mut rx) = mpsc::channel(buffer_size);
717
718    spawn(async move {
719        pin_mut!(s);
720        while let Some(item) = s.next().await {
721            if tx.send(item).await.is_err() {
722                break;
723            }
724        }
725    });
726
727    stream! {
728        while let Some(item) = rx.recv().await {
729            yield item;
730        }
731    }
732        .boxed()
733}
734
735// ================================
736// Stream Combinators
737// ================================
738
739/// Interrupt a rs2_stream when a signal is received
740///
741/// This combinator takes a rs2_stream and a future that signals interruption.
742/// It stops processing the rs2_stream when the signal future completes.
743/// Resources are properly cleaned up when the rs2_stream is interrupted.
744///
745/// # Examples
746/// ```
747/// use rs2_stream::rs2::*;
748/// use std::time::Duration;
749/// use tokio::time::sleep;
750/// use async_stream::stream;
751/// use futures_util::stream::StreamExt;
752///
753/// # async fn example() {
754/// // Create a rs2_stream that emits numbers every 100ms
755/// let rs2_stream = from_iter(0..100)
756///     .throttle_rs2(Duration::from_millis(100));
757///
758/// // Create a future that completes after 250ms
759/// let interrupt_signal = sleep(Duration::from_millis(250));
760///
761/// // The rs2_stream will be interrupted after about 250ms,
762/// // so we should get approximately 2-3 items
763/// let result = interrupt_when(rs2_stream, interrupt_signal)
764///     .collect::<Vec<_>>()
765///     .await;
766///
767/// assert!(result.len() >= 2 && result.len() <= 3);
768/// # }
769/// ```
770pub fn interrupt_when<O, F>(s: RS2Stream<O>, signal: F) -> RS2Stream<O>
771where
772    O: Send + 'static,
773    F: Future<Output = ()> + Send + 'static,
774{
775    stream! {
776        pin_mut!(s);
777        pin_mut!(signal);
778
779        loop {
780            tokio::select! {
781                biased;
782                _ = &mut signal => {
783                    break;
784                },
785
786                maybe_item = s.next() => {
787                    match maybe_item {
788                        Some(item) => yield item,
789                        None => break,
790                    }
791                },
792            }
793        }
794    }
795    .boxed()
796}
797
798/// Concatenate multiple streams sequentially
799pub fn concat<O, S>(streams: Vec<S>) -> RS2Stream<O>
800where
801    S: Stream<Item = O> + Send + 'static,
802    O: Send + 'static,
803{
804    stream! {
805        for s in streams {
806            pin_mut!(s);
807            while let Some(item) = s.next().await {
808                yield item;
809            }
810        }
811    }
812        .boxed()
813}
814
815/// Merge two streams into one interleaved output
816pub fn merge<O, S1, S2>(s1: S1, mut s2: S2) -> RS2Stream<O>
817where
818    S1: Stream<Item = O> + Send + 'static,
819    S2: Stream<Item = O> + Send + 'static + Unpin,
820    O: Send + 'static,
821{
822    let chained = s1
823        .map(Some)
824        .chain(stream! { while let Some(x) = s2.next().await { yield Some(x) } });
825    stream! {
826        pin_mut!(chained);
827        while let Some(item) = chained.next().await {
828            if let Some(x) = item {
829                yield x;
830            }
831        }
832    }
833        .boxed()
834}
835
836
837/// Interleave multiple streams in a round-robin fashion
838pub fn interleave<O, S>(streams: Vec<S>) -> RS2Stream<O>
839where
840    S: Stream<Item = O> + Send + 'static + Unpin,
841    O: Send + 'static,
842{
843    if streams.is_empty() {
844        return empty();
845    }
846
847    stream! {
848        let mut streams: Vec<_> = streams.into_iter().map(|s| Box::pin(s)).collect();
849        let mut index = 0;
850
851        while !streams.is_empty() {
852            if index >= streams.len() {
853                index = 0;
854            }
855
856            match streams[index].next().await {
857                Some(item) => {
858                    yield item;
859                    index += 1;
860                }
861                None => {
862                    streams.remove(index);
863                }
864            }
865        }
866    }
867        .boxed()
868}
869
870/// Combine two streams element-by-element using a provided function
871/// Returns a new rs2_stream with the combined elements
872/// Stops when either input rs2_stream ends
873pub fn zip_with<A, B, O, F, S1, S2>(s1: S1, s2: S2, mut f: F) -> RS2Stream<O>
874where
875    S1: Stream<Item = A> + Send + 'static,
876    S2: Stream<Item = B> + Send + 'static,
877    F: FnMut(A, B) -> O + Send + 'static,
878    A: Send + 'static,
879    B: Send + 'static,
880    O: Send + 'static,
881{
882    stream! {
883        pin_mut!(s1);
884        pin_mut!(s2);
885
886        loop {
887            match futures_util::future::join(s1.next(), s2.next()).await {
888                (Some(a), Some(b)) => yield f(a, b),
889                _ => break, // Stop when either rs2_stream ends
890            }
891        }
892    }
893    .boxed()
894}
895
896/// Select between two streams based on which one produces a value first
897///
898/// This combinator takes two streams and emits values from whichever rs2_stream
899/// produces a value first. Once a value is received from one rs2_stream, the other
900/// rs2_stream is cancelled. If either rs2_stream completes (returns None), the combinator
901/// switches to the other rs2_stream exclusively.
902///
903/// # Examples
904/// ```
905/// use rs2_stream::rs2::*;
906/// use std::time::Duration;
907/// use async_stream::stream;
908/// use tokio::time::sleep;
909/// use futures_util::stream::StreamExt;
910///
911/// # async fn example() {
912/// // Create two streams with different timing
913/// let fast_stream = stream! {
914///     yield 1;
915///     sleep(Duration::from_millis(10)).await;
916///     yield 2;
917///     sleep(Duration::from_millis(100)).await;
918///     yield 3;
919/// };
920///
921/// let slow_stream = stream! {
922///     sleep(Duration::from_millis(50)).await;
923///     yield 10;
924///     sleep(Duration::from_millis(10)).await;
925///     yield 20;
926/// };
927///
928/// // The either combinator will select values from whichever rs2_stream produces first
929/// let result = either(fast_stream.boxed(), slow_stream.boxed())
930///     .collect::<Vec<_>>()
931///     .await;
932///
933/// // We expect to get values from the fast rs2_stream first, then from the slow rs2_stream
934/// // when the fast rs2_stream is waiting longer
935/// assert_eq!(result, vec![1, 2, 10, 20]);
936/// # }
937/// ```
938pub fn either<O, S1, S2>(s1: S1, s2: S2) -> RS2Stream<O>
939where
940    S1: Stream<Item = O> + Send + 'static,
941    S2: Stream<Item = O> + Send + 'static,
942    O: Send + 'static,
943{
944    stream! {
945        pin_mut!(s1);
946        pin_mut!(s2);
947
948        let mut s1_done = false;
949        let mut s2_done = false;
950
951        let mut using_s1 = true;
952
953        loop {
954            if s1_done {
955                match s2.next().await {
956                    Some(item) => yield item,
957                    None => break,
958                }
959                continue;
960            }
961
962            if s2_done {
963                match s1.next().await {
964                    Some(item) => yield item,
965                    None => break,
966                }
967                continue;
968            }
969
970            if using_s1 {
971                match s1.next().await {
972                    Some(item) => {
973                        yield item;
974                    },
975                    None => {
976                        s1_done = true;
977                    }
978                }
979            } else {
980                match s2.next().await {
981                    Some(item) => {
982                        yield item;
983                    },
984                    None => {
985                        s2_done = true;
986                    }
987                }
988            }
989
990            tokio::select! {
991                biased;
992
993                maybe_item = s1.next() => {
994                    match maybe_item {
995                        Some(item) => {
996                            yield item;
997                            using_s1 = true;
998                        },
999                        None => {
1000                            s1_done = true;
1001                        }
1002                    }
1003                },
1004                maybe_item = s2.next() => {
1005                    match maybe_item {
1006                        Some(item) => {
1007                            yield item;
1008                            using_s1 = false;
1009                        },
1010                        None => {
1011                            s2_done = true;
1012                        }
1013                    }
1014                }
1015            }
1016        }
1017    }
1018    .boxed()
1019}
1020
1021// ================================
1022// Timing and Rate Control
1023// ================================
1024
1025/// Debounce a rs2_stream, only emitting an element after a specified quiet period has passed
1026/// without receiving another element
1027///
1028/// This combinator waits for a quiet period (specified by `duration`) after receiving an element
1029/// before emitting it. If another element arrives during the quiet period, the timer is reset
1030/// and the new element replaces the previous one.
1031///
1032/// This is useful for handling rapidly updating sources where you only want to process
1033/// the most recent value after the source has settled.
1034pub fn debounce<O>(s: RS2Stream<O>, duration: Duration) -> RS2Stream<O>
1035where
1036    O: Send + 'static,
1037{
1038    stream! {
1039        pin_mut!(s);
1040
1041        let mut latest_item: Option<O> = None;
1042        let mut timer_handle: Option<tokio::task::JoinHandle<()>> = None;
1043
1044        let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(1);
1045
1046        loop {
1047            tokio::select! {
1048                maybe_item = s.next() => {
1049                    match maybe_item {
1050                        Some(item) => {
1051                            if let Some(handle) = timer_handle.take() {
1052                                handle.abort();
1053                            }
1054
1055                            latest_item = Some(item);
1056
1057                            let tx_clone = tx.clone();
1058                            timer_handle = Some(tokio::spawn(async move {
1059                                tokio::time::sleep(duration).await;
1060                                let _ = tx_clone.send(()).await;
1061                            }));
1062                        },
1063                        None => {
1064                            if let Some(item) = latest_item.take() {
1065                                yield item;
1066                            }
1067                            break;
1068                        }
1069                    }
1070                },
1071                _ = rx.recv() => {
1072                    if let Some(item) = latest_item.take() {
1073                        yield item;
1074                    }
1075                }
1076            }
1077        }
1078    }
1079    .boxed()
1080}
1081
1082/// Filter out consecutive duplicate elements from a rs2_stream
1083/// 
1084/// This combinator only emits elements that are different from the previous element.
1085/// It uses the default equality operator (`==`) to compare elements.
1086/// The first element is always emitted.
1087///
1088/// # Examples
1089/// ```
1090/// use rs2_stream::rs2::*;
1091/// use futures_util::stream::StreamExt;
1092///
1093/// # async fn example() {
1094/// let rs2_stream = from_iter(vec![1, 1, 2, 2, 3, 3, 2, 1]);
1095/// let result = distinct_until_changed(rs2_stream).collect::<Vec<_>>().await;
1096/// assert_eq!(result, vec![1, 2, 3, 2, 1]);
1097/// # }
1098/// ```
1099pub fn distinct_until_changed<O>(s: RS2Stream<O>) -> RS2Stream<O>
1100where
1101    O: Clone + Send + PartialEq + 'static,
1102{
1103    stream! {
1104        pin_mut!(s);
1105        let mut prev: Option<O> = None;
1106
1107        while let Some(item) = s.next().await {
1108            match &prev {
1109                Some(p) if p == &item => {
1110                },
1111                _ => {
1112                    yield item.clone();
1113                    prev = Some(item);
1114                }
1115            }
1116        }
1117    }
1118    .boxed()
1119}
1120
1121/// Sample a rs2_stream at regular intervals, emitting the most recent value
1122///
1123/// This combinator samples the most recent value from a rs2_stream at a regular interval.
1124/// It only emits a value if at least one new value has arrived since the last emission.
1125/// If no new value has arrived during an interval, that interval is skipped.
1126///
1127/// # Examples
1128/// ```
1129/// use rs2_stream::rs2::*;
1130/// use futures_util::stream::StreamExt;
1131/// use std::time::Duration;
1132/// use tokio::time::sleep;
1133/// use async_stream::stream;
1134///
1135/// # async fn example() {
1136/// // Create a rs2_stream that emits values faster than the sample interval
1137/// let rs2_stream = stream! {
1138///     yield 1;
1139///     sleep(Duration::from_millis(10)).await;
1140///     yield 2;
1141///     sleep(Duration::from_millis(10)).await;
1142///     yield 3;
1143///     sleep(Duration::from_millis(100)).await;
1144///     yield 4;
1145/// };
1146///
1147/// // Sample the rs2_stream every 50ms
1148/// let result = sample(rs2_stream.boxed(), Duration::from_millis(50))
1149///     .collect::<Vec<_>>()
1150///     .await;
1151///
1152/// // We expect to get the most recent value at each interval:
1153/// // - 3 (the most recent value after the first 50ms)
1154/// // - 4 (the most recent value after the next 50ms)
1155/// assert_eq!(result, vec![3, 4]);
1156/// # }
1157/// ```
1158pub fn sample<O>(s: RS2Stream<O>, interval: Duration) -> RS2Stream<O>
1159where
1160    O: Clone + Send + 'static,
1161{
1162    stream! {
1163        pin_mut!(s);
1164
1165        let mut latest_item: Option<O> = None;
1166        let mut has_new_value = false;
1167
1168        let mut timer = tokio::time::interval(interval);
1169        timer.tick().await;
1170
1171        loop {
1172            tokio::select! {
1173                maybe_item = s.next() => {
1174                    match maybe_item {
1175                        Some(item) => {
1176                            latest_item = Some(item);
1177                            has_new_value = true;
1178                        },
1179                        None => {
1180                            if has_new_value {
1181                                if let Some(item) = latest_item.take() {
1182                                    yield item;
1183                                }
1184                            }
1185                            break;
1186                        }
1187                    }
1188                },
1189                _ = timer.tick() => {
1190                    if has_new_value {
1191                        if let Some(ref item) = latest_item {
1192                            yield item.clone();
1193                            has_new_value = false;
1194                        }
1195                    }
1196                }
1197            }
1198        }
1199    }
1200    .boxed()
1201}
1202
1203/// Filter out consecutive duplicate elements from a rs2_stream using a custom equality function
1204/// 
1205/// This combinator only emits elements that are different from the previous element.
1206/// It uses the provided equality function to compare elements.
1207/// The first element is always emitted.
1208///
1209/// # Examples
1210/// ```
1211/// use rs2_stream::rs2::*;
1212/// use futures_util::stream::StreamExt;
1213/// 
1214/// # async fn example() {
1215/// let rs2_stream = from_iter(vec![1, 1, 2, 2, 3, 3, 2, 1]);
1216/// // Use a custom equality function that considers two numbers equal if they have the same parity
1217/// let result = distinct_until_changed_by(rs2_stream, |a, b| a % 2 == b % 2).collect::<Vec<_>>().await;
1218/// assert_eq!(result, vec![1, 2]);
1219/// # }
1220/// ```
1221pub fn distinct_until_changed_by<O, F>(s: RS2Stream<O>, mut eq: F) -> RS2Stream<O>
1222where
1223    O: Clone + Send + 'static,
1224    F: FnMut(&O, &O) -> bool + Send + 'static,
1225{
1226    stream! {
1227        pin_mut!(s);
1228        let mut prev: Option<O> = None;
1229
1230        while let Some(item) = s.next().await {
1231            match &prev {
1232                Some(p) if eq(p, &item) => {
1233                },
1234                _ => {
1235                    yield item.clone();
1236                    prev = Some(item);
1237                }
1238            }
1239        }
1240    }
1241    .boxed()
1242}
1243
1244/// Prefetch a specified number of elements ahead of consumption
1245/// This combinator eagerly evaluates a specified number of elements ahead of what's been requested,
1246/// storing them in a buffer. This can improve performance by starting to process the next elements
1247/// before they're actually needed.
1248///
1249/// Backpressure is maintained by using a bounded channel with capacity equal to the prefetch count.
1250pub fn prefetch<O>(s: RS2Stream<O>, prefetch_count: usize) -> RS2Stream<O>
1251where
1252    O: Send + 'static,
1253{
1254    if prefetch_count == 0 {
1255        return s;
1256    }
1257
1258    let (mut tx, rx): (Sender<O>, Receiver<O>) = channel(prefetch_count);
1259
1260    spawn(async move {
1261        pin_mut!(s);
1262        while let Some(item) = s.next().await {
1263            if tx.send(item).await.is_err() {
1264                break;
1265            }
1266        }
1267    });
1268
1269    stream! {
1270        let mut rx = rx;
1271        while let Some(item) = rx.next().await {
1272            yield item;
1273        }
1274    }
1275    .boxed()
1276}
1277
1278/// Back-pressure-aware rate limiting via bounded channel (legacy)
1279pub fn rate_limit_backpressure<O>(s: RS2Stream<O>, capacity: usize) -> RS2Stream<O>
1280where
1281    O: Send + 'static,
1282{
1283    auto_backpressure_block(s, capacity)
1284}
1285
1286/// Throttle rs2_stream to emit one element per `duration`
1287pub fn throttle<O>(s: RS2Stream<O>, duration: Duration) -> RS2Stream<O>
1288where
1289    O: Send + 'static,
1290{
1291    stream! {
1292        pin_mut!(s);
1293        while let Some(item) = s.next().await {
1294            yield item;
1295            sleep(duration).await;
1296        }
1297    }
1298        .boxed()
1299}
1300
1301/// Create a rs2_stream that emits values at a fixed rate
1302pub fn tick<O>(period: Duration, item: O) -> RS2Stream<O>
1303where
1304    O: Clone + Send + 'static,
1305{
1306    stream! {
1307        loop {
1308            yield item.clone();
1309            sleep(period).await;
1310        }
1311    }
1312        .boxed()
1313}
1314
1315// ================================
1316// Parallel Processing
1317// ================================
1318
1319/// Parallel evaluation preserving order (parEvalMap) with automatic backpressure
1320pub fn par_eval_map<I, O, Fut, F>(s: RS2Stream<I>, concurrency: usize, mut f: F) -> RS2Stream<O>
1321where
1322    F: FnMut(I) -> Fut + Send + 'static,
1323    Fut: Future<Output = O> + Send + 'static,
1324    O: Send + 'static,
1325    I: Send + 'static,
1326{
1327    let buffered_stream = auto_backpressure_block(s, concurrency * 2);
1328
1329    stream! {
1330        let mut in_flight = FuturesUnordered::new();
1331        pin_mut!(buffered_stream);
1332
1333        while let Some(item) = buffered_stream.next().await {
1334            in_flight.push(f(item));
1335            if in_flight.len() >= concurrency {
1336                if let Some(res) = in_flight.next().await {
1337                    yield res;
1338                }
1339            }
1340        }
1341        while let Some(res) = in_flight.next().await {
1342            yield res;
1343        }
1344    }
1345        .boxed()
1346}
1347
1348/// Parallel evaluation unordered (parEvalMapUnordered) with automatic backpressure
1349pub fn par_eval_map_unordered<I, O, Fut, F>(
1350    s: RS2Stream<I>,
1351    concurrency: usize,
1352    f: F,
1353) -> RS2Stream<O>
1354where
1355    F: FnMut(I) -> Fut + Send + 'static,
1356    Fut: Future<Output = O> + Send + 'static,
1357    O: Send + 'static,
1358    I: Send + 'static,
1359{
1360    let buffered_stream = auto_backpressure_block(s, concurrency * 2);
1361    buffered_stream.map(f).buffer_unordered(concurrency).boxed()
1362}
1363
1364/// Parallel join of streams (parJoin) with automatic backpressure
1365///
1366/// This combinator takes a rs2_stream of streams and a concurrency limit, and runs
1367/// up to n inner streams concurrently. It emits all elements from the inner streams,
1368/// and starts new inner streams as others complete.
1369///
1370/// Backpressure is maintained by using a bounded buffer for the outer rs2_stream.
1371pub fn par_join<O, S>(
1372    s: RS2Stream<S>,
1373    concurrency: usize,
1374) -> RS2Stream<O>
1375where
1376    S: Stream<Item = O> + Send + 'static + Unpin,
1377    O: Send + 'static,
1378{
1379    let buffered_stream = auto_backpressure_block(s, concurrency * 2);
1380
1381    stream! {
1382        pin_mut!(buffered_stream);
1383
1384        let mut active_streams: Vec<S> = Vec::with_capacity(concurrency);
1385
1386        let mut outer_stream_done = false;
1387
1388        loop {
1389            while active_streams.len() < concurrency && !outer_stream_done {
1390                match buffered_stream.next().await {
1391                    Some(inner_stream) => {
1392                        active_streams.push(inner_stream);
1393                    },
1394                    None => {
1395                        outer_stream_done = true;
1396                        break;
1397                    }
1398                }
1399            }
1400            if active_streams.is_empty() && outer_stream_done {
1401                break;
1402            }
1403
1404            let mut i = 0;
1405            while i < active_streams.len() {
1406                match active_streams[i].next().await {
1407                    Some(item) => {
1408                        yield item;
1409                        i += 1;
1410                    },
1411                    None => {
1412                        active_streams.swap_remove(i);
1413                    }
1414                }
1415            }
1416        }
1417    }
1418    .boxed()
1419}
1420
1421// ================================
1422// Resource Management
1423// ================================
1424
1425/// Bracket for simple resource handling
1426pub fn bracket<A, O, St, FAcq, FUse, FRel, R>(
1427    acquire: FAcq,
1428    use_fn: FUse,
1429    release: FRel,
1430) -> RS2Stream<O>
1431where
1432    FAcq: Future<Output = A> + Send + 'static,
1433    FUse: FnOnce(A) -> St + Send + 'static,
1434    St: Stream<Item = O> + Send + 'static,
1435    FRel: FnOnce(A) -> R + Send + 'static,
1436    R: Future<Output = ()> + Send + 'static,
1437    O: Send + 'static,
1438    A: Clone + Send + 'static,
1439{
1440    stream! {
1441        let resource = acquire.await;
1442        let stream = use_fn(resource.clone());
1443        pin_mut!(stream);
1444        while let Some(item) = stream.next().await {
1445            yield item;
1446        }
1447        release(resource).await;
1448    }
1449        .boxed()
1450}
1451
1452/// BracketCase with exit case semantics for streams of Result<O,E>
1453pub fn bracket_case<A, O, E, St, FAcq, FUse, FRel, R>(
1454    acquire: FAcq,
1455    use_fn: FUse,
1456    release: FRel,
1457) -> RS2Stream<Result<O, E>>
1458where
1459    FAcq: Future<Output = A> + Send + 'static,
1460    FUse: FnOnce(A) -> St + Send + 'static,
1461    St: Stream<Item = Result<O, E>> + Send + 'static,
1462    FRel: FnOnce(A, ExitCase<E>) -> R + Send + 'static,
1463    R: Future<Output = ()> + Send + 'static,
1464    O: Send + 'static,
1465    E: Clone + Send + 'static,
1466    A: Clone + Send + 'static,
1467{
1468    stream! {
1469        let resource = acquire.await;
1470        let stream = use_fn(resource.clone());
1471        pin_mut!(stream);
1472        while let Some(item) = stream.next().await {
1473            yield item;
1474        }
1475        release(resource, ExitCase::Completed).await;
1476    }
1477        .boxed()
1478}
1479
1480// ================================
1481// Stream Extensions
1482// ================================
1483
1484// Re-export the extension traits from their respective modules
1485pub use crate::rs2_result_stream_ext::RS2ResultStreamExt;
1486pub use crate::rs2_stream_ext::RS2StreamExt;