orx_parallel/par_thread_pool.rs
1use crate::{generic_values::runner_results::Fallibility, runner::NumSpawned};
2use alloc::vec::Vec;
3use core::num::NonZeroUsize;
4use orx_concurrent_bag::ConcurrentBag;
5
/// A thread pool that can be used for parallel computation.
///
/// orx_parallel abstracts away the thread pool implementation and can work with different
/// thread pool implementations.
///
/// Parallel computation will not use any threads outside the pool.
/// Default std thread pool assumes all OS threads are available in the pool.
///
/// # Examples
///
/// ## Default std pool
///
/// **requires std feature**
///
/// Default parallel runner spawns scoped threads using `std::thread::scope`.
///
/// ```
/// use orx_parallel::*;
///
/// let sum = (0..1000).par().sum();
/// assert_eq!(sum, 1000 * 999 / 2);
///
/// // this is equivalent to
/// let sum = (0..1000).par().with_runner(DefaultRunner::default()).sum();
/// assert_eq!(sum, 1000 * 999 / 2);
/// ```
///
/// ## Rayon thread pool
///
/// **requires rayon feature**
///
/// The following example demonstrates using a rayon thread pool as the thread provider of
/// the parallel computation.
///
/// ```
/// use orx_parallel::*;
///
/// #[cfg(not(miri))]
/// #[cfg(feature = "rayon-core")]
/// {
/// let pool = rayon::ThreadPoolBuilder::new()
/// .num_threads(4)
/// .build()
/// .unwrap();
///
/// // creating a runner for the computation
/// let runner = RunnerWithPool::from(&pool);
/// let sum = (0..1000).par().with_runner(runner).sum();
/// assert_eq!(sum, 1000 * 999 / 2);
///
/// // or reuse a runner multiple times (identical under the hood)
/// let mut runner = RunnerWithPool::from(&pool);
/// let sum = (0..1000).par().with_runner(&mut runner).sum();
/// assert_eq!(sum, 1000 * 999 / 2);
/// }
/// ```
///
/// Note that since rayon::ThreadPool::scope only requires a shared reference `&self`,
/// we can concurrently create as many runners as we want from the same thread pool and use them concurrently.
///
/// ## Scoped thread pool
///
/// **requires scoped_threadpool feature**
///
/// The following example demonstrates using a scoped_threadpool thread pool as the thread provider of
/// the parallel computation.
///
/// ```
/// use orx_parallel::*;
///
/// #[cfg(not(miri))]
/// #[cfg(feature = "scoped_threadpool")]
/// {
/// // creating a runner for the computation
/// let mut pool = scoped_threadpool::Pool::new(4);
/// let runner = RunnerWithPool::from(&mut pool);
/// let sum = (0..1000).par().with_runner(runner).sum();
/// assert_eq!(sum, 1000 * 999 / 2);
///
/// // or reuse a runner multiple times (identical under the hood)
/// let mut pool = scoped_threadpool::Pool::new(4);
/// let mut runner = RunnerWithPool::from(&mut pool);
/// let sum = (0..1000).par().with_runner(&mut runner).sum();
/// assert_eq!(sum, 1000 * 999 / 2);
/// }
/// ```
///
/// Since scoped_threadpool::Pool::scoped requires an exclusive reference `&mut self`,
/// we can create one runner from a pool at a time, note use of `&mut pool` in runner creation.
pub trait ParThreadPool {
    /// Scope type of the thread pool.
    ///
    /// Lifetimes follow the scoped-thread convention: the environment `'env`
    /// outlives the scope `'scope`, which in turn outlives the borrow `'s` of
    /// the scope handle itself.
    type ScopeRef<'s, 'env, 'scope>
    where
        'scope: 's,
        'env: 'scope + 's;

    /// Executes the `work` within scope `s`.
    ///
    /// Implementations are expected to hand `work` to one of the pool's
    /// threads (e.g. by spawning a scoped task on it).
    fn run_in_scope<'s, 'env, 'scope, W>(s: &Self::ScopeRef<'s, 'env, 'scope>, work: W)
    where
        'scope: 's,
        'env: 'scope + 's,
        W: Fn() + Send + 'scope + 'env;

    /// Executes the scoped computation `f`.
    ///
    /// The derived computations (see `ParThreadPoolCompute::map_in_pool`,
    /// which collects thread results right after this call) rely on all work
    /// submitted within the scope having completed when this method returns.
    fn scoped_computation<'env, 'scope, F>(&'env mut self, f: F)
    where
        'env: 'scope,
        for<'s> F: FnOnce(Self::ScopeRef<'s, 'env, 'scope>) + Send;

    /// Returns the maximum number of threads available in the pool.
    fn max_num_threads(&self) -> NonZeroUsize;
}
118
// Derived computations: convenience entry points provided on top of the
// `ParThreadPool` primitives via a blanket implementation below.
120
121pub trait ParThreadPoolCompute: ParThreadPool {
122 fn map_in_pool<F, S, M, T>(
123 &mut self,
124 do_spawn: S,
125 thread_map: M,
126 max_num_threads: NonZeroUsize,
127 ) -> (NumSpawned, Result<Vec<T>, F::Error>)
128 where
129 F: Fallibility,
130 S: Fn(NumSpawned) -> bool + Sync,
131 M: Fn(NumSpawned) -> Result<T, F::Error> + Sync,
132 T: Send,
133 F::Error: Send,
134 {
135 let thread_map = &thread_map;
136 let mut nt = NumSpawned::zero();
137 let thread_results = ConcurrentBag::with_fixed_capacity(max_num_threads.into());
138 let bag = &thread_results;
139 self.scoped_computation(|s| {
140 while do_spawn(nt) {
141 let num_spawned = nt;
142 nt.increment();
143 let work = move || {
144 bag.push(thread_map(num_spawned));
145 };
146 Self::run_in_scope(&s, work);
147 }
148 });
149
150 let thread_results: Vec<_> = thread_results.into_inner().into();
151 let result = F::reduce_results(thread_results);
152
153 (nt, result)
154 }
155
156 fn run_in_pool<S, F>(&mut self, do_spawn: S, thread_do: F) -> NumSpawned
157 where
158 S: Fn(NumSpawned) -> bool + Sync,
159 F: Fn(NumSpawned) + Sync,
160 {
161 let thread_do = &thread_do;
162 let mut nt = NumSpawned::zero();
163 self.scoped_computation(|s| {
164 while do_spawn(nt) {
165 let num_spawned = nt;
166 nt.increment();
167 let work = move || thread_do(num_spawned);
168 Self::run_in_scope(&s, work);
169 }
170 });
171 nt
172 }
173}
174
175impl<X: ParThreadPool> ParThreadPoolCompute for X {}