1use std::error::Error;
21use std::future::{Future, pending};
22use std::pin::Pin;
23use std::sync::Arc;
24use std::sync::atomic::{AtomicBool, Ordering};
25use std::time::{Duration, Instant};
26
27use async_stream::stream;
28use futures::{Stream, StreamExt, FutureExt};
29use futures::channel::mpsc;
30use futures::lock::Mutex;
31use futures::stream;
32
/// A pinned, heap-allocated, `Send` stream of `T` values behind dynamic dispatch.
pub type BoxedStream<T> = Pin<Box<dyn Stream<Item = T> + Send>>;
35
36pub trait Runtime: Clone + Send + Sync + 'static {
43 fn sleep(duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send>>;
45
46 fn interval(period: Duration) -> Pin<Box<dyn futures::Stream<Item = ()> + Send>>;
48
49 fn spawn<F>(future: F)
51 where
52 F: Future<Output = ()> + Send + 'static;
53}
/// [`Runtime`] implementation backed by Tokio (compiled with the `tokio-runtime` feature).
#[cfg(feature = "tokio-runtime")]
#[derive(Clone)]
pub struct TokioRuntime;

#[cfg(feature = "tokio-runtime")]
impl Runtime for TokioRuntime {
    /// Boxes `tokio::time::sleep(duration)`.
    fn sleep(duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send>> {
        let timer = tokio::time::sleep(duration);
        Box::pin(timer)
    }

    /// Yields `()` after every tick of a `tokio::time::interval(period)`.
    ///
    /// The interval itself is created inside the stream, i.e. on first poll.
    // NOTE(review): tokio documents that an interval's first tick completes immediately,
    // whereas the other runtimes in this file wait one full period before the first
    // item — confirm this difference is intended.
    fn interval(period: Duration) -> Pin<Box<dyn futures::Stream<Item = ()> + Send>> {
        use async_stream::stream;
        let ticks = stream! {
            let mut ticker = tokio::time::interval(period);
            loop {
                ticker.tick().await;
                yield ();
            }
        };
        Box::pin(ticks)
    }

    /// Spawns `future` onto the Tokio runtime and discards the join handle.
    fn spawn<F: Future<Output = ()> + Send + 'static>(future: F) {
        drop(tokio::spawn(future));
    }
}
86
/// [`Runtime`] implementation backed by smol / async-io (compiled with the `smol-runtime` feature).
#[cfg(feature = "smol-runtime")]
#[derive(Clone)]
pub struct SmolRuntime;

#[cfg(feature = "smol-runtime")]
impl Runtime for SmolRuntime {
    /// Boxes an `async_io::Timer` that fires after `duration`.
    fn sleep(duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send>> {
        let timer = async_io::Timer::after(duration);
        Box::pin(timer)
    }

    /// Yields `()` each time a freshly created `Timer::after(period)` completes.
    ///
    /// A new timer is started only after the previous item has been consumed, so the
    /// first item is preceded by one full `period`.
    fn interval(period: Duration) -> Pin<Box<dyn futures::Stream<Item = ()> + Send>> {
        use async_stream::stream;
        let ticks = stream! {
            loop {
                let timer = async_io::Timer::after(period);
                timer.await;
                yield ();
            }
        };
        Box::pin(ticks)
    }

    /// Spawns `future` on smol's global executor and detaches the task.
    fn spawn<F: Future<Output = ()> + Send + 'static>(future: F) {
        let task = smol::spawn(future);
        task.detach();
    }
}
115
/// [`Runtime`] implementation backed by async-std (compiled with the `async-std-runtime` feature).
#[cfg(feature = "async-std-runtime")]
#[derive(Clone)]
pub struct AsyncStdRuntime;

#[cfg(feature = "async-std-runtime")]
impl Runtime for AsyncStdRuntime {
    /// Boxes `async_std::task::sleep(duration)`.
    fn sleep(duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send>> {
        let timer = async_std::task::sleep(duration);
        Box::pin(timer)
    }

    /// Yields `()` after each `async_std::task::sleep(period)`.
    ///
    /// A new sleep is started only after the previous item has been consumed, so the
    /// first item is preceded by one full `period`.
    fn interval(period: Duration) -> Pin<Box<dyn futures::Stream<Item = ()> + Send>> {
        use async_stream::stream;
        let ticks = stream! {
            loop {
                let nap = async_std::task::sleep(period);
                nap.await;
                yield ();
            }
        };
        Box::pin(ticks)
    }

    /// Spawns `future` as an async-std task and discards the join handle.
    fn spawn<F: Future<Output = ()> + Send + 'static>(future: F) {
        drop(async_std::task::spawn(future));
    }
}
144pub fn just<T>(value: T) -> impl Stream<Item = T> {
149 stream! { yield value; }
150}
151
152pub fn of<T: Clone>(value: T) -> impl Stream<Item = T> {
154 just(value)
155}
#[cfg(test)]
mod just_tests {
    use super::*;

    /// An integer handed to `just` comes back as the only element.
    #[tokio::test]
    async fn test_just_emits_single_value() {
        let collected = just(42).collect::<Vec<_>>().await;
        assert_eq!(collected, vec![42]);
    }

    /// Owned, non-`Copy` values are passed through unchanged.
    #[tokio::test]
    async fn test_just_with_string() {
        let collected = just("hello".to_string()).collect::<Vec<_>>().await;
        assert_eq!(collected, vec!["hello"]);
    }

    /// `of` behaves like `just`.
    #[tokio::test]
    async fn test_of_alias() {
        let collected = of(99).collect::<Vec<_>>().await;
        assert_eq!(collected, vec![99]);
    }
}
181pub fn from_future<T, F: Future<Output = T>>(future: F) -> impl Stream<Item = T> {
187 stream! {
188 let value = future.await;
189 yield value;
190 }
191}
#[cfg(test)]
mod from_future_tests {
    use super::*;

    /// An immediately-ready future produces a one-element stream.
    #[tokio::test]
    async fn test_from_future_emits_resolved_value() {
        let collected = from_future(async { 42 }).collect::<Vec<_>>().await;
        assert_eq!(collected, vec![42]);
    }

    /// A future that suspends before resolving still yields its output.
    #[tokio::test]
    async fn test_from_future_with_async_computation() {
        let slow = async {
            tokio::time::sleep(std::time::Duration::from_millis(1)).await;
            "computed".to_string()
        };
        let collected = from_future(slow).collect::<Vec<_>>().await;
        assert_eq!(collected, vec!["computed"]);
    }
}
/// Re-export of `futures::stream::iter` under the name `from_iter`.
pub use futures::stream::iter as from_iter;
#[cfg(test)]
mod from_iter_tests {
    use super::*;

    /// Every element of the source collection is emitted in order.
    #[tokio::test]
    async fn test_from_iter_emits_all_values() {
        let collected = from_iter(vec![1, 2, 3]).collect::<Vec<_>>().await;
        assert_eq!(collected, vec![1, 2, 3]);
    }

    /// An empty collection gives an empty stream.
    #[tokio::test]
    async fn test_from_iter_handles_empty() {
        let collected = from_iter(Vec::<i32>::new()).collect::<Vec<_>>().await;
        assert!(collected.is_empty());
    }

    /// Any iterator works as a source, not only collections.
    #[tokio::test]
    async fn test_from_iter_with_range() {
        let collected = from_iter(0..5).collect::<Vec<_>>().await;
        assert_eq!(collected, vec![0, 1, 2, 3, 4]);
    }
}
253pub fn periodic<R: Runtime>(interval_ms: u64) -> impl Stream<Item = ()> {
259 let interval_stream = R::interval(Duration::from_millis(interval_ms));
260 stream! {
261 futures::pin_mut!(interval_stream);
262 loop {
263 interval_stream.next().await;
264 yield ();
265 }
266 }
267}
268
269pub fn periodic_with_timer<T, F>(
272 interval_ms: u64,
273 make_timer: impl Fn(Duration) -> F + Send + 'static,
274) -> impl Stream<Item = ()>
275where
276 F: std::future::Future<Output = ()> + Send,
277{
278 stream! {
279 let duration = Duration::from_millis(interval_ms);
280 loop {
281 make_timer(duration).await;
282 yield ();
283 }
284 }
285}
#[cfg(test)]
mod periodic_tests {
    // NOTE(review): this module is empty — no tests for `periodic` or
    // `periodic_with_timer` are defined here.
}
/// Re-export of `futures::stream::empty`.
pub use futures::stream::empty;
#[cfg(test)]
mod empty_tests {
    use super::*;

    /// Collecting an empty stream gives an empty vector.
    #[tokio::test]
    async fn test_empty_yields_nothing() {
        let collected = empty::<i32>().collect::<Vec<i32>>().await;
        assert!(collected.is_empty());
    }

    /// The very first poll already reports the end of the stream.
    #[tokio::test]
    async fn test_empty_completes_immediately() {
        let source = empty::<String>();
        futures::pin_mut!(source);
        let first = source.next().await;
        assert!(first.is_none());
    }
}
/// Re-export of `futures::stream::pending` under the name `never`.
pub use futures::stream::pending as never;
#[cfg(test)]
mod never_tests {
    use super::*;
    use std::time::Duration;

    /// Races `never()` against a 10 ms timer; the timer must win.
    #[tokio::test]
    async fn test_never_does_not_complete() {
        let never_stream = never::<i32>();
        futures::pin_mut!(never_stream);

        let deadline = tokio::time::sleep(Duration::from_millis(10));
        futures::pin_mut!(deadline);

        let outcome = futures::future::select(never_stream.next(), deadline).await;
        assert!(
            matches!(outcome, futures::future::Either::Right(_)),
            "never() should not emit"
        );
    }
}
373pub fn iterate<T: Clone, F: Fn(T) -> T>(seed: T, f: F) -> impl Stream<Item = T> {
378 stream! {
379 let mut current = seed;
380 loop {
381 yield current.clone();
382 current = f(current);
383 }
384 }
385}
#[cfg(test)]
mod iterate_tests {
    use super::*;

    /// Doubling from 1 gives the powers of two.
    #[tokio::test]
    async fn test_iterate_generates_sequence() {
        let powers = iterate(1, |x| x * 2);
        let collected = powers.take(5).collect::<Vec<_>>().await;
        assert_eq!(collected, vec![1, 2, 4, 8, 16]);
    }

    /// Incrementing from 0 counts upwards.
    #[tokio::test]
    async fn test_iterate_with_addition() {
        let counter = iterate(0, |x| x + 1);
        let collected = counter.take(4).collect::<Vec<_>>().await;
        assert_eq!(collected, vec![0, 1, 2, 3]);
    }

    /// Works with owned, non-`Copy` state as well.
    #[tokio::test]
    async fn test_iterate_with_strings() {
        let growing = iterate("a".to_string(), |s| s.clone() + "a");
        let collected = growing.take(3).collect::<Vec<_>>().await;
        assert_eq!(collected, vec!["a", "aa", "aaa"]);
    }
}
/// Result of one step of the generator function passed to `unfold`.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so steps can be logged, copied and
/// compared (each derive applies only when `T` and `S` support it).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UnfoldResult<T, S> {
    /// Value to emit for this step (ignored when `done` is `true`).
    pub value: T,
    /// Seed handed to the generator function for the following step.
    pub next_seed: S,
    /// When `true`, the stream ends without emitting `value`.
    pub done: bool,
}
419
420pub fn unfold<T, S: Clone, F>(seed: S, f: F) -> impl Stream<Item = T>
422where
423 F: Fn(S) -> UnfoldResult<T, S> + Clone + Send + 'static,
424{
425 let f = f.clone();
426 futures::stream::unfold(seed, move |state| {
427 let f = f.clone();
428 async move {
429 let result = f(state);
430 if result.done {
431 None
432 } else {
433 Some((result.value, result.next_seed))
434 }
435 }
436 })
437}
#[cfg(test)]
mod unfold_tests {
    use super::*;

    /// Values are emitted until the step reports `done`.
    #[tokio::test]
    async fn test_unfold_generates_values() {
        let counting = unfold(1, |n| UnfoldResult {
            value: n,
            next_seed: n + 1,
            done: n > 3,
        });
        let collected = counting.collect::<Vec<_>>().await;
        assert_eq!(collected, vec![1, 2, 3]);
    }

    /// A first step that is already `done` emits nothing, not even its `value`.
    #[tokio::test]
    async fn test_unfold_stops_immediately_when_done() {
        let finished = unfold(0, |_| UnfoldResult {
            value: 999,
            next_seed: 0,
            done: true,
        });
        let collected = finished.collect::<Vec<i32>>().await;
        assert!(collected.is_empty());
    }

