pariter/
lib.rs

1#![doc = include_str!("../README.md")]
2use std::sync::{
3    atomic::{AtomicBool, Ordering::SeqCst},
4    Arc,
5};
6
7mod parallel_map;
8pub use self::parallel_map::{ParallelMap, ParallelMapBuilder};
9
10mod readahead;
11pub use self::readahead::{Readahead, ReadaheadBuilder};
12
13mod parallel_filter;
14pub use self::parallel_filter::{ParallelFilter, ParallelFilterBuilder};
15
16pub mod profile;
17pub use self::profile::{
18    ProfileEgress, ProfileIngress, Profiler, TotalTimeProfiler, TotalTimeStats,
19};
20
21pub use crossbeam::{scope, thread::Scope};
22
23/// Extension trait for [`std::iter::Iterator`] bringing parallel operations
24///
25/// # TODO
26///
27/// * `parallel_for_each`
28/// * `parallel_flat_map`
29/// * possibly others
30///
31/// PRs welcome
32pub trait IteratorExt {
33    /// Run `map` function in parallel on multiple threads
34    ///
35    /// Results will be returned in order.
36    ///
37    /// No items will be pulled until first time [`ParallelMap`] is pulled for elements with [`ParallelMap::next`].
38    /// In that respect, `ParallelMap` behaves like every other iterator and is lazy.
39    fn parallel_map<F, O>(self, f: F) -> ParallelMap<Self, O>
40    where
41        Self: Sized,
42        Self: Iterator + 'static,
43        F: 'static + Send + Clone,
44        Self::Item: Send + 'static,
45        F: FnMut(Self::Item) -> O,
46        O: Send + 'static,
47    {
48        ParallelMapBuilder::new(self).with(f)
49    }
50
51    /// See [`IteratorExt::parallel_map`]
52    fn parallel_map_custom<F, O, OF>(self, of: OF, f: F) -> ParallelMap<Self, O>
53    where
54        Self: Sized,
55        Self: Iterator + 'static,
56        F: 'static + Send + Clone,
57        F: FnMut(Self::Item) -> O,
58        Self::Item: Send + 'static,
59        O: Send + 'static,
60        OF: FnOnce(ParallelMapBuilder<Self>) -> ParallelMapBuilder<Self>,
61    {
62        of(ParallelMapBuilder::new(self)).with(f)
63    }
64
65    /// A version of [`parallel_map`] supporting iterating over
66    /// borrowed values.
67    ///
68    /// See [`IteratorExt::parallel_map`]
69    fn parallel_map_scoped<'env, 'scope, F, O>(
70        self,
71        scope: &'scope Scope<'env>,
72        f: F,
73    ) -> ParallelMap<Self, O>
74    where
75        Self: Sized,
76        Self: Iterator + 'env,
77        F: 'env + Send + Clone,
78        Self::Item: Send + 'env,
79        F: FnMut(Self::Item) -> O,
80        O: Send + 'env,
81    {
82        ParallelMapBuilder::new(self).with_scoped(scope, f)
83    }
84
85    /// See [`IteratorExt::parallel_map_scoped`]
86    fn parallel_map_scoped_custom<'env, 'scope, F, O, OF>(
87        self,
88        scope: &'scope Scope<'env>,
89        of: OF,
90        f: F,
91    ) -> ParallelMap<Self, O>
92    where
93        Self: Sized,
94        Self: Iterator + 'env,
95        F: 'env + Send + Clone,
96        Self::Item: Send + 'env,
97        F: FnMut(Self::Item) -> O,
98        O: Send + 'env,
99        OF: FnOnce(ParallelMapBuilder<Self>) -> ParallelMapBuilder<Self>,
100    {
101        of(ParallelMapBuilder::new(self)).with_scoped(scope, f)
102    }
103
104    /// Run `filter` function in parallel on multiple threads
105    ///
106    /// A wrapper around [`IteratorExt::parallel_map`] really, so it has similiar properties.
107    fn parallel_filter<F>(self, f: F) -> ParallelFilter<Self>
108    where
109        Self: Sized,
110        Self: Iterator + 'static,
111        F: 'static + Send + Clone,
112        Self::Item: Send + 'static,
113        F: FnMut(&Self::Item) -> bool,
114    {
115        ParallelFilterBuilder::new(self).with(f)
116    }
117
118    /// See [`IteratorExt::parallel_filter`]
119    fn parallel_filter_custom<F, OF>(self, of: OF, f: F) -> ParallelFilter<Self>
120    where
121        Self: Sized,
122        Self: Iterator + 'static,
123        F: 'static + Send + Clone,
124        Self::Item: Send + 'static,
125        F: FnMut(&Self::Item) -> bool,
126        OF: FnOnce(ParallelFilterBuilder<Self>) -> ParallelFilterBuilder<Self>,
127    {
128        of(ParallelFilterBuilder::new(self)).with(f)
129    }
130
131    /// See [`IteratorExt::parallel_filter`]
132    fn parallel_filter_scoped<'env, 'scope, F>(
133        self,
134        scope: &'scope Scope<'env>,
135        f: F,
136    ) -> ParallelFilter<Self>
137    where
138        Self: Sized,
139        Self: Iterator + 'env,
140        F: 'env + Send + Clone,
141        Self::Item: Send + 'env,
142        F: FnMut(&Self::Item) -> bool,
143    {
144        ParallelFilterBuilder::new(self).with_scoped(scope, f)
145    }
146
147    /// See [`IteratorExt::parallel_filter`]
148    fn parallel_filter_scoped_custom<'env, 'scope, F, OF>(
149        self,
150        scope: &'scope Scope<'env>,
151        of: OF,
152        f: F,
153    ) -> ParallelFilter<Self>
154    where
155        Self: Sized,
156        Self: Iterator + 'env,
157        F: 'env + Send + Clone,
158        Self::Item: Send + 'env,
159        F: FnMut(&Self::Item) -> bool,
160        OF: FnOnce(ParallelFilterBuilder<Self>) -> ParallelFilterBuilder<Self>,
161    {
162        of(ParallelFilterBuilder::new(self)).with_scoped(scope, f)
163    }
164    /// Run the current iterator in another thread and return elements
165    /// through a buffered channel.
166    ///
167    /// `buffer_size` defines the size of the output channel connecting
168    /// current and the inner thread.
169    //
170    /// It's a common mistake to use large channel sizes needlessly
171    /// in hopes of achieving higher performance. The only benefit
172    /// large buffer size value provides is smooting out the variance
173    /// of the inner iterator returning items. The cost - wasting memory.
174    /// In normal circumstances `0` is recommended (the default).
175    fn readahead(self) -> Readahead<Self>
176    where
177        Self: Iterator + Send + 'static,
178        Self: Sized,
179        Self::Item: Send + 'static,
180    {
181        ReadaheadBuilder::new(self).with()
182    }
183
184    fn readahead_custom<OF>(self, of: OF) -> Readahead<Self>
185    where
186        Self: Iterator,
187        Self: Sized + Send + 'static,
188        Self::Item: Send + 'static,
189        OF: FnOnce(ReadaheadBuilder<Self>) -> ReadaheadBuilder<Self>,
190    {
191        of(ReadaheadBuilder::new(self)).with()
192    }
193
194    /// Scoped version of [`IteratorExt::readahead`]
195    ///
196    /// Use when you want to process in parallel items that contain
197    /// borrowed references.
198    ///
199    /// See [`scope`].
200    fn readahead_scoped<'env, 'scope>(self, scope: &'scope Scope<'env>) -> Readahead<Self>
201    where
202        Self: Sized + Send,
203        Self: Iterator + 'scope + 'env,
204        Self::Item: Send + 'env + 'scope + Send,
205    {
206        ReadaheadBuilder::new(self).with_scoped(scope)
207    }
208
209    fn readahead_scoped_custom<'env, 'scope, OF>(
210        self,
211        scope: &'scope Scope<'env>,
212        of: OF,
213    ) -> Readahead<Self>
214    where
215        Self: Sized + Send,
216        Self: Iterator + 'scope + 'env,
217        Self::Item: Send + 'env + 'scope + Send,
218        OF: FnOnce(ReadaheadBuilder<Self>) -> ReadaheadBuilder<Self>,
219    {
220        of(ReadaheadBuilder::new(self)).with_scoped(scope)
221    }
222
223    /// Profile the time it takes downstream iterator step to consume the returned items.
224    ///
225    /// See [`ProfileEgress`] and [`profile::Profiler`].
226    fn profile_egress<P: profile::Profiler>(self, profiler: P) -> ProfileEgress<Self, P>
227    where
228        Self: Iterator,
229        Self: Sized,
230    {
231        ProfileEgress::new(self, profiler)
232    }
233
234    /// Profile the time it takes upstream iterator step to produce the returned items.
235    ///
236    /// See [`ProfileIngress`] and [`profile::Profiler`].
237    fn profile_ingress<P: profile::Profiler>(self, profiler: P) -> ProfileIngress<Self, P>
238    where
239        Self: Iterator,
240        Self: Sized,
241    {
242        ProfileIngress::new(self, profiler)
243    }
244
245    /// Profiled version of [`IteratorExt::readahead`]
246    ///
247    /// Literally `.profile_egress(tx_profiler).readahead(n).profile_ingress(rx_profiler)`
248    ///
249    /// See [`Profiler`] for more info.
250    fn readahead_profiled<TxP: profile::Profiler, RxP: profile::Profiler>(
251        self,
252        tx_profiler: TxP,
253        rx_profiler: RxP,
254    ) -> ProfileIngress<Readahead<ProfileEgress<Self, TxP>>, RxP>
255    where
256        Self: Iterator,
257        Self: Sized,
258        Self: Send + 'static,
259        Self::Item: Send + 'static,
260        TxP: Send + 'static,
261    {
262        self.profile_egress(tx_profiler)
263            .readahead()
264            .profile_ingress(rx_profiler)
265    }
266
267    /// Profiled version of [`IteratorExt::readahead_scoped`]
268    ///
269    /// Literally `.profile_egress(tx_profiler).readahead_scoped(scope, n).profile_ingress(rx_profiler)`
270    ///
271    /// See [`Profiler`] for more info.
272    fn readahead_scoped_profiled<'env, 'scope, TxP: profile::Profiler, RxP: profile::Profiler>(
273        self,
274        scope: &'scope Scope<'env>,
275        tx_profiler: TxP,
276        rx_profiler: RxP,
277    ) -> ProfileIngress<Readahead<ProfileEgress<Self, TxP>>, RxP>
278    where
279        Self: Sized + Send,
280        Self: Iterator + 'scope + 'env,
281        Self::Item: Send + 'env + 'scope + Send,
282        TxP: Send + 'static,
283    {
284        self.profile_egress(tx_profiler)
285            .readahead_scoped(scope)
286            .profile_ingress(rx_profiler)
287    }
288}
289
290impl<I> IteratorExt for I where I: Iterator {}
291
/// Drop guard that raises a shared [`AtomicBool`] to `true` when it goes out
/// of scope, unless it was disarmed with [`DropIndicator::cancel`] first.
///
/// NOTE(review): callers are not visible here. Presumably a worker thread holds
/// one of these so that leaving its scope abnormally (e.g. unwinding from a
/// panic) becomes observable through the shared flag. Confirm against the
/// `parallel_map` / `readahead` submodules.
struct DropIndicator {
    /// Set by `cancel`; makes `Drop::drop` skip the store.
    canceled: bool,
    /// Shared flag that an un-canceled drop sets to `true` (`SeqCst`).
    indicator: Arc<AtomicBool>,
}

impl DropIndicator {
    /// Wraps `indicator` in an armed guard. The flag itself is not modified.
    fn new(indicator: Arc<AtomicBool>) -> Self {
        Self {
            indicator,
            canceled: false,
        }
    }

    /// Disarms and consumes the guard.
    ///
    /// The guard is still dropped normally, so its `Arc` reference is
    /// released. Only the store to the shared flag is skipped.
    fn cancel(self) {
        let mut disarmed = self;
        disarmed.canceled = true;
        // `disarmed` is dropped here; `Drop::drop` sees `canceled == true`.
    }
}

impl Drop for DropIndicator {
    fn drop(&mut self) {
        if self.canceled {
            return;
        }
        self.indicator.store(true, SeqCst);
    }
}
317
318#[cfg(test)]
319mod tests;