// asparit/iter/partition.rs

1use crate::{
2    core::Driver, Consumer, Executor, Folder, ParallelExtend, ParallelIterator, Reducer, Setup,
3    WithSetup,
4};
5
/* Partition */
7
/// Driver that partitions the items of a parallel iterator with a boolean
/// predicate.
///
/// Constructing the value only stores its two parts; the actual work happens
/// when it is executed through its `Driver` implementation.
#[must_use = "a partition driver is lazy and does nothing until it is executed"]
#[derive(Debug)]
pub struct Partition<X, O> {
    /// Source iterator whose items get partitioned.
    iterator: X,
    /// Predicate that decides on which side an item ends up.
    operation: O,
}

impl<X, O> Partition<X, O> {
    /// Creates a partition driver from the source `iterator` and the
    /// predicate `operation`.
    pub fn new(iterator: X, operation: O) -> Self {
        Self { iterator, operation }
    }
}
21
22impl<'a, X, O, P, T> Driver<'a, P, T> for Partition<X, O>
23where
24    X: ParallelIterator<'a>,
25    O: Fn(&X::Item) -> bool + Clone,
26    P: Default + Send + 'a,
27    PartitionExtend<P, MapTwoFn<O>>: ParallelExtend<'a, X::Item, T>,
28    <<PartitionExtend<P, MapTwoFn<O>> as ParallelExtend<'a, X::Item, T>>::Consumer as Consumer<
29        X::Item,
30    >>::Reducer: Reducer<T> + Send,
31    T: Send + 'a,
32{
33    fn exec_with<E>(self, executor: E) -> E::Result
34    where
35        E: Executor<'a, P, T>,
36    {
37        let operation = self.operation;
38        let executor = executor.into_inner();
39        let consumer = PartitionExtend {
40            base: P::default(),
41            operation: Some(MapTwoFn(operation)),
42        }
43        .into_consumer();
44
45        let inner = self.iterator.drive(executor, consumer);
46
47        E::map(inner, |inner| {
48            let ret = PartitionExtend::map_result(inner);
49
50            ret.base
51        })
52    }
53}
54
/* PartitionMap */
56
/// Driver that partitions the items of a parallel iterator with a mapping
/// operation instead of a plain predicate.
///
/// Constructing the value only stores its two parts; the actual work happens
/// when it is executed through its `Driver` implementation.
#[must_use = "a partition driver is lazy and does nothing until it is executed"]
#[derive(Debug)]
pub struct PartitionMap<X, O> {
    /// Source iterator whose items get partitioned.
    iterator: X,
    /// Operation that maps an item to the per-target values.
    operation: O,
}

impl<X, O> PartitionMap<X, O> {
    /// Creates a partition-map driver from the source `iterator` and the
    /// mapping `operation`.
    pub fn new(iterator: X, operation: O) -> Self {
        Self { iterator, operation }
    }
}
70
71impl<'a, X, O, P, T, R> Driver<'a, P, T> for PartitionMap<X, O>
72where
73    X: ParallelIterator<'a>,
74    O: Fn(X::Item) -> R,
75    P: Default + Send + 'a,
76    PartitionExtend<P, MapAnyFn<O>>: ParallelExtend<'a, X::Item, T>,
77    <<PartitionExtend<P, MapAnyFn<O>> as ParallelExtend<'a, X::Item, T>>::Consumer as Consumer<
78        X::Item,
79    >>::Reducer: Reducer<T> + Send,
80    T: Send + 'a,
81{
82    fn exec_with<E>(self, executor: E) -> E::Result
83    where
84        E: Executor<'a, P, T>,
85    {
86        let operation = self.operation;
87        let executor = executor.into_inner();
88        let consumer = PartitionExtend {
89            base: P::default(),
90            operation: Some(MapAnyFn(operation)),
91        }
92        .into_consumer();
93
94        let inner = self.iterator.drive(executor, consumer);
95
96        E::map(inner, |inner| {
97            let ret = PartitionExtend::map_result(inner);
98
99            ret.base
100        })
101    }
102}
103
/* Misc */
105
/// Pairs the partition targets with the function that distributes items
/// among them.
///
/// `operation` is `Some` while the value is waiting to be converted into a
/// consumer and `None` when the value was rebuilt from a finished result.
#[derive(Debug)]
pub struct PartitionExtend<E, O> {
    /// Tuple of targets that receive the partitioned items.
    base: E,
    /// Function that splits an item into one optional value per target.
    operation: Option<O>,
}
110
/// Consumer that fans items out to a tuple of inner consumers.
#[derive(Debug)]
pub struct PartitionConsumer<C, O> {
    /// Tuple of inner consumers, one per partition target.
    base: C,
    /// Function that splits an item into one optional value per consumer.
    operation: O,
}
115
/// Folder that feeds items into a tuple of inner folders.
#[derive(Debug)]
pub struct PartitionFolder<F, O> {
    /// Tuple of inner folders, one per partition target.
    base: F,
    /// Function that splits an item into one optional value per folder.
    operation: O,
}
120
/// Reducer that combines partition results element by element.
#[derive(Debug)]
pub struct PartitionReducer<R> {
    /// Tuple of inner reducers, one per partition target.
    base: R,
}
124
/// Intermediate result of a partition run.
#[derive(Debug)]
pub struct PartitionResult<T> {
    /// Tuple of inner results, one per partition target.
    base: T,
}
128
/// Function object that decides where an item of type `I` goes.
///
/// The tuple implementations in this file expect `Output` to be a tuple of
/// `Option`s with one slot per partition target.
pub trait PartitionFn<I> {
    /// Value produced for a single item.
    type Output;

