par_iter/iter/
update.rs

1use std::fmt::{self, Debug};
2
3use super::{plumbing::*, *};
4
5/// `Update` is an iterator that mutates the elements of an
6/// underlying iterator before they are yielded.
7///
8/// This struct is created by the [`update()`] method on [`ParallelIterator`]
9///
10/// [`update()`]: trait.ParallelIterator.html#method.update
11/// [`ParallelIterator`]: trait.ParallelIterator.html
12#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
13#[derive(Clone)]
14pub struct Update<I: ParallelIterator, F> {
15    base: I,
16    update_op: F,
17}
18
19impl<I: ParallelIterator + Debug, F> Debug for Update<I, F> {
20    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
21        f.debug_struct("Update").field("base", &self.base).finish()
22    }
23}
24
25impl<I, F> Update<I, F>
26where
27    I: ParallelIterator,
28{
29    /// Creates a new `Update` iterator.
30    pub(super) fn new(base: I, update_op: F) -> Self {
31        Update { base, update_op }
32    }
33}
34
35impl<I, F> ParallelIterator for Update<I, F>
36where
37    I: ParallelIterator,
38    F: Fn(&mut I::Item) + Send + Sync,
39{
40    type Item = I::Item;
41
42    fn drive_unindexed<C>(self, consumer: C) -> C::Result
43    where
44        C: UnindexedConsumer<Self::Item>,
45    {
46        let consumer1 = UpdateConsumer::new(consumer, &self.update_op);
47        self.base.drive_unindexed(consumer1)
48    }
49
50    fn opt_len(&self) -> Option<usize> {
51        self.base.opt_len()
52    }
53}
54
55impl<I, F> IndexedParallelIterator for Update<I, F>
56where
57    I: IndexedParallelIterator,
58    F: Fn(&mut I::Item) + Send + Sync,
59{
60    fn drive<C>(self, consumer: C) -> C::Result
61    where
62        C: Consumer<Self::Item>,
63    {
64        let consumer1 = UpdateConsumer::new(consumer, &self.update_op);
65        self.base.drive(consumer1)
66    }
67
68    fn len(&self) -> usize {
69        self.base.len()
70    }
71
72    fn with_producer<CB>(self, callback: CB) -> CB::Output
73    where
74        CB: ProducerCallback<Self::Item>,
75    {
76        return self.base.with_producer(Callback {
77            callback,
78            update_op: self.update_op,
79        });
80
81        struct Callback<CB, F> {
82            callback: CB,
83            update_op: F,
84        }
85
86        impl<T, F, CB> ProducerCallback<T> for Callback<CB, F>
87        where
88            CB: ProducerCallback<T>,
89            F: Fn(&mut T) + Send + Sync,
90        {
91            type Output = CB::Output;
92
93            fn callback<P>(self, base: P) -> CB::Output
94            where
95                P: Producer<Item = T>,
96            {
97                let producer = UpdateProducer {
98                    base,
99                    update_op: &self.update_op,
100                };
101                self.callback.callback(producer)
102            }
103        }
104    }
105}
106
// ////////////////////////////////////////////////////////////////////////
// Producer implementation

/// Producer wrapper that pairs a base producer with a shared reference to
/// the update closure.
struct UpdateProducer<'op, P, F> {
    /// The producer being wrapped.
    base: P,
    /// Borrowed update operation; the reference is copied into both halves
    /// when the producer is split.
    update_op: &'op F,
}
113
114impl<'f, P, F> Producer for UpdateProducer<'f, P, F>
115where
116    P: Producer,
117    F: Fn(&mut P::Item) + Send + Sync,
118{
119    type IntoIter = UpdateSeq<P::IntoIter, &'f F>;
120    type Item = P::Item;
121
122    fn into_iter(self) -> Self::IntoIter {
123        UpdateSeq {
124            base: self.base.into_iter(),
125            update_op: self.update_op,
126        }
127    }
128
129    fn min_len(&self) -> usize {
130        self.base.min_len()
131    }
132
133    fn max_len(&self) -> usize {
134        self.base.max_len()
135    }
136
137    fn split_at(self, index: usize) -> (Self, Self) {
138        let (left, right) = self.base.split_at(index);
139        (
140            UpdateProducer {
141                base: left,
142                update_op: self.update_op,
143            },
144            UpdateProducer {
145                base: right,
146                update_op: self.update_op,
147            },
148        )
149    }
150
151    fn fold_with<G>(self, folder: G) -> G
152    where
153        G: Folder<Self::Item>,
154    {
155        let folder1 = UpdateFolder {
156            base: folder,
157            update_op: self.update_op,
158        };
159        self.base.fold_with(folder1).base
160    }
161}
162
// ////////////////////////////////////////////////////////////////////////
// Consumer implementation

