// orx_parallel/par_thread_pool.rs

1use crate::{generic_values::runner_results::Fallibility, runner::NumSpawned};
2use alloc::vec::Vec;
3use core::num::NonZeroUsize;
4use orx_concurrent_bag::ConcurrentBag;
5
/// A thread pool that can be used for parallel computation.
///
/// orx_parallel abstracts away the thread pool implementation and can work with different
/// thread pool implementations.
///
/// Parallel computation will not use any threads outside the pool.
/// Default std thread pool assumes all OS threads are available in the pool.
///
/// # Examples
///
/// ## Default std pool
///
/// **requires std feature**
///
/// Default parallel runner spawns scoped threads using `std::thread::scope`.
///
/// ```
/// use orx_parallel::*;
///
/// let sum = (0..1000).par().sum();
/// assert_eq!(sum, 1000 * 999 / 2);
///
/// // this is equivalent to
/// let sum = (0..1000).par().with_runner(DefaultRunner::default()).sum();
/// assert_eq!(sum, 1000 * 999 / 2);
/// ```
///
/// ## Rayon thread pool
///
/// **requires rayon feature**
///
/// The following example demonstrates using a rayon thread pool as the thread provider of
/// the parallel computation.
///
/// ```
/// use orx_parallel::*;
///
/// #[cfg(not(miri))]
/// #[cfg(feature = "rayon-core")]
/// {
///     let pool = rayon::ThreadPoolBuilder::new()
///         .num_threads(4)
///         .build()
///         .unwrap();
///
///     // creating a runner for the computation
///     let runner = RunnerWithPool::from(&pool);
///     let sum = (0..1000).par().with_runner(runner).sum();
///     assert_eq!(sum, 1000 * 999 / 2);
///
///     // or reuse a runner multiple times (identical under the hood)
///     let mut runner = RunnerWithPool::from(&pool);
///     let sum = (0..1000).par().with_runner(&mut runner).sum();
///     assert_eq!(sum, 1000 * 999 / 2);
/// }
/// ```
///
/// Note that since `rayon::ThreadPool::scope` only requires a shared reference `&self`,
/// we can concurrently create as many runners as we want from the same thread pool and use them concurrently.
///
/// ## Scoped thread pool
///
/// **requires scoped_threadpool feature**
///
/// The following example demonstrates using a scoped_threadpool thread pool as the thread provider of
/// the parallel computation.
///
/// ```
/// use orx_parallel::*;
///
/// #[cfg(not(miri))]
/// #[cfg(feature = "scoped_threadpool")]
/// {
///     // creating a runner for the computation
///     let mut pool = scoped_threadpool::Pool::new(4);
///     let runner = RunnerWithPool::from(&mut pool);
///     let sum = (0..1000).par().with_runner(runner).sum();
///     assert_eq!(sum, 1000 * 999 / 2);
///
///     // or reuse a runner multiple times (identical under the hood)
///     let mut pool = scoped_threadpool::Pool::new(4);
///     let mut runner = RunnerWithPool::from(&mut pool);
///     let sum = (0..1000).par().with_runner(&mut runner).sum();
///     assert_eq!(sum, 1000 * 999 / 2);
/// }
/// ```
///
/// Since `scoped_threadpool::Pool::scoped` requires an exclusive reference `&mut self`,
/// we can create only one runner from such a pool at a time; note the use of `&mut pool`
/// in runner creation.
pub trait ParThreadPool {
    /// Scope type of the thread pool.
    ///
    /// Generic over three lifetimes mirroring the scoped-thread pattern:
    /// * `'env` — the environment borrowed by the scoped computation,
    /// * `'scope` — the duration of the scope itself (outlived by `'env`),
    /// * `'s` — a borrow of the scope value (outlived by both).
    type ScopeRef<'s, 'env, 'scope>
    where
        'scope: 's,
        'env: 'scope + 's;

    /// Executes the `work` within scope `s`.
    ///
    /// Implementations are expected to run `work` on one of the pool's threads;
    /// the `'scope + 'env` bounds allow `work` to borrow from the environment.
    fn run_in_scope<'s, 'env, 'scope, W>(s: &Self::ScopeRef<'s, 'env, 'scope>, work: W)
    where
        'scope: 's,
        'env: 'scope + 's,
        W: Fn() + Send + 'scope + 'env;

    /// Executes the scoped computation `f`.
    ///
    /// `f` receives the scope handle and may submit work to it via [`Self::run_in_scope`];
    /// all submitted work completes before this call returns (scoped-threads contract;
    /// upheld by the implementor).
    fn scoped_computation<'env, 'scope, F>(&'env mut self, f: F)
    where
        'env: 'scope,
        for<'s> F: FnOnce(Self::ScopeRef<'s, 'env, 'scope>) + Send;

    /// Returns the maximum number of threads available in the pool.
    fn max_num_threads(&self) -> NonZeroUsize;
}
118
// derived: computation helpers blanket-implemented for every `ParThreadPool`
120
121pub trait ParThreadPoolCompute: ParThreadPool {
122    fn map_in_pool<F, S, M, T>(
123        &mut self,
124        do_spawn: S,
125        thread_map: M,
126        max_num_threads: NonZeroUsize,
127    ) -> (NumSpawned, Result<Vec<T>, F::Error>)
128    where
129        F: Fallibility,
130        S: Fn(NumSpawned) -> bool + Sync,
131        M: Fn(NumSpawned) -> Result<T, F::Error> + Sync,
132        T: Send,
133        F::Error: Send,
134    {
135        let thread_map = &thread_map;
136        let mut nt = NumSpawned::zero();
137        let thread_results = ConcurrentBag::with_fixed_capacity(max_num_threads.into());
138        let bag = &thread_results;
139        self.scoped_computation(|s| {
140            while do_spawn(nt) {
141                let num_spawned = nt;
142                nt.increment();
143                let work = move || {
144                    bag.push(thread_map(num_spawned));
145                };
146                Self::run_in_scope(&s, work);
147            }
148        });
149
150        let thread_results: Vec<_> = thread_results.into_inner().into();
151        let result = F::reduce_results(thread_results);
152
153        (nt, result)
154    }
155
156    fn run_in_pool<S, F>(&mut self, do_spawn: S, thread_do: F) -> NumSpawned
157    where
158        S: Fn(NumSpawned) -> bool + Sync,
159        F: Fn(NumSpawned) + Sync,
160    {
161        let thread_do = &thread_do;
162        let mut nt = NumSpawned::zero();
163        self.scoped_computation(|s| {
164            while do_spawn(nt) {
165                let num_spawned = nt;
166                nt.increment();
167                let work = move || thread_do(num_spawned);
168                Self::run_in_scope(&s, work);
169            }
170        });
171        nt
172    }
173}
174
175impl<X: ParThreadPool> ParThreadPoolCompute for X {}