rs2_stream/
rs2.rs

1//! RStream - A Rust streaming library inspired by FS2/RS2
2//!
3//! This module provides the core streaming functionality with functional
4//! programming patterns, backpressure handling, and resource management.
5
6use async_stream::stream;
7use futures::channel::mpsc::{channel, Receiver, Sender};
8use futures_core::Stream;
9use futures_util::pin_mut;
10use futures_util::{
11    future,
12    stream::{self, BoxStream, FuturesUnordered, StreamExt},
13    SinkExt,
14};
15use std::future::Future;
16use std::sync::Arc;
17use std::time::Duration;
18use tokio::sync::Mutex;
19use tokio::{spawn, time::sleep};
20
21use crate::error::{StreamError, StreamResult};
22use crate::stream_performance_metrics::{HealthThresholds, StreamMetrics};
23use crate::resource_manager::get_global_resource_manager;
24
25/// A boxed, heap-allocated Rust Stream analogous to RS2's Stream[F, O]
26pub type RS2Stream<O> = BoxStream<'static, O>;
27
28/// Backpressure strategy for automatic flow control
29#[derive(Debug, Clone, Copy)]
30pub enum BackpressureStrategy {
31    /// Drop oldest items when buffer is full
32    DropOldest,
33    /// Drop newest items when buffer is full
34    DropNewest,
35    /// Block producer until consumer catches up
36    Block,
37    /// Fail fast when buffer is full
38    Error,
39}
40
41/// Configuration for automatic backpressure
42#[derive(Debug, Clone)]
43pub struct BackpressureConfig {
44    pub strategy: BackpressureStrategy,
45    pub buffer_size: usize,
46    pub low_watermark: Option<usize>,  // Resume at this level
47    pub high_watermark: Option<usize>, // Pause at this level
48}
49
50impl Default for BackpressureConfig {
51    fn default() -> Self {
52        Self {
53            strategy: BackpressureStrategy::Block,
54            buffer_size: 100,
55            low_watermark: Some(25),
56            high_watermark: Some(75),
57        }
58    }
59}
60
61/// ExitCase for bracketCase semantics
62#[derive(Debug, Clone)]
63pub enum ExitCase<E> {
64    Completed,
65    Errored(E),
66}
67
68// ================================
69// Core Stream Constructors
70// ================================
71
72/// Emit a single element as a rs2_stream
73pub fn emit<O>(item: O) -> RS2Stream<O>
74where
75    O: Send + 'static,
76{
77    stream::once(future::ready(item)).boxed()
78}
79
80/// Create an empty rs2_stream that completes immediately
81pub fn empty<O>() -> RS2Stream<O>
82where
83    O: Send + 'static,
84{
85    stream::empty().boxed()
86}
87
88/// Create a rs2_stream from an iterator
89pub fn from_iter<I, O>(iter: I) -> RS2Stream<O>
90where
91    I: IntoIterator<Item = O> + Send + 'static,
92    <I as IntoIterator>::IntoIter: Send,
93    O: Send + 'static,
94{
95    stream::iter(iter).boxed()
96}
97
98/// Evaluate a Future and emit its output
99pub fn eval<O, F>(fut: F) -> RS2Stream<O>
100where
101    F: Future<Output = O> + Send + 'static,
102    O: Send + 'static,
103{
104    stream::once(fut).boxed()
105}
106
107/// Repeat a value indefinitely
108pub fn repeat<O>(item: O) -> RS2Stream<O>
109where
110    O: Clone + Send + 'static,
111{
112    stream::repeat(item).boxed()
113}
114
115/// Create a rs2_stream that emits a single value after a delay
116pub fn emit_after<O>(item: O, duration: Duration) -> RS2Stream<O>
117where
118    O: Send + 'static,
119{
120    stream::once(async move {
121        sleep(duration).await;
122        item
123    })
124    .boxed()
125}
126
127/// Generate a rs2_stream from a seed value and a function
128///
129/// This combinator takes an initial state and a function that produces an element and the next state.
130/// It continues until the function returns None.
131///
132/// # Examples
133/// ```
134/// use rs2_stream::rs2::*;
135/// use futures_util::stream::StreamExt;
136///
137/// # async fn example() {
138/// // Create a rs2_stream of Fibonacci numbers
139/// let fibonacci = unfold(
140///     (0, 1),
141///     |state| async move {
142///         let (a, b) = state;
143///         Some((a, (b, a + b)))
144///     }
145/// );
146///
147/// // Take the first 10 Fibonacci numbers
148/// let result = fibonacci.take(10).collect::<Vec<_>>().await;
149/// assert_eq!(result, vec![0, 1, 1, 2, 3, 5, 8, 13, 21, 34]);
150/// # }
151/// ```
152pub fn unfold<S, O, F, Fut>(init: S, mut f: F) -> RS2Stream<O>
153where
154    S: Send + 'static,
155    O: Send + 'static,
156    F: FnMut(S) -> Fut + Send + 'static,
157    Fut: Future<Output = Option<(O, S)>> + Send + 'static,
158{
159    stream! {
160        let mut state_opt = Some(init);
161
162        loop {
163            let state = state_opt.take().expect("State should be available");
164            let fut = f(state);
165            match fut.await {
166                Some((item, next_state)) => {
167                    yield item;
168                    state_opt = Some(next_state);
169                },
170                None => break,
171            }
172        }
173    }
174    .boxed()
175}
176
177// ================================
178// Stream Transformations
179// ================================
180
181/// Group adjacent elements that share a common key
182///
183/// This combinator groups consecutive elements that produce the same key.
184/// It emits groups as they complete (when the key changes or the rs2_stream ends).
185/// Each emitted item is a tuple containing the key and a vector of elements.
186///
187/// # Examples
188/// ```
189/// use rs2_stream::rs2::*;
190/// use futures_util::stream::StreamExt;
191///
192/// # async fn example() {
193/// let rs2_stream = from_iter(vec![1, 1, 2, 2, 3, 3, 2, 1]);
194/// let result = group_adjacent_by(rs2_stream, |&x| x % 2).collect::<Vec<_>>().await;
195/// assert_eq!(result, vec![(1, vec![1, 1]), (0, vec![2, 2]), (1, vec![3, 3]), (0, vec![2]), (1, vec![1])]);
196/// # }
197/// ```
198pub fn group_adjacent_by<O, K, F>(s: RS2Stream<O>, mut key_fn: F) -> RS2Stream<(K, Vec<O>)>
199where
200    O: Clone + Send + 'static,
201    K: Eq + Clone + Send + 'static,
202    F: FnMut(&O) -> K + Send + 'static,
203{
204    stream! {
205        pin_mut!(s);
206        let mut current_key: Option<K> = None;
207        let mut current_group: Vec<O> = Vec::new();
208
209        while let Some(item) = s.next().await {
210            let key = key_fn(&item);
211
212            match &current_key {
213                Some(k) if *k == key => {
214                    current_group.push(item);
215                },
216                _ => {
217                    if !current_group.is_empty() {
218                        yield (current_key.clone().unwrap(), std::mem::take(&mut current_group));
219                    }
220                    current_key = Some(key);
221                    current_group.push(item);
222                }
223            }
224        }
225
226        if !current_group.is_empty() {
227            yield (current_key.clone().unwrap(), std::mem::take(&mut current_group));
228        }
229    }
230    .boxed()
231}
232
233/// Slice: take first n items
234pub fn take<O>(s: RS2Stream<O>, n: usize) -> RS2Stream<O>
235where
236    O: Send + 'static,
237{
238    s.take(n).boxed()
239}
240
241/// Slice: drop first n items
242pub fn drop<O>(s: RS2Stream<O>, n: usize) -> RS2Stream<O>
243where
244    O: Send + 'static,
245{
246    s.skip(n).boxed()
247}
248
249/// Chunk the rs2_stream into Vecs of size n
250pub fn chunk<O>(s: RS2Stream<O>, size: usize) -> RS2Stream<Vec<O>>
251where
252    O: Send + 'static,
253{
254    stream! {
255        let mut buf = Vec::with_capacity(size);
256        pin_mut!(s);
257        while let Some(item) = s.next().await {
258            buf.push(item);
259            if buf.len() == size {
260                yield std::mem::take(&mut buf);
261            }
262        }
263        if !buf.is_empty() {
264            yield std::mem::take(&mut buf);
265        }
266    }
267    .boxed()
268}
269
270/// Add timeout support to any rs2_stream
271pub fn timeout<T>(s: RS2Stream<T>, duration: Duration) -> RS2Stream<StreamResult<T>>
272where
273    T: Send + 'static,
274{
275    stream! {
276        pin_mut!(s);
277        loop {
278            match tokio::time::timeout(duration, s.next()).await {
279                Ok(Some(value)) => yield Ok(value),
280                Ok(None) => break,
281                Err(_) => yield Err(StreamError::Timeout),
282            }
283        }
284    }
285    .boxed()
286}
287
288/// Scan operation (like fold but emits intermediate results)
289pub fn scan<T, U, F>(s: RS2Stream<T>, init: U, mut f: F) -> RS2Stream<U>
290where
291    F: FnMut(U, T) -> U + Send + 'static,
292    T: Send + 'static,
293    U: Clone + Send + 'static,
294{
295    stream! {
296        let mut acc = init;
297        pin_mut!(s);
298        while let Some(item) = s.next().await {
299            acc = f(acc.clone(), item);
300            yield acc.clone();
301        }
302    }
303    .boxed()
304}
305
306/// Fold operation that accumulates a value over a stream
307pub fn fold<T, A, F, Fut>(s: RS2Stream<T>, init: A, mut f: F) -> impl Future<Output = A>
308where
309    F: FnMut(A, T) -> Fut + Send + 'static,
310    Fut: Future<Output = A> + Send + 'static,
311    T: Send + 'static,
312    A: Send + 'static,
313{
314    async move {
315        let mut acc = init;
316        pin_mut!(s);
317        while let Some(item) = s.next().await {
318            acc = f(acc, item).await;
319        }
320        acc
321    }
322}
323
324/// Reduce operation that combines all elements in a stream using a binary operation
325pub fn reduce<T, F, Fut>(s: RS2Stream<T>, mut f: F) -> impl Future<Output = Option<T>>
326where
327    F: FnMut(T, T) -> Fut + Send + 'static,
328    Fut: Future<Output = T> + Send + 'static,
329    T: Send + 'static,
330{
331    async move {
332        pin_mut!(s);
333        let first = match s.next().await {
334            Some(item) => item,
335            None => return None, // Return None for empty streams
336        };
337
338        let mut acc = first;
339        while let Some(item) = s.next().await {
340            acc = f(acc, item).await;
341        }
342
343        Some(acc)
344    }
345}
346
347/// Filter and map elements of a stream in one operation
348pub fn filter_map<T, U, F, Fut>(s: RS2Stream<T>, f: F) -> RS2Stream<U>
349where
350    F: FnMut(T) -> Fut + Send + 'static,
351    Fut: Future<Output = Option<U>> + Send + 'static,
352    T: Send + 'static,
353    U: Send + 'static,
354{
355    s.filter_map(f).boxed()
356}
357
358/// Take elements from a stream while a predicate returns true
359///
360/// This combinator yields elements from the stream as long as the predicate returns true.
361/// It stops (and does not yield) the first element where the predicate returns false.
362///
363/// # Examples
364/// ```
365/// use rs2_stream::rs2::*;
366/// use futures_util::stream::StreamExt;
367///
368/// # async fn example() {
369/// let stream = from_iter(vec![1, 2, 3, 4, 5]);
370/// let result = take_while(stream, |&x| async move { x < 4 }).collect::<Vec<_>>().await;
371/// assert_eq!(result, vec![1, 2, 3]);
372/// # }
373/// ```
374pub fn take_while<T, F, Fut>(s: RS2Stream<T>, mut predicate: F) -> RS2Stream<T>
375where
376    F: FnMut(&T) -> Fut + Send + 'static,
377    Fut: Future<Output = bool> + Send + 'static,
378    T: Send + 'static,
379{
380    stream! {
381        pin_mut!(s);
382        while let Some(item) = s.next().await {
383            if predicate(&item).await {
384                yield item;
385            } else {
386                break;
387            }
388        }
389    }
390    .boxed()
391}
392
393/// Skip elements from a stream while a predicate returns true
394///
395/// This combinator skips elements from the stream as long as the predicate returns true.
396/// Once the predicate returns false, it yields that element and all remaining elements.
397///
398/// # Examples
399/// ```
400/// use rs2_stream::rs2::*;
401/// use futures_util::stream::StreamExt;
402///
403/// # async fn example() {
404/// let stream = from_iter(vec![1, 2, 3, 4, 5]);
405/// let result = drop_while(stream, |&x| async move { x < 4 }).collect::<Vec<_>>().await;
406/// assert_eq!(result, vec![4, 5]);
407/// # }
408/// ```
409pub fn drop_while<T, F, Fut>(s: RS2Stream<T>, mut predicate: F) -> RS2Stream<T>
410where
411    F: FnMut(&T) -> Fut + Send + 'static,
412    Fut: Future<Output = bool> + Send + 'static,
413    T: Send + 'static,
414{
415    stream! {
416        pin_mut!(s);
417
418        let mut found_false = false;
419        while let Some(item) = s.next().await {
420            if !found_false && predicate(&item).await {
421                continue;
422            } else {
423                found_false = true;
424                yield item;
425            }
426        }
427    }
428    .boxed()
429}
430
431/// Group consecutive elements that share a common key
432///
433/// This combinator groups consecutive elements that produce the same key.
434/// It emits groups as they complete (when the key changes or the stream ends).
435/// Each emitted item is a tuple containing the key and a vector of elements.
436///
437/// # Examples
438/// ```
439/// use rs2_stream::rs2::*;
440/// use futures_util::stream::StreamExt;
441///
442/// # async fn example() {
443/// let stream = from_iter(vec![1, 1, 2, 2, 3, 3, 2, 1]);
444/// let result = group_by(stream, |&x| x % 2).collect::<Vec<_>>().await;
445/// assert_eq!(result, vec![(1, vec![1, 1]), (0, vec![2, 2]), (1, vec![3, 3]), (0, vec![2]), (1, vec![1])]);
446/// # }
447/// ```
448pub fn group_by<T, K, F>(s: RS2Stream<T>, mut key_fn: F) -> RS2Stream<(K, Vec<T>)>
449where
450    T: Clone + Send + 'static,
451    K: Eq + Clone + Send + 'static,
452    F: FnMut(&T) -> K + Send + 'static,
453{
454    stream! {
455        pin_mut!(s);
456        let mut current_key: Option<K> = None;
457        let mut current_group: Vec<T> = Vec::new();
458
459        while let Some(item) = s.next().await {
460            let key = key_fn(&item);
461
462            match &current_key {
463                Some(k) if *k == key => {
464                    current_group.push(item);
465                },
466                _ => {
467                    if !current_group.is_empty() {
468                        yield (current_key.clone().unwrap(), std::mem::take(&mut current_group));
469                    }
470                    current_key = Some(key);
471                    current_group.push(item);
472                }
473            }
474        }
475
476        if !current_group.is_empty() {
477            yield (current_key.clone().unwrap(), std::mem::take(&mut current_group));
478        }
479    }
480    .boxed()
481}
482
483/// Sliding window operation
484pub fn sliding_window<T>(s: RS2Stream<T>, size: usize) -> RS2Stream<Vec<T>>
485where
486    T: Clone + Send + 'static,
487{
488    if size == 0 {
489        return empty();
490    }
491
492    stream! {
493        let mut window = Vec::with_capacity(size);
494        pin_mut!(s);
495
496        while let Some(item) = s.next().await {
497            window.push(item);
498
499            if window.len() > size {
500                window.remove(0);
501            }
502
503            if window.len() == size {
504                yield window.clone();
505            }
506        }
507    }
508    .boxed()
509}
510
511/// Batch processing for better throughput
512pub fn batch_process<T, U, F>(s: RS2Stream<T>, batch_size: usize, mut processor: F) -> RS2Stream<U>
513where
514    F: FnMut(Vec<T>) -> Vec<U> + Send + 'static,
515    T: Send + 'static,
516    U: Send + 'static,
517{
518    stream! {
519        let chunked = chunk(s, batch_size);
520        pin_mut!(chunked);
521        while let Some(batch) = chunked.next().await {
522            for item in processor(batch) {
523                yield item;
524            }
525        }
526    }
527    .boxed()
528}
529
530/// Collect metrics while processing rs2_stream
531pub fn with_metrics<T>(
532    s: RS2Stream<T>,
533    name: String,
534    thresholds: HealthThresholds,
535) -> (RS2Stream<T>, Arc<Mutex<StreamMetrics>>)
536where
537    T: Send + 'static,
538{
539    let metrics = Arc::new(Mutex::new(
540        StreamMetrics::new()
541            .with_name(name)
542            .with_health_thresholds(thresholds),
543    ));
544
545    let metrics_clone = Arc::clone(&metrics);
546
547    let monitored_stream = stream! {
548        pin_mut!(s);
549        while let Some(item) = s.next().await {
550            {
551                let mut m = metrics_clone.lock().await;
552                m.record_item(size_of_val(&item) as u64);
553            }
554            yield item;
555        }
556
557        {
558            let mut m = metrics_clone.lock().await;
559            m.finalize();
560        }
561    }
562    .boxed();
563
564    (monitored_stream, metrics)
565}
566
567// ================================
568// Backpressure Management
569// ================================
570
571/// Automatic backpressure with configurable strategy
572pub fn auto_backpressure<O>(s: RS2Stream<O>, config: BackpressureConfig) -> RS2Stream<O>
573where
574    O: Send + 'static,
575{
576    match config.strategy {
577        BackpressureStrategy::Block => auto_backpressure_block(s, config.buffer_size),
578        BackpressureStrategy::DropOldest => auto_backpressure_drop_oldest(s, config.buffer_size),
579        BackpressureStrategy::DropNewest => auto_backpressure_drop_newest(s, config.buffer_size),
580        BackpressureStrategy::Error => auto_backpressure_error(s, config.buffer_size),
581    }
582}
583
584/// Automatic backpressure with blocking strategy
585pub fn auto_backpressure_block<O>(s: RS2Stream<O>, buffer_size: usize) -> RS2Stream<O>
586where
587    O: Send + 'static,
588{
589    let (mut tx, rx): (Sender<O>, Receiver<O>) = channel(buffer_size);
590
591    spawn(async move {
592        pin_mut!(s);
593        while let Some(item) = s.next().await {
594            if tx.send(item).await.is_err() {
595                break;
596            }
597        }
598    });
599
600    stream! {
601        let mut rx = rx;
602        while let Some(item) = rx.next().await {
603            yield item;
604        }
605    }
606    .boxed()
607}
608
609/// Automatic backpressure that drops oldest items when buffer is full
610pub fn auto_backpressure_drop_oldest<O>(s: RS2Stream<O>, buffer_size: usize) -> RS2Stream<O>
611where
612    O: Send + 'static,
613{
614    use std::collections::VecDeque;
615
616    let buffer = Arc::new(Mutex::new(VecDeque::<O>::new()));
617    let buffer_clone = Arc::clone(&buffer);
618    let (done_tx, mut done_rx) = tokio::sync::mpsc::channel(1);
619    let resource_manager = get_global_resource_manager();
620    let resource_manager_clone = resource_manager.clone();
621
622    spawn(async move {
623        pin_mut!(s);
624        while let Some(item) = s.next().await {
625            let mut buf = buffer_clone.lock().await;
626            if buf.len() >= buffer_size {
627                buf.pop_front();
628                resource_manager_clone.track_buffer_overflow().await.ok();
629            }
630            buf.push_back(item);
631            resource_manager_clone.track_memory_allocation(1).await.ok(); // Approximate per-item
632        }
633        let _ = done_tx.send(()).await;
634    });
635
636    stream! {
637        let mut source_done = false;
638        loop {
639            if let Ok(_) = done_rx.try_recv() {
640                source_done = true;
641            }
642            let item = {
643                let mut buf = buffer.lock().await;
644                let popped = buf.pop_front();
645                if popped.is_some() {
646                    resource_manager.track_memory_deallocation(1).await;
647                }
648                popped
649            };
650            match item {
651                Some(item) => yield item,
652                None => {
653                    if source_done {
654                        break;
655                    }
656                    tokio::time::sleep(Duration::from_millis(1)).await;
657                }
658            }
659        }
660    }
661    .boxed()
662}
663
664/// Automatic backpressure that drops newest items when buffer is full
665pub fn auto_backpressure_drop_newest<O>(s: RS2Stream<O>, buffer_size: usize) -> RS2Stream<O>
666where
667    O: Send + 'static,
668{
669    use std::collections::VecDeque;
670
671    let buffer = Arc::new(Mutex::new(VecDeque::<O>::new()));
672    let buffer_clone = Arc::clone(&buffer);
673    let (done_tx, mut done_rx) = tokio::sync::mpsc::channel(1);
674    let resource_manager = get_global_resource_manager();
675    let resource_manager_clone = resource_manager.clone();
676
677    spawn(async move {
678        pin_mut!(s);
679        while let Some(item) = s.next().await {
680            let mut buf = buffer_clone.lock().await;
681            if buf.len() < buffer_size {
682                buf.push_back(item);
683                resource_manager_clone.track_memory_allocation(1).await.ok();
684            } else {
685                resource_manager_clone.track_buffer_overflow().await.ok();
686            }
687        }
688        let _ = done_tx.send(()).await;
689    });
690
691    stream! {
692        let mut source_done = false;
693        loop {
694            if let Ok(_) = done_rx.try_recv() {
695                source_done = true;
696            }
697            let item = {
698                let mut buf = buffer.lock().await;
699                let popped = buf.pop_front();
700                if popped.is_some() {
701                    resource_manager.track_memory_deallocation(1).await;
702                }
703                popped
704            };
705            match item {
706                Some(item) => yield item,
707                None => {
708                    if source_done {
709                        break;
710                    }
711                    tokio::time::sleep(Duration::from_millis(1)).await;
712                }
713            }
714        }
715    }
716    .boxed()
717}
718
719/// Automatic backpressure that errors when buffer is full
720pub fn auto_backpressure_error<O>(s: RS2Stream<O>, buffer_size: usize) -> RS2Stream<O>
721where
722    O: Send + 'static,
723{
724    use tokio::sync::mpsc;
725
726    let (tx, mut rx) = mpsc::channel(buffer_size);
727
728    spawn(async move {
729        pin_mut!(s);
730        while let Some(item) = s.next().await {
731            if tx.send(item).await.is_err() {
732                break;
733            }
734        }
735    });
736
737    stream! {
738        while let Some(item) = rx.recv().await {
739            yield item;
740        }
741    }
742    .boxed()
743}
744
745// ================================
746// Stream Combinators
747// ================================
748
749/// Interrupt a rs2_stream when a signal is received
750///
751/// This combinator takes a rs2_stream and a future that signals interruption.
752/// It stops processing the rs2_stream when the signal future completes.
753/// Resources are properly cleaned up when the rs2_stream is interrupted.
754///
755/// # Examples
756/// ```
757/// use rs2_stream::rs2::*;
758/// use std::time::Duration;
759/// use tokio::time::sleep;
760/// use async_stream::stream;
761/// use futures_util::stream::StreamExt;
762///
763/// # async fn example() {
764/// // Create a rs2_stream that emits numbers every 100ms
765/// let rs2_stream = from_iter(0..100)
766///     .throttle_rs2(Duration::from_millis(100));
767///
768/// // Create a future that completes after 250ms
769/// let interrupt_signal = sleep(Duration::from_millis(250));
770///
771/// // The rs2_stream will be interrupted after about 250ms,
772/// // so we should get approximately 2-3 items
773/// let result = interrupt_when(rs2_stream, interrupt_signal)
774///     .collect::<Vec<_>>()
775///     .await;
776///
777/// assert!(result.len() >= 2 && result.len() <= 3);
778/// # }
779/// ```
780pub fn interrupt_when<O, F>(s: RS2Stream<O>, signal: F) -> RS2Stream<O>
781where
782    O: Send + 'static,
783    F: Future<Output = ()> + Send + 'static,
784{
785    stream! {
786        pin_mut!(s);
787        pin_mut!(signal);
788
789        loop {
790            tokio::select! {
791                biased;
792                _ = &mut signal => {
793                    break;
794                },
795
796                maybe_item = s.next() => {
797                    match maybe_item {
798                        Some(item) => yield item,
799                        None => break,
800                    }
801                },
802            }
803        }
804    }
805    .boxed()
806}
807
808/// Concatenate multiple streams sequentially
809pub fn concat<O, S>(streams: Vec<S>) -> RS2Stream<O>
810where
811    S: Stream<Item = O> + Send + 'static,
812    O: Send + 'static,
813{
814    stream! {
815        for s in streams {
816            pin_mut!(s);
817            while let Some(item) = s.next().await {
818                yield item;
819            }
820        }
821    }
822    .boxed()
823}
824
825/// Merge two streams into one interleaved output
826pub fn merge<O, S1, S2>(s1: S1, mut s2: S2) -> RS2Stream<O>
827where
828    S1: Stream<Item = O> + Send + 'static,
829    S2: Stream<Item = O> + Send + 'static + Unpin,
830    O: Send + 'static,
831{
832    let chained = s1
833        .map(Some)
834        .chain(stream! { while let Some(x) = s2.next().await { yield Some(x) } });
835    stream! {
836        pin_mut!(chained);
837        while let Some(item) = chained.next().await {
838            if let Some(x) = item {
839                yield x;
840            }
841        }
842    }
843    .boxed()
844}
845
846/// Interleave multiple streams in a round-robin fashion
847pub fn interleave<O, S>(streams: Vec<S>) -> RS2Stream<O>
848where
849    S: Stream<Item = O> + Send + 'static + Unpin,
850    O: Send + 'static,
851{
852    if streams.is_empty() {
853        return empty();
854    }
855
856    stream! {
857        let mut streams: Vec<_> = streams.into_iter().map(|s| Box::pin(s)).collect();
858        let mut index = 0;
859
860        while !streams.is_empty() {
861            if index >= streams.len() {
862                index = 0;
863            }
864
865            match streams[index].next().await {
866                Some(item) => {
867                    yield item;
868                    index += 1;
869                }
870                None => {
871                    streams.remove(index);
872                }
873            }
874        }
875    }
876    .boxed()
877}
878
879/// Combine two streams element-by-element using a provided function
880/// Returns a new rs2_stream with the combined elements
881/// Stops when either input rs2_stream ends
882pub fn zip_with<A, B, O, F, S1, S2>(s1: S1, s2: S2, mut f: F) -> RS2Stream<O>
883where
884    S1: Stream<Item = A> + Send + 'static,
885    S2: Stream<Item = B> + Send + 'static,
886    F: FnMut(A, B) -> O + Send + 'static,
887    A: Send + 'static,
888    B: Send + 'static,
889    O: Send + 'static,
890{
891    stream! {
892        pin_mut!(s1);
893        pin_mut!(s2);
894
895        loop {
896            match futures_util::future::join(s1.next(), s2.next()).await {
897                (Some(a), Some(b)) => yield f(a, b),
898                _ => break, // Stop when either rs2_stream ends
899            }
900        }
901    }
902    .boxed()
903}
904
905/// Select between two streams based on which one produces a value first
906///
907/// This combinator takes two streams and emits values from whichever rs2_stream
908/// produces a value first. Once a value is received from one rs2_stream, the other
909/// rs2_stream is cancelled. If either rs2_stream completes (returns None), the combinator
910/// switches to the other rs2_stream exclusively.
911///
912/// # Examples
913/// ```
914/// use rs2_stream::rs2::*;
915/// use std::time::Duration;
916/// use async_stream::stream;
917/// use tokio::time::sleep;
918/// use futures_util::stream::StreamExt;
919///
920/// # async fn example() {
921/// // Create two streams with different timing
922/// let fast_stream = stream! {
923///     yield 1;
924///     sleep(Duration::from_millis(10)).await;
925///     yield 2;
926///     sleep(Duration::from_millis(100)).await;
927///     yield 3;
928/// };
929///
930/// let slow_stream = stream! {
931///     sleep(Duration::from_millis(50)).await;
932///     yield 10;
933///     sleep(Duration::from_millis(10)).await;
934///     yield 20;
935/// };
936///
937/// // The either combinator will select values from whichever rs2_stream produces first
938/// let result = either(fast_stream.boxed(), slow_stream.boxed())
939///     .collect::<Vec<_>>()
940///     .await;
941///
942/// // We expect to get values from the fast rs2_stream first, then from the slow rs2_stream
943/// // when the fast rs2_stream is waiting longer
944/// assert_eq!(result, vec![1, 2, 10, 20]);
945/// # }
946/// ```
947pub fn either<O, S1, S2>(s1: S1, s2: S2) -> RS2Stream<O>
948where
949    S1: Stream<Item = O> + Send + 'static,
950    S2: Stream<Item = O> + Send + 'static,
951    O: Send + 'static,
952{
953    stream! {
954        pin_mut!(s1);
955        pin_mut!(s2);
956
957        let mut s1_done = false;
958        let mut s2_done = false;
959
960        let mut using_s1 = true;
961
962        loop {
963            if s1_done {
964                match s2.next().await {
965                    Some(item) => yield item,
966                    None => break,
967                }
968                continue;
969            }
970
971            if s2_done {
972                match s1.next().await {
973                    Some(item) => yield item,
974                    None => break,
975                }
976                continue;
977            }
978
979            if using_s1 {
980                match s1.next().await {
981                    Some(item) => {
982                        yield item;
983                    },
984                    None => {
985                        s1_done = true;
986                    }
987                }
988            } else {
989                match s2.next().await {
990                    Some(item) => {
991                        yield item;
992                    },
993                    None => {
994                        s2_done = true;
995                    }
996                }
997            }
998
999            tokio::select! {
1000                biased;
1001
1002                maybe_item = s1.next() => {
1003                    match maybe_item {
1004                        Some(item) => {
1005                            yield item;
1006                            using_s1 = true;
1007                        },
1008                        None => {
1009                            s1_done = true;
1010                        }
1011                    }
1012                },
1013                maybe_item = s2.next() => {
1014                    match maybe_item {
1015                        Some(item) => {
1016                            yield item;
1017                            using_s1 = false;
1018                        },
1019                        None => {
1020                            s2_done = true;
1021                        }
1022                    }
1023                }
1024            }
1025        }
1026    }
1027    .boxed()
1028}
1029
1030// ================================
1031// Timing and Rate Control
1032// ================================
1033
1034/// Debounce a rs2_stream, only emitting an element after a specified quiet period has passed
1035/// without receiving another element
1036///
1037/// This combinator waits for a quiet period (specified by `duration`) after receiving an element
1038/// before emitting it. If another element arrives during the quiet period, the timer is reset
1039/// and the new element replaces the previous one.
1040///
1041/// This is useful for handling rapidly updating sources where you only want to process
1042/// the most recent value after the source has settled.
1043pub fn debounce<O>(s: RS2Stream<O>, duration: Duration) -> RS2Stream<O>
1044where
1045    O: Send + 'static,
1046{
1047    stream! {
1048        pin_mut!(s);
1049
1050        let mut latest_item: Option<O> = None;
1051        let mut timer_handle: Option<tokio::task::JoinHandle<()>> = None;
1052
1053        let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(1);
1054
1055        loop {
1056            tokio::select! {
1057                maybe_item = s.next() => {
1058                    match maybe_item {
1059                        Some(item) => {
1060                            if let Some(handle) = timer_handle.take() {
1061                                handle.abort();
1062                            }
1063
1064                            latest_item = Some(item);
1065
1066                            let tx_clone = tx.clone();
1067                            timer_handle = Some(tokio::spawn(async move {
1068                                tokio::time::sleep(duration).await;
1069                                let _ = tx_clone.send(()).await;
1070                            }));
1071                        },
1072                        None => {
1073                            if let Some(item) = latest_item.take() {
1074                                yield item;
1075                            }
1076                            break;
1077                        }
1078                    }
1079                },
1080                _ = rx.recv() => {
1081                    if let Some(item) = latest_item.take() {
1082                        yield item;
1083                    }
1084                }
1085            }
1086        }
1087    }
1088    .boxed()
1089}
1090
1091/// Filter out consecutive duplicate elements from a rs2_stream
1092///
1093/// This combinator only emits elements that are different from the previous element.
1094/// It uses the default equality operator (`==`) to compare elements.
1095/// The first element is always emitted.
1096///
1097/// # Examples
1098/// ```
1099/// use rs2_stream::rs2::*;
1100/// use futures_util::stream::StreamExt;
1101///
1102/// # async fn example() {
1103/// let rs2_stream = from_iter(vec![1, 1, 2, 2, 3, 3, 2, 1]);
1104/// let result = distinct_until_changed(rs2_stream).collect::<Vec<_>>().await;
1105/// assert_eq!(result, vec![1, 2, 3, 2, 1]);
1106/// # }
1107/// ```
1108pub fn distinct_until_changed<O>(s: RS2Stream<O>) -> RS2Stream<O>
1109where
1110    O: Clone + Send + PartialEq + 'static,
1111{
1112    stream! {
1113        pin_mut!(s);
1114        let mut prev: Option<O> = None;
1115
1116        while let Some(item) = s.next().await {
1117            match &prev {
1118                Some(p) if p == &item => {
1119                },
1120                _ => {
1121                    yield item.clone();
1122                    prev = Some(item);
1123                }
1124            }
1125        }
1126    }
1127    .boxed()
1128}
1129
1130/// Sample a rs2_stream at regular intervals, emitting the most recent value
1131///
1132/// This combinator samples the most recent value from a rs2_stream at a regular interval.
1133/// It only emits a value if at least one new value has arrived since the last emission.
1134/// If no new value has arrived during an interval, that interval is skipped.
1135///
1136/// # Examples
1137/// ```
1138/// use rs2_stream::rs2::*;
1139/// use futures_util::stream::StreamExt;
1140/// use std::time::Duration;
1141/// use tokio::time::sleep;
1142/// use async_stream::stream;
1143///
1144/// # async fn example() {
1145/// // Create a rs2_stream that emits values faster than the sample interval
1146/// let rs2_stream = stream! {
1147///     yield 1;
1148///     sleep(Duration::from_millis(10)).await;
1149///     yield 2;
1150///     sleep(Duration::from_millis(10)).await;
1151///     yield 3;
1152///     sleep(Duration::from_millis(100)).await;
1153///     yield 4;
1154/// };
1155///
1156/// // Sample the rs2_stream every 50ms
1157/// let result = sample(rs2_stream.boxed(), Duration::from_millis(50))
1158///     .collect::<Vec<_>>()
1159///     .await;
1160///
1161/// // We expect to get the most recent value at each interval:
1162/// // - 3 (the most recent value after the first 50ms)
1163/// // - 4 (the most recent value after the next 50ms)
1164/// assert_eq!(result, vec![3, 4]);
1165/// # }
1166/// ```
1167pub fn sample<O>(s: RS2Stream<O>, interval: Duration) -> RS2Stream<O>
1168where
1169    O: Clone + Send + 'static,
1170{
1171    stream! {
1172        pin_mut!(s);
1173
1174        let mut latest_item: Option<O> = None;
1175        let mut has_new_value = false;
1176
1177        let mut timer = tokio::time::interval(interval);
1178        timer.tick().await;
1179
1180        loop {
1181            tokio::select! {
1182                maybe_item = s.next() => {
1183                    match maybe_item {
1184                        Some(item) => {
1185                            latest_item = Some(item);
1186                            has_new_value = true;
1187                        },
1188                        None => {
1189                            if has_new_value {
1190                                if let Some(item) = latest_item.take() {
1191                                    yield item;
1192                                }
1193                            }
1194                            break;
1195                        }
1196                    }
1197                },
1198                _ = timer.tick() => {
1199                    if has_new_value {
1200                        if let Some(ref item) = latest_item {
1201                            yield item.clone();
1202                            has_new_value = false;
1203                        }
1204                    }
1205                }
1206            }
1207        }
1208    }
1209    .boxed()
1210}
1211
1212/// Filter out consecutive duplicate elements from a rs2_stream using a custom equality function
1213///
1214/// This combinator only emits elements that are different from the previous element.
1215/// It uses the provided equality function to compare elements.
1216/// The first element is always emitted.
1217///
1218/// # Examples
1219/// ```
1220/// use rs2_stream::rs2::*;
1221/// use futures_util::stream::StreamExt;
1222///
1223/// # async fn example() {
1224/// let rs2_stream = from_iter(vec![1, 1, 2, 2, 3, 3, 2, 1]);
1225/// // Use a custom equality function that considers two numbers equal if they have the same parity
1226/// let result = distinct_until_changed_by(rs2_stream, |a, b| a % 2 == b % 2).collect::<Vec<_>>().await;
1227/// assert_eq!(result, vec![1, 2]);
1228/// # }
1229/// ```
1230pub fn distinct_until_changed_by<O, F>(s: RS2Stream<O>, mut eq: F) -> RS2Stream<O>
1231where
1232    O: Clone + Send + 'static,
1233    F: FnMut(&O, &O) -> bool + Send + 'static,
1234{
1235    stream! {
1236        pin_mut!(s);
1237        let mut prev: Option<O> = None;
1238
1239        while let Some(item) = s.next().await {
1240            match &prev {
1241                Some(p) if eq(p, &item) => {
1242                },
1243                _ => {
1244                    yield item.clone();
1245                    prev = Some(item);
1246                }
1247            }
1248        }
1249    }
1250    .boxed()
1251}
1252
1253/// Prefetch a specified number of elements ahead of consumption
1254/// This combinator eagerly evaluates a specified number of elements ahead of what's been requested,
1255/// storing them in a buffer. This can improve performance by starting to process the next elements
1256/// before they're actually needed.
1257///
1258/// Backpressure is maintained by using a bounded channel with capacity equal to the prefetch count.
1259pub fn prefetch<O>(s: RS2Stream<O>, prefetch_count: usize) -> RS2Stream<O>
1260where
1261    O: Send + 'static,
1262{
1263    if prefetch_count == 0 {
1264        return s;
1265    }
1266
1267    let (mut tx, rx): (Sender<O>, Receiver<O>) = channel(prefetch_count);
1268
1269    spawn(async move {
1270        pin_mut!(s);
1271        while let Some(item) = s.next().await {
1272            if tx.send(item).await.is_err() {
1273                break;
1274            }
1275        }
1276    });
1277
1278    stream! {
1279        let mut rx = rx;
1280        while let Some(item) = rx.next().await {
1281            yield item;
1282        }
1283    }
1284    .boxed()
1285}
1286
1287/// Back-pressure-aware rate limiting via bounded channel (legacy)
1288pub fn rate_limit_backpressure<O>(s: RS2Stream<O>, capacity: usize) -> RS2Stream<O>
1289where
1290    O: Send + 'static,
1291{
1292    auto_backpressure_block(s, capacity)
1293}
1294
1295/// Throttle rs2_stream to emit one element per `duration`
1296pub fn throttle<O>(s: RS2Stream<O>, duration: Duration) -> RS2Stream<O>
1297where
1298    O: Send + 'static,
1299{
1300    stream! {
1301        pin_mut!(s);
1302        while let Some(item) = s.next().await {
1303            yield item;
1304            sleep(duration).await;
1305        }
1306    }
1307    .boxed()
1308}
1309
1310/// Create a rs2_stream that emits values at a fixed rate
1311pub fn tick<O>(period: Duration, item: O) -> RS2Stream<O>
1312where
1313    O: Clone + Send + 'static,
1314{
1315    stream! {
1316        loop {
1317            yield item.clone();
1318            sleep(period).await;
1319        }
1320    }
1321    .boxed()
1322}
1323
1324// ================================
1325// Parallel Processing
1326// ================================
1327
1328/// Parallel evaluation preserving order (parEvalMap) with automatic backpressure
1329pub fn par_eval_map<I, O, Fut, F>(s: RS2Stream<I>, concurrency: usize, mut f: F) -> RS2Stream<O>
1330where
1331    F: FnMut(I) -> Fut + Send + 'static,
1332    Fut: Future<Output = O> + Send + 'static,
1333    O: Send + 'static,
1334    I: Send + 'static,
1335{
1336    let buffered_stream = auto_backpressure_block(s, concurrency * 2);
1337
1338    stream! {
1339        let mut in_flight = FuturesUnordered::new();
1340        pin_mut!(buffered_stream);
1341
1342        while let Some(item) = buffered_stream.next().await {
1343            in_flight.push(f(item));
1344            if in_flight.len() >= concurrency {
1345                if let Some(res) = in_flight.next().await {
1346                    yield res;
1347                }
1348            }
1349        }
1350        while let Some(res) = in_flight.next().await {
1351            yield res;
1352        }
1353    }
1354    .boxed()
1355}
1356
1357/// Parallel evaluation unordered (parEvalMapUnordered) with automatic backpressure
1358pub fn par_eval_map_unordered<I, O, Fut, F>(
1359    s: RS2Stream<I>,
1360    concurrency: usize,
1361    f: F,
1362) -> RS2Stream<O>
1363where
1364    F: FnMut(I) -> Fut + Send + 'static,
1365    Fut: Future<Output = O> + Send + 'static,
1366    O: Send + 'static,
1367    I: Send + 'static,
1368{
1369    let buffered_stream = auto_backpressure_block(s, concurrency * 2);
1370    buffered_stream.map(f).buffer_unordered(concurrency).boxed()
1371}
1372
1373/// Parallel join of streams (parJoin) with automatic backpressure
1374///
1375/// This combinator takes a rs2_stream of streams and a concurrency limit, and runs
1376/// up to n inner streams concurrently. It emits all elements from the inner streams,
1377/// and starts new inner streams as others complete.
1378///
1379/// Backpressure is maintained by using a bounded buffer for the outer rs2_stream.
1380pub fn par_join<O, S>(s: RS2Stream<S>, concurrency: usize) -> RS2Stream<O>
1381where
1382    S: Stream<Item = O> + Send + 'static + Unpin,
1383    O: Send + 'static,
1384{
1385    let buffered_stream = auto_backpressure_block(s, concurrency * 2);
1386
1387    stream! {
1388        pin_mut!(buffered_stream);
1389
1390        let mut active_streams: Vec<S> = Vec::with_capacity(concurrency);
1391
1392        let mut outer_stream_done = false;
1393
1394        loop {
1395            while active_streams.len() < concurrency && !outer_stream_done {
1396                match buffered_stream.next().await {
1397                    Some(inner_stream) => {
1398                        active_streams.push(inner_stream);
1399                    },
1400                    None => {
1401                        outer_stream_done = true;
1402                        break;
1403                    }
1404                }
1405            }
1406            if active_streams.is_empty() && outer_stream_done {
1407                break;
1408            }
1409
1410            let mut i = 0;
1411            while i < active_streams.len() {
1412                match active_streams[i].next().await {
1413                    Some(item) => {
1414                        yield item;
1415                        i += 1;
1416                    },
1417                    None => {
1418                        active_streams.swap_remove(i);
1419                    }
1420                }
1421            }
1422        }
1423    }
1424    .boxed()
1425}
1426
1427// ================================
1428// Resource Management
1429// ================================
1430
1431/// Bracket for simple resource handling
1432pub fn bracket<A, O, St, FAcq, FUse, FRel, R>(
1433    acquire: FAcq,
1434    use_fn: FUse,
1435    release: FRel,
1436) -> RS2Stream<O>
1437where
1438    FAcq: Future<Output = A> + Send + 'static,
1439    FUse: FnOnce(A) -> St + Send + 'static,
1440    St: Stream<Item = O> + Send + 'static,
1441    FRel: FnOnce(A) -> R + Send + 'static,
1442    R: Future<Output = ()> + Send + 'static,
1443    O: Send + 'static,
1444    A: Clone + Send + 'static,
1445{
1446    stream! {
1447        let resource = acquire.await;
1448        let stream = use_fn(resource.clone());
1449        pin_mut!(stream);
1450        while let Some(item) = stream.next().await {
1451            yield item;
1452        }
1453        release(resource).await;
1454    }
1455    .boxed()
1456}
1457
1458/// BracketCase with exit case semantics for streams of Result<O,E>
1459pub fn bracket_case<A, O, E, St, FAcq, FUse, FRel, R>(
1460    acquire: FAcq,
1461    use_fn: FUse,
1462    release: FRel,
1463) -> RS2Stream<Result<O, E>>
1464where
1465    FAcq: Future<Output = A> + Send + 'static,
1466    FUse: FnOnce(A) -> St + Send + 'static,
1467    St: Stream<Item = Result<O, E>> + Send + 'static,
1468    FRel: FnOnce(A, ExitCase<E>) -> R + Send + 'static,
1469    R: Future<Output = ()> + Send + 'static,
1470    O: Send + 'static,
1471    E: Clone + Send + 'static,
1472    A: Clone + Send + 'static,
1473{
1474    stream! {
1475        let resource = acquire.await;
1476        let stream = use_fn(resource.clone());
1477        pin_mut!(stream);
1478        while let Some(item) = stream.next().await {
1479            yield item;
1480        }
1481        release(resource, ExitCase::Completed).await;
1482    }
1483    .boxed()
1484}
1485
1486// ================================
1487// Stream Extensions
1488// ================================
1489
1490// Re-export the extension traits from their respective modules
1491pub use crate::rs2_result_stream_ext::RS2ResultStreamExt;
1492pub use crate::rs2_stream_ext::RS2StreamExt;