Skip to main content

agent_rex/
lib.rs

1// This file is auto-generated by organjsm tangle. Do not edit directly.
2// Source: index.org
3
4// [[file:index.org::349]]
5//! # Agent Rex
6//! 
7//! An async Stream-based FRP-like library for Rust.
8//! 
9//! This library provides composable stream operators similar to RxJS/Most.js,
10//! built on top of the `futures` crate's `Stream` trait.
11//! 
12//! ## Runtime Agnostic
13//! 
14//! Most operators are runtime-agnostic and work with any async executor.
15//! For time-based operations, we provide generic versions that accept
16//! a sleep function parameter, plus feature-flagged implementations
17//! for specific runtimes (tokio, smol, async-std).
18
19// Core imports - defined once at the top
20use std::error::Error;
21use std::future::{Future, pending};
22use std::pin::Pin;
23use std::sync::Arc;
24use std::sync::atomic::{AtomicBool, Ordering};
25use std::time::{Duration, Instant};
26
27use async_stream::stream;
28use futures::{Stream, StreamExt, FutureExt};
29use futures::channel::mpsc;
30use futures::lock::Mutex;
31use futures::stream;
32
33/// Type alias for boxed streams
34pub type BoxedStream<T> = Pin<Box<dyn Stream<Item = T> + Send>>;
35
36/// Runtime abstraction for async operations that need executor-specific features.
37/// 
38/// Most stream operations are runtime-agnostic and use only `futures` primitives.
39/// This trait is needed only for:
40/// - Time-based operations (delay, debounce, throttle)
41/// - Spawning background tasks (multicasting, eager evaluation)
42pub trait Runtime: Clone + Send + Sync + 'static {
43  /// Sleep for the given duration
44  fn sleep(duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send>>;
45  
46  /// Create an interval that yields at regular intervals
47  fn interval(period: Duration) -> Pin<Box<dyn futures::Stream<Item = ()> + Send>>;
48  
49  /// Spawn a future as a background task
50  fn spawn<F>(future: F)
51  where
52    F: Future<Output = ()> + Send + 'static;
53}
54// rust-setup ends here
55
56// [[file:index.org::374]]
// Tokio runtime (most common)
/// [`Runtime`] implementation backed by Tokio's timer and task spawner.
#[cfg(feature = "tokio-runtime")]
#[derive(Clone)]
pub struct TokioRuntime;

#[cfg(feature = "tokio-runtime")]
impl Runtime for TokioRuntime {
  fn sleep(duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send>> {
    let timer = tokio::time::sleep(duration);
    Box::pin(timer)
  }

  /// Yields `()` on every tick of a `tokio::time::interval`.
  ///
  /// The Tokio interval is created when the stream is first polled, not when
  /// this function is called. NOTE(review): per Tokio's documentation the
  /// first tick of `tokio::time::interval` completes immediately, which
  /// differs from the sleep-then-yield loops of the other runtimes — confirm
  /// that this is intended.
  fn interval(period: Duration) -> Pin<Box<dyn futures::Stream<Item = ()> + Send>> {
    let ticks = async_stream::stream! {
      let mut ticker = tokio::time::interval(period);
      loop {
        ticker.tick().await;
        yield ();
      }
    };
    Box::pin(ticks)
  }

  fn spawn<F>(future: F)
  where
    F: Future<Output = ()> + Send + 'static,
  {
    // The join handle is intentionally discarded: the task runs detached.
    let _ = tokio::spawn(future);
  }
}
86
// Smol runtime (lightweight)
/// [`Runtime`] implementation backed by `async-io` timers and `smol::spawn`.
#[cfg(feature = "smol-runtime")]
#[derive(Clone)]
pub struct SmolRuntime;

#[cfg(feature = "smol-runtime")]
impl Runtime for SmolRuntime {
  fn sleep(duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send>> {
    // `async_io::Timer` resolves to an `Instant`, not `()`, so it cannot be
    // boxed directly as `Future<Output = ()>`. The timer is still created
    // here (eagerly) so the deadline is measured from the call, and the
    // async block only discards the resolved `Instant`.
    let timer = async_io::Timer::after(duration);
    Box::pin(async move {
      timer.await;
    })
  }

  /// Yields `()` after each `period`-long timer; the first item is produced
  /// only after the first timer has fired.
  fn interval(period: Duration) -> Pin<Box<dyn futures::Stream<Item = ()> + Send>> {
    Box::pin(async_stream::stream! {
      loop {
        async_io::Timer::after(period).await;
        yield ();
      }
    })
  }

  fn spawn<F>(future: F)
  where
    F: Future<Output = ()> + Send + 'static,
  {
    // Detach so the task keeps running after the handle is dropped.
    smol::spawn(future).detach();
  }
}
115
// async-std runtime
/// [`Runtime`] implementation backed by `async_std::task`.
#[cfg(feature = "async-std-runtime")]
#[derive(Clone)]
pub struct AsyncStdRuntime;

#[cfg(feature = "async-std-runtime")]
impl Runtime for AsyncStdRuntime {
  fn sleep(duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send>> {
    let nap = async_std::task::sleep(duration);
    Box::pin(nap)
  }

  /// Yields `()` after each `period`-long sleep; the first item is produced
  /// only after the first sleep has finished.
  fn interval(period: Duration) -> Pin<Box<dyn futures::Stream<Item = ()> + Send>> {
    let ticks = async_stream::stream! {
      loop {
        async_std::task::sleep(period).await;
        yield ();
      }
    };
    Box::pin(ticks)
  }

  fn spawn<F>(future: F)
  where
    F: Future<Output = ()> + Send + 'static,
  {
    // The returned handle is discarded; the result of the task is not needed.
    let _ = async_std::task::spawn(future);
  }
}
144// unnamed ends here
145
146// [[file:index.org::578]]
147/// Create a stream that emits a single value.
148pub fn just<T>(value: T) -> impl Stream<Item = T> {
149  stream! { yield value; }
150}
151
152/// Alias for just
153pub fn of<T: Clone>(value: T) -> impl Stream<Item = T> {
154  just(value)
155}
156// unnamed ends here
157
158// [[file:index.org::592]]
#[cfg(test)]
mod just_tests {
  use super::*;

  /// `just` yields its argument once and then ends.
  #[tokio::test]
  async fn test_just_emits_single_value() {
    let emitted: Vec<_> = just(42).collect().await;
    assert_eq!(emitted, vec![42]);
  }

  /// Works for owned, non-`Copy` values as well.
  #[tokio::test]
  async fn test_just_with_string() {
    let greeting = "hello".to_string();
    let emitted: Vec<_> = just(greeting).collect().await;
    assert_eq!(emitted, vec!["hello"]);
  }

  /// `of` behaves like `just`.
  #[tokio::test]
  async fn test_of_alias() {
    let emitted: Vec<_> = of(99).collect().await;
    assert_eq!(emitted, vec![99]);
  }
}
181// unnamed ends here
182
183// [[file:index.org::1001]]
184/// Creates a stream from a Future.
185/// When the Future resolves, the stream emits the value and completes.
186pub fn from_future<T, F: Future<Output = T>>(future: F) -> impl Stream<Item = T> {
187  stream! {
188    let value = future.await;
189    yield value;
190  }
191}
192// unnamed ends here
193
194// [[file:index.org::1014]]
#[cfg(test)]
mod from_future_tests {
  use super::*;

  /// An immediately-ready future produces exactly one item.
  #[tokio::test]
  async fn test_from_future_emits_resolved_value() {
    let emitted: Vec<_> = from_future(async { 42 }).collect().await;
    assert_eq!(emitted, vec![42]);
  }

  /// A future that awaits before resolving still produces its value.
  #[tokio::test]
  async fn test_from_future_with_async_computation() {
    let computation = async {
      let pause = std::time::Duration::from_millis(1);
      tokio::time::sleep(pause).await;
      "computed".to_string()
    };
    let emitted: Vec<_> = from_future(computation).collect().await;
    assert_eq!(emitted, vec!["computed"]);
  }
}
216// unnamed ends here
217
218// [[file:index.org::1329]]
219/// Creates a stream from an iterator.
220/// 
221/// # Note
222/// This is an alias for `futures::stream::iter`. Prefer using the built-in directly:
223/// ```rust
224/// use futures::stream;
225/// let s = stream::iter(vec![1, 2, 3]);
226/// ```
227pub use futures::stream::iter as from_iter;
228// unnamed ends here
229
230// [[file:index.org::1343]]
#[cfg(test)]
mod from_iter_tests {
  use super::*;

  /// Every element of the iterator is emitted, in order.
  #[tokio::test]
  async fn test_from_iter_emits_all_values() {
    let emitted: Vec<_> = from_iter(vec![1, 2, 3]).collect().await;
    assert_eq!(emitted, vec![1, 2, 3]);
  }

  /// An empty iterator gives an empty stream.
  #[tokio::test]
  async fn test_from_iter_handles_empty() {
    let nothing: Vec<i32> = Vec::new();
    let emitted: Vec<_> = from_iter(nothing).collect().await;
    assert!(emitted.is_empty());
  }

  /// Any iterator works, not just collections.
  #[tokio::test]
  async fn test_from_iter_with_range() {
    let emitted: Vec<_> = from_iter(0..5).collect().await;
    assert_eq!(emitted, vec![0, 1, 2, 3, 4]);
  }
}
253// unnamed ends here
254
255// [[file:index.org::1679]]
256/// Creates a stream that emits () at regular intervals.
257/// Uses the Runtime trait abstraction for timer support.
258pub fn periodic<R: Runtime>(interval_ms: u64) -> impl Stream<Item = ()> {
259  let interval_stream = R::interval(Duration::from_millis(interval_ms));
260  stream! {
261    futures::pin_mut!(interval_stream);
262    loop {
263      interval_stream.next().await;
264      yield ();
265    }
266  }
267}
268
269// Runtime-agnostic alternative using async-io (works with smol, async-std)
270// or any timer that implements Future<Output = ()>
271pub fn periodic_with_timer<T, F>(
272  interval_ms: u64,
273  make_timer: impl Fn(Duration) -> F + Send + 'static,
274) -> impl Stream<Item = ()>
275where
276  F: std::future::Future<Output = ()> + Send,
277{
278  stream! {
279    let duration = Duration::from_millis(interval_ms);
280    loop {
281      make_timer(duration).await;
282      yield ();
283    }
284  }
285}
286// unnamed ends here
287
288// [[file:index.org::1714]]
#[cfg(test)]
mod periodic_tests {
  use super::*;

  // `periodic` itself needs a `Runtime` implementation, which is only
  // available behind a runtime feature flag. Example with tokio (requires the
  // `tokio-runtime` feature):
  //
  // #[tokio::test]
  // async fn test_periodic_emits_at_intervals() {
  //   let ticks: Vec<_> = periodic::<TokioRuntime>(100)
  //     .take(3)
  //     .collect()
  //     .await;
  //   assert_eq!(ticks.len(), 3);
  // }

  /// `periodic_with_timer` needs no `Runtime`, only a timer factory, so it
  /// can be exercised with the tokio timer the other tests already use.
  #[tokio::test]
  async fn test_periodic_with_timer_emits_ticks() {
    // The unused `T` parameter cannot be inferred and is named explicitly.
    let ticks: Vec<()> = periodic_with_timer::<(), _>(1, tokio::time::sleep)
      .take(3)
      .collect()
      .await;
    assert_eq!(ticks.len(), 3);
  }
}
305// unnamed ends here
306
307// [[file:index.org::1958]]
308/// Creates a stream that immediately completes without emitting any values.
309/// 
310/// # Note
311/// This is an alias for `futures::stream::empty`. Prefer using the built-in directly:
312/// ```rust
313/// use futures::stream;
314/// let s: futures::stream::Empty<i32> = stream::empty();
315/// ```
316pub use futures::stream::empty;
317// unnamed ends here
318
319// [[file:index.org::1972]]
#[cfg(test)]
mod empty_tests {
  use super::*;

  /// Collecting an empty stream gives no items.
  #[tokio::test]
  async fn test_empty_yields_nothing() {
    let collected: Vec<i32> = empty::<i32>().collect().await;
    assert!(collected.is_empty());
  }

  /// The very first poll already reports completion.
  #[tokio::test]
  async fn test_empty_completes_immediately() {
    let nothing = empty::<String>();
    futures::pin_mut!(nothing);
    let first = nothing.next().await;
    assert!(first.is_none());
  }
}
335// unnamed ends here
336
337// [[file:index.org::2175]]
338/// Creates a stream that never emits any values and never completes.
339/// 
340/// # Note
341/// This is an alias for `futures::stream::pending`. Prefer using the built-in directly:
342/// ```rust
343/// use futures::stream;
344/// let s: futures::stream::Pending<i32> = stream::pending();
345/// ```
346pub use futures::stream::pending as never;
347// unnamed ends here
348
349// [[file:index.org::2191]]
#[cfg(test)]
mod never_tests {
  use super::*;
  use std::time::Duration;

  /// `never()` must neither emit nor complete; verified by racing it against
  /// a short timeout, which is expected to win.
  #[tokio::test]
  async fn test_never_does_not_complete() {
    use futures::future::{select, Either};

    let silent = never::<i32>();
    futures::pin_mut!(silent);

    let deadline = tokio::time::sleep(Duration::from_millis(10));
    futures::pin_mut!(deadline);

    // `Either::Right` means the timeout finished first, which is the
    // expected outcome; only the other branch is a failure.
    if let Either::Left(_) = select(silent.next(), deadline).await {
      panic!("never() should not emit");
    }
  }
}
373// unnamed ends here
374
375// [[file:index.org::2465]]
376/// Creates a stream that emits an infinite sequence by repeatedly applying a function.
377pub fn iterate<T: Clone, F: Fn(T) -> T>(seed: T, f: F) -> impl Stream<Item = T> {
378  stream! {
379    let mut current = seed;
380    loop {
381      yield current.clone();
382      current = f(current);
383    }
384  }
385}
386// unnamed ends here
387
388// [[file:index.org::2480]]
#[cfg(test)]
mod iterate_tests {
  use super::*;

  /// Doubling from 1 gives the powers of two.
  #[tokio::test]
  async fn test_iterate_generates_sequence() {
    let powers: Vec<_> = iterate(1, |x| x * 2).take(5).collect().await;
    assert_eq!(powers, vec![1, 2, 4, 8, 16]);
  }

  /// Incrementing from 0 counts upwards.
  #[tokio::test]
  async fn test_iterate_with_addition() {
    let counted: Vec<_> = iterate(0, |x| x + 1).take(4).collect().await;
    assert_eq!(counted, vec![0, 1, 2, 3]);
  }

  /// Works with owned, non-`Copy` state.
  #[tokio::test]
  async fn test_iterate_with_strings() {
    let seed = "a".to_string();
    let grown: Vec<_> = iterate(seed, |s| s.clone() + "a").take(3).collect().await;
    assert_eq!(grown, vec!["a", "aa", "aaa"]);
  }
}
411// unnamed ends here
412
413// [[file:index.org::2821]]
/// The outcome of one step of [`unfold`].
///
/// The trait implementations are derived, so each is available whenever both
/// `T` and `S` implement the corresponding trait.
#[derive(Debug, Clone, PartialEq)]
pub struct UnfoldResult<T, S> {
  /// Item to emit for this step (ignored by `unfold` when `done` is `true`).
  pub value: T,
  /// State handed to the step function on the next step.
  pub next_seed: S,
  /// When `true`, `unfold` ends the stream without emitting `value`.
  pub done: bool,
}
419
420/// Creates a stream by unfolding a seed value.
421pub fn unfold<T, S: Clone, F>(seed: S, f: F) -> impl Stream<Item = T>
422where
423  F: Fn(S) -> UnfoldResult<T, S> + Clone + Send + 'static,
424{
425  let f = f.clone();
426  futures::stream::unfold(seed, move |state| {
427    let f = f.clone();
428    async move {
429      let result = f(state);
430      if result.done {
431        None
432      } else {
433        Some((result.value, result.next_seed))
434      }
435    }
436  })
437}
438// unnamed ends here
439
440// [[file:index.org::2850]]
#[cfg(test)]
mod unfold_tests {
  use super::*;

