micropool/
lib.rs

1#![doc = include_str!("../README.md")]
2#![warn(missing_docs)]
3
4pub use paralight::iter;
5use paralight::iter::GenericThreadPool;
6
7pub use self::task::*;
8pub use self::thread_pool::*;
9
10/// Internal tracking for work trees executing on a thread pool.
11mod join_point;
12
13/// Implementation of tasks for the [`spawn`] API.
14mod task;
15
16/// The primary thread pool interface.
17mod thread_pool;
18
19/// Synchronization primitives and helper types used in the implementation.
20mod util;
21
22/// Execute [`paralight`] iterators with maximal parallelism.
23/// Every iterator item may be processed on a separate thread.
24///
25/// Note: by maximizing parallelism, this also maximizes overhead.
26/// This is best used with computationally-heavy iterators that have few
27/// elements. For alternatives, see [`split_per`], [`split_by`], and
28/// [`split_by_threads`].
29pub fn split_per_item() -> impl GenericThreadPool {
30    struct SplitPerItem;
31
32    unsafe impl GenericThreadPool for SplitPerItem {
33        fn upper_bounded_pipeline<Output: Send, Accum>(
34            self,
35            input_len: usize,
36            init: impl Fn() -> Accum + Sync,
37            process_item: impl Fn(Accum, usize) -> std::ops::ControlFlow<Accum, Accum> + Sync,
38            finalize: impl Fn(Accum) -> Output + Sync,
39            reduce: impl Fn(Output, Output) -> Output,
40            cleanup: &(impl paralight::iter::SourceCleanup + Sync),
41        ) -> Output {
42            ThreadPool::with_current(|f| {
43                f.split_per_item().upper_bounded_pipeline(
44                    input_len,
45                    init,
46                    process_item,
47                    finalize,
48                    reduce,
49                    cleanup,
50                )
51            })
52        }
53
54        fn iter_pipeline<Output: Send>(
55            self,
56            input_len: usize,
57            accum: impl paralight::iter::Accumulator<usize, Output> + Sync,
58            reduce: impl paralight::iter::Accumulator<Output, Output>,
59            cleanup: &(impl paralight::iter::SourceCleanup + Sync),
60        ) -> Output {
61            ThreadPool::with_current(|f| {
62                f.split_per_item()
63                    .iter_pipeline(input_len, accum, reduce, cleanup)
64            })
65        }
66    }
67
68    SplitPerItem
69}
70
71/// Execute [`paralight`] iterators by batching elements.
72/// Each group of `chunk_size` elements may be processed by a single thread.
73pub fn split_per(chunk_size: usize) -> impl GenericThreadPool {
74    struct ThreadPerChunk(usize);
75
76    unsafe impl GenericThreadPool for ThreadPerChunk {
77        fn upper_bounded_pipeline<Output: Send, Accum>(
78            self,
79            input_len: usize,
80            init: impl Fn() -> Accum + Sync,
81            process_item: impl Fn(Accum, usize) -> std::ops::ControlFlow<Accum, Accum> + Sync,
82            finalize: impl Fn(Accum) -> Output + Sync,
83            reduce: impl Fn(Output, Output) -> Output,
84            cleanup: &(impl paralight::iter::SourceCleanup + Sync),
85        ) -> Output {
86            ThreadPool::with_current(|f| {
87                f.split_by(self.0).upper_bounded_pipeline(
88                    input_len,
89                    init,
90                    process_item,
91                    finalize,
92                    reduce,
93                    cleanup,
94                )
95            })
96        }
97
98        fn iter_pipeline<Output: Send>(
99            self,
100            input_len: usize,
101            accum: impl paralight::iter::Accumulator<usize, Output> + Sync,
102            reduce: impl paralight::iter::Accumulator<Output, Output>,
103            cleanup: &(impl paralight::iter::SourceCleanup + Sync),
104        ) -> Output {
105            ThreadPool::with_current(|f| {
106                f.split_by(self.0)
107                    .iter_pipeline(input_len, accum, reduce, cleanup)
108            })
109        }
110    }
111
112    ThreadPerChunk(chunk_size)
113}
114
115/// Execute [`paralight`] iterators by batching elements.
116/// Every iterator will be broken up into `chunks`
117/// separate work units, which may be processed in parallel.
118pub fn split_by(chunks: usize) -> impl GenericThreadPool {
119    struct Chunks(usize);
120
121    unsafe impl GenericThreadPool for Chunks {
122        fn upper_bounded_pipeline<Output: Send, Accum>(
123            self,
124            input_len: usize,
125            init: impl Fn() -> Accum + Sync,
126            process_item: impl Fn(Accum, usize) -> std::ops::ControlFlow<Accum, Accum> + Sync,
127            finalize: impl Fn(Accum) -> Output + Sync,
128            reduce: impl Fn(Output, Output) -> Output,
129            cleanup: &(impl paralight::iter::SourceCleanup + Sync),
130        ) -> Output {
131            ThreadPool::with_current(|f| {
132                f.split_by(self.0).upper_bounded_pipeline(
133                    input_len,
134                    init,
135                    process_item,
136                    finalize,
137                    reduce,
138                    cleanup,
139                )
140            })
141        }
142
143        fn iter_pipeline<Output: Send>(
144            self,
145            input_len: usize,
146            accum: impl paralight::iter::Accumulator<usize, Output> + Sync,
147            reduce: impl paralight::iter::Accumulator<Output, Output>,
148            cleanup: &(impl paralight::iter::SourceCleanup + Sync),
149        ) -> Output {
150            ThreadPool::with_current(|f| {
151                f.split_by(self.0)
152                    .iter_pipeline(input_len, accum, reduce, cleanup)
153            })
154        }
155    }
156
157    Chunks(chunks)
158}
159
160/// Execute [`paralight`] iterators by batching elements.
161/// Every iterator will be broken up into `N` separate work units,
162/// where `N` is the number of pool threads. Each unit may be processed in
163/// parallel.
164pub fn split_by_threads() -> impl GenericThreadPool {
165    struct SplitByThreads;
166
167    unsafe impl GenericThreadPool for SplitByThreads {
168        fn upper_bounded_pipeline<Output: Send, Accum>(
169            self,
170            input_len: usize,
171            init: impl Fn() -> Accum + Sync,
172            process_item: impl Fn(Accum, usize) -> std::ops::ControlFlow<Accum, Accum> + Sync,
173            finalize: impl Fn(Accum) -> Output + Sync,
174            reduce: impl Fn(Output, Output) -> Output,
175            cleanup: &(impl paralight::iter::SourceCleanup + Sync),
176        ) -> Output {
177            ThreadPool::with_current(|f| {
178                f.split_by_threads().upper_bounded_pipeline(
179                    input_len,
180                    init,
181                    process_item,
182                    finalize,
183                    reduce,
184                    cleanup,
185                )
186            })
187        }
188
189        fn iter_pipeline<Output: Send>(
190            self,
191            input_len: usize,
192            accum: impl paralight::iter::Accumulator<usize, Output> + Sync,
193            reduce: impl paralight::iter::Accumulator<Output, Output>,
194            cleanup: &(impl paralight::iter::SourceCleanup + Sync),
195        ) -> Output {
196            ThreadPool::with_current(|f| {
197                f.split_by_threads()
198                    .iter_pipeline(input_len, accum, reduce, cleanup)
199            })
200        }
201    }
202
203    SplitByThreads
204}
205
206/// Takes two closures and *potentially* runs them in parallel. It
207/// returns a pair of the results from those closures.
208pub fn join<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
209where
210    A: FnOnce() -> RA + Send,
211    B: FnOnce() -> RB + Send,
212    RA: Send,
213    RB: Send,
214{
215    ThreadPool::with_current(|pool| pool.join(oper_a, oper_b))
216}
217
218/// The total number of worker threads in the current pool.
219pub fn num_threads() -> usize {
220    ThreadPool::with_current(|pool| pool.num_threads())
221}
222
223/// Spawns an asynchronous task on the global thread pool.
224/// The returned handle can be used to obtain the result.
225pub fn spawn<T: 'static + Send>(f: impl 'static + Send + FnOnce() -> T) -> Task<T> {
226    ThreadPool::with_current(|pool| pool.spawn(f))
227}
228
/// Tests for `micropool`.
#[cfg(test)]
mod tests {
    use crate::iter::*;

