#[cfg(feature = "parallel")]
mod stepped {
    use crate::parallel::num_threads;

    /// An iterator which runs a `consume` function on worker threads and feeds every result into a
    /// [`Reduce`][super::Reduce] implementation, one step per call to `next()`, on the thread driving the iterator.
    ///
    /// Dropping the iterator disconnects the result channel and joins every thread it spawned.
    pub struct Stepwise<Reduce: super::Reduce> {
        /// The receiving end of the bounded channel that worker threads send their results into.
        receive_result: std::sync::mpsc::Receiver<Reduce::Input>,
        /// Handles of all worker threads, followed by the handle of the thread that feeds them with input.
        threads: Vec<std::thread::JoinHandle<()>>,
        /// The reducer fed by `next()`. It is taken out of the `Option` by `finalize()`.
        reducer: Option<Reduce>,
    }

    impl<Reduce: super::Reduce> Drop for Stepwise<Reduce> {
        /// Disconnect the result channel, then wait for every spawned thread to finish.
        ///
        /// If one or more threads panicked, the panic of the last such thread is resumed on the dropping
        /// thread, unless that thread is already unwinding from a panic of its own.
        fn drop(&mut self) {
            // Swap in a receiver whose sender is already gone and drop the real one. From now on sending a
            // result fails, which makes workers break out of their loops and release their input receivers.
            // Once those are gone, sending input fails too and the feeder thread stops as well.
            let (_, sink) = std::sync::mpsc::channel();
            drop(std::mem::replace(&mut self.receive_result, sink));

            // Join all threads even if some of them panicked, remembering only the last panic payload.
            let mut last_err = None;
            for handle in std::mem::take(&mut self.threads) {
                if let Err(err) = handle.join() {
                    last_err = Some(err);
                }
            }
            if let Some(thread_err) = last_err {
                // Panicking while this thread is already unwinding would abort the whole process, so the
                // worker's panic is only propagated if it is the first one this thread gets to see.
                if !std::thread::panicking() {
                    std::panic::resume_unwind(thread_err);
                }
            }
        }
    }

    impl<Reduce: super::Reduce> Stepwise<Reduce> {
        /// Create a new instance and immediately start processing `input` on background threads.
        ///
        /// * `input` - the items to process; it is moved into a dedicated thread which forwards each item
        ///   to the workers through a bounded channel.
        /// * `thread_limit` - passed to `num_threads()` to determine the number of worker threads to spawn.
        /// * `new_thread_state` - called once in each worker thread with the worker's index to create the
        ///   state that is handed mutably to every `consume` call of that worker.
        /// * `consume` - called in a worker thread for each input item; its return value is sent to the
        ///   thread driving the iterator.
        /// * `reducer` - receives the output of `consume` each time the iterator is advanced.
        pub fn new<InputIter, ThreadStateFn, ConsumeFn, I, O, S>(
            input: InputIter,
            thread_limit: Option<usize>,
            new_thread_state: ThreadStateFn,
            consume: ConsumeFn,
            reducer: Reduce,
        ) -> Self
        where
            InputIter: Iterator<Item = I> + Send + 'static,
            ThreadStateFn: Fn(usize) -> S + Send + Clone + 'static,
            ConsumeFn: Fn(I, &mut S) -> O + Send + Clone + 'static,
            Reduce: super::Reduce<Input = O> + 'static,
            I: Send + 'static,
            O: Send + 'static,
        {
            let num_threads = num_threads(thread_limit);
            // One handle per worker, plus one for the thread feeding the input channel.
            let mut threads = Vec::with_capacity(num_threads + 1);
            let receive_result = {
                // Both channels are bounded by the amount of workers, so neither the feeder nor the workers
                // can run arbitrarily far ahead of whoever consumes their output.
                let (send_input, receive_input) = crossbeam_channel::bounded::<I>(num_threads);
                let (send_result, receive_result) = std::sync::mpsc::sync_channel::<O>(num_threads);
                for thread_id in 0..num_threads {
                    let handle = std::thread::spawn({
                        let send_result = send_result.clone();
                        let receive_input = receive_input.clone();
                        let new_thread_state = new_thread_state.clone();
                        let consume = consume.clone();
                        move || {
                            let mut state = new_thread_state(thread_id);
                            for item in receive_input {
                                // Sending fails once the receiving side is gone, which is the signal to stop.
                                if send_result.send(consume(item, &mut state)).is_err() {
                                    break;
                                }
                            }
                        }
                    });
                    threads.push(handle);
                }
                threads.push(std::thread::spawn(move || {
                    for item in input {
                        // Sending fails once no worker is listening anymore.
                        if send_input.send(item).is_err() {
                            break;
                        }
                    }
                }));
                // The original `send_result` and `receive_input` are dropped at the end of this block, leaving
                // only the clones owned by the threads. The result channel thus disconnects as soon as all
                // workers are done, which ends the iteration.
                receive_result
            };
            Stepwise {
                threads,
                receive_result,
                reducer: Some(reducer),
            }
        }

        /// Consume the iterator by feeding all remaining results to the reducer, then finalize the reducer
        /// and return its output.
        ///
        /// The first error returned while feeding stops the operation and is returned instead.
        pub fn finalize(mut self) -> Result<Reduce::Output, Reduce::Error> {
            for value in self.by_ref() {
                drop(value?);
            }
            self.reducer
                .take()
                .expect("this is the last call before consumption")
                .finalize()
        }
    }

    impl<Reduce: super::Reduce> Iterator for Stepwise<Reduce> {
        type Item = Result<Reduce::FeedProduce, Reduce::Error>;