    /// The emitted type may differ from the seed type.
    #[tokio::test]
    async fn test_unfold_with_different_types() {
        let labelled = unfold(0, |n| UnfoldResult {
            value: format!("item-{}", n),
            next_seed: n + 1,
            done: n >= 2,
        });
        let collected = labelled.collect::<Vec<_>>().await;
        assert_eq!(collected, vec!["item-0", "item-1"]);
    }
}
478pub fn start_with<T: Clone, S: Stream<Item = T>>(value: T, s: S) -> impl Stream<Item = T> {
483 stream! {
484 yield value;
485 futures::pin_mut!(s);
486 while let Some(item) = s.next().await { yield item; }
487 }
488}
#[cfg(test)]
mod start_with_tests {
    use super::*;

    /// The extra value comes out before the source's own items.
    #[tokio::test]
    async fn test_start_with_prepends_value() {
        let tail = futures::stream::iter(vec![1, 2, 3]);
        let collected = start_with(0, tail).collect::<Vec<_>>().await;
        assert_eq!(collected, vec![0, 1, 2, 3]);
    }

    /// With an empty source only the prepended value is emitted.
    #[tokio::test]
    async fn test_start_with_on_empty_stream() {
        let tail = stream::empty::<i32>();
        let collected = start_with(42, tail).collect::<Vec<_>>().await;
        assert_eq!(collected, vec![42]);
    }
}
511pub fn concat<T, S: Stream<Item = T>>(streams: Vec<S>) -> impl Stream<Item = T> {
516 stream! {
517 for s in streams {
518 futures::pin_mut!(s);
519 while let Some(item) = s.next().await { yield item; }
520 }
521 }
522}
523
524pub fn concat2<T, S1: Stream<Item = T>, S2: Stream<Item = T>>(s1: S1, s2: S2) -> impl Stream<Item = T> {
526 stream! {
527 futures::pin_mut!(s1);
528 futures::pin_mut!(s2);
529 while let Some(item) = s1.next().await { yield item; }
530 while let Some(item) = s2.next().await { yield item; }
531 }
532}
#[cfg(test)]
mod concat_tests {
    use super::*;

    /// Items of the second stream follow those of the first.
    #[tokio::test]
    async fn test_concat_joins_streams() {
        let s1 = futures::stream::iter(vec![1, 2]);
        let s2 = futures::stream::iter(vec![3, 4]);
        let result = concat2(s1, s2);
        let values: Vec<_> = result.collect().await;
        assert_eq!(values, vec![1, 2, 3, 4]);
    }

    /// An empty first stream contributes nothing.
    #[tokio::test]
    async fn test_concat_with_empty_first() {
        let s1 = stream::empty::<i32>();
        let s2 = futures::stream::iter(vec![5, 6]);
        let result = concat2(s1, s2);
        let values: Vec<_> = result.collect().await;
        assert_eq!(values, vec![5, 6]);
    }

    /// `concat` drains a vector of streams in order.
    ///
    /// Previously this test only built the input and asserted nothing.
    #[tokio::test]
    async fn test_concat_vec_of_streams() {
        let streams = vec![
            futures::stream::iter(vec![1]),
            futures::stream::iter(vec![2, 3]),
            futures::stream::iter(vec![4]),
        ];
        let result = concat(streams);
        let values: Vec<_> = result.collect().await;
        assert_eq!(values, vec![1, 2, 3, 4]);
    }
}
569pub fn from_channel<T>(mut rx: mpsc::UnboundedReceiver<T>) -> impl Stream<Item = T> {
577 stream! { while let Some(item) = rx.next().await { yield item; } }
578}
579
580pub fn from_bounded_channel<T>(mut rx: mpsc::Receiver<T>) -> impl Stream<Item = T> {
582 stream! { while let Some(item) = rx.next().await { yield item; } }
583}
584
#[cfg(test)]
mod channel_tests {
    use super::*;

    /// Everything sent before the sender is dropped is emitted, in order.
    #[tokio::test]
    async fn test_from_channel_emits_sent_values() {
        let (tx, rx) = mpsc::unbounded::<i32>();
        tx.unbounded_send(1).unwrap();
        tx.unbounded_send(2).unwrap();
        tx.unbounded_send(3).unwrap();
        // Dropping the only sender closes the channel so the stream can end.
        drop(tx);

        let values: Vec<_> = from_channel(rx).collect().await;
        assert_eq!(values, vec![1, 2, 3]);
    }

    /// A channel closed without any message gives an empty stream.
    #[tokio::test]
    async fn test_from_channel_closed_without_values() {
        let (tx, rx) = mpsc::unbounded::<i32>();
        drop(tx);

        let values: Vec<_> = from_channel(rx).collect().await;
        assert!(values.is_empty());
    }

    /// The bounded variant forwards buffered values the same way.
    #[tokio::test]
    async fn test_from_bounded_channel_emits_sent_values() {
        let (mut tx, rx) = mpsc::channel::<i32>(2);
        tx.try_send(1).unwrap();
        tx.try_send(2).unwrap();
        drop(tx);

        let values: Vec<_> = from_bounded_channel(rx).collect().await;
        assert_eq!(values, vec![1, 2]);
    }
}
/// Threads a value through a sequence of unary functions, left to right.
///
/// `pipe!(x, f, g)` evaluates to `g(f(x))`; `pipe!(x)` is just `x`.
///
/// Every stage rebinds the intermediate result with a fresh `let`, so a stage may
/// return a different type than it received. The earlier form reassigned a single
/// `mut` binding, which forced every stage to return the type of the initial value.
macro_rules! pipe {
    ($initial:expr $(, $fn:expr)*) => {{
        let result = $initial;
        // One shadowing `let` per stage: the binding's type may change between stages.
        $(let result = $fn(result);)*
        result
    }};
}
622
623pub fn map<T, U, S, F>(s: S, f: F) -> impl Stream<Item = U>
646where
647 S: Stream<Item = T>,
648 F: Fn(T) -> U,
649{
650 stream! {
651 futures::pin_mut!(s);
652 while let Some(item) = s.next().await { yield f(item); }
653 }
654}
#[cfg(test)]
mod map_tests {
    use super::*;

    /// Each item is replaced by the function's result.
    #[tokio::test]
    async fn test_map_transforms_values() {
        let numbers = futures::stream::iter(vec![1, 2, 3]);
        let collected = map(numbers, |x| x * 2).collect::<Vec<_>>().await;
        assert_eq!(collected, vec![2, 4, 6]);
    }

    /// The output item type may differ from the input item type.
    #[tokio::test]
    async fn test_map_with_type_change() {
        let numbers = futures::stream::iter(vec![1, 2, 3]);
        let collected = map(numbers, |x| format!("num-{}", x))
            .collect::<Vec<_>>()
            .await;
        assert_eq!(collected, vec!["num-1", "num-2", "num-3"]);
    }

    /// Mapping an empty stream yields nothing.
    #[tokio::test]
    async fn test_map_empty_stream() {
        let nothing = stream::empty::<i32>();
        let collected = map(nothing, |x| x * 2).collect::<Vec<_>>().await;
        assert!(collected.is_empty());
    }
}
685pub fn constant<T, U: Clone, S: Stream<Item = T>>(value: U, s: S) -> impl Stream<Item = U> {
692 stream! {
693 futures::pin_mut!(s);
694 while let Some(_) = s.next().await { yield value.clone(); }
695 }
696}
#[cfg(test)]
mod constant_tests {
    use super::*;

    /// One constant is emitted per source item.
    #[tokio::test]
    async fn test_constant_replaces_all_values() {
        let numbers = futures::stream::iter(vec![1, 2, 3]);
        let collected = constant("x", numbers).collect::<Vec<_>>().await;
        assert_eq!(collected, vec!["x", "x", "x"]);
    }

    /// No source items means no constants.
    #[tokio::test]
    async fn test_constant_empty_stream() {
        let nothing = stream::empty::<i32>();
        let collected = constant(42, nothing).collect::<Vec<_>>().await;
        assert!(collected.is_empty());
    }
}
719pub fn scan<T, U: Clone, S, F>(accumulator: F, seed: U, s: S) -> impl Stream<Item = U>
724where
725 S: Stream<Item = T>,
726 F: Fn(U, T) -> U,
727{
728 stream! {
729 let mut acc = seed.clone();
730 yield acc.clone();
731 futures::pin_mut!(s);
732 while let Some(item) = s.next().await {
733 acc = accumulator(acc, item);
734 yield acc.clone();
735 }
736 }
737}
#[cfg(test)]
mod scan_tests {
    use super::*;

    /// Running sum, with the seed emitted first.
    #[tokio::test]
    async fn test_scan_accumulates_with_seed() {
        let numbers = futures::stream::iter(vec![1, 2, 3]);
        let running = scan(|acc, x| acc + x, 0, numbers);
        let collected = running.collect::<Vec<_>>().await;
        assert_eq!(collected, vec![0, 1, 3, 6]);
    }

    /// Running product.
    #[tokio::test]
    async fn test_scan_product() {
        let numbers = futures::stream::iter(vec![2, 3, 4]);
        let running = scan(|acc, x| acc * x, 1, numbers);
        let collected = running.collect::<Vec<_>>().await;
        assert_eq!(collected, vec![1, 2, 6, 24]);
    }

    /// An empty source still emits the seed.
    #[tokio::test]
    async fn test_scan_empty_stream() {
        let nothing = stream::empty::<i32>();
        let running = scan(|acc, x| acc + x, 100, nothing);
        let collected = running.collect::<Vec<_>>().await;
        assert_eq!(collected, vec![100]);
    }
}
768pub fn tap<T: Clone, S, F>(side_effect: F, s: S) -> impl Stream<Item = T>
774where
775 S: Stream<Item = T>,
776 F: Fn(&T),
777{
778 stream! {
779 futures::pin_mut!(s);
780 while let Some(item) = s.next().await {
781 side_effect(&item);
782 yield item;
783 }
784 }
785}
786
787pub fn tap_spawn<R, T, S, F, Fut>(
789 side_effect: F,
790 s: S,
791) -> impl Stream<Item = T>
792where
793 R: Runtime,
794 T: Clone + Send + 'static,
795 S: Stream<Item = T>,
796 F: Fn(T) -> Fut + Clone + Send + 'static,
797 Fut: std::future::Future<Output = ()> + Send + 'static,
798{
799 stream! {
800 futures::pin_mut!(s);
801 while let Some(item) = s.next().await {
802 let f = side_effect.clone();
803 let item_clone = item.clone();
804 R::spawn(async move { f(item_clone).await });
805 yield item;
806 }
807 }
808}
#[cfg(test)]
mod tap_runtime_tests {
    // NOTE(review): this module is empty — no tests for `tap` or `tap_spawn` are
    // defined here.
}
818pub fn await_tap<T: Clone, S, F, Fut>(side_effect: F, s: S) -> impl Stream<Item = T>
823where
824 S: Stream<Item = T>,
825 F: Fn(T) -> Fut,
826 Fut: Future<Output = ()>,
827{
828 stream! {
829 futures::pin_mut!(s);
830 while let Some(item) = s.next().await {
831 side_effect(item.clone()).await;
832 yield item;
833 }
834 }
835}
#[cfg(test)]
mod await_tap_tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    /// The side effect runs once per item and the items pass through untouched.
    #[tokio::test]
    async fn test_await_tap_executes_side_effect() {
        let calls = Arc::new(AtomicUsize::new(0));
        let calls_in_effect = calls.clone();

        let bump = move |_: i32| {
            let counter = calls_in_effect.clone();
            async move {
                counter.fetch_add(1, Ordering::SeqCst);
            }
        };
        let numbers = futures::stream::iter(vec![1, 2, 3]);

        let collected = await_tap(bump, numbers).collect::<Vec<_>>().await;
        assert_eq!(collected, vec![1, 2, 3]);
        assert_eq!(calls.load(Ordering::SeqCst), 3);
    }
}
864pub fn continue_with<T, S1, S2, F>(f: F, s: S1) -> impl Stream<Item = T>
869where
870 S1: Stream<Item = T>,
871 S2: Stream<Item = T>,
872 F: FnOnce() -> S2,
873{
874 stream! {
875 futures::pin_mut!(s);
876 while let Some(item) = s.next().await { yield item; }
877 let s2 = f();
878 futures::pin_mut!(s2);
879 while let Some(item) = s2.next().await { yield item; }
880 }
881}
#[cfg(test)]
mod continue_with_tests {
    use super::*;