  /// Values are emitted until the step function reports `done`.
  #[tokio::test]
  async fn test_unfold_generates_values() {
    let step = |n| UnfoldResult {
      value: n,
      next_seed: n + 1,
      done: n > 3,
    };
    let emitted: Vec<_> = unfold(1, step).collect().await;
    assert_eq!(emitted, vec![1, 2, 3]);
  }

  /// A step that is `done` right away emits nothing, not even its `value`.
  #[tokio::test]
  async fn test_unfold_stops_immediately_when_done() {
    let step = |_| UnfoldResult {
      value: 999,
      next_seed: 0,
      done: true,
    };
    let emitted: Vec<i32> = unfold(0, step).collect().await;
    assert!(emitted.is_empty());
  }

  /// The state type (`i32`) and the item type (`String`) may differ.
  #[tokio::test]
  async fn test_unfold_with_different_types() {
    let step = |n| UnfoldResult {
      value: format!("item-{}", n),
      next_seed: n + 1,
      done: n >= 2,
    };
    let emitted: Vec<_> = unfold(0, step).collect().await;
    assert_eq!(emitted, vec!["item-0", "item-1"]);
  }
}
478// unnamed ends here
479
480// [[file:index.org::3245]]
481/// Prepends a value to the beginning of a stream.
482pub fn start_with<T: Clone, S: Stream<Item = T>>(value: T, s: S) -> impl Stream<Item = T> {
483  stream! {
484    yield value;
485    futures::pin_mut!(s);
486    while let Some(item) = s.next().await { yield item; }
487  }
488}
489// unnamed ends here
490
491// [[file:index.org::3258]]
#[cfg(test)]
mod start_with_tests {
  use super::*;

  /// The extra value comes before all source items.
  #[tokio::test]
  async fn test_start_with_prepends_value() {
    let tail = futures::stream::iter(vec![1, 2, 3]);
    let emitted: Vec<_> = start_with(0, tail).collect().await;
    assert_eq!(emitted, vec![0, 1, 2, 3]);
  }

  /// With an empty source only the extra value is emitted.
  #[tokio::test]
  async fn test_start_with_on_empty_stream() {
    let tail = stream::empty::<i32>();
    let emitted: Vec<_> = start_with(42, tail).collect().await;
    assert_eq!(emitted, vec![42]);
  }
}
511// unnamed ends here
512
513// [[file:index.org::3524]]
514/// Concatenates multiple streams into a single stream.
515pub fn concat<T, S: Stream<Item = T>>(streams: Vec<S>) -> impl Stream<Item = T> {
516  stream! {
517    for s in streams {
518      futures::pin_mut!(s);
519      while let Some(item) = s.next().await { yield item; }
520    }
521  }
522}
523
524// For two streams specifically:
525pub fn concat2<T, S1: Stream<Item = T>, S2: Stream<Item = T>>(s1: S1, s2: S2) -> impl Stream<Item = T> {
526  stream! {
527    futures::pin_mut!(s1);
528    futures::pin_mut!(s2);
529    while let Some(item) = s1.next().await { yield item; }
530    while let Some(item) = s2.next().await { yield item; }
531  }
532}
533// unnamed ends here
534
535// [[file:index.org::3548]]
#[cfg(test)]
mod concat_tests {
  use super::*;

  /// Items of the second stream follow those of the first.
  #[tokio::test]
  async fn test_concat_joins_streams() {
    let s1 = futures::stream::iter(vec![1, 2]);
    let s2 = futures::stream::iter(vec![3, 4]);
    let result = concat2(s1, s2);
    let values: Vec<_> = result.collect().await;
    assert_eq!(values, vec![1, 2, 3, 4]);
  }

  /// An empty first stream is skipped.
  #[tokio::test]
  async fn test_concat_with_empty_first() {
    let s1 = stream::empty::<i32>();
    let s2 = futures::stream::iter(vec![5, 6]);
    let result = concat2(s1, s2);
    let values: Vec<_> = result.collect().await;
    assert_eq!(values, vec![5, 6]);
  }

  /// `concat` drains a whole vector of same-typed streams in order.
  /// (Previously this test built the vector but never called `concat`.)
  #[tokio::test]
  async fn test_concat_vec_of_streams() {
    let streams = vec![
      futures::stream::iter(vec![1]),
      futures::stream::iter(vec![2, 3]),
      futures::stream::iter(vec![4]),
    ];
    let values: Vec<_> = concat(streams).collect().await;
    assert_eq!(values, vec![1, 2, 3, 4]);
  }

  /// No streams at all gives an empty stream.
  #[tokio::test]
  async fn test_concat_empty_vec() {
    let streams: Vec<futures::stream::Empty<i32>> = Vec::new();
    let values: Vec<i32> = concat(streams).collect().await;
    assert!(values.is_empty());
  }
}
569// unnamed ends here
570
571// [[file:index.org::4003]]
572/// Creates a stream from a channel receiver.
573/// The sender can be used to push events from event handlers.
574/// 
575/// This is runtime-agnostic and works with any async executor.
576pub fn from_channel<T>(mut rx: mpsc::UnboundedReceiver<T>) -> impl Stream<Item = T> {
577  stream! { while let Some(item) = rx.next().await { yield item; } }
578}
579
580/// Bounded variant for backpressure
581pub fn from_bounded_channel<T>(mut rx: mpsc::Receiver<T>) -> impl Stream<Item = T> {
582  stream! { while let Some(item) = rx.next().await { yield item; } }
583}
584
585// Example usage for event-like patterns:
586// let (tx, rx) = mpsc::unbounded();
587// let event_stream = from_channel(rx);
588// 
589// // In an event handler (can be sync since unbounded):
590// tx.unbounded_send(event).unwrap();
591//
592// // Or with bounded channel for backpressure:
593// let (mut tx, rx) = mpsc::channel(100);
594// let event_stream = from_bounded_channel(rx);
595// tx.send(event).await.unwrap();
596// unnamed ends here
597
598// [[file:index.org::4032]]
#[cfg(test)]
mod channel_tests {
  use super::*;

  // Neither `from_channel` nor `from_bounded_channel` takes a `Runtime`, so
  // both can be tested directly: values are queued, the sender is dropped to
  // close the channel, and the stream is collected.

  #[tokio::test]
  async fn test_from_channel_emits_sent_values() {
    let (tx, rx) = mpsc::unbounded();
    tx.unbounded_send(1).unwrap();
    tx.unbounded_send(2).unwrap();
    drop(tx);

    let values: Vec<i32> = from_channel(rx).collect().await;
    assert_eq!(values, vec![1, 2]);
  }

  #[tokio::test]
  async fn test_from_channel_completes_when_sender_dropped() {
    let (tx, rx) = mpsc::unbounded::<i32>();
    drop(tx);

    let values: Vec<i32> = from_channel(rx).collect().await;
    assert!(values.is_empty());
  }

  #[tokio::test]
  async fn test_from_bounded_channel_emits_sent_values() {
    let (mut tx, rx) = mpsc::channel(4);
    tx.try_send("a").unwrap();
    tx.try_send("b").unwrap();
    drop(tx);

    let values: Vec<&str> = from_bounded_channel(rx).collect().await;
    assert_eq!(values, vec!["a", "b"]);
  }
}
605// unnamed ends here
606
607// [[file:index.org::4256]]
608// Rust uses method chaining instead of pipe:
609// let result = futures::stream::iter([1, 2, 3, 4, 5])
610//     .filter(|x| futures::future::ready(x % 2 == 0))
611//     .map(|x| x * 10)
612//     .take(2);
613
// For a pipe-like macro if desired:
//
// `pipe!(initial, f, g)` evaluates to `g(f(initial))`; `pipe!(initial)` is
// just `initial`. Each step may return a different type from the one it
// receives, which stream operators almost always do.
macro_rules! pipe {
  ($initial:expr $(, $fn:expr)*) => {{
    // Steps go through a generic helper rather than being called directly:
    // passing a closure where `FnOnce(T) -> U` is expected lets the compiler
    // infer the closure's parameter type from the value being piped, so
    // `|s| s.map(..)` works without type annotations.
    fn pipe_step<T, U>(input: T, step: impl FnOnce(T) -> U) -> U {
      step(input)
    }
    let result = $initial;
    // Re-bind (shadow) instead of assigning so the type can change per step.
    $(let result = pipe_step(result, $fn);)*
    result
  }};
}
622
623// Usage:
624// let stream = pipe!(
625//     futures::stream::iter(vec![1, 2, 3]),
626//     |s| s.map(|x| x * 2),
627//     |s| s.take(2)
628// );
629// unnamed ends here
630
631// [[file:index.org::4440]]
632/// Maps each value in a stream using a function.
633/// 
634/// # Note
635/// Prefer using the built-in `StreamExt::map()` method when chaining:
636/// ```rust,ignore
637/// use futures::StreamExt;
638/// let result = stream.map(|x| x * 2);
639/// ```
640/// For async mappers, use `StreamExt::then()`:
641/// ```rust,ignore
642/// let result = stream.then(|x| async move { x * 2 });
643/// ```
644/// This standalone function is provided for functional/pipe-style composition.
645pub fn map<T, U, S, F>(s: S, f: F) -> impl Stream<Item = U>
646where
647  S: Stream<Item = T>,
648  F: Fn(T) -> U,
649{
650  stream! {
651    futures::pin_mut!(s);
652    while let Some(item) = s.next().await { yield f(item); }
653  }
654}
655// unnamed ends here
656
657// [[file:index.org::4466]]
#[cfg(test)]
mod map_tests {
  use super::*;

  /// Every item is passed through the mapper.
  #[tokio::test]
  async fn test_map_transforms_values() {
    let numbers = futures::stream::iter(vec![1, 2, 3]);
    let doubled: Vec<_> = map(numbers, |x| x * 2).collect().await;
    assert_eq!(doubled, vec![2, 4, 6]);
  }

  /// The mapper may change the item type.
  #[tokio::test]
  async fn test_map_with_type_change() {
    let numbers = futures::stream::iter(vec![1, 2, 3]);
    let labels: Vec<_> = map(numbers, |x| format!("num-{}", x)).collect().await;
    assert_eq!(labels, vec!["num-1", "num-2", "num-3"]);
  }

  /// Mapping an empty stream gives an empty stream.
  #[tokio::test]
  async fn test_map_empty_stream() {
    let nothing = stream::empty::<i32>();
    let doubled: Vec<_> = map(nothing, |x| x * 2).collect().await;
    assert!(doubled.is_empty());
  }
}
685// unnamed ends here
686
687// [[file:index.org::4806]]
688/// Map to a constant value.
689/// stream.map(|_| constant_value.clone())
690
691pub fn constant<T, U: Clone, S: Stream<Item = T>>(value: U, s: S) -> impl Stream<Item = U> {
692  stream! {
693    futures::pin_mut!(s);
694    while let Some(_) = s.next().await { yield value.clone(); }
695  }
696}
697// unnamed ends here
698
699// [[file:index.org::4820]]
#[cfg(test)]
mod constant_tests {
  use super::*;

  /// Each source item is swapped for the constant.
  #[tokio::test]
  async fn test_constant_replaces_all_values() {
    let numbers = futures::stream::iter(vec![1, 2, 3]);
    let replaced: Vec<_> = constant("x", numbers).collect().await;
    assert_eq!(replaced, vec!["x", "x", "x"]);
  }

  /// No source items means no output items.
  #[tokio::test]
  async fn test_constant_empty_stream() {
    let nothing = stream::empty::<i32>();
    let replaced: Vec<_> = constant(42, nothing).collect().await;
    assert!(replaced.is_empty());
  }
}
719// unnamed ends here
720
721// [[file:index.org::5151]]
722/// Scan with seed emission first (matching JS behavior).
723pub fn scan<T, U: Clone, S, F>(accumulator: F, seed: U, s: S) -> impl Stream<Item = U>
724where
725  S: Stream<Item = T>,
726  F: Fn(U, T) -> U,
727{
728  stream! {
729    let mut acc = seed.clone();
730    yield acc.clone();
731    futures::pin_mut!(s);
732    while let Some(item) = s.next().await {
733      acc = accumulator(acc, item);
734      yield acc.clone();
735    }
736  }
737}
738// unnamed ends here
739
740// [[file:index.org::5172]]
#[cfg(test)]
mod scan_tests {
  use super::*;

  /// Running sum, with the seed emitted first.
  #[tokio::test]
  async fn test_scan_accumulates_with_seed() {
    let numbers = futures::stream::iter(vec![1, 2, 3]);
    let sums: Vec<_> = scan(|acc, x| acc + x, 0, numbers).collect().await;
    assert_eq!(sums, vec![0, 1, 3, 6]);
  }

  /// Running product.
  #[tokio::test]
  async fn test_scan_product() {
    let numbers = futures::stream::iter(vec![2, 3, 4]);
    let products: Vec<_> = scan(|acc, x| acc * x, 1, numbers).collect().await;
    assert_eq!(products, vec![1, 2, 6, 24]);
  }

  /// With an empty source only the seed is emitted.
  #[tokio::test]
  async fn test_scan_empty_stream() {
    let nothing = stream::empty::<i32>();
    let sums: Vec<_> = scan(|acc, x| acc + x, 100, nothing).collect().await;
    assert_eq!(sums, vec![100]);
  }
}
768// unnamed ends here
769
770// [[file:index.org::5586]]
771/// Perform side effects for each value without modifying them.
772/// Runtime-agnostic - side effects are synchronous.
773pub fn tap<T: Clone, S, F>(side_effect: F, s: S) -> impl Stream<Item = T>
774where
775  S: Stream<Item = T>,
776  F: Fn(&T),
777{
778  stream! {
779    futures::pin_mut!(s);
780    while let Some(item) = s.next().await {
781      side_effect(&item);
782      yield item;
783    }
784  }
785}
786
787/// For fire-and-forget async side effects using the Runtime trait.
788pub fn tap_spawn<R, T, S, F, Fut>(
789  side_effect: F,
790  s: S,
791) -> impl Stream<Item = T>
792where
793  R: Runtime,
794  T: Clone + Send + 'static,
795  S: Stream<Item = T>,
796  F: Fn(T) -> Fut + Clone + Send + 'static,
797  Fut: std::future::Future<Output = ()> + Send + 'static,
798{
799  stream! {
800    futures::pin_mut!(s);
801    while let Some(item) = s.next().await {
802      let f = side_effect.clone();
803      let item_clone = item.clone();
804      R::spawn(async move { f(item_clone).await });
805      yield item;
806    }
807  }
808}
809// unnamed ends here
810
811// [[file:index.org::5629]]
#[cfg(test)]
mod tap_runtime_tests {
  use super::*;
  use std::sync::atomic::{AtomicUsize, Ordering};
  use std::sync::Arc;

  // `tap_spawn` needs a concrete `Runtime` implementation (feature-gated), so
  // it is not exercised here. Plain `tap` is synchronous and needs no runtime.

  /// `tap` calls the side effect once per item and leaves the items untouched.
  #[tokio::test]
  async fn test_tap_observes_each_value_and_passes_it_through() {
    let calls = Arc::new(AtomicUsize::new(0));
    let calls_in_tap = calls.clone();

    let source = futures::stream::iter(vec![1, 2, 3]);
    let tapped = tap(
      move |_: &i32| {
        calls_in_tap.fetch_add(1, Ordering::SeqCst);
      },
      source,
    );

    let values: Vec<_> = tapped.collect().await;
    assert_eq!(values, vec![1, 2, 3]);
    assert_eq!(calls.load(Ordering::SeqCst), 3);
  }

