1use async_stream::stream;
7use futures::channel::mpsc::{channel, Receiver, Sender};
8use futures_core::Stream;
9use futures_util::pin_mut;
10use futures_util::{
11 future,
12 stream::{self, BoxStream, FuturesUnordered, StreamExt},
13 SinkExt,
14};
15use std::future::Future;
16use std::sync::Arc;
17use std::time::Duration;
18use tokio::sync::Mutex;
19use tokio::{spawn, time::sleep};
20
21use crate::error::{StreamError, StreamResult};
22use crate::stream_performance_metrics::{HealthThresholds, StreamMetrics};
23use crate::resource_manager::get_global_resource_manager;
24
/// A boxed stream of `O` items with a `'static` lifetime.
///
/// This is an alias for [`BoxStream<'static, O>`]; the combinators in this file accept and
/// return this type so that they can be chained without naming concrete stream types.
pub type RS2Stream<O> = BoxStream<'static, O>;
27
/// Selects which buffering behaviour [`auto_backpressure`] applies to a stream.
#[derive(Debug, Clone, Copy)]
pub enum BackpressureStrategy {
    /// Dispatches to [`auto_backpressure_drop_oldest`]: when the buffer is full the item at the
    /// front of the buffer is discarded to make room for the new one.
    DropOldest,
    /// Dispatches to [`auto_backpressure_drop_newest`]: when the buffer is full the incoming
    /// item is discarded.
    DropNewest,
    /// Dispatches to [`auto_backpressure_block`]: the producer waits for buffer space.
    Block,
    /// Dispatches to [`auto_backpressure_error`].
    Error,
}
40
/// Settings consumed by [`auto_backpressure`].
#[derive(Debug, Clone)]
pub struct BackpressureConfig {
    /// Which buffering behaviour to apply.
    pub strategy: BackpressureStrategy,
    /// Buffer capacity handed to the selected strategy.
    pub buffer_size: usize,
    /// Optional low watermark. Not read by `auto_backpressure` in this file.
    pub low_watermark: Option<usize>,
    /// Optional high watermark. Not read by `auto_backpressure` in this file.
    pub high_watermark: Option<usize>,
}
49
impl Default for BackpressureConfig {
    /// Returns a configuration using the `Block` strategy with a buffer of 100 items and
    /// watermarks of 25 (low) and 75 (high).
    fn default() -> Self {
        Self {
            strategy: BackpressureStrategy::Block,
            buffer_size: 100,
            low_watermark: Some(25),
            high_watermark: Some(75),
        }
    }
}
60
/// Describes how a stream finished; handed to the `release` callback of [`bracket_case`].
#[derive(Debug, Clone)]
pub enum ExitCase<E> {
    /// The stream ran to completion.
    Completed,
    /// The stream finished with the contained error.
    Errored(E),
}
67
/// Creates a stream that yields `item` exactly once and then ends.
pub fn emit<O>(item: O) -> RS2Stream<O>
where
    O: Send + 'static,
{
    // `future::ready` makes the value available on the first poll.
    stream::once(future::ready(item)).boxed()
}
79
/// Creates a stream that ends immediately without yielding any item.
pub fn empty<O>() -> RS2Stream<O>
where
    O: Send + 'static,
{
    stream::empty().boxed()
}
87
/// Creates a stream that yields every element of `iter`, in iteration order.
///
/// The iterator is only advanced as the stream is polled.
pub fn from_iter<I, O>(iter: I) -> RS2Stream<O>
where
    I: IntoIterator<Item = O> + Send + 'static,
    <I as IntoIterator>::IntoIter: Send,
    O: Send + 'static,
{
    stream::iter(iter).boxed()
}
97
/// Creates a single-item stream whose item is the output of `fut`.
///
/// The future is driven when the stream is polled, not when `eval` is called.
pub fn eval<O, F>(fut: F) -> RS2Stream<O>
where
    F: Future<Output = O> + Send + 'static,
    O: Send + 'static,
{
    stream::once(fut).boxed()
}
106
/// Creates an endless stream that yields clones of `item`.
///
/// The stream never terminates on its own; combine it with e.g. [`take`] to bound it.
pub fn repeat<O>(item: O) -> RS2Stream<O>
where
    O: Clone + Send + 'static,
{
    stream::repeat(item).boxed()
}
114
/// Creates a stream that waits for `duration` and then yields `item` once.
///
/// The delay starts when the stream is first polled, not when this function is called.
pub fn emit_after<O>(item: O, duration: Duration) -> RS2Stream<O>
where
    O: Send + 'static,
{
    stream::once(async move {
        sleep(duration).await;
        item
    })
    .boxed()
}
126
127pub fn unfold<S, O, F, Fut>(init: S, mut f: F) -> RS2Stream<O>
153where
154 S: Send + 'static,
155 O: Send + 'static,
156 F: FnMut(S) -> Fut + Send + 'static,
157 Fut: Future<Output = Option<(O, S)>> + Send + 'static,
158{
159 stream! {
160 let mut state_opt = Some(init);
161
162 loop {
163 let state = state_opt.take().expect("State should be available");
164 let fut = f(state);
165 match fut.await {
166 Some((item, next_state)) => {
167 yield item;
168 state_opt = Some(next_state);
169 },
170 None => break,
171 }
172 }
173 }
174 .boxed()
175}
176
177pub fn group_adjacent_by<O, K, F>(s: RS2Stream<O>, mut key_fn: F) -> RS2Stream<(K, Vec<O>)>
199where
200 O: Clone + Send + 'static,
201 K: Eq + Clone + Send + 'static,
202 F: FnMut(&O) -> K + Send + 'static,
203{
204 stream! {
205 pin_mut!(s);
206 let mut current_key: Option<K> = None;
207 let mut current_group: Vec<O> = Vec::new();
208
209 while let Some(item) = s.next().await {
210 let key = key_fn(&item);
211
212 match ¤t_key {
213 Some(k) if *k == key => {
214 current_group.push(item);
215 },
216 _ => {
217 if !current_group.is_empty() {
218 yield (current_key.clone().unwrap(), std::mem::take(&mut current_group));
219 }
220 current_key = Some(key);
221 current_group.push(item);
222 }
223 }
224 }
225
226 if !current_group.is_empty() {
227 yield (current_key.clone().unwrap(), std::mem::take(&mut current_group));
228 }
229 }
230 .boxed()
231}
232
/// Yields at most the first `n` items of `s`, then ends.
pub fn take<O>(s: RS2Stream<O>, n: usize) -> RS2Stream<O>
where
    O: Send + 'static,
{
    s.take(n).boxed()
}
240
/// Skips the first `n` items of `s` and yields the rest.
///
/// Note: inside this module the name `drop` refers to this function rather than the prelude's
/// `drop`.
pub fn drop<O>(s: RS2Stream<O>, n: usize) -> RS2Stream<O>
where
    O: Send + 'static,
{
    s.skip(n).boxed()
}
248
249pub fn chunk<O>(s: RS2Stream<O>, size: usize) -> RS2Stream<Vec<O>>
251where
252 O: Send + 'static,
253{
254 stream! {
255 let mut buf = Vec::with_capacity(size);
256 pin_mut!(s);
257 while let Some(item) = s.next().await {
258 buf.push(item);
259 if buf.len() == size {
260 yield std::mem::take(&mut buf);
261 }
262 }
263 if !buf.is_empty() {
264 yield std::mem::take(&mut buf);
265 }
266 }
267 .boxed()
268}
269
270pub fn timeout<T>(s: RS2Stream<T>, duration: Duration) -> RS2Stream<StreamResult<T>>
272where
273 T: Send + 'static,
274{
275 stream! {
276 pin_mut!(s);
277 loop {
278 match tokio::time::timeout(duration, s.next()).await {
279 Ok(Some(value)) => yield Ok(value),
280 Ok(None) => break,
281 Err(_) => yield Err(StreamError::Timeout),
282 }
283 }
284 }
285 .boxed()
286}
287
288pub fn scan<T, U, F>(s: RS2Stream<T>, init: U, mut f: F) -> RS2Stream<U>
290where
291 F: FnMut(U, T) -> U + Send + 'static,
292 T: Send + 'static,
293 U: Clone + Send + 'static,
294{
295 stream! {
296 let mut acc = init;
297 pin_mut!(s);
298 while let Some(item) = s.next().await {
299 acc = f(acc.clone(), item);
300 yield acc.clone();
301 }
302 }
303 .boxed()
304}
305
306pub fn fold<T, A, F, Fut>(s: RS2Stream<T>, init: A, mut f: F) -> impl Future<Output = A>
308where
309 F: FnMut(A, T) -> Fut + Send + 'static,
310 Fut: Future<Output = A> + Send + 'static,
311 T: Send + 'static,
312 A: Send + 'static,
313{
314 async move {
315 let mut acc = init;
316 pin_mut!(s);
317 while let Some(item) = s.next().await {
318 acc = f(acc, item).await;
319 }
320 acc
321 }
322}
323
324pub fn reduce<T, F, Fut>(s: RS2Stream<T>, mut f: F) -> impl Future<Output = Option<T>>
326where
327 F: FnMut(T, T) -> Fut + Send + 'static,
328 Fut: Future<Output = T> + Send + 'static,
329 T: Send + 'static,
330{
331 async move {
332 pin_mut!(s);
333 let first = match s.next().await {
334 Some(item) => item,
335 None => return None, };
337
338 let mut acc = first;
339 while let Some(item) = s.next().await {
340 acc = f(acc, item).await;
341 }
342
343 Some(acc)
344 }
345}
346
/// Applies the async function `f` to every item of `s`, yielding the `Some` results and
/// discarding the `None` ones.
pub fn filter_map<T, U, F, Fut>(s: RS2Stream<T>, f: F) -> RS2Stream<U>
where
    F: FnMut(T) -> Fut + Send + 'static,
    Fut: Future<Output = Option<U>> + Send + 'static,
    T: Send + 'static,
    U: Send + 'static,
{
    s.filter_map(f).boxed()
}
357
358pub fn take_while<T, F, Fut>(s: RS2Stream<T>, mut predicate: F) -> RS2Stream<T>
375where
376 F: FnMut(&T) -> Fut + Send + 'static,
377 Fut: Future<Output = bool> + Send + 'static,
378 T: Send + 'static,
379{
380 stream! {
381 pin_mut!(s);
382 while let Some(item) = s.next().await {
383 if predicate(&item).await {
384 yield item;
385 } else {
386 break;
387 }
388 }
389 }
390 .boxed()
391}
392
393pub fn drop_while<T, F, Fut>(s: RS2Stream<T>, mut predicate: F) -> RS2Stream<T>
410where
411 F: FnMut(&T) -> Fut + Send + 'static,
412 Fut: Future<Output = bool> + Send + 'static,
413 T: Send + 'static,
414{
415 stream! {
416 pin_mut!(s);
417
418 let mut found_false = false;
419 while let Some(item) = s.next().await {
420 if !found_false && predicate(&item).await {
421 continue;
422 } else {
423 found_false = true;
424 yield item;
425 }
426 }
427 }
428 .boxed()
429}
430
431pub fn group_by<T, K, F>(s: RS2Stream<T>, mut key_fn: F) -> RS2Stream<(K, Vec<T>)>
449where
450 T: Clone + Send + 'static,
451 K: Eq + Clone + Send + 'static,
452 F: FnMut(&T) -> K + Send + 'static,
453{
454 stream! {
455 pin_mut!(s);
456 let mut current_key: Option<K> = None;
457 let mut current_group: Vec<T> = Vec::new();
458
459 while let Some(item) = s.next().await {
460 let key = key_fn(&item);
461
462 match ¤t_key {
463 Some(k) if *k == key => {
464 current_group.push(item);
465 },
466 _ => {
467 if !current_group.is_empty() {
468 yield (current_key.clone().unwrap(), std::mem::take(&mut current_group));
469 }
470 current_key = Some(key);
471 current_group.push(item);
472 }
473 }
474 }
475
476 if !current_group.is_empty() {
477 yield (current_key.clone().unwrap(), std::mem::take(&mut current_group));
478 }
479 }
480 .boxed()
481}
482
483pub fn sliding_window<T>(s: RS2Stream<T>, size: usize) -> RS2Stream<Vec<T>>
485where
486 T: Clone + Send + 'static,
487{
488 if size == 0 {
489 return empty();
490 }
491
492 stream! {
493 let mut window = Vec::with_capacity(size);
494 pin_mut!(s);
495
496 while let Some(item) = s.next().await {
497 window.push(item);
498
499 if window.len() > size {
500 window.remove(0);
501 }
502
503 if window.len() == size {
504 yield window.clone();
505 }
506 }
507 }
508 .boxed()
509}
510
511pub fn batch_process<T, U, F>(s: RS2Stream<T>, batch_size: usize, mut processor: F) -> RS2Stream<U>
513where
514 F: FnMut(Vec<T>) -> Vec<U> + Send + 'static,
515 T: Send + 'static,
516 U: Send + 'static,
517{
518 stream! {
519 let chunked = chunk(s, batch_size);
520 pin_mut!(chunked);
521 while let Some(batch) = chunked.next().await {
522 for item in processor(batch) {
523 yield item;
524 }
525 }
526 }
527 .boxed()
528}
529
530pub fn with_metrics<T>(
532 s: RS2Stream<T>,
533 name: String,
534 thresholds: HealthThresholds,
535) -> (RS2Stream<T>, Arc<Mutex<StreamMetrics>>)
536where
537 T: Send + 'static,
538{
539 let metrics = Arc::new(Mutex::new(
540 StreamMetrics::new()
541 .with_name(name)
542 .with_health_thresholds(thresholds),
543 ));
544
545 let metrics_clone = Arc::clone(&metrics);
546
547 let monitored_stream = stream! {
548 pin_mut!(s);
549 while let Some(item) = s.next().await {
550 {
551 let mut m = metrics_clone.lock().await;
552 m.record_item(size_of_val(&item) as u64);
553 }
554 yield item;
555 }
556
557 {
558 let mut m = metrics_clone.lock().await;
559 m.finalize();
560 }
561 }
562 .boxed();
563
564 (monitored_stream, metrics)
565}
566
567pub fn auto_backpressure<O>(s: RS2Stream<O>, config: BackpressureConfig) -> RS2Stream<O>
573where
574 O: Send + 'static,
575{
576 match config.strategy {
577 BackpressureStrategy::Block => auto_backpressure_block(s, config.buffer_size),
578 BackpressureStrategy::DropOldest => auto_backpressure_drop_oldest(s, config.buffer_size),
579 BackpressureStrategy::DropNewest => auto_backpressure_drop_newest(s, config.buffer_size),
580 BackpressureStrategy::Error => auto_backpressure_error(s, config.buffer_size),
581 }
582}
583
584pub fn auto_backpressure_block<O>(s: RS2Stream<O>, buffer_size: usize) -> RS2Stream<O>
586where
587 O: Send + 'static,
588{
589 let (mut tx, rx): (Sender<O>, Receiver<O>) = channel(buffer_size);
590
591 spawn(async move {
592 pin_mut!(s);
593 while let Some(item) = s.next().await {
594 if tx.send(item).await.is_err() {
595 break;
596 }
597 }
598 });
599
600 stream! {
601 let mut rx = rx;
602 while let Some(item) = rx.next().await {
603 yield item;
604 }
605 }
606 .boxed()
607}
608
609pub fn auto_backpressure_drop_oldest<O>(s: RS2Stream<O>, buffer_size: usize) -> RS2Stream<O>
611where
612 O: Send + 'static,
613{
614 use std::collections::VecDeque;
615
616 let buffer = Arc::new(Mutex::new(VecDeque::<O>::new()));
617 let buffer_clone = Arc::clone(&buffer);
618 let (done_tx, mut done_rx) = tokio::sync::mpsc::channel(1);
619 let resource_manager = get_global_resource_manager();
620 let resource_manager_clone = resource_manager.clone();
621
622 spawn(async move {
623 pin_mut!(s);
624 while let Some(item) = s.next().await {
625 let mut buf = buffer_clone.lock().await;
626 if buf.len() >= buffer_size {
627 buf.pop_front();
628 resource_manager_clone.track_buffer_overflow().await.ok();
629 }
630 buf.push_back(item);
631 resource_manager_clone.track_memory_allocation(1).await.ok(); }
633 let _ = done_tx.send(()).await;
634 });
635
636 stream! {
637 let mut source_done = false;
638 loop {
639 if let Ok(_) = done_rx.try_recv() {
640 source_done = true;
641 }
642 let item = {
643 let mut buf = buffer.lock().await;
644 let popped = buf.pop_front();
645 if popped.is_some() {
646 resource_manager.track_memory_deallocation(1).await;
647 }
648 popped
649 };
650 match item {
651 Some(item) => yield item,
652 None => {
653 if source_done {
654 break;
655 }
656 tokio::time::sleep(Duration::from_millis(1)).await;
657 }
658 }
659 }
660 }
661 .boxed()
662}
663
664pub fn auto_backpressure_drop_newest<O>(s: RS2Stream<O>, buffer_size: usize) -> RS2Stream<O>
666where
667 O: Send + 'static,
668{
669 use std::collections::VecDeque;
670
671 let buffer = Arc::new(Mutex::new(VecDeque::<O>::new()));
672 let buffer_clone = Arc::clone(&buffer);
673 let (done_tx, mut done_rx) = tokio::sync::mpsc::channel(1);
674 let resource_manager = get_global_resource_manager();
675 let resource_manager_clone = resource_manager.clone();
676
677 spawn(async move {
678 pin_mut!(s);
679 while let Some(item) = s.next().await {
680 let mut buf = buffer_clone.lock().await;
681 if buf.len() < buffer_size {
682 buf.push_back(item);
683 resource_manager_clone.track_memory_allocation(1).await.ok();
684 } else {
685 resource_manager_clone.track_buffer_overflow().await.ok();
686 }
687 }
688 let _ = done_tx.send(()).await;
689 });
690
691 stream! {
692 let mut source_done = false;
693 loop {
694 if let Ok(_) = done_rx.try_recv() {
695 source_done = true;
696 }
697 let item = {
698 let mut buf = buffer.lock().await;
699 let popped = buf.pop_front();
700 if popped.is_some() {
701 resource_manager.track_memory_deallocation(1).await;
702 }
703 popped
704 };
705 match item {
706 Some(item) => yield item,
707 None => {
708 if source_done {
709 break;
710 }
711 tokio::time::sleep(Duration::from_millis(1)).await;
712 }
713 }
714 }
715 }
716 .boxed()
717}
718
719pub fn auto_backpressure_error<O>(s: RS2Stream<O>, buffer_size: usize) -> RS2Stream<O>
721where
722 O: Send + 'static,
723{
724 use tokio::sync::mpsc;
725
726 let (tx, mut rx) = mpsc::channel(buffer_size);
727
728 spawn(async move {
729 pin_mut!(s);
730 while let Some(item) = s.next().await {
731 if tx.send(item).await.is_err() {
732 break;
733 }
734 }
735 });
736
737 stream! {
738 while let Some(item) = rx.recv().await {
739 yield item;
740 }
741 }
742 .boxed()
743}
744
745pub fn interrupt_when<O, F>(s: RS2Stream<O>, signal: F) -> RS2Stream<O>
781where
782 O: Send + 'static,
783 F: Future<Output = ()> + Send + 'static,
784{
785 stream! {
786 pin_mut!(s);
787 pin_mut!(signal);
788
789 loop {
790 tokio::select! {
791 biased;
792 _ = &mut signal => {
793 break;
794 },
795
796 maybe_item = s.next() => {
797 match maybe_item {
798 Some(item) => yield item,
799 None => break,
800 }
801 },
802 }
803 }
804 }
805 .boxed()
806}
807
808pub fn concat<O, S>(streams: Vec<S>) -> RS2Stream<O>
810where
811 S: Stream<Item = O> + Send + 'static,
812 O: Send + 'static,
813{
814 stream! {
815 for s in streams {
816 pin_mut!(s);
817 while let Some(item) = s.next().await {
818 yield item;
819 }
820 }
821 }
822 .boxed()
823}
824
825pub fn merge<O, S1, S2>(s1: S1, mut s2: S2) -> RS2Stream<O>
827where
828 S1: Stream<Item = O> + Send + 'static,
829 S2: Stream<Item = O> + Send + 'static + Unpin,
830 O: Send + 'static,
831{
832 let chained = s1
833 .map(Some)
834 .chain(stream! { while let Some(x) = s2.next().await { yield Some(x) } });
835 stream! {
836 pin_mut!(chained);
837 while let Some(item) = chained.next().await {
838 if let Some(x) = item {
839 yield x;
840 }
841 }
842 }
843 .boxed()
844}
845
846pub fn interleave<O, S>(streams: Vec<S>) -> RS2Stream<O>
848where
849 S: Stream<Item = O> + Send + 'static + Unpin,
850 O: Send + 'static,
851{
852 if streams.is_empty() {
853 return empty();
854 }
855
856 stream! {
857 let mut streams: Vec<_> = streams.into_iter().map(|s| Box::pin(s)).collect();
858 let mut index = 0;
859
860 while !streams.is_empty() {
861 if index >= streams.len() {
862 index = 0;
863 }
864
865 match streams[index].next().await {
866 Some(item) => {
867 yield item;
868 index += 1;
869 }
870 None => {
871 streams.remove(index);
872 }
873 }
874 }
875 }
876 .boxed()
877}
878
879pub fn zip_with<A, B, O, F, S1, S2>(s1: S1, s2: S2, mut f: F) -> RS2Stream<O>
883where
884 S1: Stream<Item = A> + Send + 'static,
885 S2: Stream<Item = B> + Send + 'static,
886 F: FnMut(A, B) -> O + Send + 'static,
887 A: Send + 'static,
888 B: Send + 'static,
889 O: Send + 'static,
890{
891 stream! {
892 pin_mut!(s1);
893 pin_mut!(s2);
894
895 loop {
896 match futures_util::future::join(s1.next(), s2.next()).await {
897 (Some(a), Some(b)) => yield f(a, b),
898 _ => break, }
900 }
901 }
902 .boxed()
903}
904
905pub fn either<O, S1, S2>(s1: S1, s2: S2) -> RS2Stream<O>
948where
949 S1: Stream<Item = O> + Send + 'static,
950 S2: Stream<Item = O> + Send + 'static,
951 O: Send + 'static,
952{
953 stream! {
954 pin_mut!(s1);
955 pin_mut!(s2);
956
957 let mut s1_done = false;
958 let mut s2_done = false;
959
960 let mut using_s1 = true;
961
962 loop {
963 if s1_done {
964 match s2.next().await {
965 Some(item) => yield item,
966 None => break,
967 }
968 continue;
969 }
970
971 if s2_done {
972 match s1.next().await {
973 Some(item) => yield item,
974 None => break,
975 }
976 continue;
977 }
978
979 if using_s1 {
980 match s1.next().await {
981 Some(item) => {
982 yield item;
983 },
984 None => {
985 s1_done = true;
986 }
987 }
988 } else {
989 match s2.next().await {
990 Some(item) => {
991 yield item;
992 },
993 None => {
994 s2_done = true;
995 }
996 }
997 }
998
999 tokio::select! {
1000 biased;
1001
1002 maybe_item = s1.next() => {
1003 match maybe_item {
1004 Some(item) => {
1005 yield item;
1006 using_s1 = true;
1007 },
1008 None => {
1009 s1_done = true;
1010 }
1011 }
1012 },
1013 maybe_item = s2.next() => {
1014 match maybe_item {
1015 Some(item) => {
1016 yield item;
1017 using_s1 = false;
1018 },
1019 None => {
1020 s2_done = true;
1021 }
1022 }
1023 }
1024 }
1025 }
1026 }
1027 .boxed()
1028}
1029
1030pub fn debounce<O>(s: RS2Stream<O>, duration: Duration) -> RS2Stream<O>
1044where
1045 O: Send + 'static,
1046{
1047 stream! {
1048 pin_mut!(s);
1049
1050 let mut latest_item: Option<O> = None;
1051 let mut timer_handle: Option<tokio::task::JoinHandle<()>> = None;
1052
1053 let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(1);
1054
1055 loop {
1056 tokio::select! {
1057 maybe_item = s.next() => {
1058 match maybe_item {
1059 Some(item) => {
1060 if let Some(handle) = timer_handle.take() {
1061 handle.abort();
1062 }
1063
1064 latest_item = Some(item);
1065
1066 let tx_clone = tx.clone();
1067 timer_handle = Some(tokio::spawn(async move {
1068 tokio::time::sleep(duration).await;
1069 let _ = tx_clone.send(()).await;
1070 }));
1071 },
1072 None => {
1073 if let Some(item) = latest_item.take() {
1074 yield item;
1075 }
1076 break;
1077 }
1078 }
1079 },
1080 _ = rx.recv() => {
1081 if let Some(item) = latest_item.take() {
1082 yield item;
1083 }
1084 }
1085 }
1086 }
1087 }
1088 .boxed()
1089}
1090
1091pub fn distinct_until_changed<O>(s: RS2Stream<O>) -> RS2Stream<O>
1109where
1110 O: Clone + Send + PartialEq + 'static,
1111{
1112 stream! {
1113 pin_mut!(s);
1114 let mut prev: Option<O> = None;
1115
1116 while let Some(item) = s.next().await {
1117 match &prev {
1118 Some(p) if p == &item => {
1119 },
1120 _ => {
1121 yield item.clone();
1122 prev = Some(item);
1123 }
1124 }
1125 }
1126 }
1127 .boxed()
1128}
1129
/// Emits the most recent item of `s` at a fixed `interval`.
///
/// On every timer tick, the latest item received since the previous emission is yielded (as a
/// clone); ticks during which no new item arrived emit nothing. When the source ends, a
/// not-yet-emitted latest item is yielded before the stream ends.
///
/// The timer is created with `tokio::time::interval`, which panics if `interval` is zero.
pub fn sample<O>(s: RS2Stream<O>, interval: Duration) -> RS2Stream<O>
where
    O: Clone + Send + 'static,
{
    stream! {
        pin_mut!(s);

        let mut latest_item: Option<O> = None;
        // True while `latest_item` holds a value that has not been emitted yet.
        let mut has_new_value = false;

        let mut timer = tokio::time::interval(interval);
        // A Tokio interval completes its first tick immediately; consume it so that the first
        // sample is taken one full `interval` from now.
        timer.tick().await;

        loop {
            tokio::select! {
                maybe_item = s.next() => {
                    match maybe_item {
                        Some(item) => {
                            // Remember only the newest item; older unsampled ones are dropped.
                            latest_item = Some(item);
                            has_new_value = true;
                        },
                        None => {
                            // Source ended: flush the last unsampled item, if any.
                            if has_new_value {
                                if let Some(item) = latest_item.take() {
                                    yield item;
                                }
                            }
                            break;
                        }
                    }
                },
                _ = timer.tick() => {
                    if has_new_value {
                        if let Some(ref item) = latest_item {
                            yield item.clone();
                            // Mark as emitted so the same item is not sampled twice.
                            has_new_value = false;
                        }
                    }
                }
            }
        }
    }
    .boxed()
}
1211
1212pub fn distinct_until_changed_by<O, F>(s: RS2Stream<O>, mut eq: F) -> RS2Stream<O>
1231where
1232 O: Clone + Send + 'static,
1233 F: FnMut(&O, &O) -> bool + Send + 'static,
1234{
1235 stream! {
1236 pin_mut!(s);
1237 let mut prev: Option<O> = None;
1238
1239 while let Some(item) = s.next().await {
1240 match &prev {
1241 Some(p) if eq(p, &item) => {
1242 },
1243 _ => {
1244 yield item.clone();
1245 prev = Some(item);
1246 }
1247 }
1248 }
1249 }
1250 .boxed()
1251}
1252
1253pub fn prefetch<O>(s: RS2Stream<O>, prefetch_count: usize) -> RS2Stream<O>
1260where
1261 O: Send + 'static,
1262{
1263 if prefetch_count == 0 {
1264 return s;
1265 }
1266
1267 let (mut tx, rx): (Sender<O>, Receiver<O>) = channel(prefetch_count);
1268
1269 spawn(async move {
1270 pin_mut!(s);
1271 while let Some(item) = s.next().await {
1272 if tx.send(item).await.is_err() {
1273 break;
1274 }
1275 }
1276 });
1277
1278 stream! {
1279 let mut rx = rx;
1280 while let Some(item) = rx.next().await {
1281 yield item;
1282 }
1283 }
1284 .boxed()
1285}
1286
/// Buffers `s` through a bounded channel created with `capacity`, making the producer wait
/// when it is full.
///
/// This is a thin alias for [`auto_backpressure_block`].
pub fn rate_limit_backpressure<O>(s: RS2Stream<O>, capacity: usize) -> RS2Stream<O>
where
    O: Send + 'static,
{
    auto_backpressure_block(s, capacity)
}
1294
1295pub fn throttle<O>(s: RS2Stream<O>, duration: Duration) -> RS2Stream<O>
1297where
1298 O: Send + 'static,
1299{
1300 stream! {
1301 pin_mut!(s);
1302 while let Some(item) = s.next().await {
1303 yield item;
1304 sleep(duration).await;
1305 }
1306 }
1307 .boxed()
1308}
1309
/// Creates an endless stream that yields a clone of `item`, then sleeps for `period`, and
/// repeats.
///
/// The first item is emitted immediately. The stream never ends on its own.
pub fn tick<O>(period: Duration, item: O) -> RS2Stream<O>
where
    O: Clone + Send + 'static,
{
    stream! {
        loop {
            yield item.clone();
            sleep(period).await;
        }
    }
    .boxed()
}
1323
1324pub fn par_eval_map<I, O, Fut, F>(s: RS2Stream<I>, concurrency: usize, mut f: F) -> RS2Stream<O>
1330where
1331 F: FnMut(I) -> Fut + Send + 'static,
1332 Fut: Future<Output = O> + Send + 'static,
1333 O: Send + 'static,
1334 I: Send + 'static,
1335{
1336 let buffered_stream = auto_backpressure_block(s, concurrency * 2);
1337
1338 stream! {
1339 let mut in_flight = FuturesUnordered::new();
1340 pin_mut!(buffered_stream);
1341
1342 while let Some(item) = buffered_stream.next().await {
1343 in_flight.push(f(item));
1344 if in_flight.len() >= concurrency {
1345 if let Some(res) = in_flight.next().await {
1346 yield res;
1347 }
1348 }
1349 }
1350 while let Some(res) = in_flight.next().await {
1351 yield res;
1352 }
1353 }
1354 .boxed()
1355}
1356
1357pub fn par_eval_map_unordered<I, O, Fut, F>(
1359 s: RS2Stream<I>,
1360 concurrency: usize,
1361 f: F,
1362) -> RS2Stream<O>
1363where
1364 F: FnMut(I) -> Fut + Send + 'static,
1365 Fut: Future<Output = O> + Send + 'static,
1366 O: Send + 'static,
1367 I: Send + 'static,
1368{
1369 let buffered_stream = auto_backpressure_block(s, concurrency * 2);
1370 buffered_stream.map(f).buffer_unordered(concurrency).boxed()
1371}
1372
1373pub fn par_join<O, S>(s: RS2Stream<S>, concurrency: usize) -> RS2Stream<O>
1381where
1382 S: Stream<Item = O> + Send + 'static + Unpin,
1383 O: Send + 'static,
1384{
1385 let buffered_stream = auto_backpressure_block(s, concurrency * 2);
1386
1387 stream! {
1388 pin_mut!(buffered_stream);
1389
1390 let mut active_streams: Vec<S> = Vec::with_capacity(concurrency);
1391
1392 let mut outer_stream_done = false;
1393
1394 loop {
1395 while active_streams.len() < concurrency && !outer_stream_done {
1396 match buffered_stream.next().await {
1397 Some(inner_stream) => {
1398 active_streams.push(inner_stream);
1399 },
1400 None => {
1401 outer_stream_done = true;
1402 break;
1403 }
1404 }
1405 }
1406 if active_streams.is_empty() && outer_stream_done {
1407 break;
1408 }
1409
1410 let mut i = 0;
1411 while i < active_streams.len() {
1412 match active_streams[i].next().await {
1413 Some(item) => {
1414 yield item;
1415 i += 1;
1416 },
1417 None => {
1418 active_streams.swap_remove(i);
1419 }
1420 }
1421 }
1422 }
1423 }
1424 .boxed()
1425}
1426
1427pub fn bracket<A, O, St, FAcq, FUse, FRel, R>(
1433 acquire: FAcq,
1434 use_fn: FUse,
1435 release: FRel,
1436) -> RS2Stream<O>
1437where
1438 FAcq: Future<Output = A> + Send + 'static,
1439 FUse: FnOnce(A) -> St + Send + 'static,
1440 St: Stream<Item = O> + Send + 'static,
1441 FRel: FnOnce(A) -> R + Send + 'static,
1442 R: Future<Output = ()> + Send + 'static,
1443 O: Send + 'static,
1444 A: Clone + Send + 'static,
1445{
1446 stream! {
1447 let resource = acquire.await;
1448 let stream = use_fn(resource.clone());
1449 pin_mut!(stream);
1450 while let Some(item) = stream.next().await {
1451 yield item;
1452 }
1453 release(resource).await;
1454 }
1455 .boxed()
1456}
1457
1458pub fn bracket_case<A, O, E, St, FAcq, FUse, FRel, R>(
1460 acquire: FAcq,
1461 use_fn: FUse,
1462 release: FRel,
1463) -> RS2Stream<Result<O, E>>
1464where
1465 FAcq: Future<Output = A> + Send + 'static,
1466 FUse: FnOnce(A) -> St + Send + 'static,
1467 St: Stream<Item = Result<O, E>> + Send + 'static,
1468 FRel: FnOnce(A, ExitCase<E>) -> R + Send + 'static,
1469 R: Future<Output = ()> + Send + 'static,
1470 O: Send + 'static,
1471 E: Clone + Send + 'static,
1472 A: Clone + Send + 'static,
1473{
1474 stream! {
1475 let resource = acquire.await;
1476 let stream = use_fn(resource.clone());
1477 pin_mut!(stream);
1478 while let Some(item) = stream.next().await {
1479 yield item;
1480 }
1481 release(resource, ExitCase::Completed).await;
1482 }
1483 .boxed()
1484}
1485
1486pub use crate::rs2_result_stream_ext::RS2ResultStreamExt;
1492pub use crate::rs2_stream_ext::RS2StreamExt;