    /// Items of the continuation follow the items of the first stream.
    #[tokio::test]
    async fn test_continue_with_appends() {
        let first = futures::stream::iter(vec![1, 2]);
        let second = || futures::stream::iter(vec![3, 4]);

        let values: Vec<_> = continue_with(second, first).collect().await;
        assert_eq!(values, vec![1, 2, 3, 4]);
    }

    /// The factory must not run until the first stream is exhausted.
    #[tokio::test]
    async fn test_continue_with_lazy() {
        use std::sync::atomic::{AtomicBool, Ordering};
        use std::sync::Arc;

        let called = Arc::new(AtomicBool::new(false));
        let called_clone = called.clone();

        let first = futures::stream::iter(vec![1]);
        let second = move || {
            called_clone.store(true, Ordering::SeqCst);
            futures::stream::iter(vec![2])
        };

        let stream = continue_with(second, first);
        futures::pin_mut!(stream);

        assert_eq!(stream.next().await, Some(1));
        // Only the first stream has been touched so far; this is the laziness check
        // that the test name promises.
        assert!(!called.load(Ordering::SeqCst));

        assert_eq!(stream.next().await, Some(2));
        assert!(called.load(Ordering::SeqCst));
    }
}
922pub fn concat_all<T, Inner, Outer>(outer: Outer) -> impl Stream<Item = T>
930where
931 Inner: Stream<Item = T>,
932 Outer: Stream<Item = Inner>,
933{
934 stream! {
935 futures::pin_mut!(outer);
936 while let Some(inner) = outer.next().await {
937 futures::pin_mut!(inner);
938 while let Some(item) = inner.next().await { yield item; }
939 }
940 }
941}
#[cfg(test)]
mod concat_all_tests {
    use super::*;

    /// Inner streams are emitted one after another, in outer order.
    #[tokio::test]
    async fn test_concat_all_flattens() {
        let inner_streams = vec![
            futures::stream::iter(vec![1, 2]),
            futures::stream::iter(vec![3, 4]),
        ];
        let outer = futures::stream::iter(inner_streams);

        let collected = concat_all(outer).collect::<Vec<_>>().await;
        assert_eq!(collected, vec![1, 2, 3, 4]);
    }
}
959pub fn concat_map<T, U, S, Inner, F>(f: F, s: S) -> impl Stream<Item = U>
964where
965 S: Stream<Item = T>,
966 Inner: Stream<Item = U>,
967 F: Fn(T) -> Inner,
968{
969 stream! {
970 futures::pin_mut!(s);
971 while let Some(item) = s.next().await {
972 let inner = f(item);
973 futures::pin_mut!(inner);
974 while let Some(inner_item) = inner.next().await { yield inner_item; }
975 }
976 }
977}
#[cfg(test)]
mod concat_map_tests {
    use super::*;

    /// Inner streams are emitted strictly in source order.
    #[tokio::test]
    async fn test_concat_map_sequential() {
        let numbers = futures::stream::iter(vec![1, 2]);
        let expanded = concat_map(|x| futures::stream::iter(vec![x * 10, x * 10 + 1]), numbers);

        let collected = expanded.collect::<Vec<_>>().await;
        assert_eq!(collected, vec![10, 11, 20, 21]);
    }
}
994pub fn filter<T, S, P>(predicate: P, s: S) -> impl Stream<Item = T>
1009where
1010 S: Stream<Item = T>,
1011 P: Fn(&T) -> bool,
1012{
1013 stream! {
1014 futures::pin_mut!(s);
1015 while let Some(item) = s.next().await { if predicate(&item) { yield item; } }
1016 }
1017}
1018
1019pub fn filter_async<T, S, P, Fut>(predicate: P, s: S) -> impl Stream<Item = T>
1021where
1022 S: Stream<Item = T>,
1023 P: Fn(&T) -> Fut,
1024 Fut: std::future::Future<Output = bool>,
1025{
1026 stream! {
1027 futures::pin_mut!(s);
1028 while let Some(item) = s.next().await { if predicate(&item).await { yield item; } }
1029 }
1030}
#[cfg(test)]
mod filter_tests {
    use super::*;

    /// Only items above the threshold survive.
    #[tokio::test]
    async fn test_filter_keeps_matching() {
        let numbers = futures::stream::iter(vec![1, 2, 3, 4, 5]);
        let collected = filter(|x| *x > 2, numbers).collect::<Vec<_>>().await;
        assert_eq!(collected, vec![3, 4, 5]);
    }

    /// Only even items survive.
    #[tokio::test]
    async fn test_filter_even_numbers() {
        let numbers = futures::stream::iter(vec![1, 2, 3, 4, 5, 6]);
        let collected = filter(|x| x % 2 == 0, numbers).collect::<Vec<_>>().await;
        assert_eq!(collected, vec![2, 4, 6]);
    }

    /// A predicate nothing satisfies gives an empty stream.
    #[tokio::test]
    async fn test_filter_empty_result() {
        let numbers = futures::stream::iter(vec![1, 2, 3]);
        let collected = filter(|x| *x > 100, numbers).collect::<Vec<_>>().await;
        assert!(collected.is_empty());
    }
}
1061pub fn skip_repeats_with<T: Clone, S, F>(equals: F, s: S) -> impl Stream<Item = T>
1066where
1067 S: Stream<Item = T>,
1068 F: Fn(&T, &T) -> bool,
1069{
1070 stream! {
1071 futures::pin_mut!(s);
1072 let mut last: Option<T> = None;
1073 while let Some(item) = s.next().await {
1074 let should_yield = match &last {
1075 None => true,
1076 Some(prev) => !equals(&item, prev),
1077 };
1078 if should_yield {
1079 last = Some(item.clone());
1080 yield item;
1081 }
1082 }
1083 }
1084}
1085
1086pub fn skip_repeats<T: Clone + PartialEq, S>(s: S) -> impl Stream<Item = T>
1088where
1089 S: Stream<Item = T>,
1090{
1091 skip_repeats_with(|a, b| a == b, s)
1092}
#[cfg(test)]
mod skip_repeats_tests {
    use super::*;

    /// Runs of equal items collapse to one; a value may reappear after a different one.
    #[tokio::test]
    async fn test_skip_repeats() {
        let noisy = futures::stream::iter(vec![1, 1, 2, 2, 3, 1, 1]);
        let collected = skip_repeats(noisy).collect::<Vec<_>>().await;
        assert_eq!(collected, vec![1, 2, 3, 1]);
    }

    /// With a custom comparison, words sharing a first letter count as repeats.
    #[tokio::test]
    async fn test_skip_repeats_with_custom_eq() {
        let words = futures::stream::iter(vec!["apple", "ant", "banana", "berry"]);
        let same_initial = |a: &&str, b: &&str| a.chars().next() == b.chars().next();
        let collected = skip_repeats_with(same_initial, words)
            .collect::<Vec<_>>()
            .await;
        assert_eq!(collected, vec!["apple", "banana"]);
    }
}
1119pub fn take<T, S: Stream<Item = T>>(n: usize, s: S) -> impl Stream<Item = T> {
1134 stream! {
1135 futures::pin_mut!(s);
1136 let mut count = 0;
1137 while let Some(item) = s.next().await {
1138 if count < n {
1139 yield item;
1140 count += 1;
1141 } else {
1142 break;
1143 }
1144 }
1145 }
1146}
#[cfg(test)]
mod take_tests {
    use super::*;

    /// Only the first `n` items are emitted.
    #[tokio::test]
    async fn test_take_first_n() {
        let numbers = futures::stream::iter(vec![1, 2, 3, 4, 5]);
        let collected = take(2, numbers).collect::<Vec<_>>().await;
        assert_eq!(collected, vec![1, 2]);
    }

    /// Asking for more than exists emits everything.
    #[tokio::test]
    async fn test_take_more_than_available() {
        let numbers = futures::stream::iter(vec![1, 2]);
        let collected = take(10, numbers).collect::<Vec<_>>().await;
        assert_eq!(collected, vec![1, 2]);
    }

    /// Taking zero items gives an empty stream.
    #[tokio::test]
    async fn test_take_zero() {
        let numbers = futures::stream::iter(vec![1, 2, 3]);
        let collected = take(0, numbers).collect::<Vec<_>>().await;
        assert!(collected.is_empty());
    }
}
1177pub fn skip<T, S: Stream<Item = T>>(n: usize, s: S) -> impl Stream<Item = T> {
1190 stream! {
1191 futures::pin_mut!(s);
1192 let mut count = 0;
1193 while let Some(item) = s.next().await {
1194 if count >= n { yield item; }
1195 count += 1;
1196 }
1197 }
1198}
#[cfg(test)]
mod skip_tests {
    use super::*;

    /// The first `n` items are dropped.
    #[tokio::test]
    async fn test_skip_first_n() {
        let numbers = futures::stream::iter(vec![1, 2, 3, 4, 5]);
        let collected = skip(2, numbers).collect::<Vec<_>>().await;
        assert_eq!(collected, vec![3, 4, 5]);
    }

    /// Skipping zero items leaves the stream unchanged.
    #[tokio::test]
    async fn test_skip_zero() {
        let numbers = futures::stream::iter(vec![1, 2, 3]);
        let collected = skip(0, numbers).collect::<Vec<_>>().await;
        assert_eq!(collected, vec![1, 2, 3]);
    }

    /// Skipping more than exists gives an empty stream.
    #[tokio::test]
    async fn test_skip_more_than_available() {
        let numbers = futures::stream::iter(vec![1, 2]);
        let collected = skip(5, numbers).collect::<Vec<_>>().await;
        assert!(collected.is_empty());
    }
}
1227pub fn slice<T, S: Stream<Item = T>>(start: usize, end: usize, s: S) -> impl Stream<Item = T> {
1232 stream! {
1233 futures::pin_mut!(s);
1234 let mut index = 0;
1235 while let Some(item) = s.next().await {
1236 if index >= start && index < end { yield item; }
1237 index += 1;
1238 if index >= end { break; }
1239 }
1240 }
1241}
1242
#[cfg(test)]
mod slice_tests {
    use super::*;

    /// Indices 2, 3 and 4 are kept for the range `2..5`.
    #[tokio::test]
    async fn test_slice() {
        let numbers = futures::stream::iter(vec![0, 1, 2, 3, 4, 5]);
        let collected = slice(2, 5, numbers).collect::<Vec<_>>().await;
        assert_eq!(collected, vec![2, 3, 4]);
    }

    /// `start == end` selects nothing.
    #[tokio::test]
    async fn test_slice_empty_range() {
        let numbers = futures::stream::iter(vec![0, 1, 2, 3, 4]);
        let collected = slice(2, 2, numbers).collect::<Vec<_>>().await;
        assert_eq!(collected, Vec::<i32>::new());
    }

    /// An `end` past the source length simply stops at the source's end.
    #[tokio::test]
    async fn test_slice_beyond_length() {
        let numbers = futures::stream::iter(vec![0, 1, 2]);
        let collected = slice(1, 10, numbers).collect::<Vec<_>>().await;
        assert_eq!(collected, vec![1, 2]);
    }
}
1272pub fn take_while<T, S, P>(predicate: P, s: S) -> impl Stream<Item = T>
1285where
1286 S: Stream<Item = T>,
1287 P: Fn(&T) -> bool,
1288{
1289 stream! {
1290 futures::pin_mut!(s);
1291 while let Some(item) = s.next().await {
1292 if predicate(&item) { yield item; }
1293 else { break; }
1294 }
1295 }
1296}
#[cfg(test)]
mod take_while_tests {
    use super::*;

    /// Later items that would pass again are not emitted once the stream has stopped.
    #[tokio::test]
    async fn test_take_while() {
        let numbers = futures::stream::iter(vec![1, 2, 3, 4, 2, 1]);
        let collected = take_while(|x| *x < 4, numbers).collect::<Vec<_>>().await;
        assert_eq!(collected, vec![1, 2, 3]);
    }

    /// An always-true predicate keeps everything.
    #[tokio::test]
    async fn test_take_while_all_pass() {
        let numbers = futures::stream::iter(vec![1, 2, 3]);
        let collected = take_while(|_x| true, numbers).collect::<Vec<_>>().await;
        assert_eq!(collected, vec![1, 2, 3]);
    }

    /// An always-false predicate keeps nothing.
    #[tokio::test]
    async fn test_take_while_none_pass() {
        let numbers = futures::stream::iter(vec![1, 2, 3]);
        let collected = take_while(|_x| false, numbers).collect::<Vec<_>>().await;
        assert_eq!(collected, Vec::<i32>::new());
    }
}
1324pub fn skip_while<T, S, P>(predicate: P, s: S) -> impl Stream<Item = T>
1337where
1338 S: Stream<Item = T>,
1339 P: Fn(&T) -> bool,
1340{
1341 stream! {
1342 futures::pin_mut!(s);
1343 let mut skipping = true;
1344 while let Some(item) = s.next().await {
1345 if skipping && !predicate(&item) { skipping = false; }
1346 if !skipping { yield item; }
1347 }
1348 }
1349}
#[cfg(test)]
mod skip_while_tests {
    use super::*;

