par_stream/
functions.rs

1use crate::{
2    common::*,
3    config::{BufSize, ParParams},
4    rt,
5    stream::StreamExt as _,
6    try_stream::{TakeUntilError, TryStreamExt as _},
7    utils,
8};
9use flume::r#async::RecvStream;
10use tokio::sync::broadcast;
11
12/// Stream for the [try_par_unfold()] method.
13pub type TryParUnfold<T, E> = TakeUntilError<RecvStream<'static, Result<T, E>>, T, E>;
14
15/// Stream for the [try_par_unfold_blocking()] method.
16pub type TryParUnfoldBlocking<T, E> = TakeUntilError<RecvStream<'static, Result<T, E>>, T, E>;
17
18// // par_unfold_builder
19
20// pub use par_unfold_builder::*;
21// mod par_unfold_builder {
22//     use super::*;
23
24//     pub fn par_unfold_builder<State, Out, Fut, F>(f: F) -> ParUnfoldAsyncBuilder<State, Out, F>
25//     where
26//         F: FnMut(State) -> Fut,
27//         Fut: 'static + Send + Future<Output = Option<(State, Out)>>,
28//         State: 'static + Send,
29//         Out: 'static + Send,
30//     {
31//         ParUnfoldAsyncBuilder::new(f)
32//     }
33// }
34
35// // par_unfold_blocking_builder
36
37// pub use par_unfold_blocking_builder::*;
38// mod par_unfold_blocking_builder {
39//     use super::*;
40
41//     pub fn par_unfold_blocking_builder<State, Out, Func, F>(
42//         f: F,
43//     ) -> ParUnfoldBlockingBuilder<State, Out, F>
44//     where
45//         F: Send + FnMut(State) -> Func,
46//         Func: 'static + Send + FnOnce() -> Option<(State, Out)>,
47//         State: 'static + Send,
48//         Out: 'static + Send,
49//     {
50//         ParUnfoldBlockingBuilder::new(f)
51//     }
52// }
53
54// iter_blocking
55
56pub use iter_blocking::*;
57
58mod iter_blocking {
59    use super::*;
60
61    /// Converts an [Iterator] into a [Stream] by consuming the iterator in a blocking thread.
62    ///
63    /// It is useful when consuming the iterator is computationally expensive and involves blocking code.
64    /// It prevents blocking the asynchronous context when consuming the returned stream.
65    pub fn iter_blocking<B, I>(buf_size: B, iter: I) -> RecvStream<'static, I::Item>
66    where
67        B: Into<BufSize>,
68        I: 'static + IntoIterator + Send,
69        I::Item: Send,
70    {
71        let buf_size = buf_size.into().get();
72        let (tx, rx) = utils::channel(buf_size);
73
74        rt::spawn_blocking(move || {
75            for item in iter.into_iter() {
76                if tx.send(item).is_err() {
77                    break;
78                }
79            }
80        });
81
82        rx.into_stream()
83    }
84}
85
86// par_unfold
87
88pub use par_unfold::*;
89
90mod par_unfold {
91    use super::*;
92
93    /// Produce stream elements from a parallel asynchronous task.
94    ///
95    /// This function spawns a set of parallel workers. Each worker produces and places
96    /// items to an output buffer. The worker pool size and buffer size is determined by
97    /// `params`.
98    ///
99    /// Each worker receives a copy of initialized state `init`, then iteratively calls the function
100    /// `f(worker_index, State) -> Fut`. The `Fut` is a future that returns `Option<(output, State)>`.
101    /// The future updates the state and produces an output item.
102    ///
103    /// If a worker receives a `None`, the worker with that worker index will halt, but it does not halt
104    /// the other workers. The output stream terminates after every worker halts.
105    ///
106    /// ```rust
107    /// # par_stream::rt::block_on_executor(async move {
108    /// use futures::prelude::*;
109    /// use par_stream::prelude::*;
110    /// use std::sync::{
111    ///     atomic::{AtomicUsize, Ordering::*},
112    ///     Arc,
113    /// };
114    ///
115    /// let mut vec: Vec<_> = par_stream::par_unfold(
116    ///     None,
117    ///     Arc::new(AtomicUsize::new(0)),
118    ///     |_, counter| async move {
119    ///         let output = counter.fetch_add(1, SeqCst);
120    ///         (output < 1000).then(|| (output, counter))
121    ///     },
122    /// )
123    /// .collect()
124    /// .await;
125    ///
126    /// vec.sort();
127    /// itertools::assert_equal(vec, 0..1000);
128    /// # })
129    /// ```
130    pub fn par_unfold<Item, State, P, F, Fut>(
131        params: P,
132        init: State,
133        f: F,
134    ) -> RecvStream<'static, Item>
135    where
136        P: Into<ParParams>,
137        F: 'static + FnMut(usize, State) -> Fut + Send + Clone,
138        Fut: 'static + Future<Output = Option<(Item, State)>> + Send,
139        Item: 'static + Send,
140        State: 'static + Send + Clone,
141    {
142        let ParParams {
143            num_workers,
144            buf_size,
145        } = params.into();
146        let (output_tx, output_rx) = utils::channel(buf_size);
147
148        (0..num_workers).for_each(|worker_index| {
149            let output_tx = output_tx.clone();
150            let state = init.clone();
151            let f = f.clone();
152
153            rt::spawn(async move {
154                let _ = stream::unfold((state, f), |(state, mut f)| async move {
155                    f(worker_index, state)
156                        .await
157                        .map(|(item, state)| (item, (state, f)))
158                })
159                .map(Ok)
160                .forward(output_tx.into_sink())
161                .await;
162            });
163        });
164
165        output_rx.into_stream()
166    }
167
168    /// Produce stream elements from a parallel blocking task.
169    ///
170    /// This function spawns a set of parallel workers. Each worker produces and places
171    /// items to an output buffer. The worker pool size and buffer size is determined by
172    /// `params`.
173    ///
174    /// Each worker receives a copy of initialized state `init`, then iteratively calls the function
175    /// `f(worker_index, State) -> Option<(output, State)>` to update the state and produce an output item.
176    ///
177    /// If a worker receives a `None`, the worker with that worker index will halt, but it does not halt
178    /// the other workers. The output stream terminates after every worker halts.
179    ///
180    /// ```rust
181    /// # par_stream::rt::block_on_executor(async move {
182    /// use futures::prelude::*;
183    /// use par_stream::prelude::*;
184    /// use std::sync::{
185    ///     atomic::{AtomicUsize, Ordering::*},
186    ///     Arc,
187    /// };
188    ///
189    /// let mut vec: Vec<_> =
190    ///     par_stream::par_unfold_blocking(None, Arc::new(AtomicUsize::new(0)), move |_, counter| {
191    ///         let output = counter.fetch_add(1, SeqCst);
192    ///         (output < 1000).then(|| (output, counter))
193    ///     })
194    ///     .collect()
195    ///     .await;
196    ///
197    /// vec.sort();
198    /// itertools::assert_equal(vec, 0..1000);
199    /// # })
200    /// ```
201    pub fn par_unfold_blocking<Item, State, P, F>(
202        params: P,
203        init: State,
204        f: F,
205    ) -> RecvStream<'static, Item>
206    where
207        P: Into<ParParams>,
208        F: 'static + FnMut(usize, State) -> Option<(Item, State)> + Send + Clone,
209        Item: 'static + Send,
210        State: 'static + Send + Clone,
211    {
212        let ParParams {
213            num_workers,
214            buf_size,
215        } = params.into();
216        let (output_tx, output_rx) = utils::channel(buf_size);
217
218        (0..num_workers).for_each(|worker_index| {
219            let mut f = f.clone();
220            let mut state = init.clone();
221            let output_tx = output_tx.clone();
222
223            rt::spawn_blocking(move || {
224                while let Some((item, new_state)) = f(worker_index, state) {
225                    if output_tx.send(item).is_ok() {
226                        state = new_state;
227                    } else {
228                        break;
229                    }
230                }
231            });
232        });
233
234        output_rx.into_stream()
235    }
236}
237
238// sync
239
240pub use sync::*;
241
242mod sync {
243    use super::*;
244    use std::{cmp::Reverse, collections::BinaryHeap};
245
246    #[derive(Derivative)]
247    #[derivative(PartialEq, Eq, PartialOrd, Ord)]
248    struct KV<K, V> {
249        pub key: K,
250        pub index: usize,
251        #[derivative(PartialEq = "ignore", PartialOrd = "ignore", Ord = "ignore")]
252        pub value: V,
253    }
254
255    /// Synchronize streams by pairing up keys of each stream item.
256    ///
257    /// The `key_fn` constructs the key for each item.
258    /// The input items are grouped by their keys in the interal buffer until
259    /// all items with the key arrives. The finished items are yielded in type
260    /// `Ok((stream_index, item))` in monotonic manner.
261    ///
262    /// If any one of the `streams` generates a non-monotonic item. The item is
263    /// yielded as `Err((stream_index, item))` immediately.
264    pub fn sync_by_key<I, F, K, S>(
265        buf_size: impl Into<Option<usize>>,
266        key_fn: F,
267        streams: I,
268    ) -> BoxStream<'static, Result<(usize, S::Item), (usize, S::Item)>>
269    where
270        I: IntoIterator<Item = S>,
271        S: 'static + Stream + Send,
272        S::Item: 'static + Send,
273        F: 'static + Fn(&S::Item) -> K + Send,
274        K: 'static + Clone + Ord + Send,
275    {
276        let buf_size = buf_size.into().unwrap_or_else(num_cpus::get);
277
278        let streams: Vec<_> = streams
279            .into_iter()
280            .enumerate()
281            .map(|(stream_index, stream)| stream.map(move |item| (stream_index, item)).boxed())
282            .collect();
283        let num_streams = streams.len();
284
285        match num_streams {
286            0 => {
287                // The case that no stream provided, return empty stream
288                return stream::empty().boxed();
289            }
290            1 => {
291                // Fast path for single stream case
292                return streams.into_iter().next().unwrap().map(Ok).boxed();
293            }
294            _ => {
295                // Fall through for multiple streams
296            }
297        }
298
299        let mut input_stream =
300            stream::select_all(streams).stateful_map(key_fn, |key_fn, (index, item)| {
301                let key = key_fn(&item);
302                Some((key_fn, (index, key, item)))
303            });
304        let (output_tx, output_rx) = utils::channel(buf_size);
305
306        rt::spawn(async move {
307            let mut heap: BinaryHeap<Reverse<KV<K, S::Item>>> = BinaryHeap::new();
308            let mut min_items: Vec<Option<K>> = vec![None; num_streams];
309            let mut threshold: Option<K>;
310
311            'worker: loop {
312                'input: while let Some((index, key, item)) = input_stream.next().await {
313                    // update min item for that stream
314                    {
315                        let prev = &mut min_items[index];
316                        match prev {
317                            Some(prev) if *prev <= key => {
318                                *prev = key.clone();
319                            }
320                            Some(_) => {
321                                let ok = output_tx.send_async(Err((index, item))).await.is_ok();
322                                if !ok {
323                                    break 'worker;
324                                }
325                                continue 'input;
326                            }
327                            None => *prev = Some(key.clone()),
328                        }
329                    }
330
331                    // save item
332                    heap.push(Reverse(KV {
333                        index,
334                        key,
335                        value: item,
336                    }));
337
338                    // update global threshold
339                    threshold = min_items.iter().min().unwrap().clone();
340
341                    // pop items below threshold
342                    if let Some(threshold) = &threshold {
343                        'output: while let Some(Reverse(KV { key, .. })) = heap.peek() {
344                            if key < threshold {
345                                let KV { value, index, .. } = heap.pop().unwrap().0;
346                                let ok = output_tx.send(Ok((index, value))).is_ok();
347                                if !ok {
348                                    break 'worker;
349                                }
350                            } else {
351                                break 'output;
352                            }
353                        }
354                    }
355                }
356
357                // send remaining items
358                for Reverse(KV { index, value, .. }) in heap {
359                    let ok = output_tx.send(Ok((index, value))).is_ok();
360                    if !ok {
361                        break 'worker;
362                    }
363                }
364
365                break;
366            }
367        });
368
369        output_rx.into_stream().boxed()
370    }
371}
372
373// try_sync
374
375pub use try_sync::*;
376
377mod try_sync {
378    use super::*;
379    use std::{cmp::Reverse, collections::BinaryHeap};
380
381    #[derive(Derivative)]
382    #[derivative(PartialEq, Eq, PartialOrd, Ord)]
383    struct KV<K, V> {
384        pub key: K,
385        pub index: usize,
386        #[derivative(PartialEq = "ignore", PartialOrd = "ignore", Ord = "ignore")]
387        pub value: V,
388    }
389
390    /// Synchronize streams by pairing up keys of each stream item. It is fallible counterpart of [sync_by_key](crate::sync_by_key).
391    ///
392    /// The `key_fn` constructs the key for each item.
393    /// The input items are grouped by their keys in the interal buffer until
394    /// all items with the key arrives. The finished items are yielded in type
395    /// `Ok(Ok((stream_index, item)))` in monotonic manner.
396    ///
397    /// If any one of the `streams` generates a non-monotonic item. The item is
398    /// yielded as `Ok(Err((stream_index, item)))` immediately.
399    ///
400    /// When an error is receiver from one of the `streams`. The returned stream
401    /// yields `Err(err)` and no longer produce future items.
402    pub fn try_sync_by_key<I, F, K, T, E, S>(
403        buf_size: impl Into<Option<usize>>,
404        key_fn: F,
405        streams: I,
406    ) -> BoxStream<'static, Result<Result<(usize, T), (usize, T)>, E>>
407    where
408        I: IntoIterator<Item = S>,
409        S: 'static + Stream<Item = Result<T, E>> + Send,
410        T: 'static + Send,
411        E: 'static + Send,
412        F: 'static + Fn(&T) -> K + Send,
413        K: 'static + Clone + Ord + Send,
414    {
415        let buf_size = buf_size.into().unwrap_or_else(num_cpus::get);
416
417        let streams: Vec<_> = streams
418            .into_iter()
419            .enumerate()
420            .map(|(index, stream)| stream.map_ok(move |item| (index, item)).boxed())
421            .collect();
422        let num_streams = streams.len();
423
424        match num_streams {
425            0 => {
426                // The case that no stream provided, return empty stream
427                return stream::empty().boxed();
428            }
429            1 => {
430                // Fast path for single stream case
431                return streams
432                    .into_iter()
433                    .next()
434                    .unwrap()
435                    .and_then(|item| async move { Ok(Ok(item)) })
436                    .boxed();
437            }
438            _ => {
439                // Fall through for multiple streams
440            }
441        }
442
443        let (output_tx, output_rx) = utils::channel(buf_size);
444        let mut input_stream =
445            stream::select_all(streams).stateful_map(key_fn, |key_fn, result| {
446                let result = result.map(|(index, item)| {
447                    let key = key_fn(&item);
448                    (index, key, item)
449                });
450
451                Some((key_fn, result))
452            });
453
454        rt::spawn(async move {
455            let mut heap: BinaryHeap<Reverse<KV<K, T>>> = BinaryHeap::new();
456            let mut min_items: Vec<Option<K>> = vec![None; num_streams];
457            let mut threshold: Option<K>;
458
459            'worker: loop {
460                'input: while let Some(result) = input_stream.next().await {
461                    let (index, key, item) = match result {
462                        Ok(tuple) => tuple,
463                        Err(err) => {
464                            let _ = output_tx.send_async(Err(err)).await;
465                            break 'worker;
466                        }
467                    };
468
469                    // update min item for that stream
470                    {
471                        let prev = &mut min_items[index];
472                        match prev {
473                            Some(prev) if *prev <= key => {
474                                *prev = key.clone();
475                            }
476                            Some(_) => {
477                                let ok = output_tx.send(Ok(Err((index, item)))).is_ok();
478                                if !ok {
479                                    break 'worker;
480                                }
481                                continue 'input;
482                            }
483                            None => *prev = Some(key.clone()),
484                        }
485                    }
486
487                    // save item
488                    heap.push(Reverse(KV {
489                        index,
490                        key,
491                        value: item,
492                    }));
493
494                    // update global threshold
495                    threshold = min_items.iter().min().unwrap().clone();
496
497                    // pop items below threshold
498                    if let Some(threshold) = &threshold {
499                        'output: while let Some(Reverse(KV { key, .. })) = heap.peek() {
500                            if key < threshold {
501                                let KV { value, index, .. } = heap.pop().unwrap().0;
502                                let ok = output_tx.send(Ok(Ok((index, value)))).is_ok();
503                                if !ok {
504                                    break 'worker;
505                                }
506                            } else {
507                                break 'output;
508                            }
509                        }
510                    }
511                }
512
513                // send remaining items
514                for Reverse(KV { index, value, .. }) in heap {
515                    let ok = output_tx.send(Ok(Ok((index, value)))).is_ok();
516                    if !ok {
517                        break 'worker;
518                    }
519                }
520
521                break;
522            }
523        });
524
525        output_rx.into_stream().boxed()
526    }
527}
528
529// try_par_unfold
530
531pub use try_par_unfold::*;
532mod try_par_unfold {
533    use super::*;
534
535    /// Produce stream elements from a fallible parallel asynchronous task.
536    ///
537    /// This function spawns a set of parallel workers. Each worker produces and places
538    /// items to an output buffer. The worker pool size and buffer size is determined by
539    /// `params`.
540    ///
541    /// Each worker receives a copy of initialized state `init`, then iteratively calls the function
542    /// `f(worker_index, State) -> Fut`. The `Fut` is a future that returns `Result<Option<(output, State)>, Error>`.
543    /// The future updates the state and produces an output item.
544    ///
545    /// If a worker receives an error `Err(_)`, the error is produced in output stream and the stream halts for ever.
546    /// If a worker receives a `Ok(None)`, the worker with that worker index will halt, but it does not halt
547    /// the other workers. The output stream terminates after every worker halts.
548    pub fn try_par_unfold<Item, Error, State, P, F, Fut>(
549        params: P,
550        init: State,
551        f: F,
552    ) -> TryParUnfold<Item, Error>
553    where
554        P: Into<ParParams>,
555        F: 'static + FnMut(usize, State) -> Fut + Send + Clone,
556        Fut: 'static + Future<Output = Result<Option<(Item, State)>, Error>> + Send,
557        State: 'static + Send + Clone,
558        Item: 'static + Send,
559        Error: 'static + Send,
560    {
561        let ParParams {
562            num_workers,
563            buf_size,
564        } = params.into();
565        let (output_tx, output_rx) = utils::channel(buf_size);
566        let (terminate_tx, _) = broadcast::channel::<()>(1);
567
568        (0..num_workers).for_each(move |worker_index| {
569            let f = f.clone();
570            let state = init.clone();
571            let output_tx = output_tx.clone();
572            let mut terminate_rx = terminate_tx.subscribe();
573            let terminate_tx = terminate_tx.clone();
574
575            rt::spawn(async move {
576                let _ = stream::repeat(())
577                    .take_until(async move {
578                        let _ = terminate_rx.recv().await;
579                    })
580                    .map(Ok)
581                    .try_stateful_then(
582                        (f, terminate_tx, state),
583                        |(mut f, terminate_tx, state), ()| async move {
584                            let result = f(worker_index, state).await;
585
586                            if result.is_err() {
587                                let _ = terminate_tx.send(());
588                            }
589
590                            result.map(|option| {
591                                option.map(|(item, state)| ((f, terminate_tx, state), item))
592                            })
593                        },
594                    )
595                    .map(Ok)
596                    .forward(output_tx.into_sink())
597                    .await;
598            });
599        });
600
601        output_rx.into_stream().take_until_error()
602    }
603}
604
605// try_par_unfold_blocking
606
607pub use try_par_unfold_blocking::*;
608mod try_par_unfold_blocking {
609    use super::*;
610
611    /// Produce stream elements from a fallible parallel asynchronous task.
612    ///
613    /// This function spawns a set of parallel workers. Each worker produces and places
614    /// items to an output buffer. The worker pool size and buffer size is determined by
615    /// `params`.
616    ///
617    /// Each worker receives a copy of initialized state `init`, then iteratively calls the function
618    /// `f(worker_index, State) -> Result<Option<(output, State)>, Error>`, which updates the state
619    /// and produces an output item.
620    ///
621    /// If a worker receives an error `Err(_)`, the error is produced in output stream and the stream halts for ever.
622    /// If a worker receives a `Ok(None)`, the worker with that worker index will halt, but it does not halt
623    /// the other workers. The output stream terminates after every worker halts.
624    pub fn try_par_unfold_blocking<Item, Error, State, P, F>(
625        params: P,
626        init: State,
627        f: F,
628    ) -> TryParUnfoldBlocking<Item, Error>
629    where
630        F: 'static + FnMut(usize, State) -> Result<Option<(Item, State)>, Error> + Send + Clone,
631        Item: 'static + Send,
632        Error: 'static + Send,
633        State: 'static + Send + Clone,
634        P: Into<ParParams>,
635    {
636        let ParParams {
637            num_workers,
638            buf_size,
639        } = params.into();
640        let (output_tx, output_rx) = utils::channel(buf_size);
641        let terminate = Arc::new(AtomicBool::new(false));
642
643        (0..num_workers).for_each(|worker_index| {
644            let mut f = f.clone();
645            let mut state = init.clone();
646            let output_tx = output_tx.clone();
647            let terminate = terminate.clone();
648
649            rt::spawn_blocking(move || loop {
650                if terminate.load(Acquire) {
651                    break;
652                }
653
654                match f(worker_index, state) {
655                    Ok(Some((item, new_state))) => {
656                        let result = output_tx.send(Ok(item));
657                        if result.is_err() {
658                            break;
659                        }
660                        state = new_state;
661                    }
662                    Ok(None) => {
663                        break;
664                    }
665                    Err(err) => {
666                        let _ = output_tx.send(Err(err));
667                        terminate.store(true, Release);
668                        break;
669                    }
670                }
671            });
672        });
673
674        output_rx.into_stream().take_until_error()
675    }
676}
677
678#[cfg(test)]
679mod tests {
680    use super::*;
681    use crate::utils::async_test;
682    use rand::prelude::*;
683
684    async_test! {
685
686
687        async fn sync_test() {
688            {
689                let stream1 = stream::iter([1, 3, 5, 7]);
690                let stream2 = stream::iter([2, 4, 6, 8]);
691
692                let collected: Vec<_> = super::sync_by_key(None, |&val| val, [stream1, stream2])
693                    .collect()
694                    .await;
695
696                assert_eq!(
697                    collected,
698                    [
699                        Ok((0, 1)),
700                        Ok((1, 2)),
701                        Ok((0, 3)),
702                        Ok((1, 4)),
703                        Ok((0, 5)),
704                        Ok((1, 6)),
705                        Ok((0, 7)),
706                        Ok((1, 8)),
707                    ]
708                );
709            }
710
711            {
712                let stream1 = stream::iter([1, 2, 3]);
713                let stream2 = stream::iter([2, 1, 3]);
714
715                let (synced, leaked): (Vec<_>, Vec<_>) =
716                    super::sync_by_key(None, |&val| val, [stream1, stream2])
717                        .map(|result| match result {
718                            Ok(item) => (Some(item), None),
719                            Err(item) => (None, Some(item)),
720                        })
721                        .unzip()
722                        .await;
723                let synced: Vec<_> = synced.into_iter().flatten().collect();
724                let leaked: Vec<_> = leaked.into_iter().flatten().collect();
725
726                assert_eq!(synced, [(0, 1), (0, 2), (1, 2), (0, 3), (1, 3)]);
727                assert_eq!(leaked, [(1, 1)]);
728            }
729        }
730
731
732        async fn par_unfold_test() {
733            let max_quota = 100;
734
735            let count = super::par_unfold(
736                4,
737                Arc::new(AtomicUsize::new(0)),
738                move |_, quota| async move {
739                    let enough = quota.fetch_add(1, AcqRel) < max_quota;
740
741                    enough.then(|| {
742                        let mut rng = rand::thread_rng();
743                        let val = rng.gen_range(0..10);
744                        (val, quota)
745                    })
746                },
747            )
748            .count()
749            .await;
750
751            assert_eq!(count, max_quota);
752        }
753
754
755        async fn par_unfold_blocking_test() {
756            let max_quota = 100;
757
758            let count =
759                super::par_unfold_blocking(4, Arc::new(AtomicUsize::new(0)), move |_, quota| {
760                    let enough = quota.fetch_add(1, AcqRel) < max_quota;
761
762                    enough.then(|| {
763                        let mut rng = rand::thread_rng();
764                        let val = rng.gen_range(0..10);
765                        (val, quota)
766                    })
767                })
768                .count()
769                .await;
770
771            assert_eq!(count, max_quota);
772        }
773
774
775        async fn try_sync_test() {
776            {
777                let stream1 = stream::iter(vec![Ok(3), Ok(1), Ok(5), Ok(7)]);
778                let stream2 = stream::iter(vec![Ok(2), Ok(4), Ok(6), Err("error")]);
779
780                let mut stream = super::try_sync_by_key(None, |&val| val, [stream1, stream2]);
781
782                let mut prev = None;
783                while let Some(result) = stream.next().await {
784                    match result {
785                        Ok(Ok((index, value))) => {
786                            if value & 1 == 1 {
787                                assert_eq!(index, 0);
788                            } else {
789                                assert_eq!(index, 1);
790                            }
791
792                            if let Some(prev) = prev {
793                                assert!(prev < value);
794                            }
795                            prev = Some(value);
796                        }
797                        Ok(Err((index, value))) => {
798                            assert_eq!(index, 0);
799                            assert_eq!(value, 1);
800                        }
801                        Err(err) => {
802                            assert_eq!(err, "error");
803                            break;
804                        }
805                    }
806                }
807
808                assert_eq!(stream.next().await, None);
809            }
810        }
811
812
813        async fn try_par_unfold_test() {
814            let max_quota = 100;
815
816            let mut stream = super::try_par_unfold(
817                None,
818                Arc::new(AtomicUsize::new(0)),
819                move |index, quota| async move {
820                    let enough = quota.fetch_add(1, AcqRel) < max_quota;
821
822                    if enough {
823                        Ok(Some((index, quota)))
824                    } else {
825                        Err("out of quota")
826                    }
827                },
828            );
829
830            let mut counts = HashMap::new();
831
832            loop {
833                let result = stream.next().await;
834
835                match result {
836                    Some(Ok(index)) => {
837                        *counts.entry(index).or_insert_with(|| 0) += 1;
838                    }
839                    Some(Err("out of quota")) => {
840                        break;
841                    }
842                    Some(Err(_)) | None => {
843                        unreachable!();
844                    }
845                }
846            }
847
848            assert!(stream.next().await.is_none());
849            assert!(counts.values().all(|&count| count <= max_quota));
850            assert!(counts.values().cloned().sum::<usize>() <= max_quota);
851        }
852
853
854        async fn try_par_unfold_blocking_test() {
855            let max_quota = 100;
856
857            let mut stream = super::try_par_unfold_blocking(
858                None,
859                Arc::new(AtomicUsize::new(0)),
860                move |index, quota| {
861                    let enough = quota.fetch_add(1, AcqRel) < max_quota;
862
863                    if enough {
864                        Ok(Some((index, quota)))
865                    } else {
866                        Err("out of quota")
867                    }
868                },
869            );
870
871            let mut counts = HashMap::new();
872
873            loop {
874                let result = stream.next().await;
875
876                match result {
877                    Some(Ok(index)) => {
878                        *counts.entry(index).or_insert_with(|| 0) += 1;
879                    }
880                    Some(Err("out of quota")) => {
881                        break;
882                    }
883                    Some(Err(_)) | None => {
884                        unreachable!();
885                    }
886                }
887            }
888
889            assert!(stream.next().await.is_none());
890            assert!(counts.values().all(|&count| count <= max_quota));
891            assert!(counts.values().cloned().sum::<usize>() <= max_quota);
892        }
893
894
895        async fn iter_blocking_test() {
896            let iter = (0..2).map(|val| {
897                std::thread::sleep(Duration::from_millis(100));
898                val
899            });
900
901            let vec: Vec<_> = stream::select(
902                super::iter_blocking(None, iter),
903                future::ready(2).into_stream(),
904            )
905            .collect()
906            .await;
907
908            // assuming iter_blocking() will not block the executor,
909            // 2 must go before 0, 1
910            assert_eq!(vec, [2, 0, 1]);
911        }
912    }
913}