gix_features/parallel/
reduce.rs

#[cfg(feature = "parallel")]
mod stepped {
    use crate::parallel::num_threads;

    /// An iterator adaptor to allow running computations using [`in_parallel()`][crate::parallel::in_parallel()] in a step-wise manner, see the [module docs][crate::parallel]
    /// for details.
    ///
    /// Work is spread over worker threads plus one thread pulling items from the input iterator. Results are fed to
    /// the reducer on the thread calling [`next()`][Iterator::next()] in the order they arrive, which with more than
    /// one worker is not necessarily the input order.
    pub struct Stepwise<Reduce: super::Reduce> {
        /// This field is first to assure it's dropped first and cause threads that are dropped next to stop their loops
        /// as sending results fails when the receiver is dropped.
        receive_result: std::sync::mpsc::Receiver<Reduce::Input>,
        /// `join()` will be called on these guards to assure every thread tries to send through a closed channel. When
        /// that happens, they break out of their loops.
        threads: Vec<std::thread::JoinHandle<()>>,
        /// The reducer is called only in the thread using the iterator, dropping it has no side effects.
        ///
        /// It's an `Option` so that [`finalize()`][Stepwise::finalize()] can move it out even though `Stepwise`
        /// implements `Drop`.
        reducer: Option<Reduce>,
    }

    impl<Reduce: super::Reduce> Drop for Stepwise<Reduce> {
        /// Disconnect the result channel, join all threads and re-raise a panic that happened in any of them.
        ///
        /// The receiver is swapped for an unconnected one and dropped right away, so workers fail their next
        /// `send()` and leave their loops. Once no worker is left to receive, the input thread's `send()` fails as
        /// well. If the current thread is already unwinding, a thread's panic is not re-raised, as panicking again
        /// inside `drop()` would abort the process.
        fn drop(&mut self) {
            let (_, sink) = std::sync::mpsc::channel();
            drop(std::mem::replace(&mut self.receive_result, sink));

            let mut last_err = None;
            for handle in std::mem::take(&mut self.threads) {
                if let Err(err) = handle.join() {
                    last_err = Some(err);
                }
            }
            if let Some(thread_err) = last_err {
                // A second panic while one is already in flight aborts; let the panic in progress propagate instead.
                if !std::thread::panicking() {
                    std::panic::resume_unwind(thread_err);
                }
            }
        }
    }

    impl<Reduce: super::Reduce> Stepwise<Reduce> {
        /// Instantiate a new iterator and start working in threads.
        /// For a description of parameters, see [`in_parallel()`][crate::parallel::in_parallel()].
        pub fn new<InputIter, ThreadStateFn, ConsumeFn, I, O, S>(
            input: InputIter,
            thread_limit: Option<usize>,
            new_thread_state: ThreadStateFn,
            consume: ConsumeFn,
            reducer: Reduce,
        ) -> Self
        where
            InputIter: Iterator<Item = I> + Send + 'static,
            ThreadStateFn: Fn(usize) -> S + Send + Clone + 'static,
            ConsumeFn: Fn(I, &mut S) -> O + Send + Clone + 'static,
            Reduce: super::Reduce<Input = O> + 'static,
            I: Send + 'static,
            O: Send + 'static,
        {
            let num_threads = num_threads(thread_limit);
            // One handle per worker, plus one for the thread feeding the input.
            let mut threads = Vec::with_capacity(num_threads + 1);
            let receive_result = {
                // Both channels are bounded, so at most `num_threads` items are buffered in each of them.
                let (send_input, receive_input) = crossbeam_channel::bounded::<I>(num_threads);
                let (send_result, receive_result) = std::sync::mpsc::sync_channel::<O>(num_threads);
                for thread_id in 0..num_threads {
                    let handle = std::thread::spawn({
                        let send_result = send_result.clone();
                        let receive_input = receive_input.clone();
                        let new_thread_state = new_thread_state.clone();
                        let consume = consume.clone();
                        move || {
                            let mut state = new_thread_state(thread_id);
                            for item in receive_input {
                                // A failed send means the `Stepwise` was dropped, so stop working.
                                if send_result.send(consume(item, &mut state)).is_err() {
                                    break;
                                }
                            }
                        }
                    });
                    threads.push(handle);
                }
                threads.push(std::thread::spawn(move || {
                    for item in input {
                        // Fails once all workers have exited and dropped their receivers.
                        if send_input.send(item).is_err() {
                            break;
                        }
                    }
                }));
                // `send_result` and `receive_input` are dropped here, so only the threads keep those channel ends alive.
                receive_result
            };
            Stepwise {
                threads,
                receive_result,
                reducer: Some(reducer),
            }
        }