    /// Items after the first failure are emitted even if they would pass again.
    #[tokio::test]
    async fn test_skip_while() {
        let numbers = futures::stream::iter(vec![1, 2, 3, 4, 2, 1]);
        let collected = skip_while(|x| *x < 3, numbers).collect::<Vec<_>>().await;
        assert_eq!(collected, vec![3, 4, 2, 1]);
    }

    /// An always-false predicate skips nothing.
    #[tokio::test]
    async fn test_skip_while_all_fail() {
        let numbers = futures::stream::iter(vec![1, 2, 3]);
        let collected = skip_while(|_x| false, numbers).collect::<Vec<_>>().await;
        assert_eq!(collected, vec![1, 2, 3]);
    }

    /// An always-true predicate skips everything.
    #[tokio::test]
    async fn test_skip_while_all_pass() {
        let numbers = futures::stream::iter(vec![1, 2, 3]);
        let collected = skip_while(|_x| true, numbers).collect::<Vec<_>>().await;
        assert_eq!(collected, Vec::<i32>::new());
    }
}
1377pub fn take_until<T, S, P>(predicate: P, s: S) -> impl Stream<Item = T>
1382where
1383 S: Stream<Item = T>,
1384 P: Fn(&T) -> bool,
1385{
1386 stream! {
1387 futures::pin_mut!(s);
1388 while let Some(item) = s.next().await {
1389 if predicate(&item) { break; }
1390 yield item;
1391 }
1392 }
1393}
#[cfg(test)]
mod take_until_tests {
    use super::*;

    /// The stream stops before the matching item.
    #[tokio::test]
    async fn test_take_until() {
        let numbers = futures::stream::iter(vec![1, 2, 3, 4, 5]);
        let collected = take_until(|x| *x == 3, numbers).collect::<Vec<_>>().await;
        assert_eq!(collected, vec![1, 2]);
    }

    /// Without a match every item is emitted.
    #[tokio::test]
    async fn test_take_until_never_matches() {
        let numbers = futures::stream::iter(vec![1, 2, 3]);
        let collected = take_until(|_x| false, numbers).collect::<Vec<_>>().await;
        assert_eq!(collected, vec![1, 2, 3]);
    }

    /// A match on the first item gives an empty stream.
    #[tokio::test]
    async fn test_take_until_first_matches() {
        let numbers = futures::stream::iter(vec![1, 2, 3]);
        let collected = take_until(|x| *x == 1, numbers).collect::<Vec<_>>().await;
        assert_eq!(collected, Vec::<i32>::new());
    }
}
1419pub fn delay<R: Runtime, T, S: Stream<Item = T>>(ms: u64, s: S) -> impl Stream<Item = T> {
1426 stream! {
1427 futures::pin_mut!(s);
1428 let duration = Duration::from_millis(ms);
1429 while let Some(item) = s.next().await {
1430 R::sleep(duration).await;
1431 yield item;
1432 }
1433 }
1434}
1435
1436pub fn delay_with<T, S, F, Fut>(
1438 ms: u64,
1439 s: S,
1440 sleep_fn: F,
1441) -> impl Stream<Item = T>
1442where
1443 S: Stream<Item = T>,
1444 F: Fn(Duration) -> Fut + Clone,
1445 Fut: std::future::Future<Output = ()>,
1446{
1447 stream! {
1448 futures::pin_mut!(s);
1449 let duration = Duration::from_millis(ms);
1450 while let Some(item) = s.next().await {
1451 sleep_fn(duration).await;
1452 yield item;
1453 }
1454 }
1455}
#[cfg(test)]
mod delay_tests {
    use super::*;

    /// Three items delayed by 10 ms each must take at least 25 ms in total.
    #[tokio::test]
    async fn test_delay_with() {
        let numbers = futures::stream::iter(vec![1, 2, 3]);
        let started = std::time::Instant::now();
        let collected = delay_with(10, numbers, |d| tokio::time::sleep(d))
            .collect::<Vec<_>>()
            .await;
        let elapsed = started.elapsed();

        assert_eq!(collected, vec![1, 2, 3]);
        assert!(elapsed >= Duration::from_millis(25));
    }

    /// An empty source stays empty.
    #[tokio::test]
    async fn test_delay_empty_stream() {
        let nothing = futures::stream::iter(Vec::<i32>::new());
        let collected = delay_with(100, nothing, |d| tokio::time::sleep(d))
            .collect::<Vec<_>>()
            .await;
        assert_eq!(collected, Vec::<i32>::new());
    }
}
1487pub fn debounce<R, T, S>(ms: u64, s: S) -> impl Stream<Item = T>
1493where
1494 R: Runtime,
1495 T: Clone + Send + 'static,
1496 S: Stream<Item = T> + Send + 'static,
1497{
1498 debounce_with(ms, s, R::sleep)
1499}
1500
1501pub fn debounce_with<T, S, F, Fut>(ms: u64, s: S, sleep_fn: F) -> impl Stream<Item = T>
1504where
1505 T: Clone + Send + 'static,
1506 S: Stream<Item = T> + Send + 'static,
1507 F: Fn(Duration) -> Fut + Clone + Send + 'static,
1508 Fut: std::future::Future<Output = ()> + Send + 'static,
1509{
1510 stream! {
1511 let duration = Duration::from_millis(ms);
1512 let mut pending: Option<T> = None;
1513
1514 futures::pin_mut!(s);
1515
1516 while let Some(value) = s.next().await {
1517 pending = Some(value);
1518 loop {
1520 let timeout = sleep_fn(duration);
1521 futures::pin_mut!(timeout);
1522
1523 let next = s.next();
1525 futures::pin_mut!(next);
1526
1527 match futures::future::select(next, timeout).await {
1528 futures::future::Either::Left((Some(v), _)) => {
1529 pending = Some(v);
1531 }
1532 futures::future::Either::Left((None, _)) => {
1533 if let Some(v) = pending.take() {
1535 yield v;
1536 }
1537 return;
1538 }
1539 futures::future::Either::Right((_, _)) => {
1540 if let Some(v) = pending.take() {
1542 yield v;
1543 }
1544 break;
1545 }
1546 }
1547 }
1548 }
1549 }
1550}
#[cfg(test)]
mod debounce_tests {
    use super::*;

    /// A burst of immediately-available items collapses to its last item, which is
    /// flushed when the source ends.
    #[tokio::test]
    async fn test_debounce_with_emits_last_of_burst() {
        let source = futures::stream::iter(vec![1, 2, 3]);
        let values: Vec<_> = debounce_with(10, source, |d| tokio::time::sleep(d))
            .collect()
            .await;
        assert_eq!(values, vec![3]);
    }

    /// A single item is flushed when the source ends.
    #[tokio::test]
    async fn test_debounce_with_single_value() {
        let source = futures::stream::iter(vec![7]);
        let values: Vec<_> = debounce_with(10, source, |d| tokio::time::sleep(d))
            .collect()
            .await;
        assert_eq!(values, vec![7]);
    }

    /// An empty source stays empty.
    #[tokio::test]
    async fn test_debounce_with_empty_stream() {
        let source = futures::stream::iter(Vec::<i32>::new());
        let values: Vec<_> = debounce_with(10, source, |d| tokio::time::sleep(d))
            .collect()
            .await;
        assert_eq!(values, Vec::<i32>::new());
    }
}
/// Selects which edges of a throttle window produce output.
///
/// Derives `Debug`, `Clone`, `Copy`, `PartialEq` and `Eq`: the type is two plain
/// flags, so it is cheap to copy and convenient to compare and log.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ThrottleOptions {
    /// Emit the item that opens a window.
    pub leading: bool,
    /// Emit the last held-back item.
    pub trailing: bool,
}

impl Default for ThrottleOptions {
    /// Both `leading` and `trailing` enabled.
    fn default() -> Self {
        Self { leading: true, trailing: true }
    }
}

impl ThrottleOptions {
    /// Only `leading` enabled.
    pub fn leading_only() -> Self {
        Self { leading: true, trailing: false }
    }

    /// Only `trailing` enabled.
    pub fn trailing_only() -> Self {
        Self { leading: false, trailing: true }
    }
}
1589
1590pub fn throttle<T: Clone, S: Stream<Item = T> + Unpin>(
1593 ms: u64,
1594 options: ThrottleOptions,
1595 mut s: S,
1596) -> impl Stream<Item = T> {
1597 stream! {
1598 let duration = Duration::from_millis(ms);
1599 let mut last_emit = Instant::now() - duration; let mut trailing_value: Option<T> = None;
1601 while let Some(item) = s.next().await {
1602 let now = Instant::now();
1603 let elapsed = now.duration_since(last_emit);
1604 if elapsed >= duration {
1605 if options.leading {
1606 yield item;
1607 last_emit = now;
1608 trailing_value = None;
1609 } else {
1610 trailing_value = Some(item);
1611 }
1612 } else {
1613 trailing_value = Some(item);
1614 }
1615 }
1616 if options.trailing {
1618 if let Some(value) = trailing_value { yield value; }
1619 }
1620 }
1621}
#[cfg(test)]
mod throttle_tests {
    use super::*;

    /// With leading emission enabled the first item passes straight through.
    #[tokio::test]
    async fn test_throttle_leading() {
        let source = futures::stream::iter(vec![1, 2, 3, 4, 5]);
        let values: Vec<_> = throttle(
            100,
            ThrottleOptions::leading_only(),
            source,
        ).collect().await;
        assert!(!values.is_empty());
        assert_eq!(values[0], 1);
    }

    /// With only trailing emission enabled every item is held back and just
    /// the last one is flushed when the source ends. This does not depend on
    /// timing, so the exact output is asserted.
    #[tokio::test]
    async fn test_throttle_trailing() {
        let source = futures::stream::iter(vec![1, 2, 3]);
        let values: Vec<_> = throttle(
            100,
            ThrottleOptions::trailing_only(),
            source,
        ).collect().await;
        assert_eq!(values, vec![3]);
    }

    /// An empty source produces an empty output.
    #[tokio::test]
    async fn test_throttle_empty() {
        let source = futures::stream::iter(Vec::<i32>::new());
        let values: Vec<_> = throttle(
            100,
            ThrottleOptions::default(),
            source,
        ).collect().await;
        assert_eq!(values, Vec::<i32>::new());
    }
}
1664pub fn recover_with<T, E, S, S2, F>(
1670 recover_fn: F,
1671 s: S,
1672) -> impl Stream<Item = T>
1673where
1674 S: Stream<Item = Result<T, E>>,
1675 S2: Stream<Item = T>,
1676 F: FnOnce(E) -> S2,
1677 E: Error,
1678{
1679 stream! {
1680 futures::pin_mut!(s);
1681 loop {
1682 match s.next().await {
1683 Some(Ok(item)) => yield item,
1684 Some(Err(e)) => {
1685 let recovery = recover_fn(e);
1686 futures::pin_mut!(recovery);
1687 while let Some(item) = recovery.next().await { yield item; }
1688 break;
1689 }
1690 None => break,
1691 }
1692 }
1693 }
1694}
1695
#[cfg(test)]
mod recover_with_tests {
    use super::*;

    /// Minimal error type satisfying the `E: Error` bound of `recover_with`.
    #[derive(Debug)]
    struct SimpleError;

