1#![doc = include_str!("../README.md")]
2use std::sync::{
3 atomic::{AtomicBool, Ordering::SeqCst},
4 Arc,
5};
6
7mod parallel_map;
8pub use self::parallel_map::{ParallelMap, ParallelMapBuilder};
9
10mod readahead;
11pub use self::readahead::{Readahead, ReadaheadBuilder};
12
13mod parallel_filter;
14pub use self::parallel_filter::{ParallelFilter, ParallelFilterBuilder};
15
16pub mod profile;
17pub use self::profile::{
18 ProfileEgress, ProfileIngress, Profiler, TotalTimeProfiler, TotalTimeStats,
19};
20
21pub use crossbeam::{scope, thread::Scope};
22
23pub trait IteratorExt {
33 fn parallel_map<F, O>(self, f: F) -> ParallelMap<Self, O>
40 where
41 Self: Sized,
42 Self: Iterator + 'static,
43 F: 'static + Send + Clone,
44 Self::Item: Send + 'static,
45 F: FnMut(Self::Item) -> O,
46 O: Send + 'static,
47 {
48 ParallelMapBuilder::new(self).with(f)
49 }
50
51 fn parallel_map_custom<F, O, OF>(self, of: OF, f: F) -> ParallelMap<Self, O>
53 where
54 Self: Sized,
55 Self: Iterator + 'static,
56 F: 'static + Send + Clone,
57 F: FnMut(Self::Item) -> O,
58 Self::Item: Send + 'static,
59 O: Send + 'static,
60 OF: FnOnce(ParallelMapBuilder<Self>) -> ParallelMapBuilder<Self>,
61 {
62 of(ParallelMapBuilder::new(self)).with(f)
63 }
64
65 fn parallel_map_scoped<'env, 'scope, F, O>(
70 self,
71 scope: &'scope Scope<'env>,
72 f: F,
73 ) -> ParallelMap<Self, O>
74 where
75 Self: Sized,
76 Self: Iterator + 'env,
77 F: 'env + Send + Clone,
78 Self::Item: Send + 'env,
79 F: FnMut(Self::Item) -> O,
80 O: Send + 'env,
81 {
82 ParallelMapBuilder::new(self).with_scoped(scope, f)
83 }
84
85 fn parallel_map_scoped_custom<'env, 'scope, F, O, OF>(
87 self,
88 scope: &'scope Scope<'env>,
89 of: OF,
90 f: F,
91 ) -> ParallelMap<Self, O>
92 where
93 Self: Sized,
94 Self: Iterator + 'env,
95 F: 'env + Send + Clone,
96 Self::Item: Send + 'env,
97 F: FnMut(Self::Item) -> O,
98 O: Send + 'env,
99 OF: FnOnce(ParallelMapBuilder<Self>) -> ParallelMapBuilder<Self>,
100 {
101 of(ParallelMapBuilder::new(self)).with_scoped(scope, f)
102 }
103
104 fn parallel_filter<F>(self, f: F) -> ParallelFilter<Self>
108 where
109 Self: Sized,
110 Self: Iterator + 'static,
111 F: 'static + Send + Clone,
112 Self::Item: Send + 'static,
113 F: FnMut(&Self::Item) -> bool,
114 {
115 ParallelFilterBuilder::new(self).with(f)
116 }
117
118 fn parallel_filter_custom<F, OF>(self, of: OF, f: F) -> ParallelFilter<Self>
120 where
121 Self: Sized,
122 Self: Iterator + 'static,
123 F: 'static + Send + Clone,
124 Self::Item: Send + 'static,
125 F: FnMut(&Self::Item) -> bool,
126 OF: FnOnce(ParallelFilterBuilder<Self>) -> ParallelFilterBuilder<Self>,
127 {
128 of(ParallelFilterBuilder::new(self)).with(f)
129 }
130
131 fn parallel_filter_scoped<'env, 'scope, F>(
133 self,
134 scope: &'scope Scope<'env>,
135 f: F,
136 ) -> ParallelFilter<Self>
137 where
138 Self: Sized,
139 Self: Iterator + 'env,
140 F: 'env + Send + Clone,
141 Self::Item: Send + 'env,
142 F: FnMut(&Self::Item) -> bool,
143 {
144 ParallelFilterBuilder::new(self).with_scoped(scope, f)
145 }
146
147 fn parallel_filter_scoped_custom<'env, 'scope, F, OF>(
149 self,
150 scope: &'scope Scope<'env>,
151 of: OF,
152 f: F,
153 ) -> ParallelFilter<Self>
154 where
155 Self: Sized,
156 Self: Iterator + 'env,
157 F: 'env + Send + Clone,
158 Self::Item: Send + 'env,
159 F: FnMut(&Self::Item) -> bool,
160 OF: FnOnce(ParallelFilterBuilder<Self>) -> ParallelFilterBuilder<Self>,
161 {
162 of(ParallelFilterBuilder::new(self)).with_scoped(scope, f)
163 }
164 fn readahead(self) -> Readahead<Self>
176 where
177 Self: Iterator + Send + 'static,
178 Self: Sized,
179 Self::Item: Send + 'static,
180 {
181 ReadaheadBuilder::new(self).with()
182 }
183
184 fn readahead_custom<OF>(self, of: OF) -> Readahead<Self>
185 where
186 Self: Iterator,
187 Self: Sized + Send + 'static,
188 Self::Item: Send + 'static,
189 OF: FnOnce(ReadaheadBuilder<Self>) -> ReadaheadBuilder<Self>,
190 {
191 of(ReadaheadBuilder::new(self)).with()
192 }
193
194 fn readahead_scoped<'env, 'scope>(self, scope: &'scope Scope<'env>) -> Readahead<Self>
201 where
202 Self: Sized + Send,
203 Self: Iterator + 'scope + 'env,
204 Self::Item: Send + 'env + 'scope + Send,
205 {
206 ReadaheadBuilder::new(self).with_scoped(scope)
207 }
208
209 fn readahead_scoped_custom<'env, 'scope, OF>(
210 self,
211 scope: &'scope Scope<'env>,
212 of: OF,
213 ) -> Readahead<Self>
214 where
215 Self: Sized + Send,
216 Self: Iterator + 'scope + 'env,
217 Self::Item: Send + 'env + 'scope + Send,
218 OF: FnOnce(ReadaheadBuilder<Self>) -> ReadaheadBuilder<Self>,
219 {
220 of(ReadaheadBuilder::new(self)).with_scoped(scope)
221 }
222
223 fn profile_egress<P: profile::Profiler>(self, profiler: P) -> ProfileEgress<Self, P>
227 where
228 Self: Iterator,
229 Self: Sized,
230 {
231 ProfileEgress::new(self, profiler)
232 }
233
234 fn profile_ingress<P: profile::Profiler>(self, profiler: P) -> ProfileIngress<Self, P>
238 where
239 Self: Iterator,
240 Self: Sized,
241 {
242 ProfileIngress::new(self, profiler)
243 }
244
245 fn readahead_profiled<TxP: profile::Profiler, RxP: profile::Profiler>(
251 self,
252 tx_profiler: TxP,
253 rx_profiler: RxP,
254 ) -> ProfileIngress<Readahead<ProfileEgress<Self, TxP>>, RxP>
255 where
256 Self: Iterator,
257 Self: Sized,
258 Self: Send + 'static,
259 Self::Item: Send + 'static,
260 TxP: Send + 'static,
261 {
262 self.profile_egress(tx_profiler)
263 .readahead()
264 .profile_ingress(rx_profiler)
265 }
266
267 fn readahead_scoped_profiled<'env, 'scope, TxP: profile::Profiler, RxP: profile::Profiler>(
273 self,
274 scope: &'scope Scope<'env>,
275 tx_profiler: TxP,
276 rx_profiler: RxP,
277 ) -> ProfileIngress<Readahead<ProfileEgress<Self, TxP>>, RxP>
278 where
279 Self: Sized + Send,
280 Self: Iterator + 'scope + 'env,
281 Self::Item: Send + 'env + 'scope + Send,
282 TxP: Send + 'static,
283 {
284 self.profile_egress(tx_profiler)
285 .readahead_scoped(scope)
286 .profile_ingress(rx_profiler)
287 }
288}
289
290impl<I> IteratorExt for I where I: Iterator {}
291
/// Guard that raises a shared flag when it goes out of scope.
///
/// Dropping a `DropIndicator` stores `true` into `indicator`, unless the
/// guard was disarmed with [`DropIndicator::cancel`] first.
struct DropIndicator {
    /// Set by `cancel`; when `true`, `drop` leaves `indicator` untouched.
    canceled: bool,
    /// Shared flag that an un-canceled drop sets to `true`.
    indicator: Arc<AtomicBool>,
}

impl DropIndicator {
    /// Creates an armed guard around `indicator`.
    fn new(indicator: Arc<AtomicBool>) -> Self {
        DropIndicator {
            indicator,
            canceled: false,
        }
    }

    /// Disarms and consumes the guard, so its drop does not touch the flag.
    fn cancel(self) {
        let mut guard = self;
        guard.canceled = true;
        // The guard is dropped right here, already marked as canceled.
        drop(guard);
    }
}

impl Drop for DropIndicator {
    /// Stores `true` into the shared flag unless the guard was canceled.
    fn drop(&mut self) {
        if self.canceled {
            return;
        }
        self.indicator.store(true, SeqCst);
    }
}
317
318#[cfg(test)]
319mod tests;