  /// With an empty source the side effect is never called.
  #[tokio::test]
  async fn test_tap_empty_stream() {
    let calls = Arc::new(AtomicUsize::new(0));
    let calls_in_tap = calls.clone();

    let tapped = tap(
      move |_: &i32| {
        calls_in_tap.fetch_add(1, Ordering::SeqCst);
      },
      stream::empty::<i32>(),
    );

    let values: Vec<i32> = tapped.collect().await;
    assert!(values.is_empty());
    assert_eq!(calls.load(Ordering::SeqCst), 0);
  }
}
818// unnamed ends here
819
820// [[file:index.org::5907]]
821/// Await side effects before yielding values.
822pub fn await_tap<T: Clone, S, F, Fut>(side_effect: F, s: S) -> impl Stream<Item = T>
823where
824  S: Stream<Item = T>,
825  F: Fn(T) -> Fut,
826  Fut: Future<Output = ()>,
827{
828  stream! {
829    futures::pin_mut!(s);
830    while let Some(item) = s.next().await {
831      side_effect(item.clone()).await;
832      yield item;
833    }
834  }
835}
836// unnamed ends here
837
838// [[file:index.org::5927]]
#[cfg(test)]
mod await_tap_tests {
  use super::*;
  use std::sync::atomic::{AtomicUsize, Ordering};
  use std::sync::Arc;

  /// The async side effect runs once per item, and items pass through as-is.
  #[tokio::test]
  async fn test_await_tap_executes_side_effect() {
    let calls = Arc::new(AtomicUsize::new(0));
    let calls_in_tap = calls.clone();

    let bump = move |_: i32| {
      let counter = calls_in_tap.clone();
      async move { counter.fetch_add(1, Ordering::SeqCst); }
    };
    let numbers = futures::stream::iter(vec![1, 2, 3]);

    let emitted: Vec<_> = await_tap(bump, numbers).collect().await;
    assert_eq!(emitted, vec![1, 2, 3]);
    assert_eq!(calls.load(Ordering::SeqCst), 3);
  }
}
864// unnamed ends here
865
866// [[file:index.org::6250]]
867/// Continue with another stream after the first completes.
868pub fn continue_with<T, S1, S2, F>(f: F, s: S1) -> impl Stream<Item = T>
869where
870  S1: Stream<Item = T>,
871  S2: Stream<Item = T>,
872  F: FnOnce() -> S2,
873{
874  stream! {
875    futures::pin_mut!(s);
876    while let Some(item) = s.next().await { yield item; }
877    let s2 = f();
878    futures::pin_mut!(s2);
879    while let Some(item) = s2.next().await { yield item; }
880  }
881}
882// unnamed ends here
883
884// [[file:index.org::6270]]
#[cfg(test)]
mod continue_with_tests {
  use super::*;

  /// Items of the continuation follow the items of the first stream.
  #[tokio::test]
  async fn test_continue_with_appends() {
    let head = futures::stream::iter(vec![1, 2]);
    let make_tail = || futures::stream::iter(vec![3, 4]);

    let emitted: Vec<_> = continue_with(make_tail, head).collect().await;
    assert_eq!(emitted, vec![1, 2, 3, 4]);
  }

  /// Both streams are drained in order, and by the time the continuation's
  /// item has been received the factory has been invoked.
  #[tokio::test]
  async fn test_continue_with_lazy() {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    let invoked = Arc::new(AtomicBool::new(false));
    let invoked_in_factory = invoked.clone();

    let head = futures::stream::iter(vec![1]);
    let make_tail = move || {
      invoked_in_factory.store(true, Ordering::SeqCst);
      futures::stream::iter(vec![2])
    };

    let combined = continue_with(make_tail, head);
    futures::pin_mut!(combined);

    // First item comes from the first stream.
    assert_eq!(combined.next().await, Some(1));
    // Second item comes from the continuation, so the factory has run.
    assert_eq!(combined.next().await, Some(2));
    assert!(invoked.load(Ordering::SeqCst));
  }
}
922// unnamed ends here
923
924// [[file:index.org::6611]]
925/// Flatten a stream of streams by concatenating them.
926/// Built-in: stream_of_streams.flatten()
927
928// Custom implementation:
929pub fn concat_all<T, Inner, Outer>(outer: Outer) -> impl Stream<Item = T>
930where
931  Inner: Stream<Item = T>,
932  Outer: Stream<Item = Inner>,
933{
934  stream! {
935    futures::pin_mut!(outer);
936    while let Some(inner) = outer.next().await {
937      futures::pin_mut!(inner);
938      while let Some(item) = inner.next().await { yield item; }
939    }
940  }
941}
942// unnamed ends here
943
944// [[file:index.org::6633]]
#[cfg(test)]
mod concat_all_tests {
  use super::*;

  /// Inner streams are emitted back to back, in outer order.
  #[tokio::test]
  async fn test_concat_all_flattens() {
    let inner_streams = vec![
      futures::stream::iter(vec![1, 2]),
      futures::stream::iter(vec![3, 4]),
    ];
    let nested = futures::stream::iter(inner_streams);

    let flat: Vec<_> = concat_all(nested).collect().await;
    assert_eq!(flat, vec![1, 2, 3, 4]);
  }
}
959// unnamed ends here
960
961// [[file:index.org::6938]]
962/// Map each value to a stream and concatenate results in order.
963pub fn concat_map<T, U, S, Inner, F>(f: F, s: S) -> impl Stream<Item = U>
964where
965  S: Stream<Item = T>,
966  Inner: Stream<Item = U>,
967  F: Fn(T) -> Inner,
968{
969  stream! {
970    futures::pin_mut!(s);
971    while let Some(item) = s.next().await {
972      let inner = f(item);
973      futures::pin_mut!(inner);
974      while let Some(inner_item) = inner.next().await { yield inner_item; }
975    }
976  }
977}
978// unnamed ends here
979
980// [[file:index.org::6959]]
#[cfg(test)]
mod concat_map_tests {
  use super::*;

  /// Each source item expands to two outputs, kept in source order.
  #[tokio::test]
  async fn test_concat_map_sequential() {
    let expand = |x| futures::stream::iter(vec![x * 10, x * 10 + 1]);
    let numbers = futures::stream::iter(vec![1, 2]);

    let expanded: Vec<_> = concat_map(expand, numbers).collect().await;
    assert_eq!(expanded, vec![10, 11, 20, 21]);
  }
}
994// unnamed ends here
995
996// [[file:index.org::7289]]
997// Filtering Operators
998
999/// Filters values in a stream based on a predicate.
1000/// 
1001/// # Note
1002/// Prefer using the built-in `StreamExt::filter()` method when chaining:
1003/// ```rust,ignore
1004/// use futures::StreamExt;
1005/// let result = stream.filter(|x| futures::future::ready(*x > 2));
1006/// ```
1007/// This standalone function is provided for functional/pipe-style composition.
1008pub fn filter<T, S, P>(predicate: P, s: S) -> impl Stream<Item = T>
1009where
1010  S: Stream<Item = T>,
1011  P: Fn(&T) -> bool,
1012{
1013  stream! {
1014    futures::pin_mut!(s);
1015    while let Some(item) = s.next().await { if predicate(&item) { yield item; } }
1016  }
1017}
1018
1019// For async predicates:
1020pub fn filter_async<T, S, P, Fut>(predicate: P, s: S) -> impl Stream<Item = T>
1021where
1022  S: Stream<Item = T>,
1023  P: Fn(&T) -> Fut,
1024  Fut: std::future::Future<Output = bool>,
1025{
1026  stream! {
1027    futures::pin_mut!(s);
1028    while let Some(item) = s.next().await { if predicate(&item).await { yield item; } }
1029  }
1030}
1031// unnamed ends here
1032
1033// [[file:index.org::7328]]
#[cfg(test)]
mod filter_tests {
  use super::*;

  /// Only items satisfying the predicate survive.
  #[tokio::test]
  async fn test_filter_keeps_matching() {
    let numbers = futures::stream::iter(vec![1, 2, 3, 4, 5]);
    let kept: Vec<_> = filter(|x| *x > 2, numbers).collect().await;
    assert_eq!(kept, vec![3, 4, 5]);
  }

  /// Keeps the even numbers.
  #[tokio::test]
  async fn test_filter_even_numbers() {
    let numbers = futures::stream::iter(vec![1, 2, 3, 4, 5, 6]);
    let kept: Vec<_> = filter(|x| x % 2 == 0, numbers).collect().await;
    assert_eq!(kept, vec![2, 4, 6]);
  }

  /// A predicate nothing satisfies gives an empty stream.
  #[tokio::test]
  async fn test_filter_empty_result() {
    let numbers = futures::stream::iter(vec![1, 2, 3]);
    let kept: Vec<_> = filter(|x| *x > 100, numbers).collect().await;
    assert!(kept.is_empty());
  }
}
1061// unnamed ends here
1062
1063// [[file:index.org::7853]]
1064/// Filter consecutive duplicates using a custom equality function.
1065pub fn skip_repeats_with<T: Clone, S, F>(equals: F, s: S) -> impl Stream<Item = T>
1066where
1067  S: Stream<Item = T>,
1068  F: Fn(&T, &T) -> bool,
1069{
1070  stream! {
1071    futures::pin_mut!(s);
1072    let mut last: Option<T> = None;
1073    while let Some(item) = s.next().await {
1074      let should_yield = match &last {
1075        None => true,
1076        Some(prev) => !equals(&item, prev),
1077      };
1078      if should_yield {
1079        last = Some(item.clone());
1080        yield item;
1081      }
1082    }
1083  }
1084}
1085
1086/// Filter consecutive duplicates using equality.
1087pub fn skip_repeats<T: Clone + PartialEq, S>(s: S) -> impl Stream<Item = T>
1088where
1089  S: Stream<Item = T>,
1090{
1091  skip_repeats_with(|a, b| a == b, s)
1092}
1093// unnamed ends here
1094
1095// [[file:index.org::7887]]
#[cfg(test)]
mod skip_repeats_tests {
  use super::*;

  /// Runs of equal neighbours collapse to one item; a value may reappear later.
  #[tokio::test]
  async fn test_skip_repeats() {
    let input = futures::stream::iter(vec![1, 1, 2, 2, 3, 1, 1]);
    let deduped: Vec<_> = skip_repeats(input).collect().await;
    assert_eq!(deduped, vec![1, 2, 3, 1]);
  }

  /// Two words count as equal when they start with the same character.
  #[tokio::test]
  async fn test_skip_repeats_with_custom_eq() {
    let words = futures::stream::iter(vec!["apple", "ant", "banana", "berry"]);
    let kept: Vec<_> = skip_repeats_with(
      |a: &&str, b: &&str| a.chars().next() == b.chars().next(),
      words,
    )
    .collect()
    .await;
    assert_eq!(kept, vec!["apple", "banana"]);
  }
}
1119// unnamed ends here
1120
1121// [[file:index.org::8317]]
1122// Slicing Operators
1123
1124/// Takes the first `n` values from a stream.
1125/// 
1126/// # Note
1127/// Prefer using the built-in `StreamExt::take()` method when chaining:
1128/// ```rust,ignore
1129/// use futures::StreamExt;
1130/// let result = stream.take(5);
1131/// ```
1132/// This standalone function is provided for functional/pipe-style composition.
1133pub fn take<T, S: Stream<Item = T>>(n: usize, s: S) -> impl Stream<Item = T> {
1134  stream! {
1135    futures::pin_mut!(s);
1136    let mut count = 0;
1137    while let Some(item) = s.next().await {
1138      if count < n {
1139        yield item;
1140        count += 1;
1141      } else {
1142        break;
1143      }
1144    }
1145  }
1146}
1147// unnamed ends here
1148
1149// [[file:index.org::8347]]
#[cfg(test)]
mod take_tests {
  use super::*;

  /// Applies `take(n, ..)` to `input` and collects whatever comes out.
  async fn first_n(n: usize, input: Vec<i32>) -> Vec<i32> {
    take(n, futures::stream::iter(input)).collect().await
  }

  #[tokio::test]
  async fn test_take_first_n() {
    assert_eq!(first_n(2, vec![1, 2, 3, 4, 5]).await, vec![1, 2]);
  }

  #[tokio::test]
  async fn test_take_more_than_available() {
    assert_eq!(first_n(10, vec![1, 2]).await, vec![1, 2]);
  }

  #[tokio::test]
  async fn test_take_zero() {
    assert!(first_n(0, vec![1, 2, 3]).await.is_empty());
  }
}
1177// unnamed ends here
1178
1179// [[file:index.org::8666]]
1180/// Skips the first `n` values from a stream.
1181/// 
1182/// # Note
1183/// Prefer using the built-in `StreamExt::skip()` method when chaining:
1184/// ```rust,ignore
1185/// use futures::StreamExt;
1186/// let result = stream.skip(2);
1187/// ```
1188/// This standalone function is provided for functional/pipe-style composition.
1189pub fn skip<T, S: Stream<Item = T>>(n: usize, s: S) -> impl Stream<Item = T> {
1190  stream! {
1191    futures::pin_mut!(s);
1192    let mut count = 0;
1193    while let Some(item) = s.next().await {
1194      if count >= n { yield item; }
1195      count += 1;
1196    }
1197  }
1198}
1199// unnamed ends here
1200
1201// [[file:index.org::8690]]
#[cfg(test)]
mod skip_tests {
  use super::*;

  /// Applies `skip(n, ..)` to `input` and collects the remainder.
  async fn after_skipping(n: usize, input: Vec<i32>) -> Vec<i32> {
    skip(n, futures::stream::iter(input)).collect().await
  }

  #[tokio::test]
  async fn test_skip_first_n() {
    assert_eq!(after_skipping(2, vec![1, 2, 3, 4, 5]).await, vec![3, 4, 5]);
  }

  #[tokio::test]
  async fn test_skip_zero() {
    assert_eq!(after_skipping(0, vec![1, 2, 3]).await, vec![1, 2, 3]);
  }

  #[tokio::test]
  async fn test_skip_more_than_available() {
    assert!(after_skipping(5, vec![1, 2]).await.is_empty());
  }
}
1227// unnamed ends here
1228
1229// [[file:index.org::9039]]
1230/// Emit values from index start to end (exclusive).
1231pub fn slice<T, S: Stream<Item = T>>(start: usize, end: usize, s: S) -> impl Stream<Item = T> {
1232  stream! {
1233    futures::pin_mut!(s);
1234    let mut index = 0;
1235    while let Some(item) = s.next().await {
1236      if index >= start && index < end { yield item; }
1237      index += 1;
1238      if index >= end { break; }
1239    }
1240  }
1241}
1242
1243// Or using built-in methods:
1244// stream.skip(start).take(end - start)
1245// unnamed ends here
1246
1247// [[file:index.org::9059]]
#[cfg(test)]
mod slice_tests {
  use super::*;

  /// Applies `slice(start, end, ..)` to `input` and collects the result.
  async fn window(start: usize, end: usize, input: Vec<i32>) -> Vec<i32> {
    slice(start, end, futures::stream::iter(input)).collect().await
  }

  #[tokio::test]
  async fn test_slice() {
    assert_eq!(window(2, 5, vec![0, 1, 2, 3, 4, 5]).await, vec![2, 3, 4]);
  }

  #[tokio::test]
  async fn test_slice_empty_range() {
    assert_eq!(window(2, 2, vec![0, 1, 2, 3, 4]).await, Vec::<i32>::new());
  }