    impl std::fmt::Display for SimpleError {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "SimpleError")
        }
    }

    impl std::error::Error for SimpleError {}

    /// Without any error the recovery stream is never used.
    #[tokio::test]
    async fn test_recover_with_no_error() {
        let input = futures::stream::iter(vec![Ok::<_, SimpleError>(1), Ok(2), Ok(3)]);
        let recovered = recover_with(
            |_e: SimpleError| futures::stream::iter(vec![99]),
            input,
        );
        let collected: Vec<_> = recovered.collect().await;
        assert_eq!(collected, vec![1, 2, 3]);
    }

    /// After the first error the recovery items replace the rest of the source.
    #[tokio::test]
    async fn test_recover_with_error() {
        let input = futures::stream::iter(vec![
            Ok(1),
            Err(SimpleError),
            Ok(3),
        ]);
        let recovered = recover_with(
            |_e: SimpleError| futures::stream::iter(vec![99, 100]),
            input,
        );
        let collected: Vec<_> = recovered.collect().await;
        assert_eq!(collected, vec![1, 99, 100]);
    }
}
1740pub fn recover_with_stream<T, E, S, Alt, AltIter>(
1745 mut alternatives: AltIter,
1746 source: S,
1747) -> impl Stream<Item = T>
1748where
1749 S: Stream<Item = Result<T, E>> + Send + 'static,
1750 Alt: Stream<Item = Result<T, E>> + Send + 'static,
1751 AltIter: Iterator<Item = Alt> + Send + 'static,
1752 T: Send + 'static,
1753 E: Send + 'static,
1754{
1755 stream! {
1756 futures::pin_mut!(source);
1757 let mut current: Pin<Box<dyn Stream<Item = Result<T, E>> + Send>> = Box::pin(source);
1758
1759 loop {
1760 let mut errored = false;
1761 while let Some(result) = current.next().await {
1762 match result {
1763 Ok(value) => yield value,
1764 Err(_) => {
1765 errored = true;
1766 break;
1767 }
1768 }
1769 }
1770
1771 if !errored {
1772 break; }
1774
1775 match alternatives.next() {
1777 Some(alt) => current = Box::pin(alt),
1778 None => break, }
1780 }
1781 }
1782}
#[cfg(test)]
mod recover_with_stream_tests {
    use super::*;

    #[derive(Debug, Clone)]
    struct TestErr;

    /// Boxed alternative stream type used by the tests below.
    type BoxedAlt = Pin<Box<dyn Stream<Item = Result<i32, TestErr>> + Send>>;

    /// A source without errors is passed through; no alternative is needed.
    #[tokio::test]
    async fn test_recover_with_stream_success() {
        let input = futures::stream::iter(vec![Ok::<i32, TestErr>(1), Ok(2), Ok(3)]);
        let fallbacks: Vec<BoxedAlt> = vec![];

        let collected: Vec<_> = recover_with_stream(fallbacks.into_iter(), input)
            .collect()
            .await;
        assert_eq!(collected, vec![1, 2, 3]);
    }

    /// After an error the first alternative takes over from the source.
    #[tokio::test]
    async fn test_recover_with_stream_uses_alternative() {
        let input = futures::stream::iter(vec![Ok(1), Err(TestErr), Ok(3)]);
        let fallback = futures::stream::iter(vec![Ok(10), Ok(20)]);
        let fallbacks: Vec<BoxedAlt> = vec![Box::pin(fallback)];

        let collected: Vec<_> = recover_with_stream(fallbacks.into_iter(), input)
            .collect()
            .await;
        assert_eq!(collected, vec![1, 10, 20]);
    }
}
1812pub fn throw_error<T, E: Clone>(error: E) -> impl Stream<Item = Result<T, E>> {
1817 stream::once(async move { Err(error) })
1818}
1819
/// Creates a stream that panics with `message` the first time it is polled.
///
/// No item is ever produced.
pub fn throw_panic<T>(message: &'static str) -> impl Stream<Item = T> {
    stream! {
        panic!("{}", message);
        // Never executed; presumably the `yield` exists only so the stream has
        // an item type to infer as `T` — confirm.
        #[allow(unreachable_code)]
        loop { yield unreachable!(); }
    }
}
#[cfg(test)]
mod throw_error_tests {
    use super::*;

    /// `throw_error` produces a single `Err` item and nothing else.
    #[tokio::test]
    async fn test_throw_error_emits_error() {
        let failing = throw_error::<i32, _>("test error".to_string());
        let items: Vec<_> = failing.collect().await;

        assert_eq!(items.len(), 1);
        assert!(items[0].is_err());
    }
}
/// Configuration bundle for retrying a stream.
///
/// NOTE(review): `retry` and `retry_with` in the visible source take
/// `max_attempts` and `delay_ms` as plain arguments and do not use this
/// struct; confirm where it is consumed.
pub struct RetryOptions<F> {
    /// Attempt limit. Presumably the same meaning as the `max_attempts`
    /// argument of `retry` — confirm.
    pub max_attempts: usize,
    /// Delay between attempts, in milliseconds.
    pub delay_ms: u64,
    /// Presumably a predicate deciding whether an error is retried; its
    /// signature is not constrained here — confirm.
    pub should_retry: F,
}
1853
1854pub fn retry<R, T, E, S, F>(
1857 max_attempts: usize,
1858 delay_ms: u64,
1859 mut stream_factory: F,
1860) -> impl Stream<Item = Result<T, E>>
1861where
1862 R: Runtime,
1863 S: Stream<Item = Result<T, E>>,
1864 F: FnMut() -> S,
1865 E: Clone,
1866{
1867 stream! {
1868 let mut attempt = 0;
1869 loop {
1870 let s = stream_factory();
1871 futures::pin_mut!(s);
1872 let mut failed = false;
1873
1874 while let Some(item) = s.next().await {
1875 match item {
1876 Ok(value) => yield Ok(value),
1877 Err(e) => {
1878 attempt += 1;
1879 if attempt >= max_attempts {
1880 yield Err(e);
1881 return;
1882 }
1883 if delay_ms > 0 { R::sleep(Duration::from_millis(delay_ms)).await; }
1884 failed = true;
1885 break;
1886 }
1887 }
1888 }
1889
1890 if !failed { return; } }
1892 }
1893}
1894
1895pub fn retry_with<T, E, S, F, SF, SFut>(
1897 max_attempts: usize,
1898 delay_ms: u64,
1899 mut stream_factory: F,
1900 sleep_fn: SF,
1901) -> impl Stream<Item = Result<T, E>>
1902where
1903 S: Stream<Item = Result<T, E>>,
1904 F: FnMut() -> S,
1905 E: Clone,
1906 SF: Fn(Duration) -> SFut,
1907 SFut: std::future::Future<Output = ()>,
1908{
1909 stream! {
1910 let mut attempt = 0;
1911 loop {
1912 let s = stream_factory();
1913 futures::pin_mut!(s);
1914 let mut failed = false;
1915
1916 while let Some(item) = s.next().await {
1917 match item {
1918 Ok(value) => yield Ok(value),
1919 Err(e) => {
1920 attempt += 1;
1921 if attempt >= max_attempts {
1922 yield Err(e);
1923 return;
1924 }
1925 if delay_ms > 0 { sleep_fn(Duration::from_millis(delay_ms)).await; }
1926 failed = true;
1927 break;
1928 }
1929 }
1930 }
1931
1932 if !failed { return; }
1933 }
1934 }
1935}
#[cfg(test)]
mod retry_tests {
    use super::*;

    #[derive(Debug, Clone, PartialEq)]
    struct TestError(String);

    /// A stream that never fails is forwarded unchanged.
    #[tokio::test]
    async fn test_retry_with_success() {
        let outcomes: Vec<Result<i32, TestError>> = retry_with(
            3,
            10,
            || futures::stream::iter(vec![Ok(1), Ok(2), Ok(3)]),
            |_d| std::future::ready(()),
        )
        .collect()
        .await;

        let successes: Vec<_> = outcomes.into_iter().filter_map(|r| r.ok()).collect();
        assert_eq!(successes, vec![1, 2, 3]);
    }

    /// The first two attempts fail and the third succeeds.
    #[tokio::test]
    async fn test_retry_with_eventual_success() {
        use std::sync::atomic::{AtomicUsize, Ordering};
        use std::sync::Arc;

        let calls = Arc::new(AtomicUsize::new(0));
        let calls_in_factory = calls.clone();

        let outcomes: Vec<Result<i32, TestError>> = retry_with(
            3,
            0,
            move || {
                let previous_calls = calls_in_factory.fetch_add(1, Ordering::SeqCst);
                if previous_calls < 2 {
                    futures::stream::iter(vec![Err(TestError("fail".into()))])
                } else {
                    futures::stream::iter(vec![Ok(42)])
                }
            },
            |_d| std::future::ready(()),
        )
        .collect()
        .await;

        let successes: Vec<_> = outcomes.into_iter().filter_map(|r| r.ok()).collect();
        assert_eq!(successes, vec![42]);
    }
}
/// `merge` is a re-export of `futures::stream::select`.
pub use futures::stream::select as merge;

/// `merge_all` is a re-export of `futures::stream::select_all`.
pub use futures::stream::select_all as merge_all;
#[cfg(test)]
mod merge_tests {
    use super::*;

    /// Merging two streams yields every item of both; order is not asserted.
    #[tokio::test]
    async fn test_merge() {
        let odds = futures::stream::iter(vec![1, 3, 5]);
        let evens = futures::stream::iter(vec![2, 4, 6]);

        let merged: Vec<_> = merge(odds, evens).collect().await;

        assert_eq!(merged.len(), 6);
        assert!(merged.contains(&1));
        assert!(merged.contains(&6));
    }

    /// Merging a collection of streams yields the items of all of them.
    #[tokio::test]
    async fn test_merge_all() {
        let inputs = vec![
            Box::pin(futures::stream::iter(vec![1, 2])),
            Box::pin(futures::stream::iter(vec![3, 4])),
        ];

        let merged: Vec<_> = merge_all(inputs).collect().await;

        assert_eq!(merged.len(), 4);
    }
}
2028pub fn flat_map<T, U, S, Inner, F>(f: F, s: S) -> impl Stream<Item = U>
2047where
2048 S: Stream<Item = T>,
2049 Inner: Stream<Item = U>,
2050 F: Fn(T) -> Inner,
2051{
2052 s.map(f).flatten()
2053}
2054
/// Alias for [`flat_map`]: maps each item of `s` to an inner stream with `f`
/// and flattens the results.
pub fn chain<T, U, S, Inner, F>(f: F, s: S) -> impl Stream<Item = U>
where
    S: Stream<Item = T>,
    Inner: Stream<Item = U>,
    F: Fn(T) -> Inner,
{
    flat_map(f, s)
}
#[cfg(test)]
mod chain_tests {
    use super::*;

    /// Each source item expands to two items, emitted in source order.
    #[tokio::test]
    async fn test_chain_flattens() {
        let input = futures::stream::iter(vec![1, 2]);
        let expand = |x: i32| futures::stream::iter(vec![x * 10, x * 10 + 1]);

        let collected: Vec<_> = chain(expand, input).collect().await;

        assert_eq!(collected, vec![10, 11, 20, 21]);
    }
}
2084pub fn switch_map<T, U, S, Inner, F>(f: F, s: S) -> impl Stream<Item = U>
2090where
2091 S: Stream<Item = T> + Unpin,
2092 Inner: Stream<Item = U> + Unpin,
2093 F: Fn(T) -> Inner,
2094{
2095 stream! {
2096 futures::pin_mut!(s);
2097 let mut current_inner: Option<std::pin::Pin<Box<dyn Stream<Item = U> + Unpin>>> = None;
2098 loop {
2099 futures::select! {
2100 outer_item = s.next().fuse() => {
2102 match outer_item {
2103 Some(item) => {
2104 current_inner = Some(Box::pin(f(item)));
2106 }
2107 None => {
2108 if let Some(ref mut inner) = current_inner { while let Some(v) = inner.next().await { yield v; } }
2110 break;
2111 }
2112 }
2113 }
2114 inner_item = async {
2116 if let Some(ref mut inner) = current_inner { inner.next().await }
2117 else { std::future::pending().await }
2118 }.fuse() => {
2119 match inner_item {
2120 Some(v) => yield v,
2121 None => current_inner = None,
2122 }
2123 }
2124 }
2125 }
2126 }
2127}
#[cfg(test)]
mod switch_map_tests {
    use super::*;

    /// Smoke test: at least one value of an inner stream is emitted.
    #[tokio::test]
    async fn test_switch_map_switches() {
        let input = futures::stream::iter(vec![1, 2]);
        let to_inner = |x: i32| futures::stream::iter(vec![x * 10]);

        let switched = switch_map(to_inner, input);
        let collected: Vec<_> = switched.collect().await;

        assert!(!collected.is_empty());
    }
}
2150pub fn latest2<T: Clone + Send + 'static, U: Clone + Send + 'static>(
2156 s1: impl Stream<Item = T> + Send + 'static,
2157 s2: impl Stream<Item = U> + Send + 'static,
2158) -> impl Stream<Item = (T, U)> {
2159 enum Either<A, B> { Left(A), Right(B) }
2161
2162 let tagged1: Pin<Box<dyn Stream<Item = Either<T, U>> + Send>> =
2164 Box::pin(s1.map(Either::Left));
2165 let tagged2: Pin<Box<dyn Stream<Item = Either<T, U>> + Send>> =
2166 Box::pin(s2.map(Either::Right));
2167
2168 stream! {
2169 let mut latest1: Option<T> = None;
2170 let mut latest2: Option<U> = None;
2171
2172 let mut merged = futures::stream::select(tagged1, tagged2);
2173
2174 while let Some(item) = merged.next().await {
2175 match item {
2176 Either::Left(v) => {
2177 latest1 = Some(v);
2178 if let (Some(ref a), Some(ref b)) = (&latest1, &latest2) {
2179 yield (a.clone(), b.clone());
2180 }
2181 }
2182 Either::Right(v) => {
2183 latest2 = Some(v);
2184 if let (Some(ref a), Some(ref b)) = (&latest1, &latest2) {
2185 yield (a.clone(), b.clone());
2186 }
2187 }
2188 }
2189 }
2190 }
2191}
#[cfg(test)]
mod latest2_tests {
    use super::*;

