1use crate::{
2 core::Driver, Consumer, Executor, Folder, ParallelExtend, ParallelIterator, Reducer, Setup,
3 WithSetup,
4};
5
6pub struct Partition<X, O> {
9 iterator: X,
10 operation: O,
11}
12
13impl<X, O> Partition<X, O> {
14 pub fn new(iterator: X, operation: O) -> Self {
15 Self {
16 iterator,
17 operation,
18 }
19 }
20}
21
22impl<'a, X, O, P, T> Driver<'a, P, T> for Partition<X, O>
23where
24 X: ParallelIterator<'a>,
25 O: Fn(&X::Item) -> bool + Clone,
26 P: Default + Send + 'a,
27 PartitionExtend<P, MapTwoFn<O>>: ParallelExtend<'a, X::Item, T>,
28 <<PartitionExtend<P, MapTwoFn<O>> as ParallelExtend<'a, X::Item, T>>::Consumer as Consumer<
29 X::Item,
30 >>::Reducer: Reducer<T> + Send,
31 T: Send + 'a,
32{
33 fn exec_with<E>(self, executor: E) -> E::Result
34 where
35 E: Executor<'a, P, T>,
36 {
37 let operation = self.operation;
38 let executor = executor.into_inner();
39 let consumer = PartitionExtend {
40 base: P::default(),
41 operation: Some(MapTwoFn(operation)),
42 }
43 .into_consumer();
44
45 let inner = self.iterator.drive(executor, consumer);
46
47 E::map(inner, |inner| {
48 let ret = PartitionExtend::map_result(inner);
49
50 ret.base
51 })
52 }
53}
54
55pub struct PartitionMap<X, O> {
58 iterator: X,
59 operation: O,
60}
61
62impl<X, O> PartitionMap<X, O> {
63 pub fn new(iterator: X, operation: O) -> Self {
64 Self {
65 iterator,
66 operation,
67 }
68 }
69}
70
71impl<'a, X, O, P, T, R> Driver<'a, P, T> for PartitionMap<X, O>
72where
73 X: ParallelIterator<'a>,
74 O: Fn(X::Item) -> R,
75 P: Default + Send + 'a,
76 PartitionExtend<P, MapAnyFn<O>>: ParallelExtend<'a, X::Item, T>,
77 <<PartitionExtend<P, MapAnyFn<O>> as ParallelExtend<'a, X::Item, T>>::Consumer as Consumer<
78 X::Item,
79 >>::Reducer: Reducer<T> + Send,
80 T: Send + 'a,
81{
82 fn exec_with<E>(self, executor: E) -> E::Result
83 where
84 E: Executor<'a, P, T>,
85 {
86 let operation = self.operation;
87 let executor = executor.into_inner();
88 let consumer = PartitionExtend {
89 base: P::default(),
90 operation: Some(MapAnyFn(operation)),
91 }
92 .into_consumer();
93
94 let inner = self.iterator.drive(executor, consumer);
95
96 E::map(inner, |inner| {
97 let ret = PartitionExtend::map_result(inner);
98
99 ret.base
100 })
101 }
102}
103
104pub struct PartitionExtend<E, O> {
107 base: E,
108 operation: Option<O>,
109}
110
111pub struct PartitionConsumer<C, O> {
112 base: C,
113 operation: O,
114}
115
116pub struct PartitionFolder<F, O> {
117 base: F,
118 operation: O,
119}
120
121pub struct PartitionReducer<R> {
122 base: R,
123}
124
125pub struct PartitionResult<T> {
126 base: T,
127}
128
129pub trait PartitionFn<I> {
130 type Output;
131
132 fn call(&self, item: I) -> Self::Output;
133}
134
135macro_rules! parallel_extend_tuple {
136 (($($E:ident),*), ($($I:ident),*), ($($T:ident),*), ($($C:ident),*), ($($F:ident),*), ($($R:ident),*)) => {
137
138 #[allow(non_snake_case)]
139 impl<'a, O, I, $($E,)+ $($I,)+ $($T,)+> ParallelExtend<'a, I, PartitionResult<($($T,)+)>> for PartitionExtend<($($E,)+), O>
140 where
141 O: PartitionFn<I, Output = ($(Option<$I>,)+)> + Clone + Send + 'a,
142 I: Send + 'a,
143 $($E: ParallelExtend<'a, $I, $T> + Send,)+
144 $(<$E::Consumer as Consumer<$I>>::Reducer: Reducer<$T>,)+
145 $($I: Send + 'a,)+
146 $($T: Send,)+
147 {
148 type Consumer = PartitionConsumer<($($E::Consumer,)+), O>;
149
150 fn into_consumer(self) -> Self::Consumer {
151 let ($($E,)+) = self.base;
152 let operation = self.operation;
153
154 PartitionConsumer {
155 base: ($($E.into_consumer(),)+),
156 operation: operation.unwrap(),
157 }
158 }
159
160 fn map_result(inner: PartitionResult<($($T,)+)>) -> Self {
161 let ($($T,)+) = inner.base;
162
163 PartitionExtend {
164 base: ($($E::map_result($T),)+),
165 operation: None,
166 }
167 }
168 }
169
170 #[allow(non_snake_case)]
171 impl<O, $($C,)+> WithSetup for PartitionConsumer<($($C,)+), O>
172 where
173 $($C: WithSetup,)+
174 {
175 fn setup(&self) -> Setup {
176 let ($($C,)+) = &self.base;
177
178 let mut ret = Setup::default();
179 $(ret = ret.merge($C.setup());)+
180
181 ret
182 }
183 }
184
185 #[allow(non_snake_case)]
186 impl<O, I, $($I,)+ $($C,)+> Consumer<I> for PartitionConsumer<($($C,)+), O>
187 where
188 O: PartitionFn<I, Output = ($(Option<$I>,)+)> + Clone + Send,
189 I: Send,
190 $($I: Send,)+
191 $($C: Consumer<$I>,)+
192 {
193 type Folder = PartitionFolder<($($C::Folder,)+), O>;
194 type Reducer = PartitionReducer<($($C::Reducer,)+)>;
195 type Result = PartitionResult<($($C::Result,)+)>;
196
197 fn split(self) -> (Self, Self, Self::Reducer) {
198 let operation = self.operation;
199 let ($($C,)+) = self.base;
200 let ($($C,)+) = ($($C.split(),)+);
201
202 let left = PartitionConsumer {
203 base: ($($C.0,)+),
204 operation: operation.clone(),
205 };
206 let right = PartitionConsumer {
207 base: ($($C.1,)+),
208 operation,
209 };
210 let reducer = PartitionReducer {
211 base: ($($C.2,)+),
212 };
213
214 (left, right, reducer)
215 }
216
217 fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
218 let operation = self.operation;
219 let ($($C,)+) = self.base;
220 let ($($C,)+) = ($($C.split_at(index),)+);
221
222 let left = PartitionConsumer {
223 base: ($($C.0,)+),
224 operation: operation.clone(),
225 };
226 let right = PartitionConsumer {
227 base: ($($C.1,)+),
228 operation,
229 };
230 let reducer = PartitionReducer {
231 base: ($($C.2,)+),
232 };
233
234 (left, right, reducer)
235 }
236
237 fn into_folder(self) -> Self::Folder {
238 let ($($C,)+) = self.base;
239
240 PartitionFolder {
241 base: ($($C.into_folder(),)+),
242 operation: self.operation,
243 }
244 }
245
246 fn is_full(&self) -> bool {
247 let ($($C,)+) = &self.base;
248
249 true $(&& $C.is_full())+
250 }
251 }
252
253 #[allow(non_snake_case)]
254 impl<O, I, $($I,)+ $($F,)+> Folder<I> for PartitionFolder<($($F,)+), O>
255 where
256 O: PartitionFn<I, Output = ($(Option<$I>,)+)>,
257 $($F: Folder<$I>,)+
258 {
259 type Result = PartitionResult<($($F::Result,)+)>;
260
261 fn consume(self, item: I) -> Self {
262 let operation = self.operation;
263 let ($($I,)+) = operation.call(item);
264 let ($(mut $F,)+) = self.base;
265
266 $(
267 if let Some(item) = $I {
268 $F = $F.consume(item);
269 }
270 )+
271
272 PartitionFolder {
273 base: ($($F,)+),
274 operation,
275 }
276 }
277
278 fn complete(self) -> Self::Result {
279 let ($($F,)+) = self.base;
280
281 PartitionResult {
282 base: ($($F.complete(),)+),
283 }
284 }
285
286 fn is_full(&self) -> bool {
287 let ($($F,)+) = &self.base;
288
289 $($F.is_full() &&)+ true
290 }
291 }
292
293 #[allow(non_snake_case)]
294 impl<$($R,)+ $($T,)+> Reducer<PartitionResult<($($T,)+)>> for PartitionReducer<($($R,)+)>
295 where
296 $($R: Reducer<$T>,)+
297 {
298 fn reduce(self, left: PartitionResult<($($T,)+)>, right: PartitionResult<($($T,)+)>) -> PartitionResult<($($T,)+)> {
299 let ($($R,)+) = self.base;
300 let ($($T,)+) = left.base;
301 let ($($E,)+) = right.base;
302
303 PartitionResult {
304 base: ($($R.reduce($T, $E),)+),
305 }
306 }
307 }
308 };
309}
310
311pub struct MapTwoFn<O>(O);
314
315impl<O> Clone for MapTwoFn<O>
316where
317 O: Clone,
318{
319 fn clone(&self) -> Self {
320 Self(self.0.clone())
321 }
322}
323
324impl<O, I> PartitionFn<I> for MapTwoFn<O>
325where
326 O: Fn(&I) -> bool,
327{
328 type Output = (Option<I>, Option<I>);
329
330 fn call(&self, item: I) -> Self::Output {
331 if (self.0)(&item) {
332 (Some(item), None)
333 } else {
334 (None, Some(item))
335 }
336 }
337}
338
339pub struct MapAnyFn<O>(O);
342
343impl<O> Clone for MapAnyFn<O>
344where
345 O: Clone,
346{
347 fn clone(&self) -> Self {
348 Self(self.0.clone())
349 }
350}
351
352impl<O, I, T> PartitionFn<I> for MapAnyFn<O>
353where
354 O: Fn(I) -> T,
355{
356 type Output = T;
357
358 fn call(&self, item: I) -> Self::Output {
359 (self.0)(item)
360 }
361}
362
363parallel_extend_tuple!((E1, E2), (I1, I2), (T1, T2), (C1, C2), (F1, F2), (R1, R2));
364parallel_extend_tuple!(
365 (E1, E2, E3),
366 (I1, I2, I3),
367 (T1, T2, T3),
368 (C1, C2, C3),
369 (F1, F2, F3),
370 (R1, R2, R3)
371);
372parallel_extend_tuple!(
373 (E1, E2, E3, E4),
374 (I1, I2, I3, I4),
375 (T1, T2, T3, T4),
376 (C1, C2, C3, C4),
377 (F1, F2, F3, F4),
378 (R1, R2, R3, R4)
379);
380parallel_extend_tuple!(
381 (E1, E2, E3, E4, E5),
382 (I1, I2, I3, I4, I5),
383 (T1, T2, T3, T4, T5),
384 (C1, C2, C3, C4, C5),
385 (F1, F2, F3, F4, F5),
386 (R1, R2, R3, R4, R5)
387);