par_stream/par_stream.rs
1use crate::{
2 broadcast::BroadcastBuilder,
3 builder::ParBuilder,
4 common::*,
5 config::{BufSize, ParParams},
6 index_stream::{IndexStreamExt as _, ReorderEnumerated},
7 pull::PullBuilder,
8 rt,
9 stream::StreamExt as _,
10 tee::Tee,
11 utils,
12};
13use flume::r#async::RecvStream;
14
15/// Stream for the [par_then()](ParStreamExt::par_then) method.
16pub type ParThen<T> = ReorderEnumerated<RecvStream<'static, (usize, T)>, T>;
17
18/// Stream for the [par_map()](ParStreamExt::par_map) method.
19pub type ParMap<T> = ReorderEnumerated<RecvStream<'static, (usize, T)>, T>;
20
21/// The trait extends [Stream](futures::stream::Stream) types with parallel processing combinators.
22pub trait ParStreamExt
23where
24 Self: 'static + Send + Stream,
25 Self::Item: 'static + Send,
26{
27 /// Moves the stream to a spawned worker and forwards stream items to a channel with `buf_size`.
28 ///
29 /// It returns a receiver [stream](RecvStream) that buffers the items. The receiver stream is
30 /// cloneable so that items are sent in anycast manner.
31 ///
32 /// This combinator is similar to [shared()](crate::stream::StreamExt::shared).
33 /// The difference is that `spawned()` spawns a worker that actively forwards stream
34 /// items to the channel, and the receivers shares the channel. The `shared()` combinator
35 /// directly poll the underlying stream whenever a receiver polls in lock-free manner.
36 /// The choice of these combinator depends on the performance considerations.
37 ///
38 /// ```rust
39 /// # par_stream::rt::block_on_executor(async move {
40 /// use futures::prelude::*;
41 /// use par_stream::prelude::*;
42 ///
43 /// // Creates two sharing handles to the stream
44 /// let stream = stream::iter(0..100);
45 /// let recv1 = stream.spawned(None); // spawn with default buffer size
46 /// let recv2 = recv1.clone(); // creates the second receiver
47 ///
48 /// // Consumes the shared streams individually
49 /// let collect1 = par_stream::rt::spawn(recv1.collect());
50 /// let collect2 = par_stream::rt::spawn(recv2.collect());
51 /// let (vec1, vec2): (Vec<_>, Vec<_>) = futures::join!(collect1, collect2);
52 ///
53 /// // Checks that the combined values of two vecs are equal to original values
54 /// let mut all_vec: Vec<_> = vec1.into_iter().chain(vec2).collect();
55 /// all_vec.sort();
56 /// itertools::assert_equal(all_vec, 0..100);
57 /// # })
58 /// ```
59 fn spawned<B>(self, buf_size: B) -> RecvStream<'static, Self::Item>
60 where
61 B: Into<BufSize>;
62
63 /// Maps this stream’s items to a different type on an blocking thread.
64 ///
65 /// The combinator iteratively maps the stream items and places the output
66 /// items to a channel with `buf_size`. The function `f` is executed on a
67 /// separate blocking thread to prevent from blocking the asynchronous runtime.
68 ///
69 /// ```rust
70 /// # par_stream::rt::block_on_executor(async move {
71 /// use futures::{prelude::*, stream};
72 /// use par_stream::prelude::*;
73 ///
74 /// let vec: Vec<_> = stream::iter(0..100)
75 /// .map_blocking(None, |_| {
76 /// // runs a CPU-bounded work here
77 /// (0..1000).sum::<u64>()
78 /// })
79 /// .collect()
80 /// .await;
81 /// # })
82 /// ```
83 fn map_blocking<B, T, F>(self, buf_size: B, f: F) -> RecvStream<'static, T>
84 where
85 B: Into<BufSize>,
86 T: Send,
87 F: 'static + Send + FnMut(Self::Item) -> T;
88
89 /// Creates a builder that routes each input item according to `key_fn` to a destination receiver.
90 ///
91 /// Call [`builder.register("key")`](PullBuilder::register) to obtain the receiving stream for that key.
92 /// The builder must be finished by [`builder.build()`](PullBuilder::build) so that receivers start
93 /// consuming items. [`builder.build()`](PullBuilder::build) also returns a special leaking receiver
94 /// for items which key is not registered or target receiver is closed. Dropping the builder without
95 /// [`builder.build()`](PullBuilder::build) will cause receivers to get empty input.
96 fn pull_routing<B, K, Q, F>(self, buf_size: B, key_fn: F) -> PullBuilder<Self, K, F, Q>
97 where
98 Self: 'static + Send + Stream,
99 Self::Item: 'static + Send,
100 F: 'static + Send + FnMut(&Self::Item) -> Q,
101 K: 'static + Send + Hash + Eq + Borrow<Q>,
102 Q: Send + Hash + Eq,
103 B: Into<BufSize>;
104
105 /// Creates a builder that setups parallel tasks.
106 fn par_builder(self) -> ParBuilder<Self>;
107
108 /// The combinator maintains a collection of concurrent workers, each consuming as many elements as it likes,
109 /// for each output element.
110 ///
111 /// ```rust
112 /// # par_stream::rt::block_on_executor(async move {
113 /// use futures::prelude::*;
114 /// use par_stream::prelude::*;
115 ///
116 /// let data = vec![1, 2, -3, 4, 5, -6, 7, 8];
117 /// stream::iter(data).par_batching(None, |_worker_index, rx| async move {
118 /// while let Ok(value) = rx.recv_async().await {
119 /// if value > 0 {
120 /// return Some((value, rx));
121 /// }
122 /// }
123 /// None
124 /// });
125 /// # })
126 /// ```
127 fn par_batching<T, P, F, Fut>(self, params: P, f: F) -> RecvStream<'static, T>
128 where
129 Self: Sized,
130 F: 'static + Send + Clone + FnMut(usize, flume::Receiver<Self::Item>) -> Fut,
131 Fut: 'static + Future<Output = Option<(T, flume::Receiver<Self::Item>)>> + Send,
132 T: 'static + Send,
133 P: Into<ParParams>;
134
135 /// Converts the stream to cloneable receivers, each receiving a copy for each input item.
136 ///
137 /// It spawns a task to consume the stream, and forwards item copies to receivers.
138 /// The `buf_size` sets the interal channel size. Dropping a receiver does not cause another
139 /// receiver to stop.
140 ///
141 /// Receivers are not guaranteed to get the same initial item due to the time difference
142 /// among receiver creation time. Use [broadcast()](crate::par_stream::ParStreamExt::broadcast)
143 /// instead if you need this guarantee.
144 ///
145 /// ```rust
146 /// # par_stream::rt::block_on_executor(async move {
147 /// use futures::{join, prelude::*};
148 /// use par_stream::prelude::*;
149 ///
150 /// let orig: Vec<_> = (0..1000).collect();
151 ///
152 /// let rx1 = stream::iter(orig.clone()).tee(1);
153 /// let rx2 = rx1.clone();
154 /// let rx3 = rx1.clone();
155 ///
156 /// let fut1 = rx1.map(|val| val).collect();
157 /// let fut2 = rx2.map(|val| val * 2).collect();
158 /// let fut3 = rx3.map(|val| val * 3).collect();
159 ///
160 /// let (vec1, vec2, vec3): (Vec<_>, Vec<_>, Vec<_>) = join!(fut1, fut2, fut3);
161 /// # })
162 /// ```
163 fn tee<B>(self, buf_size: B) -> Tee<Self::Item>
164 where
165 Self::Item: Clone,
166 B: Into<BufSize>;
167
168 /// Creates a [builder](BroadcastBuilder) to register broadcast receivers.
169 ///
170 /// Call [builder.register()](BroadcastBuilder::register) to create a receiver.
171 /// Once the registration is done. [builder.build()](BroadcastBuilder::build) must
172 /// be called so that receivers start comsuming item copies. If the builder is droppped
173 /// without build, receivers get empty input.
174 ///
175 /// Each receiver maintains an internal buffer of `buf_size`. The `send_all` configures
176 /// the behavior if any one of receiver closes. If `send_all` is true, closing of one receiver
177 /// casues the other receivers to stop, otherwise it does not.
178 ///
179 /// ```rust
180 /// # par_stream::rt::block_on_executor(async move {
181 /// use futures::{join, prelude::*};
182 /// use par_stream::prelude::*;
183 ///
184 /// let mut builder = stream::iter(0..).broadcast(2, true);
185 /// let rx1 = builder.register();
186 /// let rx2 = builder.register();
187 /// builder.build();
188 ///
189 /// let (ret1, ret2): (Vec<_>, Vec<_>) = join!(rx1.take(100).collect(), rx2.take(100).collect());
190 /// let expect: Vec<_> = (0..100).collect();
191 ///
192 /// assert_eq!(ret1, expect);
193 /// assert_eq!(ret2, expect);
194 /// # })
195 /// ```
196 fn broadcast<B>(self, buf_size: B, send_all: bool) -> BroadcastBuilder<Self::Item>
197 where
198 Self::Item: Clone,
199 B: Into<BufSize>;
200
201 /// Runs an asynchronous task on parallel workers and produces items respecting the input order.
202 ///
203 /// The `params` sets the worker pool size and output buffer size.
204 /// Each parallel worker shares the stream and executes a future for each input item.
205 /// Output items are gathered to a channel and are reordered respecting to input order.
206 ///
207 /// ```rust
208 /// # par_stream::rt::block_on_executor(async move {
209 /// use futures::prelude::*;
210 /// use par_stream::prelude::*;
211 ///
212 /// let doubled: Vec<_> = stream::iter(0..1000)
213 /// // doubles the values in parallel
214 /// .par_then(None, move |value| async move { value * 2 })
215 /// // the collected values will be ordered
216 /// .collect()
217 /// .await;
218 /// let expect: Vec<_> = (0..1000).map(|value| value * 2).collect();
219 /// assert_eq!(doubled, expect);
220 /// # })
221 /// ```
222 fn par_then<T, P, F, Fut>(self, params: P, f: F) -> ParThen<T>
223 where
224 T: 'static + Send,
225 F: 'static + FnMut(Self::Item) -> Fut + Send,
226 Fut: 'static + Future<Output = T> + Send,
227 P: Into<ParParams>;
228
229 /// Runs an asynchronous task on parallel workers and produces items without respecting input order.
230 ///
231 /// The `params` sets the worker pool size and output buffer size.
232 /// Each parallel worker shares the stream and executes a future for each input item.
233 /// The worker forwards the output to a channel as soon as it finishes.
234 ///
235 /// ```rust
236 /// # par_stream::rt::block_on_executor(async move {
237 /// use futures::prelude::*;
238 /// use par_stream::prelude::*;
239 /// use std::collections::HashSet;
240 ///
241 /// let doubled: HashSet<_> = stream::iter(0..1000)
242 /// // doubles the values in parallel
243 /// .par_then_unordered(None, move |value| {
244 /// // the future is sent to a parallel worker
245 /// async move { value * 2 }
246 /// })
247 /// // the collected values may NOT be ordered
248 /// .collect()
249 /// .await;
250 /// let expect: HashSet<_> = (0..1000).map(|value| value * 2).collect();
251 /// assert_eq!(doubled, expect);
252 /// # })
253 /// ```
254 fn par_then_unordered<T, P, F, Fut>(self, params: P, f: F) -> RecvStream<'static, T>
255 where
256 T: 'static + Send,
257 F: 'static + FnMut(Self::Item) -> Fut + Send,
258 Fut: 'static + Future<Output = T> + Send,
259 P: Into<ParParams>;
260
261 /// Runs a blocking task on parallel workers and produces items respecting the input order.
262 ///
263 /// The `params` sets the worker pool size and output buffer size.
264 /// Each parallel worker shares the stream and executes a future for each input item.
265 /// Output items are gathered to a channel and are reordered respecting to input order.
266 ///
267 /// ```rust
268 /// # par_stream::rt::block_on_executor(async move {
269 /// use futures::prelude::*;
270 /// use par_stream::prelude::*;
271 ///
272 /// // the variable will be shared by parallel workers
273 /// let doubled: Vec<_> = stream::iter(0..1000)
274 /// // doubles the values in parallel
275 /// .par_map(None, move |value| {
276 /// // the closure is sent to parallel worker
277 /// move || value * 2
278 /// })
279 /// // the collected values may NOT be ordered
280 /// .collect()
281 /// .await;
282 /// let expect: Vec<_> = (0..1000).map(|value| value * 2).collect();
283 /// assert_eq!(doubled, expect);
284 /// # })
285 /// ```
286 fn par_map<T, P, F, Func>(self, params: P, f: F) -> ParMap<T>
287 where
288 T: 'static + Send,
289 F: 'static + FnMut(Self::Item) -> Func + Send,
290 Func: 'static + FnOnce() -> T + Send,
291 P: Into<ParParams>;
292
293 /// Runs a blocking task on parallel workers and produces items without respecting input order.
294 ///
295 /// The `params` sets the worker pool size and output buffer size.
296 /// Each parallel worker shares the stream and executes a future for each input item.
297 /// The worker forwards the output to a channel as soon as it finishes.
298 ///
299 /// ```rust
300 /// # par_stream::rt::block_on_executor(async move {
301 /// use futures::prelude::*;
302 /// use par_stream::prelude::*;
303 /// use std::collections::HashSet;
304 ///
305 /// // the variable will be shared by parallel workers
306 ///
307 /// let doubled: HashSet<_> = stream::iter(0..1000)
308 /// // doubles the values in parallel
309 /// .par_map_unordered(None, move |value| {
310 /// // the closure is sent to parallel worker
311 /// move || value * 2
312 /// })
313 /// // the collected values may NOT be ordered
314 /// .collect()
315 /// .await;
316 /// let expect: HashSet<_> = (0..1000).map(|value| value * 2).collect();
317 /// assert_eq!(doubled, expect);
318 /// # })
319 /// ```
320 fn par_map_unordered<T, P, F, Func>(self, params: P, f: F) -> RecvStream<'static, T>
321 where
322 T: 'static + Send,
323 F: 'static + FnMut(Self::Item) -> Func + Send,
324 Func: 'static + FnOnce() -> T + Send,
325 P: Into<ParParams>;
326
327 /// Reduces the input stream into a single value in parallel.
328 ///
329 /// It maintains a parallel worker pool of `num_workers`. Each worker reduces
330 /// the input items from the stream into a single value. Once all parallel worker
331 /// finish, the values from each worker are reduced into one in treefold manner.
332 ///
333 /// ```rust
334 /// # par_stream::rt::block_on_executor(async move {
335 /// use futures::prelude::*;
336 /// use par_stream::prelude::*;
337 ///
338 /// // the variable will be shared by parallel workers
339 /// let sum = stream::iter(1..=1000)
340 /// // sum up the values in parallel
341 /// .par_reduce(None, move |lhs, rhs| {
342 /// // the closure is sent to parallel worker
343 /// async move { lhs + rhs }
344 /// })
345 /// .await;
346 /// assert_eq!(sum, Some((1 + 1000) * 1000 / 2));
347 /// # })
348 /// ```
349 fn par_reduce<P, F, Fut>(
350 self,
351 params: P,
352 reduce_fn: F,
353 ) -> BoxFuture<'static, Option<Self::Item>>
354 where
355 P: Into<ParParams>,
356 F: 'static + FnMut(Self::Item, Self::Item) -> Fut + Send + Clone,
357 Fut: 'static + Future<Output = Self::Item> + Send;
358
359 /// Runs an asynchronous task on parallel workers.
360 fn par_for_each<P, F, Fut>(self, params: P, f: F) -> BoxFuture<'static, ()>
361 where
362 F: 'static + FnMut(Self::Item) -> Fut + Send,
363 Fut: 'static + Future<Output = ()> + Send,
364 P: Into<ParParams>;
365
366 /// Runs a blocking task on parallel workers.
367 fn par_for_each_blocking<P, F, Func>(self, params: P, f: F) -> BoxFuture<'static, ()>
368 where
369 F: 'static + FnMut(Self::Item) -> Func + Send,
370 Func: 'static + FnOnce() + Send,
371 P: Into<ParParams>;
372}
373
374impl<S> ParStreamExt for S
375where
376 S: 'static + Send + Stream,
377 S::Item: 'static + Send,
378{
379 fn spawned<B>(self, buf_size: B) -> RecvStream<'static, Self::Item>
380 where
381 B: Into<BufSize>,
382 {
383 let (tx, rx) = utils::channel(buf_size.into().get());
384
385 rt::spawn(async move {
386 let _ = self.map(Ok).forward(tx.into_sink()).await;
387 });
388
389 rx.into_stream()
390 }
391
392 fn map_blocking<B, T, F>(self, buf_size: B, mut f: F) -> RecvStream<'static, T>
393 where
394 B: Into<BufSize>,
395 T: Send,
396 F: 'static + Send + FnMut(Self::Item) -> T,
397 {
398 let buf_size = buf_size.into().get();
399 let mut stream = self.boxed();
400 let (output_tx, output_rx) = utils::channel(buf_size);
401
402 rt::spawn_blocking(move || {
403 while let Some(input) = rt::block_on(stream.next()) {
404 let output = f(input);
405 if output_tx.send(output).is_err() {
406 break;
407 }
408 }
409 });
410
411 output_rx.into_stream()
412 }
413
414 fn par_builder(self) -> ParBuilder<Self> {
415 ParBuilder::new(self)
416 }
417
418 fn par_batching<T, P, F, Fut>(self, params: P, f: F) -> RecvStream<'static, T>
419 where
420 F: 'static + Send + Clone + FnMut(usize, flume::Receiver<Self::Item>) -> Fut,
421 Fut: 'static + Future<Output = Option<(T, flume::Receiver<Self::Item>)>> + Send,
422 T: 'static + Send,
423 P: Into<ParParams>,
424 {
425 let ParParams {
426 num_workers,
427 buf_size,
428 } = params.into();
429
430 let (input_tx, input_rx) = utils::channel(buf_size);
431 let (output_tx, output_rx) = utils::channel(buf_size);
432
433 rt::spawn(async move {
434 let _ = self.map(Ok).forward(input_tx.into_sink()).await;
435 });
436
437 (0..num_workers).for_each(move |worker_index| {
438 let output_tx = output_tx.clone();
439 let f = f.clone();
440 let input_rx = input_rx.clone();
441
442 rt::spawn(async move {
443 let _ = stream::repeat(())
444 .stateful_then((input_rx, f), |(input_rx, mut f), ()| async move {
445 f(worker_index, input_rx)
446 .await
447 .map(move |(item, input_rx)| ((input_rx, f), item))
448 })
449 .map(Ok)
450 .forward(output_tx.into_sink())
451 .await;
452 });
453 });
454
455 output_rx.into_stream()
456 }
457
458 fn pull_routing<B, K, Q, F>(self, buf_size: B, key_fn: F) -> PullBuilder<Self, K, F, Q>
459 where
460 Self: 'static + Send + Stream,
461 Self::Item: 'static + Send,
462 F: 'static + Send + FnMut(&Self::Item) -> Q,
463 K: 'static + Send + Hash + Eq + Borrow<Q>,
464 Q: Send + Hash + Eq,
465 B: Into<BufSize>,
466 {
467 PullBuilder::new(self, buf_size, key_fn)
468 }
469
470 fn tee<B>(self, buf_size: B) -> Tee<Self::Item>
471 where
472 Self::Item: Clone,
473 B: Into<BufSize>,
474 {
475 Tee::new(self, buf_size)
476 }
477
478 fn broadcast<B>(self, buf_size: B, send_all: bool) -> BroadcastBuilder<Self::Item>
479 where
480 Self::Item: Clone,
481 B: Into<BufSize>,
482 {
483 BroadcastBuilder::new(self, buf_size, send_all)
484 }
485
486 fn par_then<T, P, F, Fut>(self, params: P, mut f: F) -> ParThen<T>
487 where
488 T: 'static + Send,
489 F: 'static + FnMut(Self::Item) -> Fut + Send,
490 Fut: 'static + Future<Output = T> + Send,
491 P: Into<ParParams>,
492 {
493 let indexed_f = move |(index, item)| {
494 let fut = f(item);
495 fut.map(move |output| (index, output))
496 };
497
498 self.enumerate()
499 .par_then_unordered(params, indexed_f)
500 .reorder_enumerated()
501 }
502
503 fn par_then_unordered<T, P, F, Fut>(self, params: P, f: F) -> RecvStream<'static, T>
504 where
505 T: 'static + Send,
506 F: 'static + FnMut(Self::Item) -> Fut + Send,
507 Fut: 'static + Future<Output = T> + Send,
508 P: Into<ParParams>,
509 {
510 let ParParams {
511 num_workers,
512 buf_size,
513 } = params.into();
514 let (output_tx, output_rx) = utils::channel(buf_size);
515 let stream = self
516 .stateful_map(f, |mut f, item| {
517 let fut = f(item);
518 Some((f, fut))
519 })
520 .spawned(buf_size);
521
522 (0..num_workers).for_each(move |_| {
523 let stream = stream.clone();
524 let output_tx = output_tx.clone();
525
526 rt::spawn(async move {
527 let _ = stream
528 .then(|fut| fut)
529 .map(Ok)
530 .forward(output_tx.into_sink())
531 .await;
532 });
533 });
534 output_rx.into_stream()
535 }
536
537 fn par_map<T, P, F, Func>(self, params: P, mut f: F) -> ParMap<T>
538 where
539 T: 'static + Send,
540 F: 'static + FnMut(Self::Item) -> Func + Send,
541 Func: 'static + FnOnce() -> T + Send,
542 P: Into<ParParams>,
543 {
544 self.enumerate()
545 .par_map_unordered(params, move |(index, item)| {
546 let job = f(item);
547 move || (index, job())
548 })
549 .reorder_enumerated()
550 }
551
552 fn par_map_unordered<T, P, F, Func>(self, params: P, f: F) -> RecvStream<'static, T>
553 where
554 T: 'static + Send,
555 F: 'static + FnMut(Self::Item) -> Func + Send,
556 Func: 'static + FnOnce() -> T + Send,
557 P: Into<ParParams>,
558 {
559 let ParParams {
560 num_workers,
561 buf_size,
562 } = params.into();
563 let stream = self
564 .stateful_map(f, |mut f, item| {
565 let func = f(item);
566 Some((f, func))
567 })
568 .spawned(buf_size);
569 let (output_tx, output_rx) = utils::channel(buf_size);
570
571 (0..num_workers).for_each(move |_| {
572 let mut stream = stream.clone();
573 let output_tx = output_tx.clone();
574
575 rt::spawn_blocking(move || {
576 while let Some(job) = rt::block_on(stream.next()) {
577 let output = job();
578 let result = output_tx.send(output);
579 if result.is_err() {
580 break;
581 }
582 }
583 });
584 });
585
586 output_rx.into_stream()
587 }
588
589 fn par_reduce<P, F, Fut>(
590 self,
591 params: P,
592 reduce_fn: F,
593 ) -> BoxFuture<'static, Option<Self::Item>>
594 where
595 F: 'static + FnMut(Self::Item, Self::Item) -> Fut + Send + Clone,
596 Fut: 'static + Future<Output = Self::Item> + Send,
597 P: Into<ParParams>,
598 {
599 let ParParams {
600 num_workers,
601 buf_size,
602 } = params.into();
603 let stream = self.spawned(buf_size);
604
605 // phase 1
606 let phase_1_future = {
607 let reduce_fn = reduce_fn.clone();
608 async move {
609 let reducer_futures = (0..num_workers).map(move |_| {
610 let reduce_fn = reduce_fn.clone();
611 let stream = stream.clone();
612
613 rt::spawn(async move { stream.reduce(reduce_fn).await })
614 });
615
616 future::join_all(reducer_futures).await
617 }
618 };
619
620 // phase 2
621 let phase_2_future = async move {
622 let values = phase_1_future.await;
623
624 let (pair_tx, pair_rx) = utils::channel(num_workers);
625 let (feedback_tx, feedback_rx) = flume::bounded(num_workers);
626
627 let mut count = 0;
628
629 for value in values.into_iter().flatten() {
630 feedback_tx.send_async(value).await.map_err(|_| ()).unwrap();
631 count += 1;
632 }
633
634 let pairing_future = rt::spawn(async move {
635 while count >= 2 {
636 let first = feedback_rx.recv_async().await.unwrap();
637 let second = feedback_rx.recv_async().await.unwrap();
638 pair_tx.send_async((first, second)).await.unwrap();
639 count -= 1;
640 }
641
642 match count {
643 0 => None,
644 1 => {
645 let output = feedback_rx.recv_async().await.unwrap();
646 Some(output)
647 }
648 _ => unreachable!(),
649 }
650 });
651
652 let worker_futures = (0..num_workers).map(move |_| {
653 let pair_rx = pair_rx.clone();
654 let feedback_tx = feedback_tx.clone();
655 let mut reduce_fn = reduce_fn.clone();
656
657 rt::spawn(async move {
658 while let Ok((first, second)) = pair_rx.recv_async().await {
659 let reduced = reduce_fn(first, second).await;
660 feedback_tx
661 .send_async(reduced)
662 .await
663 .map_err(|_| ())
664 .unwrap();
665 }
666 })
667 });
668
669 let (output, _) = join!(pairing_future, future::join_all(worker_futures));
670
671 output
672 };
673
674 phase_2_future.boxed()
675 }
676
677 fn par_for_each<P, F, Fut>(self, params: P, f: F) -> BoxFuture<'static, ()>
678 where
679 F: 'static + FnMut(Self::Item) -> Fut + Send,
680 Fut: 'static + Future<Output = ()> + Send,
681 P: Into<ParParams>,
682 {
683 let ParParams {
684 num_workers,
685 buf_size,
686 } = params.into();
687 let stream = self
688 .stateful_map(f, |mut f, item| {
689 let fut = f(item);
690 Some((f, fut))
691 })
692 .spawned(buf_size);
693
694 let worker_futures =
695 (0..num_workers).map(move |_| rt::spawn(stream.clone().for_each(|fut| fut)));
696
697 future::join_all(worker_futures).map(|_| ()).boxed()
698 }
699
700 fn par_for_each_blocking<P, F, Func>(self, params: P, f: F) -> BoxFuture<'static, ()>
701 where
702 F: 'static + FnMut(Self::Item) -> Func + Send,
703 Func: 'static + FnOnce() + Send,
704 P: Into<ParParams>,
705 {
706 let ParParams {
707 num_workers,
708 buf_size,
709 } = params.into();
710 let stream = self
711 .stateful_map(f, |mut f, item| {
712 let func = f(item);
713 Some((f, func))
714 })
715 .spawned(buf_size);
716
717 let worker_futs: Vec<_> = (0..num_workers)
718 .map(move |_| {
719 let mut stream = stream.clone();
720
721 rt::spawn_blocking(move || {
722 while let Some(job) = rt::block_on(stream.next()) {
723 job();
724 }
725 })
726 })
727 .collect();
728
729 future::join_all(worker_futs).map(|_| ()).boxed()
730 }
731}
732
733// tests
734
#[cfg(test)]
mod tests {
    use super::*;
    use crate::utils::async_test;
    use rand::prelude::*;
    use std::time::Duration;