    /// Smoke test: two non-empty inputs produce at least one combined pair.
    #[tokio::test]
    async fn test_latest2_combines() {
        let numbers = futures::stream::iter(vec![1, 2]);
        let letters = futures::stream::iter(vec!["a", "b"]);

        let pairs: Vec<_> = latest2(numbers, letters).collect().await;

        assert!(!pairs.is_empty());
    }
}
2209pub fn apply_latest<T, U, F, S1, S2>(fn_stream: S1, value_stream: S2) -> impl Stream<Item = U>
2214where
2215 S1: Stream<Item = F> + Send + 'static,
2216 S2: Stream<Item = T> + Send + 'static,
2217 F: Fn(T) -> U + Clone + Send + 'static,
2218 T: Clone + Send + 'static,
2219 U: Send + 'static,
2220{
2221 latest2(fn_stream, value_stream).map(|(f, v)| f(v))
2222}
#[cfg(test)]
mod apply_latest_tests {
    use super::*;

    /// Smoke test: combining functions with values produces some output.
    #[tokio::test]
    async fn test_apply_latest() {
        let functions = futures::stream::iter(vec![|x: i32| x * 2, |x| x + 10]);
        let inputs = futures::stream::iter(vec![1, 2, 3]);

        let applied: Vec<_> = apply_latest(functions, inputs).collect().await;

        assert!(!applied.is_empty());
    }
}
2240pub fn until_stream<T, U, S: Stream<Item = T> + Unpin, Stop: Stream<Item = U> + Unpin>(
2246 mut stop: Stop,
2247 mut source: S,
2248) -> impl Stream<Item = T> {
2249 stream! {
2250 loop {
2251 futures::select! {
2252 _ = stop.next().fuse() => break,
2253 item = source.next().fuse() => {
2254 match item {
2255 Some(v) => yield v,
2256 None => break,
2257 }
2258 }
2259 }
2260 }
2261 }
2262}
2263
#[cfg(test)]
mod until_stream_tests {
    use super::*;

    /// The output never contains more items than the source provided.
    #[tokio::test]
    async fn test_until_stream_stops() {
        let input = futures::stream::iter(vec![1, 2, 3, 4, 5]);
        let stop_signal = futures::stream::iter(vec![()]);

        let collected: Vec<_> = until_stream(stop_signal, input).collect().await;

        assert!(collected.len() <= 5);
    }
}
2283pub fn since_stream<T, U, S: Stream<Item = T> + Unpin, Start: Stream<Item = U> + Unpin>(
2289 mut start: Start,
2290 mut source: S,
2291) -> impl Stream<Item = T> {
2292 stream! {
2293 let mut started = false;
2294 loop {
2295 futures::select! {
2296 _ = async {
2297 if !started { start.next().await }
2298 else { std::future::pending().await }
2299 }.fuse() => {
2300 started = true;
2301 }
2302 item = source.next().fuse() => {
2303 match item {
2304 Some(v) if started => yield v,
2305 Some(_) => {} None => break,
2307 }
2308 }
2309 }
2310 }
2311 }
2312}
#[cfg(test)]
mod since_stream_tests {
    use super::*;

    /// Smoke test: once the start signal has fired, source items get through.
    ///
    /// NOTE(review): both streams are ready immediately and `since_stream`
    /// races them with `futures::select!`, so the source may be drained before
    /// the start signal is observed — this assertion looks timing-dependent;
    /// confirm.
    #[tokio::test]
    async fn test_since_stream_waits() {
        let input = futures::stream::iter(vec![1, 2, 3, 4]);
        let start_signal = futures::stream::iter(vec![()]);

        let collected: Vec<_> = since_stream(start_signal, input).collect().await;

        assert!(!collected.is_empty());
    }
}
2330pub fn buffer<T, S: Stream<Item = T>>(size: usize, s: S) -> impl Stream<Item = Vec<T>> {
2336 stream! {
2337 futures::pin_mut!(s);
2338 let mut buf: Vec<T> = Vec::with_capacity(size);
2339 while let Some(item) = s.next().await {
2340 buf.push(item);
2341 if buf.len() >= size { yield std::mem::replace(&mut buf, Vec::with_capacity(size)); }
2342 }
2343 if !buf.is_empty() { yield buf; }
2344 }
2345}
#[cfg(test)]
mod buffer_tests {
    use super::*;

    /// A trailing partial chunk is emitted when the source ends.
    #[tokio::test]
    async fn test_buffer() {
        let input = futures::stream::iter(vec![1, 2, 3, 4, 5]);
        let chunks: Vec<_> = buffer(2, input).collect().await;
        assert_eq!(chunks, vec![vec![1, 2], vec![3, 4], vec![5]]);
    }

    /// No empty trailing chunk when the length is a multiple of the size.
    #[tokio::test]
    async fn test_buffer_exact_multiple() {
        let input = futures::stream::iter(vec![1, 2, 3, 4]);
        let chunks: Vec<_> = buffer(2, input).collect().await;
        assert_eq!(chunks, vec![vec![1, 2], vec![3, 4]]);
    }

    /// An empty source yields no chunks.
    #[tokio::test]
    async fn test_buffer_empty() {
        let input = futures::stream::iter(Vec::<i32>::new());
        let chunks: Vec<_> = buffer(3, input).collect().await;
        assert_eq!(chunks, Vec::<Vec<i32>>::new());
    }
}
2373pub fn buffer_time<R, T, S>(ms: u64, mut s: S) -> impl Stream<Item = Vec<T>>
2379where
2380 R: Runtime,
2381 T: Clone,
2382 S: Stream<Item = T> + Unpin,
2383{
2384 stream! {
2385 let duration = Duration::from_millis(ms);
2386 let mut buf: Vec<T> = Vec::new();
2387 let mut timer = R::sleep(duration);
2388 loop {
2389 futures::select! {
2390 _ = (&mut timer).fuse() => {
2391 if !buf.is_empty() { yield std::mem::take(&mut buf); }
2392 timer = R::sleep(duration);
2393 }
2394 item = s.next().fuse() => {
2395 match item {
2396 Some(v) => buf.push(v),
2397 None => {
2398 if !buf.is_empty() { yield buf; }
2399 break;
2400 }
2401 }
2402 }
2403 }
2404 }
2405 }
2406}
2407
2408pub fn buffer_time_with<T, S, SF, SFut>(
2410 ms: u64,
2411 mut s: S,
2412 sleep_fn: SF,
2413) -> impl Stream<Item = Vec<T>>
2414where
2415 S: Stream<Item = T> + Unpin,
2416 SF: Fn(Duration) -> SFut,
2417 SFut: std::future::Future<Output = ()> + Unpin,
2418{
2419 stream! {
2420 let duration = Duration::from_millis(ms);
2421 let mut buf: Vec<T> = Vec::new();
2422 let mut timer = sleep_fn(duration);
2423 loop {
2424 futures::select! {
2425 _ = (&mut timer).fuse() => {
2426 if !buf.is_empty() { yield std::mem::take(&mut buf); }
2427 timer = sleep_fn(duration);
2428 }
2429 item = s.next().fuse() => {
2430 match item {
2431 Some(v) => buf.push(v),
2432 None => {
2433 if !buf.is_empty() { yield buf; }
2434 break;
2435 }
2436 }
2437 }
2438 }
2439 }
2440 }
2441}
#[cfg(test)]
mod buffer_time_tests {
    // NOTE(review): no test cases are present in this module in the visible source.
}
2451pub fn window<T: Clone + Send + 'static>(
2457 size: usize,
2458 s: impl Stream<Item = T> + Send + 'static,
2459) -> impl Stream<Item = Vec<T>> {
2460 stream! {
2461 futures::pin_mut!(s);
2462 loop {
2463 let mut window = Vec::with_capacity(size);
2464 while window.len() < size {
2465 match s.next().await {
2466 Some(item) => window.push(item),
2467 None => {
2468 if !window.is_empty() {
2469 yield window;
2470 }
2471 return;
2472 }
2473 }
2474 }
2475 yield window;
2476 }
2477 }
2478}
2479pub fn eager<R, T, S>(buffer_size: usize, s: S) -> impl Stream<Item = T>
2485where
2486 R: Runtime,
2487 T: Send + 'static,
2488 S: Stream<Item = T> + Send + Unpin + 'static,
2489{
2490 let (mut tx, mut rx) = mpsc::channel::<T>(buffer_size.max(1));
2492
2493 let mut spawned = false;
2495 let mut s = Some(s);
2496
2497 stream! {
2498 if !spawned {
2499 spawned = true;
2500 let mut source = s.take().unwrap();
2501 R::spawn(async move {
2502 use futures::StreamExt;
2503 use futures::SinkExt;
2504 while let Some(item) = source.next().await { if tx.send(item).await.is_err() { break; } } });
2506 }
2507
2508 while let Some(item) = rx.next().await { yield item; }
2509 }
2510}
2511
2512pub fn eager_now<R, T, S>(buffer_size: usize, s: S) -> impl Stream<Item = T>
2514where
2515 R: Runtime,
2516 T: Send + 'static,
2517 S: Stream<Item = T> + Send + Unpin + 'static,
2518{
2519 let (mut tx, mut rx) = mpsc::channel::<T>(buffer_size.max(1));
2520
2521 let mut source = s;
2523 R::spawn(async move {
2524 use futures::StreamExt;
2525 use futures::SinkExt;
2526 while let Some(item) = source.next().await { if tx.send(item).await.is_err() { break; } }
2527 });
2528
2529 stream! { while let Some(item) = rx.next().await { yield item; } }
2530}
#[cfg(test)]
mod eager_now_tests {
    // NOTE(review): no test cases are present in this module in the visible source.
}
/// A subject that keeps a bounded buffer of recent values and replays it to
/// every new subscriber.
///
/// Values are pushed with `next`, the subject is closed with `complete`, and
/// `subscribe` returns a stream that first yields the buffered values.
pub struct ReplaySubject<T: Clone + Send + 'static> {
    /// Shared state behind an async mutex; `subscribe` clones this handle.
    inner: Arc<Mutex<ReplaySubjectInner<T>>>,
}
2550
/// State of a `ReplaySubject`.
struct ReplaySubjectInner<T> {
    /// Most recent values, oldest first.
    buffer: Vec<T>,
    /// Maximum number of values kept in `buffer`.
    buffer_size: usize,
    /// Set once `complete` has been called.
    completed: bool,
    /// NOTE(review): initialised to `None` and not written by any visible
    /// method of `ReplaySubject` — confirm whether it is still needed.
    error: Option<Arc<dyn std::error::Error + Send + Sync>>,
    /// One sender per live subscriber.
    subscribers: Vec<mpsc::UnboundedSender<T>>,
}
2558
2559impl<T: Clone + Send + 'static> ReplaySubject<T> {
2560 pub fn new(buffer_size: usize) -> Self {
2561 Self {
2562 inner: Arc::new(Mutex::new(ReplaySubjectInner {
2563 buffer: Vec::new(),
2564 buffer_size,
2565 completed: false,
2566 error: None,
2567 subscribers: Vec::new(),
2568 })),
2569 }
2570 }
2571 pub async fn next(&self, value: T) {
2572 let mut inner = self.inner.lock().await;
2573 inner.buffer.push(value.clone());
2574 if inner.buffer.len() > inner.buffer_size { inner.buffer.remove(0); }
2575 inner.subscribers.retain(|tx| tx.unbounded_send(value.clone()).is_ok());
2577 }
2578 pub async fn complete(&self) {
2579 let mut inner = self.inner.lock().await;
2580 inner.completed = true;
2581 inner.subscribers.clear();
2582 }
2583 pub fn subscribe(&self) -> impl Stream<Item = T> {
2584 let inner = self.inner.clone();
2585
2586 stream! {
2587 let (tx, mut rx) = mpsc::unbounded();
2588 let buffered: Vec<T>;
2589 let was_completed: bool;
2590
2591 {
2592 let mut guard = inner.lock().await;
2593 buffered = guard.buffer.clone();
2594 was_completed = guard.completed;
2595 if !guard.completed { guard.subscribers.push(tx); }
2596 }
2597
2598 for item in buffered { yield item; }
2600
2601 if was_completed { return; }
2603
2604 while let Some(item) = rx.next().await { yield item; }
2606 }
2607 }
2608}
#[cfg(test)]
mod replay_subject_tests {
    use super::*;

