1#![allow(clippy::too_many_lines, unused_qualifications)]
5
6mod chain;
7mod cloned;
8mod filter;
9mod filter_map_sync;
10mod flat_map;
11mod flat_map_sync;
12mod identity;
13mod inspect;
14mod join;
15mod map;
16mod map_sync;
17mod sum_type;
18mod update;
19
20use async_trait::async_trait;
21use either::Either;
22use futures::{future, pin_mut, stream::StreamExt as _, Stream};
23use indexmap::IndexMap;
24use serde_closure::{traits, FnOnce};
25use std::{
26 cmp::Ordering, hash::Hash, iter, ops, pin::Pin, task::{Context, Poll}
27};
28
29use super::{par_pipe::*, par_sink::*};
30use crate::{
31 into_par_stream::{IntoDistributedStream, IntoParallelStream}, pipe::{Sink, StreamExt}, pool::{ProcessPool, ProcessSend, ThreadPool}
32};
33
34pub use self::{
35 chain::*, cloned::*, filter::*, filter_map_sync::*, flat_map::*, flat_map_sync::*, identity::*, inspect::*, join::*, map::*, map_sync::*, update::*
36};
37
/// A unit of work produced by a parallel or distributed stream.
///
/// A task is turned into an asynchronous [`Stream`] of items by calling
/// `into_async`.
#[must_use]
pub trait StreamTask {
    /// The type of item yielded by the stream this task converts into.
    type Item;
    /// The asynchronous stream type this task converts into.
    type Async: Stream<Item = Self::Item>;

