orx_parallel/runner/implementations/runner_with_pool.rs
#[cfg(feature = "std")]
use crate::executor::ParallelExecutorWithDiagnostics;
use crate::{DefaultExecutor, ParThreadPool, ParallelExecutor, runner::ParallelRunner};
use core::marker::PhantomData;

/// Parallel runner with a given thread pool of type `P` and a parallel executor of type `R`.
///
/// A `RunnerWithPool` can always be created from an owned `pool` implementing [`ParThreadPool`], but also from
/// * `&pool` in most cases,
/// * `&mut pool` in others.
///
/// Note that the default parallel runner, i.e., [`DefaultRunner`], is:
/// * `RunnerWithPool<StdDefaultPool>` when the "std" feature is enabled,
/// * `RunnerWithPool<SequentialPool>` when the "std" feature is disabled.
///
/// [`DefaultRunner`]: crate::DefaultRunner
///
/// # Examples
///
/// ```
/// use orx_parallel::*;
///
/// // parallel computation generic over parallel runner; and hence, the thread pool
/// fn run_with_runner<R: ParallelRunner>(runner: R, input: &[usize]) -> Vec<String> {
///     input
///         .par()
///         .with_runner(runner)
///         .flat_map(|x| [*x, 2 * x, x / 7])
///         .map(|x| x.to_string())
///         .collect()
/// }
///
/// let vec: Vec<_> = (0..42).collect();
/// let input = vec.as_slice();
///
/// // runs sequentially on the main thread
/// let runner = RunnerWithPool::from(SequentialPool);
/// let expected = run_with_runner(runner, input);
///
/// // uses native threads
/// #[cfg(feature = "std")]
/// {
///     let runner = RunnerWithPool::from(StdDefaultPool::default());
///     let result = run_with_runner(runner, input);
///     assert_eq!(&expected, &result);
/// }
///
/// // uses rayon-core ThreadPool with 8 threads
/// #[cfg(not(miri))]
/// #[cfg(feature = "rayon-core")]
/// {
///     let pool = rayon_core::ThreadPoolBuilder::new()
///         .num_threads(8)
///         .build()
///         .unwrap();
///     let result = run_with_runner(RunnerWithPool::from(&pool), input);
///     assert_eq!(&expected, &result);
/// }
///
/// // uses scoped-pool Pool with 8 threads
/// #[cfg(not(miri))]
/// #[cfg(feature = "scoped-pool")]
/// {
///     let pool = scoped_pool::Pool::new(8);
///     let result = run_with_runner(RunnerWithPool::from(&pool), input);
///     assert_eq!(&expected, &result);
/// }
///
/// // uses scoped_threadpool Pool with 8 threads
/// #[cfg(not(miri))]
/// #[cfg(feature = "scoped_threadpool")]
/// {
///     let mut pool = scoped_threadpool::Pool::new(8);
///     let result = run_with_runner(RunnerWithPool::from(&mut pool), input); // requires &mut pool
///     assert_eq!(&expected, &result);
/// }
///
/// // uses yastl Pool wrapped as YastlPool with 8 threads
/// #[cfg(not(miri))]
/// #[cfg(feature = "yastl")]
/// {
///     let pool = YastlPool::new(8);
///     let result = run_with_runner(RunnerWithPool::from(&pool), input);
///     assert_eq!(&expected, &result);
/// }
///
/// // uses pond Pool wrapped as PondPool with 8 threads
/// #[cfg(not(miri))]
/// #[cfg(feature = "pond")]
/// {
///     let mut pool = PondPool::new_threads_unbounded(8);
///     let result = run_with_runner(RunnerWithPool::from(&mut pool), input); // requires &mut pool
///     assert_eq!(&expected, &result);
/// }
///
/// // uses poolite Pool with 8 threads
/// #[cfg(not(miri))]
/// #[cfg(feature = "poolite")]
/// {
///     let pool = poolite::Pool::with_builder(poolite::Builder::new().min(8).max(8)).unwrap();
///     let result = run_with_runner(RunnerWithPool::from(&pool), input);
///     assert_eq!(&expected, &result);
/// }
/// ```
#[derive(Clone)]
pub struct RunnerWithPool<P, R = DefaultExecutor>
where
    P: ParThreadPool,
    R: ParallelExecutor,
{
    pool: P,
    runner: PhantomData<R>,
}

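/// Creates a `RunnerWithPool` from a default-constructed pool; equivalent to `RunnerWithPool::from(P::default())`.
///
/// # Example
///
/// A minimal sketch; it assumes the "std" feature so that `StdDefaultPool`, which provides a `Default` pool, can be used as the pool type.
///
/// ```
/// use orx_parallel::*;
///
/// #[cfg(feature = "std")]
/// {
///     // equivalent to RunnerWithPool::from(StdDefaultPool::default())
///     let runner: RunnerWithPool<StdDefaultPool> = Default::default();
///     let sum = (0..42).par().with_runner(runner).sum();
///     assert_eq!(sum, 861);
/// }
/// ```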
impl<P, R> Default for RunnerWithPool<P, R>
where
    P: ParThreadPool + Default,
    R: ParallelExecutor,
{
    fn default() -> Self {
        Self {
            pool: Default::default(),
            runner: PhantomData,
        }
    }
}

impl<P: ParThreadPool> From<P> for RunnerWithPool<P, DefaultExecutor> {
    fn from(pool: P) -> Self {
        Self {
            pool,
            runner: PhantomData,
        }
    }
}

impl<P, R> RunnerWithPool<P, R>
where
    P: ParThreadPool,
    R: ParallelExecutor,
{
    /// Converts the runner into the wrapped underlying pool.
    ///
    /// Note that a `RunnerWithPool` can always be created from an owned `pool`, but also from
    /// * `&pool` in most cases,
    /// * `&mut pool` in others.
    ///
    /// This function is only relevant when the runner is created from an owned pool, in which case
    /// `into_inner_pool` can be used to take back ownership of the pool.
    ///
    /// # Example
    ///
    /// The following example demonstrates the use case with the rayon-core thread pool; however, it
    /// applies to all thread pool implementations.
    ///
    /// ```
    /// use orx_parallel::*;
    ///
    /// let vec: Vec<_> = (0..42).collect();
    /// let input = vec.as_slice();
    ///
    /// #[cfg(not(miri))]
    /// #[cfg(feature = "rayon-core")]
    /// {
    ///     let pool = rayon_core::ThreadPoolBuilder::new()
    ///         .num_threads(8)
    ///         .build()
    ///         .unwrap();
    ///
    ///     // create runner owning the pool
    ///     let mut runner = RunnerWithPool::from(pool);
    ///
    ///     // use runner, and hence the pool, in parallel computations
    ///     let sum = input.par().with_runner(&mut runner).sum();
    ///     let max = input.par().with_runner(&mut runner).max();
    ///     let txt: Vec<_> = input
    ///         .par()
    ///         .with_runner(&mut runner)
    ///         .map(|x| x.to_string())
    ///         .collect();
    ///
    ///     // get back ownership of the pool
    ///     let pool: rayon_core::ThreadPool = runner.into_inner_pool();
    /// }
    /// ```
    pub fn into_inner_pool(self) -> P {
        self.pool
    }

    /// Converts the runner into one that uses the [`ParallelExecutor`] `Q` instead of `R`.
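    ///
    /// # Example
    ///
    /// A minimal sketch; [`DefaultExecutor`] is used as the target executor type for illustration,
    /// while any type implementing [`ParallelExecutor`] can be plugged in the same way.
    ///
    /// ```
    /// use orx_parallel::*;
    ///
    /// // start with a runner backed by the sequential pool and re-select its executor
    /// let runner = RunnerWithPool::from(SequentialPool).with_executor::<DefaultExecutor>();
    ///
    /// let sum = (0..42).par().with_runner(runner).sum();
    /// assert_eq!(sum, 861);
    /// ```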
    pub fn with_executor<Q: ParallelExecutor>(self) -> RunnerWithPool<P, Q> {
        RunnerWithPool {
            pool: self.pool,
            runner: PhantomData,
        }
    }

    /// Converts the executor `R` of this runner into one with diagnostics; i.e., `ParallelExecutorWithDiagnostics<R>`.
    ///
    /// Note that [`ParallelExecutorWithDiagnostics`] prints the diagnostics to stdout. Therefore, it must
    /// only be used while testing a program, not in production.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_parallel::*;
    ///
    /// // normal execution
    ///
    /// let range = 0..4096;
    /// let sum = range
    ///     .par()
    ///     .map(|x| x + 1)
    ///     .filter(|x| x.is_multiple_of(2))
    ///     .sum();
    /// assert_eq!(sum, 4196352);
    ///
    /// // execution with diagnostics
    ///
    /// let range = 0..4096;
    /// let sum = range
    ///     .par()
    ///     .with_runner(DefaultRunner::default().with_diagnostics())
    ///     .map(|x| x + 1)
    ///     .filter(|x| x.is_multiple_of(2))
    ///     .sum();
    /// assert_eq!(sum, 4196352);
    ///
    /// // prints diagnostics, which look something like the following:
    /// //
    /// // - Number of threads used = 5
    /// //
    /// // - [Thread idx]: num_calls, num_tasks, avg_chunk_size, first_chunk_sizes
    /// // - [0]: 25, 1600, 64, [64, 64, 64, 64, 64, 64, 64, 64, 64, 64]
    /// // - [1]: 26, 1664, 64, [64, 64, 64, 64, 64, 64, 64, 64, 64, 64]
    /// // - [2]: 13, 832, 64, [64, 64, 64, 64, 64, 64, 64, 64, 64, 64]
    /// // - [3]: 0, 0, 0, []
    /// // - [4]: 0, 0, 0, []
    /// ```
    #[cfg(feature = "std")]
    pub fn with_diagnostics(self) -> RunnerWithPool<P, ParallelExecutorWithDiagnostics<R>> {
        RunnerWithPool {
            pool: self.pool,
            runner: PhantomData,
        }
    }
}

impl<P, R> ParallelRunner for RunnerWithPool<P, R>
where
    P: ParThreadPool,
    R: ParallelExecutor,
{
    type Executor = R;

    type ThreadPool = P;

    fn thread_pool(&self) -> &Self::ThreadPool {
        &self.pool
    }

    fn thread_pool_mut(&mut self) -> &mut Self::ThreadPool {
        &mut self.pool
    }
}