1use super::{plumbing::*, *};
2
3trait UnzipOp<T>: Sync + Send {
7 type Left: Send;
9
10 type Right: Send;
12
13 fn consume<FA, FB>(&self, item: T, left: FA, right: FB) -> (FA, FB)
15 where
16 FA: Folder<Self::Left>,
17 FB: Folder<Self::Right>;
18
19 fn indexable() -> bool {
23 false
24 }
25}
26
27fn execute<I, OP, FromA, FromB>(pi: I, op: OP) -> (FromA, FromB)
29where
30 I: ParallelIterator,
31 OP: UnzipOp<I::Item>,
32 FromA: Default + Send + ParallelExtend<OP::Left>,
33 FromB: Default + Send + ParallelExtend<OP::Right>,
34{
35 let mut a = FromA::default();
36 let mut b = FromB::default();
37 execute_into(&mut a, &mut b, pi, op);
38 (a, b)
39}
40
41fn execute_into<I, OP, FromA, FromB>(a: &mut FromA, b: &mut FromB, pi: I, op: OP)
43where
44 I: ParallelIterator,
45 OP: UnzipOp<I::Item>,
46 FromA: Send + ParallelExtend<OP::Left>,
47 FromB: Send + ParallelExtend<OP::Right>,
48{
49 let iter = UnzipA { base: pi, op, b };
53 a.par_extend(iter);
54}
55
56pub(super) fn unzip<I, A, B, FromA, FromB>(pi: I) -> (FromA, FromB)
61where
62 I: ParallelIterator<Item = (A, B)>,
63 FromA: Default + Send + ParallelExtend<A>,
64 FromB: Default + Send + ParallelExtend<B>,
65 A: Send,
66 B: Send,
67{
68 execute(pi, Unzip)
69}
70
71pub(super) fn unzip_indexed<I, A, B, CA, CB>(pi: I, left: CA, right: CB) -> (CA::Result, CB::Result)
75where
76 I: IndexedParallelIterator<Item = (A, B)>,
77 CA: Consumer<A>,
78 CB: Consumer<B>,
79 A: Send,
80 B: Send,
81{
82 let consumer = UnzipConsumer {
83 op: &Unzip,
84 left,
85 right,
86 };
87 pi.drive(consumer)
88}
89
90struct Unzip;
92
93impl<A: Send, B: Send> UnzipOp<(A, B)> for Unzip {
94 type Left = A;
95 type Right = B;
96
97 fn consume<FA, FB>(&self, item: (A, B), left: FA, right: FB) -> (FA, FB)
98 where
99 FA: Folder<A>,
100 FB: Folder<B>,
101 {
102 (left.consume(item.0), right.consume(item.1))
103 }
104
105 fn indexable() -> bool {
106 true
107 }
108}
109
110pub(super) fn partition<I, A, B, P>(pi: I, predicate: P) -> (A, B)
115where
116 I: ParallelIterator,
117 A: Default + Send + ParallelExtend<I::Item>,
118 B: Default + Send + ParallelExtend<I::Item>,
119 P: Fn(&I::Item) -> bool + Sync + Send,
120{
121 execute(pi, Partition { predicate })
122}
123
124struct Partition<P> {
126 predicate: P,
127}
128
129impl<P, T> UnzipOp<T> for Partition<P>
130where
131 P: Fn(&T) -> bool + Sync + Send,
132 T: Send,
133{
134 type Left = T;
135 type Right = T;
136
137 fn consume<FA, FB>(&self, item: T, left: FA, right: FB) -> (FA, FB)
138 where
139 FA: Folder<T>,
140 FB: Folder<T>,
141 {
142 if (self.predicate)(&item) {
143 (left.consume(item), right)
144 } else {
145 (left, right.consume(item))
146 }
147 }
148}
149
150pub(super) fn partition_map<I, A, B, P, L, R>(pi: I, predicate: P) -> (A, B)
155where
156 I: ParallelIterator,
157 A: Default + Send + ParallelExtend<L>,
158 B: Default + Send + ParallelExtend<R>,
159 P: Fn(I::Item) -> Either<L, R> + Sync + Send,
160 L: Send,
161 R: Send,
162{
163 execute(pi, PartitionMap { predicate })
164}
165
166struct PartitionMap<P> {
168 predicate: P,
169}
170
171impl<P, L, R, T> UnzipOp<T> for PartitionMap<P>
172where
173 P: Fn(T) -> Either<L, R> + Sync + Send,
174 L: Send,
175 R: Send,
176{
177 type Left = L;
178 type Right = R;
179
180 fn consume<FA, FB>(&self, item: T, left: FA, right: FB) -> (FA, FB)
181 where
182 FA: Folder<L>,
183 FB: Folder<R>,
184 {
185 match (self.predicate)(item) {
186 Either::Left(item) => (left.consume(item), right),
187 Either::Right(item) => (left, right.consume(item)),
188 }
189 }
190}
191
192struct UnzipA<'b, I, OP, FromB> {
194 base: I,
195 op: OP,
196 b: &'b mut FromB,
197}
198
199impl<'b, I, OP, FromB> ParallelIterator for UnzipA<'b, I, OP, FromB>
200where
201 I: ParallelIterator,
202 OP: UnzipOp<I::Item>,
203 FromB: Send + ParallelExtend<OP::Right>,
204{
205 type Item = OP::Left;
206
207 fn drive_unindexed<C>(self, consumer: C) -> C::Result
208 where
209 C: UnindexedConsumer<Self::Item>,
210 {
211 let mut result = None;
212 {
213 let iter = UnzipB {
215 base: self.base,
216 op: self.op,
217 left_consumer: consumer,
218 left_result: &mut result,
219 };
220 self.b.par_extend(iter);
221 }
222 result.expect("unzip consumers didn't execute!")
226 }
227
228 fn opt_len(&self) -> Option<usize> {
229 if OP::indexable() {
230 self.base.opt_len()
231 } else {
232 None
233 }
234 }
235}
236
237struct UnzipB<'r, I, OP, CA>
239where
240 I: ParallelIterator,
241 OP: UnzipOp<I::Item>,
242 CA: UnindexedConsumer<OP::Left>,
243 CA::Result: 'r,
244{
245 base: I,
246 op: OP,
247 left_consumer: CA,
248 left_result: &'r mut Option<CA::Result>,
249}
250
251impl<'r, I, OP, CA> ParallelIterator for UnzipB<'r, I, OP, CA>
252where
253 I: ParallelIterator,
254 OP: UnzipOp<I::Item>,
255 CA: UnindexedConsumer<OP::Left>,
256{
257 type Item = OP::Right;
258
259 fn drive_unindexed<C>(self, consumer: C) -> C::Result
260 where
261 C: UnindexedConsumer<Self::Item>,
262 {
263 let consumer = UnzipConsumer {
265 op: &self.op,
266 left: self.left_consumer,
267 right: consumer,
268 };
269
270 let result = self.base.drive_unindexed(consumer);
271 *self.left_result = Some(result.0);
272 result.1
273 }
274
275 fn opt_len(&self) -> Option<usize> {
276 if OP::indexable() {
277 self.base.opt_len()
278 } else {
279 None
280 }
281 }
282}
283
284struct UnzipConsumer<'a, OP, CA, CB> {
286 op: &'a OP,
287 left: CA,
288 right: CB,
289}
290
291impl<'a, T, OP, CA, CB> Consumer<T> for UnzipConsumer<'a, OP, CA, CB>
292where
293 OP: UnzipOp<T>,
294 CA: Consumer<OP::Left>,
295 CB: Consumer<OP::Right>,
296{
297 type Folder = UnzipFolder<'a, OP, CA::Folder, CB::Folder>;
298 type Reducer = UnzipReducer<CA::Reducer, CB::Reducer>;
299 type Result = (CA::Result, CB::Result);
300
301 fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
302 let (left1, left2, left_reducer) = self.left.split_at(index);
303 let (right1, right2, right_reducer) = self.right.split_at(index);
304
305 (
306 UnzipConsumer {
307 op: self.op,
308 left: left1,
309 right: right1,
310 },
311 UnzipConsumer {
312 op: self.op,
313 left: left2,
314 right: right2,
315 },
316 UnzipReducer {
317 left: left_reducer,
318 right: right_reducer,
319 },
320 )
321 }
322
323 fn into_folder(self) -> Self::Folder {
324 UnzipFolder {
325 op: self.op,
326 left: self.left.into_folder(),
327 right: self.right.into_folder(),
328 }
329 }
330
331 fn full(&self) -> bool {
332 self.left.full() && self.right.full()
334 }
335}
336
337impl<'a, T, OP, CA, CB> UnindexedConsumer<T> for UnzipConsumer<'a, OP, CA, CB>
338where
339 OP: UnzipOp<T>,
340 CA: UnindexedConsumer<OP::Left>,
341 CB: UnindexedConsumer<OP::Right>,
342{
343 fn split_off_left(&self) -> Self {
344 UnzipConsumer {
345 op: self.op,
346 left: self.left.split_off_left(),
347 right: self.right.split_off_left(),
348 }
349 }
350
351 fn to_reducer(&self) -> Self::Reducer {
352 UnzipReducer {
353 left: self.left.to_reducer(),
354 right: self.right.to_reducer(),
355 }
356 }
357}
358
359struct UnzipFolder<'a, OP, FA, FB> {
361 op: &'a OP,
362 left: FA,
363 right: FB,
364}
365
366impl<'a, T, OP, FA, FB> Folder<T> for UnzipFolder<'a, OP, FA, FB>
367where
368 OP: UnzipOp<T>,
369 FA: Folder<OP::Left>,
370 FB: Folder<OP::Right>,
371{
372 type Result = (FA::Result, FB::Result);
373
374 fn consume(self, item: T) -> Self {
375 let (left, right) = self.op.consume(item, self.left, self.right);
376 UnzipFolder {
377 op: self.op,
378 left,
379 right,
380 }
381 }
382
383 fn complete(self) -> Self::Result {
384 (self.left.complete(), self.right.complete())
385 }
386
387 fn full(&self) -> bool {
388 self.left.full() && self.right.full()
390 }
391}
392
393struct UnzipReducer<RA, RB> {
395 left: RA,
396 right: RB,
397}
398
399impl<A, B, RA, RB> Reducer<(A, B)> for UnzipReducer<RA, RB>
400where
401 RA: Reducer<A>,
402 RB: Reducer<B>,
403{
404 fn reduce(self, left: (A, B), right: (A, B)) -> (A, B) {
405 (
406 self.left.reduce(left.0, right.0),
407 self.right.reduce(left.1, right.1),
408 )
409 }
410}
411
412impl<A, B, FromA, FromB> ParallelExtend<(A, B)> for (FromA, FromB)
413where
414 A: Send,
415 B: Send,
416 FromA: Send + ParallelExtend<A>,
417 FromB: Send + ParallelExtend<B>,
418{
419 fn par_extend<I>(&mut self, pi: I)
420 where
421 I: IntoParallelIterator<Item = (A, B)>,
422 {
423 execute_into(&mut self.0, &mut self.1, pi.into_par_iter(), Unzip);
424 }
425}
426
427impl<L, R, A, B> ParallelExtend<Either<L, R>> for (A, B)
428where
429 L: Send,
430 R: Send,
431 A: Send + ParallelExtend<L>,
432 B: Send + ParallelExtend<R>,
433{
434 fn par_extend<I>(&mut self, pi: I)
435 where
436 I: IntoParallelIterator<Item = Either<L, R>>,
437 {
438 execute_into(&mut self.0, &mut self.1, pi.into_par_iter(), UnEither);
439 }
440}
441
442struct UnEither;
444
445impl<L, R> UnzipOp<Either<L, R>> for UnEither
446where
447 L: Send,
448 R: Send,
449{
450 type Left = L;
451 type Right = R;
452
453 fn consume<FL, FR>(&self, item: Either<L, R>, left: FL, right: FR) -> (FL, FR)
454 where
455 FL: Folder<L>,
456 FR: Folder<R>,
457 {
458 match item {
459 Either::Left(item) => (left.consume(item), right),
460 Either::Right(item) => (left, right.consume(item)),
461 }
462 }
463}
464
465impl<A, B, FromA, FromB> FromParallelIterator<(A, B)> for (FromA, FromB)
466where
467 A: Send,
468 B: Send,
469 FromA: Send + FromParallelIterator<A>,
470 FromB: Send + FromParallelIterator<B>,
471{
472 fn from_par_iter<I>(pi: I) -> Self
473 where
474 I: IntoParallelIterator<Item = (A, B)>,
475 {
476 let (a, b): (Collector<FromA>, Collector<FromB>) = pi.into_par_iter().unzip();
477 (a.result.unwrap(), b.result.unwrap())
478 }
479}
480
481impl<L, R, A, B> FromParallelIterator<Either<L, R>> for (A, B)
482where
483 L: Send,
484 R: Send,
485 A: Send + FromParallelIterator<L>,
486 B: Send + FromParallelIterator<R>,
487{
488 fn from_par_iter<I>(pi: I) -> Self
489 where
490 I: IntoParallelIterator<Item = Either<L, R>>,
491 {
492 fn identity<T>(x: T) -> T {
493 x
494 }
495
496 let (a, b): (Collector<A>, Collector<B>) = pi.into_par_iter().partition_map(identity);
497 (a.result.unwrap(), b.result.unwrap())
498 }
499}
500
/// Holder for a collection that is produced at most once; starts out empty.
struct Collector<FromT> {
    result: Option<FromT>,
}

impl<FromT> Default for Collector<FromT> {
    /// Creates a collector that has not collected anything yet.
    fn default() -> Self {
        Self { result: None }
    }
}
511
512impl<T, FromT> ParallelExtend<T> for Collector<FromT>
513where
514 T: Send,
515 FromT: Send + FromParallelIterator<T>,
516{
517 fn par_extend<I>(&mut self, pi: I)
518 where
519 I: IntoParallelIterator<Item = T>,
520 {
521 debug_assert!(self.result.is_none());
522 self.result = Some(pi.into_par_iter().collect());
523 }
524}