        /// Block until a worker delivers the next result and return what the reducer makes of it.
        ///
        /// Returns `None` once the result channel is disconnected, or if the reducer is no longer present.
        fn next(&mut self) -> Option<<Self as Iterator>::Item> {
            self.receive_result
                .recv()
                .ok()
                .and_then(|input| self.reducer.as_mut().map(|r| r.feed(input)))
        }
    }

    impl<R: super::Reduce> super::Finalize for Stepwise<R> {
        type Reduce = R;

        /// Forwards to the inherent `Stepwise::finalize()`.
        fn finalize(
            self,
        ) -> Result<
            <<Self as super::Finalize>::Reduce as super::Reduce>::Output,
            <<Self as super::Finalize>::Reduce as super::Reduce>::Error,
        > {
            Stepwise::finalize(self)
        }
    }
}
127
128#[cfg(not(feature = "parallel"))]
129mod stepped {
130 pub struct Stepwise<InputIter, ConsumeFn, ThreadState, Reduce> {
133 input: InputIter,
134 consume: ConsumeFn,
135 thread_state: ThreadState,
136 reducer: Reduce,
137 }
138
139 impl<InputIter, ConsumeFn, Reduce, I, O, S> Stepwise<InputIter, ConsumeFn, S, Reduce>
140 where
141 InputIter: Iterator<Item = I>,
142 ConsumeFn: Fn(I, &mut S) -> O,
143 Reduce: super::Reduce<Input = O>,
144 {
145 pub fn new<ThreadStateFn>(
148 input: InputIter,
149 _thread_limit: Option<usize>,
150 new_thread_state: ThreadStateFn,
151 consume: ConsumeFn,
152 reducer: Reduce,
153 ) -> Self
154 where
155 ThreadStateFn: Fn(usize) -> S,
156 {
157 Stepwise {
158 input,
159 consume,
160 thread_state: new_thread_state(0),
161 reducer,
162 }
163 }
164
165 pub fn finalize(mut self) -> Result<Reduce::Output, Reduce::Error> {
167 for value in self.by_ref() {
168 drop(value?);
169 }
170 self.reducer.finalize()
171 }
172 }
173
174 impl<InputIter, ConsumeFn, ThreadState, Reduce, I, O> Iterator for Stepwise<InputIter, ConsumeFn, ThreadState, Reduce>
175 where
176 InputIter: Iterator<Item = I>,
177 ConsumeFn: Fn(I, &mut ThreadState) -> O,
178 Reduce: super::Reduce<Input = O>,
179 {
180 type Item = Result<Reduce::FeedProduce, Reduce::Error>;
181
182 fn next(&mut self) -> Option<<Self as Iterator>::Item> {
183 self.input
184 .next()
185 .map(|input| self.reducer.feed((self.consume)(input, &mut self.thread_state)))
186 }
187 }
188
189 impl<InputIter, ConsumeFn, R, I, O, S> super::Finalize for Stepwise<InputIter, ConsumeFn, S, R>
190 where
191 InputIter: Iterator<Item = I>,
192 ConsumeFn: Fn(I, &mut S) -> O,
193 R: super::Reduce<Input = O>,
194 {
195 type Reduce = R;
196
197 fn finalize(
198 self,
199 ) -> Result<
200 <<Self as super::Finalize>::Reduce as super::Reduce>::Output,
201 <<Self as super::Finalize>::Reduce as super::Reduce>::Error,
202 > {
203 Stepwise::finalize(self)
204 }
205 }
206}
207
208use std::marker::PhantomData;
209
210pub use stepped::Stepwise;
211
/// A trait for aggregating items, one at a time, into a single final result.
pub trait Reduce {
    /// The type of item accepted by [`feed()`][Reduce::feed()].
    type Input;
    /// The type returned in `Ok(…)` by [`feed()`][Reduce::feed()] for each item that was fed.
    type FeedProduce;
    /// The type returned in `Ok(…)` by [`finalize()`][Reduce::finalize()].
    type Output;
    /// The error type shared by all methods of this trait.
    type Error;

    /// Take in a single `item`, returning a per-item value on success or an `Error` otherwise.
    fn feed(&mut self, item: Self::Input) -> Result<Self::FeedProduce, Self::Error>;

    /// Consume the reducer to obtain the `Output` of the entire operation, or an `Error`.
    fn finalize(self) -> Result<Self::Output, Self::Error>;
}
238
/// A reducer without any state of its own: it hands each fed `Result` back to the caller unchanged.
///
/// The type parameters only exist to fix the `Input` and `Error` types, no value of either is stored.
pub struct IdentityWithResult<Input, Error> {
    /// Marker for the success type carried by fed items.
    _input: PhantomData<Input>,
    /// Marker for the error type carried by fed items.
    _error: PhantomData<Error>,
}
245
246impl<Input, Error> Default for IdentityWithResult<Input, Error> {
247 fn default() -> Self {
248 IdentityWithResult {
249 _input: Default::default(),
250 _error: Default::default(),
251 }
252 }
253}
254
255impl<Input, Error> Reduce for IdentityWithResult<Input, Error> {
256 type Input = Result<Input, Self::Error>;
257 type FeedProduce = Input;
258 type Output = ();
259 type Error = Error;
260
261 fn feed(&mut self, item: Self::Input) -> Result<Self::FeedProduce, Self::Error> {
262 item
263 }
264
265 fn finalize(self) -> Result<Self::Output, Self::Error> {
266 Ok(())
267 }
268}
269
270pub trait Finalize {
272 type Reduce: self::Reduce;
274
275 fn finalize(
277 self,
278 ) -> Result<<<Self as Finalize>::Reduce as self::Reduce>::Output, <<Self as Finalize>::Reduce as self::Reduce>::Error>;
279}