        /// Consume the iterator by finishing its iteration and calling [`Reduce::finalize()`][crate::parallel::Reduce::finalize()].
        ///
        /// # Errors
        ///
        /// Returns the first error produced by [`Reduce::feed()`][crate::parallel::Reduce::feed()], in which case
        /// remaining results are discarded and the threads are stopped when `self` is dropped. Otherwise returns
        /// whatever `finalize()` of the reducer returns.
        pub fn finalize(mut self) -> Result<Reduce::Output, Reduce::Error> {
            for value in self.by_ref() {
                drop(value?);
            }
            self.reducer
                .take()
                .expect("this is the last call before consumption")
                .finalize()
        }
    }

    impl<Reduce: super::Reduce> Iterator for Stepwise<Reduce> {
        type Item = Result<Reduce::FeedProduce, Reduce::Error>;

        /// Block until a worker delivers the next result, then feed it to the reducer.
        ///
        /// Returns `None` once every worker has exited and all buffered results were received.
        fn next(&mut self) -> Option<<Self as Iterator>::Item> {
            self.receive_result
                .recv()
                .ok()
                .and_then(|input| self.reducer.as_mut().map(|r| r.feed(input)))
        }
    }

    impl<R: super::Reduce> super::Finalize for Stepwise<R> {
        type Reduce = R;