  #[tokio::test]
  async fn test_slice_beyond_length() {
    assert_eq!(window(1, 10, vec![0, 1, 2]).await, vec![1, 2]);
  }
}
1272// unnamed ends here
1273
1274// [[file:index.org::9434]]
1275/// Takes values from a stream while the predicate returns true.
1276/// 
1277/// # Note
1278/// Prefer using the built-in `StreamExt::take_while()` method when chaining:
1279/// ```rust,ignore
1280/// use futures::StreamExt;
1281/// let result = stream.take_while(|x| futures::future::ready(*x < 5));
1282/// ```
1283/// This standalone function is provided for functional/pipe-style composition.
1284pub fn take_while<T, S, P>(predicate: P, s: S) -> impl Stream<Item = T>
1285where
1286  S: Stream<Item = T>,
1287  P: Fn(&T) -> bool,
1288{
1289  stream! {
1290    futures::pin_mut!(s);
1291    while let Some(item) = s.next().await {
1292      if predicate(&item) { yield item; }
1293      else { break; }
1294    }
1295  }
1296}
1297// unnamed ends here
1298
1299// [[file:index.org::9461]]
#[cfg(test)]
mod take_while_tests {
  use super::*;

  /// Applies `take_while` with predicate `keep` to `input` and collects the prefix.
  async fn prefix(input: Vec<i32>, keep: impl Fn(&i32) -> bool) -> Vec<i32> {
    take_while(keep, futures::stream::iter(input)).collect().await
  }

  #[tokio::test]
  async fn test_take_while() {
    // The trailing 2 and 1 would pass the predicate but come after the first failure.
    assert_eq!(prefix(vec![1, 2, 3, 4, 2, 1], |x| *x < 4).await, vec![1, 2, 3]);
  }

  #[tokio::test]
  async fn test_take_while_all_pass() {
    assert_eq!(prefix(vec![1, 2, 3], |_x| true).await, vec![1, 2, 3]);
  }

  #[tokio::test]
  async fn test_take_while_none_pass() {
    assert_eq!(prefix(vec![1, 2, 3], |_x| false).await, Vec::<i32>::new());
  }
}
1324// unnamed ends here
1325
1326// [[file:index.org::9829]]
1327/// Skips values from a stream while the predicate returns true.
1328/// 
1329/// # Note
1330/// Prefer using the built-in `StreamExt::skip_while()` method when chaining:
1331/// ```rust,ignore
1332/// use futures::StreamExt;
1333/// let result = stream.skip_while(|x| futures::future::ready(*x < 3));
1334/// ```
1335/// This standalone function is provided for functional/pipe-style composition.
1336pub fn skip_while<T, S, P>(predicate: P, s: S) -> impl Stream<Item = T>
1337where
1338  S: Stream<Item = T>,
1339  P: Fn(&T) -> bool,
1340{
1341  stream! {
1342    futures::pin_mut!(s);
1343    let mut skipping = true;
1344    while let Some(item) = s.next().await {
1345      if skipping && !predicate(&item) { skipping = false; }
1346      if !skipping {  yield item; }
1347    }
1348  }
1349}
1350// unnamed ends here
1351
1352// [[file:index.org::9857]]
#[cfg(test)]
mod skip_while_tests {
  use super::*;

  /// Applies `skip_while` with predicate `drop_if` to `input` and collects the rest.
  async fn rest(input: Vec<i32>, drop_if: impl Fn(&i32) -> bool) -> Vec<i32> {
    skip_while(drop_if, futures::stream::iter(input)).collect().await
  }

  #[tokio::test]
  async fn test_skip_while() {
    // The trailing 2 and 1 satisfy the predicate but arrive after skipping ended.
    assert_eq!(rest(vec![1, 2, 3, 4, 2, 1], |x| *x < 3).await, vec![3, 4, 2, 1]);
  }

  #[tokio::test]
  async fn test_skip_while_all_fail() {
    assert_eq!(rest(vec![1, 2, 3], |_x| false).await, vec![1, 2, 3]);
  }

  #[tokio::test]
  async fn test_skip_while_all_pass() {
    assert_eq!(rest(vec![1, 2, 3], |_x| true).await, Vec::<i32>::new());
  }
}
1377// unnamed ends here
1378
1379// [[file:index.org::10222]]
1380/// Take values until predicate matches (matching value not emitted).
1381pub fn take_until<T, S, P>(predicate: P, s: S) -> impl Stream<Item = T>
1382where
1383  S: Stream<Item = T>,
1384  P: Fn(&T) -> bool,
1385{
1386  stream! {
1387    futures::pin_mut!(s);
1388    while let Some(item) = s.next().await {
1389      if predicate(&item) { break; }
1390      yield item;
1391    }
1392  }
1393}
1394// unnamed ends here
1395
1396// [[file:index.org::10241]]
#[cfg(test)]
mod take_until_tests {
  use super::*;

  /// Applies `take_until` with stop condition `stop` to `input` and collects the output.
  async fn before(input: Vec<i32>, stop: impl Fn(&i32) -> bool) -> Vec<i32> {
    take_until(stop, futures::stream::iter(input)).collect().await
  }

  #[tokio::test]
  async fn test_take_until() {
    assert_eq!(before(vec![1, 2, 3, 4, 5], |x| *x == 3).await, vec![1, 2]);
  }

  #[tokio::test]
  async fn test_take_until_never_matches() {
    assert_eq!(before(vec![1, 2, 3], |_x| false).await, vec![1, 2, 3]);
  }

  #[tokio::test]
  async fn test_take_until_first_matches() {
    assert_eq!(before(vec![1, 2, 3], |x| *x == 1).await, Vec::<i32>::new());
  }
}
1419// unnamed ends here
1420
1421// [[file:index.org::10603]]
1422// Time Operators
1423/// Delay each value by a specified duration.
1424/// Uses the Runtime trait for timer functionality.
1425pub fn delay<R: Runtime, T, S: Stream<Item = T>>(ms: u64, s: S) -> impl Stream<Item = T> {
1426  stream! {
1427    futures::pin_mut!(s);
1428    let duration = Duration::from_millis(ms);
1429    while let Some(item) = s.next().await {
1430      R::sleep(duration).await;
1431      yield item;
1432    }
1433  }
1434}
1435
1436/// Runtime-agnostic delay that accepts a sleep function
1437pub fn delay_with<T, S, F, Fut>(
1438  ms: u64,
1439  s: S,
1440  sleep_fn: F,
1441) -> impl Stream<Item = T>
1442where
1443  S: Stream<Item = T>,
1444  F: Fn(Duration) -> Fut + Clone,
1445  Fut: std::future::Future<Output = ()>,
1446{
1447  stream! {
1448    futures::pin_mut!(s);
1449    let duration = Duration::from_millis(ms);
1450    while let Some(item) = s.next().await {
1451      sleep_fn(duration).await;
1452      yield item;
1453    }
1454  }
1455}
1456// unnamed ends here
1457
1458// [[file:index.org::10644]]
#[cfg(test)]
mod delay_tests {
  use super::*;

  #[tokio::test]
  async fn test_delay_with() {
    let started = std::time::Instant::now();
    let delayed = delay_with(
      10,
      futures::stream::iter(vec![1, 2, 3]),
      |d| tokio::time::sleep(d),
    );
    let values: Vec<_> = delayed.collect().await;
    let took = started.elapsed();

    assert_eq!(values, vec![1, 2, 3]);
    // Three items at 10ms each is ~30ms; leave a little slack below that.
    assert!(took >= Duration::from_millis(25));
  }

  #[tokio::test]
  async fn test_delay_empty_stream() {
    let delayed = delay_with(
      100,
      futures::stream::iter(Vec::<i32>::new()),
      |d| tokio::time::sleep(d),
    );
    let values: Vec<_> = delayed.collect().await;
    assert_eq!(values, Vec::<i32>::new());
  }
}
1487// unnamed ends here
1488
1489// [[file:index.org::11063]]
1490/// Only emit a value if no new values arrive within the specified duration.
1491/// Uses the Runtime trait for timer functionality.
1492pub fn debounce<R, T, S>(ms: u64, s: S) -> impl Stream<Item = T>
1493where
1494  R: Runtime,
1495  T: Clone + Send + 'static,
1496  S: Stream<Item = T> + Send + 'static,
1497{
1498  debounce_with(ms, s, R::sleep)
1499}
1500
/// Runtime-agnostic debounce that accepts a sleep function.
/// Emits a value only after the specified duration has passed without new values.
///
/// For every burst of values only the most recent one is emitted: each new
/// value replaces the stored one and a fresh timer is created via `sleep_fn`.
/// When the source ends while a value is still stored, that value is emitted
/// immediately (without waiting for the timer) and the stream finishes.
///
/// * `ms` - quiet period in milliseconds.
/// * `s` - source stream.
/// * `sleep_fn` - called with the quiet period; the returned future acts as the timer.
pub fn debounce_with<T, S, F, Fut>(ms: u64, s: S, sleep_fn: F) -> impl Stream<Item = T>
where
  T: Clone + Send + 'static,
  S: Stream<Item = T> + Send + 'static,
  F: Fn(Duration) -> Fut + Clone + Send + 'static,
  Fut: std::future::Future<Output = ()> + Send + 'static,
{
  stream! {
    let duration = Duration::from_millis(ms);
    // Latest value seen but not yet emitted.
    let mut pending: Option<T> = None;
    
    futures::pin_mut!(s);
    
    // Outer loop: wait (without any timer) for the first value of a burst.
    while let Some(value) = s.next().await {
      pending = Some(value);
      // Keep consuming while values arrive rapidly
      loop {
        // A new timer per iteration: every arriving value restarts the quiet period.
        let timeout = sleep_fn(duration);
        futures::pin_mut!(timeout);
        
        // Race between next value and timeout
        let next = s.next();
        futures::pin_mut!(next);
        
        match futures::future::select(next, timeout).await {
          futures::future::Either::Left((Some(v), _)) => {
            // New value arrived, update pending and restart timer
            pending = Some(v);
          }
          futures::future::Either::Left((None, _)) => {
            // Stream ended
            if let Some(v) = pending.take() {
              yield v;
            }
            return;
          }
          futures::future::Either::Right((_, _)) => {
            // Timeout fired, emit pending and wait for next value
            if let Some(v) = pending.take() {
              yield v;
            }
            break;
          }
        }
      }
    }
  }
}
1551// unnamed ends here
1552
1553// [[file:index.org::11129]]
#[cfg(test)]
mod debounce_tests {
  use super::*;

  // These tests feed `debounce_with` sources whose items are all immediately
  // available, so the quiet-period timer never gets the chance to fire and the
  // outcome does not depend on wall-clock timing. Covering the timer path
  // reliably would need controlled time (e.g. `tokio::time::pause()`).

  #[tokio::test]
  async fn test_debounce_emits_last_of_burst() {
    let source = futures::stream::iter(vec![1, 2, 3]);
    let values: Vec<_> = debounce_with(
      1_000,
      source,
      |d| tokio::time::sleep(d),
    ).collect().await;
    // Every value superseded the previous one; the last is flushed when the source ends.
    assert_eq!(values, vec![3]);
  }

  #[tokio::test]
  async fn test_debounce_empty_stream() {
    let source = futures::stream::iter(Vec::<i32>::new());
    let values: Vec<_> = debounce_with(
      1_000,
      source,
      |d| tokio::time::sleep(d),
    ).collect().await;
    assert_eq!(values, Vec::<i32>::new());
  }
}
1567// unnamed ends here
1568
1569// [[file:index.org::11690]]
/// Edge selection for `throttle`.
///
/// The default enables both edges.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ThrottleOptions {
  /// Emit a value right away when it arrives outside the throttle window.
  pub leading: bool,
  /// Emit the last held-back value once the source has ended.
  pub trailing: bool,
}

impl Default for ThrottleOptions {
  /// Both the leading and the trailing edge are enabled.
  fn default() -> Self {
    Self { leading: true, trailing: true }
  }
}

impl ThrottleOptions {
  /// Leading edge enabled, trailing edge disabled.
  pub fn leading_only() -> Self {
    Self { leading: true, trailing: false }
  }

  /// Trailing edge enabled, leading edge disabled.
  pub fn trailing_only() -> Self {
    Self { leading: false, trailing: true }
  }
}
1589
1590/// Limit emission rate with leading/trailing edge control.
1591/// This implementation is runtime-agnostic - it only uses std::time::Instant.
1592pub fn throttle<T: Clone, S: Stream<Item = T> + Unpin>(
1593  ms: u64,
1594  options: ThrottleOptions,
1595  mut s: S,
1596) -> impl Stream<Item = T> {
1597  stream! {
1598    let duration = Duration::from_millis(ms);
1599    let mut last_emit = Instant::now() - duration;  // Allow first emit
1600    let mut trailing_value: Option<T> = None;
1601    while let Some(item) = s.next().await {
1602      let now = Instant::now();
1603      let elapsed = now.duration_since(last_emit);
1604      if elapsed >= duration {
1605        if options.leading {
1606          yield item;
1607          last_emit = now;
1608          trailing_value = None;
1609        } else {
1610          trailing_value = Some(item);
1611        }
1612      } else {
1613        trailing_value = Some(item);
1614      }
1615    }
1616    // Emit final trailing value
1617    if options.trailing {
1618      if let Some(value) = trailing_value { yield value; }
1619    }
1620  }
1621}
1622// unnamed ends here
1623
1624// [[file:index.org::11749]]
#[cfg(test)]
mod throttle_tests {
  use super::*;

  #[tokio::test]
  async fn test_throttle_leading() {
    let source = futures::stream::iter(vec![1, 2, 3, 4, 5]);
    let values: Vec<_> = throttle(
      100,
      ThrottleOptions::leading_only(),
      source,
    ).collect().await;
    // First value should be emitted immediately
    assert!(!values.is_empty());
    assert_eq!(values[0], 1);
  }

  #[tokio::test]
  async fn test_throttle_trailing() {
    let source = futures::stream::iter(vec![1, 2, 3]);
    let values: Vec<_> = throttle(
      100,
      ThrottleOptions::trailing_only(),
      source,
    ).collect().await;
    // With the leading edge disabled nothing is emitted while the source runs;
    // only the last held-back value is flushed at the end. This does not
    // depend on timing, so the exact output can be pinned.
    assert_eq!(values, vec![3]);
  }

  #[tokio::test]
  async fn test_throttle_empty() {
    let source = futures::stream::iter(Vec::<i32>::new());
    let values: Vec<_> = throttle(
      100,
      ThrottleOptions::default(),
      source,
    ).collect().await;
    assert_eq!(values, Vec::<i32>::new());
  }
}
1664// unnamed ends here
1665
1666// [[file:index.org::12339]]
1667// Error Handling Operators
1668/// For streams that emit Result<T, E>, recover from errors.
1669pub fn recover_with<T, E, S, S2, F>(
1670  recover_fn: F,
1671  s: S,
1672) -> impl Stream<Item = T>
1673where
1674  S: Stream<Item = Result<T, E>>,
1675  S2: Stream<Item = T>,
1676  F: FnOnce(E) -> S2,
1677  E: Error,
1678{
1679  stream! {
1680    futures::pin_mut!(s);
1681    loop {
1682      match s.next().await {
1683        Some(Ok(item)) => yield item,
1684        Some(Err(e)) => {
1685          let recovery = recover_fn(e);
1686          futures::pin_mut!(recovery);
1687          while let Some(item) = recovery.next().await { yield item; }
1688          break;
1689        }
1690        None => break,
1691      }
1692    }
1693  }
1694}
1695
1696// Alternatively, using TryStreamExt from futures:
1697// use futures::TryStreamExt;
1698// stream.or_else(|e| async move { Ok(fallback_value) })
1699// unnamed ends here
1700
1701// [[file:index.org::12376]]
#[cfg(test)]
mod recover_with_tests {
  use super::*;

  /// Minimal concrete error type satisfying the `E: Error` bound.
  #[derive(Debug)]
  struct SimpleError;

