par_stream/functions.rs
1use crate::{
2 common::*,
3 config::{BufSize, ParParams},
4 rt,
5 stream::StreamExt as _,
6 try_stream::{TakeUntilError, TryStreamExt as _},
7 utils,
8};
9use flume::r#async::RecvStream;
10use tokio::sync::broadcast;
11
12/// Stream for the [try_par_unfold()] method.
13pub type TryParUnfold<T, E> = TakeUntilError<RecvStream<'static, Result<T, E>>, T, E>;
14
15/// Stream for the [try_par_unfold_blocking()] method.
16pub type TryParUnfoldBlocking<T, E> = TakeUntilError<RecvStream<'static, Result<T, E>>, T, E>;
17
18// // par_unfold_builder
19
20// pub use par_unfold_builder::*;
21// mod par_unfold_builder {
22// use super::*;
23
24// pub fn par_unfold_builder<State, Out, Fut, F>(f: F) -> ParUnfoldAsyncBuilder<State, Out, F>
25// where
26// F: FnMut(State) -> Fut,
27// Fut: 'static + Send + Future<Output = Option<(State, Out)>>,
28// State: 'static + Send,
29// Out: 'static + Send,
30// {
31// ParUnfoldAsyncBuilder::new(f)
32// }
33// }
34
35// // par_unfold_blocking_builder
36
37// pub use par_unfold_blocking_builder::*;
38// mod par_unfold_blocking_builder {
39// use super::*;
40
41// pub fn par_unfold_blocking_builder<State, Out, Func, F>(
42// f: F,
43// ) -> ParUnfoldBlockingBuilder<State, Out, F>
44// where
45// F: Send + FnMut(State) -> Func,
46// Func: 'static + Send + FnOnce() -> Option<(State, Out)>,
47// State: 'static + Send,
48// Out: 'static + Send,
49// {
50// ParUnfoldBlockingBuilder::new(f)
51// }
52// }
53
54// iter_blocking
55
56pub use iter_blocking::*;
57
58mod iter_blocking {
59 use super::*;
60
61 /// Converts an [Iterator] into a [Stream] by consuming the iterator in a blocking thread.
62 ///
63 /// It is useful when consuming the iterator is computationally expensive and involves blocking code.
64 /// It prevents blocking the asynchronous context when consuming the returned stream.
65 pub fn iter_blocking<B, I>(buf_size: B, iter: I) -> RecvStream<'static, I::Item>
66 where
67 B: Into<BufSize>,
68 I: 'static + IntoIterator + Send,
69 I::Item: Send,
70 {
71 let buf_size = buf_size.into().get();
72 let (tx, rx) = utils::channel(buf_size);
73
74 rt::spawn_blocking(move || {
75 for item in iter.into_iter() {
76 if tx.send(item).is_err() {
77 break;
78 }
79 }
80 });
81
82 rx.into_stream()
83 }
84}
85
86// par_unfold
87
88pub use par_unfold::*;
89
90mod par_unfold {
91 use super::*;
92
93 /// Produce stream elements from a parallel asynchronous task.
94 ///
95 /// This function spawns a set of parallel workers. Each worker produces and places
96 /// items to an output buffer. The worker pool size and buffer size is determined by
97 /// `params`.
98 ///
99 /// Each worker receives a copy of initialized state `init`, then iteratively calls the function
100 /// `f(worker_index, State) -> Fut`. The `Fut` is a future that returns `Option<(output, State)>`.
101 /// The future updates the state and produces an output item.
102 ///
103 /// If a worker receives a `None`, the worker with that worker index will halt, but it does not halt
104 /// the other workers. The output stream terminates after every worker halts.
105 ///
106 /// ```rust
107 /// # par_stream::rt::block_on_executor(async move {
108 /// use futures::prelude::*;
109 /// use par_stream::prelude::*;
110 /// use std::sync::{
111 /// atomic::{AtomicUsize, Ordering::*},
112 /// Arc,
113 /// };
114 ///
115 /// let mut vec: Vec<_> = par_stream::par_unfold(
116 /// None,
117 /// Arc::new(AtomicUsize::new(0)),
118 /// |_, counter| async move {
119 /// let output = counter.fetch_add(1, SeqCst);
120 /// (output < 1000).then(|| (output, counter))
121 /// },
122 /// )
123 /// .collect()
124 /// .await;
125 ///
126 /// vec.sort();
127 /// itertools::assert_equal(vec, 0..1000);
128 /// # })
129 /// ```
130 pub fn par_unfold<Item, State, P, F, Fut>(
131 params: P,
132 init: State,
133 f: F,
134 ) -> RecvStream<'static, Item>
135 where
136 P: Into<ParParams>,
137 F: 'static + FnMut(usize, State) -> Fut + Send + Clone,
138 Fut: 'static + Future<Output = Option<(Item, State)>> + Send,
139 Item: 'static + Send,
140 State: 'static + Send + Clone,
141 {
142 let ParParams {
143 num_workers,
144 buf_size,
145 } = params.into();
146 let (output_tx, output_rx) = utils::channel(buf_size);
147
148 (0..num_workers).for_each(|worker_index| {
149 let output_tx = output_tx.clone();
150 let state = init.clone();
151 let f = f.clone();
152
153 rt::spawn(async move {
154 let _ = stream::unfold((state, f), |(state, mut f)| async move {
155 f(worker_index, state)
156 .await
157 .map(|(item, state)| (item, (state, f)))
158 })
159 .map(Ok)
160 .forward(output_tx.into_sink())
161 .await;
162 });
163 });
164
165 output_rx.into_stream()
166 }
167
168 /// Produce stream elements from a parallel blocking task.
169 ///
170 /// This function spawns a set of parallel workers. Each worker produces and places
171 /// items to an output buffer. The worker pool size and buffer size is determined by
172 /// `params`.
173 ///
174 /// Each worker receives a copy of initialized state `init`, then iteratively calls the function
175 /// `f(worker_index, State) -> Option<(output, State)>` to update the state and produce an output item.
176 ///
177 /// If a worker receives a `None`, the worker with that worker index will halt, but it does not halt
178 /// the other workers. The output stream terminates after every worker halts.
179 ///
180 /// ```rust
181 /// # par_stream::rt::block_on_executor(async move {
182 /// use futures::prelude::*;
183 /// use par_stream::prelude::*;
184 /// use std::sync::{
185 /// atomic::{AtomicUsize, Ordering::*},
186 /// Arc,
187 /// };
188 ///
189 /// let mut vec: Vec<_> =
190 /// par_stream::par_unfold_blocking(None, Arc::new(AtomicUsize::new(0)), move |_, counter| {
191 /// let output = counter.fetch_add(1, SeqCst);
192 /// (output < 1000).then(|| (output, counter))
193 /// })
194 /// .collect()
195 /// .await;
196 ///
197 /// vec.sort();
198 /// itertools::assert_equal(vec, 0..1000);
199 /// # })
200 /// ```
201 pub fn par_unfold_blocking<Item, State, P, F>(
202 params: P,
203 init: State,
204 f: F,
205 ) -> RecvStream<'static, Item>
206 where
207 P: Into<ParParams>,
208 F: 'static + FnMut(usize, State) -> Option<(Item, State)> + Send + Clone,
209 Item: 'static + Send,
210 State: 'static + Send + Clone,
211 {
212 let ParParams {
213 num_workers,
214 buf_size,
215 } = params.into();
216 let (output_tx, output_rx) = utils::channel(buf_size);
217
218 (0..num_workers).for_each(|worker_index| {
219 let mut f = f.clone();
220 let mut state = init.clone();
221 let output_tx = output_tx.clone();
222
223 rt::spawn_blocking(move || {
224 while let Some((item, new_state)) = f(worker_index, state) {
225 if output_tx.send(item).is_ok() {
226 state = new_state;
227 } else {
228 break;
229 }
230 }
231 });
232 });
233
234 output_rx.into_stream()
235 }
236}
237
238// sync
239
240pub use sync::*;
241
242mod sync {
243 use super::*;
244 use std::{cmp::Reverse, collections::BinaryHeap};
245
246 #[derive(Derivative)]
247 #[derivative(PartialEq, Eq, PartialOrd, Ord)]
248 struct KV<K, V> {
249 pub key: K,
250 pub index: usize,
251 #[derivative(PartialEq = "ignore", PartialOrd = "ignore", Ord = "ignore")]
252 pub value: V,
253 }
254
255 /// Synchronize streams by pairing up keys of each stream item.
256 ///
257 /// The `key_fn` constructs the key for each item.
258 /// The input items are grouped by their keys in the interal buffer until
259 /// all items with the key arrives. The finished items are yielded in type
260 /// `Ok((stream_index, item))` in monotonic manner.
261 ///
262 /// If any one of the `streams` generates a non-monotonic item. The item is
263 /// yielded as `Err((stream_index, item))` immediately.
264 pub fn sync_by_key<I, F, K, S>(
265 buf_size: impl Into<Option<usize>>,
266 key_fn: F,
267 streams: I,
268 ) -> BoxStream<'static, Result<(usize, S::Item), (usize, S::Item)>>
269 where
270 I: IntoIterator<Item = S>,
271 S: 'static + Stream + Send,
272 S::Item: 'static + Send,
273 F: 'static + Fn(&S::Item) -> K + Send,
274 K: 'static + Clone + Ord + Send,
275 {
276 let buf_size = buf_size.into().unwrap_or_else(num_cpus::get);
277
278 let streams: Vec<_> = streams
279 .into_iter()
280 .enumerate()
281 .map(|(stream_index, stream)| stream.map(move |item| (stream_index, item)).boxed())
282 .collect();
283 let num_streams = streams.len();
284
285 match num_streams {
286 0 => {
287 // The case that no stream provided, return empty stream
288 return stream::empty().boxed();
289 }
290 1 => {
291 // Fast path for single stream case
292 return streams.into_iter().next().unwrap().map(Ok).boxed();
293 }
294 _ => {
295 // Fall through for multiple streams
296 }
297 }
298
299 let mut input_stream =
300 stream::select_all(streams).stateful_map(key_fn, |key_fn, (index, item)| {
301 let key = key_fn(&item);
302 Some((key_fn, (index, key, item)))
303 });
304 let (output_tx, output_rx) = utils::channel(buf_size);
305
306 rt::spawn(async move {
307 let mut heap: BinaryHeap<Reverse<KV<K, S::Item>>> = BinaryHeap::new();
308 let mut min_items: Vec<Option<K>> = vec![None; num_streams];
309 let mut threshold: Option<K>;
310
311 'worker: loop {
312 'input: while let Some((index, key, item)) = input_stream.next().await {
313 // update min item for that stream
314 {
315 let prev = &mut min_items[index];
316 match prev {
317 Some(prev) if *prev <= key => {
318 *prev = key.clone();
319 }
320 Some(_) => {
321 let ok = output_tx.send_async(Err((index, item))).await.is_ok();
322 if !ok {
323 break 'worker;
324 }
325 continue 'input;
326 }
327 None => *prev = Some(key.clone()),
328 }
329 }
330
331 // save item
332 heap.push(Reverse(KV {
333 index,
334 key,
335 value: item,
336 }));
337
338 // update global threshold
339 threshold = min_items.iter().min().unwrap().clone();
340
341 // pop items below threshold
342 if let Some(threshold) = &threshold {
343 'output: while let Some(Reverse(KV { key, .. })) = heap.peek() {
344 if key < threshold {
345 let KV { value, index, .. } = heap.pop().unwrap().0;
346 let ok = output_tx.send(Ok((index, value))).is_ok();
347 if !ok {
348 break 'worker;
349 }
350 } else {
351 break 'output;
352 }
353 }
354 }
355 }
356
357 // send remaining items
358 for Reverse(KV { index, value, .. }) in heap {
359 let ok = output_tx.send(Ok((index, value))).is_ok();
360 if !ok {
361 break 'worker;
362 }
363 }
364
365 break;
366 }
367 });
368
369 output_rx.into_stream().boxed()
370 }
371}
372
373// try_sync
374
375pub use try_sync::*;
376
377mod try_sync {
378 use super::*;
379 use std::{cmp::Reverse, collections::BinaryHeap};
380
381 #[derive(Derivative)]
382 #[derivative(PartialEq, Eq, PartialOrd, Ord)]
383 struct KV<K, V> {
384 pub key: K,
385 pub index: usize,
386 #[derivative(PartialEq = "ignore", PartialOrd = "ignore", Ord = "ignore")]
387 pub value: V,
388 }
389
390 /// Synchronize streams by pairing up keys of each stream item. It is fallible counterpart of [sync_by_key](crate::sync_by_key).
391 ///
392 /// The `key_fn` constructs the key for each item.
393 /// The input items are grouped by their keys in the interal buffer until
394 /// all items with the key arrives. The finished items are yielded in type
395 /// `Ok(Ok((stream_index, item)))` in monotonic manner.
396 ///
397 /// If any one of the `streams` generates a non-monotonic item. The item is
398 /// yielded as `Ok(Err((stream_index, item)))` immediately.
399 ///
400 /// When an error is receiver from one of the `streams`. The returned stream
401 /// yields `Err(err)` and no longer produce future items.
402 pub fn try_sync_by_key<I, F, K, T, E, S>(
403 buf_size: impl Into<Option<usize>>,
404 key_fn: F,
405 streams: I,
406 ) -> BoxStream<'static, Result<Result<(usize, T), (usize, T)>, E>>
407 where
408 I: IntoIterator<Item = S>,
409 S: 'static + Stream<Item = Result<T, E>> + Send,
410 T: 'static + Send,
411 E: 'static + Send,
412 F: 'static + Fn(&T) -> K + Send,
413 K: 'static + Clone + Ord + Send,
414 {
415 let buf_size = buf_size.into().unwrap_or_else(num_cpus::get);
416
417 let streams: Vec<_> = streams
418 .into_iter()
419 .enumerate()
420 .map(|(index, stream)| stream.map_ok(move |item| (index, item)).boxed())
421 .collect();
422 let num_streams = streams.len();
423
424 match num_streams {
425 0 => {
426 // The case that no stream provided, return empty stream
427 return stream::empty().boxed();
428 }
429 1 => {
430 // Fast path for single stream case
431 return streams
432 .into_iter()
433 .next()
434 .unwrap()
435 .and_then(|item| async move { Ok(Ok(item)) })
436 .boxed();
437 }
438 _ => {
439 // Fall through for multiple streams
440 }
441 }
442
443 let (output_tx, output_rx) = utils::channel(buf_size);
444 let mut input_stream =
445 stream::select_all(streams).stateful_map(key_fn, |key_fn, result| {
446 let result = result.map(|(index, item)| {
447 let key = key_fn(&item);
448 (index, key, item)
449 });
450
451 Some((key_fn, result))
452 });
453
454 rt::spawn(async move {
455 let mut heap: BinaryHeap<Reverse<KV<K, T>>> = BinaryHeap::new();
456 let mut min_items: Vec<Option<K>> = vec![None; num_streams];
457 let mut threshold: Option<K>;
458
459 'worker: loop {
460 'input: while let Some(result) = input_stream.next().await {
461 let (index, key, item) = match result {
462 Ok(tuple) => tuple,
463 Err(err) => {
464 let _ = output_tx.send_async(Err(err)).await;
465 break 'worker;
466 }
467 };
468
469 // update min item for that stream
470 {
471 let prev = &mut min_items[index];
472 match prev {
473 Some(prev) if *prev <= key => {
474 *prev = key.clone();
475 }
476 Some(_) => {
477 let ok = output_tx.send(Ok(Err((index, item)))).is_ok();
478 if !ok {
479 break 'worker;
480 }
481 continue 'input;
482 }
483 None => *prev = Some(key.clone()),
484 }
485 }
486
487 // save item
488 heap.push(Reverse(KV {
489 index,
490 key,
491 value: item,
492 }));
493
494 // update global threshold
495 threshold = min_items.iter().min().unwrap().clone();
496
497 // pop items below threshold
498 if let Some(threshold) = &threshold {
499 'output: while let Some(Reverse(KV { key, .. })) = heap.peek() {
500 if key < threshold {
501 let KV { value, index, .. } = heap.pop().unwrap().0;
502 let ok = output_tx.send(Ok(Ok((index, value)))).is_ok();
503 if !ok {
504 break 'worker;
505 }
506 } else {
507 break 'output;
508 }
509 }
510 }
511 }
512
513 // send remaining items
514 for Reverse(KV { index, value, .. }) in heap {
515 let ok = output_tx.send(Ok(Ok((index, value)))).is_ok();
516 if !ok {
517 break 'worker;
518 }
519 }
520
521 break;
522 }
523 });
524
525 output_rx.into_stream().boxed()
526 }
527}
528
529// try_par_unfold
530
531pub use try_par_unfold::*;
532mod try_par_unfold {
533 use super::*;
534
535 /// Produce stream elements from a fallible parallel asynchronous task.
536 ///
537 /// This function spawns a set of parallel workers. Each worker produces and places
538 /// items to an output buffer. The worker pool size and buffer size is determined by
539 /// `params`.
540 ///
541 /// Each worker receives a copy of initialized state `init`, then iteratively calls the function
542 /// `f(worker_index, State) -> Fut`. The `Fut` is a future that returns `Result<Option<(output, State)>, Error>`.
543 /// The future updates the state and produces an output item.
544 ///
545 /// If a worker receives an error `Err(_)`, the error is produced in output stream and the stream halts for ever.
546 /// If a worker receives a `Ok(None)`, the worker with that worker index will halt, but it does not halt
547 /// the other workers. The output stream terminates after every worker halts.
548 pub fn try_par_unfold<Item, Error, State, P, F, Fut>(
549 params: P,
550 init: State,
551 f: F,
552 ) -> TryParUnfold<Item, Error>
553 where
554 P: Into<ParParams>,
555 F: 'static + FnMut(usize, State) -> Fut + Send + Clone,
556 Fut: 'static + Future<Output = Result<Option<(Item, State)>, Error>> + Send,
557 State: 'static + Send + Clone,
558 Item: 'static + Send,
559 Error: 'static + Send,
560 {
561 let ParParams {
562 num_workers,
563 buf_size,
564 } = params.into();
565 let (output_tx, output_rx) = utils::channel(buf_size);
566 let (terminate_tx, _) = broadcast::channel::<()>(1);
567
568 (0..num_workers).for_each(move |worker_index| {
569 let f = f.clone();
570 let state = init.clone();
571 let output_tx = output_tx.clone();
572 let mut terminate_rx = terminate_tx.subscribe();
573 let terminate_tx = terminate_tx.clone();
574
575 rt::spawn(async move {
576 let _ = stream::repeat(())
577 .take_until(async move {
578 let _ = terminate_rx.recv().await;
579 })
580 .map(Ok)
581 .try_stateful_then(
582 (f, terminate_tx, state),
583 |(mut f, terminate_tx, state), ()| async move {
584 let result = f(worker_index, state).await;
585
586 if result.is_err() {
587 let _ = terminate_tx.send(());
588 }
589
590 result.map(|option| {
591 option.map(|(item, state)| ((f, terminate_tx, state), item))
592 })
593 },
594 )
595 .map(Ok)
596 .forward(output_tx.into_sink())
597 .await;
598 });
599 });
600
601 output_rx.into_stream().take_until_error()
602 }
603}
604
605// try_par_unfold_blocking
606
607pub use try_par_unfold_blocking::*;
608mod try_par_unfold_blocking {
609 use super::*;
610
611 /// Produce stream elements from a fallible parallel asynchronous task.
612 ///
613 /// This function spawns a set of parallel workers. Each worker produces and places
614 /// items to an output buffer. The worker pool size and buffer size is determined by
615 /// `params`.
616 ///
617 /// Each worker receives a copy of initialized state `init`, then iteratively calls the function
618 /// `f(worker_index, State) -> Result<Option<(output, State)>, Error>`, which updates the state
619 /// and produces an output item.
620 ///
621 /// If a worker receives an error `Err(_)`, the error is produced in output stream and the stream halts for ever.
622 /// If a worker receives a `Ok(None)`, the worker with that worker index will halt, but it does not halt
623 /// the other workers. The output stream terminates after every worker halts.
624 pub fn try_par_unfold_blocking<Item, Error, State, P, F>(
625 params: P,
626 init: State,
627 f: F,
628 ) -> TryParUnfoldBlocking<Item, Error>
629 where
630 F: 'static + FnMut(usize, State) -> Result<Option<(Item, State)>, Error> + Send + Clone,
631 Item: 'static + Send,
632 Error: 'static + Send,
633 State: 'static + Send + Clone,
634 P: Into<ParParams>,
635 {
636 let ParParams {
637 num_workers,
638 buf_size,
639 } = params.into();
640 let (output_tx, output_rx) = utils::channel(buf_size);
641 let terminate = Arc::new(AtomicBool::new(false));
642
643 (0..num_workers).for_each(|worker_index| {
644 let mut f = f.clone();
645 let mut state = init.clone();
646 let output_tx = output_tx.clone();
647 let terminate = terminate.clone();
648
649 rt::spawn_blocking(move || loop {
650 if terminate.load(Acquire) {
651 break;
652 }
653
654 match f(worker_index, state) {
655 Ok(Some((item, new_state))) => {
656 let result = output_tx.send(Ok(item));
657 if result.is_err() {
658 break;
659 }
660 state = new_state;
661 }
662 Ok(None) => {
663 break;
664 }
665 Err(err) => {
666 let _ = output_tx.send(Err(err));
667 terminate.store(true, Release);
668 break;
669 }
670 }
671 });
672 });
673
674 output_rx.into_stream().take_until_error()
675 }
676}
677
#[cfg(test)]
mod tests {
    use super::*;
    use crate::utils::async_test;
    use rand::prelude::*;