    /// Only the last `buffer_size` values are replayed to a late subscriber.
    #[tokio::test]
    async fn test_replay_subject_buffer() {
        let subject = ReplaySubject::new(2);

        for value in [1, 2, 3] {
            subject.next(value).await;
        }
        subject.complete().await;

        let replayed: Vec<_> = subject.subscribe().collect().await;
        assert_eq!(replayed, vec![2, 3]);
    }

    /// A completed subject without values replays nothing.
    #[tokio::test]
    async fn test_replay_subject_empty() {
        let subject: ReplaySubject<i32> = ReplaySubject::new(5);
        subject.complete().await;

        let replayed: Vec<_> = subject.subscribe().collect().await;
        assert_eq!(replayed, Vec::<i32>::new());
    }

    /// With `usize::MAX` as the capacity every value is kept.
    #[tokio::test]
    async fn test_replay_subject_unlimited() {
        let subject = ReplaySubject::new(usize::MAX);

        for value in [1, 2, 3] {
            subject.next(value).await;
        }
        subject.complete().await;

        let replayed: Vec<_> = subject.subscribe().collect().await;
        assert_eq!(replayed, vec![1, 2, 3]);
    }
}
/// Handle to shared replay state: a bounded buffer of values plus the list of
/// subscribers they are broadcast to.
struct Replay<T> {
    /// Shared state behind an async mutex; `subscribe` clones this handle.
    inner: Arc<Mutex<ReplayInner<T>>>,
}
2660
/// State of a `Replay`.
struct ReplayInner<T> {
    /// Most recent values, oldest first.
    buffer: Vec<T>,
    /// Maximum number of values kept in `buffer`.
    buffer_size: usize,
    /// Set by `start_source` once the source stream has ended.
    completed: bool,
    /// Checked by `subscribe`. NOTE(review): not written by any visible method.
    error: Option<Arc<dyn std::error::Error + Send + Sync>>,
    /// NOTE(review): initialised to `false` and not read or written by any
    /// visible method — confirm whether it is still needed.
    source_started: bool,
    /// One sender per live subscriber. Items are `Result`s so that an error
    /// can be delivered; `subscribe` stops at the first `Err`.
    subscribers: Vec<futures::channel::mpsc::UnboundedSender<Result<T, Arc<dyn std::error::Error + Send + Sync>>>>,
}
2669
2670impl<T: Clone + Send + 'static> Replay<T> {
2671 fn new<S>(buffer_size: usize, source: S) -> Self
2672 where
2673 S: futures::Stream<Item = T> + Send + Unpin + 'static,
2674 {
2675 let inner = Arc::new(Mutex::new(ReplayInner {
2676 buffer: Vec::new(),
2677 buffer_size,
2678 completed: false,
2679 error: None,
2680 source_started: false,
2681 subscribers: Vec::new(),
2682 }));
2683
2684 Replay { inner }
2685 }
2686
2687 fn subscribe(&self) -> impl futures::Stream<Item = T> {
2688 let inner = self.inner.clone();
2689
2690 async_stream::stream! {
2691 let (tx, mut rx) = futures::channel::mpsc::unbounded();
2692
2693 let buffered: Vec<T>;
2695 {
2696 let mut guard = inner.lock().await;
2697 buffered = guard.buffer.clone();
2698
2699 if !guard.completed && guard.error.is_none() { guard.subscribers.push(tx); }
2700 }
2701
2702 for value in buffered { yield value; }
2704
2705 while let Some(result) = rx.next().await {
2707 match result {
2708 Ok(value) => yield value,
2709 Err(_) => break,
2710 }
2711 }
2712 }
2713 }
2714
2715 async fn start_source<S>(&self, mut source: S)
2716 where
2717 S: futures::Stream<Item = T> + Send + Unpin + 'static,
2718 {
2719 while let Some(value) = source.next().await {
2720 let mut guard = self.inner.lock().await;
2721
2722 guard.buffer.push(value.clone());
2724 if guard.buffer.len() > guard.buffer_size {
2725 guard.buffer.remove(0);
2726 }
2727
2728 guard.subscribers.retain(|tx| tx.unbounded_send(Ok(value.clone())).is_ok());
2730 }
2731
2732 let mut guard = self.inner.lock().await;
2734 guard.completed = true;
2735 guard.subscribers.clear();
2736 }
2737}
2738
/// Currently a pass-through: returns `source` unchanged.
///
/// NOTE(review): `buffer_size` is ignored and no replay buffering happens
/// here — this looks like a placeholder; confirm.
fn replay<T, S>(buffer_size: usize, source: S) -> impl futures::Stream<Item = T>
where
    T: Clone + Send + 'static,
    S: futures::Stream<Item = T> + Send + 'static,
{
    // Explicitly discard the argument.
    let _ = buffer_size; source
}
#[cfg(test)]
mod replay_tests {
    use super::*;

    // NOTE(review): this test only constructs a `Replay` and makes no
    // assertions; it looks unfinished — confirm.
    #[tokio::test]
    async fn test_replay_buffered() {
        let source = futures::stream::iter(vec![1, 2, 3, 4, 5]);
        let replay = Replay::new(2, source);

    }
}
2771fn share<T, S>(source: S) -> impl futures::Stream<Item = T>
2777where
2778 T: Clone + Send + 'static,
2779 S: futures::Stream<Item = T> + Send + Unpin + 'static,
2780{
2781 replay(0, source)
2782}
#[cfg(test)]
mod share_tests {
    use super::*;

