1use async_stream::stream;
7use futures::channel::mpsc::{channel, Receiver, Sender};
8use futures_core::Stream;
9use futures_util::pin_mut;
10use futures_util::{
11 future,
12 stream::{self, BoxStream, FuturesUnordered, StreamExt},
13 SinkExt,
14};
15use std::future::Future;
16use std::time::Duration;
17use std::sync::Arc;
18use tokio::{spawn, time::sleep};
19use tokio::sync::Mutex;
20
21use crate::error::{StreamError, StreamResult, RetryPolicy};
22use crate::stream_performance_metrics::{HealthThresholds, StreamMetrics};
23
24pub type RS2Stream<O> = BoxStream<'static, O>;
26
27#[derive(Debug, Clone, Copy)]
29pub enum BackpressureStrategy {
30 DropOldest,
32 DropNewest,
34 Block,
36 Error,
38}
39
40#[derive(Debug, Clone)]
42pub struct BackpressureConfig {
43 pub strategy: BackpressureStrategy,
44 pub buffer_size: usize,
45 pub low_watermark: Option<usize>, pub high_watermark: Option<usize>, }
48
49impl Default for BackpressureConfig {
50 fn default() -> Self {
51 Self {
52 strategy: BackpressureStrategy::Block,
53 buffer_size: 100,
54 low_watermark: Some(25),
55 high_watermark: Some(75),
56 }
57 }
58}
59
60#[derive(Debug, Clone)]
62pub enum ExitCase<E> {
63 Completed,
64 Errored(E),
65}
66
67pub fn emit<O>(item: O) -> RS2Stream<O>
73where
74 O: Send + 'static,
75{
76 stream::once(future::ready(item)).boxed()
77}
78
79pub fn empty<O>() -> RS2Stream<O>
81where
82 O: Send + 'static,
83{
84 stream::empty().boxed()
85}
86
87pub fn from_iter<I, O>(iter: I) -> RS2Stream<O>
89where
90 I: IntoIterator<Item = O> + Send + 'static,
91 <I as IntoIterator>::IntoIter: Send,
92 O: Send + 'static,
93{
94 stream::iter(iter).boxed()
95}
96
97pub fn eval<O, F>(fut: F) -> RS2Stream<O>
99where
100 F: Future<Output = O> + Send + 'static,
101 O: Send + 'static,
102{
103 stream::once(fut).boxed()
104}
105
106pub fn repeat<O>(item: O) -> RS2Stream<O>
108where
109 O: Clone + Send + 'static,
110{
111 stream::repeat(item).boxed()
112}
113
114pub fn emit_after<O>(item: O, duration: Duration) -> RS2Stream<O>
116where
117 O: Send + 'static,
118{
119 stream::once(async move {
120 sleep(duration).await;
121 item
122 }).boxed()
123}
124
125pub fn unfold<S, O, F, Fut>(init: S, mut f: F) -> RS2Stream<O>
151where
152 S: Send + 'static,
153 O: Send + 'static,
154 F: FnMut(S) -> Fut + Send + 'static,
155 Fut: Future<Output = Option<(O, S)>> + Send + 'static,
156{
157 stream! {
158 let mut state_opt = Some(init);
159
160 loop {
161 let state = state_opt.take().expect("State should be available");
162 let fut = f(state);
163 match fut.await {
164 Some((item, next_state)) => {
165 yield item;
166 state_opt = Some(next_state);
167 },
168 None => break,
169 }
170 }
171 }
172 .boxed()
173}
174
175pub fn group_adjacent_by<O, K, F>(s: RS2Stream<O>, mut key_fn: F) -> RS2Stream<(K, Vec<O>)>
197where
198 O: Clone + Send + 'static,
199 K: Eq + Clone + Send + 'static,
200 F: FnMut(&O) -> K + Send + 'static,
201{
202 stream! {
203 pin_mut!(s);
204 let mut current_key: Option<K> = None;
205 let mut current_group: Vec<O> = Vec::new();
206
207 while let Some(item) = s.next().await {
208 let key = key_fn(&item);
209
210 match ¤t_key {
211 Some(k) if *k == key => {
212 current_group.push(item);
213 },
214 _ => {
215 if !current_group.is_empty() {
216 yield (current_key.clone().unwrap(), std::mem::take(&mut current_group));
217 }
218 current_key = Some(key);
219 current_group.push(item);
220 }
221 }
222 }
223
224 if !current_group.is_empty() {
225 yield (current_key.clone().unwrap(), std::mem::take(&mut current_group));
226 }
227 }
228 .boxed()
229}
230
231pub fn take<O>(s: RS2Stream<O>, n: usize) -> RS2Stream<O>
233where
234 O: Send + 'static,
235{
236 s.take(n).boxed()
237}
238
239pub fn drop<O>(s: RS2Stream<O>, n: usize) -> RS2Stream<O>
241where
242 O: Send + 'static,
243{
244 s.skip(n).boxed()
245}
246
247pub fn chunk<O>(s: RS2Stream<O>, size: usize) -> RS2Stream<Vec<O>>
249where
250 O: Send + 'static,
251{
252 stream! {
253 let mut buf = Vec::with_capacity(size);
254 pin_mut!(s);
255 while let Some(item) = s.next().await {
256 buf.push(item);
257 if buf.len() == size {
258 yield std::mem::take(&mut buf);
259 }
260 }
261 if !buf.is_empty() {
262 yield std::mem::take(&mut buf);
263 }
264 }
265 .boxed()
266}
267
268pub fn timeout<T>(s: RS2Stream<T>, duration: Duration) -> RS2Stream<StreamResult<T>>
270where
271 T: Send + 'static,
272{
273 stream! {
274 pin_mut!(s);
275 loop {
276 match tokio::time::timeout(duration, s.next()).await {
277 Ok(Some(value)) => yield Ok(value),
278 Ok(None) => break,
279 Err(_) => yield Err(StreamError::Timeout),
280 }
281 }
282 }.boxed()
283}
284
285pub fn scan<T, U, F>(s: RS2Stream<T>, init: U, mut f: F) -> RS2Stream<U>
287where
288 F: FnMut(U, T) -> U + Send + 'static,
289 T: Send + 'static,
290 U: Clone + Send + 'static,
291{
292 stream! {
293 let mut acc = init;
294 pin_mut!(s);
295 while let Some(item) = s.next().await {
296 acc = f(acc.clone(), item);
297 yield acc.clone();
298 }
299 }.boxed()
300}
301
302pub fn fold<T, A, F, Fut>(s: RS2Stream<T>, init: A, mut f: F) -> impl Future<Output = A>
304where
305 F: FnMut(A, T) -> Fut + Send + 'static,
306 Fut: Future<Output = A> + Send + 'static,
307 T: Send + 'static,
308 A: Send + 'static,
309{
310 async move {
311 let mut acc = init;
312 pin_mut!(s);
313 while let Some(item) = s.next().await {
314 acc = f(acc, item).await;
315 }
316 acc
317 }
318}
319
320pub fn reduce<T, F, Fut>(s: RS2Stream<T>, mut f: F) -> impl Future<Output = Option<T>>
322where
323 F: FnMut(T, T) -> Fut + Send + 'static,
324 Fut: Future<Output = T> + Send + 'static,
325 T: Send + 'static,
326{
327 async move {
328 pin_mut!(s);
329 let first = match s.next().await {
330 Some(item) => item,
331 None => return None, };
333
334 let mut acc = first;
335 while let Some(item) = s.next().await {
336 acc = f(acc, item).await;
337 }
338
339 Some(acc)
340 }
341}
342
343pub fn filter_map<T, U, F, Fut>(s: RS2Stream<T>, f: F) -> RS2Stream<U>
345where
346 F: FnMut(T) -> Fut + Send + 'static,
347 Fut: Future<Output = Option<U>> + Send + 'static,
348 T: Send + 'static,
349 U: Send + 'static,
350{
351 s.filter_map(f).boxed()
352}
353
354pub fn take_while<T, F, Fut>(s: RS2Stream<T>, mut predicate: F) -> RS2Stream<T>
371where
372 F: FnMut(&T) -> Fut + Send + 'static,
373 Fut: Future<Output = bool> + Send + 'static,
374 T: Send + 'static,
375{
376 stream! {
377 pin_mut!(s);
378 while let Some(item) = s.next().await {
379 if predicate(&item).await {
380 yield item;
381 } else {
382 break;
383 }
384 }
385 }.boxed()
386}
387
388pub fn drop_while<T, F, Fut>(s: RS2Stream<T>, mut predicate: F) -> RS2Stream<T>
405where
406 F: FnMut(&T) -> Fut + Send + 'static,
407 Fut: Future<Output = bool> + Send + 'static,
408 T: Send + 'static,
409{
410 stream! {
411 pin_mut!(s);
412
413 let mut found_false = false;
414 while let Some(item) = s.next().await {
415 if !found_false && predicate(&item).await {
416 continue;
417 } else {
418 found_false = true;
419 yield item;
420 }
421 }
422 }.boxed()
423}
424
425pub fn group_by<T, K, F>(s: RS2Stream<T>, mut key_fn: F) -> RS2Stream<(K, Vec<T>)>
443where
444 T: Clone + Send + 'static,
445 K: Eq + Clone + Send + 'static,
446 F: FnMut(&T) -> K + Send + 'static,
447{
448 stream! {
449 pin_mut!(s);
450 let mut current_key: Option<K> = None;
451 let mut current_group: Vec<T> = Vec::new();
452
453 while let Some(item) = s.next().await {
454 let key = key_fn(&item);
455
456 match ¤t_key {
457 Some(k) if *k == key => {
458 current_group.push(item);
459 },
460 _ => {
461 if !current_group.is_empty() {
462 yield (current_key.clone().unwrap(), std::mem::take(&mut current_group));
463 }
464 current_key = Some(key);
465 current_group.push(item);
466 }
467 }
468 }
469
470 if !current_group.is_empty() {
471 yield (current_key.clone().unwrap(), std::mem::take(&mut current_group));
472 }
473 }
474 .boxed()
475}
476
477pub fn sliding_window<T>(s: RS2Stream<T>, size: usize) -> RS2Stream<Vec<T>>
479where
480 T: Clone + Send + 'static,
481{
482 if size == 0 {
483 return empty();
484 }
485
486 stream! {
487 let mut window = Vec::with_capacity(size);
488 pin_mut!(s);
489
490 while let Some(item) = s.next().await {
491 window.push(item);
492
493 if window.len() > size {
494 window.remove(0);
495 }
496
497 if window.len() == size {
498 yield window.clone();
499 }
500 }
501 }.boxed()
502}
503
504pub fn batch_process<T, U, F>(
506 s: RS2Stream<T>,
507 batch_size: usize,
508 mut processor: F
509) -> RS2Stream<U>
510where
511 F: FnMut(Vec<T>) -> Vec<U> + Send + 'static,
512 T: Send + 'static,
513 U: Send + 'static,
514{
515 stream! {
516 let chunked = chunk(s, batch_size);
517 pin_mut!(chunked);
518 while let Some(batch) = chunked.next().await {
519 for item in processor(batch) {
520 yield item;
521 }
522 }
523 }.boxed()
524}
525
526pub fn with_metrics<T>(
528 s: RS2Stream<T>,
529 name: String,
530 thresholds: HealthThresholds
531) -> (RS2Stream<T>, Arc<Mutex<StreamMetrics>>)
532where
533 T: Send + 'static,
534{
535 let metrics = Arc::new(Mutex::new(
536 StreamMetrics::new()
537 .with_name(name)
538 .with_health_thresholds(thresholds)
539 ));
540
541 let metrics_clone = Arc::clone(&metrics);
542
543 let monitored_stream = stream! {
544 pin_mut!(s);
545 while let Some(item) = s.next().await {
546 {
547 let mut m = metrics_clone.lock().await;
548 m.record_item(size_of_val(&item) as u64);
549 }
550 yield item;
551 }
552
553 {
554 let mut m = metrics_clone.lock().await;
555 m.finalize();
556 }
557 }.boxed();
558
559 (monitored_stream, metrics)
560}
561
562
563pub fn auto_backpressure<O>(s: RS2Stream<O>, config: BackpressureConfig) -> RS2Stream<O>
569where
570 O: Send + 'static,
571{
572 match config.strategy {
573 BackpressureStrategy::Block => auto_backpressure_block(s, config.buffer_size),
574 BackpressureStrategy::DropOldest => auto_backpressure_drop_oldest(s, config.buffer_size),
575 BackpressureStrategy::DropNewest => auto_backpressure_drop_newest(s, config.buffer_size),
576 BackpressureStrategy::Error => auto_backpressure_error(s, config.buffer_size),
577 }
578}
579
580pub fn auto_backpressure_block<O>(s: RS2Stream<O>, buffer_size: usize) -> RS2Stream<O>
582where
583 O: Send + 'static,
584{
585 let (mut tx, rx): (Sender<O>, Receiver<O>) = channel(buffer_size);
586
587 spawn(async move {
588 pin_mut!(s);
589 while let Some(item) = s.next().await {
590 if tx.send(item).await.is_err() {
591 break;
592 }
593 }
594 });
595
596 stream! {
597 let mut rx = rx;
598 while let Some(item) = rx.next().await {
599 yield item;
600 }
601 }
602 .boxed()
603}
604
605pub fn auto_backpressure_drop_oldest<O>(s: RS2Stream<O>, buffer_size: usize) -> RS2Stream<O>
607where
608 O: Send + 'static,
609{
610 use std::collections::VecDeque;
611
612 let buffer = Arc::new(Mutex::new(VecDeque::<O>::new()));
613 let buffer_clone = Arc::clone(&buffer);
614 let (done_tx, mut done_rx) = tokio::sync::mpsc::channel(1);
615
616 spawn(async move {
617 pin_mut!(s);
618 while let Some(item) = s.next().await {
619 let mut buf = buffer_clone.lock().await;
620
621 if buf.len() >= buffer_size {
622 buf.pop_front();
623 }
624
625 buf.push_back(item);
626 }
627
628 let _ = done_tx.send(()).await;
629 });
630
631 stream! {
632 let mut source_done = false;
633
634 loop {
635 if let Ok(_) = done_rx.try_recv() {
636 source_done = true;
637 }
638
639 let item = {
640 let mut buf = buffer.lock().await;
641 buf.pop_front()
642 };
643
644 match item {
645 Some(item) => yield item,
646 None => {
647 if source_done {
648 break;
649 }
650 tokio::time::sleep(Duration::from_millis(1)).await;
651 }
652 }
653 }
654 }
655 .boxed()
656}
657
658pub fn auto_backpressure_drop_newest<O>(s: RS2Stream<O>, buffer_size: usize) -> RS2Stream<O>
660where
661 O: Send + 'static,
662{
663 use std::collections::VecDeque;
664
665 let buffer = Arc::new(Mutex::new(VecDeque::<O>::new()));
666 let buffer_clone = Arc::clone(&buffer);
667 let (done_tx, mut done_rx) = tokio::sync::mpsc::channel(1);
668
669 spawn(async move {
670 pin_mut!(s);
671 while let Some(item) = s.next().await {
672 let mut buf = buffer_clone.lock().await;
673
674 if buf.len() < buffer_size {
675 buf.push_back(item);
676 }
677 }
678
679 let _ = done_tx.send(()).await;
680 });
681
682 stream! {
683 let mut source_done = false;
684
685 loop {
686 if let Ok(_) = done_rx.try_recv() {
687 source_done = true;
688 }
689
690 let item = {
691 let mut buf = buffer.lock().await;
692 buf.pop_front()
693 };
694
695 match item {
696 Some(item) => yield item,
697 None => {
698 if source_done {
699 break;
700 }
701 tokio::time::sleep(Duration::from_millis(1)).await;
702 }
703 }
704 }
705 }
706 .boxed()
707}
708
709pub fn auto_backpressure_error<O>(s: RS2Stream<O>, buffer_size: usize) -> RS2Stream<O>
711where
712 O: Send + 'static,
713{
714 use tokio::sync::mpsc;
715
716 let (tx, mut rx) = mpsc::channel(buffer_size);
717
718 spawn(async move {
719 pin_mut!(s);
720 while let Some(item) = s.next().await {
721 if tx.send(item).await.is_err() {
722 break;
723 }
724 }
725 });
726
727 stream! {
728 while let Some(item) = rx.recv().await {
729 yield item;
730 }
731 }
732 .boxed()
733}
734
735pub fn interrupt_when<O, F>(s: RS2Stream<O>, signal: F) -> RS2Stream<O>
771where
772 O: Send + 'static,
773 F: Future<Output = ()> + Send + 'static,
774{
775 stream! {
776 pin_mut!(s);
777 pin_mut!(signal);
778
779 loop {
780 tokio::select! {
781 biased;
782 _ = &mut signal => {
783 break;
784 },
785
786 maybe_item = s.next() => {
787 match maybe_item {
788 Some(item) => yield item,
789 None => break,
790 }
791 },
792 }
793 }
794 }
795 .boxed()
796}
797
798pub fn concat<O, S>(streams: Vec<S>) -> RS2Stream<O>
800where
801 S: Stream<Item = O> + Send + 'static,
802 O: Send + 'static,
803{
804 stream! {
805 for s in streams {
806 pin_mut!(s);
807 while let Some(item) = s.next().await {
808 yield item;
809 }
810 }
811 }
812 .boxed()
813}
814
815pub fn merge<O, S1, S2>(s1: S1, mut s2: S2) -> RS2Stream<O>
817where
818 S1: Stream<Item = O> + Send + 'static,
819 S2: Stream<Item = O> + Send + 'static + Unpin,
820 O: Send + 'static,
821{
822 let chained = s1
823 .map(Some)
824 .chain(stream! { while let Some(x) = s2.next().await { yield Some(x) } });
825 stream! {
826 pin_mut!(chained);
827 while let Some(item) = chained.next().await {
828 if let Some(x) = item {
829 yield x;
830 }
831 }
832 }
833 .boxed()
834}
835
836
837pub fn interleave<O, S>(streams: Vec<S>) -> RS2Stream<O>
839where
840 S: Stream<Item = O> + Send + 'static + Unpin,
841 O: Send + 'static,
842{
843 if streams.is_empty() {
844 return empty();
845 }
846
847 stream! {
848 let mut streams: Vec<_> = streams.into_iter().map(|s| Box::pin(s)).collect();
849 let mut index = 0;
850
851 while !streams.is_empty() {
852 if index >= streams.len() {
853 index = 0;
854 }
855
856 match streams[index].next().await {
857 Some(item) => {
858 yield item;
859 index += 1;
860 }
861 None => {
862 streams.remove(index);
863 }
864 }
865 }
866 }
867 .boxed()
868}
869
870pub fn zip_with<A, B, O, F, S1, S2>(s1: S1, s2: S2, mut f: F) -> RS2Stream<O>
874where
875 S1: Stream<Item = A> + Send + 'static,
876 S2: Stream<Item = B> + Send + 'static,
877 F: FnMut(A, B) -> O + Send + 'static,
878 A: Send + 'static,
879 B: Send + 'static,
880 O: Send + 'static,
881{
882 stream! {
883 pin_mut!(s1);
884 pin_mut!(s2);
885
886 loop {
887 match futures_util::future::join(s1.next(), s2.next()).await {
888 (Some(a), Some(b)) => yield f(a, b),
889 _ => break, }
891 }
892 }
893 .boxed()
894}
895
896pub fn either<O, S1, S2>(s1: S1, s2: S2) -> RS2Stream<O>
939where
940 S1: Stream<Item = O> + Send + 'static,
941 S2: Stream<Item = O> + Send + 'static,
942 O: Send + 'static,
943{
944 stream! {
945 pin_mut!(s1);
946 pin_mut!(s2);
947
948 let mut s1_done = false;
949 let mut s2_done = false;
950
951 let mut using_s1 = true;
952
953 loop {
954 if s1_done {
955 match s2.next().await {
956 Some(item) => yield item,
957 None => break,
958 }
959 continue;
960 }
961
962 if s2_done {
963 match s1.next().await {
964 Some(item) => yield item,
965 None => break,
966 }
967 continue;
968 }
969
970 if using_s1 {
971 match s1.next().await {
972 Some(item) => {
973 yield item;
974 },
975 None => {
976 s1_done = true;
977 }
978 }
979 } else {
980 match s2.next().await {
981 Some(item) => {
982 yield item;
983 },
984 None => {
985 s2_done = true;
986 }
987 }
988 }
989
990 tokio::select! {
991 biased;
992
993 maybe_item = s1.next() => {
994 match maybe_item {
995 Some(item) => {
996 yield item;
997 using_s1 = true;
998 },
999 None => {
1000 s1_done = true;
1001 }
1002 }
1003 },
1004 maybe_item = s2.next() => {
1005 match maybe_item {
1006 Some(item) => {
1007 yield item;
1008 using_s1 = false;
1009 },
1010 None => {
1011 s2_done = true;
1012 }
1013 }
1014 }
1015 }
1016 }
1017 }
1018 .boxed()
1019}
1020
1021pub fn debounce<O>(s: RS2Stream<O>, duration: Duration) -> RS2Stream<O>
1035where
1036 O: Send + 'static,
1037{
1038 stream! {
1039 pin_mut!(s);
1040
1041 let mut latest_item: Option<O> = None;
1042 let mut timer_handle: Option<tokio::task::JoinHandle<()>> = None;
1043
1044 let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(1);
1045
1046 loop {
1047 tokio::select! {
1048 maybe_item = s.next() => {
1049 match maybe_item {
1050 Some(item) => {
1051 if let Some(handle) = timer_handle.take() {
1052 handle.abort();
1053 }
1054
1055 latest_item = Some(item);
1056
1057 let tx_clone = tx.clone();
1058 timer_handle = Some(tokio::spawn(async move {
1059 tokio::time::sleep(duration).await;
1060 let _ = tx_clone.send(()).await;
1061 }));
1062 },
1063 None => {
1064 if let Some(item) = latest_item.take() {
1065 yield item;
1066 }
1067 break;
1068 }
1069 }
1070 },
1071 _ = rx.recv() => {
1072 if let Some(item) = latest_item.take() {
1073 yield item;
1074 }
1075 }
1076 }
1077 }
1078 }
1079 .boxed()
1080}
1081
1082pub fn distinct_until_changed<O>(s: RS2Stream<O>) -> RS2Stream<O>
1100where
1101 O: Clone + Send + PartialEq + 'static,
1102{
1103 stream! {
1104 pin_mut!(s);
1105 let mut prev: Option<O> = None;
1106
1107 while let Some(item) = s.next().await {
1108 match &prev {
1109 Some(p) if p == &item => {
1110 },
1111 _ => {
1112 yield item.clone();
1113 prev = Some(item);
1114 }
1115 }
1116 }
1117 }
1118 .boxed()
1119}
1120
1121pub fn sample<O>(s: RS2Stream<O>, interval: Duration) -> RS2Stream<O>
1159where
1160 O: Clone + Send + 'static,
1161{
1162 stream! {
1163 pin_mut!(s);
1164
1165 let mut latest_item: Option<O> = None;
1166 let mut has_new_value = false;
1167
1168 let mut timer = tokio::time::interval(interval);
1169 timer.tick().await;
1170
1171 loop {
1172 tokio::select! {
1173 maybe_item = s.next() => {
1174 match maybe_item {
1175 Some(item) => {
1176 latest_item = Some(item);
1177 has_new_value = true;
1178 },
1179 None => {
1180 if has_new_value {
1181 if let Some(item) = latest_item.take() {
1182 yield item;
1183 }
1184 }
1185 break;
1186 }
1187 }
1188 },
1189 _ = timer.tick() => {
1190 if has_new_value {
1191 if let Some(ref item) = latest_item {
1192 yield item.clone();
1193 has_new_value = false;
1194 }
1195 }
1196 }
1197 }
1198 }
1199 }
1200 .boxed()
1201}
1202
1203pub fn distinct_until_changed_by<O, F>(s: RS2Stream<O>, mut eq: F) -> RS2Stream<O>
1222where
1223 O: Clone + Send + 'static,
1224 F: FnMut(&O, &O) -> bool + Send + 'static,
1225{
1226 stream! {
1227 pin_mut!(s);
1228 let mut prev: Option<O> = None;
1229
1230 while let Some(item) = s.next().await {
1231 match &prev {
1232 Some(p) if eq(p, &item) => {
1233 },
1234 _ => {
1235 yield item.clone();
1236 prev = Some(item);
1237 }
1238 }
1239 }
1240 }
1241 .boxed()
1242}
1243
1244pub fn prefetch<O>(s: RS2Stream<O>, prefetch_count: usize) -> RS2Stream<O>
1251where
1252 O: Send + 'static,
1253{
1254 if prefetch_count == 0 {
1255 return s;
1256 }
1257
1258 let (mut tx, rx): (Sender<O>, Receiver<O>) = channel(prefetch_count);
1259
1260 spawn(async move {
1261 pin_mut!(s);
1262 while let Some(item) = s.next().await {
1263 if tx.send(item).await.is_err() {
1264 break;
1265 }
1266 }
1267 });
1268
1269 stream! {
1270 let mut rx = rx;
1271 while let Some(item) = rx.next().await {
1272 yield item;
1273 }
1274 }
1275 .boxed()
1276}
1277
1278pub fn rate_limit_backpressure<O>(s: RS2Stream<O>, capacity: usize) -> RS2Stream<O>
1280where
1281 O: Send + 'static,
1282{
1283 auto_backpressure_block(s, capacity)
1284}
1285
1286pub fn throttle<O>(s: RS2Stream<O>, duration: Duration) -> RS2Stream<O>
1288where
1289 O: Send + 'static,
1290{
1291 stream! {
1292 pin_mut!(s);
1293 while let Some(item) = s.next().await {
1294 yield item;
1295 sleep(duration).await;
1296 }
1297 }
1298 .boxed()
1299}
1300
1301pub fn tick<O>(period: Duration, item: O) -> RS2Stream<O>
1303where
1304 O: Clone + Send + 'static,
1305{
1306 stream! {
1307 loop {
1308 yield item.clone();
1309 sleep(period).await;
1310 }
1311 }
1312 .boxed()
1313}
1314
1315pub fn par_eval_map<I, O, Fut, F>(s: RS2Stream<I>, concurrency: usize, mut f: F) -> RS2Stream<O>
1321where
1322 F: FnMut(I) -> Fut + Send + 'static,
1323 Fut: Future<Output = O> + Send + 'static,
1324 O: Send + 'static,
1325 I: Send + 'static,
1326{
1327 let buffered_stream = auto_backpressure_block(s, concurrency * 2);
1328
1329 stream! {
1330 let mut in_flight = FuturesUnordered::new();
1331 pin_mut!(buffered_stream);
1332
1333 while let Some(item) = buffered_stream.next().await {
1334 in_flight.push(f(item));
1335 if in_flight.len() >= concurrency {
1336 if let Some(res) = in_flight.next().await {
1337 yield res;
1338 }
1339 }
1340 }
1341 while let Some(res) = in_flight.next().await {
1342 yield res;
1343 }
1344 }
1345 .boxed()
1346}
1347
1348pub fn par_eval_map_unordered<I, O, Fut, F>(
1350 s: RS2Stream<I>,
1351 concurrency: usize,
1352 f: F,
1353) -> RS2Stream<O>
1354where
1355 F: FnMut(I) -> Fut + Send + 'static,
1356 Fut: Future<Output = O> + Send + 'static,
1357 O: Send + 'static,
1358 I: Send + 'static,
1359{
1360 let buffered_stream = auto_backpressure_block(s, concurrency * 2);
1361 buffered_stream.map(f).buffer_unordered(concurrency).boxed()
1362}
1363
1364pub fn par_join<O, S>(
1372 s: RS2Stream<S>,
1373 concurrency: usize,
1374) -> RS2Stream<O>
1375where
1376 S: Stream<Item = O> + Send + 'static + Unpin,
1377 O: Send + 'static,
1378{
1379 let buffered_stream = auto_backpressure_block(s, concurrency * 2);
1380
1381 stream! {
1382 pin_mut!(buffered_stream);
1383
1384 let mut active_streams: Vec<S> = Vec::with_capacity(concurrency);
1385
1386 let mut outer_stream_done = false;
1387
1388 loop {
1389 while active_streams.len() < concurrency && !outer_stream_done {
1390 match buffered_stream.next().await {
1391 Some(inner_stream) => {
1392 active_streams.push(inner_stream);
1393 },
1394 None => {
1395 outer_stream_done = true;
1396 break;
1397 }
1398 }
1399 }
1400 if active_streams.is_empty() && outer_stream_done {
1401 break;
1402 }
1403
1404 let mut i = 0;
1405 while i < active_streams.len() {
1406 match active_streams[i].next().await {
1407 Some(item) => {
1408 yield item;
1409 i += 1;
1410 },
1411 None => {
1412 active_streams.swap_remove(i);
1413 }
1414 }
1415 }
1416 }
1417 }
1418 .boxed()
1419}
1420
1421pub fn bracket<A, O, St, FAcq, FUse, FRel, R>(
1427 acquire: FAcq,
1428 use_fn: FUse,
1429 release: FRel,
1430) -> RS2Stream<O>
1431where
1432 FAcq: Future<Output = A> + Send + 'static,
1433 FUse: FnOnce(A) -> St + Send + 'static,
1434 St: Stream<Item = O> + Send + 'static,
1435 FRel: FnOnce(A) -> R + Send + 'static,
1436 R: Future<Output = ()> + Send + 'static,
1437 O: Send + 'static,
1438 A: Clone + Send + 'static,
1439{
1440 stream! {
1441 let resource = acquire.await;
1442 let stream = use_fn(resource.clone());
1443 pin_mut!(stream);
1444 while let Some(item) = stream.next().await {
1445 yield item;
1446 }
1447 release(resource).await;
1448 }
1449 .boxed()
1450}
1451
1452pub fn bracket_case<A, O, E, St, FAcq, FUse, FRel, R>(
1454 acquire: FAcq,
1455 use_fn: FUse,
1456 release: FRel,
1457) -> RS2Stream<Result<O, E>>
1458where
1459 FAcq: Future<Output = A> + Send + 'static,
1460 FUse: FnOnce(A) -> St + Send + 'static,
1461 St: Stream<Item = Result<O, E>> + Send + 'static,
1462 FRel: FnOnce(A, ExitCase<E>) -> R + Send + 'static,
1463 R: Future<Output = ()> + Send + 'static,
1464 O: Send + 'static,
1465 E: Clone + Send + 'static,
1466 A: Clone + Send + 'static,
1467{
1468 stream! {
1469 let resource = acquire.await;
1470 let stream = use_fn(resource.clone());
1471 pin_mut!(stream);
1472 while let Some(item) = stream.next().await {
1473 yield item;
1474 }
1475 release(resource, ExitCase::Completed).await;
1476 }
1477 .boxed()
1478}
1479
1480pub use crate::rs2_result_stream_ext::RS2ResultStreamExt;
1486pub use crate::rs2_stream_ext::RS2StreamExt;