// par_stream/par_stream.rs

use crate::{
    broadcast::BroadcastBuilder,
    builder::ParBuilder,
    common::*,
    config::{BufSize, ParParams},
    index_stream::{IndexStreamExt as _, ReorderEnumerated},
    pull::PullBuilder,
    rt,
    stream::StreamExt as _,
    tee::Tee,
    utils,
};
use flume::r#async::RecvStream;

/// Stream for the [par_then()](ParStreamExt::par_then) method.
pub type ParThen<T> = ReorderEnumerated<RecvStream<'static, (usize, T)>, T>;

/// Stream for the [par_map()](ParStreamExt::par_map) method.
pub type ParMap<T> = ReorderEnumerated<RecvStream<'static, (usize, T)>, T>;

/// The trait extends [Stream](futures::stream::Stream) types with parallel processing combinators.
pub trait ParStreamExt
where
    Self: 'static + Send + Stream,
    Self::Item: 'static + Send,
{
    /// Moves the stream to a spawned worker and forwards stream items to a channel with `buf_size`.
    ///
    /// It returns a receiver [stream](RecvStream) that buffers the items. The receiver stream is
    /// cloneable so that items are delivered in an anycast manner.
    ///
    /// This combinator is similar to [shared()](crate::stream::StreamExt::shared).
    /// The difference is that `spawned()` spawns a worker that actively forwards stream
    /// items to the channel, and the receivers share the channel, while `shared()`
    /// directly polls the underlying stream whenever a receiver polls, in a lock-free manner.
    /// The choice between these combinators depends on performance considerations.
    ///
    /// ```rust
    /// # par_stream::rt::block_on_executor(async move {
    /// use futures::prelude::*;
    /// use par_stream::prelude::*;
    ///
    /// // Creates two sharing handles to the stream
    /// let stream = stream::iter(0..100);
    /// let recv1 = stream.spawned(None); // spawn with the default buffer size
    /// let recv2 = recv1.clone(); // creates the second receiver
    ///
    /// // Consumes the shared streams individually
    /// let collect1 = par_stream::rt::spawn(recv1.collect());
    /// let collect2 = par_stream::rt::spawn(recv2.collect());
    /// let (vec1, vec2): (Vec<_>, Vec<_>) = futures::join!(collect1, collect2);
    ///
    /// // Checks that the combined values of the two vecs equal the original values
    /// let mut all_vec: Vec<_> = vec1.into_iter().chain(vec2).collect();
    /// all_vec.sort();
    /// itertools::assert_equal(all_vec, 0..100);
    /// # })
    /// ```
    fn spawned<B>(self, buf_size: B) -> RecvStream<'static, Self::Item>
    where
        B: Into<BufSize>;

    /// Maps this stream’s items to a different type on a blocking thread.
    ///
    /// The combinator iteratively maps the stream items and places the output
    /// items into a channel with `buf_size`. The function `f` is executed on a
    /// separate blocking thread so that it does not block the asynchronous runtime.
    ///
    /// ```rust
    /// # par_stream::rt::block_on_executor(async move {
    /// use futures::{prelude::*, stream};
    /// use par_stream::prelude::*;
    ///
    /// let vec: Vec<_> = stream::iter(0..100)
    ///     .map_blocking(None, |_| {
    ///         // run CPU-bound work here
    ///         (0..1000).sum::<u64>()
    ///     })
    ///     .collect()
    ///     .await;
    /// # })
    /// ```
    fn map_blocking<B, T, F>(self, buf_size: B, f: F) -> RecvStream<'static, T>
    where
        B: Into<BufSize>,
        T: Send,
        F: 'static + Send + FnMut(Self::Item) -> T;

    /// Creates a builder that routes each input item to a destination receiver according to `key_fn`.
    ///
    /// Call [`builder.register("key")`](PullBuilder::register) to obtain the receiving stream for that key.
    /// The builder must be finished with [`builder.build()`](PullBuilder::build) so that receivers start
    /// consuming items. [`builder.build()`](PullBuilder::build) also returns a special leaking receiver
    /// for items whose key is not registered or whose target receiver is closed. Dropping the builder
    /// without calling [`builder.build()`](PullBuilder::build) causes the receivers to get no input.
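    ///
    /// Below is a minimal sketch of the intended usage with string keys; the exact
    /// `register`/`build` signatures are assumed from the description above, so the
    /// example is not compiled.
    ///
    /// ```ignore
    /// # par_stream::rt::block_on_executor(async move {
    /// use futures::prelude::*;
    /// use par_stream::prelude::*;
    ///
    /// // route even and odd numbers to different receivers
    /// let mut builder = stream::iter(0..10)
    ///     .pull_routing(None, |value| if value % 2 == 0 { "even" } else { "odd" });
    /// let even_rx = builder.register("even");
    /// let odd_rx = builder.register("odd");
    /// // build() starts the routing and returns the leaking receiver
    /// let _leak_rx = builder.build();
    ///
    /// let evens: Vec<_> = even_rx.collect().await;
    /// let odds: Vec<_> = odd_rx.collect().await;
    /// # })
    /// ```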
    fn pull_routing<B, K, Q, F>(self, buf_size: B, key_fn: F) -> PullBuilder<Self, K, F, Q>
    where
        Self: 'static + Send + Stream,
        Self::Item: 'static + Send,
        F: 'static + Send + FnMut(&Self::Item) -> Q,
        K: 'static + Send + Hash + Eq + Borrow<Q>,
        Q: Send + Hash + Eq,
        B: Into<BufSize>;

    /// Creates a builder that sets up parallel tasks.
    fn par_builder(self) -> ParBuilder<Self>;

    /// Maintains a pool of concurrent workers, each of which consumes as many input elements
    /// as it needs to produce one output element.
    ///
    /// ```rust
    /// # par_stream::rt::block_on_executor(async move {
    /// use futures::prelude::*;
    /// use par_stream::prelude::*;
    ///
    /// let data = vec![1, 2, -3, 4, 5, -6, 7, 8];
    /// stream::iter(data).par_batching(None, |_worker_index, rx| async move {
    ///     while let Ok(value) = rx.recv_async().await {
    ///         if value > 0 {
    ///             return Some((value, rx));
    ///         }
    ///     }
    ///     None
    /// });
    /// # })
    /// ```
    fn par_batching<T, P, F, Fut>(self, params: P, f: F) -> RecvStream<'static, T>
    where
        Self: Sized,
        F: 'static + Send + Clone + FnMut(usize, flume::Receiver<Self::Item>) -> Fut,
        Fut: 'static + Future<Output = Option<(T, flume::Receiver<Self::Item>)>> + Send,
        T: 'static + Send,
        P: Into<ParParams>;

    /// Converts the stream to cloneable receivers, each receiving a copy of every input item.
    ///
    /// It spawns a task that consumes the stream and forwards item copies to the receivers.
    /// The `buf_size` sets the internal channel size. Dropping one receiver does not cause the
    /// other receivers to stop.
    ///
    /// Receivers are not guaranteed to observe the same initial item because they may be
    /// created at different times. Use [broadcast()](crate::par_stream::ParStreamExt::broadcast)
    /// instead if you need this guarantee.
    ///
    /// ```rust
    /// # par_stream::rt::block_on_executor(async move {
    /// use futures::{join, prelude::*};
    /// use par_stream::prelude::*;
    ///
    /// let orig: Vec<_> = (0..1000).collect();
    ///
    /// let rx1 = stream::iter(orig.clone()).tee(1);
    /// let rx2 = rx1.clone();
    /// let rx3 = rx1.clone();
    ///
    /// let fut1 = rx1.map(|val| val).collect();
    /// let fut2 = rx2.map(|val| val * 2).collect();
    /// let fut3 = rx3.map(|val| val * 3).collect();
    ///
    /// let (vec1, vec2, vec3): (Vec<_>, Vec<_>, Vec<_>) = join!(fut1, fut2, fut3);
    /// # })
    /// ```
    fn tee<B>(self, buf_size: B) -> Tee<Self::Item>
    where
        Self::Item: Clone,
        B: Into<BufSize>;

    /// Creates a [builder](BroadcastBuilder) to register broadcast receivers.
    ///
    /// Call [builder.register()](BroadcastBuilder::register) to create a receiver.
    /// Once the registration is done, [builder.build()](BroadcastBuilder::build) must
    /// be called so that the receivers start consuming item copies. If the builder is dropped
    /// without calling build, the receivers get no input.
    ///
    /// Each receiver maintains an internal buffer of `buf_size`. The `send_all` flag configures
    /// the behavior when one of the receivers closes: if `send_all` is true, the closing of one
    /// receiver causes the other receivers to stop; otherwise it does not.
    ///
    /// ```rust
    /// # par_stream::rt::block_on_executor(async move {
    /// use futures::{join, prelude::*};
    /// use par_stream::prelude::*;
    ///
    /// let mut builder = stream::iter(0..).broadcast(2, true);
    /// let rx1 = builder.register();
    /// let rx2 = builder.register();
    /// builder.build();
    ///
    /// let (ret1, ret2): (Vec<_>, Vec<_>) = join!(rx1.take(100).collect(), rx2.take(100).collect());
    /// let expect: Vec<_> = (0..100).collect();
    ///
    /// assert_eq!(ret1, expect);
    /// assert_eq!(ret2, expect);
    /// # })
    /// ```
    fn broadcast<B>(self, buf_size: B, send_all: bool) -> BroadcastBuilder<Self::Item>
    where
        Self::Item: Clone,
        B: Into<BufSize>;

    /// Runs an asynchronous task on parallel workers and produces items in the input order.
    ///
    /// The `params` sets the worker pool size and output buffer size.
    /// Each parallel worker shares the input stream and executes a future for each input item.
    /// Output items are gathered into a channel and reordered to respect the input order.
    ///
    /// ```rust
    /// # par_stream::rt::block_on_executor(async move {
    /// use futures::prelude::*;
    /// use par_stream::prelude::*;
    ///
    /// let doubled: Vec<_> = stream::iter(0..1000)
    ///     // doubles the values in parallel
    ///     .par_then(None, move |value| async move { value * 2 })
    ///     // the collected values will be ordered
    ///     .collect()
    ///     .await;
    /// let expect: Vec<_> = (0..1000).map(|value| value * 2).collect();
    /// assert_eq!(doubled, expect);
    /// # })
    /// ```
    fn par_then<T, P, F, Fut>(self, params: P, f: F) -> ParThen<T>
    where
        T: 'static + Send,
        F: 'static + FnMut(Self::Item) -> Fut + Send,
        Fut: 'static + Future<Output = T> + Send,
        P: Into<ParParams>;

    /// Runs an asynchronous task on parallel workers and produces items without respecting input order.
    ///
    /// The `params` sets the worker pool size and output buffer size.
    /// Each parallel worker shares the stream and executes a future for each input item.
    /// The worker forwards the output to a channel as soon as it finishes.
    ///
    /// ```rust
    /// # par_stream::rt::block_on_executor(async move {
    /// use futures::prelude::*;
    /// use par_stream::prelude::*;
    /// use std::collections::HashSet;
    ///
    /// let doubled: HashSet<_> = stream::iter(0..1000)
    ///     // doubles the values in parallel
    ///     .par_then_unordered(None, move |value| {
    ///         // the future is sent to a parallel worker
    ///         async move { value * 2 }
    ///     })
    ///     // the collected values may NOT be ordered
    ///     .collect()
    ///     .await;
    /// let expect: HashSet<_> = (0..1000).map(|value| value * 2).collect();
    /// assert_eq!(doubled, expect);
    /// # })
    /// ```
    fn par_then_unordered<T, P, F, Fut>(self, params: P, f: F) -> RecvStream<'static, T>
    where
        T: 'static + Send,
        F: 'static + FnMut(Self::Item) -> Fut + Send,
        Fut: 'static + Future<Output = T> + Send,
        P: Into<ParParams>;

    /// Runs a blocking task on parallel workers and produces items in the input order.
    ///
    /// The `params` sets the worker pool size and output buffer size.
    /// Each parallel worker shares the input stream and executes a closure for each input item.
    /// Output items are gathered into a channel and reordered to respect the input order.
    ///
    /// ```rust
    /// # par_stream::rt::block_on_executor(async move {
    /// use futures::prelude::*;
    /// use par_stream::prelude::*;
    ///
    /// let doubled: Vec<_> = stream::iter(0..1000)
    ///     // doubles the values in parallel
    ///     .par_map(None, move |value| {
    ///         // the closure is sent to a parallel worker
    ///         move || value * 2
    ///     })
    ///     // the collected values will be ordered
    ///     .collect()
    ///     .await;
    /// let expect: Vec<_> = (0..1000).map(|value| value * 2).collect();
    /// assert_eq!(doubled, expect);
    /// # })
    /// ```
    fn par_map<T, P, F, Func>(self, params: P, f: F) -> ParMap<T>
    where
        T: 'static + Send,
        F: 'static + FnMut(Self::Item) -> Func + Send,
        Func: 'static + FnOnce() -> T + Send,
        P: Into<ParParams>;

    /// Runs a blocking task on parallel workers and produces items without respecting input order.
    ///
    /// The `params` sets the worker pool size and output buffer size.
    /// Each parallel worker shares the input stream and executes a closure for each input item.
    /// The worker forwards the output to a channel as soon as it finishes.
    ///
    /// ```rust
    /// # par_stream::rt::block_on_executor(async move {
    /// use futures::prelude::*;
    /// use par_stream::prelude::*;
    /// use std::collections::HashSet;
    ///
    /// let doubled: HashSet<_> = stream::iter(0..1000)
    ///     // doubles the values in parallel
    ///     .par_map_unordered(None, move |value| {
    ///         // the closure is sent to a parallel worker
    ///         move || value * 2
    ///     })
    ///     // the collected values may NOT be ordered
    ///     .collect()
    ///     .await;
    /// let expect: HashSet<_> = (0..1000).map(|value| value * 2).collect();
    /// assert_eq!(doubled, expect);
    /// # })
    /// ```
    fn par_map_unordered<T, P, F, Func>(self, params: P, f: F) -> RecvStream<'static, T>
    where
        T: 'static + Send,
        F: 'static + FnMut(Self::Item) -> Func + Send,
        Func: 'static + FnOnce() -> T + Send,
        P: Into<ParParams>;

    /// Reduces the input stream into a single value in parallel.
    ///
    /// It maintains a pool of parallel workers. Each worker reduces the input items it
    /// receives from the stream into a single value. Once all parallel workers finish,
    /// their values are reduced into one in a tree-fold manner.
    ///
    /// ```rust
    /// # par_stream::rt::block_on_executor(async move {
    /// use futures::prelude::*;
    /// use par_stream::prelude::*;
    ///
    /// let sum = stream::iter(1..=1000)
    ///     // sums up the values in parallel
    ///     .par_reduce(None, move |lhs, rhs| {
    ///         // the returned future is executed on a parallel worker
    ///         async move { lhs + rhs }
    ///     })
    ///     .await;
    /// assert_eq!(sum, Some((1 + 1000) * 1000 / 2));
    /// # })
    /// ```
    fn par_reduce<P, F, Fut>(
        self,
        params: P,
        reduce_fn: F,
    ) -> BoxFuture<'static, Option<Self::Item>>
    where
        P: Into<ParParams>,
        F: 'static + FnMut(Self::Item, Self::Item) -> Fut + Send + Clone,
        Fut: 'static + Future<Output = Self::Item> + Send;

    /// Runs an asynchronous task on parallel workers.
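    ///
    /// The example below mirrors the crate's own test suite: an atomic counter is shared
    /// across the parallel workers and every stream item is added to it.
    ///
    /// ```rust
    /// # par_stream::rt::block_on_executor(async move {
    /// use futures::prelude::*;
    /// use par_stream::prelude::*;
    /// use std::sync::{
    ///     atomic::{AtomicUsize, Ordering},
    ///     Arc,
    /// };
    ///
    /// let sum = Arc::new(AtomicUsize::new(0));
    ///
    /// stream::iter(1..=1000)
    ///     .par_for_each(None, {
    ///         let sum = sum.clone();
    ///         move |value| {
    ///             let sum = sum.clone();
    ///             // the future runs on one of the parallel workers
    ///             async move {
    ///                 sum.fetch_add(value, Ordering::SeqCst);
    ///             }
    ///         }
    ///     })
    ///     .await;
    ///
    /// assert_eq!(sum.load(Ordering::SeqCst), (1 + 1000) * 1000 / 2);
    /// # })
    /// ```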
    fn par_for_each<P, F, Fut>(self, params: P, f: F) -> BoxFuture<'static, ()>
    where
        F: 'static + FnMut(Self::Item) -> Fut + Send,
        Fut: 'static + Future<Output = ()> + Send,
        P: Into<ParParams>;

    /// Runs a blocking task on parallel workers.
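    ///
    /// The example below mirrors the crate's own test suite: each item is mapped to a
    /// closure that runs on a blocking worker and adds the value to a shared atomic counter.
    ///
    /// ```rust
    /// # par_stream::rt::block_on_executor(async move {
    /// use futures::prelude::*;
    /// use par_stream::prelude::*;
    /// use std::sync::{
    ///     atomic::{AtomicUsize, Ordering},
    ///     Arc,
    /// };
    ///
    /// let sum = Arc::new(AtomicUsize::new(0));
    ///
    /// stream::iter(1..=1000)
    ///     .par_for_each_blocking(None, {
    ///         let sum = sum.clone();
    ///         move |value| {
    ///             let sum = sum.clone();
    ///             // the closure runs on a blocking worker thread
    ///             move || {
    ///                 sum.fetch_add(value, Ordering::SeqCst);
    ///             }
    ///         }
    ///     })
    ///     .await;
    ///
    /// assert_eq!(sum.load(Ordering::SeqCst), (1 + 1000) * 1000 / 2);
    /// # })
    /// ```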
    fn par_for_each_blocking<P, F, Func>(self, params: P, f: F) -> BoxFuture<'static, ()>
    where
        F: 'static + FnMut(Self::Item) -> Func + Send,
        Func: 'static + FnOnce() + Send,
        P: Into<ParParams>;
}

impl<S> ParStreamExt for S
where
    S: 'static + Send + Stream,
    S::Item: 'static + Send,
{
    fn spawned<B>(self, buf_size: B) -> RecvStream<'static, Self::Item>
    where
        B: Into<BufSize>,
    {
        let (tx, rx) = utils::channel(buf_size.into().get());

        rt::spawn(async move {
            let _ = self.map(Ok).forward(tx.into_sink()).await;
        });

        rx.into_stream()
    }

    fn map_blocking<B, T, F>(self, buf_size: B, mut f: F) -> RecvStream<'static, T>
    where
        B: Into<BufSize>,
        T: Send,
        F: 'static + Send + FnMut(Self::Item) -> T,
    {
        let buf_size = buf_size.into().get();
        let mut stream = self.boxed();
        let (output_tx, output_rx) = utils::channel(buf_size);

        rt::spawn_blocking(move || {
            while let Some(input) = rt::block_on(stream.next()) {
                let output = f(input);
                if output_tx.send(output).is_err() {
                    break;
                }
            }
        });

        output_rx.into_stream()
    }

    fn par_builder(self) -> ParBuilder<Self> {
        ParBuilder::new(self)
    }

    fn par_batching<T, P, F, Fut>(self, params: P, f: F) -> RecvStream<'static, T>
    where
        F: 'static + Send + Clone + FnMut(usize, flume::Receiver<Self::Item>) -> Fut,
        Fut: 'static + Future<Output = Option<(T, flume::Receiver<Self::Item>)>> + Send,
        T: 'static + Send,
        P: Into<ParParams>,
    {
        let ParParams {
            num_workers,
            buf_size,
        } = params.into();

        let (input_tx, input_rx) = utils::channel(buf_size);
        let (output_tx, output_rx) = utils::channel(buf_size);

        rt::spawn(async move {
            let _ = self.map(Ok).forward(input_tx.into_sink()).await;
        });

        (0..num_workers).for_each(move |worker_index| {
            let output_tx = output_tx.clone();
            let f = f.clone();
            let input_rx = input_rx.clone();

            rt::spawn(async move {
                let _ = stream::repeat(())
                    .stateful_then((input_rx, f), |(input_rx, mut f), ()| async move {
                        f(worker_index, input_rx)
                            .await
                            .map(move |(item, input_rx)| ((input_rx, f), item))
                    })
                    .map(Ok)
                    .forward(output_tx.into_sink())
                    .await;
            });
        });

        output_rx.into_stream()
    }

    fn pull_routing<B, K, Q, F>(self, buf_size: B, key_fn: F) -> PullBuilder<Self, K, F, Q>
    where
        Self: 'static + Send + Stream,
        Self::Item: 'static + Send,
        F: 'static + Send + FnMut(&Self::Item) -> Q,
        K: 'static + Send + Hash + Eq + Borrow<Q>,
        Q: Send + Hash + Eq,
        B: Into<BufSize>,
    {
        PullBuilder::new(self, buf_size, key_fn)
    }

    fn tee<B>(self, buf_size: B) -> Tee<Self::Item>
    where
        Self::Item: Clone,
        B: Into<BufSize>,
    {
        Tee::new(self, buf_size)
    }

    fn broadcast<B>(self, buf_size: B, send_all: bool) -> BroadcastBuilder<Self::Item>
    where
        Self::Item: Clone,
        B: Into<BufSize>,
    {
        BroadcastBuilder::new(self, buf_size, send_all)
    }

    fn par_then<T, P, F, Fut>(self, params: P, mut f: F) -> ParThen<T>
    where
        T: 'static + Send,
        F: 'static + FnMut(Self::Item) -> Fut + Send,
        Fut: 'static + Future<Output = T> + Send,
        P: Into<ParParams>,
    {
        let indexed_f = move |(index, item)| {
            let fut = f(item);
            fut.map(move |output| (index, output))
        };

        self.enumerate()
            .par_then_unordered(params, indexed_f)
            .reorder_enumerated()
    }

    fn par_then_unordered<T, P, F, Fut>(self, params: P, f: F) -> RecvStream<'static, T>
    where
        T: 'static + Send,
        F: 'static + FnMut(Self::Item) -> Fut + Send,
        Fut: 'static + Future<Output = T> + Send,
        P: Into<ParParams>,
    {
        let ParParams {
            num_workers,
            buf_size,
        } = params.into();
        let (output_tx, output_rx) = utils::channel(buf_size);
        let stream = self
            .stateful_map(f, |mut f, item| {
                let fut = f(item);
                Some((f, fut))
            })
            .spawned(buf_size);

        (0..num_workers).for_each(move |_| {
            let stream = stream.clone();
            let output_tx = output_tx.clone();

            rt::spawn(async move {
                let _ = stream
                    .then(|fut| fut)
                    .map(Ok)
                    .forward(output_tx.into_sink())
                    .await;
            });
        });
        output_rx.into_stream()
    }

    fn par_map<T, P, F, Func>(self, params: P, mut f: F) -> ParMap<T>
    where
        T: 'static + Send,
        F: 'static + FnMut(Self::Item) -> Func + Send,
        Func: 'static + FnOnce() -> T + Send,
        P: Into<ParParams>,
    {
        self.enumerate()
            .par_map_unordered(params, move |(index, item)| {
                let job = f(item);
                move || (index, job())
            })
            .reorder_enumerated()
    }

    fn par_map_unordered<T, P, F, Func>(self, params: P, f: F) -> RecvStream<'static, T>
    where
        T: 'static + Send,
        F: 'static + FnMut(Self::Item) -> Func + Send,
        Func: 'static + FnOnce() -> T + Send,
        P: Into<ParParams>,
    {
        let ParParams {
            num_workers,
            buf_size,
        } = params.into();
        let stream = self
            .stateful_map(f, |mut f, item| {
                let func = f(item);
                Some((f, func))
            })
            .spawned(buf_size);
        let (output_tx, output_rx) = utils::channel(buf_size);

        (0..num_workers).for_each(move |_| {
            let mut stream = stream.clone();
            let output_tx = output_tx.clone();

            rt::spawn_blocking(move || {
                while let Some(job) = rt::block_on(stream.next()) {
                    let output = job();
                    let result = output_tx.send(output);
                    if result.is_err() {
                        break;
                    }
                }
            });
        });

        output_rx.into_stream()
    }

    fn par_reduce<P, F, Fut>(
        self,
        params: P,
        reduce_fn: F,
    ) -> BoxFuture<'static, Option<Self::Item>>
    where
        F: 'static + FnMut(Self::Item, Self::Item) -> Fut + Send + Clone,
        Fut: 'static + Future<Output = Self::Item> + Send,
        P: Into<ParParams>,
    {
        let ParParams {
            num_workers,
            buf_size,
        } = params.into();
        let stream = self.spawned(buf_size);

        // phase 1: each worker reduces its share of the stream into at most one value
        let phase_1_future = {
            let reduce_fn = reduce_fn.clone();
            async move {
                let reducer_futures = (0..num_workers).map(move |_| {
                    let reduce_fn = reduce_fn.clone();
                    let stream = stream.clone();

                    rt::spawn(async move { stream.reduce(reduce_fn).await })
                });

                future::join_all(reducer_futures).await
            }
        };

        // phase 2: repeatedly pair up the per-worker values and reduce each pair
        // until at most one value remains
        let phase_2_future = async move {
            let values = phase_1_future.await;

            let (pair_tx, pair_rx) = utils::channel(num_workers);
            let (feedback_tx, feedback_rx) = flume::bounded(num_workers);

            let mut count = 0;

            for value in values.into_iter().flatten() {
                feedback_tx.send_async(value).await.map_err(|_| ()).unwrap();
                count += 1;
            }

            let pairing_future = rt::spawn(async move {
                while count >= 2 {
                    let first = feedback_rx.recv_async().await.unwrap();
                    let second = feedback_rx.recv_async().await.unwrap();
                    pair_tx.send_async((first, second)).await.unwrap();
                    count -= 1;
                }

                match count {
                    0 => None,
                    1 => {
                        let output = feedback_rx.recv_async().await.unwrap();
                        Some(output)
                    }
                    _ => unreachable!(),
                }
            });

            let worker_futures = (0..num_workers).map(move |_| {
                let pair_rx = pair_rx.clone();
                let feedback_tx = feedback_tx.clone();
                let mut reduce_fn = reduce_fn.clone();

                rt::spawn(async move {
                    while let Ok((first, second)) = pair_rx.recv_async().await {
                        let reduced = reduce_fn(first, second).await;
                        feedback_tx
                            .send_async(reduced)
                            .await
                            .map_err(|_| ())
                            .unwrap();
                    }
                })
            });

            let (output, _) = join!(pairing_future, future::join_all(worker_futures));

            output
        };

        phase_2_future.boxed()
    }

    fn par_for_each<P, F, Fut>(self, params: P, f: F) -> BoxFuture<'static, ()>
    where
        F: 'static + FnMut(Self::Item) -> Fut + Send,
        Fut: 'static + Future<Output = ()> + Send,
        P: Into<ParParams>,
    {
        let ParParams {
            num_workers,
            buf_size,
        } = params.into();
        let stream = self
            .stateful_map(f, |mut f, item| {
                let fut = f(item);
                Some((f, fut))
            })
            .spawned(buf_size);

        let worker_futures =
            (0..num_workers).map(move |_| rt::spawn(stream.clone().for_each(|fut| fut)));

        future::join_all(worker_futures).map(|_| ()).boxed()
    }

    fn par_for_each_blocking<P, F, Func>(self, params: P, f: F) -> BoxFuture<'static, ()>
    where
        F: 'static + FnMut(Self::Item) -> Func + Send,
        Func: 'static + FnOnce() + Send,
        P: Into<ParParams>,
    {
        let ParParams {
            num_workers,
            buf_size,
        } = params.into();
        let stream = self
            .stateful_map(f, |mut f, item| {
                let func = f(item);
                Some((f, func))
            })
            .spawned(buf_size);

        let worker_futs: Vec<_> = (0..num_workers)
            .map(move |_| {
                let mut stream = stream.clone();

                rt::spawn_blocking(move || {
                    while let Some(job) = rt::block_on(stream.next()) {
                        job();
                    }
                })
            })
            .collect();

        future::join_all(worker_futs).map(|_| ()).boxed()
    }
}

// tests

#[cfg(test)]
mod tests {
    use super::*;
    use crate::utils::async_test;
    use rand::prelude::*;
    use std::time::Duration;

    async_test! {
        async fn par_batching_test() {
            let mut rng = rand::thread_rng();
            let data: Vec<u32> = (0..10000).map(|_| rng.gen_range(0..10)).collect();

            let sums: Vec<_> = stream::iter(data)
                .par_batching(None, |_, rx| async move {
                    let mut sum = rx.recv_async().await.ok()?;

                    while let Ok(val) = rx.recv_async().await {
                        sum += val;

                        if sum >= 1000 {
                            return Some((sum, rx));
                        }
                    }

                    None
                })
                .collect()
                .await;

            assert!(sums.iter().all(|&sum| sum >= 1000));
        }


        async fn par_then_output_is_ordered_test() {
            let max = 1000u64;
            stream::iter(0..max)
                .par_then(None, |value| async move {
                    rt::sleep(Duration::from_millis(value % 20)).await;
                    value
                })
                .fold(0u64, |expect, found| async move {
                    assert_eq!(expect, found);
                    expect + 1
                })
                .await;
        }


        async fn par_then_unordered_test() {
            let max = 1000u64;
            let mut values: Vec<_> = stream::iter((0..max).into_iter())
                .par_then_unordered(None, |value| async move {
                    rt::sleep(Duration::from_millis(value % 20)).await;
                    value
                })
                .collect()
                .await;
            values.sort();
            values.into_iter().fold(0, |expect, found| {
                assert_eq!(expect, found);
                expect + 1
            });
        }


        async fn par_reduce_test() {
            {
                let sum: Option<u64> = stream::iter(iter::empty())
                    .par_reduce(None, |lhs, rhs| async move { lhs + rhs })
                    .await;
                assert!(sum.is_none());
            }

            {
                let max = 100_000u64;
                let sum = stream::iter((1..=max).into_iter())
                    .par_reduce(None, |lhs, rhs| async move { lhs + rhs })
                    .await;
                assert_eq!(sum, Some((1 + max) * max / 2));
            }
        }


        async fn reorder_index_handling_test() {
            let indexes = vec![5, 2, 1, 0, 6, 4, 3];
            let output: Vec<_> = stream::iter(indexes)
                .then(|index| async move {
                    rt::sleep(Duration::from_millis(20)).await;
                    (index, index)
                })
                .reorder_enumerated()
                .collect()
                .await;
            assert_eq!(&output, &[0, 1, 2, 3, 4, 5, 6]);
        }


        async fn enumerate_reorder_test() {
            let max = 1000u64;
            let iterator = (0..max).rev().step_by(2);

            let lhs = stream::iter(iterator.clone())
                .enumerate()
                .par_then_unordered(None, |(index, value)| async move {
                    rt::sleep(std::time::Duration::from_millis(value % 20)).await;
                    (index, value)
                })
                .reorder_enumerated();
            let rhs = stream::iter(iterator.clone());

            let is_equal =
                async_std::stream::StreamExt::all(&mut lhs.zip(rhs), |(lhs_value, rhs_value)| {
                    lhs_value == rhs_value
                })
                .await;
            assert!(is_equal);
        }


        async fn for_each_test() {
            use std::sync::atomic::{self, AtomicUsize};

            {
                let sum = Arc::new(AtomicUsize::new(0));
                stream::iter(1..=1000)
                    .par_for_each(None, {
                        let sum = sum.clone();
                        move |value| {
                            let sum = sum.clone();
                            async move {
                                sum.fetch_add(value, atomic::Ordering::SeqCst);
                            }
                        }
                    })
                    .await;
                assert_eq!(sum.load(atomic::Ordering::SeqCst), (1 + 1000) * 1000 / 2);
            }

            {
                let sum = Arc::new(AtomicUsize::new(0));
                stream::iter(1..=1000)
                    .par_for_each_blocking(None, {
                        let sum = sum.clone();
                        move |value| {
                            let sum = sum.clone();
                            move || {
                                sum.fetch_add(value, atomic::Ordering::SeqCst);
                            }
                        }
                    })
                    .await;
                assert_eq!(sum.load(atomic::Ordering::SeqCst), (1 + 1000) * 1000 / 2);
            }
        }


        async fn tee_halt_test() {
            let mut rx1 = stream::iter(0..).tee(1);
            let mut rx2 = rx1.clone();

            assert!(rx1.next().await.is_some());
            assert!(rx2.next().await.is_some());

            // drop rx1
            drop(rx1);

            // the following should not block
            assert!(rx2.next().await.is_some());
            assert!(rx2.next().await.is_some());
            assert!(rx2.next().await.is_some());
            assert!(rx2.next().await.is_some());
            assert!(rx2.next().await.is_some());
        }


        async fn tee_test() {
            let orig: Vec<_> = (0..100).collect();

            let rx1 = stream::iter(orig.clone()).tee(1);
            let rx2 = rx1.clone();
            let rx3 = rx1.clone();

            let fut1 = rx1
                .then(|val| async move {
                    let millis = rand::thread_rng().gen_range(0..5);
                    rt::sleep(Duration::from_millis(millis)).await;
                    val
                })
                .collect();
            let fut2 = rx2
                .then(|val| async move {
                    let millis = rand::thread_rng().gen_range(0..5);
                    rt::sleep(Duration::from_millis(millis)).await;
                    val * 2
                })
                .collect();
            let fut3 = rx3
                .then(|val| async move {
                    let millis = rand::thread_rng().gen_range(0..5);
                    rt::sleep(Duration::from_millis(millis)).await;
                    val * 3
                })
                .collect();

            let (vec1, vec2, vec3): (Vec<_>, Vec<_>, Vec<_>) = join!(fut1, fut2, fut3);

            // the tee'd receivers may miss the first few elements since they are created at different times
            let start1 = orig.len() - vec1.len();
            let start2 = orig.len() - vec2.len();
            let start3 = orig.len() - vec3.len();

            assert!(orig[start1..]
                .iter()
                .zip(&vec1)
                .all(|(&orig, &val)| orig == val));
            assert!(orig[start2..]
                .iter()
                .zip(&vec2)
                .all(|(&orig, &val)| orig * 2 == val));
            assert!(orig[start3..]
                .iter()
                .zip(&vec3)
                .all(|(&orig, &val)| orig * 3 == val));
        }
    }
}