par_iter/iter/
fold.rs

1use std::fmt::{self, Debug};
2
3use super::{plumbing::*, *};
4
5impl<U, I, ID, F> Fold<I, ID, F>
6where
7    I: ParallelIterator,
8    F: Fn(U, I::Item) -> U + Sync + Send,
9    ID: Fn() -> U + Sync + Send,
10    U: Send,
11{
12    pub(super) fn new(base: I, identity: ID, fold_op: F) -> Self {
13        Fold {
14            base,
15            identity,
16            fold_op,
17        }
18    }
19}
20
/// `Fold` is a parallel iterator adaptor that folds the items of an
/// underlying iterator into accumulated values. It is returned by the
/// [`fold()`] method on [`ParallelIterator`].
///
/// [`fold()`]: trait.ParallelIterator.html#method.fold
/// [`ParallelIterator`]: trait.ParallelIterator.html
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
#[derive(Clone)]
pub struct Fold<I, ID, F> {
    // The iterator whose items are folded.
    base: I,
    // Produces the starting accumulator for a fold.
    identity: ID,
    // Combines an accumulator with one item.
    fold_op: F,
}
34
35impl<I: ParallelIterator + Debug, ID, F> Debug for Fold<I, ID, F> {
36    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
37        f.debug_struct("Fold").field("base", &self.base).finish()
38    }
39}
40
41impl<U, I, ID, F> ParallelIterator for Fold<I, ID, F>
42where
43    I: ParallelIterator,
44    F: Fn(U, I::Item) -> U + Sync + Send,
45    ID: Fn() -> U + Sync + Send,
46    U: Send,
47{
48    type Item = U;
49
50    fn drive_unindexed<C>(self, consumer: C) -> C::Result
51    where
52        C: UnindexedConsumer<Self::Item>,
53    {
54        let consumer1 = FoldConsumer {
55            base: consumer,
56            fold_op: &self.fold_op,
57            identity: &self.identity,
58        };
59        self.base.drive_unindexed(consumer1)
60    }
61}
62
/// Consumer used by `Fold`: pairs the downstream consumer with borrowed
/// references to the fold closure and the identity closure.
struct FoldConsumer<'c, C, ID, F> {
    // Downstream consumer that receives the accumulated values.
    base: C,
    // Borrowed fold operation.
    fold_op: &'c F,
    // Borrowed generator of starting accumulators.
    identity: &'c ID,
}
68
69impl<'r, U, T, C, ID, F> Consumer<T> for FoldConsumer<'r, C, ID, F>
70where
71    C: Consumer<U>,
72    F: Fn(U, T) -> U + Sync,
73    ID: Fn() -> U + Sync,
74    U: Send,
75{
76    type Folder = FoldFolder<'r, C::Folder, U, F>;
77    type Reducer = C::Reducer;
78    type Result = C::Result;
79
80    fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
81        let (left, right, reducer) = self.base.split_at(index);
82        (
83            FoldConsumer { base: left, ..self },
84            FoldConsumer {
85                base: right,
86                ..self
87            },
88            reducer,
89        )
90    }
91
92    fn into_folder(self) -> Self::Folder {
93        FoldFolder {
94            base: self.base.into_folder(),
95            item: (self.identity)(),
96            fold_op: self.fold_op,
97        }
98    }
99
100    fn full(&self) -> bool {
101        self.base.full()
102    }
103}
104
105impl<'r, U, T, C, ID, F> UnindexedConsumer<T> for FoldConsumer<'r, C, ID, F>
106where
107    C: UnindexedConsumer<U>,
108    F: Fn(U, T) -> U + Sync,
109    ID: Fn() -> U + Sync,
110    U: Send,
111{
112    fn split_off_left(&self) -> Self {
113        FoldConsumer {
114            base: self.base.split_off_left(),
115            ..*self
116        }
117    }
118
119    fn to_reducer(&self) -> Self::Reducer {
120        self.base.to_reducer()
121    }
122}
123
/// Folder shared by `Fold` and `FoldWith`: holds the running accumulator
/// together with the downstream folder that will eventually receive it.
struct FoldFolder<'r, C, U, F> {
    // Downstream folder.
    base: C,
    // Borrowed fold operation.
    fold_op: &'r F,
    // Current accumulated value.
    item: U,
}
129
130impl<'r, C, ID, F, T> Folder<T> for FoldFolder<'r, C, ID, F>
131where
132    C: Folder<ID>,
133    F: Fn(ID, T) -> ID + Sync,
134{
135    type Result = C::Result;
136
137    fn consume(self, item: T) -> Self {
138        let item = (self.fold_op)(self.item, item);
139        FoldFolder {
140            base: self.base,
141            fold_op: self.fold_op,
142            item,
143        }
144    }
145
146    fn consume_iter<I>(self, iter: I) -> Self
147    where
148        I: IntoIterator<Item = T>,
149    {
150        fn not_full<C, ID, T>(base: &C) -> impl Fn(&T) -> bool + '_
151        where
152            C: Folder<ID>,
153        {
154            move |_| !base.full()
155        }
156
157        let base = self.base;
158        let item = iter
159            .into_iter()
160            // stop iterating if another thread has finished
161            .take_while(not_full(&base))
162            .fold(self.item, self.fold_op);
163
164        FoldFolder {
165            base,
166            item,
167            fold_op: self.fold_op,
168        }
169    }
170
171    fn complete(self) -> C::Result {
172        self.base.consume(self.item).complete()
173    }
174
175    fn full(&self) -> bool {
176        self.base.full()
177    }
178}
179
180// ///////////////////////////////////////////////////////////////////////////
181
182impl<U, I, F> FoldWith<I, U, F>
183where
184    I: ParallelIterator,
185    F: Fn(U, I::Item) -> U + Sync + Send,
186    U: Send + Clone,
187{
188    pub(super) fn new(base: I, item: U, fold_op: F) -> Self {
189        FoldWith {
190            base,
191            item,
192            fold_op,
193        }
194    }
195}
196
/// `FoldWith` is a parallel iterator adaptor that folds the items of an
/// underlying iterator into accumulated values, starting from a stored
/// initial value. It is returned by the [`fold_with()`] method on
/// [`ParallelIterator`].
///
/// [`fold_with()`]: trait.ParallelIterator.html#method.fold_with
/// [`ParallelIterator`]: trait.ParallelIterator.html
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
#[derive(Clone)]
pub struct FoldWith<I, U, F> {
    // The iterator whose items are folded.
    base: I,
    // Initial accumulator value.
    item: U,
    // Combines an accumulator with one item.
    fold_op: F,
}
210
211impl<I: ParallelIterator + Debug, U: Debug, F> Debug for FoldWith<I, U, F> {
212    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
213        f.debug_struct("FoldWith")
214            .field("base", &self.base)
215            .field("item", &self.item)
216            .finish()
217    }
218}
219
220impl<U, I, F> ParallelIterator for FoldWith<I, U, F>
221where
222    I: ParallelIterator,
223    F: Fn(U, I::Item) -> U + Sync + Send,
224    U: Send + Clone,
225{
226    type Item = U;
227
228    fn drive_unindexed<C>(self, consumer: C) -> C::Result
229    where
230        C: UnindexedConsumer<Self::Item>,
231    {
232        let consumer1 = FoldWithConsumer {
233            base: consumer,
234            item: self.item,
235            fold_op: &self.fold_op,
236        };
237        self.base.drive_unindexed(consumer1)
238    }
239}
240
/// Consumer used by `FoldWith`: pairs the downstream consumer with an owned
/// initial accumulator value and a borrowed reference to the fold closure.
struct FoldWithConsumer<'c, C, U, F> {
    // Downstream consumer that receives the accumulated values.
    base: C,
    // Initial accumulator value; cloned whenever the consumer is split.
    item: U,
    // Borrowed fold operation.
    fold_op: &'c F,
}
246
247impl<'r, U, T, C, F> Consumer<T> for FoldWithConsumer<'r, C, U, F>
248where
249    C: Consumer<U>,
250    F: Fn(U, T) -> U + Sync,
251    U: Send + Clone,
252{
253    type Folder = FoldFolder<'r, C::Folder, U, F>;
254    type Reducer = C::Reducer;
255    type Result = C::Result;
256
257    fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
258        let (left, right, reducer) = self.base.split_at(index);
259        (
260            FoldWithConsumer {
261                base: left,
262                item: self.item.clone(),
263                ..self
264            },
265            FoldWithConsumer {
266                base: right,
267                ..self
268            },
269            reducer,
270        )
271    }
272
273    fn into_folder(self) -> Self::Folder {
274        FoldFolder {
275            base: self.base.into_folder(),
276            item: self.item,
277            fold_op: self.fold_op,
278        }
279    }
280
281    fn full(&self) -> bool {
282        self.base.full()
283    }
284}
285
286impl<'r, U, T, C, F> UnindexedConsumer<T> for FoldWithConsumer<'r, C, U, F>
287where
288    C: UnindexedConsumer<U>,
289    F: Fn(U, T) -> U + Sync,
290    U: Send + Clone,
291{
292    fn split_off_left(&self) -> Self {
293        FoldWithConsumer {
294            base: self.base.split_off_left(),
295            item: self.item.clone(),
296            ..*self
297        }
298    }
299
300    fn to_reducer(&self) -> Self::Reducer {
301        self.base.to_reducer()
302    }
303}