amadeus_core/
par_stream.rs

1// TODO: P: Pool -> impl Pool: async_trait triggers https://github.com/rust-lang/rust/issues/71869
2// TODO: how to dedup??
3
4#![allow(clippy::too_many_lines, unused_qualifications)]
5
6mod chain;
7mod cloned;
8mod filter;
9mod filter_map_sync;
10mod flat_map;
11mod flat_map_sync;
12mod identity;
13mod inspect;
14mod join;
15mod map;
16mod map_sync;
17mod sum_type;
18mod update;
19
20use async_trait::async_trait;
21use either::Either;
22use futures::{future, pin_mut, stream::StreamExt as _, Stream};
23use indexmap::IndexMap;
24use serde_closure::{traits, FnOnce};
25use std::{
26	cmp::Ordering, hash::Hash, iter, ops, pin::Pin, task::{Context, Poll}
27};
28
29use super::{par_pipe::*, par_sink::*};
30use crate::{
31	into_par_stream::{IntoDistributedStream, IntoParallelStream}, pipe::{Sink, StreamExt}, pool::{ProcessPool, ProcessSend, ThreadPool}
32};
33
34pub use self::{
35	chain::*, cloned::*, filter::*, filter_map_sync::*, flat_map::*, flat_map_sync::*, identity::*, inspect::*, join::*, map::*, map_sync::*, update::*
36};
37
38#[must_use]
39pub trait StreamTask {
40	type Item;
41	type Async: Stream<Item = Self::Item>;
42
43	fn into_async(self) -> Self::Async;
44}
45
// Generates a stream trait (invoked below for `ParallelStream` and `DistributedStream`)
// together with a matching identity "assert" helper.
//
// Arguments, in order:
//   $stream          name of the trait to generate
//   $pipe            the pipe trait whose constructors build the sinks used by the default methods
//   $sink            the sink trait (not referenced inside the macro body)
//   $from_stream     the "from stream" trait (not referenced inside the macro body)
//   $into_stream     the "into stream" trait accepted by `chain`
//   $into_stream_fn  the conversion method of `$into_stream`
//   $xxx             the associated stream type of `$into_stream`
//   $pool            the pool trait the terminal methods run on
//   $send            the marker trait closures/items must satisfy to be shipped to workers
//   $fns             the module providing the `Fn`/`FnMut` traits used in closure bounds
//   $assert_stream   name of the generated identity helper
//   $meta            zero or more extra attributes applied to the trait
//   { $items }       extra trait items; these must supply the `pipe` method that the
//                    default methods below call
macro_rules! stream {
	($stream:ident $pipe:ident $sink:ident $from_stream:ident $into_stream:ident $into_stream_fn:ident $xxx:ident $pool:ident $send:ident $fns:ident $assert_stream:ident $($meta:meta)* { $($items:item)* }) => {
		#[async_trait(?Send)]
		$(#[$meta])*
		#[must_use]
		/// A stream that hands out tasks to be executed on a pool.
		///
		/// Each task converts (see [`StreamTask`]) into an asynchronous stream of
		/// [`Item`](Self::Item)s. Adaptor methods wrap the stream in a new stream type;
		/// the `async` methods consume the stream and run it on the given pool.
		pub trait $stream {
			/// The type of item produced by this stream's tasks.
			type Item;
			/// The type of task this stream hands out.
			type Task: StreamTask<Item = Self::Item> + $send;

			/// Polls for the next task of this stream.
			fn next_task(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Task>>;
			/// Returns a `(lower, upper)` hint for the number of tasks remaining.
			fn size_hint(&self) -> (usize, Option<usize>);

			$($items)*

			/// Wraps this stream in an [`Inspect`] adaptor built from `f`, a closure
			/// that receives each item by shared reference.
			#[inline]
			fn inspect<F>(self, f: F) -> Inspect<Self, F>
			where
				F: $fns::FnMut(&Self::Item) + Clone + $send + 'static,
				Self: Sized,
			{
				$assert_stream(Inspect::new(self, f))
			}

			/// Wraps this stream in an [`Update`] adaptor built from `f`, a closure
			/// that receives each item by mutable reference.
			#[inline]
			fn update<F>(self, f: F) -> Update<Self, F>
			where
				F: $fns::FnMut(&mut Self::Item) + Clone + $send + 'static,
				Self: Sized,
			{
				$assert_stream(Update::new(self, f))
			}

			/// Wraps this stream in a [`Map`] adaptor built from `f`, a closure taking
			/// each item by value and returning a `B`.
			#[inline]
			fn map<B, F>(self, f: F) -> Map<Self, F>
			where
				F: $fns::FnMut(Self::Item) -> B + Clone + $send + 'static,
				Self: Sized,
			{
				$assert_stream(Map::new(self, f))
			}

			/// Wraps this stream in a [`FlatMap`] adaptor built from `f`, a closure
			/// taking each item by value and returning a [`Stream`].
			#[inline]
			fn flat_map<B, F>(self, f: F) -> FlatMap<Self, F>
			where
				F: $fns::FnMut(Self::Item) -> B + Clone + $send + 'static,
				B: Stream,
				Self: Sized,
			{
				$assert_stream(FlatMap::new(self, f))
			}

			/// Wraps this stream in a [`Filter`] adaptor built from the predicate `f`.
			#[inline]
			fn filter<F>(self, f: F) -> Filter<Self, F>
			where
				F: $fns::FnMut(&Self::Item) -> bool + Clone + $send + 'static,
				Self: Sized,
			{
				$assert_stream(Filter::new(self, f))
			}

			/// Builds a [`LeftJoin`] of this stream of `(K, V1)` pairs with `right`.
			///
			/// `right` is collected eagerly when this method is called.
			#[inline]
			fn left_join<K, V1, V2>(self, right: impl IntoIterator<Item = (K, V2)>) -> LeftJoin<Self, K, V1, V2>
			where
				K: Eq + Hash + Clone + $send + 'static,
				V1: 'static,
				V2: Clone + $send + 'static,
				Self: $stream<Item = (K, V1)> + Sized,
			{
				$assert_stream(LeftJoin::new(self, right.into_iter().collect()))
			}

			/// Builds an [`InnerJoin`] of this stream of `(K, V1)` pairs with `right`.
			///
			/// `right` is collected eagerly when this method is called.
			#[inline]
			fn inner_join<K, V1, V2>(self, right: impl IntoIterator<Item = (K, V2)>) -> InnerJoin<Self, K, V1, V2>
			where
				K: Eq + Hash + Clone + $send + 'static,
				V1: 'static,
				V2: Clone + $send + 'static,
				Self: $stream<Item = (K, V1)> + Sized,
			{
				$assert_stream(InnerJoin::new(self, right.into_iter().collect()))
			}

			/// Converts `chain` into a stream of the same item type and combines it
			/// with this stream in a [`Chain`] adaptor.
			#[inline]
			fn chain<C>(self, chain: C) -> Chain<Self, C::$xxx>
			where
				C: $into_stream<Item = Self::Item>,
				Self: Sized,
			{
				$assert_stream(Chain::new(self, chain.$into_stream_fn()))
			}

			/// Runs this stream on `pool`, piping it into the `for_each` sink built
			/// from `f` over [`Identity`].
			#[inline]
			async fn for_each<P, F>(self, pool: &P, f: F)
			where
				P: $pool,
				F: $fns::FnMut(Self::Item) + Clone + $send + 'static,
				Self::Item: 'static,
				Self::Task: 'static,
				Self: Sized,
			{
				self.pipe(pool, $pipe::<Self::Item>::for_each(Identity, f))
					.await
			}

			/// Runs this stream on `pool`, piping it into the `fold` sink built from
			/// `identity` and `op` over [`Identity`].
			///
			/// `op` receives the accumulator and either an item or another
			/// accumulator (`Either<Self::Item, B>`).
			#[inline]
			async fn fold<P, ID, F, B>(self, pool: &P, identity: ID, op: F) -> B
			where
				P: $pool,
				ID: $fns::FnMut() -> B + Clone + $send + 'static,
				F: $fns::FnMut(B, Either<Self::Item, B>) -> B + Clone + $send + 'static,
				B: $send + 'static,
				Self::Item: 'static,
				Self::Task: 'static,
				Self: Sized,
			{
				self.pipe(
					pool,
					$pipe::<Self::Item>::fold(Identity, identity, op),
				)
				.await
			}

			/// Runs this stream on `pool`, piping it into the `histogram` sink;
			/// returns `(item, count)` pairs.
			#[inline]
			async fn histogram<P>(self, pool: &P) -> Vec<(Self::Item, usize)>
			where
				P: $pool,
				Self::Item: Hash + Ord + $send + 'static,
				Self::Task: 'static,
				Self: Sized,
			{
				self.pipe(pool, $pipe::<Self::Item>::histogram(Identity))
					.await
			}

			/// Runs this stream on `pool`, piping it into the `sort_n_by` sink built
			/// from `n` and the comparator `cmp`.
			#[inline]
			async fn sort_n_by<P, F>(self, pool: &P, n: usize, cmp: F) -> ::amadeus_streaming::Sort<Self::Item, F>
			where
				P: $pool,
				F: $fns::Fn(&Self::Item, &Self::Item) -> Ordering + Clone + $send + 'static,
				Self::Item: Clone + $send + 'static,
				Self::Task: 'static,
				Self: Sized,
			{
				self.pipe(pool, $pipe::<Self::Item>::sort_n_by(Identity, n, cmp))
					.await
			}

			/// Runs this stream on `pool`, piping it into the `count` sink.
			#[inline]
			async fn count<P>(self, pool: &P) -> usize
			where
				P: $pool,
				Self::Item: 'static,
				Self::Task: 'static,
				Self: Sized,
			{
				self.pipe(pool, $pipe::<Self::Item>::count(Identity))
					.await
			}

			/// Runs this stream on `pool`, piping it into the `sum` sink, producing
			/// an `S`.
			#[inline]
			async fn sum<P, S>(self, pool: &P) -> S
			where
				P: $pool,
				S: iter::Sum<Self::Item> + iter::Sum<S> + $send + 'static,
				Self::Item: 'static,
				Self::Task: 'static,
				Self: Sized,
			{
				self.pipe(pool, $pipe::<Self::Item>::sum(Identity))
					.await
			}

			/// Runs this stream of `f64`s on `pool`, piping it into the `mean` sink.
			#[inline]
			async fn mean<P>(self, pool: &P) -> f64
			where
				P: $pool,
				Self::Item: 'static,
				Self::Task: 'static,
				Self: $stream<Item = f64> + Sized,
			{
				self.pipe(pool, $pipe::<Self::Item>::mean(Identity))
					.await
			}

			/// Runs this stream of `f64`s on `pool`, piping it into the `stddev` sink.
			#[inline]
			async fn stddev<P>(self, pool: &P) -> f64
			where
				P: $pool,
				Self::Item: 'static,
				Self::Task: 'static,
				Self: $stream<Item = f64> + Sized,
			{
				self.pipe(pool, $pipe::<Self::Item>::stddev(Identity))
					.await
			}

			/// Runs this stream on `pool`, piping it into the `combine` sink built
			/// from the binary operation `f`.
			#[inline]
			async fn combine<P, F>(self, pool: &P, f: F) -> Option<Self::Item>
			where
				P: $pool,
				F: $fns::FnMut(Self::Item, Self::Item) -> Self::Item + Clone + $send + 'static,
				Self::Item: $send + 'static,
				Self::Task: 'static,
				Self: Sized,
			{
				self.pipe(pool, $pipe::<Self::Item>::combine(Identity, f))
					.await
			}

			/// Runs this stream on `pool`, piping it into the `max` sink.
			#[inline]
			async fn max<P>(self, pool: &P) -> Option<Self::Item>
			where
				P: $pool,
				Self::Item: Ord + $send + 'static,
				Self::Task: 'static,
				Self: Sized,
			{
				self.pipe(pool, $pipe::<Self::Item>::max(Identity))
					.await
			}

			/// Runs this stream on `pool`, piping it into the `max_by` sink built
			/// from the comparator `f`.
			#[inline]
			async fn max_by<P, F>(self, pool: &P, f: F) -> Option<Self::Item>
			where
				P: $pool,
				F: $fns::FnMut(&Self::Item, &Self::Item) -> Ordering + Clone + $send + 'static,
				Self::Item: $send + 'static,
				Self::Task: 'static,
				Self: Sized,
			{
				self.pipe(pool, $pipe::<Self::Item>::max_by(Identity, f))
					.await
			}

			/// Runs this stream on `pool`, piping it into the `max_by_key` sink built
			/// from the key function `f`.
			#[inline]
			async fn max_by_key<P, F, B>(self, pool: &P, f: F) -> Option<Self::Item>
			where
				P: $pool,
				F: $fns::FnMut(&Self::Item) -> B + Clone + $send + 'static,
				B: Ord + 'static,
				Self::Item: $send + 'static,
				Self::Task: 'static,
				Self: Sized,
			{
				self.pipe(pool, $pipe::<Self::Item>::max_by_key(Identity, f))
					.await
			}

			/// Runs this stream on `pool`, piping it into the `min` sink.
			#[inline]
			async fn min<P>(self, pool: &P) -> Option<Self::Item>
			where
				P: $pool,
				Self::Item: Ord + $send + 'static,
				Self::Task: 'static,
				Self: Sized,
			{
				self.pipe(pool, $pipe::<Self::Item>::min(Identity))
					.await
			}

			/// Runs this stream on `pool`, piping it into the `min_by` sink built
			/// from the comparator `f`.
			#[inline]
			async fn min_by<P, F>(self, pool: &P, f: F) -> Option<Self::Item>
			where
				P: $pool,
				F: $fns::FnMut(&Self::Item, &Self::Item) -> Ordering + Clone + $send + 'static,
				Self::Item: $send + 'static,
				Self::Task: 'static,
				Self: Sized,
			{
				self.pipe(pool, $pipe::<Self::Item>::min_by(Identity, f))
					.await
			}

			/// Runs this stream on `pool`, piping it into the `min_by_key` sink built
			/// from the key function `f`.
			#[inline]
			async fn min_by_key<P, F, B>(self, pool: &P, f: F) -> Option<Self::Item>
			where
				P: $pool,
				F: $fns::FnMut(&Self::Item) -> B + Clone + $send + 'static,
				B: Ord + 'static,
				Self::Item: $send + 'static,
				Self::Task: 'static,
				Self: Sized,
			{
				self.pipe(pool, $pipe::<Self::Item>::min_by_key(Identity, f))
					.await
			}

			/// Runs this stream on `pool`, piping it into the `most_frequent` sink
			/// configured with `n`, `probability` and `tolerance`.
			#[inline]
			async fn most_frequent<P>(
				self, pool: &P, n: usize, probability: f64, tolerance: f64,
			) -> ::amadeus_streaming::Top<Self::Item, usize>
			where
				P: $pool,
				Self::Item: Hash + Eq + Clone + $send + 'static,
				Self::Task: 'static,
				Self: Sized,
			{
				self.pipe(
					pool,
					$pipe::<Self::Item>::most_frequent(Identity, n, probability, tolerance),
				)
				.await
			}

			/// Runs this stream of `(A, B)` pairs on `pool`, piping it into the
			/// `most_distinct` sink configured with `n`, `probability`, `tolerance`
			/// and `error_rate`.
			#[inline]
			async fn most_distinct<P, A, B>(
				self, pool: &P, n: usize, probability: f64, tolerance: f64, error_rate: f64,
			) -> ::amadeus_streaming::Top<A, ::amadeus_streaming::HyperLogLogMagnitude<B>>
			where
				P: $pool,
				Self: $stream<Item = (A, B)> + Sized,
				A: Hash + Eq + Clone + $send + 'static,
				B: Hash + 'static,
				Self::Task: 'static,
			{
				self.pipe(
					pool,
					$pipe::<Self::Item>::most_distinct(
						Identity,
						n,
						probability,
						tolerance,
						error_rate,
					),
				)
				.await
			}

			/// Runs this stream on `pool`, piping it into the `sample_unstable` sink
			/// configured with `samples`.
			#[inline]
			async fn sample_unstable<P>(
				self, pool: &P, samples: usize,
			) -> ::amadeus_streaming::SampleUnstable<Self::Item>
			where
				P: $pool,
				Self::Item: $send + 'static,
				Self::Task: 'static,
				Self: Sized,
			{
				self.pipe(
					pool,
					$pipe::<Self::Item>::sample_unstable(Identity, samples),
				)
				.await
			}

			/// Runs this stream on `pool`, piping it into the `all` sink built from
			/// the predicate `f`.
			#[inline]
			async fn all<P, F>(self, pool: &P, f: F) -> bool
			where
				P: $pool,
				F: $fns::FnMut(Self::Item) -> bool + Clone + $send + 'static,
				Self::Item: 'static,
				Self::Task: 'static,
				Self: Sized,
			{
				self.pipe(pool, $pipe::<Self::Item>::all(Identity, f))
					.await
			}

			/// Runs this stream on `pool`, piping it into the `any` sink built from
			/// the predicate `f`.
			#[inline]
			async fn any<P, F>(self, pool: &P, f: F) -> bool
			where
				P: $pool,
				F: $fns::FnMut(Self::Item) -> bool + Clone + $send + 'static,
				Self::Item: 'static,
				Self::Task: 'static,
				Self: Sized,
			{
				self.pipe(pool, $pipe::<Self::Item>::any(Identity, f))
					.await
			}
		}

		/// Identity function whose only purpose is to make the compiler check that
		/// `I` implements the stream trait.
		#[inline(always)]
		pub(crate) fn $assert_stream<T, I: $stream<Item = T>>(i: I) -> I {
			i
		}
	}
}
424
425stream!(ParallelStream ParallelPipe ParallelSink FromParallelStream IntoParallelStream into_par_stream ParStream ThreadPool Send ops assert_parallel_stream {
426	async fn reduce<P, B, R1, R3>(mut self, pool: &P, reduce_a: R1, reduce_c: R3) -> B
427	where
428		P: ThreadPool,
429		R1: ReducerSend<Self::Item> + Clone + Send + 'static,
430		R3: Reducer<<R1 as ReducerSend<Self::Item>>::Done, Done = B>,
431		Self::Task: 'static,
432		Self: Sized,
433	{
434		let self_ = self;
435		pin_mut!(self_);
436		// TODO: don't buffer tasks before sending. requires changes to ThreadPool
437		let mut tasks = (0..pool.threads()).map(|_| Vec::new()).collect::<Vec<_>>();
438		let mut allocated = 0;
439		'a: loop {
440			for i in 0..tasks.len() {
441				loop {
442					let (mut lower, _upper) = self_.size_hint();
443					if lower == 0 {
444						lower = 1;
445					}
446					let mut batch = (allocated + lower) / tasks.len();
447					if i < (allocated + lower) % tasks.len() {
448						batch += 1;
449					}
450					batch -= tasks[i].len();
451					if batch == 0 {
452						break;
453					}
454					for _ in 0..batch {
455						if let Some(task) = future::poll_fn(|cx| self_.as_mut().next_task(cx)).await {
456							tasks[i].push(task);
457							allocated += 1;
458						} else {
459							break 'a;
460						}
461					}
462				}
463			}
464		}
465		for (i, task) in tasks.iter().enumerate() {
466			let mut count = allocated / tasks.len();
467			if i < allocated % tasks.len() {
468				count += 1;
469			}
470			assert_eq!(
471				task.len(),
472				count,
473				"alloc: {:#?}",
474				tasks.iter().map(Vec::len).collect::<Vec<_>>()
475			);
476		}
477
478		let handles = tasks
479			.into_iter()
480			.filter(|tasks| !tasks.is_empty())
481			.map(|tasks| {
482				let reduce_a = reduce_a.clone();
483				pool.spawn(move || async move {
484					let sink = reduce_a.into_async();
485					pin_mut!(sink);
486					// this is faster than stream::iter(tasks.into_iter().map(StreamTask::into_async)).flatten().sink(sink).await
487					for task in tasks.into_iter().map(StreamTask::into_async) {
488							pin_mut!(task);
489							if let Some(ret) = sink.send_all(&mut task).await {
490									return ret;
491							}
492					}
493					sink.done().await
494				})
495			})
496			.collect::<futures::stream::FuturesUnordered<_>>();
497		let stream = handles.map(|item| {
498			item.unwrap_or_else(|err| panic!("Amadeus: task '<unnamed>' panicked at '{}'", err))
499		});
500		let reduce_c = reduce_c.into_async();
501		pin_mut!(reduce_c);
502		stream.sink(reduce_c).await
503	}
504
505	async fn pipe<P, ParSink, A>(self, pool: &P, sink: ParSink) -> A
506	where
507		P: ThreadPool,
508		ParSink: ParallelSink<Self::Item, Done = A>,
509		<ParSink::Pipe as ParallelPipe<Self::Item>>::Task: 'static,
510		ParSink::ReduceA: 'static,
511		Self::Task: 'static,
512		Self: Sized,
513	{
514		let (iterator, reducer_a, reducer_b) = sink.reducers();
515		Pipe::new(self, iterator)
516			.reduce(pool, reducer_a, reducer_b)
517			.await
518	}
519
520	// These messy bounds are unfortunately necessary as requiring 'static in ParallelSink breaks sink_b being e.g. Identity.count()
521	async fn fork<P, ParSinkA, ParSinkB, A, B>(
522		self, pool: &P, sink_a: ParSinkA, sink_b: ParSinkB,
523	) -> (A, B)
524	where
525		P: ThreadPool,
526		ParSinkA: ParallelSink<Self::Item, Done = A>,
527		ParSinkB: for<'a> ParallelSink<&'a Self::Item, Done = B> + 'static,
528		<ParSinkA::Pipe as ParallelPipe<Self::Item>>::Task: 'static,
529		ParSinkA::ReduceA: 'static,
530		<ParSinkB as ParallelSink<&'static Self::Item>>::ReduceA: 'static,
531		<<ParSinkB as ParallelSink<&'static Self::Item>>::Pipe as ParallelPipe<
532			&'static Self::Item,
533		>>::Task: 'static,
534		Self::Item: 'static,
535		Self::Task: 'static,
536		Self: Sized,
537	{
538		let (iterator_a, reducer_a_a, reducer_a_b) = sink_a.reducers();
539		let (iterator_b, reducer_b_a, reducer_b_b) = sink_b.reducers();
540		Fork::new(self, iterator_a, iterator_b)
541			.reduce(
542				pool,
543				ReduceA2::new(reducer_a_a, reducer_b_a),
544				ReduceC2::new(reducer_a_b, reducer_b_b),
545			)
546			.await
547	}
548
549	async fn group_by<P, S, A, B>(self, pool: &P, sink: S) -> IndexMap<A, S::Done>
550	where
551		P: ThreadPool,
552		A: Eq + Hash + Send + 'static,
553		B: 'static,
554		S: ParallelSink<B>,
555		<S::Pipe as ParallelPipe<B>>::Task: Clone + Send + 'static,
556		S::ReduceA: 'static,
557		S::ReduceC: Clone,
558		S::Done: Send + 'static,
559		Self::Task: 'static,
560		Self: ParallelStream<Item = (A, B)> + Sized,
561	{
562		self.pipe(pool, ParallelPipe::<Self::Item>::group_by(Identity, sink))
563			.await
564	}
565
566	async fn collect<P, B>(self, pool: &P) -> B
567	where
568		P: ThreadPool,
569		B: FromParallelStream<Self::Item>,
570		B::ReduceA: Send + 'static,
571		Self::Task: 'static,
572		Self: Sized,
573	{
574		self.pipe(pool, ParallelPipe::<Self::Item>::collect(Identity))
575			.await
576	}
577});
578
579stream!(DistributedStream DistributedPipe DistributedSink FromDistributedStream IntoDistributedStream into_dist_stream DistStream ProcessPool ProcessSend traits assert_distributed_stream cfg_attr(not(nightly), serde_closure::desugar) {
580	async fn reduce<P, B, R1, R2, R3>(
581		mut self, pool: &P, reduce_a: R1, reduce_b: R2, reduce_c: R3,
582	) -> B
583	where
584		P: ProcessPool,
585		R1: ReducerSend<Self::Item> + Clone + ProcessSend + 'static,
586		R2: ReducerProcessSend<<R1 as ReducerSend<Self::Item>>::Done>
587			+ Clone
588			+ ProcessSend
589			+ 'static,
590		R3: Reducer<
591			<R2 as ReducerProcessSend<<R1 as ReducerSend<Self::Item>>::Done>>::Done,
592			Done = B,
593		>,
594		Self::Task: 'static,
595		Self: Sized,
596	{
597		let self_ = self;
598		pin_mut!(self_);
599		// TODO: don't buffer tasks before sending. requires changes to ProcessPool
600		let mut tasks = (0..pool.processes()).map(|_| Vec::new()).collect::<Vec<_>>();
601		let mut allocated = 0;
602		'a: loop {
603			for i in 0..tasks.len() {
604				loop {
605					let (mut lower, _upper) = self_.size_hint();
606					if lower == 0 {
607						lower = 1;
608					}
609					let mut batch = (allocated + lower) / tasks.len();
610					if i < (allocated + lower) % tasks.len() {
611						batch += 1;
612					}
613					batch -= tasks[i].len();
614					if batch == 0 {
615						break;
616					}
617					for _ in 0..batch {
618						if let Some(task) = future::poll_fn(|cx| self_.as_mut().next_task(cx)).await {
619							tasks[i].push(task);
620							allocated += 1;
621						} else {
622							break 'a;
623						}
624					}
625				}
626			}
627		}
628		for (i, task) in tasks.iter().enumerate() {
629			let mut count = allocated / tasks.len();
630			if i < allocated % tasks.len() {
631				count += 1;
632			}
633			assert_eq!(
634				task.len(),
635				count,
636				"alloc: {:#?}",
637				tasks.iter().map(Vec::len).collect::<Vec<_>>()
638			);
639		}
640
641		let handles = tasks
642			.into_iter()
643			.filter(|tasks| !tasks.is_empty())
644			.map(|tasks| {
645				let reduce_b = reduce_b.clone();
646				let reduce_a = reduce_a.clone();
647				pool.spawn(FnOnce!(move |pool: &P::ThreadPool| {
648					let mut process_tasks = tasks.into_iter();
649
650					let mut tasks = (0..pool.threads()).map(|_| Vec::new()).collect::<Vec<_>>();
651					let mut allocated = 0;
652					'a: loop {
653						for i in 0..tasks.len() {
654							loop {
655								let (mut lower, _upper) = process_tasks.size_hint();
656								if lower == 0 {
657									lower = 1;
658								}
659								let mut batch = (allocated + lower) / tasks.len();
660								if i < (allocated + lower) % tasks.len() {
661									batch += 1;
662								}
663								batch -= tasks[i].len();
664								if batch == 0 {
665									break;
666								}
667								for _ in 0..batch {
668									if let Some(task) = process_tasks.next() {
669										tasks[i].push(task);
670										allocated += 1;
671									} else {
672										break 'a;
673									}
674								}
675							}
676						}
677					}
678					for (i, task) in tasks.iter().enumerate() {
679						let mut count = allocated / tasks.len();
680						if i < allocated % tasks.len() {
681							count += 1;
682						}
683						assert_eq!(
684							task.len(),
685							count,
686							"alloc: {:#?}",
687							tasks.iter().map(Vec::len).collect::<Vec<_>>()
688						);
689					}
690					let handles = tasks
691						.into_iter()
692						.filter(|tasks| !tasks.is_empty())
693						.map(|tasks| {
694							let reduce_a = reduce_a.clone();
695							pool.spawn(move || async move {
696								let sink = reduce_a.into_async();
697								pin_mut!(sink);
698								// this is faster than stream::iter(tasks.into_iter().map(StreamTask::into_async)).flatten().sink(sink).await
699								for task in tasks.into_iter().map(StreamTask::into_async) {
700										pin_mut!(task);
701										if let Some(ret) = sink.send_all(&mut task).await {
702												return ret;
703										}
704								}
705								sink.done().await
706							})
707						})
708						.collect::<futures::stream::FuturesUnordered<_>>();
709
710					let stream = handles.map(|item| {
711						item.unwrap_or_else(|err| {
712							panic!("Amadeus: task '<unnamed>' panicked at '{}'", err)
713						})
714					});
715					let reduce_b = reduce_b.into_async();
716					async move {
717						pin_mut!(reduce_b);
718						stream.sink(reduce_b).await
719					}
720				}))
721			})
722			.collect::<futures::stream::FuturesUnordered<_>>();
723		let stream = handles.map(|item| {
724			item.unwrap_or_else(|err| panic!("Amadeus: task '<unnamed>' panicked at '{}'", err))
725		});
726		let reduce_c = reduce_c.into_async();
727		pin_mut!(reduce_c);
728		stream.sink(reduce_c).await
729	}
730
731	async fn pipe<P, DistSink, A>(self, pool: &P, sink: DistSink) -> A
732	where
733		P: ProcessPool,
734		DistSink: DistributedSink<Self::Item, Done = A>,
735		<DistSink::Pipe as DistributedPipe<Self::Item>>::Task: 'static,
736		DistSink::ReduceA: 'static,
737		DistSink::ReduceB: 'static,
738		Self::Task: 'static,
739		Self: Sized,
740	{
741		let (iterator, reducer_a, reducer_b, reducer_c) = sink.reducers();
742		Pipe::new(self, iterator)
743			.reduce(pool, reducer_a, reducer_b, reducer_c)
744			.await
745	}
746
747	// These messy bounds are unfortunately necessary as requiring 'static in DistributedSink breaks sink_b being e.g. Identity.count()
748	async fn fork<P, DistSinkA, DistSinkB, A, B>(
749		self, pool: &P, sink_a: DistSinkA, sink_b: DistSinkB,
750	) -> (A, B)
751	where
752		P: ProcessPool,
753		DistSinkA: DistributedSink<Self::Item, Done = A>,
754		DistSinkB: for<'a> DistributedSink<&'a Self::Item, Done = B> + 'static,
755		<DistSinkA::Pipe as DistributedPipe<Self::Item>>::Task: 'static,
756		DistSinkA::ReduceA: 'static,
757		DistSinkA::ReduceB: 'static,
758		<DistSinkB as DistributedSink<&'static Self::Item>>::ReduceA: 'static,
759		<DistSinkB as DistributedSink<&'static Self::Item>>::ReduceB: 'static,
760		<<DistSinkB as DistributedSink<&'static Self::Item>>::Pipe as DistributedPipe<
761			&'static Self::Item,
762		>>::Task: 'static,
763		Self::Item: 'static,
764		Self::Task: 'static,
765		Self: Sized,
766	{
767		let (iterator_a, reducer_a_a, reducer_a_b, reducer_a_c) = sink_a.reducers();
768		let (iterator_b, reducer_b_a, reducer_b_b, reducer_b_c) = sink_b.reducers();
769		Fork::new(self, iterator_a, iterator_b)
770			.reduce(
771				pool,
772				ReduceA2::new(reducer_a_a, reducer_b_a),
773				ReduceC2::new(reducer_a_b, reducer_b_b),
774				ReduceC2::new(reducer_a_c, reducer_b_c),
775			)
776			.await
777	}
778
779	async fn group_by<P, S, A, B>(self, pool: &P, sink: S) -> IndexMap<A, S::Done>
780	where
781		P: ProcessPool,
782		A: Eq + Hash + ProcessSend + 'static,
783		B: 'static,
784		S: DistributedSink<B>,
785		<S::Pipe as DistributedPipe<B>>::Task: Clone + ProcessSend + 'static,
786		S::ReduceA: 'static,
787		S::ReduceB: 'static,
788		S::ReduceC: Clone,
789		S::Done: ProcessSend + 'static,
790		Self::Task: 'static,
791		Self: DistributedStream<Item = (A, B)> + Sized,
792	{
793		self.pipe(
794			pool,
795			DistributedPipe::<Self::Item>::group_by(Identity, sink),
796		)
797		.await
798	}
799
800	async fn collect<P, B>(self, pool: &P) -> B
801	where
802		P: ProcessPool,
803		B: FromDistributedStream<Self::Item>,
804		B::ReduceA: ProcessSend + 'static,
805		B::ReduceB: ProcessSend + 'static,
806		Self::Task: 'static,
807		Self: Sized,
808	{
809		self.pipe(pool, DistributedPipe::<Self::Item>::collect(Identity))
810			.await
811	}
812});