    /// Applies the function to `item`.
    fn call(&self, item: I) -> Self::Output;
}
134
// Implements the partition pipeline (`ParallelExtend`, `WithSetup`,
// `Consumer`, `Folder` and `Reducer`) for one tuple arity.
//
// The six identifier lists must have the same length. In order they name the
// extendable targets (`$E`), the item types routed to them (`$I`), the
// intermediate results (`$T`), the consumers (`$C`), the folders (`$F`) and
// the reducers (`$R`). Inside the generated functions the same identifiers
// are reused as variable names for the tuple elements, which is why every
// impl carries `#[allow(non_snake_case)]`.
macro_rules! parallel_extend_tuple {
    (($($E:ident),*), ($($I:ident),*), ($($T:ident),*), ($($C:ident),*), ($($F:ident),*), ($($R:ident),*)) => {

        // Generic parameter names double as variable names.
        #[allow(non_snake_case)]
        impl<'a, O, I, $($E,)+ $($I,)+ $($T,)+> ParallelExtend<'a, I, PartitionResult<($($T,)+)>> for PartitionExtend<($($E,)+), O>
        where
            O: PartitionFn<I, Output = ($(Option<$I>,)+)> + Clone + Send + 'a,
            I: Send + 'a,
            $($E: ParallelExtend<'a, $I, $T> + Send,)+
            $(<$E::Consumer as Consumer<$I>>::Reducer: Reducer<$T>,)+
            $($I: Send + 'a,)+
            $($T: Send,)+
        {
            type Consumer = PartitionConsumer<($($E::Consumer,)+), O>;

            // Converts every target into its consumer and attaches the
            // partition function. Panics when the value carries no operation,
            // which is the case for values produced by `map_result`.
            fn into_consumer(self) -> Self::Consumer {
                let ($($E,)+) = self.base;
                let operation = self.operation.expect(
                    "PartitionExtend rebuilt by `map_result` has no operation and cannot become a consumer",
                );

                PartitionConsumer {
                    base: ($($E.into_consumer(),)+),
                    operation,
                }
            }

            // Rebuilds the targets from the per-target results. The partition
            // function is not needed anymore at this point, so it is left out.
            fn map_result(inner: PartitionResult<($($T,)+)>) -> Self {
                let ($($T,)+) = inner.base;

                PartitionExtend {
                    base: ($($E::map_result($T),)+),
                    operation: None,
                }
            }
        }

        // Generic parameter names double as variable names.
        #[allow(non_snake_case)]
        impl<O, $($C,)+> WithSetup for PartitionConsumer<($($C,)+), O>
        where
            $($C: WithSetup,)+
        {
            // Merges the setups of all inner consumers into a single one.
            fn setup(&self) -> Setup {
                let ($($C,)+) = &self.base;

                let mut ret = Setup::default();
                $(ret = ret.merge($C.setup());)+

                ret
            }
        }

        // Generic parameter names double as variable names.
        #[allow(non_snake_case)]
        impl<O, I, $($I,)+ $($C,)+> Consumer<I> for PartitionConsumer<($($C,)+), O>
        where
            O: PartitionFn<I, Output = ($(Option<$I>,)+)> + Clone + Send,
            I: Send,
            $($I: Send,)+
            $($C: Consumer<$I>,)+
        {
            type Folder = PartitionFolder<($($C::Folder,)+), O>;
            type Reducer = PartitionReducer<($($C::Reducer,)+)>;
            type Result = PartitionResult<($($C::Result,)+)>;

            // Splits every inner consumer and regroups the parts into a left
            // consumer, a right consumer and a combined reducer.
            fn split(self) -> (Self, Self, Self::Reducer) {
                let operation = self.operation;
                let ($($C,)+) = self.base;
                // Each name now holds that consumer's `(left, right, reducer)`.
                let ($($C,)+) = ($($C.split(),)+);

                let left = PartitionConsumer {
                    base: ($($C.0,)+),
                    operation: operation.clone(),
                };
                let right = PartitionConsumer {
                    base: ($($C.1,)+),
                    operation,
                };
                let reducer = PartitionReducer {
                    base: ($($C.2,)+),
                };

                (left, right, reducer)
            }

            // Same as `split`, but forwards `index` to every inner consumer.
            fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
                let operation = self.operation;
                let ($($C,)+) = self.base;
                // Each name now holds that consumer's `(left, right, reducer)`.
                let ($($C,)+) = ($($C.split_at(index),)+);

                let left = PartitionConsumer {
                    base: ($($C.0,)+),
                    operation: operation.clone(),
                };
                let right = PartitionConsumer {
                    base: ($($C.1,)+),
                    operation,
                };
                let reducer = PartitionReducer {
                    base: ($($C.2,)+),
                };

                (left, right, reducer)
            }

            // Converts every inner consumer into its folder.
            fn into_folder(self) -> Self::Folder {
                let ($($C,)+) = self.base;

                PartitionFolder {
                    base: ($($C.into_folder(),)+),
                    operation: self.operation,
                }
            }

            // Full only when every inner consumer is full.
            fn is_full(&self) -> bool {
                let ($($C,)+) = &self.base;

                true $(&& $C.is_full())+
            }
        }

        // Generic parameter names double as variable names.
        #[allow(non_snake_case)]
        impl<O, I, $($I,)+ $($F,)+> Folder<I> for PartitionFolder<($($F,)+), O>
        where
            O: PartitionFn<I, Output = ($(Option<$I>,)+)>,
            $($F: Folder<$I>,)+
        {
            type Result = PartitionResult<($($F::Result,)+)>;

            // Runs the partition function on `item` and hands every `Some`
            // slot to the folder at the same position.
            fn consume(self, item: I) -> Self {
                let operation = self.operation;
                let ($($I,)+) = operation.call(item);
                let ($(mut $F,)+) = self.base;

                $(
                    if let Some(item) = $I {
                        $F = $F.consume(item);
                    }
                )+

                PartitionFolder {
                    base: ($($F,)+),
                    operation,
                }
            }

            // Completes every inner folder and bundles the results.
            fn complete(self) -> Self::Result {
                let ($($F,)+) = self.base;

                PartitionResult {
                    base: ($($F.complete(),)+),
                }
            }

            // Full only when every inner folder is full.
            fn is_full(&self) -> bool {
                let ($($F,)+) = &self.base;

                true $(&& $F.is_full())+
            }
        }

        // Generic parameter names double as variable names.
        #[allow(non_snake_case)]
        impl<$($R,)+ $($T,)+> Reducer<PartitionResult<($($T,)+)>> for PartitionReducer<($($R,)+)>
        where
            $($R: Reducer<$T>,)+
        {
            // Reduces the left and right results position by position with
            // the matching inner reducer.
            fn reduce(self, left: PartitionResult<($($T,)+)>, right: PartitionResult<($($T,)+)>) -> PartitionResult<($($T,)+)> {
                let ($($R,)+) = self.base;
                let ($($T,)+) = left.base;
                // `$E` is not a type in this impl; it only provides distinct
                // variable names for the right-hand values.
                let ($($E,)+) = right.base;

                PartitionResult {
                    base: ($($R.reduce($T, $E),)+),
                }
            }
        }
    };
}
310
/* MapTwoFn */
312
/// Adapter that wraps a boolean predicate so it can be used as a two-way
/// partition function.
///
/// `Clone` is derived; the derive yields the same `O: Clone` bound and the
/// same field-wise clone as a hand-written implementation.
#[derive(Clone, Debug)]
pub struct MapTwoFn<O>(O);
323
324impl<O, I> PartitionFn<I> for MapTwoFn<O>
325where
326    O: Fn(&I) -> bool,
327{
328    type Output = (Option<I>, Option<I>);
329
330    fn call(&self, item: I) -> Self::Output {
331        if (self.0)(&item) {
332            (Some(item), None)
333        } else {
334            (None, Some(item))
335        }
336    }
337}
338
/* MapAnyFn */
340
/// Adapter that wraps an arbitrary mapping operation so it can be used as a
/// partition function.
///
/// `Clone` is derived; the derive yields the same `O: Clone` bound and the
/// same field-wise clone as a hand-written implementation.
#[derive(Clone, Debug)]
pub struct MapAnyFn<O>(O);
351
352impl<O, I, T> PartitionFn<I> for MapAnyFn<O>
353where
354    O: Fn(I) -> T,
355{
356    type Output = T;
357
358    fn call(&self, item: I) -> Self::Output {
359        (self.0)(item)
360    }
361}
362
363parallel_extend_tuple!((E1, E2), (I1, I2), (T1, T2), (C1, C2), (F1, F2), (R1, R2));
364parallel_extend_tuple!(
365    (E1, E2, E3),
366    (I1, I2, I3),
367    (T1, T2, T3),
368    (C1, C2, C3),
369    (F1, F2, F3),
370    (R1, R2, R3)
371);
372parallel_extend_tuple!(
373    (E1, E2, E3, E4),
374    (I1, I2, I3, I4),
375    (T1, T2, T3, T4),
376    (C1, C2, C3, C4),
377    (F1, F2, F3, F4),
378    (R1, R2, R3, R4)
379);
380parallel_extend_tuple!(
381    (E1, E2, E3, E4, E5),
382    (I1, I2, I3, I4, I5),
383    (T1, T2, T3, T4, T5),
384    (C1, C2, C3, C4, C5),
385    (F1, F2, F3, F4, F5),
386    (R1, R2, R3, R4, R5)
387);