    /// Consumes the task and returns the stream that yields its items.
    fn into_async(self) -> Self::Async;
}
45
// Generates a stream trait together with its identity "assert" helper function.
//
// Arguments, in order: the trait name, the matching pipe trait, the matching sink
// trait, the `From...Stream` trait, the `Into...Stream` trait, the name of that trait's
// conversion method, the name of its associated stream type, the pool trait, the
// "sendable" marker bound, the module path supplying the `Fn`/`FnMut` traits used in
// closure bounds, the name of the generated assert helper, any extra attributes to put
// on the trait, and finally a braced list of additional trait items that are spliced
// into the trait verbatim (these are expected to provide `pipe`, which every
// terminal method below delegates to).
macro_rules! stream {
    ($stream:ident $pipe:ident $sink:ident $from_stream:ident $into_stream:ident $into_stream_fn:ident $xxx:ident $pool:ident $send:ident $fns:ident $assert_stream:ident $($meta:meta)* { $($items:item)* }) => {
        #[async_trait(?Send)]
        $(#[$meta])*
        #[must_use]
        pub trait $stream {
            /// The type of item produced by this stream.
            type Item;
            /// The task type handed out by `next_task`.
            type Task: StreamTask<Item = Self::Item> + $send;

            /// Polls for the next task of this stream.
            fn next_task(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Task>>;
            /// Returns a `(lower, upper)` size estimate for this stream.
            fn size_hint(&self) -> (usize, Option<usize>);

            $($items)*

            /// Returns an `Inspect` adapter over this stream, constructed with `f`
            /// (a closure taking `&Self::Item`).
            #[inline]
            fn inspect<F>(self, f: F) -> Inspect<Self, F>
            where
                F: $fns::FnMut(&Self::Item) + Clone + $send + 'static,
                Self: Sized,
            {
                let adapter = Inspect::new(self, f);
                $assert_stream(adapter)
            }

            /// Returns an `Update` adapter over this stream, constructed with `f`
            /// (a closure taking `&mut Self::Item`).
            #[inline]
            fn update<F>(self, f: F) -> Update<Self, F>
            where
                F: $fns::FnMut(&mut Self::Item) + Clone + $send + 'static,
                Self: Sized,
            {
                let adapter = Update::new(self, f);
                $assert_stream(adapter)
            }

            /// Returns a `Map` adapter over this stream, constructed with `f`
            /// (a closure from `Self::Item` to `B`).
            #[inline]
            fn map<B, F>(self, f: F) -> Map<Self, F>
            where
                F: $fns::FnMut(Self::Item) -> B + Clone + $send + 'static,
                Self: Sized,
            {
                let adapter = Map::new(self, f);
                $assert_stream(adapter)
            }

            /// Returns a `FlatMap` adapter over this stream, constructed with `f`
            /// (a closure from `Self::Item` to a `Stream`).
            #[inline]
            fn flat_map<B, F>(self, f: F) -> FlatMap<Self, F>
            where
                F: $fns::FnMut(Self::Item) -> B + Clone + $send + 'static,
                B: Stream,
                Self: Sized,
            {
                let adapter = FlatMap::new(self, f);
                $assert_stream(adapter)
            }

            /// Returns a `Filter` adapter over this stream, constructed with the
            /// predicate `f` (a closure from `&Self::Item` to `bool`).
            #[inline]
            fn filter<F>(self, f: F) -> Filter<Self, F>
            where
                F: $fns::FnMut(&Self::Item) -> bool + Clone + $send + 'static,
                Self: Sized,
            {
                let adapter = Filter::new(self, f);
                $assert_stream(adapter)
            }

            /// Collects the `(K, V2)` pairs of `right` and returns a `LeftJoin` adapter
            /// built from this stream of `(K, V1)` pairs and that collection.
            #[inline]
            fn left_join<K, V1, V2>(self, right: impl IntoIterator<Item = (K, V2)>) -> LeftJoin<Self, K, V1, V2>
            where
                K: Eq + Hash + Clone + $send + 'static,
                V1: 'static,
                V2: Clone + $send + 'static,
                Self: $stream<Item = (K, V1)> + Sized,
            {
                let right = right.into_iter().collect();
                $assert_stream(LeftJoin::new(self, right))
            }

            /// Collects the `(K, V2)` pairs of `right` and returns an `InnerJoin` adapter
            /// built from this stream of `(K, V1)` pairs and that collection.
            #[inline]
            fn inner_join<K, V1, V2>(self, right: impl IntoIterator<Item = (K, V2)>) -> InnerJoin<Self, K, V1, V2>
            where
                K: Eq + Hash + Clone + $send + 'static,
                V1: 'static,
                V2: Clone + $send + 'static,
                Self: $stream<Item = (K, V1)> + Sized,
            {
                let right = right.into_iter().collect();
                $assert_stream(InnerJoin::new(self, right))
            }

            /// Converts `chain` into a stream of the same item type and returns a `Chain`
            /// adapter built from this stream and the converted one.
            #[inline]
            fn chain<C>(self, chain: C) -> Chain<Self, C::$xxx>
            where
                C: $into_stream<Item = Self::Item>,
                Self: Sized,
            {
                let tail = chain.$into_stream_fn();
                $assert_stream(Chain::new(self, tail))
            }

            /// Pipes this stream into the `for_each` sink built from `f`, executing on
            /// `pool`, and awaits completion.
            #[inline]
            async fn for_each<P, F>(self, pool: &P, f: F)
            where
                P: $pool,
                F: $fns::FnMut(Self::Item) + Clone + $send + 'static,
                Self::Item: 'static,
                Self::Task: 'static,
                Self: Sized,
            {
                let sink = $pipe::<Self::Item>::for_each(Identity, f);
                self.pipe(pool, sink).await
            }

            /// Pipes this stream into the `fold` sink built from `identity` and `op`,
            /// executing on `pool`, and awaits the resulting `B`.
            ///
            /// `op` receives an accumulator and either a stream item or another `B`.
            #[inline]
            async fn fold<P, ID, F, B>(self, pool: &P, identity: ID, op: F) -> B
            where
                P: $pool,
                ID: $fns::FnMut() -> B + Clone + $send + 'static,
                F: $fns::FnMut(B, Either<Self::Item, B>) -> B + Clone + $send + 'static,
                B: $send + 'static,
                Self::Item: 'static,
                Self::Task: 'static,
                Self: Sized,
            {
                let sink = $pipe::<Self::Item>::fold(Identity, identity, op);
                self.pipe(pool, sink).await
            }

            /// Pipes this stream into the `histogram` sink, executing on `pool`, and
            /// awaits the resulting list of `(item, count)` pairs.
            #[inline]
            async fn histogram<P>(self, pool: &P) -> Vec<(Self::Item, usize)>
            where
                P: $pool,
                Self::Item: Hash + Ord + $send + 'static,
                Self::Task: 'static,
                Self: Sized,
            {
                let sink = $pipe::<Self::Item>::histogram(Identity);
                self.pipe(pool, sink).await
            }

            /// Pipes this stream into the `sort_n_by` sink built from `n` and the
            /// comparator `cmp`, executing on `pool`, and awaits the resulting `Sort`.
            #[inline]
            async fn sort_n_by<P, F>(self, pool: &P, n: usize, cmp: F) -> ::amadeus_streaming::Sort<Self::Item, F>
            where
                P: $pool,
                F: $fns::Fn(&Self::Item, &Self::Item) -> Ordering + Clone + $send + 'static,
                Self::Item: Clone + $send + 'static,
                Self::Task: 'static,
                Self: Sized,
            {
                let sink = $pipe::<Self::Item>::sort_n_by(Identity, n, cmp);
                self.pipe(pool, sink).await
            }

            /// Pipes this stream into the `count` sink, executing on `pool`, and awaits
            /// the resulting `usize`.
            #[inline]
            async fn count<P>(self, pool: &P) -> usize
            where
                P: $pool,
                Self::Item: 'static,
                Self::Task: 'static,
                Self: Sized,
            {
                let sink = $pipe::<Self::Item>::count(Identity);
                self.pipe(pool, sink).await
            }

            /// Pipes this stream into the `sum` sink, executing on `pool`, and awaits the
            /// resulting `S`.
            #[inline]
            async fn sum<P, S>(self, pool: &P) -> S
            where
                P: $pool,
                S: iter::Sum<Self::Item> + iter::Sum<S> + $send + 'static,
                Self::Item: 'static,
                Self::Task: 'static,
                Self: Sized,
            {
                let sink = $pipe::<Self::Item>::sum(Identity);
                self.pipe(pool, sink).await
            }

            /// Pipes this stream of `f64` into the `mean` sink, executing on `pool`, and
            /// awaits the resulting `f64`.
            #[inline]
            async fn mean<P>(self, pool: &P) -> f64
            where
                P: $pool,
                Self::Item: 'static,
                Self::Task: 'static,
                Self: $stream<Item = f64> + Sized,
            {
                let sink = $pipe::<Self::Item>::mean(Identity);
                self.pipe(pool, sink).await
            }

            /// Pipes this stream of `f64` into the `stddev` sink, executing on `pool`,
            /// and awaits the resulting `f64`.
            #[inline]
            async fn stddev<P>(self, pool: &P) -> f64
            where
                P: $pool,
                Self::Item: 'static,
                Self::Task: 'static,
                Self: $stream<Item = f64> + Sized,
            {
                let sink = $pipe::<Self::Item>::stddev(Identity);
                self.pipe(pool, sink).await
            }

            /// Pipes this stream into the `combine` sink built from `f`, executing on
            /// `pool`, and awaits the resulting optional item.
            #[inline]
            async fn combine<P, F>(self, pool: &P, f: F) -> Option<Self::Item>
            where
                P: $pool,
                F: $fns::FnMut(Self::Item, Self::Item) -> Self::Item + Clone + $send + 'static,
                Self::Item: $send + 'static,
                Self::Task: 'static,
                Self: Sized,
            {
                let sink = $pipe::<Self::Item>::combine(Identity, f);
                self.pipe(pool, sink).await
            }

            /// Pipes this stream into the `max` sink, executing on `pool`, and awaits the
            /// resulting optional item.
            #[inline]
            async fn max<P>(self, pool: &P) -> Option<Self::Item>
            where
                P: $pool,
                Self::Item: Ord + $send + 'static,
                Self::Task: 'static,
                Self: Sized,
            {
                let sink = $pipe::<Self::Item>::max(Identity);
                self.pipe(pool, sink).await
            }

            /// Pipes this stream into the `max_by` sink built from the comparator `f`,
            /// executing on `pool`, and awaits the resulting optional item.
            #[inline]
            async fn max_by<P, F>(self, pool: &P, f: F) -> Option<Self::Item>
            where
                P: $pool,
                F: $fns::FnMut(&Self::Item, &Self::Item) -> Ordering + Clone + $send + 'static,
                Self::Item: $send + 'static,
                Self::Task: 'static,
                Self: Sized,
            {
                let sink = $pipe::<Self::Item>::max_by(Identity, f);
                self.pipe(pool, sink).await
            }

            /// Pipes this stream into the `max_by_key` sink built from the key function
            /// `f`, executing on `pool`, and awaits the resulting optional item.
            #[inline]
            async fn max_by_key<P, F, B>(self, pool: &P, f: F) -> Option<Self::Item>
            where
                P: $pool,
                F: $fns::FnMut(&Self::Item) -> B + Clone + $send + 'static,
                B: Ord + 'static,
                Self::Item: $send + 'static,
                Self::Task: 'static,
                Self: Sized,
            {
                let sink = $pipe::<Self::Item>::max_by_key(Identity, f);
                self.pipe(pool, sink).await
            }

            /// Pipes this stream into the `min` sink, executing on `pool`, and awaits the
            /// resulting optional item.
            #[inline]
            async fn min<P>(self, pool: &P) -> Option<Self::Item>
            where
                P: $pool,
                Self::Item: Ord + $send + 'static,
                Self::Task: 'static,
                Self: Sized,
            {
                let sink = $pipe::<Self::Item>::min(Identity);
                self.pipe(pool, sink).await
            }

            /// Pipes this stream into the `min_by` sink built from the comparator `f`,
            /// executing on `pool`, and awaits the resulting optional item.
            #[inline]
            async fn min_by<P, F>(self, pool: &P, f: F) -> Option<Self::Item>
            where
                P: $pool,
                F: $fns::FnMut(&Self::Item, &Self::Item) -> Ordering + Clone + $send + 'static,
                Self::Item: $send + 'static,
                Self::Task: 'static,
                Self: Sized,
            {
                let sink = $pipe::<Self::Item>::min_by(Identity, f);
                self.pipe(pool, sink).await
            }

            /// Pipes this stream into the `min_by_key` sink built from the key function
            /// `f`, executing on `pool`, and awaits the resulting optional item.
            #[inline]
            async fn min_by_key<P, F, B>(self, pool: &P, f: F) -> Option<Self::Item>
            where
                P: $pool,
                F: $fns::FnMut(&Self::Item) -> B + Clone + $send + 'static,
                B: Ord + 'static,
                Self::Item: $send + 'static,
                Self::Task: 'static,
                Self: Sized,
            {
                let sink = $pipe::<Self::Item>::min_by_key(Identity, f);
                self.pipe(pool, sink).await
            }

            /// Pipes this stream into the `most_frequent` sink built from `n`,
            /// `probability` and `tolerance`, executing on `pool`, and awaits the
            /// resulting `Top`.
            #[inline]
            async fn most_frequent<P>(
                self, pool: &P, n: usize, probability: f64, tolerance: f64,
            ) -> ::amadeus_streaming::Top<Self::Item, usize>
            where
                P: $pool,
                Self::Item: Hash + Eq + Clone + $send + 'static,
                Self::Task: 'static,
                Self: Sized,
            {
                let sink = $pipe::<Self::Item>::most_frequent(Identity, n, probability, tolerance);
                self.pipe(pool, sink).await
            }

            /// Pipes this stream of `(A, B)` pairs into the `most_distinct` sink built
            /// from `n`, `probability`, `tolerance` and `error_rate`, executing on
            /// `pool`, and awaits the resulting `Top`.
            #[inline]
            async fn most_distinct<P, A, B>(
                self, pool: &P, n: usize, probability: f64, tolerance: f64, error_rate: f64,
            ) -> ::amadeus_streaming::Top<A, amadeus_streaming::HyperLogLogMagnitude<B>>
            where
                P: $pool,
                Self: $stream<Item = (A, B)> + Sized,
                A: Hash + Eq + Clone + $send + 'static,
                B: Hash + 'static,
                Self::Task: 'static,
            {
                let sink = $pipe::<Self::Item>::most_distinct(
                    Identity,
                    n,
                    probability,
                    tolerance,
                    error_rate,
                );
                self.pipe(pool, sink).await
            }

            /// Pipes this stream into the `sample_unstable` sink built from `samples`,
            /// executing on `pool`, and awaits the resulting `SampleUnstable`.
            #[inline]
            async fn sample_unstable<P>(
                self, pool: &P, samples: usize,
            ) -> ::amadeus_streaming::SampleUnstable<Self::Item>
            where
                P: $pool,
                Self::Item: $send + 'static,
                Self::Task: 'static,
                Self: Sized,
            {
                let sink = $pipe::<Self::Item>::sample_unstable(Identity, samples);
                self.pipe(pool, sink).await
            }

            /// Pipes this stream into the `all` sink built from the predicate `f`,
            /// executing on `pool`, and awaits the resulting `bool`.
            #[inline]
            async fn all<P, F>(self, pool: &P, f: F) -> bool
            where
                P: $pool,
                F: $fns::FnMut(Self::Item) -> bool + Clone + $send + 'static,
                Self::Item: 'static,
                Self::Task: 'static,
                Self: Sized,
            {
                let sink = $pipe::<Self::Item>::all(Identity, f);
                self.pipe(pool, sink).await
            }

            /// Pipes this stream into the `any` sink built from the predicate `f`,
            /// executing on `pool`, and awaits the resulting `bool`.
            #[inline]
            async fn any<P, F>(self, pool: &P, f: F) -> bool
            where
                P: $pool,
                F: $fns::FnMut(Self::Item) -> bool + Clone + $send + 'static,
                Self::Item: 'static,
                Self::Task: 'static,
                Self: Sized,
            {
                let sink = $pipe::<Self::Item>::any(Identity, f);
                self.pipe(pool, sink).await
            }
        }

        /// Returns `i` unchanged; exists so the compiler checks that `i` implements the
        /// stream trait with item type `T`.
        #[inline(always)]
        pub(crate) fn $assert_stream<T, I: $stream<Item = T>>(i: I) -> I {
            i
        }
    }
}
424
425stream!(ParallelStream ParallelPipe ParallelSink FromParallelStream IntoParallelStream into_par_stream ParStream ThreadPool Send ops assert_parallel_stream {
426 async fn reduce<P, B, R1, R3>(mut self, pool: &P, reduce_a: R1, reduce_c: R3) -> B
427 where
428 P: ThreadPool,
429 R1: ReducerSend<Self::Item> + Clone + Send + 'static,
430 R3: Reducer<<R1 as ReducerSend<Self::Item>>::Done, Done = B>,
431 Self::Task: 'static,
432 Self: Sized,
433 {
434 let self_ = self;
435 pin_mut!(self_);
436 let mut tasks = (0..pool.threads()).map(|_| Vec::new()).collect::<Vec<_>>();
438 let mut allocated = 0;
439 'a: loop {
440 for i in 0..tasks.len() {
441 loop {
442 let (mut lower, _upper) = self_.size_hint();
443 if lower == 0 {
444 lower = 1;
445 }
446 let mut batch = (allocated + lower) / tasks.len();
447 if i < (allocated + lower) % tasks.len() {
448 batch += 1;
449 }
450 batch -= tasks[i].len();
451 if batch == 0 {
452 break;
453 }
454 for _ in 0..batch {
455 if let Some(task) = future::poll_fn(|cx| self_.as_mut().next_task(cx)).await {
456 tasks[i].push(task);
457 allocated += 1;
458 } else {
459 break 'a;
460 }
461 }
462 }
463 }
464 }
465 for (i, task) in tasks.iter().enumerate() {
466 let mut count = allocated / tasks.len();
467 if i < allocated % tasks.len() {
468 count += 1;
469 }
470 assert_eq!(
471 task.len(),
472 count,
473 "alloc: {:#?}",
474 tasks.iter().map(Vec::len).collect::<Vec<_>>()
475 );
476 }
477
478 let handles = tasks
479 .into_iter()
480 .filter(|tasks| !tasks.is_empty())
481 .map(|tasks| {
482 let reduce_a = reduce_a.clone();
483 pool.spawn(move || async move {
484 let sink = reduce_a.into_async();
485 pin_mut!(sink);
486 for task in tasks.into_iter().map(StreamTask::into_async) {
488 pin_mut!(task);
489 if let Some(ret) = sink.send_all(&mut task).await {
490 return ret;
491 }
492 }
493 sink.done().await
494 })
495 })
496 .collect::<futures::stream::FuturesUnordered<_>>();
497 let stream = handles.map(|item| {
498 item.unwrap_or_else(|err| panic!("Amadeus: task '<unnamed>' panicked at '{}'", err))
499 });
500 let reduce_c = reduce_c.into_async();
501 pin_mut!(reduce_c);
502 stream.sink(reduce_c).await
503 }
504
505 async fn pipe<P, ParSink, A>(self, pool: &P, sink: ParSink) -> A
506 where
507 P: ThreadPool,
508 ParSink: ParallelSink<Self::Item, Done = A>,
509 <ParSink::Pipe as ParallelPipe<Self::Item>>::Task: 'static,
510 ParSink::ReduceA: 'static,
511 Self::Task: 'static,
512 Self: Sized,
513 {
514 let (iterator, reducer_a, reducer_b) = sink.reducers();
515 Pipe::new(self, iterator)
516 .reduce(pool, reducer_a, reducer_b)
517 .await
518 }
519
520 async fn fork<P, ParSinkA, ParSinkB, A, B>(
522 self, pool: &P, sink_a: ParSinkA, sink_b: ParSinkB,
523 ) -> (A, B)
524 where
525 P: ThreadPool,
526 ParSinkA: ParallelSink<Self::Item, Done = A>,
527 ParSinkB: for<'a> ParallelSink<&'a Self::Item, Done = B> + 'static,
528 <ParSinkA::Pipe as ParallelPipe<Self::Item>>::Task: 'static,
529 ParSinkA::ReduceA: 'static,
530 <ParSinkB as ParallelSink<&'static Self::Item>>::ReduceA: 'static,
531 <<ParSinkB as ParallelSink<&'static Self::Item>>::Pipe as ParallelPipe<
532 &'static Self::Item,
533 >>::Task: 'static,
534 Self::Item: 'static,
535 Self::Task: 'static,
536 Self: Sized,
537 {
538 let (iterator_a, reducer_a_a, reducer_a_b) = sink_a.reducers();
539 let (iterator_b, reducer_b_a, reducer_b_b) = sink_b.reducers();
540 Fork::new(self, iterator_a, iterator_b)
541 .reduce(
542 pool,
543 ReduceA2::new(reducer_a_a, reducer_b_a),
544 ReduceC2::new(reducer_a_b, reducer_b_b),
545 )
546 .await
547 }
548
549 async fn group_by<P, S, A, B>(self, pool: &P, sink: S) -> IndexMap<A, S::Done>
550 where
551 P: ThreadPool,
552 A: Eq + Hash + Send + 'static,
553 B: 'static,
554 S: ParallelSink<B>,
555 <S::Pipe as ParallelPipe<B>>::Task: Clone + Send + 'static,
556 S::ReduceA: 'static,
557 S::ReduceC: Clone,
558 S::Done: Send + 'static,
559 Self::Task: 'static,
560 Self: ParallelStream<Item = (A, B)> + Sized,
561 {
562 self.pipe(pool, ParallelPipe::<Self::Item>::group_by(Identity, sink))
563 .await
564 }
565
566 async fn collect<P, B>(self, pool: &P) -> B
567 where
568 P: ThreadPool,
569 B: FromParallelStream<Self::Item>,
570 B::ReduceA: Send + 'static,
571 Self::Task: 'static,
572 Self: Sized,
573 {
574 self.pipe(pool, ParallelPipe::<Self::Item>::collect(Identity))
575 .await
576 }
577});
578
579stream!(DistributedStream DistributedPipe DistributedSink FromDistributedStream IntoDistributedStream into_dist_stream DistStream ProcessPool ProcessSend traits assert_distributed_stream cfg_attr(not(nightly), serde_closure::desugar) {
580 async fn reduce<P, B, R1, R2, R3>(
581 mut self, pool: &P, reduce_a: R1, reduce_b: R2, reduce_c: R3,
582 ) -> B
583 where
584 P: ProcessPool,
585 R1: ReducerSend<Self::Item> + Clone + ProcessSend + 'static,
586 R2: ReducerProcessSend<<R1 as ReducerSend<Self::Item>>::Done>
587 + Clone
588 + ProcessSend
589 + 'static,
590 R3: Reducer<
591 <R2 as ReducerProcessSend<<R1 as ReducerSend<Self::Item>>::Done>>::Done,
592 Done = B,
593 >,
594 Self::Task: 'static,
595 Self: Sized,
596 {
597 let self_ = self;
598 pin_mut!(self_);
599 let mut tasks = (0..pool.processes()).map(|_| Vec::new()).collect::<Vec<_>>();
601 let mut allocated = 0;
602 'a: loop {
603 for i in 0..tasks.len() {
604 loop {
605 let (mut lower, _upper) = self_.size_hint();
606 if lower == 0 {
607 lower = 1;
608 }
609 let mut batch = (allocated + lower) / tasks.len();
610 if i < (allocated + lower) % tasks.len() {
611 batch += 1;
612 }
613 batch -= tasks[i].len();
614 if batch == 0 {
615 break;
616 }
617 for _ in 0..batch {
618 if let Some(task) = future::poll_fn(|cx| self_.as_mut().next_task(cx)).await {
619 tasks[i].push(task);
620 allocated += 1;
621 } else {
622 break 'a;
623 }
624 }
625 }
626 }
627 }
628 for (i, task) in tasks.iter().enumerate() {
629 let mut count = allocated / tasks.len();
630 if i < allocated % tasks.len() {
631 count += 1;
632 }
633 assert_eq!(
634 task.len(),
635 count,
636 "alloc: {:#?}",
637 tasks.iter().map(Vec::len).collect::<Vec<_>>()
638 );
639 }
640
641 let handles = tasks
642 .into_iter()
643 .filter(|tasks| !tasks.is_empty())
644 .map(|tasks| {
645 let reduce_b = reduce_b.clone();
646 let reduce_a = reduce_a.clone();
647 pool.spawn(FnOnce!(move |pool: &P::ThreadPool| {
648 let mut process_tasks = tasks.into_iter();
649
650 let mut tasks = (0..pool.threads()).map(|_| Vec::new()).collect::<Vec<_>>();
651 let mut allocated = 0;
652 'a: loop {
653 for i in 0..tasks.len() {
654 loop {
655 let (mut lower, _upper) = process_tasks.size_hint();
656 if lower == 0 {
657 lower = 1;
658 }
659 let mut batch = (allocated + lower) / tasks.len();
660 if i < (allocated + lower) % tasks.len() {
661 batch += 1;
662 }
663 batch -= tasks[i].len();
664 if batch == 0 {
665 break;
666 }
667 for _ in 0..batch {
668 if let Some(task) = process_tasks.next() {
669 tasks[i].push(task);
670 allocated += 1;
671 } else {
672 break 'a;
673 }
674 }
675 }
676 }
677 }
678 for (i, task) in tasks.iter().enumerate() {
679 let mut count = allocated / tasks.len();
680 if i < allocated % tasks.len() {
681 count += 1;
682 }
683 assert_eq!(
684 task.len(),
685 count,
686 "alloc: {:#?}",
687 tasks.iter().map(Vec::len).collect::<Vec<_>>()
688 );
689 }
690 let handles = tasks
691 .into_iter()
692 .filter(|tasks| !tasks.is_empty())
693 .map(|tasks| {
694 let reduce_a = reduce_a.clone();
695 pool.spawn(move || async move {
696 let sink = reduce_a.into_async();
697 pin_mut!(sink);
698 for task in tasks.into_iter().map(StreamTask::into_async) {
700 pin_mut!(task);
701 if let Some(ret) = sink.send_all(&mut task).await {
702 return ret;
703 }
704 }
705 sink.done().await
706 })
707 })
708 .collect::<futures::stream::FuturesUnordered<_>>();
709
710 let stream = handles.map(|item| {
711 item.unwrap_or_else(|err| {
712 panic!("Amadeus: task '<unnamed>' panicked at '{}'", err)
713 })
714 });
715 let reduce_b = reduce_b.into_async();
716 async move {
717 pin_mut!(reduce_b);
718 stream.sink(reduce_b).await
719 }
720 }))
721 })
722 .collect::<futures::stream::FuturesUnordered<_>>();
723 let stream = handles.map(|item| {
724 item.unwrap_or_else(|err| panic!("Amadeus: task '<unnamed>' panicked at '{}'", err))
725 });
726 let reduce_c = reduce_c.into_async();
727 pin_mut!(reduce_c);
728 stream.sink(reduce_c).await
729 }
730
731 async fn pipe<P, DistSink, A>(self, pool: &P, sink: DistSink) -> A
732 where
733 P: ProcessPool,
734 DistSink: DistributedSink<Self::Item, Done = A>,
735 <DistSink::Pipe as DistributedPipe<Self::Item>>::Task: 'static,
736 DistSink::ReduceA: 'static,
737 DistSink::ReduceB: 'static,
738 Self::Task: 'static,
739 Self: Sized,
740 {
741 let (iterator, reducer_a, reducer_b, reducer_c) = sink.reducers();
742 Pipe::new(self, iterator)
743 .reduce(pool, reducer_a, reducer_b, reducer_c)
744 .await
745 }
746
747 async fn fork<P, DistSinkA, DistSinkB, A, B>(
749 self, pool: &P, sink_a: DistSinkA, sink_b: DistSinkB,
750 ) -> (A, B)
751 where
752 P: ProcessPool,
753 DistSinkA: DistributedSink<Self::Item, Done = A>,
754 DistSinkB: for<'a> DistributedSink<&'a Self::Item, Done = B> + 'static,
755 <DistSinkA::Pipe as DistributedPipe<Self::Item>>::Task: 'static,
756 DistSinkA::ReduceA: 'static,
757 DistSinkA::ReduceB: 'static,
758 <DistSinkB as DistributedSink<&'static Self::Item>>::ReduceA: 'static,
759 <DistSinkB as DistributedSink<&'static Self::Item>>::ReduceB: 'static,
760 <<DistSinkB as DistributedSink<&'static Self::Item>>::Pipe as DistributedPipe<
761 &'static Self::Item,
762 >>::Task: 'static,
763 Self::Item: 'static,
764 Self::Task: 'static,
765 Self: Sized,
766 {
767 let (iterator_a, reducer_a_a, reducer_a_b, reducer_a_c) = sink_a.reducers();
768 let (iterator_b, reducer_b_a, reducer_b_b, reducer_b_c) = sink_b.reducers();
769 Fork::new(self, iterator_a, iterator_b)
770 .reduce(
771 pool,
772 ReduceA2::new(reducer_a_a, reducer_b_a),
773 ReduceC2::new(reducer_a_b, reducer_b_b),
774 ReduceC2::new(reducer_a_c, reducer_b_c),
775 )
776 .await
777 }
778
779 async fn group_by<P, S, A, B>(self, pool: &P, sink: S) -> IndexMap<A, S::Done>
780 where
781 P: ProcessPool,
782 A: Eq + Hash + ProcessSend + 'static,
783 B: 'static,
784 S: DistributedSink<B>,
785 <S::Pipe as DistributedPipe<B>>::Task: Clone + ProcessSend + 'static,
786 S::ReduceA: 'static,
787 S::ReduceB: 'static,
788 S::ReduceC: Clone,
789 S::Done: ProcessSend + 'static,
790 Self::Task: 'static,
791 Self: DistributedStream<Item = (A, B)> + Sized,
792 {
793 self.pipe(
794 pool,
795 DistributedPipe::<Self::Item>::group_by(Identity, sink),
796 )
797 .await
798 }
799
800 async fn collect<P, B>(self, pool: &P) -> B
801 where
802 P: ProcessPool,
803 B: FromDistributedStream<Self::Item>,
804 B::ReduceA: ProcessSend + 'static,
805 B::ReduceB: ProcessSend + 'static,
806 Self::Task: 'static,
807 Self: Sized,
808 {
809 self.pipe(pool, DistributedPipe::<Self::Item>::collect(Identity))
810 .await
811 }
812});