  impl std::fmt::Display for SimpleError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      write!(f, "SimpleError")
    }
  }

  impl std::error::Error for SimpleError {}

  #[tokio::test]
  async fn test_recover_with_no_error() {
    let all_ok = futures::stream::iter(vec![Ok::<_, SimpleError>(1), Ok(2), Ok(3)]);
    let recovered = recover_with(
      |_e: SimpleError| futures::stream::iter(vec![99]),
      all_ok,
    );
    let values: Vec<_> = recovered.collect().await;
    // The fallback is never consulted.
    assert_eq!(values, vec![1, 2, 3]);
  }

  #[tokio::test]
  async fn test_recover_with_error() {
    let failing = futures::stream::iter(vec![
      Ok(1),
      Err(SimpleError),
      Ok(3),
    ]);
    let recovered = recover_with(
      |_e: SimpleError| futures::stream::iter(vec![99, 100]),
      failing,
    );
    let values: Vec<_> = recovered.collect().await;
    // Everything after the error is replaced by the fallback values.
    assert_eq!(values, vec![1, 99, 100]);
  }
}
1740// unnamed ends here
1741
1742// [[file:index.org::12724]]
1743/// Recovers from errors by trying alternative streams from a provided iterator.
1744pub fn recover_with_stream<T, E, S, Alt, AltIter>(
1745  mut alternatives: AltIter,
1746  source: S,
1747) -> impl Stream<Item = T>
1748where
1749  S: Stream<Item = Result<T, E>> + Send + 'static,
1750  Alt: Stream<Item = Result<T, E>> + Send + 'static,
1751  AltIter: Iterator<Item = Alt> + Send + 'static,
1752  T: Send + 'static,
1753  E: Send + 'static,
1754{
1755  stream! {
1756    futures::pin_mut!(source);
1757    let mut current: Pin<Box<dyn Stream<Item = Result<T, E>> + Send>> = Box::pin(source);
1758    
1759    loop {
1760      let mut errored = false;
1761      while let Some(result) = current.next().await {
1762        match result {
1763          Ok(value) => yield value,
1764          Err(_) => {
1765            errored = true;
1766            break;
1767          }
1768        }
1769      }
1770      
1771      if !errored {
1772        break;  // Completed successfully
1773      }
1774      
1775      // Try next alternative
1776      match alternatives.next() {
1777        Some(alt) => current = Box::pin(alt),
1778        None => break,  // No more alternatives
1779      }
1780    }
1781  }
1782}
1783// unnamed ends here
1784
1785// [[file:index.org::12769]]
#[cfg(test)]
mod recover_with_stream_tests {
  use super::*;

  #[derive(Debug, Clone)]
  struct TestErr;

  /// Boxed fallback stream type shared by the tests below.
  type AltStream = Pin<Box<dyn Stream<Item = Result<i32, TestErr>> + Send>>;

  #[tokio::test]
  async fn test_recover_with_stream_success() {
    let healthy = futures::stream::iter(vec![Ok::<i32, TestErr>(1), Ok(2), Ok(3)]);
    let no_fallbacks: Vec<AltStream> = Vec::new();

    let values: Vec<_> = recover_with_stream(no_fallbacks.into_iter(), healthy)
      .collect()
      .await;
    assert_eq!(values, vec![1, 2, 3]);
  }

  #[tokio::test]
  async fn test_recover_with_stream_uses_alternative() {
    let failing = futures::stream::iter(vec![Ok(1), Err(TestErr), Ok(3)]);
    let fallback: AltStream = Box::pin(futures::stream::iter(vec![Ok(10), Ok(20)]));
    let fallbacks: Vec<AltStream> = vec![fallback];

    let values: Vec<_> = recover_with_stream(fallbacks.into_iter(), failing)
      .collect()
      .await;
    // The `Ok(3)` behind the error is never reached.
    assert_eq!(values, vec![1, 10, 20]);
  }
}
1812// unnamed ends here
1813
1814// [[file:index.org::12936]]
1815/// Creates a stream that immediately emits an error.
1816pub fn throw_error<T, E: Clone>(error: E) -> impl Stream<Item = Result<T, E>> {
1817  stream::once(async move { Err(error) })
1818}
1819
/// Creates a stream whose body panics with `message` when it runs.
///
/// Not recommended for production; see `throw_error` for a stream that
/// reports the failure as an `Err` item instead.
pub fn throw_panic<T>(message: &'static str) -> impl Stream<Item = T> {
  stream! {
    panic!("{}", message);
    // Unreachable, but helps type inference:
    #[allow(unreachable_code)]
    loop { yield unreachable!(); }
  }
}
1829// unnamed ends here
1830
1831// [[file:index.org::12955]]
#[cfg(test)]
mod throw_error_tests {
  use super::*;

  /// The stream produces exactly one item, and that item is an `Err`.
  #[tokio::test]
  async fn test_throw_error_emits_error() {
    let emitted: Vec<_> = throw_error::<i32, _>("test error".to_string())
      .collect()
      .await;

    assert_eq!(emitted.len(), 1);
    assert!(emitted[0].is_err());
  }
}
1845// unnamed ends here
1846
1847// [[file:index.org::13329]]
/// Bundle of retry settings.
///
/// NOTE(review): nothing in the visible part of this file reads this struct;
/// `retry` and `retry_with` take `max_attempts` and `delay_ms` as plain
/// arguments. Confirm where (or whether) it is consumed.
pub struct RetryOptions<F> {
  /// Attempt limit; presumably mirrors the `max_attempts` argument of `retry` - confirm.
  pub max_attempts: usize,
  /// Delay in milliseconds; presumably the pause between attempts - confirm.
  pub delay_ms: u64,
  /// Caller-supplied value of type `F`; judging by the name, a predicate that
  /// decides whether a failure is retried - TODO confirm the expected signature.
  pub should_retry: F,
}
1853
1854/// Retry a stream factory on error.
1855/// Uses the Runtime trait for delay functionality.
1856pub fn retry<R, T, E, S, F>(
1857  max_attempts: usize,
1858  delay_ms: u64,
1859  mut stream_factory: F,
1860) -> impl Stream<Item = Result<T, E>>
1861where
1862  R: Runtime,
1863  S: Stream<Item = Result<T, E>>,
1864  F: FnMut() -> S,
1865  E: Clone,
1866{
1867  stream! {
1868    let mut attempt = 0;
1869    loop {
1870      let s = stream_factory();
1871      futures::pin_mut!(s);
1872      let mut failed = false;
1873      
1874      while let Some(item) = s.next().await {
1875        match item {
1876          Ok(value) => yield Ok(value),
1877          Err(e) => {
1878            attempt += 1;
1879            if attempt >= max_attempts {
1880              yield Err(e);
1881              return;
1882            }
1883            if delay_ms > 0 {  R::sleep(Duration::from_millis(delay_ms)).await; }
1884            failed = true;
1885            break;
1886          }
1887        }
1888      }
1889      
1890      if !failed { return; } // Stream completed successfully
1891    }
1892  }
1893}
1894
1895/// Runtime-agnostic retry with custom sleep function
1896pub fn retry_with<T, E, S, F, SF, SFut>(
1897  max_attempts: usize,
1898  delay_ms: u64,
1899  mut stream_factory: F,
1900  sleep_fn: SF,
1901) -> impl Stream<Item = Result<T, E>>
1902where
1903  S: Stream<Item = Result<T, E>>,
1904  F: FnMut() -> S,
1905  E: Clone,
1906  SF: Fn(Duration) -> SFut,
1907  SFut: std::future::Future<Output = ()>,
1908{
1909  stream! {
1910    let mut attempt = 0;
1911    loop {
1912      let s = stream_factory();
1913      futures::pin_mut!(s);
1914      let mut failed = false;
1915      
1916      while let Some(item) = s.next().await {
1917        match item {
1918          Ok(value) => yield Ok(value),
1919          Err(e) => {
1920            attempt += 1;
1921            if attempt >= max_attempts {
1922              yield Err(e);
1923              return;
1924            }
1925            if delay_ms > 0 { sleep_fn(Duration::from_millis(delay_ms)).await; }
1926            failed = true;
1927            break;
1928          }
1929        }
1930      }
1931      
1932      if !failed { return; }
1933    }
1934  }
1935}
1936// unnamed ends here
1937
1938// [[file:index.org::13422]]
#[cfg(test)]
mod retry_tests {
  use super::*;

  /// Cloneable error type, as required by the `E: Clone` bound.
  #[derive(Debug, Clone, PartialEq)]
  struct TestError(String);

  /// Keeps only the successful values of a collected retry run.
  fn successes(results: Vec<Result<i32, TestError>>) -> Vec<i32> {
    results.into_iter().filter_map(|r| r.ok()).collect()
  }

  #[tokio::test]
  async fn test_retry_with_success() {
    let results: Vec<Result<i32, TestError>> = retry_with(
      3,
      10,
      || futures::stream::iter(vec![Ok(1), Ok(2), Ok(3)]),
      |_d| std::future::ready(()),
    )
    .collect()
    .await;
    assert_eq!(successes(results), vec![1, 2, 3]);
  }

  #[tokio::test]
  async fn test_retry_with_eventual_success() {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    // The first two attempts fail, the third one succeeds.
    let calls = Arc::new(AtomicUsize::new(0));
    let calls_in_factory = calls.clone();
    let results: Vec<Result<i32, TestError>> = retry_with(
      3,
      0,
      move || {
        let nth = calls_in_factory.fetch_add(1, Ordering::SeqCst);
        if nth < 2 { futures::stream::iter(vec![Err(TestError("fail".into()))]) }
        else { futures::stream::iter(vec![Ok(42)]) }
      },
      |_d| std::future::ready(()),
    )
    .collect()
    .await;
    assert_eq!(successes(results), vec![42]);
  }
}
1979// unnamed ends here
1980
1981// [[file:index.org::14125]]
1982/// Merge two streams, interleaving values as they arrive.
1983/// 
1984/// # Note
1985/// This is an alias for `futures::stream::select`. Prefer using the built-in directly:
1986/// ```rust,ignore
1987/// use futures::stream;
1988/// let merged = stream::select(s1, s2);
1989/// ```
1990pub use futures::stream::select as merge;
1991
1992/// Merge multiple streams, interleaving values as they arrive.
1993/// 
1994/// # Note
1995/// This is an alias for `futures::stream::select_all`. Prefer using the built-in directly:
1996/// ```rust,ignore
1997/// use futures::stream;
1998/// let merged = stream::select_all(streams);
1999/// ```
2000pub use futures::stream::select_all as merge_all;
2001// unnamed ends here
2002
2003// [[file:index.org::14149]]
#[cfg(test)]
mod merge_tests {
  use super::*;

  #[tokio::test]
  async fn test_merge() {
    let odds = futures::stream::iter(vec![1, 3, 5]);
    let evens = futures::stream::iter(vec![2, 4, 6]);
    let merged: Vec<_> = merge(odds, evens).collect().await;
    // Interleaving order is up to `select`, so only check that nothing was lost.
    assert_eq!(merged.len(), 6);
    assert!(merged.contains(&1));
    assert!(merged.contains(&6));
  }

  #[tokio::test]
  async fn test_merge_all() {
    let first = Box::pin(futures::stream::iter(vec![1, 2]));
    let second = Box::pin(futures::stream::iter(vec![3, 4]));
    let merged: Vec<_> = merge_all(vec![first, second]).collect().await;
    assert_eq!(merged.len(), 4);
  }
}
2028// unnamed ends here
2029
2030// [[file:index.org::14560]]
2031// Built-in: outer.flatten_unordered(None) // None = unlimited concurrency
2032// Or: outer.flatten_unordered(Some(10))  // limit to 10 concurrent streams
2033
2034// For production, use flatten_unordered from futures
2035// merge_all is defined above for Vec<S>, use stream.flatten_unordered(None) for streams of streams
2036// unnamed ends here
2037
2038// [[file:index.org::14891]]
2039// Built-in concurrent flatMap:
2040// stream.flat_map_unordered(None, |item| create_inner_stream(item))
2041
2042// For sequential flatMap (like concatMap), use flat_map:
2043// stream.flat_map(|item| create_inner_stream(item))
2044
2045// Custom implementation using sequential flattening:
2046pub fn flat_map<T, U, S, Inner, F>(f: F, s: S) -> impl Stream<Item = U>
2047where
2048  S: Stream<Item = T>,
2049  Inner: Stream<Item = U>,
2050  F: Fn(T) -> Inner,
2051{
2052  s.map(f).flatten()
2053}
2054
2055// Alias
2056pub fn chain<T, U, S, Inner, F>(f: F, s: S) -> impl Stream<Item = U>
2057where
2058  S: Stream<Item = T>,
2059  Inner: Stream<Item = U>,
2060  F: Fn(T) -> Inner,
2061{
2062  flat_map(f, s)
2063}
2064// unnamed ends here
2065
2066// [[file:index.org::14921]]
#[cfg(test)]
mod chain_tests {
  use super::*;

  #[tokio::test]
  async fn test_chain_flattens() {
    // Each outer value `x` expands to the pair `[x * 10, x * 10 + 1]`.
    let expand = |x: i32| futures::stream::iter(vec![x * 10, x * 10 + 1]);
    let outer = futures::stream::iter(vec![1, 2]);

    let flattened: Vec<_> = chain(expand, outer).collect().await;
    // Sequential flattening keeps the outer order.
    assert_eq!(flattened, vec![10, 11, 20, 21]);
  }
}
2084// unnamed ends here
2085
2086// [[file:index.org::15413]]
2087/// Switch to new inner stream on each outer value, cancelling previous.
2088/// Runtime-agnostic using futures::select!
2089pub fn switch_map<T, U, S, Inner, F>(f: F, s: S) -> impl Stream<Item = U>
2090where
2091  S: Stream<Item = T> + Unpin,
2092  Inner: Stream<Item = U> + Unpin,
2093  F: Fn(T) -> Inner,
2094{
2095  stream! {
2096    futures::pin_mut!(s);
2097    let mut current_inner: Option<std::pin::Pin<Box<dyn Stream<Item = U> + Unpin>>> = None;
2098    loop {
2099      futures::select! {
2100        // Check outer stream first (higher priority for switching)
2101        outer_item = s.next().fuse() => {
2102          match outer_item {
2103            Some(item) => {
2104              // Cancel old inner by dropping, start new one
2105              current_inner = Some(Box::pin(f(item)));
2106            }
2107            None => {
2108              // Outer done, drain current inner
2109              if let Some(ref mut inner) = current_inner { while let Some(v) = inner.next().await { yield v; } }
2110              break;
2111            }
2112          }
2113        }
2114        // Process current inner
2115        inner_item = async {
2116            if let Some(ref mut inner) = current_inner { inner.next().await }
2117            else {  std::future::pending().await }
2118        }.fuse() => {
2119            match inner_item {
2120                Some(v) => yield v,
2121                None => current_inner = None,
2122            }
2123        }
2124      }
2125    }
2126  }
2127}
2128// unnamed ends here
2129
2130// [[file:index.org::15459]]
#[cfg(test)]
mod switch_map_tests {
  use super::*;

