// orx_parallel/par_thread_pool.rs
1use crate::{generic_values::runner_results::Fallibility, runner::NumSpawned};
2use alloc::vec::Vec;
3use core::num::NonZeroUsize;
4use orx_concurrent_bag::ConcurrentBag;
5
6/// A thread pool that can be used for parallel computation.
7///
8/// orx_parallel abstracts away the thread pool implementation and can work with different
9/// thread pool implementations.
10///
11/// Parallel computation will not use any threads outside the pool.
12/// Default std thread pool assumes all OS threads are available in the pool.
13///
14/// # Examples
15///
16/// ## Default std pool
17///
18/// **requires std feature**
19///
20/// Default parallel runner spawns scoped threads using `std::thread::scope`.
21///
22/// ```
23/// use orx_parallel::*;
24///
25/// let sum = (0..1000).par().sum();
26/// assert_eq!(sum, 1000 * 999 / 2);
27///
28/// // this is equivalent to
29/// let sum = (0..1000).par().with_runner(DefaultRunner::default()).sum();
30/// assert_eq!(sum, 1000 * 999 / 2);
31/// ```
32///
33/// ## Rayon thread pool
34///
35/// **requires rayon feature**
36///
/// The following example demonstrates using a rayon thread pool as the thread provider of
38/// the parallel computation.
39///
40/// ```
41/// use orx_parallel::*;
42///
43/// #[cfg(feature = "rayon-core")]
44/// {
45/// let pool = rayon::ThreadPoolBuilder::new()
46/// .num_threads(4)
47/// .build()
48/// .unwrap();
49///
50/// // creating a runner for the computation
51/// let runner = RunnerWithPool::from(&pool);
52/// let sum = (0..1000).par().with_runner(runner).sum();
53/// assert_eq!(sum, 1000 * 999 / 2);
54///
55/// // or reuse a runner multiple times (identical under the hood)
56/// let mut runner = RunnerWithPool::from(&pool);
57/// let sum = (0..1000).par().with_runner(&mut runner).sum();
58/// assert_eq!(sum, 1000 * 999 / 2);
59/// }
60/// ```
61///
62/// Note that since rayon::ThreadPool::scope only requires a shared reference `&self`,
63/// we can concurrently create as many runners as we want from the same thread pool and use them concurrently.
64///
65/// ## Scoped thread pool
66///
67/// **requires scoped_threadpool feature**
68///
/// The following example demonstrates using a scoped_threadpool thread pool as the thread provider of
70/// the parallel computation.
71///
72/// ```
73/// use orx_parallel::*;
74///
75/// #[cfg(feature = "scoped_threadpool")]
76/// {
77/// // creating a runner for the computation
78/// let mut pool = scoped_threadpool::Pool::new(4);
79/// let runner = RunnerWithPool::from(&mut pool);
80/// let sum = (0..1000).par().with_runner(runner).sum();
81/// assert_eq!(sum, 1000 * 999 / 2);
82///
83/// // or reuse a runner multiple times (identical under the hood)
84/// let mut pool = scoped_threadpool::Pool::new(4);
85/// let mut runner = RunnerWithPool::from(&mut pool);
86/// let sum = (0..1000).par().with_runner(&mut runner).sum();
87/// assert_eq!(sum, 1000 * 999 / 2);
88/// }
89/// ```
90///
/// Since `scoped_threadpool::Pool::scoped` requires an exclusive reference `&mut self`,
/// we can create only one runner from a pool at a time; note the use of `&mut pool` in runner creation.
pub trait ParThreadPool {
    /// Handle to the scope within which the pool spawns work items.
    ///
    /// Lifetime relations: `'env` is the environment borrowed by the
    /// computation, `'scope` is the duration of the scope itself, and `'s`
    /// is the lifetime of a reference to the scope (`'env: 'scope: 's`).
    type ScopeRef<'s, 'env, 'scope>
    where
        'scope: 's,
        'env: 'scope + 's;

