1use std::fmt::{self, Debug};
2
3use super::{plumbing::*, *};
4
5#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
13#[derive(Clone)]
14pub struct Update<I: ParallelIterator, F> {
15 base: I,
16 update_op: F,
17}
18
19impl<I: ParallelIterator + Debug, F> Debug for Update<I, F> {
20 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
21 f.debug_struct("Update").field("base", &self.base).finish()
22 }
23}
24
25impl<I, F> Update<I, F>
26where
27 I: ParallelIterator,
28{
29 pub(super) fn new(base: I, update_op: F) -> Self {
31 Update { base, update_op }
32 }
33}
34
35impl<I, F> ParallelIterator for Update<I, F>
36where
37 I: ParallelIterator,
38 F: Fn(&mut I::Item) + Send + Sync,
39{
40 type Item = I::Item;
41
42 fn drive_unindexed<C>(self, consumer: C) -> C::Result
43 where
44 C: UnindexedConsumer<Self::Item>,
45 {
46 let consumer1 = UpdateConsumer::new(consumer, &self.update_op);
47 self.base.drive_unindexed(consumer1)
48 }
49
50 fn opt_len(&self) -> Option<usize> {
51 self.base.opt_len()
52 }
53}
54
55impl<I, F> IndexedParallelIterator for Update<I, F>
56where
57 I: IndexedParallelIterator,
58 F: Fn(&mut I::Item) + Send + Sync,
59{
60 fn drive<C>(self, consumer: C) -> C::Result
61 where
62 C: Consumer<Self::Item>,
63 {
64 let consumer1 = UpdateConsumer::new(consumer, &self.update_op);
65 self.base.drive(consumer1)
66 }
67
68 fn len(&self) -> usize {
69 self.base.len()
70 }
71
72 fn with_producer<CB>(self, callback: CB) -> CB::Output
73 where
74 CB: ProducerCallback<Self::Item>,
75 {
76 return self.base.with_producer(Callback {
77 callback,
78 update_op: self.update_op,
79 });
80
81 struct Callback<CB, F> {
82 callback: CB,
83 update_op: F,
84 }
85
86 impl<T, F, CB> ProducerCallback<T> for Callback<CB, F>
87 where
88 CB: ProducerCallback<T>,
89 F: Fn(&mut T) + Send + Sync,
90 {
91 type Output = CB::Output;
92
93 fn callback<P>(self, base: P) -> CB::Output
94 where
95 P: Producer<Item = T>,
96 {
97 let producer = UpdateProducer {
98 base,
99 update_op: &self.update_op,
100 };
101 self.callback.callback(producer)
102 }
103 }
104 }
105}
106
107struct UpdateProducer<'f, P, F> {
110 base: P,
111 update_op: &'f F,
112}
113
114impl<'f, P, F> Producer for UpdateProducer<'f, P, F>
115where
116 P: Producer,
117 F: Fn(&mut P::Item) + Send + Sync,
118{
119 type IntoIter = UpdateSeq<P::IntoIter, &'f F>;
120 type Item = P::Item;
121
122 fn into_iter(self) -> Self::IntoIter {
123 UpdateSeq {
124 base: self.base.into_iter(),
125 update_op: self.update_op,
126 }
127 }
128
129 fn min_len(&self) -> usize {
130 self.base.min_len()
131 }
132
133 fn max_len(&self) -> usize {
134 self.base.max_len()
135 }
136
137 fn split_at(self, index: usize) -> (Self, Self) {
138 let (left, right) = self.base.split_at(index);
139 (
140 UpdateProducer {
141 base: left,
142 update_op: self.update_op,
143 },
144 UpdateProducer {
145 base: right,
146 update_op: self.update_op,
147 },
148 )
149 }
150
151 fn fold_with<G>(self, folder: G) -> G
152 where
153 G: Folder<Self::Item>,
154 {
155 let folder1 = UpdateFolder {
156 base: folder,
157 update_op: self.update_op,
158 };
159 self.base.fold_with(folder1).base
160 }
161}
162
/// Consumer wrapper that couples a base consumer with a shared reference to
/// the update closure.
struct UpdateConsumer<'f, C, F> {
    base: C,
    update_op: &'f F,
}