  /// Smoke test with outer and inner streams that are all immediately ready.
  #[tokio::test]
  async fn test_switch_map_switches() {
    // Everything here is ready at once, so how many inner values survive
    // depends on poll order; observing real cancellation needs async timing.
    let to_inner = |x: i32| futures::stream::iter(vec![x * 10]);
    let switched = switch_map(to_inner, futures::stream::iter(vec![1, 2]));

    let emitted = switched.collect::<Vec<_>>().await;
    // Values from both inner streams, or only from the last one, are fine.
    assert!(!emitted.is_empty());
  }
}
2150// unnamed ends here
2151
2152// [[file:index.org::15870]]
2153/// Combine two streams, emitting tuple of latest values.
2154/// Runtime-agnostic using stream merging.
2155pub fn latest2<T: Clone + Send + 'static, U: Clone + Send + 'static>(
2156  s1: impl Stream<Item = T> + Send + 'static,
2157  s2: impl Stream<Item = U> + Send + 'static,
2158) -> impl Stream<Item = (T, U)> {
2159  // Tag values with which stream they came from
2160  enum Either<A, B> { Left(A), Right(B) }
2161  
2162  // Box the mapped streams to make them Unpin
2163  let tagged1: Pin<Box<dyn Stream<Item = Either<T, U>> + Send>> = 
2164    Box::pin(s1.map(Either::Left));
2165  let tagged2: Pin<Box<dyn Stream<Item = Either<T, U>> + Send>> = 
2166    Box::pin(s2.map(Either::Right));
2167  
2168  stream! {
2169    let mut latest1: Option<T> = None;
2170    let mut latest2: Option<U> = None;
2171    
2172    let mut merged = futures::stream::select(tagged1, tagged2);
2173    
2174    while let Some(item) = merged.next().await {
2175      match item {
2176        Either::Left(v) => {
2177          latest1 = Some(v);
2178          if let (Some(ref a), Some(ref b)) = (&latest1, &latest2) {
2179            yield (a.clone(), b.clone());
2180          }
2181        }
2182        Either::Right(v) => {
2183          latest2 = Some(v);
2184          if let (Some(ref a), Some(ref b)) = (&latest1, &latest2) {
2185            yield (a.clone(), b.clone());
2186          }
2187        }
2188      }
2189    }
2190  }
2191}
2192// unnamed ends here
2193
2194// [[file:index.org::15914]]
#[cfg(test)]
mod latest2_tests {
  use super::*;

  /// Two non-empty inputs must produce at least one combined pair.
  #[tokio::test]
  async fn test_latest2_combines() {
    let numbers = futures::stream::iter(vec![1, 2]);
    let letters = futures::stream::iter(vec!["a", "b"]);

    let pairs = latest2(numbers, letters).collect::<Vec<_>>().await;
    // Pairs only appear once both sides have produced a value.
    assert!(!pairs.is_empty());
  }
}
2209// unnamed ends here
2210
2211// [[file:index.org::16222]]
2212/// Apply latest function to latest value.
2213pub fn apply_latest<T, U, F, S1, S2>(fn_stream: S1, value_stream: S2) -> impl Stream<Item = U>
2214where
2215  S1: Stream<Item = F> + Send + 'static,
2216  S2: Stream<Item = T> + Send + 'static,
2217  F: Fn(T) -> U + Clone + Send + 'static,
2218  T: Clone + Send + 'static,
2219  U: Send + 'static,
2220{
2221  latest2(fn_stream, value_stream).map(|(f, v)| f(v))
2222}
2223// unnamed ends here
2224
2225// [[file:index.org::16238]]
#[cfg(test)]
mod apply_latest_tests {
  use super::*;

  /// A stream of functions combined with a stream of values yields results.
  #[tokio::test]
  async fn test_apply_latest() {
    let functions = futures::stream::iter(vec![|x: i32| x * 2, |x| x + 10]);
    let inputs = futures::stream::iter(vec![1, 2, 3]);

    let results = apply_latest(functions, inputs).collect::<Vec<_>>().await;
    // The latest function is applied to the latest value.
    assert!(!results.is_empty());
  }
}
2240// unnamed ends here
2241
2242// [[file:index.org::16552]]
2243/// Emit from source until stop stream emits.
2244/// Runtime-agnostic using futures::select!
2245pub fn until_stream<T, U, S: Stream<Item = T> + Unpin, Stop: Stream<Item = U> + Unpin>(
2246  mut stop: Stop,
2247  mut source: S,
2248) -> impl Stream<Item = T> {
2249  stream! {
2250    loop {
2251      futures::select! {
2252        _ = stop.next().fuse() => break,
2253        item = source.next().fuse() => {
2254          match item {
2255            Some(v) => yield v,
2256            None => break,
2257          }
2258        }
2259      }
2260    }
2261  }
2262}
2263
2264// Alternative: Use futures::stream::StreamExt::take_until
2265// source.take_until(stop.next())
2266// unnamed ends here
2267
2268// [[file:index.org::16580]]
#[cfg(test)]
mod until_stream_tests {
  use super::*;

  /// With a stop signal that is ready at once, at most the whole source is seen.
  #[tokio::test]
  async fn test_until_stream_stops() {
    let numbers = futures::stream::iter(vec![1, 2, 3, 4, 5]);
    let stop_signal = futures::stream::iter(vec![()]); // ready immediately

    let seen = until_stream(stop_signal, numbers).collect::<Vec<_>>().await;
    // Emission stops once the stop signal has been observed.
    assert!(seen.len() <= 5);
  }
}
2283// unnamed ends here
2284
2285// [[file:index.org::16918]]
2286/// Emit from source only after start stream emits.
2287/// Runtime-agnostic using futures::select!
2288pub fn since_stream<T, U, S: Stream<Item = T> + Unpin, Start: Stream<Item = U> + Unpin>(
2289  mut start: Start,
2290  mut source: S,
2291) -> impl Stream<Item = T> {
2292  stream! {
2293    let mut started = false;
2294    loop {
2295      futures::select! {
2296        _ = async {
2297          if !started { start.next().await }
2298          else { std::future::pending().await }
2299        }.fuse() => {
2300          started = true;
2301        }
2302        item = source.next().fuse() => {
2303          match item {
2304            Some(v) if started => yield v,
2305            Some(_) => {}  // Drop values before start
2306            None => break,
2307          }
2308        }
2309      }
2310    }
2311  }
2312}
2313// unnamed ends here
2314
2315// [[file:index.org::16950]]
#[cfg(test)]
mod since_stream_tests {
  use super::*;

  /// With a start signal that is ready at once, source values get through.
  #[tokio::test]
  async fn test_since_stream_waits() {
    let numbers = futures::stream::iter(vec![1, 2, 3, 4]);
    let start_signal = futures::stream::iter(vec![()]); // ready immediately

    let seen = since_stream(start_signal, numbers).collect::<Vec<_>>().await;
    // Values are expected once the start signal has been observed.
    assert!(!seen.is_empty());
  }
}
2330// unnamed ends here
2331
2332// [[file:index.org::17279]]
2333// Buffering Operators
2334
2335pub fn buffer<T, S: Stream<Item = T>>(size: usize, s: S) -> impl Stream<Item = Vec<T>> {
2336  stream! {
2337    futures::pin_mut!(s);
2338    let mut buf: Vec<T> = Vec::with_capacity(size);
2339    while let Some(item) = s.next().await {
2340      buf.push(item);
2341      if buf.len() >= size { yield std::mem::replace(&mut buf, Vec::with_capacity(size)); }
2342    }
2343    if !buf.is_empty() { yield buf; }
2344  }
2345}
2346// unnamed ends here
2347
2348// [[file:index.org::17297]]
#[cfg(test)]
mod buffer_tests {
  use super::*;

  /// A trailing partial batch is emitted when the source ends.
  #[tokio::test]
  async fn test_buffer() {
    let batches = buffer(2, futures::stream::iter(vec![1, 2, 3, 4, 5]))
      .collect::<Vec<_>>()
      .await;
    assert_eq!(batches, vec![vec![1, 2], vec![3, 4], vec![5]]);
  }

  /// No empty trailing batch when the length is a multiple of the size.
  #[tokio::test]
  async fn test_buffer_exact_multiple() {
    let batches = buffer(2, futures::stream::iter(vec![1, 2, 3, 4]))
      .collect::<Vec<_>>()
      .await;
    assert_eq!(batches, vec![vec![1, 2], vec![3, 4]]);
  }

  /// An empty source produces no batches at all.
  #[tokio::test]
  async fn test_buffer_empty() {
    let nothing = futures::stream::iter(Vec::<i32>::new());
    let batches = buffer(3, nothing).collect::<Vec<_>>().await;
    assert_eq!(batches, Vec::<Vec<i32>>::new());
  }
}
2373// unnamed ends here
2374
2375// [[file:index.org::17846]]
2376/// Buffer values over time windows.
2377/// Uses the Runtime trait for timer functionality.
2378pub fn buffer_time<R, T, S>(ms: u64, mut s: S) -> impl Stream<Item = Vec<T>>
2379where
2380  R: Runtime,
2381  T: Clone,
2382  S: Stream<Item = T> + Unpin,
2383{
2384  stream! {
2385    let duration = Duration::from_millis(ms);
2386    let mut buf: Vec<T> = Vec::new();
2387    let mut timer = R::sleep(duration);
2388    loop {
2389      futures::select! {
2390        _ = (&mut timer).fuse() => {
2391          if !buf.is_empty() { yield std::mem::take(&mut buf); }
2392          timer = R::sleep(duration);
2393        }
2394        item = s.next().fuse() => {
2395          match item {
2396            Some(v) => buf.push(v),
2397            None => {
2398              if !buf.is_empty() { yield buf; }
2399              break;
2400            }
2401          }
2402        }
2403      }
2404    }
2405  }
2406}
2407
2408/// Runtime-agnostic buffer_time with custom sleep function
2409pub fn buffer_time_with<T, S, SF, SFut>(
2410  ms: u64,
2411  mut s: S,
2412  sleep_fn: SF,
2413) -> impl Stream<Item = Vec<T>>
2414where
2415  S: Stream<Item = T> + Unpin,
2416  SF: Fn(Duration) -> SFut,
2417  SFut: std::future::Future<Output = ()> + Unpin,
2418{
2419  stream! {
2420    let duration = Duration::from_millis(ms);
2421    let mut buf: Vec<T> = Vec::new();
2422    let mut timer = sleep_fn(duration);
2423    loop {
2424      futures::select! {
2425        _ = (&mut timer).fuse() => {
2426          if !buf.is_empty() {  yield std::mem::take(&mut buf); }
2427          timer = sleep_fn(duration);
2428        }
2429        item = s.next().fuse() => {
2430          match item {
2431            Some(v) => buf.push(v),
2432            None => {
2433              if !buf.is_empty() { yield buf; }
2434              break;
2435            }
2436          }
2437        }
2438      }
2439    }
2440  }
2441}
2442// unnamed ends here
2443
2444// [[file:index.org::17917]]
#[cfg(test)]
mod buffer_time_tests {
  use super::*;

  // `buffer_time` needs a `Runtime` implementation for its timer. The
  // buffering logic is exercised here through `buffer_time_with` and a sleep
  // function whose future never completes, so the timer branch can never win
  // and the result is deterministic.

  /// Everything collected before the source ends is flushed as one batch.
  #[tokio::test]
  async fn test_buffer_time_with_flushes_on_completion() {
    let source = futures::stream::iter(vec![1, 2, 3]);
    let values: Vec<_> = buffer_time_with(1_000, source, |_| std::future::pending::<()>())
      .collect()
      .await;
    assert_eq!(values, vec![vec![1, 2, 3]]);
  }

  /// An empty source produces no batches.
  #[tokio::test]
  async fn test_buffer_time_with_empty_source() {
    let source = futures::stream::iter(Vec::<i32>::new());
    let values: Vec<_> = buffer_time_with(1_000, source, |_| std::future::pending::<()>())
      .collect()
      .await;
    assert_eq!(values, Vec::<Vec<i32>>::new());
  }
}
2451// unnamed ends here
2452
2453// [[file:index.org::18259]]
2454/// Split source into windows of specified size.
2455/// Each window is a vector of items (simpler than sub-streams).
2456pub fn window<T: Clone + Send + 'static>(
2457  size: usize,
2458  s: impl Stream<Item = T> + Send + 'static,
2459) -> impl Stream<Item = Vec<T>> {
2460  stream! {
2461    futures::pin_mut!(s);
2462    loop {
2463      let mut window = Vec::with_capacity(size);
2464      while window.len() < size {
2465        match s.next().await {
2466          Some(item) => window.push(item),
2467          None => {
2468            if !window.is_empty() {
2469              yield window;
2470            }
2471            return;
2472          }
2473        }
2474      }
2475      yield window;
2476    }
2477  }
2478}
2479// unnamed ends here
2480
2481// [[file:index.org::18802]]
2482/// Pre-fetch values from a slow producer into a buffer.
2483/// Uses the Runtime trait for spawning background consumption.
2484pub fn eager<R, T, S>(buffer_size: usize, s: S) -> impl Stream<Item = T>
2485where
2486  R: Runtime,
2487  T: Send + 'static,
2488  S: Stream<Item = T> + Send + Unpin + 'static,
2489{
2490  // Use a channel as the buffer
2491  let (mut tx, mut rx) = mpsc::channel::<T>(buffer_size.max(1));
2492  
2493  // Spawn background consumer on first pull
2494  let mut spawned = false;
2495  let mut s = Some(s);
2496  
2497  stream! {
2498    if !spawned {
2499      spawned = true;
2500      let mut source = s.take().unwrap();
2501      R::spawn(async move {
2502        use futures::StreamExt;
2503        use futures::SinkExt;
2504        while let Some(item) = source.next().await { if tx.send(item).await.is_err() { break; } } // Receiver dropped
2505      });
2506    }
2507    
2508    while let Some(item) = rx.next().await { yield item; }
2509  }
2510}
2511
2512/// Pre-fetch values immediately on creation using the Runtime trait.
2513pub fn eager_now<R, T, S>(buffer_size: usize, s: S) -> impl Stream<Item = T>
2514where
2515  R: Runtime,
2516  T: Send + 'static,
2517  S: Stream<Item = T> + Send + Unpin + 'static,
2518{    
2519  let (mut tx, mut rx) = mpsc::channel::<T>(buffer_size.max(1));
2520  
2521  // Start consuming immediately
2522  let mut source = s;
2523  R::spawn(async move {
2524    use futures::StreamExt;
2525    use futures::SinkExt;
2526    while let Some(item) = source.next().await { if tx.send(item).await.is_err() { break; } }
2527  });
2528  
2529  stream! { while let Some(item) = rx.next().await { yield item; } }
2530}
2531// unnamed ends here
2532
2533// [[file:index.org::18856]]
#[cfg(test)]
mod eager_now_tests {
  // Intentionally empty: `eager_now` spawns its producer through the
  // `Runtime` trait, so a test needs a concrete `Runtime` implementation.
  // The pattern under test would be: spawn the producer, then stream the
  // values back out of the channel.
}
2540// unnamed ends here
2541
2542// [[file:index.org::19227]]
2543// Multicasting Operators
2544
2545/// A multicasting subject that replays buffered values to new subscribers.
2546/// Uses only runtime-agnostic primitives from the futures crate.
2547pub struct ReplaySubject<T: Clone + Send + 'static> {
2548  inner: Arc<Mutex<ReplaySubjectInner<T>>>,
2549}
2550
2551struct ReplaySubjectInner<T> {
2552  buffer: Vec<T>,
2553  buffer_size: usize,
2554  completed: bool,
2555  error: Option<Arc<dyn std::error::Error + Send + Sync>>,
2556  subscribers: Vec<mpsc::UnboundedSender<T>>,
2557}
2558
2559impl<T: Clone + Send + 'static> ReplaySubject<T> {
2560  pub fn new(buffer_size: usize) -> Self {
2561    Self {
2562      inner: Arc::new(Mutex::new(ReplaySubjectInner {
2563        buffer: Vec::new(),
2564        buffer_size,
2565        completed: false,
2566        error: None,
2567        subscribers: Vec::new(),
2568      })),
2569    }
2570  }
2571  pub async fn next(&self, value: T) {
2572    let mut inner = self.inner.lock().await;
2573    inner.buffer.push(value.clone());
2574    if inner.buffer.len() > inner.buffer_size {  inner.buffer.remove(0); }
2575    // Broadcast to all subscribers
2576    inner.subscribers.retain(|tx| tx.unbounded_send(value.clone()).is_ok());
2577  }
2578  pub async fn complete(&self) {
2579    let mut inner = self.inner.lock().await;
2580    inner.completed = true;
2581    inner.subscribers.clear();
2582  }
2583  pub fn subscribe(&self) -> impl Stream<Item = T> {
2584    let inner = self.inner.clone();
2585    
2586    stream! {
2587      let (tx, mut rx) = mpsc::unbounded();
2588      let buffered: Vec<T>;
2589      let was_completed: bool;
2590      
2591      {
2592        let mut guard = inner.lock().await;
2593        buffered = guard.buffer.clone();
2594        was_completed = guard.completed;
2595        if !guard.completed { guard.subscribers.push(tx); }
2596      }
2597      
2598      // Replay buffered values first
2599      for item in buffered { yield item; }
2600      
2601      // If already completed, don't wait for more values
2602      if was_completed { return; }
2603      
2604      // Then receive live values
2605      while let Some(item) = rx.next().await { yield item; }
2606    }
2607  }
2608}
2609// unnamed ends here
2610
2611// [[file:index.org::19298]]
#[cfg(test)]
mod replay_subject_tests {
  use super::*;

