1use async_stream::stream;
7use futures::channel::mpsc::{channel, Receiver, Sender};
8use futures_core::Stream;
9use futures_util::pin_mut;
10use futures_util::{
11 future,
12 stream::{self, BoxStream, FuturesUnordered, StreamExt},
13 SinkExt,
14};
15use std::future::Future;
16use std::time::Duration;
17use std::sync::Arc;
18use tokio::{spawn, time::sleep};
19use tokio::sync::Mutex;
20
21use crate::error::{StreamError, StreamResult, RetryPolicy};
22use crate::stream_performance_metrics::StreamMetrics;
23
24pub type RS2Stream<O> = BoxStream<'static, O>;
26
27#[derive(Debug, Clone, Copy)]
29pub enum BackpressureStrategy {
30 DropOldest,
32 DropNewest,
34 Block,
36 Error,
38}
39
40#[derive(Debug, Clone)]
42pub struct BackpressureConfig {
43 pub strategy: BackpressureStrategy,
44 pub buffer_size: usize,
45 pub low_watermark: Option<usize>, pub high_watermark: Option<usize>, }
48
49impl Default for BackpressureConfig {
50 fn default() -> Self {
51 Self {
52 strategy: BackpressureStrategy::Block,
53 buffer_size: 100,
54 low_watermark: Some(25),
55 high_watermark: Some(75),
56 }
57 }
58}
59
60#[derive(Debug, Clone)]
62pub enum ExitCase<E> {
63 Completed,
64 Errored(E),
65}
66
67pub fn emit<O>(item: O) -> RS2Stream<O>
73where
74 O: Send + 'static,
75{
76 stream::once(future::ready(item)).boxed()
77}
78
79pub fn empty<O>() -> RS2Stream<O>
81where
82 O: Send + 'static,
83{
84 stream::empty().boxed()
85}
86
87pub fn from_iter<I, O>(iter: I) -> RS2Stream<O>
89where
90 I: IntoIterator<Item = O> + Send + 'static,
91 <I as IntoIterator>::IntoIter: Send,
92 O: Send + 'static,
93{
94 stream::iter(iter).boxed()
95}
96
97pub fn eval<O, F>(fut: F) -> RS2Stream<O>
99where
100 F: Future<Output = O> + Send + 'static,
101 O: Send + 'static,
102{
103 stream::once(fut).boxed()
104}
105
106pub fn repeat<O>(item: O) -> RS2Stream<O>
108where
109 O: Clone + Send + 'static,
110{
111 stream::repeat(item).boxed()
112}
113
114pub fn emit_after<O>(item: O, duration: Duration) -> RS2Stream<O>
116where
117 O: Send + 'static,
118{
119 stream::once(async move {
120 sleep(duration).await;
121 item
122 }).boxed()
123}
124
125pub fn unfold<S, O, F, Fut>(init: S, mut f: F) -> RS2Stream<O>
151where
152 S: Send + 'static,
153 O: Send + 'static,
154 F: FnMut(S) -> Fut + Send + 'static,
155 Fut: Future<Output = Option<(O, S)>> + Send + 'static,
156{
157 stream! {
158 let mut state_opt = Some(init);
159
160 loop {
161 let state = state_opt.take().expect("State should be available");
162 let fut = f(state);
163 match fut.await {
164 Some((item, next_state)) => {
165 yield item;
166 state_opt = Some(next_state);
167 },
168 None => break,
169 }
170 }
171 }
172 .boxed()
173}
174
175pub fn group_adjacent_by<O, K, F>(s: RS2Stream<O>, mut key_fn: F) -> RS2Stream<(K, Vec<O>)>
197where
198 O: Clone + Send + 'static,
199 K: Eq + Clone + Send + 'static,
200 F: FnMut(&O) -> K + Send + 'static,
201{
202 stream! {
203 pin_mut!(s);
204 let mut current_key: Option<K> = None;
205 let mut current_group: Vec<O> = Vec::new();
206
207 while let Some(item) = s.next().await {
208 let key = key_fn(&item);
209
210 match ¤t_key {
211 Some(k) if *k == key => {
212 current_group.push(item);
213 },
214 _ => {
215 if !current_group.is_empty() {
216 yield (current_key.clone().unwrap(), std::mem::take(&mut current_group));
217 }
218 current_key = Some(key);
219 current_group.push(item);
220 }
221 }
222 }
223
224 if !current_group.is_empty() {
225 yield (current_key.clone().unwrap(), std::mem::take(&mut current_group));
226 }
227 }
228 .boxed()
229}
230
231pub fn take<O>(s: RS2Stream<O>, n: usize) -> RS2Stream<O>
233where
234 O: Send + 'static,
235{
236 s.take(n).boxed()
237}
238
239pub fn drop<O>(s: RS2Stream<O>, n: usize) -> RS2Stream<O>
241where
242 O: Send + 'static,
243{
244 s.skip(n).boxed()
245}
246
247pub fn chunk<O>(s: RS2Stream<O>, size: usize) -> RS2Stream<Vec<O>>
249where
250 O: Send + 'static,
251{
252 stream! {
253 let mut buf = Vec::with_capacity(size);
254 pin_mut!(s);
255 while let Some(item) = s.next().await {
256 buf.push(item);
257 if buf.len() == size {
258 yield std::mem::take(&mut buf);
259 }
260 }
261 if !buf.is_empty() {
262 yield std::mem::take(&mut buf);
263 }
264 }
265 .boxed()
266}
267
268pub fn timeout<T>(s: RS2Stream<T>, duration: Duration) -> RS2Stream<StreamResult<T>>
270where
271 T: Send + 'static,
272{
273 stream! {
274 pin_mut!(s);
275 loop {
276 match tokio::time::timeout(duration, s.next()).await {
277 Ok(Some(value)) => yield Ok(value),
278 Ok(None) => break,
279 Err(_) => yield Err(StreamError::Timeout),
280 }
281 }
282 }.boxed()
283}
284
285pub fn scan<T, U, F>(s: RS2Stream<T>, init: U, mut f: F) -> RS2Stream<U>
287where
288 F: FnMut(U, T) -> U + Send + 'static,
289 T: Send + 'static,
290 U: Clone + Send + 'static,
291{
292 stream! {
293 let mut acc = init;
294 pin_mut!(s);
295 while let Some(item) = s.next().await {
296 acc = f(acc.clone(), item);
297 yield acc.clone();
298 }
299 }.boxed()
300}
301
302pub fn fold<T, A, F, Fut>(s: RS2Stream<T>, init: A, mut f: F) -> impl Future<Output = A>
304where
305 F: FnMut(A, T) -> Fut + Send + 'static,
306 Fut: Future<Output = A> + Send + 'static,
307 T: Send + 'static,
308 A: Send + 'static,
309{
310 async move {
311 let mut acc = init;
312 pin_mut!(s);
313 while let Some(item) = s.next().await {
314 acc = f(acc, item).await;
315 }
316 acc
317 }
318}
319
320pub fn reduce<T, F, Fut>(s: RS2Stream<T>, mut f: F) -> impl Future<Output = Option<T>>
322where
323 F: FnMut(T, T) -> Fut + Send + 'static,
324 Fut: Future<Output = T> + Send + 'static,
325 T: Send + 'static,
326{
327 async move {
328 pin_mut!(s);
329 let first = match s.next().await {
330 Some(item) => item,
331 None => return None, };
333
334 let mut acc = first;
335 while let Some(item) = s.next().await {
336 acc = f(acc, item).await;
337 }
338
339 Some(acc)
340 }
341}
342
343pub fn filter_map<T, U, F, Fut>(s: RS2Stream<T>, f: F) -> RS2Stream<U>
345where
346 F: FnMut(T) -> Fut + Send + 'static,
347 Fut: Future<Output = Option<U>> + Send + 'static,
348 T: Send + 'static,
349 U: Send + 'static,
350{
351 s.filter_map(f).boxed()
352}
353
354pub fn take_while<T, F, Fut>(s: RS2Stream<T>, mut predicate: F) -> RS2Stream<T>
371where
372 F: FnMut(&T) -> Fut + Send + 'static,
373 Fut: Future<Output = bool> + Send + 'static,
374 T: Send + 'static,
375{
376 stream! {
377 pin_mut!(s);
378 while let Some(item) = s.next().await {
379 if predicate(&item).await {
380 yield item;
381 } else {
382 break;
383 }
384 }
385 }.boxed()
386}
387
388pub fn drop_while<T, F, Fut>(s: RS2Stream<T>, mut predicate: F) -> RS2Stream<T>
405where
406 F: FnMut(&T) -> Fut + Send + 'static,
407 Fut: Future<Output = bool> + Send + 'static,
408 T: Send + 'static,
409{
410 stream! {
411 pin_mut!(s);
412
413 let mut found_false = false;
414 while let Some(item) = s.next().await {
415 if !found_false && predicate(&item).await {
416 continue;
417 } else {
418 found_false = true;
419 yield item;
420 }
421 }
422 }.boxed()
423}
424
425pub fn group_by<T, K, F>(s: RS2Stream<T>, mut key_fn: F) -> RS2Stream<(K, Vec<T>)>
443where
444 T: Clone + Send + 'static,
445 K: Eq + Clone + Send + 'static,
446 F: FnMut(&T) -> K + Send + 'static,
447{
448 stream! {
449 pin_mut!(s);
450 let mut current_key: Option<K> = None;
451 let mut current_group: Vec<T> = Vec::new();
452
453 while let Some(item) = s.next().await {
454 let key = key_fn(&item);
455
456 match ¤t_key {
457 Some(k) if *k == key => {
458 current_group.push(item);
459 },
460 _ => {
461 if !current_group.is_empty() {
462 yield (current_key.clone().unwrap(), std::mem::take(&mut current_group));
463 }
464 current_key = Some(key);
465 current_group.push(item);
466 }
467 }
468 }
469
470 if !current_group.is_empty() {
471 yield (current_key.clone().unwrap(), std::mem::take(&mut current_group));
472 }
473 }
474 .boxed()
475}
476
477pub fn sliding_window<T>(s: RS2Stream<T>, size: usize) -> RS2Stream<Vec<T>>
479where
480 T: Clone + Send + 'static,
481{
482 if size == 0 {
483 return empty();
484 }
485
486 stream! {
487 let mut window = Vec::with_capacity(size);
488 pin_mut!(s);
489
490 while let Some(item) = s.next().await {
491 window.push(item);
492
493 if window.len() > size {
494 window.remove(0);
495 }
496
497 if window.len() == size {
498 yield window.clone();
499 }
500 }
501 }.boxed()
502}
503
504pub fn batch_process<T, U, F>(
506 s: RS2Stream<T>,
507 batch_size: usize,
508 mut processor: F
509) -> RS2Stream<U>
510where
511 F: FnMut(Vec<T>) -> Vec<U> + Send + 'static,
512 T: Send + 'static,
513 U: Send + 'static,
514{
515 stream! {
516 let chunked = chunk(s, batch_size);
517 pin_mut!(chunked);
518 while let Some(batch) = chunked.next().await {
519 for item in processor(batch) {
520 yield item;
521 }
522 }
523 }.boxed()
524}
525
526pub fn with_metrics<T>(s: RS2Stream<T>, _name: String) -> (RS2Stream<T>, Arc<Mutex<StreamMetrics>>)
528where
529 T: Send + 'static,
530{
531 let metrics = Arc::new(Mutex::new(StreamMetrics::new()));
532 let metrics_clone = Arc::clone(&metrics);
533
534 let monitored_stream = stream! {
535 pin_mut!(s);
536 while let Some(item) = s.next().await {
537 {
538 let mut m = metrics_clone.lock().await;
539 m.record_item(std::mem::size_of_val(&item) as u64);
540 }
541 yield item;
542 }
543
544 {
545 let mut m = metrics_clone.lock().await;
546 m.finalize();
547 }
548 }.boxed();
549
550 (monitored_stream, metrics)
551}
552
553pub fn auto_backpressure<O>(s: RS2Stream<O>, config: BackpressureConfig) -> RS2Stream<O>
559where
560 O: Send + 'static,
561{
562 match config.strategy {
563 BackpressureStrategy::Block => auto_backpressure_block(s, config.buffer_size),
564 BackpressureStrategy::DropOldest => auto_backpressure_drop_oldest(s, config.buffer_size),
565 BackpressureStrategy::DropNewest => auto_backpressure_drop_newest(s, config.buffer_size),
566 BackpressureStrategy::Error => auto_backpressure_error(s, config.buffer_size),
567 }
568}
569
570pub fn auto_backpressure_block<O>(s: RS2Stream<O>, buffer_size: usize) -> RS2Stream<O>
572where
573 O: Send + 'static,
574{
575 let (mut tx, rx): (Sender<O>, Receiver<O>) = channel(buffer_size);
576
577 spawn(async move {
578 pin_mut!(s);
579 while let Some(item) = s.next().await {
580 if tx.send(item).await.is_err() {
581 break;
582 }
583 }
584 });
585
586 stream! {
587 let mut rx = rx;
588 while let Some(item) = rx.next().await {
589 yield item;
590 }
591 }
592 .boxed()
593}
594
595pub fn auto_backpressure_drop_oldest<O>(s: RS2Stream<O>, buffer_size: usize) -> RS2Stream<O>
597where
598 O: Send + 'static,
599{
600 use std::collections::VecDeque;
601
602 let buffer = Arc::new(Mutex::new(VecDeque::<O>::new()));
603 let buffer_clone = Arc::clone(&buffer);
604 let (done_tx, mut done_rx) = tokio::sync::mpsc::channel(1);
605
606 spawn(async move {
607 pin_mut!(s);
608 while let Some(item) = s.next().await {
609 let mut buf = buffer_clone.lock().await;
610
611 if buf.len() >= buffer_size {
612 buf.pop_front();
613 }
614
615 buf.push_back(item);
616 }
617
618 let _ = done_tx.send(()).await;
619 });
620
621 stream! {
622 let mut source_done = false;
623
624 loop {
625 if let Ok(_) = done_rx.try_recv() {
626 source_done = true;
627 }
628
629 let item = {
630 let mut buf = buffer.lock().await;
631 buf.pop_front()
632 };
633
634 match item {
635 Some(item) => yield item,
636 None => {
637 if source_done {
638 break;
639 }
640 tokio::time::sleep(Duration::from_millis(1)).await;
641 }
642 }
643 }
644 }
645 .boxed()
646}
647
648pub fn auto_backpressure_drop_newest<O>(s: RS2Stream<O>, buffer_size: usize) -> RS2Stream<O>
650where
651 O: Send + 'static,
652{
653 use std::collections::VecDeque;
654
655 let buffer = Arc::new(Mutex::new(VecDeque::<O>::new()));
656 let buffer_clone = Arc::clone(&buffer);
657 let (done_tx, mut done_rx) = tokio::sync::mpsc::channel(1);
658
659 spawn(async move {
660 pin_mut!(s);
661 while let Some(item) = s.next().await {
662 let mut buf = buffer_clone.lock().await;
663
664 if buf.len() < buffer_size {
665 buf.push_back(item);
666 }
667 }
668
669 let _ = done_tx.send(()).await;
670 });
671
672 stream! {
673 let mut source_done = false;
674
675 loop {
676 if let Ok(_) = done_rx.try_recv() {
677 source_done = true;
678 }
679
680 let item = {
681 let mut buf = buffer.lock().await;
682 buf.pop_front()
683 };
684
685 match item {
686 Some(item) => yield item,
687 None => {
688 if source_done {
689 break;
690 }
691 tokio::time::sleep(Duration::from_millis(1)).await;
692 }
693 }
694 }
695 }
696 .boxed()
697}
698
699pub fn auto_backpressure_error<O>(s: RS2Stream<O>, buffer_size: usize) -> RS2Stream<O>
701where
702 O: Send + 'static,
703{
704 use tokio::sync::mpsc;
705
706 let (tx, mut rx) = mpsc::channel(buffer_size);
707
708 spawn(async move {
709 pin_mut!(s);
710 while let Some(item) = s.next().await {
711 if tx.send(item).await.is_err() {
712 break;
713 }
714 }
715 });
716
717 stream! {
718 while let Some(item) = rx.recv().await {
719 yield item;
720 }
721 }
722 .boxed()
723}
724
725pub fn interrupt_when<O, F>(s: RS2Stream<O>, signal: F) -> RS2Stream<O>
761where
762 O: Send + 'static,
763 F: Future<Output = ()> + Send + 'static,
764{
765 stream! {
766 pin_mut!(s);
767 pin_mut!(signal);
768
769 loop {
770 tokio::select! {
771 biased;
772 _ = &mut signal => {
773 break;
774 },
775
776 maybe_item = s.next() => {
777 match maybe_item {
778 Some(item) => yield item,
779 None => break,
780 }
781 },
782 }
783 }
784 }
785 .boxed()
786}
787
788pub fn concat<O, S>(streams: Vec<S>) -> RS2Stream<O>
790where
791 S: Stream<Item = O> + Send + 'static,
792 O: Send + 'static,
793{
794 stream! {
795 for s in streams {
796 pin_mut!(s);
797 while let Some(item) = s.next().await {
798 yield item;
799 }
800 }
801 }
802 .boxed()
803}
804
805pub fn merge<O, S1, S2>(s1: S1, mut s2: S2) -> RS2Stream<O>
807where
808 S1: Stream<Item = O> + Send + 'static,
809 S2: Stream<Item = O> + Send + 'static + Unpin,
810 O: Send + 'static,
811{
812 let chained = s1
813 .map(Some)
814 .chain(stream! { while let Some(x) = s2.next().await { yield Some(x) } });
815 stream! {
816 pin_mut!(chained);
817 while let Some(item) = chained.next().await {
818 if let Some(x) = item {
819 yield x;
820 }
821 }
822 }
823 .boxed()
824}
825
826
827pub fn interleave<O, S>(streams: Vec<S>) -> RS2Stream<O>
829where
830 S: Stream<Item = O> + Send + 'static + Unpin,
831 O: Send + 'static,
832{
833 if streams.is_empty() {
834 return empty();
835 }
836
837 stream! {
838 let mut streams: Vec<_> = streams.into_iter().map(|s| Box::pin(s)).collect();
839 let mut index = 0;
840
841 while !streams.is_empty() {
842 if index >= streams.len() {
843 index = 0;
844 }
845
846 match streams[index].next().await {
847 Some(item) => {
848 yield item;
849 index += 1;
850 }
851 None => {
852 streams.remove(index);
853 }
854 }
855 }
856 }
857 .boxed()
858}
859
860pub fn zip_with<A, B, O, F, S1, S2>(s1: S1, s2: S2, mut f: F) -> RS2Stream<O>
864where
865 S1: Stream<Item = A> + Send + 'static,
866 S2: Stream<Item = B> + Send + 'static,
867 F: FnMut(A, B) -> O + Send + 'static,
868 A: Send + 'static,
869 B: Send + 'static,
870 O: Send + 'static,
871{
872 stream! {
873 pin_mut!(s1);
874 pin_mut!(s2);
875
876 loop {
877 match futures_util::future::join(s1.next(), s2.next()).await {
878 (Some(a), Some(b)) => yield f(a, b),
879 _ => break, }
881 }
882 }
883 .boxed()
884}
885
886pub fn either<O, S1, S2>(s1: S1, s2: S2) -> RS2Stream<O>
929where
930 S1: Stream<Item = O> + Send + 'static,
931 S2: Stream<Item = O> + Send + 'static,
932 O: Send + 'static,
933{
934 stream! {
935 pin_mut!(s1);
936 pin_mut!(s2);
937
938 let mut s1_done = false;
939 let mut s2_done = false;
940
941 let mut using_s1 = true;
942
943 loop {
944 if s1_done {
945 match s2.next().await {
946 Some(item) => yield item,
947 None => break,
948 }
949 continue;
950 }
951
952 if s2_done {
953 match s1.next().await {
954 Some(item) => yield item,
955 None => break,
956 }
957 continue;
958 }
959
960 if using_s1 {
961 match s1.next().await {
962 Some(item) => {
963 yield item;
964 },
965 None => {
966 s1_done = true;
967 }
968 }
969 } else {
970 match s2.next().await {
971 Some(item) => {
972 yield item;
973 },
974 None => {
975 s2_done = true;
976 }
977 }
978 }
979
980 tokio::select! {
981 biased;
982
983 maybe_item = s1.next() => {
984 match maybe_item {
985 Some(item) => {
986 yield item;
987 using_s1 = true;
988 },
989 None => {
990 s1_done = true;
991 }
992 }
993 },
994 maybe_item = s2.next() => {
995 match maybe_item {
996 Some(item) => {
997 yield item;
998 using_s1 = false;
999 },
1000 None => {
1001 s2_done = true;
1002 }
1003 }
1004 }
1005 }
1006 }
1007 }
1008 .boxed()
1009}
1010
1011pub fn debounce<O>(s: RS2Stream<O>, duration: Duration) -> RS2Stream<O>
1025where
1026 O: Send + 'static,
1027{
1028 stream! {
1029 pin_mut!(s);
1030
1031 let mut latest_item: Option<O> = None;
1032 let mut timer_handle: Option<tokio::task::JoinHandle<()>> = None;
1033
1034 let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(1);
1035
1036 loop {
1037 tokio::select! {
1038 maybe_item = s.next() => {
1039 match maybe_item {
1040 Some(item) => {
1041 if let Some(handle) = timer_handle.take() {
1042 handle.abort();
1043 }
1044
1045 latest_item = Some(item);
1046
1047 let tx_clone = tx.clone();
1048 timer_handle = Some(tokio::spawn(async move {
1049 tokio::time::sleep(duration).await;
1050 let _ = tx_clone.send(()).await;
1051 }));
1052 },
1053 None => {
1054 if let Some(item) = latest_item.take() {
1055 yield item;
1056 }
1057 break;
1058 }
1059 }
1060 },
1061 _ = rx.recv() => {
1062 if let Some(item) = latest_item.take() {
1063 yield item;
1064 }
1065 }
1066 }
1067 }
1068 }
1069 .boxed()
1070}
1071
1072pub fn distinct_until_changed<O>(s: RS2Stream<O>) -> RS2Stream<O>
1090where
1091 O: Clone + Send + PartialEq + 'static,
1092{
1093 stream! {
1094 pin_mut!(s);
1095 let mut prev: Option<O> = None;
1096
1097 while let Some(item) = s.next().await {
1098 match &prev {
1099 Some(p) if p == &item => {
1100 },
1101 _ => {
1102 yield item.clone();
1103 prev = Some(item);
1104 }
1105 }
1106 }
1107 }
1108 .boxed()
1109}
1110
1111pub fn sample<O>(s: RS2Stream<O>, interval: Duration) -> RS2Stream<O>
1149where
1150 O: Clone + Send + 'static,
1151{
1152 stream! {
1153 pin_mut!(s);
1154
1155 let mut latest_item: Option<O> = None;
1156 let mut has_new_value = false;
1157
1158 let mut timer = tokio::time::interval(interval);
1159 timer.tick().await;
1160
1161 loop {
1162 tokio::select! {
1163 maybe_item = s.next() => {
1164 match maybe_item {
1165 Some(item) => {
1166 latest_item = Some(item);
1167 has_new_value = true;
1168 },
1169 None => {
1170 if has_new_value {
1171 if let Some(item) = latest_item.take() {
1172 yield item;
1173 }
1174 }
1175 break;
1176 }
1177 }
1178 },
1179 _ = timer.tick() => {
1180 if has_new_value {
1181 if let Some(ref item) = latest_item {
1182 yield item.clone();
1183 has_new_value = false;
1184 }
1185 }
1186 }
1187 }
1188 }
1189 }
1190 .boxed()
1191}
1192
1193pub fn distinct_until_changed_by<O, F>(s: RS2Stream<O>, mut eq: F) -> RS2Stream<O>
1212where
1213 O: Clone + Send + 'static,
1214 F: FnMut(&O, &O) -> bool + Send + 'static,
1215{
1216 stream! {
1217 pin_mut!(s);
1218 let mut prev: Option<O> = None;
1219
1220 while let Some(item) = s.next().await {
1221 match &prev {
1222 Some(p) if eq(p, &item) => {
1223 },
1224 _ => {
1225 yield item.clone();
1226 prev = Some(item);
1227 }
1228 }
1229 }
1230 }
1231 .boxed()
1232}
1233
1234pub fn prefetch<O>(s: RS2Stream<O>, prefetch_count: usize) -> RS2Stream<O>
1241where
1242 O: Send + 'static,
1243{
1244 if prefetch_count == 0 {
1245 return s;
1246 }
1247
1248 let (mut tx, rx): (Sender<O>, Receiver<O>) = channel(prefetch_count);
1249
1250 spawn(async move {
1251 pin_mut!(s);
1252 while let Some(item) = s.next().await {
1253 if tx.send(item).await.is_err() {
1254 break;
1255 }
1256 }
1257 });
1258
1259 stream! {
1260 let mut rx = rx;
1261 while let Some(item) = rx.next().await {
1262 yield item;
1263 }
1264 }
1265 .boxed()
1266}
1267
1268pub fn rate_limit_backpressure<O>(s: RS2Stream<O>, capacity: usize) -> RS2Stream<O>
1270where
1271 O: Send + 'static,
1272{
1273 auto_backpressure_block(s, capacity)
1274}
1275
1276pub fn throttle<O>(s: RS2Stream<O>, duration: Duration) -> RS2Stream<O>
1278where
1279 O: Send + 'static,
1280{
1281 stream! {
1282 pin_mut!(s);
1283 while let Some(item) = s.next().await {
1284 yield item;
1285 sleep(duration).await;
1286 }
1287 }
1288 .boxed()
1289}
1290
1291pub fn tick<O>(period: Duration, item: O) -> RS2Stream<O>
1293where
1294 O: Clone + Send + 'static,
1295{
1296 stream! {
1297 loop {
1298 yield item.clone();
1299 sleep(period).await;
1300 }
1301 }
1302 .boxed()
1303}
1304
1305pub fn par_eval_map<I, O, Fut, F>(s: RS2Stream<I>, concurrency: usize, mut f: F) -> RS2Stream<O>
1311where
1312 F: FnMut(I) -> Fut + Send + 'static,
1313 Fut: Future<Output = O> + Send + 'static,
1314 O: Send + 'static,
1315 I: Send + 'static,
1316{
1317 let buffered_stream = auto_backpressure_block(s, concurrency * 2);
1318
1319 stream! {
1320 let mut in_flight = FuturesUnordered::new();
1321 pin_mut!(buffered_stream);
1322
1323 while let Some(item) = buffered_stream.next().await {
1324 in_flight.push(f(item));
1325 if in_flight.len() >= concurrency {
1326 if let Some(res) = in_flight.next().await {
1327 yield res;
1328 }
1329 }
1330 }
1331 while let Some(res) = in_flight.next().await {
1332 yield res;
1333 }
1334 }
1335 .boxed()
1336}
1337
1338pub fn par_eval_map_unordered<I, O, Fut, F>(
1340 s: RS2Stream<I>,
1341 concurrency: usize,
1342 f: F,
1343) -> RS2Stream<O>
1344where
1345 F: FnMut(I) -> Fut + Send + 'static,
1346 Fut: Future<Output = O> + Send + 'static,
1347 O: Send + 'static,
1348 I: Send + 'static,
1349{
1350 let buffered_stream = auto_backpressure_block(s, concurrency * 2);
1351 buffered_stream.map(f).buffer_unordered(concurrency).boxed()
1352}
1353
1354pub fn par_join<O, S>(
1362 s: RS2Stream<S>,
1363 concurrency: usize,
1364) -> RS2Stream<O>
1365where
1366 S: Stream<Item = O> + Send + 'static + Unpin,
1367 O: Send + 'static,
1368{
1369 let buffered_stream = auto_backpressure_block(s, concurrency * 2);
1370
1371 stream! {
1372 pin_mut!(buffered_stream);
1373
1374 let mut active_streams: Vec<S> = Vec::with_capacity(concurrency);
1375
1376 let mut outer_stream_done = false;
1377
1378 loop {
1379 while active_streams.len() < concurrency && !outer_stream_done {
1380 match buffered_stream.next().await {
1381 Some(inner_stream) => {
1382 active_streams.push(inner_stream);
1383 },
1384 None => {
1385 outer_stream_done = true;
1386 break;
1387 }
1388 }
1389 }
1390 if active_streams.is_empty() && outer_stream_done {
1391 break;
1392 }
1393
1394 let mut i = 0;
1395 while i < active_streams.len() {
1396 match active_streams[i].next().await {
1397 Some(item) => {
1398 yield item;
1399 i += 1;
1400 },
1401 None => {
1402 active_streams.swap_remove(i);
1403 }
1404 }
1405 }
1406 }
1407 }
1408 .boxed()
1409}
1410
1411pub fn bracket<A, O, St, FAcq, FUse, FRel, R>(
1417 acquire: FAcq,
1418 use_fn: FUse,
1419 release: FRel,
1420) -> RS2Stream<O>
1421where
1422 FAcq: Future<Output = A> + Send + 'static,
1423 FUse: FnOnce(A) -> St + Send + 'static,
1424 St: Stream<Item = O> + Send + 'static,
1425 FRel: FnOnce(A) -> R + Send + 'static,
1426 R: Future<Output = ()> + Send + 'static,
1427 O: Send + 'static,
1428 A: Clone + Send + 'static,
1429{
1430 stream! {
1431 let resource = acquire.await;
1432 let stream = use_fn(resource.clone());
1433 pin_mut!(stream);
1434 while let Some(item) = stream.next().await {
1435 yield item;
1436 }
1437 release(resource).await;
1438 }
1439 .boxed()
1440}
1441
1442pub fn bracket_case<A, O, E, St, FAcq, FUse, FRel, R>(
1444 acquire: FAcq,
1445 use_fn: FUse,
1446 release: FRel,
1447) -> RS2Stream<Result<O, E>>
1448where
1449 FAcq: Future<Output = A> + Send + 'static,
1450 FUse: FnOnce(A) -> St + Send + 'static,
1451 St: Stream<Item = Result<O, E>> + Send + 'static,
1452 FRel: FnOnce(A, ExitCase<E>) -> R + Send + 'static,
1453 R: Future<Output = ()> + Send + 'static,
1454 O: Send + 'static,
1455 E: Clone + Send + 'static,
1456 A: Clone + Send + 'static,
1457{
1458 stream! {
1459 let resource = acquire.await;
1460 let stream = use_fn(resource.clone());
1461 pin_mut!(stream);
1462 while let Some(item) = stream.next().await {
1463 yield item;
1464 }
1465 release(resource, ExitCase::Completed).await;
1466 }
1467 .boxed()
1468}
1469
1470pub use crate::rs2_result_stream_ext::RS2ResultStreamExt;
1476pub use crate::rs2_stream_ext::RS2StreamExt;