    async_test! {
        // Interleaved monotonic streams are merged in key order; an out-of-order
        // item is reported as `Err`.
        async fn sync_test() {
            {
                let odds = stream::iter([1, 3, 5, 7]);
                let evens = stream::iter([2, 4, 6, 8]);

                let merged: Vec<_> = super::sync_by_key(None, |&val| val, [odds, evens])
                    .collect()
                    .await;

                assert_eq!(
                    merged,
                    [
                        Ok((0, 1)),
                        Ok((1, 2)),
                        Ok((0, 3)),
                        Ok((1, 4)),
                        Ok((0, 5)),
                        Ok((1, 6)),
                        Ok((0, 7)),
                        Ok((1, 8)),
                    ]
                );
            }

            {
                let ordered = stream::iter([1, 2, 3]);
                let disordered = stream::iter([2, 1, 3]);

                let mut synced = Vec::new();
                let mut leaked = Vec::new();

                let mut output = super::sync_by_key(None, |&val| val, [ordered, disordered]);
                while let Some(result) = output.next().await {
                    match result {
                        Ok(item) => synced.push(item),
                        Err(item) => leaked.push(item),
                    }
                }

                assert_eq!(synced, [(0, 1), (0, 2), (1, 2), (0, 3), (1, 3)]);
                assert_eq!(leaked, [(1, 1)]);
            }
        }

        // The workers share one quota counter, so exactly `max_quota` items are produced.
        async fn par_unfold_test() {
            let max_quota = 100;
            let shared_quota = Arc::new(AtomicUsize::new(0));

            let count = super::par_unfold(4, shared_quota, move |_, quota| async move {
                let enough = quota.fetch_add(1, AcqRel) < max_quota;

                enough.then(|| {
                    let mut rng = rand::thread_rng();
                    let val = rng.gen_range(0..10);
                    (val, quota)
                })
            })
            .count()
            .await;

            assert_eq!(count, max_quota);
        }

        // Blocking variant of the quota test above.
        async fn par_unfold_blocking_test() {
            let max_quota = 100;
            let shared_quota = Arc::new(AtomicUsize::new(0));

            let count = super::par_unfold_blocking(4, shared_quota, move |_, quota| {
                let enough = quota.fetch_add(1, AcqRel) < max_quota;

                enough.then(|| {
                    let mut rng = rand::thread_rng();
                    let val = rng.gen_range(0..10);
                    (val, quota)
                })
            })
            .count()
            .await;

            assert_eq!(count, max_quota);
        }

        // Synced items come out in increasing order, the out-of-order `1` is leaked,
        // and the stream ends after the error.
        async fn try_sync_test() {
            {
                let stream1 = stream::iter(vec![Ok(3), Ok(1), Ok(5), Ok(7)]);
                let stream2 = stream::iter(vec![Ok(2), Ok(4), Ok(6), Err("error")]);

                let mut stream = super::try_sync_by_key(None, |&val| val, [stream1, stream2]);

                let mut last_seen = None;
                while let Some(result) = stream.next().await {
                    match result {
                        Ok(Ok((index, value))) => {
                            // Odd values come from stream 0, even values from stream 1.
                            let expected_index = if value & 1 == 1 { 0 } else { 1 };
                            assert_eq!(index, expected_index);

                            if let Some(last_seen) = last_seen {
                                assert!(last_seen < value);
                            }
                            last_seen = Some(value);
                        }
                        Ok(Err((index, value))) => {
                            assert_eq!(index, 0);
                            assert_eq!(value, 1);
                        }
                        Err(err) => {
                            assert_eq!(err, "error");
                            break;
                        }
                    }
                }

                assert_eq!(stream.next().await, None);
            }
        }

        // The stream must end with the "out of quota" error and yield nothing afterwards.
        async fn try_par_unfold_test() {
            let max_quota = 100;

            let mut stream = super::try_par_unfold(
                None,
                Arc::new(AtomicUsize::new(0)),
                move |index, quota| async move {
                    if quota.fetch_add(1, AcqRel) < max_quota {
                        Ok(Some((index, quota)))
                    } else {
                        Err("out of quota")
                    }
                },
            );

            // Number of items produced per worker index.
            let mut counts = HashMap::new();

            loop {
                match stream.next().await {
                    Some(Ok(index)) => {
                        *counts.entry(index).or_insert(0) += 1;
                    }
                    Some(Err("out of quota")) => {
                        break;
                    }
                    Some(Err(_)) | None => {
                        unreachable!();
                    }
                }
            }

            assert!(stream.next().await.is_none());
            assert!(counts.values().all(|&count| count <= max_quota));
            assert!(counts.values().copied().sum::<usize>() <= max_quota);
        }

        // Blocking variant of the fallible quota test above.
        async fn try_par_unfold_blocking_test() {
            let max_quota = 100;

            let mut stream = super::try_par_unfold_blocking(
                None,
                Arc::new(AtomicUsize::new(0)),
                move |index, quota| {
                    if quota.fetch_add(1, AcqRel) < max_quota {
                        Ok(Some((index, quota)))
                    } else {
                        Err("out of quota")
                    }
                },
            );

            // Number of items produced per worker index.
            let mut counts = HashMap::new();

            loop {
                match stream.next().await {
                    Some(Ok(index)) => {
                        *counts.entry(index).or_insert(0) += 1;
                    }
                    Some(Err("out of quota")) => {
                        break;
                    }
                    Some(Err(_)) | None => {
                        unreachable!();
                    }
                }
            }

            assert!(stream.next().await.is_none());
            assert!(counts.values().all(|&count| count <= max_quota));
            assert!(counts.values().copied().sum::<usize>() <= max_quota);
        }

        async fn iter_blocking_test() {
            // Every step of the iterator sleeps, which is why it runs on a blocking thread.
            let slow_iter = (0..2).map(|val| {
                std::thread::sleep(Duration::from_millis(100));
                val
            });

            let vec: Vec<_> = stream::select(
                super::iter_blocking(None, slow_iter),
                future::ready(2).into_stream(),
            )
            .collect()
            .await;

            // assuming iter_blocking() will not block the executor,
            // 2 must go before 0, 1
            assert_eq!(vec, [2, 0, 1]);
        }
    }
}