        /// Forwards to [`Stepwise::finalize()`].
        fn finalize(
            self,
        ) -> Result<
            <<Self as super::Finalize>::Reduce as super::Reduce>::Output,
            <<Self as super::Finalize>::Reduce as super::Reduce>::Error,
        > {
            Stepwise::finalize(self)
        }
    }
}
127
128#[cfg(not(feature = "parallel"))]
129mod stepped {
130    /// An iterator adaptor to allow running computations using [`in_parallel()`][crate::parallel::in_parallel()] in a step-wise manner, see the [module docs][crate::parallel]
131    /// for details.
132    pub struct Stepwise<InputIter, ConsumeFn, ThreadState, Reduce> {
133        input: InputIter,
134        consume: ConsumeFn,
135        thread_state: ThreadState,
136        reducer: Reduce,
137    }
138
139    impl<InputIter, ConsumeFn, Reduce, I, O, S> Stepwise<InputIter, ConsumeFn, S, Reduce>
140    where
141        InputIter: Iterator<Item = I>,
142        ConsumeFn: Fn(I, &mut S) -> O,
143        Reduce: super::Reduce<Input = O>,
144    {
145        /// Instantiate a new iterator.
146        /// For a description of parameters, see [`in_parallel()`][crate::parallel::in_parallel()].
147        pub fn new<ThreadStateFn>(
148            input: InputIter,
149            _thread_limit: Option<usize>,
150            new_thread_state: ThreadStateFn,
151            consume: ConsumeFn,
152            reducer: Reduce,
153        ) -> Self
154        where
155            ThreadStateFn: Fn(usize) -> S,
156        {
157            Stepwise {
158                input,
159                consume,
160                thread_state: new_thread_state(0),
161                reducer,
162            }
163        }
164
165        /// Consume the iterator by finishing its iteration and calling [`Reduce::finalize()`][crate::parallel::Reduce::finalize()].
166        pub fn finalize(mut self) -> Result<Reduce::Output, Reduce::Error> {
167            for value in self.by_ref() {
168                drop(value?);
169            }
170            self.reducer.finalize()
171        }
172    }
173
174    impl<InputIter, ConsumeFn, ThreadState, Reduce, I, O> Iterator for Stepwise<InputIter, ConsumeFn, ThreadState, Reduce>
175    where
176        InputIter: Iterator<Item = I>,
177        ConsumeFn: Fn(I, &mut ThreadState) -> O,
178        Reduce: super::Reduce<Input = O>,
179    {
180        type Item = Result<Reduce::FeedProduce, Reduce::Error>;
181
182        fn next(&mut self) -> Option<<Self as Iterator>::Item> {
183            self.input
184                .next()
185                .map(|input| self.reducer.feed((self.consume)(input, &mut self.thread_state)))
186        }
187    }
188
189    impl<InputIter, ConsumeFn, R, I, O, S> super::Finalize for Stepwise<InputIter, ConsumeFn, S, R>
190    where
191        InputIter: Iterator<Item = I>,
192        ConsumeFn: Fn(I, &mut S) -> O,
193        R: super::Reduce<Input = O>,
194    {
195        type Reduce = R;
196
197        fn finalize(
198            self,
199        ) -> Result<
200            <<Self as super::Finalize>::Reduce as super::Reduce>::Output,
201            <<Self as super::Finalize>::Reduce as super::Reduce>::Error,
202        > {
203            Stepwise::finalize(self)
204        }
205    }
206}
207
208use std::marker::PhantomData;
209
210pub use stepped::Stepwise;
211
/// A trait for aggregating items commonly produced in threads into a single result, without itself
/// needing to be thread safe.
pub trait Reduce {
    /// The type fed to the reducer in the [`feed()`][Reduce::feed()] method.
    ///
    /// It's produced by a function that may run on multiple threads.
    type Input;
    /// The type produced in `Ok(…)` by [`feed()`][Reduce::feed()].
    ///
    /// Most reducers by nature use `()` here as the value is in the aggregation.
    /// However, some may use it to collect statistics only and return their `Input`
    /// in some form as a result here for [`Stepwise`] to be useful.
    type FeedProduce;
    /// The type produced once by the [`finalize()`][Reduce::finalize()] method.
    ///
    /// For traditional reducers, this is the value produced by the entire operation.
    /// For those made for step-wise iteration this may be aggregated statistics.
    type Output;
    /// The error type to use for all methods of this trait.
    type Error;
    /// Called each time a new `item` was produced in order to aggregate it into the final result.
    ///
    /// If an `Error` is returned, the entire operation will be stopped.
    fn feed(&mut self, item: Self::Input) -> Result<Self::FeedProduce, Self::Error>;
    /// Called once all items were passed to `feed()`, producing the final `Output` of the operation or an `Error`.
    fn finalize(self) -> Result<Self::Output, Self::Error>;
}
238
239/// An identity reducer for those who want to use [`Stepwise`] or [`in_parallel()`][crate::parallel::in_parallel()]
240/// without the use of non-threaded reduction of products created in threads.
241pub struct IdentityWithResult<Input, Error> {
242    _input: PhantomData<Input>,
243    _error: PhantomData<Error>,
244}
245
246impl<Input, Error> Default for IdentityWithResult<Input, Error> {
247    fn default() -> Self {
248        IdentityWithResult {
249            _input: Default::default(),
250            _error: Default::default(),
251        }
252    }
253}
254
255impl<Input, Error> Reduce for IdentityWithResult<Input, Error> {
256    type Input = Result<Input, Self::Error>;
257    type FeedProduce = Input;
258    type Output = ();
259    type Error = Error;
260
261    fn feed(&mut self, item: Self::Input) -> Result<Self::FeedProduce, Self::Error> {
262        item
263    }
264
265    fn finalize(self) -> Result<Self::Output, Self::Error> {
266        Ok(())
267    }
268}
269
270/// A trait reflecting the `finalize()` method of [`Reduce`] implementations
271pub trait Finalize {
272    /// An implementation of [`Reduce`]
273    type Reduce: self::Reduce;
274
275    /// Similar to the [`Reduce::finalize()`] method
276    fn finalize(
277        self,
278    ) -> Result<<<Self as Finalize>::Reduce as self::Reduce>::Output, <<Self as Finalize>::Reduce as self::Reduce>::Error>;
279}