par_iter/iter/
while_some.rs1use std::sync::atomic::{AtomicBool, Ordering};
2
3use super::{plumbing::*, *};
4
5#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
14#[derive(Debug, Clone)]
15pub struct WhileSome<I: ParallelIterator> {
16 base: I,
17}
18
19impl<I> WhileSome<I>
20where
21 I: ParallelIterator,
22{
23 pub(super) fn new(base: I) -> Self {
25 WhileSome { base }
26 }
27}
28
29impl<I, T> ParallelIterator for WhileSome<I>
30where
31 I: ParallelIterator<Item = Option<T>>,
32 T: Send,
33{
34 type Item = T;
35
36 fn drive_unindexed<C>(self, consumer: C) -> C::Result
37 where
38 C: UnindexedConsumer<Self::Item>,
39 {
40 let full = AtomicBool::new(false);
41 let consumer1 = WhileSomeConsumer {
42 base: consumer,
43 full: &full,
44 };
45 self.base.drive_unindexed(consumer1)
46 }
47}
48
49struct WhileSomeConsumer<'f, C> {
53 base: C,
54 full: &'f AtomicBool,
55}
56
57impl<'f, T, C> Consumer<Option<T>> for WhileSomeConsumer<'f, C>
58where
59 C: Consumer<T>,
60 T: Send,
61{
62 type Folder = WhileSomeFolder<'f, C::Folder>;
63 type Reducer = C::Reducer;
64 type Result = C::Result;
65
66 fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
67 let (left, right, reducer) = self.base.split_at(index);
68 (
69 WhileSomeConsumer { base: left, ..self },
70 WhileSomeConsumer {
71 base: right,
72 ..self
73 },
74 reducer,
75 )
76 }
77
78 fn into_folder(self) -> Self::Folder {
79 WhileSomeFolder {
80 base: self.base.into_folder(),
81 full: self.full,
82 }
83 }
84
85 fn full(&self) -> bool {
86 self.full.load(Ordering::Relaxed) || self.base.full()
87 }
88}
89
90impl<'f, T, C> UnindexedConsumer<Option<T>> for WhileSomeConsumer<'f, C>
91where
92 C: UnindexedConsumer<T>,
93 T: Send,
94{
95 fn split_off_left(&self) -> Self {
96 WhileSomeConsumer {
97 base: self.base.split_off_left(),
98 ..*self
99 }
100 }
101
102 fn to_reducer(&self) -> Self::Reducer {
103 self.base.to_reducer()
104 }
105}
106
107struct WhileSomeFolder<'f, C> {
108 base: C,
109 full: &'f AtomicBool,
110}
111
112impl<'f, T, C> Folder<Option<T>> for WhileSomeFolder<'f, C>
113where
114 C: Folder<T>,
115{
116 type Result = C::Result;
117
118 fn consume(mut self, item: Option<T>) -> Self {
119 match item {
120 Some(item) => self.base = self.base.consume(item),
121 None => self.full.store(true, Ordering::Relaxed),
122 }
123 self
124 }
125
126 fn consume_iter<I>(mut self, iter: I) -> Self
127 where
128 I: IntoIterator<Item = Option<T>>,
129 {
130 fn some<T>(full: &AtomicBool) -> impl Fn(&Option<T>) -> bool + '_ {
131 move |x| match *x {
132 Some(_) => !full.load(Ordering::Relaxed),
133 None => {
134 full.store(true, Ordering::Relaxed);
135 false
136 }
137 }
138 }
139
140 self.base = self.base.consume_iter(
141 iter.into_iter()
142 .take_while(some(self.full))
143 .map(Option::unwrap),
144 );
145 self
146 }
147
148 fn complete(self) -> C::Result {
149 self.base.complete()
150 }
151
152 fn full(&self) -> bool {
153 self.full.load(Ordering::Relaxed) || self.base.full()
154 }
155}