1#![allow(unused_qualifications)]
2
3use either::Either;
4use futures::Stream;
5use serde_closure::traits;
6use std::{cmp::Ordering, hash::Hash, iter, ops};
7
8use super::{par_sink::*, par_stream::*};
9use crate::{pipe::Pipe, pool::ProcessSend};
10
11#[must_use]
12pub trait PipeTask<Input> {
13 type Output;
14 type Async: Pipe<Input, Output = Self::Output>;
15
16 fn into_async(self) -> Self::Async;
17}
18
19macro_rules! pipe {
20 ($pipe:ident $sink:ident $from_sink:ident $send:ident $fns:ident $assert_pipe:ident $assert_sink:ident $($meta:meta)*) => {
21 $(#[$meta])*
22 #[must_use]
23 pub trait $pipe<Input> {
24 type Output;
25 type Task: PipeTask<Input, Output = Self::Output> + $send;
26
27 fn task(&self) -> Self::Task;
28
29 #[inline]
30 fn inspect<F>(self, f: F) -> Inspect<Self, F>
31 where
32 F: $fns::FnMut(&Self::Output) + Clone + $send + 'static,
33 Self: Sized,
34 {
35 $assert_pipe(Inspect::new(self, f))
36 }
37
38 #[inline]
39 fn update<F>(self, f: F) -> Update<Self, F>
40 where
41 F: $fns::FnMut(&mut Self::Output) + Clone + $send + 'static,
42 Self: Sized,
43 {
44 $assert_pipe(Update::new(self, f))
45 }
46
47 #[inline]
48 fn map<B, F>(self, f: F) -> Map<Self, F>
49 where
50 F: $fns::FnMut(Self::Output) -> B + Clone + $send + 'static,
51 Self: Sized,
52 {
53 $assert_pipe(Map::new(self, f))
54 }
55
56 #[inline]
57 fn flat_map<B, F>(self, f: F) -> FlatMap<Self, F>
58 where
59 F: $fns::FnMut(Self::Output) -> B + Clone + $send + 'static,
60 B: Stream,
61 Self: Sized,
62 {
63 $assert_pipe(FlatMap::new(self, f))
64 }
65
66 #[inline]
67 fn filter<F>(self, f: F) -> Filter<Self, F>
68 where
69 F: $fns::FnMut(&Self::Output) -> bool + Clone + $send + 'static,
70 Self: Sized,
71 {
72 $assert_pipe(Filter::new(self, f))
73 }
74
75 #[inline]
76 fn cloned<'a, T>(self) -> Cloned<Self, T, Input>
77 where
78 T: Clone + 'a,
79 Input: 'a,
80 Self: $pipe<&'a Input, Output = &'a T> + Sized,
81 {
82 $assert_pipe(Cloned::new(self))
83 }
84
85 #[inline]
86 fn left_join<K, V1, V2>(self, right: impl IntoIterator<Item = (K, V2)>) -> LeftJoin<Self, K, V1, V2>
87 where
88 K: Eq + Hash + Clone + $send + 'static,
89 V1: 'static,
90 V2: Clone + $send + 'static,
91 Self: $pipe<Input, Output = (K, V1)> + Sized,
92 {
93 $assert_pipe(LeftJoin::new(self, right.into_iter().collect()))
94 }
95
96 #[inline]
97 fn inner_join<K, V1, V2>(self, right: impl IntoIterator<Item = (K, V2)>) -> InnerJoin<Self, K, V1, V2>
98 where
99 K: Eq + Hash + Clone + $send + 'static,
100 V1: 'static,
101 V2: Clone + $send + 'static,
102 Self: $pipe<Input, Output = (K, V1)> + Sized,
103 {
104 $assert_pipe(InnerJoin::new(self, right.into_iter().collect()))
105 }
106
107 #[inline]
117 fn pipe<S>(self, sink: S) -> super::par_sink::Pipe<Self, S>
118 where
119 S: $sink<Self::Output>,
120 Self: Sized,
121 {
122 $assert_sink(super::par_sink::Pipe::new(self, sink))
123 }
124
125 #[inline]
126 fn fork<A, B, RefAItem>(
127 self, sink: A, sink_ref: B,
128 ) -> Fork<Self, A, B, &'static Self::Output>
129 where
130 A: $sink<Self::Output>,
131 B: for<'a> $sink<&'a Self::Output>,
132 Self: Sized,
133 {
134 $assert_sink(Fork::new(self, sink, sink_ref))
135 }
136
137 #[inline]
138 fn for_each<F>(self, f: F) -> ForEach<Self, F>
139 where
140 F: $fns::FnMut(Self::Output) + Clone + $send + 'static,
141 Self: Sized,
142 {
143 $assert_sink(ForEach::new(self, f))
144 }
145
146 #[inline]
147 fn fold<ID, F, B>(self, identity: ID, op: F) -> Fold<Self, ID, F, B>
148 where
149 ID: $fns::FnMut() -> B + Clone + $send + 'static,
150 F: $fns::FnMut(B, Either<Self::Output, B>) -> B + Clone + $send + 'static,
151 B: $send + 'static,
152 Self: Sized,
153 {
154 $assert_sink(Fold::new(self, identity, op))
155 }
156
157 #[inline]
158 fn group_by<S, A, B>(self, sink: S) -> GroupBy<Self, S>
159 where
160 A: Eq + Hash + $send + 'static,
161 S: $sink<B>,
162 <S::Pipe as $pipe<B>>::Task: Clone + $send + 'static,
163 S::ReduceA: 'static,
164 S::ReduceC: Clone,
165 S::Done: $send + 'static,
166 Self: $pipe<Input, Output = (A, B)> + Sized,
167 {
168 $assert_sink(GroupBy::new(self, sink))
169 }
170
171 #[inline]
172 fn histogram(self) -> Histogram<Self>
173 where
174 Self::Output: Hash + Ord + $send + 'static,
175 Self: Sized,
176 {
177 $assert_sink(Histogram::new(self))
178 }
179
180 #[inline]
181 fn sort_n_by<F>(self, n: usize, cmp: F) -> Sort<Self, F>
182 where
183 F: $fns::Fn(&Self::Output, &Self::Output) -> Ordering + Clone + $send + 'static,
184 Self::Output: Clone + $send + 'static,
185 Self: Sized,
186 {
187 $assert_sink(Sort::new(self, cmp, n))
188 }
189
190 #[inline]
191 fn count(self) -> Count<Self>
192 where
193 Self: Sized,
194 {
195 $assert_sink(Count::new(self))
196 }
197
198 #[inline]
199 fn sum<B>(self) -> Sum<Self, B>
200 where
201 B: iter::Sum<Self::Output> + iter::Sum<B> + $send + 'static,
202 Self: Sized,
203 {
204 $assert_sink(Sum::new(self))
205 }
206
207 #[inline]
208 fn mean(self) -> Mean<Self>
209 where
210 Self: $pipe<Input, Output = f64> + Sized,
211 {
212 $assert_sink(Mean::new(self))
213 }
214
215 #[inline]
216 fn stddev(self) -> StdDev<Self>
217 where
218 Self: $pipe<Input, Output = f64> + Sized,
219 {
220 $assert_sink(StdDev::new(self))
221 }
222
223 #[inline]
224 fn combine<F>(self, f: F) -> Combine<Self, F>
225 where
226 F: $fns::FnMut(Self::Output, Self::Output) -> Self::Output + Clone + $send + 'static,
227 Self::Output: $send + 'static,
228 Self: Sized,
229 {
230 $assert_sink(Combine::new(self, f))
231 }
232
233 #[inline]
234 fn max(self) -> Max<Self>
235 where
236 Self::Output: Ord + $send + 'static,
237 Self: Sized,
238 {
239 $assert_sink(Max::new(self))
240 }
241
242 #[inline]
243 fn max_by<F>(self, f: F) -> MaxBy<Self, F>
244 where
245 F: $fns::FnMut(&Self::Output, &Self::Output) -> Ordering + Clone + $send + 'static,
246 Self::Output: $send + 'static,
247 Self: Sized,
248 {
249 $assert_sink(MaxBy::new(self, f))
250 }
251
252 #[inline]
253 fn max_by_key<F, B>(self, f: F) -> MaxByKey<Self, F>
254 where
255 F: $fns::FnMut(&Self::Output) -> B + Clone + $send + 'static,
256 B: Ord + 'static,
257 Self::Output: $send + 'static,
258 Self: Sized,
259 {
260 $assert_sink(MaxByKey::new(self, f))
261 }
262
263 #[inline]
264 fn min(self) -> Min<Self>
265 where
266 Self::Output: Ord + $send + 'static,
267 Self: Sized,
268 {
269 $assert_sink(Min::new(self))
270 }
271
272 #[inline]
273 fn min_by<F>(self, f: F) -> MinBy<Self, F>
274 where
275 F: $fns::FnMut(&Self::Output, &Self::Output) -> Ordering + Clone + $send + 'static,
276 Self::Output: $send + 'static,
277 Self: Sized,
278 {
279 $assert_sink(MinBy::new(self, f))
280 }
281
282 #[inline]
283 fn min_by_key<F, B>(self, f: F) -> MinByKey<Self, F>
284 where
285 F: $fns::FnMut(&Self::Output) -> B + Clone + $send + 'static,
286 B: Ord + 'static,
287 Self::Output: $send + 'static,
288 Self: Sized,
289 {
290 $assert_sink(MinByKey::new(self, f))
291 }
292
293 #[inline]
294 fn most_frequent(self, n: usize, probability: f64, tolerance: f64) -> MostFrequent<Self>
295 where
296 Self::Output: Hash + Eq + Clone + $send + 'static,
297 Self: Sized,
298 {
299 $assert_sink(MostFrequent::new(self, n, probability, tolerance))
300 }
301
302 #[inline]
303 fn most_distinct<A, B>(
304 self, n: usize, probability: f64, tolerance: f64, error_rate: f64,
305 ) -> MostDistinct<Self>
306 where
307 Self: $pipe<Input, Output = (A, B)> + Sized,
308 A: Hash + Eq + Clone + $send + 'static,
309 B: Hash + 'static,
310 {
311 $assert_sink(MostDistinct::new(
312 self,
313 n,
314 probability,
315 tolerance,
316 error_rate,
317 ))
318 }
319
320 #[inline]
321 fn sample_unstable(self, samples: usize) -> SampleUnstable<Self>
322 where
323 Self::Output: $send + 'static,
324 Self: Sized,
325 {
326 $assert_sink(SampleUnstable::new(self, samples))
327 }
328
329 #[inline]
330 fn all<F>(self, f: F) -> All<Self, F>
331 where
332 F: $fns::FnMut(Self::Output) -> bool + Clone + $send + 'static,
333 Self: Sized,
334 {
335 $assert_sink(All::new(self, f))
336 }
337
338 #[inline]
339 fn any<F>(self, f: F) -> Any<Self, F>
340 where
341 F: $fns::FnMut(Self::Output) -> bool + Clone + $send + 'static,
342 Self: Sized,
343 {
344 $assert_sink(Any::new(self, f))
345 }
346
347 #[inline]
348 fn collect<B>(self) -> Collect<Self, B>
349 where
350 B: $from_sink<Self::Output>,
351 Self: Sized,
352 {
353 $assert_sink(Collect::new(self))
354 }
355 }
356
357 #[inline(always)]
358 pub(crate) fn $assert_pipe<T, I: $pipe<Input, Output = T>, Input>(i: I) -> I {
359 i
360 }
361 };
362}
363
364pipe!(ParallelPipe ParallelSink FromParallelStream Send ops assert_parallel_pipe assert_parallel_sink);
365pipe!(DistributedPipe DistributedSink FromDistributedStream ProcessSend traits assert_distributed_pipe assert_distributed_sink cfg_attr(not(nightly), serde_closure::desugar));