    /// Element-wise addition through a zipped parallel iterator matches
    /// the sequentially computed result.
    #[test]
    fn test_add() {
        const LEN: usize = 10_000;
        let lhs: Vec<u64> = (0..LEN as u64).collect();
        let rhs: Vec<u64> = (0..LEN as u64).collect();
        let expected: Vec<u64> = (0..LEN as u64).map(|x| x + x).collect();
        let mut sums = vec![0u64; LEN];

        let out_slice = sums.as_mut_slice();
        let lhs_slice = lhs.as_slice();
        let rhs_slice = rhs.as_slice();

        (
            std::hint::black_box(out_slice.par_iter_mut()),
            std::hint::black_box(lhs_slice).par_iter(),
            std::hint::black_box(rhs_slice).par_iter(),
        )
            .zip_eq()
            .with_thread_pool(crate::split_by_threads())
            .for_each(|(slot, &a, &b)| *slot = a + b);

        assert_eq!(sums, expected);
    }

    /// Summing a parallel iterator matches the known total of 0..10_000.
    #[test]
    fn test_sum() {
        let values: Vec<u64> = (0..10_000u64).collect();
        let total = values
            .as_slice()
            .par_iter()
            .with_thread_pool(crate::split_by_threads())
            .sum::<u64>();
        assert_eq!(total, 49995000);
    }

    /// A simple for-each loop writes the expected values into a slice.
    #[test]
    fn test_for_each() {
        let mut out = [0; 5];
        (out.par_iter_mut(), (1..=5).into_par_iter())
            .zip_eq()
            .with_thread_pool(crate::split_by_threads())
            .for_each(|(slot, n)| *slot = n * n - 1);
        assert_eq!([0, 3, 8, 15, 24], out);
    }

    /// Spawns and joins many tasks.
    #[test]
    fn execute_many() {
        let first = crate::spawn(|| 2);
        let second = crate::spawn(|| 2);
        assert_eq!(first.join(), second.join());

        for _ in 0..1000 {
            let shorter = crate::spawn(|| std::thread::sleep(std::time::Duration::new(0, 10)));
            let longer = crate::spawn(|| std::thread::sleep(std::time::Duration::new(0, 200)));
            assert_eq!(shorter.join(), longer.join());
        }
    }
}