/// Consumer wrapper that pairs a base consumer with a shared reference to
/// the update closure.
struct UpdateConsumer<'op, C, F> {
    /// The consumer being wrapped.
    base: C,
    /// Borrowed update operation, copied into every consumer or folder
    /// derived from this one.
    update_op: &'op F,
}
170
171impl<'f, C, F> UpdateConsumer<'f, C, F> {
172    fn new(base: C, update_op: &'f F) -> Self {
173        UpdateConsumer { base, update_op }
174    }
175}
176
177impl<'f, T, C, F> Consumer<T> for UpdateConsumer<'f, C, F>
178where
179    C: Consumer<T>,
180    F: Fn(&mut T) + Send + Sync,
181{
182    type Folder = UpdateFolder<'f, C::Folder, F>;
183    type Reducer = C::Reducer;
184    type Result = C::Result;
185
186    fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
187        let (left, right, reducer) = self.base.split_at(index);
188        (
189            UpdateConsumer::new(left, self.update_op),
190            UpdateConsumer::new(right, self.update_op),
191            reducer,
192        )
193    }
194
195    fn into_folder(self) -> Self::Folder {
196        UpdateFolder {
197            base: self.base.into_folder(),
198            update_op: self.update_op,
199        }
200    }
201
202    fn full(&self) -> bool {
203        self.base.full()
204    }
205}
206
207impl<'f, T, C, F> UnindexedConsumer<T> for UpdateConsumer<'f, C, F>
208where
209    C: UnindexedConsumer<T>,
210    F: Fn(&mut T) + Send + Sync,
211{
212    fn split_off_left(&self) -> Self {
213        UpdateConsumer::new(self.base.split_off_left(), self.update_op)
214    }
215
216    fn to_reducer(&self) -> Self::Reducer {
217        self.base.to_reducer()
218    }
219}
220
/// Folder wrapper that runs the update closure on every item before handing
/// it to the wrapped folder.
struct UpdateFolder<'op, C, F> {
    /// The folder being wrapped.
    base: C,
    /// Borrowed update operation applied to each incoming item.
    update_op: &'op F,
}
225
/// Adapts an in-place mutator into a by-value mapping function.
///
/// The returned closure takes ownership of an item, runs `update_op` on a
/// mutable reference to it, and returns the (possibly modified) item, which
/// makes it usable with `Iterator::map`.
fn apply<T>(update_op: impl Fn(&mut T)) -> impl Fn(T) -> T {
    move |item| {
        let mut updated = item;
        update_op(&mut updated);
        updated
    }
}
232
233impl<'f, T, C, F> Folder<T> for UpdateFolder<'f, C, F>
234where
235    C: Folder<T>,
236    F: Fn(&mut T),
237{
238    type Result = C::Result;
239
240    fn consume(self, mut item: T) -> Self {
241        (self.update_op)(&mut item);
242
243        UpdateFolder {
244            base: self.base.consume(item),
245            update_op: self.update_op,
246        }
247    }
248
249    fn consume_iter<I>(mut self, iter: I) -> Self
250    where
251        I: IntoIterator<Item = T>,
252    {
253        let update_op = self.update_op;
254        self.base = self
255            .base
256            .consume_iter(iter.into_iter().map(apply(update_op)));
257        self
258    }
259
260    fn complete(self) -> C::Result {
261        self.base.complete()
262    }
263
264    fn full(&self) -> bool {
265        self.base.full()
266    }
267}
268
/// Sequential counterpart of the update adaptor, based on
/// `itertools::adaptors::Update`.
///
/// Holds a base iterator and the operation applied to each of its items.
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
#[derive(Clone, Debug)]
struct UpdateSeq<I, F> {
    /// The wrapped sequential iterator.
    base: I,
    /// The operation applied to each item as `&mut Item`.
    update_op: F,
}
276
277impl<I, F> Iterator for UpdateSeq<I, F>
278where
279    I: Iterator,
280    F: Fn(&mut I::Item),
281{
282    type Item = I::Item;
283
284    fn next(&mut self) -> Option<Self::Item> {
285        let mut v = self.base.next()?;
286        (self.update_op)(&mut v);
287        Some(v)
288    }
289
290    fn size_hint(&self) -> (usize, Option<usize>) {
291        self.base.size_hint()
292    }
293
294    fn fold<Acc, G>(self, init: Acc, g: G) -> Acc
295    where
296        G: FnMut(Acc, Self::Item) -> Acc,
297    {
298        self.base.map(apply(self.update_op)).fold(init, g)
299    }
300
301    // if possible, re-use inner iterator specializations in collect
302    fn collect<C>(self) -> C
303    where
304        C: ::std::iter::FromIterator<Self::Item>,
305    {
306        self.base.map(apply(self.update_op)).collect()
307    }
308}
309
310impl<I, F> ExactSizeIterator for UpdateSeq<I, F>
311where
312    I: ExactSizeIterator,
313    F: Fn(&mut I::Item),
314{
315}
316
317impl<I, F> DoubleEndedIterator for UpdateSeq<I, F>
318where
319    I: DoubleEndedIterator,
320    F: Fn(&mut I::Item),
321{
322    fn next_back(&mut self) -> Option<Self::Item> {
323        let mut v = self.base.next_back()?;
324        (self.update_op)(&mut v);
325        Some(v)
326    }
327}