    /// Executes the `work` within scope `s`.
    ///
    /// NOTE(review): implementations are expected to run `work` on a pool
    /// thread, with the scope ensuring completion before it ends (as with
    /// `std::thread::scope`) — confirm against each pool adapter.
    fn run_in_scope<'s, 'env, 'scope, W>(s: &Self::ScopeRef<'s, 'env, 'scope>, work: W)
    where
        'scope: 's,
        'env: 'scope + 's,
        W: Fn() + Send + 'scope + 'env;

    /// Executes the scoped computation `f`, handing it a scope reference
    /// through which work can be spawned via [`ParThreadPool::run_in_scope`].
    ///
    /// Takes `&mut self` because some pools (e.g. `scoped_threadpool::Pool`)
    /// require an exclusive reference to open a scope.
    fn scoped_computation<'env, 'scope, F>(&'env mut self, f: F)
    where
        'env: 'scope,
        for<'s> F: FnOnce(Self::ScopeRef<'s, 'env, 'scope>) + Send;

    /// Returns the maximum number of threads available in the pool.
    fn max_num_threads(&self) -> NonZeroUsize;
}
116
// Derived computations implemented generically on top of the ParThreadPool primitives.
118
/// Derived parallel computations built generically on the [`ParThreadPool`]
/// primitives (`scoped_computation` / `run_in_scope`).
pub trait ParThreadPoolCompute: ParThreadPool {
    /// Keeps spawning threads while `do_spawn(num_spawned_so_far)` returns true;
    /// each spawned thread evaluates `thread_map` with its own spawn index and
    /// pushes the outcome into a shared concurrent bag.
    ///
    /// Returns the number of threads spawned, paired with the thread results
    /// combined by `F::reduce_results` (presumably collecting `Ok` values or
    /// surfacing an error per the `Fallibility` policy — TODO confirm against
    /// the `Fallibility` implementation).
    ///
    /// `max_num_threads` pre-sizes the fixed-capacity results bag; `do_spawn`
    /// must not permit more spawns than this capacity.
    fn map_in_pool<F, S, M, T>(
        &mut self,
        do_spawn: S,
        thread_map: M,
        max_num_threads: NonZeroUsize,
    ) -> (NumSpawned, Result<Vec<T>, F::Error>)
    where
        F: Fallibility,
        S: Fn(NumSpawned) -> bool + Sync,
        M: Fn(NumSpawned) -> Result<T, F::Error> + Sync,
        T: Send,
        F::Error: Send,
    {
        // reborrow so each spawned closure captures a shared reference
        // instead of trying to move the mapper
        let thread_map = &thread_map;
        let mut nt = NumSpawned::zero();
        // fixed capacity: at most one result is pushed per available thread
        let thread_results = ConcurrentBag::with_fixed_capacity(max_num_threads.into());
        let bag = &thread_results;
        self.scoped_computation(|s| {
            while do_spawn(nt) {
                let num_spawned = nt;
                nt.increment();
                let work = move || {
                    // NOTE(review): results land in completion order, not spawn
                    // order — downstream reduction must not rely on ordering.
                    bag.push(thread_map(num_spawned));
                };
                Self::run_in_scope(&s, work);
            }
        });

        // scope has ended: all spawned work is expected to have completed,
        // so the bag can be consumed
        let thread_results: Vec<_> = thread_results.into_inner().into();
        let result = F::reduce_results(thread_results);

        (nt, result)
    }

    /// Keeps spawning threads while `do_spawn(num_spawned_so_far)` returns true;
    /// each spawned thread runs `thread_do` with its own spawn index, for side
    /// effects only. Returns the number of threads spawned.
    fn run_in_pool<S, F>(&mut self, do_spawn: S, thread_do: F) -> NumSpawned
    where
        S: Fn(NumSpawned) -> bool + Sync,
        F: Fn(NumSpawned) + Sync,
    {
        // reborrow so each spawned closure captures a shared reference
        let thread_do = &thread_do;
        let mut nt = NumSpawned::zero();
        self.scoped_computation(|s| {
            while do_spawn(nt) {
                let num_spawned = nt;
                nt.increment();
                let work = move || thread_do(num_spawned);
                Self::run_in_scope(&s, work);
            }
        });
        nt
    }
}
172
// Blanket implementation: every thread pool automatically gains the derived computations.
impl<X: ParThreadPool> ParThreadPoolCompute for X {}