// orx_parallel/par_thread_pool.rs

1use crate::{generic_values::runner_results::Fallibility, runner::NumSpawned};
2use alloc::vec::Vec;
3use core::num::NonZeroUsize;
4use orx_concurrent_bag::ConcurrentBag;
5
/// A thread pool that can be used for parallel computation.
///
/// orx_parallel abstracts away the thread pool implementation and can work with different
/// thread pool implementations.
///
/// Parallel computation will not use any threads outside the pool.
/// Default std thread pool assumes all OS threads are available in the pool.
///
/// # Examples
///
/// ## Default std pool
///
/// **requires std feature**
///
/// Default parallel runner spawns scoped threads using `std::thread::scope`.
///
/// ```
/// use orx_parallel::*;
///
/// let sum = (0..1000).par().sum();
/// assert_eq!(sum, 1000 * 999 / 2);
///
/// // this is equivalent to
/// let sum = (0..1000).par().with_runner(DefaultRunner::default()).sum();
/// assert_eq!(sum, 1000 * 999 / 2);
/// ```
///
/// ## Rayon thread pool
///
/// **requires rayon feature**
///
/// The following example demonstrates using a rayon thread pool as the thread provider of
/// the parallel computation.
///
/// ```
/// use orx_parallel::*;
///
/// #[cfg(feature = "rayon-core")]
/// {
///     let pool = rayon::ThreadPoolBuilder::new()
///         .num_threads(4)
///         .build()
///         .unwrap();
///
///     // creating a runner for the computation
///     let runner = RunnerWithPool::from(&pool);
///     let sum = (0..1000).par().with_runner(runner).sum();
///     assert_eq!(sum, 1000 * 999 / 2);
///
///     // or reuse a runner multiple times (identical under the hood)
///     let mut runner = RunnerWithPool::from(&pool);
///     let sum = (0..1000).par().with_runner(&mut runner).sum();
///     assert_eq!(sum, 1000 * 999 / 2);
/// }
/// ```
///
/// Note that since `rayon::ThreadPool::scope` only requires a shared reference `&self`,
/// we can concurrently create as many runners as we want from the same thread pool and use them concurrently.
///
/// ## Scoped thread pool
///
/// **requires scoped_threadpool feature**
///
/// The following example demonstrates using a scoped_threadpool thread pool as the thread provider of
/// the parallel computation.
///
/// ```
/// use orx_parallel::*;
///
/// #[cfg(feature = "scoped_threadpool")]
/// {
///     // creating a runner for the computation
///     let mut pool = scoped_threadpool::Pool::new(4);
///     let runner = RunnerWithPool::from(&mut pool);
///     let sum = (0..1000).par().with_runner(runner).sum();
///     assert_eq!(sum, 1000 * 999 / 2);
///
///     // or reuse a runner multiple times (identical under the hood)
///     let mut pool = scoped_threadpool::Pool::new(4);
///     let mut runner = RunnerWithPool::from(&mut pool);
///     let sum = (0..1000).par().with_runner(&mut runner).sum();
///     assert_eq!(sum, 1000 * 999 / 2);
/// }
/// ```
///
/// Since `scoped_threadpool::Pool::scoped` requires an exclusive reference `&mut self`,
/// we can create one runner from a pool at a time; note the use of `&mut pool` in runner creation.
pub trait ParThreadPool {
    /// Scope type of the thread pool.
    ///
    /// The lifetimes mirror those of scoped-thread APIs such as `std::thread::scope`:
    /// `'env` is the environment borrowed by the computation, `'scope` is the lifetime of
    /// the scope itself, and `'s` is the lifetime of the reference to the scope handle.
    type ScopeRef<'s, 'env, 'scope>
    where
        'scope: 's,
        'env: 'scope + 's;