impl<'f, C, F> UpdateConsumer<'f, C, F> {
    /// Builds an `UpdateConsumer` from a base consumer and a borrowed closure.
    fn new(base: C, update_op: &'f F) -> Self {
        Self { base, update_op }
    }
}
176
177impl<'f, T, C, F> Consumer<T> for UpdateConsumer<'f, C, F>
178where
179 C: Consumer<T>,
180 F: Fn(&mut T) + Send + Sync,
181{
182 type Folder = UpdateFolder<'f, C::Folder, F>;
183 type Reducer = C::Reducer;
184 type Result = C::Result;
185
186 fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
187 let (left, right, reducer) = self.base.split_at(index);
188 (
189 UpdateConsumer::new(left, self.update_op),
190 UpdateConsumer::new(right, self.update_op),
191 reducer,
192 )
193 }
194
195 fn into_folder(self) -> Self::Folder {
196 UpdateFolder {
197 base: self.base.into_folder(),
198 update_op: self.update_op,
199 }
200 }
201
202 fn full(&self) -> bool {
203 self.base.full()
204 }
205}
206
207impl<'f, T, C, F> UnindexedConsumer<T> for UpdateConsumer<'f, C, F>
208where
209 C: UnindexedConsumer<T>,
210 F: Fn(&mut T) + Send + Sync,
211{
212 fn split_off_left(&self) -> Self {
213 UpdateConsumer::new(self.base.split_off_left(), self.update_op)
214 }
215
216 fn to_reducer(&self) -> Self::Reducer {
217 self.base.to_reducer()
218 }
219}
220
/// Folder wrapper that couples a base folder with a shared reference to the
/// update closure.
struct UpdateFolder<'f, C, F> {
    /// The wrapped folder that receives items after they were updated.
    base: C,
    /// Borrowed closure applied to each item.
    update_op: &'f F,
}
225
/// Adapts an in-place `update_op` (taking `&mut T`) into a by-value mapping
/// function: the returned closure takes an item, runs `update_op` on it, and
/// gives the item back.
fn apply<T>(update_op: impl Fn(&mut T)) -> impl Fn(T) -> T {
    move |item: T| -> T {
        let mut updated = item;
        update_op(&mut updated);
        updated
    }
}
232
233impl<'f, T, C, F> Folder<T> for UpdateFolder<'f, C, F>
234where
235 C: Folder<T>,
236 F: Fn(&mut T),
237{
238 type Result = C::Result;
239
240 fn consume(self, mut item: T) -> Self {
241 (self.update_op)(&mut item);
242
243 UpdateFolder {
244 base: self.base.consume(item),
245 update_op: self.update_op,
246 }
247 }
248
249 fn consume_iter<I>(mut self, iter: I) -> Self
250 where
251 I: IntoIterator<Item = T>,
252 {
253 let update_op = self.update_op;
254 self.base = self
255 .base
256 .consume_iter(iter.into_iter().map(apply(update_op)));
257 self
258 }
259
260 fn complete(self) -> C::Result {
261 self.base.complete()
262 }
263
264 fn full(&self) -> bool {
265 self.base.full()
266 }
267}
268
269#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
271#[derive(Debug, Clone)]
272struct UpdateSeq<I, F> {
273 base: I,
274 update_op: F,
275}
276
277impl<I, F> Iterator for UpdateSeq<I, F>
278where
279 I: Iterator,
280 F: Fn(&mut I::Item),
281{
282 type Item = I::Item;
283
284 fn next(&mut self) -> Option<Self::Item> {
285 let mut v = self.base.next()?;
286 (self.update_op)(&mut v);
287 Some(v)
288 }
289
290 fn size_hint(&self) -> (usize, Option<usize>) {
291 self.base.size_hint()
292 }
293
294 fn fold<Acc, G>(self, init: Acc, g: G) -> Acc
295 where
296 G: FnMut(Acc, Self::Item) -> Acc,
297 {
298 self.base.map(apply(self.update_op)).fold(init, g)
299 }
300
301 fn collect<C>(self) -> C
303 where
304 C: ::std::iter::FromIterator<Self::Item>,
305 {
306 self.base.map(apply(self.update_op)).collect()
307 }
308}
309
310impl<I, F> ExactSizeIterator for UpdateSeq<I, F>
311where
312 I: ExactSizeIterator,
313 F: Fn(&mut I::Item),
314{
315}
316
317impl<I, F> DoubleEndedIterator for UpdateSeq<I, F>
318where
319 I: DoubleEndedIterator,
320 F: Fn(&mut I::Item),
321{
322 fn next_back(&mut self) -> Option<Self::Item> {
323 let mut v = self.base.next_back()?;
324 (self.update_op)(&mut v);
325 Some(v)
326 }
327}