    /// A single consumer of a shared stream sees every item, then the end.
    #[tokio::test]
    async fn test_share_basic() {
        let input = futures::stream::iter(vec![1, 2, 3]);
        let shared = share(input);
        futures::pin_mut!(shared);

        assert_eq!(shared.next().await, Some(1));
        assert_eq!(shared.next().await, Some(2));
        assert_eq!(shared.next().await, Some(3));
        assert_eq!(shared.next().await, None);
    }
}
2811fn replay_factory<T, S>(
2817 buffer_size: usize,
2818 source: S,
2819) -> impl Fn() -> BoxedStream<T>
2820where
2821 T: Clone + Send + Sync + 'static,
2822 S: futures::Stream<Item = T> + Send + Unpin + 'static,
2823{
2824 struct SharedState<T> {
2825 buffer: Vec<T>,
2826 buffer_size: usize,
2827 completed: bool,
2828 subscribers: Vec<futures::channel::mpsc::UnboundedSender<T>>,
2829 }
2830
2831 let state = Arc::new(Mutex::new(SharedState {
2832 buffer: Vec::new(),
2833 buffer_size,
2834 completed: false,
2835 subscribers: Vec::new(),
2836 }));
2837 let started = Arc::new(AtomicBool::new(false));
2838 let source = Arc::new(Mutex::new(Some(source)));
2839
2840 move || {
2841 let state = state.clone();
2842 let started = started.clone();
2843 let source = source.clone();
2844
2845 Box::pin(async_stream::stream! {
2846 if !started.swap(true, Ordering::SeqCst) {
2848 let state_clone = state.clone();
2849 if let Some(mut src) = source.lock().await.take() {
2850 while let Some(value) = src.next().await {
2855 let mut guard = state_clone.lock().await;
2856 guard.buffer.push(value.clone());
2857 if guard.buffer.len() > guard.buffer_size { guard.buffer.remove(0); }
2858 guard.subscribers.retain(|tx| tx.unbounded_send(value.clone()).is_ok());
2859 }
2860 state_clone.lock().await.completed = true;
2861 }
2862 }
2863
2864 let (tx, mut rx) = futures::channel::mpsc::unbounded();
2865 let buffered: Vec<T>;
2866 {
2867 let mut guard = state.lock().await;
2868 buffered = guard.buffer.clone();
2869 if !guard.completed { guard.subscribers.push(tx); }
2870 }
2871
2872 for value in buffered { yield value; }
2873
2874 while let Some(value) = rx.next().await { yield value; }
2875 })
2876 }
2877}
2878
2879pub fn replay_factory_spawned<R, T, S>(
2881 buffer_size: usize,
2882 source: S,
2883) -> impl Fn() -> BoxedStream<T>
2884where
2885 R: Runtime,
2886 T: Clone + Send + Sync + 'static,
2887 S: futures::Stream<Item = T> + Send + Unpin + 'static,
2888{
2889 struct SharedState<T> {
2890 buffer: Vec<T>,
2891 buffer_size: usize,
2892 completed: bool,
2893 subscribers: Vec<futures::channel::mpsc::UnboundedSender<T>>,
2894 }
2895
2896 let state = Arc::new(Mutex::new(SharedState {
2897 buffer: Vec::new(),
2898 buffer_size,
2899 completed: false,
2900 subscribers: Vec::new(),
2901 }));
2902 let started = Arc::new(AtomicBool::new(false));
2903 let source = Arc::new(Mutex::new(Some(source)));
2904
2905 move || {
2906 let state = state.clone();
2907 let started = started.clone();
2908 let source = source.clone();
2909
2910 Box::pin(async_stream::stream! {
2911 if !started.swap(true, Ordering::SeqCst) {
2912 let state_clone = state.clone();
2913 if let Some(src) = source.lock().await.take() {
2914 R::spawn(async move {
2915 futures::pin_mut!(src);
2916 while let Some(value) = src.next().await {
2917 let mut guard = state_clone.lock().await;
2918 guard.buffer.push(value.clone());
2919 if guard.buffer.len() > guard.buffer_size { guard.buffer.remove(0); }
2920 guard.subscribers.retain(|tx| tx.unbounded_send(value.clone()).is_ok());
2921 }
2922 state_clone.lock().await.completed = true;
2923 });
2924 }
2925 }
2926
2927 let (tx, mut rx) = futures::channel::mpsc::unbounded();
2928 let buffered: Vec<T>;
2929 {
2930 let mut guard = state.lock().await;
2931 buffered = guard.buffer.clone();
2932 if !guard.completed { guard.subscribers.push(tx); }
2933 }
2934
2935 for value in buffered { yield value; }
2936
2937 while let Some(value) = rx.next().await { yield value; }
2938 })
2939 }
2940}
2941fn replay_stream<T, S>(
2947 buffer_size: usize,
2948 source: S,
2949) -> impl futures::Stream<Item = impl futures::Stream<Item = T>>
2950where
2951 T: Clone + Send + Sync + 'static,
2952 S: futures::Stream<Item = T> + Send + Unpin + 'static,
2953{
2954 let factory = replay_factory(buffer_size, source);
2955
2956 async_stream::stream! {
2957 loop { yield factory(); }
2959 }
2960}
2961
2962async fn replay_stream_example() {
2964 let source = futures::stream::iter(vec![1, 2, 3]);
2965 let copies = replay_stream(usize::MAX, source);
2966 futures::pin_mut!(copies);
2967
2968 if let Some(copy) = copies.next().await {
2970 futures::pin_mut!(copy);
2971 let values: Vec<_> = copy.collect().await;
2972 println!("Copy values: {:?}", values);
2973 }
2974}
#[cfg(test)]
mod replay_stream_tests {
    // NOTE(review): no test cases are present in this module in the visible source.
}
2984use std::sync::atomic::AtomicU64;
2988use std::task::Waker;
2989
/// A runtime with a manually driven virtual clock.
///
/// Time only moves when `advance_by` or `advance_to` is called. Clones share
/// the same clock and timer list through the inner `Arc`.
#[derive(Clone)]
pub struct TestRuntime {
    /// Shared clock and timer registry.
    inner: Arc<TestRuntimeInner>,
}
3018
/// State shared by all clones of a `TestRuntime`.
struct TestRuntimeInner {
    /// Current virtual time in nanoseconds; starts at zero.
    current_time_ns: AtomicU64,
    /// Registered timers; fired entries are pruned at the end of `advance_to`.
    timers: std::sync::Mutex<Vec<PendingTimer>>,
}
3025
/// A timer registered with a `TestRuntime`.
struct PendingTimer {
    /// Virtual time, in nanoseconds, at which the timer is due.
    fire_at_ns: u64,
    /// Waker to invoke when the timer fires; `None` until `set_timer_waker`
    /// has stored one.
    waker: Option<Waker>,
    /// Set to `true` by `advance_to` when the timer fires. The same flag is
    /// handed back to the caller of `register_timer`.
    fired: Arc<std::sync::atomic::AtomicBool>,
}
3034
3035impl TestRuntime {
3036 pub fn new() -> Self {
3038 Self {
3039 inner: Arc::new(TestRuntimeInner {
3040 current_time_ns: AtomicU64::new(0),
3041 timers: std::sync::Mutex::new(Vec::new()),
3042 }),
3043 }
3044 }
3045
3046 pub fn now(&self) -> Duration {
3048 Duration::from_nanos(self.inner.current_time_ns.load(Ordering::SeqCst))
3049 }
3050
3051 pub async fn advance_by(&self, duration: Duration) {
3055 let target = self.now() + duration;
3056 self.advance_to(target).await;
3057 }
3058
3059 pub async fn advance_to(&self, target: Duration) {
3063 let target_ns = target.as_nanos() as u64;
3064
3065 loop {
3066 let wakers_to_wake: Vec<Waker> = {
3068 let mut timers = self.inner.timers.lock().unwrap();
3069 let current = self.inner.current_time_ns.load(Ordering::SeqCst);
3070
3071 let mut earliest: Option<u64> = None;
3073 for timer in timers.iter() {
3074 if !timer.fired.load(Ordering::SeqCst) && timer.fire_at_ns <= target_ns {
3075 earliest = Some(match earliest {
3076 Some(e) => e.min(timer.fire_at_ns),
3077 None => timer.fire_at_ns,
3078 });
3079 }
3080 }
3081
3082 match earliest {
3083 Some(fire_time) if fire_time > current => {
3084 self.inner.current_time_ns.store(fire_time, Ordering::SeqCst);
3086
3087 timers.iter_mut()
3089 .filter(|t| t.fire_at_ns == fire_time && !t.fired.load(Ordering::SeqCst))
3090 .filter_map(|t| {
3091 t.fired.store(true, Ordering::SeqCst);
3092 t.waker.take()
3093 })
3094 .collect()
3095 }
3096 _ => {
3097 self.inner.current_time_ns.store(target_ns, Ordering::SeqCst);
3099 break;
3100 }
3101 }
3102 };
3103
3104 for waker in wakers_to_wake {
3106 waker.wake();
3107 }
3108
3109 futures::future::poll_fn(|_| std::task::Poll::Ready(())).await;
3111 }
3112
3113 {
3115 let mut timers = self.inner.timers.lock().unwrap();
3116 timers.retain(|t| !t.fired.load(Ordering::SeqCst));
3117 }
3118 }
3119
3120 fn register_timer(&self, fire_at: Duration) -> Arc<std::sync::atomic::AtomicBool> {
3122 let fired = Arc::new(std::sync::atomic::AtomicBool::new(false));
3123 let timer = PendingTimer {
3124 fire_at_ns: fire_at.as_nanos() as u64,
3125 waker: None,
3126 fired: fired.clone(),
3127 };
3128 self.inner.timers.lock().unwrap().push(timer);
3129 fired
3130 }
3131
3132 fn set_timer_waker(&self, fire_at_ns: u64, waker: Waker) {
3134 let mut timers = self.inner.timers.lock().unwrap();
3135 for timer in timers.iter_mut() {
3136 if timer.fire_at_ns == fire_at_ns && !timer.fired.load(Ordering::SeqCst) {
3137 timer.waker = Some(waker);
3138 break;
3139 }
3140 }
3141 }
3142}
3143
3144impl Default for TestRuntime {
3145 fn default() -> Self {
3146 Self::new()
3147 }
3148}
3149pub struct TestSleep {
3154 runtime: TestRuntime,
3155 target_ns: u64,
3156 fired: Arc<std::sync::atomic::AtomicBool>,
3157 registered: bool,
3158}
3159
3160impl TestSleep {
3161 fn new(runtime: TestRuntime, duration: Duration) -> Self {
3162 let current = runtime.now();
3163 let target = current + duration;
3164 let target_ns = target.as_nanos() as u64;
3165 Self {
3166 runtime,
3167 target_ns,
3168 fired: Arc::new(std::sync::atomic::AtomicBool::new(false)),
3169 registered: false,
3170 }
3171 }
3172}
3173
3174impl Future for TestSleep {
3175 type Output = ();
3176
3177 fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<()> {
3178 if self.fired.load(Ordering::SeqCst) {
3180 return std::task::Poll::Ready(());
3181 }
3182
3183 let current_ns = self.runtime.inner.current_time_ns.load(Ordering::SeqCst);
3185 if current_ns >= self.target_ns {
3186 self.fired.store(true, Ordering::SeqCst);
3187 return std::task::Poll::Ready(());
3188 }
3189
3190 if !self.registered {
3192 self.fired = self.runtime.register_timer(Duration::from_nanos(self.target_ns));
3193 self.registered = true;
3194 }
3195
3196 self.runtime.set_timer_waker(self.target_ns, cx.waker().clone());
3198
3199 std::task::Poll::Pending
3200 }
3201}
3202pub struct TestInterval {
3207 runtime: TestRuntime,
3208 period_ns: u64,
3209 next_fire_ns: u64,
3210 current_timer: Option<Arc<std::sync::atomic::AtomicBool>>,
3211}
3212
3213impl TestInterval {
3214 fn new(runtime: TestRuntime, period: Duration) -> Self {
3215 let period_ns = period.as_nanos() as u64;
3216 let start = runtime.inner.current_time_ns.load(Ordering::SeqCst);
3217 Self {
3218 runtime,
3219 period_ns,
3220 next_fire_ns: start + period_ns,
3221 current_timer: None,
3222 }
3223 }
3224}
3225
3226impl futures::Stream for TestInterval {
3227 type Item = ();
3228
3229 fn poll_next(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<()>> {
3230 let current_ns = self.runtime.inner.current_time_ns.load(Ordering::SeqCst);
3231
3232 if current_ns >= self.next_fire_ns {
3234 self.next_fire_ns += self.period_ns;
3236 self.current_timer = None;
3237 return std::task::Poll::Ready(Some(()));
3238 }
3239
3240 if self.current_timer.is_none() {
3242 self.current_timer = Some(self.runtime.register_timer(Duration::from_nanos(self.next_fire_ns)));
3243 }
3244
3245 self.runtime.set_timer_waker(self.next_fire_ns, cx.waker().clone());
3247
3248 std::task::Poll::Pending
3249 }
3250}
3251impl Runtime for TestRuntime {
3255 fn sleep(duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send>> {
3256 panic!("TestRuntime::sleep() cannot be called statically. Use runtime.test_sleep(duration) instead.")
3260 }
3261
3262 fn interval(period: Duration) -> Pin<Box<dyn futures::Stream<Item = ()> + Send>> {
3263 panic!("TestRuntime::interval() cannot be called statically. Use runtime.test_interval(period) instead.")
3264 }
3265
3266 fn spawn<F>(_future: F)
3267 where
3268 F: Future<Output = ()> + Send + 'static,
3269 {
3270 panic!("TestRuntime::spawn() is not supported. Use advance_by() to drive futures.")
3273 }
3274}
3275
3276impl TestRuntime {
3277 pub fn test_sleep(&self, duration: Duration) -> TestSleep {
3279 TestSleep::new(self.clone(), duration)
3280 }
3281
3282 pub fn test_interval(&self, period: Duration) -> TestInterval {
3284 TestInterval::new(self.clone(), period)
3285 }
3286}
3287impl TestRuntime {
3291 pub async fn run_timed_test<T, F, Fut>(&self, steps: Vec<Duration>, mut f: F) -> T
3296 where
3297 F: FnMut() -> Fut,
3298 Fut: Future<Output = T>,
3299 {
3300 for step in steps {
3301 self.advance_by(step).await;
3302 }
3303 f().await
3304 }
3305
3306 pub async fn assert_completes_within<T, Fut>(&self, timeout: Duration, fut: Fut) -> T
3308 where
3309 Fut: Future<Output = T>,
3310 {
3311 use futures::future::{select, Either};
3312
3313 let timeout_fut = self.test_sleep(timeout);
3314 futures::pin_mut!(fut);
3315 futures::pin_mut!(timeout_fut);
3316
3317 match futures::future::select(fut, timeout_fut).await {
3319 Either::Left((result, _)) => result,
3320 Either::Right(_) => panic!("Future did not complete within {:?}", timeout),
3321 }
3322 }
3323}
#[cfg(test)]
mod test_runtime_tests {
    use super::*;

    /// A pending sleep completes once virtual time has moved past its deadline.
    #[tokio::test]
    async fn test_virtual_sleep() {
        let runtime = TestRuntime::new();

        // The virtual clock starts at zero.
        assert_eq!(runtime.now(), Duration::ZERO);

        let sleep = runtime.test_sleep(Duration::from_millis(100));
        futures::pin_mut!(sleep);

        // The first poll registers the timer; the deadline has not been reached yet.
        let waker = futures::task::noop_waker();
        let mut cx = std::task::Context::from_waker(&waker);
        assert!(Pin::new(&mut sleep).poll(&mut cx).is_pending());

        runtime.advance_by(Duration::from_millis(150)).await;

        assert_eq!(runtime.now(), Duration::from_millis(150));
        // The deadline (100ms) is now in the past, so the sleep must have completed.
        assert!(Pin::new(&mut sleep).poll(&mut cx).is_ready());
    }

    /// An interval ticks once per period of virtual time.
    #[tokio::test]
    async fn test_virtual_interval() {
        let runtime = TestRuntime::new();
        let mut interval = runtime.test_interval(Duration::from_millis(100));

        runtime.advance_by(Duration::from_millis(100)).await;
        assert_eq!(interval.next().await, Some(()));

        runtime.advance_by(Duration::from_millis(100)).await;
        assert_eq!(interval.next().await, Some(()));

        assert_eq!(runtime.now(), Duration::from_millis(200));
    }

    /// Timers with different deadlines complete in deadline order as time advances.
    #[tokio::test]
    async fn test_multiple_timers() {
        let runtime = TestRuntime::new();

        let sleep1 = runtime.test_sleep(Duration::from_millis(50));
        let sleep2 = runtime.test_sleep(Duration::from_millis(100));
        let sleep3 = runtime.test_sleep(Duration::from_millis(150));

        futures::pin_mut!(sleep1);
        futures::pin_mut!(sleep2);
        futures::pin_mut!(sleep3);

        let waker = futures::task::noop_waker();
        let mut cx = std::task::Context::from_waker(&waker);

        // Register all three timers.
        assert!(Pin::new(&mut sleep1).poll(&mut cx).is_pending());
        assert!(Pin::new(&mut sleep2).poll(&mut cx).is_pending());
        assert!(Pin::new(&mut sleep3).poll(&mut cx).is_pending());

        // 75ms: only the 50ms sleep is due.
        runtime.advance_to(Duration::from_millis(75)).await;
        assert!(Pin::new(&mut sleep1).poll(&mut cx).is_ready());
        assert!(Pin::new(&mut sleep2).poll(&mut cx).is_pending());
        assert!(Pin::new(&mut sleep3).poll(&mut cx).is_pending());

        // 125ms: the 100ms sleep is due as well.
        runtime.advance_to(Duration::from_millis(125)).await;
        assert!(Pin::new(&mut sleep2).poll(&mut cx).is_ready());
        assert!(Pin::new(&mut sleep3).poll(&mut cx).is_pending());

        // 200ms: the last sleep is due.
        runtime.advance_to(Duration::from_millis(200)).await;
        assert!(Pin::new(&mut sleep3).poll(&mut cx).is_ready());
    }
}
3408pub fn delay_test<T, S>(
3416 runtime: TestRuntime,
3417 duration: Duration,
3418 source: S,
3419) -> impl futures::Stream<Item = T>
3420where
3421 T: Send + 'static,
3422 S: futures::Stream<Item = T> + Send + 'static,
3423{
3424 stream! {
3425 futures::pin_mut!(source);
3426 while let Some(value) = source.next().await {
3427 runtime.test_sleep(duration).await;
3428 yield value;
3429 }
3430 }
3431}
3432
3433pub fn periodic_test(runtime: TestRuntime, period: Duration) -> impl futures::Stream<Item = u64> {
3435 stream! {
3436 let mut count = 0u64;
3437 let mut interval = runtime.test_interval(period);
3438 loop {
3439 interval.next().await;
3440 yield count;
3441 count += 1;
3442 }
3443 }
3444}
3445