orx_parallel/runner/implementations/
runner_with_pool.rs

1#[cfg(feature = "std")]
2use crate::executor::ParallelExecutorWithDiagnostics;
3use crate::{DefaultExecutor, ParThreadPool, ParallelExecutor, runner::ParallelRunner};
4use core::marker::PhantomData;
5
6/// Parallel runner with a given pool of type `P` and parallel executor of `R`.
7///
8/// A `RunnerWithPool` can always be created from owned `pool` implementing [`ParThreadPool`], but also from
9/// * `&pool` in most cases,
10/// * `&mut pool` in others.
11///
12/// Note that default parallel runner; i.e., [`DefaultRunner`] is:
13/// * `RunnerWithPool<StdDefaultPool>` when "std" feature is enabled,
14/// * `RunnerWithPool<SequentialPool>` when "std" feature is disabled.
15///
16/// [`DefaultRunner`]: crate::DefaultRunner
17///
18/// # Examples
19///
20/// ```
21/// use orx_parallel::*;
22///
23/// // parallel computation generic over parallel runner; and hence, the thread pool
24/// fn run_with_runner<R: ParallelRunner>(runner: R, input: &[usize]) -> Vec<String> {
25///     input
26///         .par()
27///         .with_runner(runner)
28///         .flat_map(|x| [*x, 2 * x, x / 7])
29///         .map(|x| x.to_string())
30///         .collect()
31/// }
32///
33/// let vec: Vec<_> = (0..42).collect();
34/// let input = vec.as_slice();
35///
36/// // runs sequentially on the main thread
37/// let runner = RunnerWithPool::from(SequentialPool);
38/// let expected = run_with_runner(runner, input);
39///
40/// // uses native threads
41/// #[cfg(feature = "std")]
42/// {
43///     let runner = RunnerWithPool::from(StdDefaultPool::default());
44///     let result = run_with_runner(runner, input);
45///     assert_eq!(&expected, &result);
46/// }
47///
48/// // uses rayon-core ThreadPool with 8 threads
49/// #[cfg(not(miri))]
50/// #[cfg(feature = "rayon-core")]
51/// {
52///     let pool = rayon_core::ThreadPoolBuilder::new()
53///         .num_threads(8)
54///         .build()
55///         .unwrap();
56///     let result = run_with_runner(RunnerWithPool::from(&pool), input);
57///     assert_eq!(&expected, &result);
58/// }
59///
60/// // uses scoped-pool Pool with 8 threads
61/// #[cfg(not(miri))]
62/// #[cfg(feature = "scoped-pool")]
63/// {
64///     let pool = scoped_pool::Pool::new(8);
65///     let result = run_with_runner(RunnerWithPool::from(&pool), input);
66///     assert_eq!(&expected, &result);
67/// }
68///
69/// // uses scoped_threadpool Pool with 8 threads
70/// #[cfg(not(miri))]
71/// #[cfg(feature = "scoped_threadpool")]
72/// {
73///     let mut pool = scoped_threadpool::Pool::new(8);
74///     let result = run_with_runner(RunnerWithPool::from(&mut pool), input); // requires &mut pool
75///     assert_eq!(&expected, &result);
76/// }
77///
78/// // uses yastl Pool wrapped as YastlPool with 8 threads
79/// #[cfg(not(miri))]
80/// #[cfg(feature = "yastl")]
81/// {
82///     let pool = YastlPool::new(8);
83///     let result = run_with_runner(RunnerWithPool::from(&pool), input);
84///     assert_eq!(&expected, &result);
85/// }
86///
87/// // uses pond Pool wrapped as PondPool with 8 threads
88/// #[cfg(not(miri))]
89/// #[cfg(feature = "pond")]
90/// {
91///     let mut pool = PondPool::new_threads_unbounded(8);
92///     let result = run_with_runner(RunnerWithPool::from(&mut pool), input); // requires &mut pool
93///     assert_eq!(&expected, &result);
94/// }
95///
96/// // uses poolite Pool with 8 threads
97/// #[cfg(not(miri))]
98/// #[cfg(feature = "poolite")]
99/// {
100///     let pool = poolite::Pool::with_builder(poolite::Builder::new().min(8).max(8)).unwrap();
101///     let result = run_with_runner(RunnerWithPool::from(&pool), input);
102///     assert_eq!(&expected, &result);
103/// }
104/// ```
105#[derive(Clone)]
106pub struct RunnerWithPool<P, R = DefaultExecutor>
107where
108    P: ParThreadPool,
109    R: ParallelExecutor,
110{
111    pool: P,
112    runner: PhantomData<R>,
113}
114
115impl<P, R> Default for RunnerWithPool<P, R>
116where
117    P: ParThreadPool + Default,
118    R: ParallelExecutor,
119{
120    fn default() -> Self {
121        Self {
122            pool: Default::default(),
123            runner: PhantomData,
124        }
125    }
126}
127
128impl<P: ParThreadPool> From<P> for RunnerWithPool<P, DefaultExecutor> {
129    fn from(pool: P) -> Self {
130        Self {
131            pool,
132            runner: PhantomData,
133        }
134    }
135}
136
137impl<P, R> RunnerWithPool<P, R>
138where
139    P: ParThreadPool,
140    R: ParallelExecutor,
141{
142    /// Converts the runner into the wrapped underlying pool.
143    ///
144    /// Note that a `RunnerWithPool` can always be created from owned `pool`, but also from
145    /// * `&pool` in most cases,
146    /// * `&mut pool` in others.
147    ///
148    /// This function is only relevant when the runner is created from owned pool, in which case
149    /// `into_inner_pool` can be used to get back ownership of the pool.
150    ///
151    /// # Example
152    ///
153    /// The following example demonstrates the use case for rayon-core thread pool; however, it
154    /// holds for all thread pool implementations.
155    ///
156    /// ```
157    /// use orx_parallel::*;
158    ///
159    /// let vec: Vec<_> = (0..42).collect();
160    /// let input = vec.as_slice();
161    ///
162    /// #[cfg(not(miri))]
163    /// #[cfg(feature = "rayon-core")]
164    /// {
165    ///     let pool = rayon_core::ThreadPoolBuilder::new()
166    ///         .num_threads(8)
167    ///         .build()
168    ///         .unwrap();
169    ///
170    ///     // create runner owning the pool
171    ///     let mut runner = RunnerWithPool::from(pool);
172    ///
173    ///     // use runner, and hence the pool, in parallel computations
174    ///     let sum = input.par().with_runner(&mut runner).sum();
175    ///     let max = input.par().with_runner(&mut runner).max();
176    ///     let txt: Vec<_> = input
177    ///         .par()
178    ///         .with_runner(&mut runner)
179    ///         .map(|x| x.to_string())
180    ///         .collect();
181    ///
182    ///     // get back ownership of the pool
183    ///     let pool: rayon_core::ThreadPool = runner.into_inner_pool();
184    /// }
185    /// ```
186    pub fn into_inner_pool(self) -> P {
187        self.pool
188    }
189
190    /// Converts the runner into one using the [`ParallelExecutor`] `Q` rather than `R`.
191    pub fn with_executor<Q: ParallelExecutor>(self) -> RunnerWithPool<P, Q> {
192        RunnerWithPool {
193            pool: self.pool,
194            runner: PhantomData,
195        }
196    }
197
198    /// Converts executor of this runner `R` into one with diagnostics; i.e.,`ParallelExecutorWithDiagnostics<R>`.
199    ///
200    /// Note that [`ParallelExecutorWithDiagnostics`] prints the diagnostics on the stdout. Therefore, it must
201    /// only be used while testing a program, not in production.
202    ///
203    /// # Examples
204    ///
205    /// ```
206    /// use orx_parallel::*;
207    ///
208    /// // normal execution
209    ///
210    /// let range = 0..4096;
211    /// let sum = range
212    ///     .par()
213    ///     .map(|x| x + 1)
214    ///     .filter(|x| x.is_multiple_of(2))
215    ///     .sum();
216    /// assert_eq!(sum, 4196352);
217    ///
218    /// // execution with diagnostics
219    ///
220    /// let range = 0..4096;
221    /// let sum = range
222    ///     .par()
223    ///     .with_runner(DefaultRunner::default().with_diagnostics())
224    ///     .map(|x| x + 1)
225    ///     .filter(|x| x.is_multiple_of(2))
226    ///     .sum();
227    /// assert_eq!(sum, 4196352);
228    ///
229    /// // prints diagnostics, which looks something like the following:
230    /// //
231    /// // - Number of threads used = 5
232    /// //
233    /// // - [Thread idx]: num_calls, num_tasks, avg_chunk_size, first_chunk_sizes
234    /// //   - [0]: 25, 1600, 64, [64, 64, 64, 64, 64, 64, 64, 64, 64, 64]
235    /// //   - [1]: 26, 1664, 64, [64, 64, 64, 64, 64, 64, 64, 64, 64, 64]
236    /// //   - [2]: 13, 832, 64, [64, 64, 64, 64, 64, 64, 64, 64, 64, 64]
237    /// //   - [3]: 0, 0, 0, []
238    /// //   - [4]: 0, 0, 0, []
239    /// ```
240    #[cfg(feature = "std")]
241    pub fn with_diagnostics(self) -> RunnerWithPool<P, ParallelExecutorWithDiagnostics<R>> {
242        RunnerWithPool {
243            pool: self.pool,
244            runner: PhantomData,
245        }
246    }
247}
248
249impl<P, R> ParallelRunner for RunnerWithPool<P, R>
250where
251    P: ParThreadPool,
252    R: ParallelExecutor,
253{
254    type Executor = R;
255
256    type ThreadPool = P;
257
258    fn thread_pool(&self) -> &Self::ThreadPool {
259        &self.pool
260    }
261
262    fn thread_pool_mut(&mut self) -> &mut Self::ThreadPool {
263        &mut self.pool
264    }
265}