    async_test! {
        async fn par_batching_test() {
            let mut rng = rand::thread_rng();
            let data: Vec<u32> = (0..10000).map(|_| rng.gen_range(0..10)).collect();
            let total: u32 = data.iter().sum();

            let sums: Vec<_> = stream::iter(data)
                .par_batching(None, |_, rx| async move {
                    let mut sum = rx.recv_async().await.ok()?;

                    while let Ok(val) = rx.recv_async().await {
                        sum += val;

                        if sum >= 1000 {
                            return Some((sum, rx));
                        }
                    }

                    None
                })
                .collect()
                .await;

            // every emitted batch reached the threshold ...
            assert!(sums.iter().all(|&sum| sum >= 1000));
            // ... and batches are built from disjoint input items
            assert!(sums.iter().sum::<u32>() <= total);
        }


        async fn par_then_output_is_ordered_test() {
            let max = 1000u64;
            let received = stream::iter(0..max)
                .par_then(None, |value| async move {
                    rt::sleep(Duration::from_millis(value % 20)).await;
                    value
                })
                .fold(0u64, |expect, found| async move {
                    assert_eq!(expect, found);
                    expect + 1
                })
                .await;
            // the fold only checks order; make sure nothing was lost either
            assert_eq!(received, max);
        }


        async fn par_then_unordered_test() {
            let max = 1000u64;
            let mut values: Vec<_> = stream::iter(0..max)
                .par_then_unordered(None, |value| async move {
                    rt::sleep(Duration::from_millis(value % 20)).await;
                    value
                })
                .collect()
                .await;
            values.sort_unstable();
            // comparing whole vectors catches missing and duplicated items alike
            let expect: Vec<_> = (0..max).collect();
            assert_eq!(values, expect);
        }


        async fn par_reduce_test() {
            {
                let sum: Option<u64> = stream::iter(iter::empty())
                    .par_reduce(None, |lhs, rhs| async move { lhs + rhs })
                    .await;
                assert!(sum.is_none());
            }

            {
                let max = 100_000u64;
                let sum = stream::iter(1..=max)
                    .par_reduce(None, |lhs, rhs| async move { lhs + rhs })
                    .await;
                assert_eq!(sum, Some((1 + max) * max / 2));
            }
        }


        async fn reorder_index_haling_test() {
            // indexes arrive shuffled and must come out sorted
            let indexes = vec![5, 2, 1, 0, 6, 4, 3];
            let output: Vec<_> = stream::iter(indexes)
                .then(|index| async move {
                    rt::sleep(Duration::from_millis(20)).await;
                    (index, index)
                })
                .reorder_enumerated()
                .collect()
                .await;
            assert_eq!(&output, &[0, 1, 2, 3, 4, 5, 6]);
        }


        async fn enumerate_reorder_test() {
            let max = 1000u64;
            let iterator = (0..max).rev().step_by(2);

            let reordered: Vec<_> = stream::iter(iterator.clone())
                .enumerate()
                .par_then_unordered(None, |(index, value)| async move {
                    rt::sleep(Duration::from_millis(value % 20)).await;
                    (index, value)
                })
                .reorder_enumerated()
                .collect()
                .await;
            // a full comparison (unlike `zip`) also fails on truncated output
            let expect: Vec<_> = iterator.collect();
            assert_eq!(reordered, expect);
        }


        async fn for_each_test() {
            use std::sync::atomic::{self, AtomicUsize};

            {
                let sum = Arc::new(AtomicUsize::new(0));
                stream::iter(1..=1000)
                    .par_for_each(None, {
                        let sum = sum.clone();
                        move |value| {
                            let sum = sum.clone();
                            async move {
                                sum.fetch_add(value, atomic::Ordering::SeqCst);
                            }
                        }
                    })
                    .await;
                assert_eq!(sum.load(atomic::Ordering::SeqCst), (1 + 1000) * 1000 / 2);
            }

            {
                let sum = Arc::new(AtomicUsize::new(0));
                stream::iter(1..=1000)
                    .par_for_each_blocking(None, {
                        let sum = sum.clone();
                        move |value| {
                            let sum = sum.clone();
                            move || {
                                sum.fetch_add(value, atomic::Ordering::SeqCst);
                            }
                        }
                    })
                    .await;
                assert_eq!(sum.load(atomic::Ordering::SeqCst), (1 + 1000) * 1000 / 2);
            }
        }


        async fn tee_halt_test() {
            let mut rx1 = stream::iter(0..).tee(1);
            let mut rx2 = rx1.clone();

            assert!(rx1.next().await.is_some());
            assert!(rx2.next().await.is_some());

            // drop rx1
            drop(rx1);

            // the following should not block
            assert!(rx2.next().await.is_some());
            assert!(rx2.next().await.is_some());
            assert!(rx2.next().await.is_some());
            assert!(rx2.next().await.is_some());
            assert!(rx2.next().await.is_some());
        }


        async fn tee_test() {
            let orig: Vec<_> = (0..100).collect();

            let rx1 = stream::iter(orig.clone()).tee(1);
            let rx2 = rx1.clone();
            let rx3 = rx1.clone();

            let fut1 = rx1
                .then(|val| async move {
                    let millis = rand::thread_rng().gen_range(0..5);
                    rt::sleep(Duration::from_millis(millis)).await;
                    val
                })
                .collect();
            let fut2 = rx2
                .then(|val| async move {
                    let millis = rand::thread_rng().gen_range(0..5);
                    rt::sleep(Duration::from_millis(millis)).await;
                    val * 2
                })
                .collect();
            let fut3 = rx3
                .then(|val| async move {
                    let millis = rand::thread_rng().gen_range(0..5);
                    rt::sleep(Duration::from_millis(millis)).await;
                    val * 3
                })
                .collect();

            let (vec1, vec2, vec3): (Vec<_>, Vec<_>, Vec<_>) = join!(fut1, fut2, fut3);

            // a receiver may miss the first few elements, so compare against the tail
            let start1 = orig.len() - vec1.len();
            let start2 = orig.len() - vec2.len();
            let start3 = orig.len() - vec3.len();

            assert!(orig[start1..]
                .iter()
                .zip(&vec1)
                .all(|(&orig, &val)| orig == val));
            assert!(orig[start2..]
                .iter()
                .zip(&vec2)
                .all(|(&orig, &val)| orig * 2 == val));
            assert!(orig[start3..]
                .iter()
                .zip(&vec3)
                .all(|(&orig, &val)| orig * 3 == val));
        }
    }
}