// amadeus_core/par_pipe.rs

1#![allow(unused_qualifications)]
2
3use either::Either;
4use futures::Stream;
5use serde_closure::traits;
6use std::{cmp::Ordering, hash::Hash, iter, ops};
7
8use super::{par_sink::*, par_stream::*};
9use crate::{pipe::Pipe, pool::ProcessSend};
10
11#[must_use]
12pub trait PipeTask<Input> {
13	type Output;
14	type Async: Pipe<Input, Output = Self::Output>;
15
16	fn into_async(self) -> Self::Async;
17}
18
// Generates a pipe trait together with its identity "assert" helper.
//
// Arguments, in order:
// - `$pipe`: name of the trait to generate.
// - `$sink`: sink trait used to bound `pipe`, `fork` and `group_by`.
// - `$from_sink`: trait bounding the target type of `collect`.
// - `$send`: marker bound applied to closures and to values that are carried around.
// - `$fns`: path prefix that provides the `Fn` / `FnMut` traits used in closure bounds.
// - `$assert_pipe`: name of the identity helper emitted at the end of this macro.
// - `$assert_sink`: name of the sink-side helper; it is called here but NOT defined by
//   this macro (presumably it lives next to the sink traits; confirm in `par_sink`).
// - `$($meta)*`: optional extra attributes placed on the generated trait.
macro_rules! pipe {
	($pipe:ident $sink:ident $from_sink:ident $send:ident $fns:ident $assert_pipe:ident $assert_sink:ident $($meta:meta)*) => {
		/// A pipe over `Input` items, with adapter methods that return further pipes and
		/// terminal methods that return sinks.
		$(#[$meta])*
		#[must_use]
		pub trait $pipe<Input> {
			/// Item type this pipe yields.
			type Output;
			/// Task type created by `task`; it accepts `Input` and yields `Self::Output`.
			type Task: PipeTask<Input, Output = Self::Output> + $send;

			/// Returns the task for this pipe.
			fn task(&self) -> Self::Task;

			/// Wraps this pipe in `Inspect` together with `f`, a callback over `&Self::Output`.
			#[inline]
			fn inspect<F>(self, f: F) -> Inspect<Self, F>
			where
				F: $fns::FnMut(&Self::Output) + Clone + $send + 'static,
				Self: Sized,
			{
				$assert_pipe(Inspect::new(self, f))
			}

			/// Wraps this pipe in `Update` together with `f`, a callback over `&mut Self::Output`.
			#[inline]
			fn update<F>(self, f: F) -> Update<Self, F>
			where
				F: $fns::FnMut(&mut Self::Output) + Clone + $send + 'static,
				Self: Sized,
			{
				$assert_pipe(Update::new(self, f))
			}

			/// Wraps this pipe in `Map` together with `f`, which takes `Self::Output` by value
			/// and returns `B`.
			#[inline]
			fn map<B, F>(self, f: F) -> Map<Self, F>
			where
				F: $fns::FnMut(Self::Output) -> B + Clone + $send + 'static,
				Self: Sized,
			{
				$assert_pipe(Map::new(self, f))
			}

			/// Wraps this pipe in `FlatMap` together with `f`, which takes `Self::Output` by
			/// value and returns a `Stream`.
			#[inline]
			fn flat_map<B, F>(self, f: F) -> FlatMap<Self, F>
			where
				F: $fns::FnMut(Self::Output) -> B + Clone + $send + 'static,
				B: Stream,
				Self: Sized,
			{
				$assert_pipe(FlatMap::new(self, f))
			}

			/// Wraps this pipe in `Filter` together with `f`, a predicate over `&Self::Output`.
			#[inline]
			fn filter<F>(self, f: F) -> Filter<Self, F>
			where
				F: $fns::FnMut(&Self::Output) -> bool + Clone + $send + 'static,
				Self: Sized,
			{
				$assert_pipe(Filter::new(self, f))
			}

			/// Wraps this pipe in `Cloned`.
			///
			/// Only available when this pipe also accepts `&'a Input` and yields `&'a T`
			/// for some `T: Clone`.
			#[inline]
			fn cloned<'a, T>(self) -> Cloned<Self, T, Input>
			where
				T: Clone + 'a,
				Input: 'a,
				Self: $pipe<&'a Input, Output = &'a T> + Sized,
			{
				$assert_pipe(Cloned::new(self))
			}

			/// Builds a `LeftJoin` from this pipe and the `(K, V2)` pairs of `right`.
			///
			/// `right` is collected up front into whatever collection `LeftJoin::new` takes.
			/// Requires this pipe to yield `(K, V1)` pairs.
			#[inline]
			fn left_join<K, V1, V2>(self, right: impl IntoIterator<Item = (K, V2)>) -> LeftJoin<Self, K, V1, V2>
			where
				K: Eq + Hash + Clone + $send + 'static,
				V1: 'static,
				V2: Clone + $send + 'static,
				Self: $pipe<Input, Output = (K, V1)> + Sized,
			{
				$assert_pipe(LeftJoin::new(self, right.into_iter().collect()))
			}

			/// Builds an `InnerJoin` from this pipe and the `(K, V2)` pairs of `right`.
			///
			/// `right` is collected up front into whatever collection `InnerJoin::new` takes.
			/// Requires this pipe to yield `(K, V1)` pairs.
			#[inline]
			fn inner_join<K, V1, V2>(self, right: impl IntoIterator<Item = (K, V2)>) -> InnerJoin<Self, K, V1, V2>
			where
				K: Eq + Hash + Clone + $send + 'static,
				V1: 'static,
				V2: Clone + $send + 'static,
				Self: $pipe<Input, Output = (K, V1)> + Sized,
			{
				$assert_pipe(InnerJoin::new(self, right.into_iter().collect()))
			}

			// #[must_use]
			// fn chain<C>(self, chain: C) -> Chain<Self, C::Iter>
			// where
			// 	C: IntoParallelStream<Output = Self::Output>,
			// 	Self: Sized,
			// {
			// 	$assert_pipe(Chain::new(self, chain.into_par_stream()))
			// }

			/// Connects this pipe to `sink`, which must accept `Self::Output`, and returns the
			/// combined sink (`par_sink::Pipe`).
			#[inline]
			fn pipe<S>(self, sink: S) -> super::par_sink::Pipe<Self, S>
			where
				S: $sink<Self::Output>,
				Self: Sized,
			{
				$assert_sink(super::par_sink::Pipe::new(self, sink))
			}

			/// Builds a `Fork` from this pipe and two sinks: `sink` accepts `Self::Output` by
			/// value and `sink_ref` accepts `&Self::Output` for any lifetime.
			// NOTE(review): `RefAItem` is not referenced by the parameters, the return type or
			// the bounds; it is kept so the generic parameter list stays source-compatible.
			#[inline]
			fn fork<A, B, RefAItem>(
				self, sink: A, sink_ref: B,
			) -> Fork<Self, A, B, &'static Self::Output>
			where
				A: $sink<Self::Output>,
				B: for<'a> $sink<&'a Self::Output>,
				Self: Sized,
			{
				$assert_sink(Fork::new(self, sink, sink_ref))
			}

			/// Builds a `ForEach` sink from this pipe and `f`, which takes `Self::Output` by value.
			#[inline]
			fn for_each<F>(self, f: F) -> ForEach<Self, F>
			where
				F: $fns::FnMut(Self::Output) + Clone + $send + 'static,
				Self: Sized,
			{
				$assert_sink(ForEach::new(self, f))
			}

			/// Builds a `Fold` sink from this pipe, `identity` and `op`.
			///
			/// `identity` produces a fresh accumulator `B`; `op` merges an accumulator with
			/// either a `Self::Output` item or another accumulator (`Either<Self::Output, B>`).
			#[inline]
			fn fold<ID, F, B>(self, identity: ID, op: F) -> Fold<Self, ID, F, B>
			where
				ID: $fns::FnMut() -> B + Clone + $send + 'static,
				F: $fns::FnMut(B, Either<Self::Output, B>) -> B + Clone + $send + 'static,
				B: $send + 'static,
				Self: Sized,
			{
				$assert_sink(Fold::new(self, identity, op))
			}

			/// Builds a `GroupBy` sink from this pipe and `sink`.
			///
			/// Requires this pipe to yield `(A, B)` pairs with `A: Eq + Hash`; `sink` must
			/// accept the `B` half of each pair.
			#[inline]
			fn group_by<S, A, B>(self, sink: S) -> GroupBy<Self, S>
			where
				A: Eq + Hash + $send + 'static,
				S: $sink<B>,
				<S::Pipe as $pipe<B>>::Task: Clone + $send + 'static,
				S::ReduceA: 'static,
				S::ReduceC: Clone,
				S::Done: $send + 'static,
				Self: $pipe<Input, Output = (A, B)> + Sized,
			{
				$assert_sink(GroupBy::new(self, sink))
			}

			/// Builds a `Histogram` sink from this pipe; outputs must be `Hash + Ord`.
			#[inline]
			fn histogram(self) -> Histogram<Self>
			where
				Self::Output: Hash + Ord + $send + 'static,
				Self: Sized,
			{
				$assert_sink(Histogram::new(self))
			}

			/// Builds a `Sort` sink from this pipe, the comparator `cmp` and the count `n`.
			#[inline]
			fn sort_n_by<F>(self, n: usize, cmp: F) -> Sort<Self, F>
			where
				F: $fns::Fn(&Self::Output, &Self::Output) -> Ordering + Clone + $send + 'static,
				Self::Output: Clone + $send + 'static,
				Self: Sized,
			{
				// `Sort::new` takes the comparator before the count, the reverse of this method.
				$assert_sink(Sort::new(self, cmp, n))
			}

			/// Builds a `Count` sink from this pipe.
			#[inline]
			fn count(self) -> Count<Self>
			where
				Self: Sized,
			{
				$assert_sink(Count::new(self))
			}

			/// Builds a `Sum` sink from this pipe.
			///
			/// `B` must be summable both from `Self::Output` items and from other `B` values.
			#[inline]
			fn sum<B>(self) -> Sum<Self, B>
			where
				B: iter::Sum<Self::Output> + iter::Sum<B> + $send + 'static,
				Self: Sized,
			{
				$assert_sink(Sum::new(self))
			}

			/// Builds a `Mean` sink from this pipe; only available when the output is `f64`.
			#[inline]
			fn mean(self) -> Mean<Self>
			where
				Self: $pipe<Input, Output = f64> + Sized,
			{
				$assert_sink(Mean::new(self))
			}

			/// Builds a `StdDev` sink from this pipe; only available when the output is `f64`.
			#[inline]
			fn stddev(self) -> StdDev<Self>
			where
				Self: $pipe<Input, Output = f64> + Sized,
			{
				$assert_sink(StdDev::new(self))
			}

			/// Builds a `Combine` sink from this pipe and `f`, which merges two `Self::Output`
			/// values into one.
			#[inline]
			fn combine<F>(self, f: F) -> Combine<Self, F>
			where
				F: $fns::FnMut(Self::Output, Self::Output) -> Self::Output + Clone + $send + 'static,
				Self::Output: $send + 'static,
				Self: Sized,
			{
				$assert_sink(Combine::new(self, f))
			}

			/// Builds a `Max` sink from this pipe; outputs must be `Ord`.
			#[inline]
			fn max(self) -> Max<Self>
			where
				Self::Output: Ord + $send + 'static,
				Self: Sized,
			{
				$assert_sink(Max::new(self))
			}

			/// Builds a `MaxBy` sink from this pipe and the comparator `f`.
			#[inline]
			fn max_by<F>(self, f: F) -> MaxBy<Self, F>
			where
				F: $fns::FnMut(&Self::Output, &Self::Output) -> Ordering + Clone + $send + 'static,
				Self::Output: $send + 'static,
				Self: Sized,
			{
				$assert_sink(MaxBy::new(self, f))
			}

			/// Builds a `MaxByKey` sink from this pipe and `f`, which derives an `Ord` key `B`
			/// from `&Self::Output`.
			#[inline]
			fn max_by_key<F, B>(self, f: F) -> MaxByKey<Self, F>
			where
				F: $fns::FnMut(&Self::Output) -> B + Clone + $send + 'static,
				B: Ord + 'static,
				Self::Output: $send + 'static,
				Self: Sized,
			{
				$assert_sink(MaxByKey::new(self, f))
			}

			/// Builds a `Min` sink from this pipe; outputs must be `Ord`.
			#[inline]
			fn min(self) -> Min<Self>
			where
				Self::Output: Ord + $send + 'static,
				Self: Sized,
			{
				$assert_sink(Min::new(self))
			}

			/// Builds a `MinBy` sink from this pipe and the comparator `f`.
			#[inline]
			fn min_by<F>(self, f: F) -> MinBy<Self, F>
			where
				F: $fns::FnMut(&Self::Output, &Self::Output) -> Ordering + Clone + $send + 'static,
				Self::Output: $send + 'static,
				Self: Sized,
			{
				$assert_sink(MinBy::new(self, f))
			}

			/// Builds a `MinByKey` sink from this pipe and `f`, which derives an `Ord` key `B`
			/// from `&Self::Output`.
			#[inline]
			fn min_by_key<F, B>(self, f: F) -> MinByKey<Self, F>
			where
				F: $fns::FnMut(&Self::Output) -> B + Clone + $send + 'static,
				B: Ord + 'static,
				Self::Output: $send + 'static,
				Self: Sized,
			{
				$assert_sink(MinByKey::new(self, f))
			}

			/// Builds a `MostFrequent` sink from this pipe.
			///
			/// `n`, `probability` and `tolerance` are forwarded unchanged to
			/// `MostFrequent::new`; see that type for their meaning.
			#[inline]
			fn most_frequent(self, n: usize, probability: f64, tolerance: f64) -> MostFrequent<Self>
			where
				Self::Output: Hash + Eq + Clone + $send + 'static,
				Self: Sized,
			{
				$assert_sink(MostFrequent::new(self, n, probability, tolerance))
			}

			/// Builds a `MostDistinct` sink from this pipe, which must yield `(A, B)` pairs.
			///
			/// `n`, `probability`, `tolerance` and `error_rate` are forwarded unchanged to
			/// `MostDistinct::new`; see that type for their meaning.
			#[inline]
			fn most_distinct<A, B>(
				self, n: usize, probability: f64, tolerance: f64, error_rate: f64,
			) -> MostDistinct<Self>
			where
				Self: $pipe<Input, Output = (A, B)> + Sized,
				A: Hash + Eq + Clone + $send + 'static,
				B: Hash + 'static,
			{
				$assert_sink(MostDistinct::new(
					self,
					n,
					probability,
					tolerance,
					error_rate,
				))
			}

			/// Builds a `SampleUnstable` sink from this pipe and the sample count `samples`.
			#[inline]
			fn sample_unstable(self, samples: usize) -> SampleUnstable<Self>
			where
				Self::Output: $send + 'static,
				Self: Sized,
			{
				$assert_sink(SampleUnstable::new(self, samples))
			}

			/// Builds an `All` sink from this pipe and the predicate `f` over `Self::Output`.
			#[inline]
			fn all<F>(self, f: F) -> All<Self, F>
			where
				F: $fns::FnMut(Self::Output) -> bool + Clone + $send + 'static,
				Self: Sized,
			{
				$assert_sink(All::new(self, f))
			}

			/// Builds an `Any` sink from this pipe and the predicate `f` over `Self::Output`.
			#[inline]
			fn any<F>(self, f: F) -> Any<Self, F>
			where
				F: $fns::FnMut(Self::Output) -> bool + Clone + $send + 'static,
				Self: Sized,
			{
				$assert_sink(Any::new(self, f))
			}

			/// Builds a `Collect` sink from this pipe, targeting the collection type `B`.
			#[inline]
			fn collect<B>(self) -> Collect<Self, B>
			where
				B: $from_sink<Self::Output>,
				Self: Sized,
			{
				$assert_sink(Collect::new(self))
			}
		}

		/// Identity function: returns `i` unchanged.
		///
		/// Its bounds make the compiler check that the value really implements the
		/// generated pipe trait for some `Input` / `Output` pair.
		#[inline(always)]
		pub(crate) fn $assert_pipe<T, I: $pipe<Input, Output = T>, Input>(i: I) -> I {
			i
		}
	};
}
363
364pipe!(ParallelPipe ParallelSink FromParallelStream Send ops assert_parallel_pipe assert_parallel_sink);
365pipe!(DistributedPipe DistributedSink FromDistributedStream ProcessSend traits assert_distributed_pipe assert_distributed_sink cfg_attr(not(nightly), serde_closure::desugar));