par_iter/iter/unzip.rs

1use super::{plumbing::*, *};
2
3/// This trait abstracts the different ways we can "unzip" one parallel
4/// iterator into two distinct consumers, which we can handle almost
5/// identically apart from how to process the individual items.
6trait UnzipOp<T>: Sync + Send {
7    /// The type of item expected by the left consumer.
8    type Left: Send;
9
10    /// The type of item expected by the right consumer.
11    type Right: Send;
12
13    /// Consumes one item and feeds it to one or both of the underlying folders.
14    fn consume<FA, FB>(&self, item: T, left: FA, right: FB) -> (FA, FB)
15    where
16        FA: Folder<Self::Left>,
17        FB: Folder<Self::Right>;
18
19    /// Reports whether this op may support indexed consumers.
20    /// - e.g. true for `unzip` where the item count passed through directly.
21    /// - e.g. false for `partition` where the sorting is not yet known.
22    fn indexable() -> bool {
23        false
24    }
25}
26
27/// Runs an unzip-like operation into default `ParallelExtend` collections.
28fn execute<I, OP, FromA, FromB>(pi: I, op: OP) -> (FromA, FromB)
29where
30    I: ParallelIterator,
31    OP: UnzipOp<I::Item>,
32    FromA: Default + Send + ParallelExtend<OP::Left>,
33    FromB: Default + Send + ParallelExtend<OP::Right>,
34{
35    let mut a = FromA::default();
36    let mut b = FromB::default();
37    execute_into(&mut a, &mut b, pi, op);
38    (a, b)
39}
40
41/// Runs an unzip-like operation into `ParallelExtend` collections.
42fn execute_into<I, OP, FromA, FromB>(a: &mut FromA, b: &mut FromB, pi: I, op: OP)
43where
44    I: ParallelIterator,
45    OP: UnzipOp<I::Item>,
46    FromA: Send + ParallelExtend<OP::Left>,
47    FromB: Send + ParallelExtend<OP::Right>,
48{
49    // We have no idea what the consumers will look like for these
50    // collections' `par_extend`, but we can intercept them in our own
51    // `drive_unindexed`.  Start with the left side, type `A`:
52    let iter = UnzipA { base: pi, op, b };
53    a.par_extend(iter);
54}
55
56/// Unzips the items of a parallel iterator into a pair of arbitrary
57/// `ParallelExtend` containers.
58///
59/// This is called by `ParallelIterator::unzip`.
60pub(super) fn unzip<I, A, B, FromA, FromB>(pi: I) -> (FromA, FromB)
61where
62    I: ParallelIterator<Item = (A, B)>,
63    FromA: Default + Send + ParallelExtend<A>,
64    FromB: Default + Send + ParallelExtend<B>,
65    A: Send,
66    B: Send,
67{
68    execute(pi, Unzip)
69}
70
71/// Unzips an `IndexedParallelIterator` into two arbitrary `Consumer`s.
72///
73/// This is called by `super::collect::unzip_into_vecs`.
74pub(super) fn unzip_indexed<I, A, B, CA, CB>(pi: I, left: CA, right: CB) -> (CA::Result, CB::Result)
75where
76    I: IndexedParallelIterator<Item = (A, B)>,
77    CA: Consumer<A>,
78    CB: Consumer<B>,
79    A: Send,
80    B: Send,
81{
82    let consumer = UnzipConsumer {
83        op: &Unzip,
84        left,
85        right,
86    };
87    pi.drive(consumer)
88}
89
90/// An `UnzipOp` that splits a tuple directly into the two consumers.
91struct Unzip;
92
93impl<A: Send, B: Send> UnzipOp<(A, B)> for Unzip {
94    type Left = A;
95    type Right = B;
96
97    fn consume<FA, FB>(&self, item: (A, B), left: FA, right: FB) -> (FA, FB)
98    where
99        FA: Folder<A>,
100        FB: Folder<B>,
101    {
102        (left.consume(item.0), right.consume(item.1))
103    }
104
105    fn indexable() -> bool {
106        true
107    }
108}
109
110/// Partitions the items of a parallel iterator into a pair of arbitrary
111/// `ParallelExtend` containers.
112///
113/// This is called by `ParallelIterator::partition`.
114pub(super) fn partition<I, A, B, P>(pi: I, predicate: P) -> (A, B)
115where
116    I: ParallelIterator,
117    A: Default + Send + ParallelExtend<I::Item>,
118    B: Default + Send + ParallelExtend<I::Item>,
119    P: Fn(&I::Item) -> bool + Sync + Send,
120{
121    execute(pi, Partition { predicate })
122}
123
124/// An `UnzipOp` that routes items depending on a predicate function.
125struct Partition<P> {
126    predicate: P,
127}
128
129impl<P, T> UnzipOp<T> for Partition<P>
130where
131    P: Fn(&T) -> bool + Sync + Send,
132    T: Send,
133{
134    type Left = T;
135    type Right = T;
136
137    fn consume<FA, FB>(&self, item: T, left: FA, right: FB) -> (FA, FB)
138    where
139        FA: Folder<T>,
140        FB: Folder<T>,
141    {
142        if (self.predicate)(&item) {
143            (left.consume(item), right)
144        } else {
145            (left, right.consume(item))
146        }
147    }
148}
149
150/// Partitions and maps the items of a parallel iterator into a pair of
151/// arbitrary `ParallelExtend` containers.
152///
153/// This called by `ParallelIterator::partition_map`.
154pub(super) fn partition_map<I, A, B, P, L, R>(pi: I, predicate: P) -> (A, B)
155where
156    I: ParallelIterator,
157    A: Default + Send + ParallelExtend<L>,
158    B: Default + Send + ParallelExtend<R>,
159    P: Fn(I::Item) -> Either<L, R> + Sync + Send,
160    L: Send,
161    R: Send,
162{
163    execute(pi, PartitionMap { predicate })
164}
165
166/// An `UnzipOp` that routes items depending on how they are mapped `Either`.
167struct PartitionMap<P> {
168    predicate: P,
169}
170
171impl<P, L, R, T> UnzipOp<T> for PartitionMap<P>
172where
173    P: Fn(T) -> Either<L, R> + Sync + Send,
174    L: Send,
175    R: Send,
176{
177    type Left = L;
178    type Right = R;
179
180    fn consume<FA, FB>(&self, item: T, left: FA, right: FB) -> (FA, FB)
181    where
182        FA: Folder<L>,
183        FB: Folder<R>,
184    {
185        match (self.predicate)(item) {
186            Either::Left(item) => (left.consume(item), right),
187            Either::Right(item) => (left, right.consume(item)),
188        }
189    }
190}
191
192/// A fake iterator to intercept the `Consumer` for type `A`.
193struct UnzipA<'b, I, OP, FromB> {
194    base: I,
195    op: OP,
196    b: &'b mut FromB,
197}
198
199impl<'b, I, OP, FromB> ParallelIterator for UnzipA<'b, I, OP, FromB>
200where
201    I: ParallelIterator,
202    OP: UnzipOp<I::Item>,
203    FromB: Send + ParallelExtend<OP::Right>,
204{
205    type Item = OP::Left;
206
207    fn drive_unindexed<C>(self, consumer: C) -> C::Result
208    where
209        C: UnindexedConsumer<Self::Item>,
210    {
211        let mut result = None;
212        {
213            // Now it's time to find the consumer for type `B`
214            let iter = UnzipB {
215                base: self.base,
216                op: self.op,
217                left_consumer: consumer,
218                left_result: &mut result,
219            };
220            self.b.par_extend(iter);
221        }
222        // NB: If for some reason `b.par_extend` doesn't actually drive the
223        // iterator, then we won't have a result for the left side to return
224        // at all.  We can't fake an arbitrary consumer's result, so panic.
225        result.expect("unzip consumers didn't execute!")
226    }
227
228    fn opt_len(&self) -> Option<usize> {
229        if OP::indexable() {
230            self.base.opt_len()
231        } else {
232            None
233        }
234    }
235}
236
237/// A fake iterator to intercept the `Consumer` for type `B`.
238struct UnzipB<'r, I, OP, CA>
239where
240    I: ParallelIterator,
241    OP: UnzipOp<I::Item>,
242    CA: UnindexedConsumer<OP::Left>,
243    CA::Result: 'r,
244{
245    base: I,
246    op: OP,
247    left_consumer: CA,
248    left_result: &'r mut Option<CA::Result>,
249}
250
251impl<'r, I, OP, CA> ParallelIterator for UnzipB<'r, I, OP, CA>
252where
253    I: ParallelIterator,
254    OP: UnzipOp<I::Item>,
255    CA: UnindexedConsumer<OP::Left>,
256{
257    type Item = OP::Right;
258
259    fn drive_unindexed<C>(self, consumer: C) -> C::Result
260    where
261        C: UnindexedConsumer<Self::Item>,
262    {
263        // Now that we have two consumers, we can unzip the real iterator.
264        let consumer = UnzipConsumer {
265            op: &self.op,
266            left: self.left_consumer,
267            right: consumer,
268        };
269
270        let result = self.base.drive_unindexed(consumer);
271        *self.left_result = Some(result.0);
272        result.1
273    }
274
275    fn opt_len(&self) -> Option<usize> {
276        if OP::indexable() {
277            self.base.opt_len()
278        } else {
279            None
280        }
281    }
282}
283
284/// `Consumer` that unzips into two other `Consumer`s
285struct UnzipConsumer<'a, OP, CA, CB> {
286    op: &'a OP,
287    left: CA,
288    right: CB,
289}
290
291impl<'a, T, OP, CA, CB> Consumer<T> for UnzipConsumer<'a, OP, CA, CB>
292where
293    OP: UnzipOp<T>,
294    CA: Consumer<OP::Left>,
295    CB: Consumer<OP::Right>,
296{
297    type Folder = UnzipFolder<'a, OP, CA::Folder, CB::Folder>;
298    type Reducer = UnzipReducer<CA::Reducer, CB::Reducer>;
299    type Result = (CA::Result, CB::Result);
300
301    fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
302        let (left1, left2, left_reducer) = self.left.split_at(index);
303        let (right1, right2, right_reducer) = self.right.split_at(index);
304
305        (
306            UnzipConsumer {
307                op: self.op,
308                left: left1,
309                right: right1,
310            },
311            UnzipConsumer {
312                op: self.op,
313                left: left2,
314                right: right2,
315            },
316            UnzipReducer {
317                left: left_reducer,
318                right: right_reducer,
319            },
320        )
321    }
322
323    fn into_folder(self) -> Self::Folder {
324        UnzipFolder {
325            op: self.op,
326            left: self.left.into_folder(),
327            right: self.right.into_folder(),
328        }
329    }
330
331    fn full(&self) -> bool {
332        // don't stop until everyone is full
333        self.left.full() && self.right.full()
334    }
335}
336
337impl<'a, T, OP, CA, CB> UnindexedConsumer<T> for UnzipConsumer<'a, OP, CA, CB>
338where
339    OP: UnzipOp<T>,
340    CA: UnindexedConsumer<OP::Left>,
341    CB: UnindexedConsumer<OP::Right>,
342{
343    fn split_off_left(&self) -> Self {
344        UnzipConsumer {
345            op: self.op,
346            left: self.left.split_off_left(),
347            right: self.right.split_off_left(),
348        }
349    }
350
351    fn to_reducer(&self) -> Self::Reducer {
352        UnzipReducer {
353            left: self.left.to_reducer(),
354            right: self.right.to_reducer(),
355        }
356    }
357}
358
359/// `Folder` that unzips into two other `Folder`s
360struct UnzipFolder<'a, OP, FA, FB> {
361    op: &'a OP,
362    left: FA,
363    right: FB,
364}
365
366impl<'a, T, OP, FA, FB> Folder<T> for UnzipFolder<'a, OP, FA, FB>
367where
368    OP: UnzipOp<T>,
369    FA: Folder<OP::Left>,
370    FB: Folder<OP::Right>,
371{
372    type Result = (FA::Result, FB::Result);
373
374    fn consume(self, item: T) -> Self {
375        let (left, right) = self.op.consume(item, self.left, self.right);
376        UnzipFolder {
377            op: self.op,
378            left,
379            right,
380        }
381    }
382
383    fn complete(self) -> Self::Result {
384        (self.left.complete(), self.right.complete())
385    }
386
387    fn full(&self) -> bool {
388        // don't stop until everyone is full
389        self.left.full() && self.right.full()
390    }
391}
392
393/// `Reducer` that unzips into two other `Reducer`s
394struct UnzipReducer<RA, RB> {
395    left: RA,
396    right: RB,
397}
398
399impl<A, B, RA, RB> Reducer<(A, B)> for UnzipReducer<RA, RB>
400where
401    RA: Reducer<A>,
402    RB: Reducer<B>,
403{
404    fn reduce(self, left: (A, B), right: (A, B)) -> (A, B) {
405        (
406            self.left.reduce(left.0, right.0),
407            self.right.reduce(left.1, right.1),
408        )
409    }
410}
411
412impl<A, B, FromA, FromB> ParallelExtend<(A, B)> for (FromA, FromB)
413where
414    A: Send,
415    B: Send,
416    FromA: Send + ParallelExtend<A>,
417    FromB: Send + ParallelExtend<B>,
418{
419    fn par_extend<I>(&mut self, pi: I)
420    where
421        I: IntoParallelIterator<Item = (A, B)>,
422    {
423        execute_into(&mut self.0, &mut self.1, pi.into_par_iter(), Unzip);
424    }
425}
426
427impl<L, R, A, B> ParallelExtend<Either<L, R>> for (A, B)
428where
429    L: Send,
430    R: Send,
431    A: Send + ParallelExtend<L>,
432    B: Send + ParallelExtend<R>,
433{
434    fn par_extend<I>(&mut self, pi: I)
435    where
436        I: IntoParallelIterator<Item = Either<L, R>>,
437    {
438        execute_into(&mut self.0, &mut self.1, pi.into_par_iter(), UnEither);
439    }
440}
441
442/// An `UnzipOp` that routes items depending on their `Either` variant.
443struct UnEither;
444
445impl<L, R> UnzipOp<Either<L, R>> for UnEither
446where
447    L: Send,
448    R: Send,
449{
450    type Left = L;
451    type Right = R;
452
453    fn consume<FL, FR>(&self, item: Either<L, R>, left: FL, right: FR) -> (FL, FR)
454    where
455        FL: Folder<L>,
456        FR: Folder<R>,
457    {
458        match item {
459            Either::Left(item) => (left.consume(item), right),
460            Either::Right(item) => (left, right.consume(item)),
461        }
462    }
463}
464
465impl<A, B, FromA, FromB> FromParallelIterator<(A, B)> for (FromA, FromB)
466where
467    A: Send,
468    B: Send,
469    FromA: Send + FromParallelIterator<A>,
470    FromB: Send + FromParallelIterator<B>,
471{
472    fn from_par_iter<I>(pi: I) -> Self
473    where
474        I: IntoParallelIterator<Item = (A, B)>,
475    {
476        let (a, b): (Collector<FromA>, Collector<FromB>) = pi.into_par_iter().unzip();
477        (a.result.unwrap(), b.result.unwrap())
478    }
479}
480
481impl<L, R, A, B> FromParallelIterator<Either<L, R>> for (A, B)
482where
483    L: Send,
484    R: Send,
485    A: Send + FromParallelIterator<L>,
486    B: Send + FromParallelIterator<R>,
487{
488    fn from_par_iter<I>(pi: I) -> Self
489    where
490        I: IntoParallelIterator<Item = Either<L, R>>,
491    {
492        fn identity<T>(x: T) -> T {
493            x
494        }
495
496        let (a, b): (Collector<A>, Collector<B>) = pi.into_par_iter().partition_map(identity);
497        (a.result.unwrap(), b.result.unwrap())
498    }
499}
500
501/// Shim to implement a one-time `ParallelExtend` using `FromParallelIterator`.
502struct Collector<FromT> {
503    result: Option<FromT>,
504}
505
506impl<FromT> Default for Collector<FromT> {
507    fn default() -> Self {
508        Collector { result: None }
509    }
510}
511
512impl<T, FromT> ParallelExtend<T> for Collector<FromT>
513where
514    T: Send,
515    FromT: Send + FromParallelIterator<T>,
516{
517    fn par_extend<I>(&mut self, pi: I)
518    where
519        I: IntoParallelIterator<Item = T>,
520    {
521        debug_assert!(self.result.is_none());
522        self.result = Some(pi.into_par_iter().collect());
523    }
524}