    /// Executes the `work` within scope `s`.
    ///
    /// NOTE(review): implementations are expected to spawn `work` onto a pool thread and
    /// have the scope join it before `scoped_computation` returns — `map_in_pool` below
    /// relies on this by reading the collected results immediately after the scope ends.
    fn run_in_scope<'s, 'env, 'scope, W>(s: &Self::ScopeRef<'s, 'env, 'scope>, work: W)
    where
        'scope: 's,
        'env: 'scope + 's,
        W: Fn() + Send + 'scope + 'env;

    /// Executes the scoped computation `f`.
    ///
    /// `f` receives the scope handle by value and may spawn any number of work items
    /// into it via [`ParThreadPool::run_in_scope`].
    fn scoped_computation<'env, 'scope, F>(&'env mut self, f: F)
    where
        'env: 'scope,
        for<'s> F: FnOnce(Self::ScopeRef<'s, 'env, 'scope>) + Send;

    /// Returns the maximum number of threads available in the pool.
    fn max_num_threads(&self) -> NonZeroUsize;
}
116
// Derived computations: helpers blanket-provided for every `ParThreadPool` implementor.
118
/// Derived parallel-computation helpers built on top of [`ParThreadPool`].
///
/// Blanket-implemented for every `ParThreadPool`, so implementors only need to provide
/// the core pool primitives (`scoped_computation`, `run_in_scope`, `max_num_threads`).
pub trait ParThreadPoolCompute: ParThreadPool {
    /// Repeatedly spawns threads while `do_spawn(num_already_spawned)` returns true; the
    /// i-th spawned thread evaluates `thread_map(i)` and pushes its `Result` into a
    /// shared concurrent bag.
    ///
    /// Returns the number of threads spawned together with the per-thread results
    /// combined by `F::reduce_results` (how errors are merged/short-circuited depends on
    /// the [`Fallibility`] implementation, defined elsewhere).
    ///
    /// Note: results are collected in completion order, not spawn order — presumably
    /// `reduce_results` is order-insensitive; verify against `Fallibility`.
    fn map_in_pool<F, S, M, T>(
        &mut self,
        do_spawn: S,
        thread_map: M,
        max_num_threads: NonZeroUsize,
    ) -> (NumSpawned, Result<Vec<T>, F::Error>)
    where
        F: Fallibility,
        S: Fn(NumSpawned) -> bool + Sync,
        M: Fn(NumSpawned) -> Result<T, F::Error> + Sync,
        T: Send,
        F::Error: Send,
    {
        // Reborrow so each spawned `move` closure captures a shared reference to the
        // mapper rather than attempting to move it into multiple closures.
        let thread_map = &thread_map;
        let mut nt = NumSpawned::zero();
        // At most one result per thread, so a fixed capacity of `max_num_threads`
        // guarantees pushes never need to grow the bag.
        let thread_results = ConcurrentBag::with_fixed_capacity(max_num_threads.into());
        let bag = &thread_results;
        self.scoped_computation(|s| {
            while do_spawn(nt) {
                // Snapshot the current count as this thread's index before incrementing.
                let num_spawned = nt;
                nt.increment();
                let work = move || {
                    bag.push(thread_map(num_spawned));
                };
                Self::run_in_scope(&s, work);
            }
        });

        // Safe to drain here: the scope is expected to have joined all spawned work
        // before `scoped_computation` returned (see `run_in_scope` contract).
        let thread_results: Vec<_> = thread_results.into_inner().into();
        let result = F::reduce_results(thread_results);

        (nt, result)
    }

    /// Repeatedly spawns threads while `do_spawn(num_already_spawned)` returns true; the
    /// i-th spawned thread runs `thread_do(i)` for its side effects only.
    ///
    /// Returns the number of threads spawned.
    fn run_in_pool<S, F>(&mut self, do_spawn: S, thread_do: F) -> NumSpawned
    where
        S: Fn(NumSpawned) -> bool + Sync,
        F: Fn(NumSpawned) + Sync,
    {
        // Reborrow so each spawned `move` closure captures a shared reference.
        let thread_do = &thread_do;
        let mut nt = NumSpawned::zero();
        self.scoped_computation(|s| {
            while do_spawn(nt) {
                // Snapshot the current count as this thread's index before incrementing.
                let num_spawned = nt;
                nt.increment();
                let work = move || thread_do(num_spawned);
                Self::run_in_scope(&s, work);
            }
        });
        nt
    }
}
172
173impl<X: ParThreadPool> ParThreadPoolCompute for X {}