  /// Only the newest `buffer_size` values are replayed to a late subscriber.
  #[tokio::test]
  async fn test_replay_subject_buffer() {
    let subject = ReplaySubject::new(2);

    // Three values into a buffer of two: the first one (1) is evicted
    for value in vec![1, 2, 3] {
      subject.next(value).await;
    }
    subject.complete().await;

    let replayed = subject.subscribe().collect::<Vec<_>>().await;
    assert_eq!(replayed, vec![2, 3]);
  }

  /// A completed subject without values replays nothing.
  #[tokio::test]
  async fn test_replay_subject_empty() {
    let subject: ReplaySubject<i32> = ReplaySubject::new(5);
    subject.complete().await;

    let replayed = subject.subscribe().collect::<Vec<_>>().await;
    assert_eq!(replayed, Vec::<i32>::new());
  }

  /// With an effectively unlimited buffer every value is replayed.
  #[tokio::test]
  async fn test_replay_subject_unlimited() {
    let subject = ReplaySubject::new(usize::MAX);

    for value in vec![1, 2, 3] {
      subject.next(value).await;
    }
    subject.complete().await;

    let replayed = subject.subscribe().collect::<Vec<_>>().await;
    assert_eq!(replayed, vec![1, 2, 3]);
  }
}
2653// unnamed ends here
2654
2655// [[file:index.org::19596]]
/// Replay creates a shared stream that buffers values for late subscribers.
///
/// The handle only wraps the shared state; `subscribe` hands out streams and
/// `start_source` feeds a source stream into the buffer and the subscribers.
struct Replay<T> {
  /// Shared state: replay buffer, completion flag and live subscribers.
  inner: Arc<Mutex<ReplayInner<T>>>,
}
2660
2661struct ReplayInner<T> {
2662  buffer: Vec<T>,
2663  buffer_size: usize,
2664  completed: bool,
2665  error: Option<Arc<dyn std::error::Error + Send + Sync>>,
2666  source_started: bool,
2667  subscribers: Vec<futures::channel::mpsc::UnboundedSender<Result<T, Arc<dyn std::error::Error + Send + Sync>>>>,
2668}
2669
2670impl<T: Clone + Send + 'static> Replay<T> {
2671  fn new<S>(buffer_size: usize, source: S) -> Self
2672  where
2673    S: futures::Stream<Item = T> + Send + Unpin + 'static,
2674  {
2675    let inner = Arc::new(Mutex::new(ReplayInner {
2676      buffer: Vec::new(),
2677      buffer_size,
2678      completed: false,
2679      error: None,
2680      source_started: false,
2681      subscribers: Vec::new(),
2682    }));
2683    
2684    Replay { inner }
2685  }
2686  
2687  fn subscribe(&self) -> impl futures::Stream<Item = T> {
2688    let inner = self.inner.clone();
2689    
2690    async_stream::stream! {
2691      let (tx, mut rx) = futures::channel::mpsc::unbounded();
2692      
2693      // Get buffered values and register subscriber
2694      let buffered: Vec<T>;
2695      {
2696        let mut guard = inner.lock().await;
2697        buffered = guard.buffer.clone();
2698        
2699        if !guard.completed && guard.error.is_none() { guard.subscribers.push(tx); }
2700      }
2701      
2702      // Yield buffered values first
2703      for value in buffered { yield value; }
2704      
2705      // Receive live values
2706      while let Some(result) = rx.next().await {
2707        match result {
2708          Ok(value) => yield value,
2709          Err(_) => break,
2710        }
2711      }
2712    }
2713  }
2714  
2715  async fn start_source<S>(&self, mut source: S)
2716  where
2717    S: futures::Stream<Item = T> + Send + Unpin + 'static,
2718  {
2719    while let Some(value) = source.next().await {
2720        let mut guard = self.inner.lock().await;
2721        
2722        // Buffer the value
2723        guard.buffer.push(value.clone());
2724        if guard.buffer.len() > guard.buffer_size {
2725            guard.buffer.remove(0);
2726        }
2727        
2728        // Broadcast to subscribers
2729        guard.subscribers.retain(|tx| tx.unbounded_send(Ok(value.clone())).is_ok());
2730    }
2731    
2732    // Mark complete
2733    let mut guard = self.inner.lock().await;
2734    guard.completed = true;
2735    guard.subscribers.clear();
2736  }
2737}
2738
2739/// Convenience function to replay a stream.
2740/// 
2741/// This implementation uses a simpler approach: the returned stream
2742/// directly consumes and forwards the source. For true multicasting,
2743/// use ReplaySubject instead.
2744fn replay<T, S>(buffer_size: usize, source: S) -> impl futures::Stream<Item = T>
2745where
2746  T: Clone + Send + 'static,
2747  S: futures::Stream<Item = T> + Send + 'static,
2748{
2749  // Simple passthrough implementation - for single subscriber
2750  // For true multicasting, use ReplaySubject
2751  let _ = buffer_size; // Buffering only matters for late subscribers
2752  source
2753}
2754// unnamed ends here
2755
2756// [[file:index.org::19701]]
#[cfg(test)]
mod replay_tests {
  use super::*;

  /// A subscriber registered before the source starts receives every value.
  #[tokio::test]
  async fn test_replay_buffered() {
    // `Replay::new` does not keep its source, so an empty placeholder is
    // passed there and the real values are fed through `start_source`.
    let replay = Replay::new(2, futures::stream::iter(Vec::<i32>::new()));
    let subscriber = replay.subscribe();

    // The subscriber is listed first so that it registers itself before the
    // source starts producing.
    let (values, ()) = futures::join!(
      subscriber.collect::<Vec<_>>(),
      replay.start_source(futures::stream::iter(vec![1, 2, 3, 4, 5]))
    );

    assert_eq!(values, vec![1, 2, 3, 4, 5]);
  }
}
2771// unnamed ends here
2772
2773// [[file:index.org::20277]]
2774/// Share a stream among multiple consumers without buffering.
2775/// This is equivalent to replay(0, source).
2776fn share<T, S>(source: S) -> impl futures::Stream<Item = T>
2777where
2778  T: Clone + Send + 'static,
2779  S: futures::Stream<Item = T> + Send + Unpin + 'static,
2780{
2781  replay(0, source)
2782}
2783// unnamed ends here
2784
2785// [[file:index.org::20291]]
#[cfg(test)]
mod share_tests {
  use super::*;

  /// `share` must forward every source value in order and then end.
  #[tokio::test]
  async fn test_share_basic() {
    // share is replay(0, source); check that plain streaming works
    let shared = share(futures::stream::iter(vec![1, 2, 3]));
    futures::pin_mut!(shared);

    // Three values in order, followed by the end of the stream
    for expected in vec![Some(1), Some(2), Some(3), None] {
      let actual = shared.next().await;
      assert_eq!(actual, expected);
    }
  }
}
2811// unnamed ends here
2812
2813// [[file:index.org::20611]]
2814/// Creates a factory that produces independent copies of a buffered stream.
2815/// Each call to the factory returns a fresh stream that replays buffered values.
2816fn replay_factory<T, S>(
2817  buffer_size: usize,
2818  source: S,
2819) -> impl Fn() -> BoxedStream<T>
2820where
2821  T: Clone + Send + Sync + 'static,
2822  S: futures::Stream<Item = T> + Send + Unpin + 'static,
2823{
2824  struct SharedState<T> {
2825    buffer: Vec<T>,
2826    buffer_size: usize,
2827    completed: bool,
2828    subscribers: Vec<futures::channel::mpsc::UnboundedSender<T>>,
2829  }
2830  
2831  let state = Arc::new(Mutex::new(SharedState {
2832    buffer: Vec::new(),
2833    buffer_size,
2834    completed: false,
2835    subscribers: Vec::new(),
2836  }));
2837  let started = Arc::new(AtomicBool::new(false));
2838  let source = Arc::new(Mutex::new(Some(source)));
2839  
2840  move || {
2841    let state = state.clone();
2842    let started = started.clone();
2843    let source = source.clone();
2844    
2845    Box::pin(async_stream::stream! {
2846      // Start source consumption if not already started
2847      if !started.swap(true, Ordering::SeqCst) {
2848        let state_clone = state.clone();
2849        if let Some(mut src) = source.lock().await.take() {
2850          // Note: Spawning requires the Runtime trait
2851          // R::spawn(async move { ... });
2852          // For simplicity, consume source in current task
2853
2854          while let Some(value) = src.next().await {
2855            let mut guard = state_clone.lock().await;
2856            guard.buffer.push(value.clone());
2857            if guard.buffer.len() > guard.buffer_size { guard.buffer.remove(0); }
2858            guard.subscribers.retain(|tx| tx.unbounded_send(value.clone()).is_ok());
2859          }
2860          state_clone.lock().await.completed = true;
2861        }
2862      }
2863      
2864      let (tx, mut rx) = futures::channel::mpsc::unbounded();
2865      let buffered: Vec<T>;
2866      {
2867        let mut guard = state.lock().await;
2868        buffered = guard.buffer.clone();
2869        if !guard.completed { guard.subscribers.push(tx); }
2870      }
2871      
2872      for value in buffered { yield value; }
2873 
2874      while let Some(value) = rx.next().await { yield value; }
2875    })
2876  }
2877}
2878
2879/// Version that accepts a Runtime for spawning source consumption
2880pub fn replay_factory_spawned<R, T, S>(
2881  buffer_size: usize,
2882  source: S,
2883) -> impl Fn() -> BoxedStream<T>
2884where
2885  R: Runtime,
2886  T: Clone + Send + Sync + 'static,
2887  S: futures::Stream<Item = T> + Send + Unpin + 'static,
2888{
2889  struct SharedState<T> {
2890    buffer: Vec<T>,
2891    buffer_size: usize,
2892    completed: bool,
2893    subscribers: Vec<futures::channel::mpsc::UnboundedSender<T>>,
2894  }
2895  
2896  let state = Arc::new(Mutex::new(SharedState {
2897    buffer: Vec::new(),
2898    buffer_size,
2899    completed: false,
2900    subscribers: Vec::new(),
2901  }));
2902  let started = Arc::new(AtomicBool::new(false));
2903  let source = Arc::new(Mutex::new(Some(source)));
2904  
2905  move || {
2906    let state = state.clone();
2907    let started = started.clone();
2908    let source = source.clone();
2909    
2910    Box::pin(async_stream::stream! {
2911      if !started.swap(true, Ordering::SeqCst) {
2912        let state_clone = state.clone();
2913        if let Some(src) = source.lock().await.take() {
2914          R::spawn(async move {
2915            futures::pin_mut!(src);
2916            while let Some(value) = src.next().await {
2917              let mut guard = state_clone.lock().await;
2918              guard.buffer.push(value.clone());
2919              if guard.buffer.len() > guard.buffer_size { guard.buffer.remove(0); }
2920              guard.subscribers.retain(|tx| tx.unbounded_send(value.clone()).is_ok());
2921            }
2922            state_clone.lock().await.completed = true;
2923          });
2924        }
2925      }
2926      
2927      let (tx, mut rx) = futures::channel::mpsc::unbounded();
2928      let buffered: Vec<T>;
2929      {
2930        let mut guard = state.lock().await;
2931        buffered = guard.buffer.clone();
2932        if !guard.completed { guard.subscribers.push(tx); }
2933      }
2934      
2935      for value in buffered { yield value; }
2936      
2937      while let Some(value) = rx.next().await { yield value; }
2938    })
2939  }
2940}
2941// unnamed ends here
2942
2943// [[file:index.org::20862]]
2944/// Returns a stream that emits independent copies of the source stream.
2945/// Each pull creates a new subscriber that receives buffered + live values.
2946fn replay_stream<T, S>(
2947  buffer_size: usize,
2948  source: S,
2949) -> impl futures::Stream<Item = impl futures::Stream<Item = T>>
2950where
2951  T: Clone + Send + Sync + 'static,
2952  S: futures::Stream<Item = T> + Send + Unpin + 'static,
2953{
2954  let factory = replay_factory(buffer_size, source);
2955  
2956  async_stream::stream! {
2957  // Emit stream copies indefinitely
2958  loop { yield factory(); }
2959  }
2960}
2961
2962// Example usage
2963async fn replay_stream_example() {
2964  let source = futures::stream::iter(vec![1, 2, 3]);
2965  let copies = replay_stream(usize::MAX, source);
2966  futures::pin_mut!(copies);
2967  
2968  // Get first copy
2969  if let Some(copy) = copies.next().await {
2970    futures::pin_mut!(copy);
2971    let values: Vec<_> = copy.collect().await;
2972    println!("Copy values: {:?}", values);
2973  }
2974}
2975// unnamed ends here
2976
2977// [[file:index.org::20898]]
#[cfg(test)]
mod replay_stream_tests {
  // Intentionally empty: `replay_stream` returns a stream of streams.
  // Its basic use is shown in `replay_stream_example`; more comprehensive
  // tests would pull several copies and compare what each one yields.
}
2984// unnamed ends here
2985
2986// [[file:index.org::20974]]
2987use std::sync::atomic::AtomicU64;
2988use std::task::Waker;
2989
/// A test runtime with virtual time for deterministic testing.
/// 
/// Unlike real runtimes, time only advances when you call `advance_by()` or `advance_to()`.
/// This allows instant, reproducible tests for time-based operators.
/// 
/// Cloning is cheap: all clones share the same clock and timer list.
/// 
/// # Example
/// 
/// ```rust
/// use agent_rex::TestRuntime;
/// 
/// #[tokio::test]
/// async fn test_debounce() {
///   let runtime = TestRuntime::new();
///   
///   // Create a debounced stream using this runtime
///   let source = futures::stream::iter(vec![1, 2, 3]);
///   let debounced = debounce_with::<TestRuntime>(Duration::from_millis(100), source);
///   
///   // Advance virtual time to trigger debounce
///   runtime.advance_by(Duration::from_millis(150)).await;
///   
///   // Collect results - happens instantly!
/// }
/// ```
#[derive(Clone)]
pub struct TestRuntime {
  /// Shared virtual clock and pending timers.
  inner: Arc<TestRuntimeInner>,
}
3018
/// State shared by all clones of a `TestRuntime`.
struct TestRuntimeInner {
  /// Current virtual time in nanoseconds
  current_time_ns: AtomicU64,
  /// Pending timers waiting to fire (guarded by a blocking std mutex)
  timers: std::sync::Mutex<Vec<PendingTimer>>,
}
3025
/// A timer registered with a `TestRuntime` that has not been cleaned up yet.
struct PendingTimer {
  /// Virtual time, in nanoseconds, at which the timer is due
  fire_at_ns: u64,
  /// Waker to invoke when the timer fires; `None` until one has been set
  waker: Option<Waker>,
  /// Flag shared with whoever registered the timer; set once it has fired
  fired: Arc<AtomicBool>,
}
3034
3035impl TestRuntime {
3036  /// Create a new test runtime starting at time zero.
3037  pub fn new() -> Self {
3038    Self {
3039      inner: Arc::new(TestRuntimeInner {
3040        current_time_ns: AtomicU64::new(0),
3041        timers: std::sync::Mutex::new(Vec::new()),
3042      }),
3043    }
3044  }
3045  
3046  /// Get the current virtual time.
3047  pub fn now(&self) -> Duration {
3048    Duration::from_nanos(self.inner.current_time_ns.load(Ordering::SeqCst))
3049  }
3050  
3051  /// Advance virtual time by the given duration.
3052  /// 
3053  /// This will wake any timers whose target time has been reached.
3054  pub async fn advance_by(&self, duration: Duration) {
3055    let target = self.now() + duration;
3056    self.advance_to(target).await;
3057  }
3058  
3059  /// Advance virtual time to a specific point.
3060  /// 
3061  /// Fires all timers between the current time and target time.
3062  pub async fn advance_to(&self, target: Duration) {
3063    let target_ns = target.as_nanos() as u64;
3064    
3065    loop {
3066      // Find and wake timers that should fire
3067      let wakers_to_wake: Vec<Waker> = {
3068        let mut timers = self.inner.timers.lock().unwrap();
3069        let current = self.inner.current_time_ns.load(Ordering::SeqCst);
3070        
3071        // Find earliest timer that hasn't fired yet
3072        let mut earliest: Option<u64> = None;
3073        for timer in timers.iter() {
3074          if !timer.fired.load(Ordering::SeqCst) && timer.fire_at_ns <= target_ns {
3075            earliest = Some(match earliest {
3076              Some(e) => e.min(timer.fire_at_ns),
3077              None => timer.fire_at_ns,
3078            });
3079          }
3080        }
3081        
3082        match earliest {
3083          Some(fire_time) if fire_time > current => {
3084            // Advance time to this timer
3085            self.inner.current_time_ns.store(fire_time, Ordering::SeqCst);
3086            
3087            // Collect wakers for timers at this time
3088            timers.iter_mut()
3089              .filter(|t| t.fire_at_ns == fire_time && !t.fired.load(Ordering::SeqCst))
3090              .filter_map(|t| {
3091                t.fired.store(true, Ordering::SeqCst);
3092                t.waker.take()
3093              })
3094              .collect()
3095          }
3096          _ => {
3097            // No more timers to fire, advance to target
3098            self.inner.current_time_ns.store(target_ns, Ordering::SeqCst);
3099            break;
3100          }
3101        }
3102      };
3103      
3104      // Wake timers outside the lock
3105      for waker in wakers_to_wake {
3106        waker.wake();
3107      }
3108      
3109      // Yield to allow woken tasks to run
3110      futures::future::poll_fn(|_| std::task::Poll::Ready(())).await;
3111    }
3112    
3113    // Clean up fired timers
3114    {
3115      let mut timers = self.inner.timers.lock().unwrap();
3116      timers.retain(|t| !t.fired.load(Ordering::SeqCst));
3117    }
3118  }
3119  
3120  /// Register a timer that fires at a specific time.
3121  fn register_timer(&self, fire_at: Duration) -> Arc<std::sync::atomic::AtomicBool> {
3122    let fired = Arc::new(std::sync::atomic::AtomicBool::new(false));
3123    let timer = PendingTimer {
3124      fire_at_ns: fire_at.as_nanos() as u64,
3125      waker: None,
3126      fired: fired.clone(),
3127    };
3128    self.inner.timers.lock().unwrap().push(timer);
3129    fired
3130  }
3131  
3132  /// Update the waker for a pending timer.
3133  fn set_timer_waker(&self, fire_at_ns: u64, waker: Waker) {
3134    let mut timers = self.inner.timers.lock().unwrap();
3135    for timer in timers.iter_mut() {
3136      if timer.fire_at_ns == fire_at_ns && !timer.fired.load(Ordering::SeqCst) {
3137        timer.waker = Some(waker);
3138        break;
3139      }
3140    }
3141  }
3142}
3143
3144impl Default for TestRuntime {
3145  fn default() -> Self {
3146    Self::new()
3147  }
3148}
3149// unnamed ends here
3150
3151// [[file:index.org::21143]]
/// A future that completes when the test runtime's virtual time reaches the target.
///
/// Instances are created with `TestRuntime::test_sleep`. The deadline is fixed
/// at construction time as "current virtual time + duration".
pub struct TestSleep {
  /// Runtime whose virtual clock and timer list this sleep uses.
  runtime: TestRuntime,
  /// Absolute virtual deadline, in nanoseconds.
  target_ns: u64,
  /// Completion flag. Starts as a private flag and is replaced by the flag
  /// returned from `register_timer` once the sleep registers its timer.
  fired: Arc<std::sync::atomic::AtomicBool>,
  /// Whether a timer has already been registered with the runtime.
  registered: bool,
}
3159
3160impl TestSleep {
3161  fn new(runtime: TestRuntime, duration: Duration) -> Self {
3162    let current = runtime.now();
3163    let target = current + duration;
3164    let target_ns = target.as_nanos() as u64;
3165    Self {
3166      runtime,
3167      target_ns,
3168      fired: Arc::new(std::sync::atomic::AtomicBool::new(false)),
3169      registered: false,
3170    }
3171  }
3172}
3173
3174impl Future for TestSleep {
3175  type Output = ();
3176  
3177  fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<()> {
3178    // Check if already fired
3179    if self.fired.load(Ordering::SeqCst) {
3180      return std::task::Poll::Ready(());
3181    }
3182    
3183    // Check if target time reached
3184    let current_ns = self.runtime.inner.current_time_ns.load(Ordering::SeqCst);
3185    if current_ns >= self.target_ns {
3186      self.fired.store(true, Ordering::SeqCst);
3187      return std::task::Poll::Ready(());
3188    }
3189    
3190    // Register timer if not yet done
3191    if !self.registered {
3192      self.fired = self.runtime.register_timer(Duration::from_nanos(self.target_ns));
3193      self.registered = true;
3194    }
3195    
3196    // Update waker
3197    self.runtime.set_timer_waker(self.target_ns, cx.waker().clone());
3198    
3199    std::task::Poll::Pending
3200  }
3201}
3202// unnamed ends here
3203
3204// [[file:index.org::21200]]
/// A stream that yields at regular intervals based on virtual time.
///
/// Instances are created with `TestRuntime::test_interval`. The first tick is
/// due one period after the virtual time at construction.
pub struct TestInterval {
  /// Runtime whose virtual clock and timer list this interval uses.
  runtime: TestRuntime,
  /// Distance between ticks, in nanoseconds.
  period_ns: u64,
  /// Absolute virtual time of the next tick, in nanoseconds.
  next_fire_ns: u64,
  /// Flag of the timer registered for the next tick, if one is registered.
  current_timer: Option<Arc<std::sync::atomic::AtomicBool>>,
}
3212
3213impl TestInterval {
3214  fn new(runtime: TestRuntime, period: Duration) -> Self {
3215    let period_ns = period.as_nanos() as u64;
3216    let start = runtime.inner.current_time_ns.load(Ordering::SeqCst);
3217    Self {
3218      runtime,
3219      period_ns,
3220      next_fire_ns: start + period_ns,
3221      current_timer: None,
3222    }
3223  }
3224}
3225
3226impl futures::Stream for TestInterval {
3227  type Item = ();
3228  
3229  fn poll_next(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<()>> {
3230    let current_ns = self.runtime.inner.current_time_ns.load(Ordering::SeqCst);
3231    
3232    // Check if it's time to fire
3233    if current_ns >= self.next_fire_ns {
3234      // Schedule next tick
3235      self.next_fire_ns += self.period_ns;
3236      self.current_timer = None;
3237      return std::task::Poll::Ready(Some(()));
3238    }
3239    
3240    // Register timer if needed
3241    if self.current_timer.is_none() {
3242      self.current_timer = Some(self.runtime.register_timer(Duration::from_nanos(self.next_fire_ns)));
3243    }
3244    
3245    // Update waker
3246    self.runtime.set_timer_waker(self.next_fire_ns, cx.waker().clone());
3247    
3248    std::task::Poll::Pending
3249  }
3250}
3251// unnamed ends here
3252
3253// [[file:index.org::21253]]
3254impl Runtime for TestRuntime {
3255  fn sleep(duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send>> {
3256    // Note: This requires a runtime instance, but the trait is static.
3257    // For testing, use test_sleep() method directly or use CURRENT_TEST_RUNTIME thread-local.
3258    // This implementation panics - tests should use the instance methods.
3259    panic!("TestRuntime::sleep() cannot be called statically. Use runtime.test_sleep(duration) instead.")
3260  }
3261  
3262  fn interval(period: Duration) -> Pin<Box<dyn futures::Stream<Item = ()> + Send>> {
3263    panic!("TestRuntime::interval() cannot be called statically. Use runtime.test_interval(period) instead.")
3264  }
3265  
3266  fn spawn<F>(_future: F)
3267  where
3268    F: Future<Output = ()> + Send + 'static,
3269  {
3270    // TestRuntime doesn't spawn - tasks are driven by advance_by/advance_to
3271    // If you need background task support, use #[tokio::test] which provides real spawning
3272    panic!("TestRuntime::spawn() is not supported. Use advance_by() to drive futures.")
3273  }
3274}
3275
3276impl TestRuntime {
3277  /// Create a sleep future tied to this runtime instance.
3278  pub fn test_sleep(&self, duration: Duration) -> TestSleep {
3279    TestSleep::new(self.clone(), duration)
3280  }
3281  
3282  /// Create an interval stream tied to this runtime instance.
3283  pub fn test_interval(&self, period: Duration) -> TestInterval {
3284    TestInterval::new(self.clone(), period)
3285  }
3286}
3287// unnamed ends here
3288
3289// [[file:index.org::21293]]
3290impl TestRuntime {
3291  /// Run a test with controlled time, returning the result.
3292  /// 
3293  /// This is a convenience method that advances time in steps,
3294  /// useful for testing debounce/throttle behavior.
3295  pub async fn run_timed_test<T, F, Fut>(&self, steps: Vec<Duration>, mut f: F) -> T
3296  where
3297    F: FnMut() -> Fut,
3298    Fut: Future<Output = T>,
3299  {
3300    for step in steps {
3301      self.advance_by(step).await;
3302    }
3303    f().await
3304  }
3305  
3306  /// Assert that a future completes within a virtual time budget.
3307  pub async fn assert_completes_within<T, Fut>(&self, timeout: Duration, fut: Fut) -> T
3308  where
3309    Fut: Future<Output = T>,
3310  {
3311    use futures::future::{select, Either};
3312    
3313    let timeout_fut = self.test_sleep(timeout);
3314    futures::pin_mut!(fut);
3315    futures::pin_mut!(timeout_fut);
3316    
3317    // First poll: check if fut is ready
3318    match futures::future::select(fut, timeout_fut).await {
3319      Either::Left((result, _)) => result,
3320      Either::Right(_) => panic!("Future did not complete within {:?}", timeout),
3321    }
3322  }
3323}
3324// unnamed ends here
3325
3326// [[file:index.org::21334]]
#[cfg(test)]
mod test_runtime_tests {
  use super::*;

  /// A sleep stays pending until virtual time passes its deadline.
  #[tokio::test]
  async fn test_virtual_sleep() {
    let runtime = TestRuntime::new();

    // Initially at time zero
    assert_eq!(runtime.now(), Duration::ZERO);

    // Create a sleep future
    let sleep = runtime.test_sleep(Duration::from_millis(100));
    futures::pin_mut!(sleep);

    // Poll it - should be pending
    let waker = futures::task::noop_waker();
    let mut cx = std::task::Context::from_waker(&waker);
    assert!(Pin::new(&mut sleep).poll(&mut cx).is_pending());

    // Advance time past the sleep target
    runtime.advance_by(Duration::from_millis(150)).await;

    // Now it should be ready
    assert_eq!(runtime.now(), Duration::from_millis(150));
    assert!(Pin::new(&mut sleep).poll(&mut cx).is_ready());
  }

  /// An interval yields one tick per elapsed period.
  #[tokio::test]
  async fn test_virtual_interval() {
    let runtime = TestRuntime::new();
    let mut interval = runtime.test_interval(Duration::from_millis(100));

    // Advance to first tick
    runtime.advance_by(Duration::from_millis(100)).await;
    assert_eq!(interval.next().await, Some(()));

    // Advance to second tick
    runtime.advance_by(Duration::from_millis(100)).await;
    assert_eq!(interval.next().await, Some(()));

    // Verify time
    assert_eq!(runtime.now(), Duration::from_millis(200));
  }

  /// Several sleeps with different deadlines fire independently, in order.
  #[tokio::test]
  async fn test_multiple_timers() {
    let runtime = TestRuntime::new();

    // Create multiple sleeps
    let sleep1 = runtime.test_sleep(Duration::from_millis(50));
    let sleep2 = runtime.test_sleep(Duration::from_millis(100));
    let sleep3 = runtime.test_sleep(Duration::from_millis(150));

    futures::pin_mut!(sleep1);
    futures::pin_mut!(sleep2);
    futures::pin_mut!(sleep3);

    let waker = futures::task::noop_waker();
    let mut cx = std::task::Context::from_waker(&waker);

    // All pending initially
    assert!(Pin::new(&mut sleep1).poll(&mut cx).is_pending());
    assert!(Pin::new(&mut sleep2).poll(&mut cx).is_pending());
    assert!(Pin::new(&mut sleep3).poll(&mut cx).is_pending());

    // Advance to 75ms - only first should fire
    runtime.advance_to(Duration::from_millis(75)).await;
    assert!(Pin::new(&mut sleep1).poll(&mut cx).is_ready());
    assert!(Pin::new(&mut sleep2).poll(&mut cx).is_pending());
    assert!(Pin::new(&mut sleep3).poll(&mut cx).is_pending());

    // Advance to 125ms - second should fire
    runtime.advance_to(Duration::from_millis(125)).await;
    assert!(Pin::new(&mut sleep2).poll(&mut cx).is_ready());
    assert!(Pin::new(&mut sleep3).poll(&mut cx).is_pending());

    // Advance to 200ms - third should fire
    runtime.advance_to(Duration::from_millis(200)).await;
    assert!(Pin::new(&mut sleep3).poll(&mut cx).is_ready());
  }
}
3408// unnamed ends here
3409
3410// [[file:index.org::21423]]
3411/// Delay operator that works with TestRuntime.
3412/// 
3413/// Unlike the generic `delay_with`, this takes a runtime instance
3414/// so virtual time can be controlled.
3415pub fn delay_test<T, S>(
3416  runtime: TestRuntime,
3417  duration: Duration,
3418  source: S,
3419) -> impl futures::Stream<Item = T>
3420where
3421  T: Send + 'static,
3422  S: futures::Stream<Item = T> + Send + 'static,
3423{
3424  stream! {
3425    futures::pin_mut!(source);
3426    while let Some(value) = source.next().await {
3427      runtime.test_sleep(duration).await;
3428      yield value;
3429    }
3430  }
3431}
3432
3433/// Periodic stream using TestRuntime.
3434pub fn periodic_test(runtime: TestRuntime, period: Duration) -> impl futures::Stream<Item = u64> {
3435  stream! {
3436    let mut count = 0u64;
3437    let mut interval = runtime.test_interval(period);
3438    loop {
3439      interval.next().await;
3440      yield count;
3441      count += 1;
3442    }
3443  }
3444}
3445// unnamed ends here