orx_parallel/runner/implementations/
runner_with_pool.rs

1use crate::{DefaultExecutor, ParThreadPool, ParallelExecutor, runner::ParallelRunner};
2use core::marker::PhantomData;
3
4/// Parallel runner with a given pool of type `P` and parallel executor of `R`.
5///
6/// A `RunnerWithPool` can always be created from owned `pool` implementing [`ParThreadPool`], but also from
7/// * `&pool` in most cases,
8/// * `&mut pool` in others.
9///
10/// Note that default parallel runner; i.e., [`DefaultRunner`] is:
11/// * `RunnerWithPool<StdDefaultPool>` when "std" feature is enabled,
12/// * `RunnerWithPool<SequentialPool>` when "std" feature is disabled.
13///
14/// [`DefaultRunner`]: crate::DefaultRunner
15///
16/// # Examples
17///
18/// ```
19/// use orx_parallel::*;
20///
21/// // parallel computation generic over parallel runner; and hence, the thread pool
22/// fn run_with_runner<R: ParallelRunner>(runner: R, input: &[usize]) -> Vec<String> {
23///     input
24///         .par()
25///         .with_runner(runner)
26///         .flat_map(|x| [*x, 2 * x, x / 7])
27///         .map(|x| x.to_string())
28///         .collect()
29/// }
30///
31/// let vec: Vec<_> = (0..42).collect();
32/// let input = vec.as_slice();
33///
34/// // runs sequentially on the main thread
35/// let runner = RunnerWithPool::from(SequentialPool);
36/// let expected = run_with_runner(runner, input);
37///
38/// // uses native threads
39/// let runner = RunnerWithPool::from(StdDefaultPool::default());
40/// let result = run_with_runner(runner, input);
41/// assert_eq!(&expected, &result);
42///
43/// // uses rayon-core ThreadPool with 8 threads
44/// #[cfg(feature = "rayon-core")]
45/// {
46///     let pool = rayon_core::ThreadPoolBuilder::new()
47///         .num_threads(8)
48///         .build()
49///         .unwrap();
50///     let result = run_with_runner(RunnerWithPool::from(&pool), input);
51///     assert_eq!(&expected, &result);
52/// }
53///
54/// // uses scoped-pool Pool with 8 threads
55/// #[cfg(feature = "scoped-pool")]
56/// {
57///     let pool = scoped_pool::Pool::new(8);
58///     let result = run_with_runner(RunnerWithPool::from(&pool), input);
59///     assert_eq!(&expected, &result);
60/// }
61///
62/// // uses scoped_threadpool Pool with 8 threads
63/// #[cfg(feature = "scoped_threadpool")]
64/// {
65///     let mut pool = scoped_threadpool::Pool::new(8);
66///     let result = run_with_runner(RunnerWithPool::from(&mut pool), input); // requires &mut pool
67///     assert_eq!(&expected, &result);
68/// }
69///
70/// // uses yastl Pool wrapped as YastlPool with 8 threads
71/// #[cfg(feature = "yastl")]
72/// {
73///     let pool = YastlPool::new(8);
74///     let result = run_with_runner(RunnerWithPool::from(&pool), input);
75///     assert_eq!(&expected, &result);
76/// }
77///
78/// // uses pond Pool wrapped as PondPool with 8 threads
79/// #[cfg(feature = "pond")]
80/// {
81///     let mut pool = PondPool::new_threads_unbounded(8);
82///     let result = run_with_runner(RunnerWithPool::from(&mut pool), input); // requires &mut pool
83///     assert_eq!(&expected, &result);
84/// }
85///
86/// // uses poolite Pool with 8 threads
87/// #[cfg(feature = "poolite")]
88/// {
89///     let pool = poolite::Pool::with_builder(poolite::Builder::new().min(8).max(8)).unwrap();
90///     let result = run_with_runner(RunnerWithPool::from(&pool), input);
91///     assert_eq!(&expected, &result);
92/// }
93/// ```
94pub struct RunnerWithPool<P, R = DefaultExecutor>
95where
96    P: ParThreadPool,
97    R: ParallelExecutor,
98{
99    pool: P,
100    runner: PhantomData<R>,
101}
102
103impl<P, R> Default for RunnerWithPool<P, R>
104where
105    P: ParThreadPool + Default,
106    R: ParallelExecutor,
107{
108    fn default() -> Self {
109        Self {
110            pool: Default::default(),
111            runner: PhantomData,
112        }
113    }
114}
115
116impl<P: ParThreadPool> From<P> for RunnerWithPool<P, DefaultExecutor> {
117    fn from(pool: P) -> Self {
118        Self {
119            pool,
120            runner: PhantomData,
121        }
122    }
123}
124
125impl<P, R> RunnerWithPool<P, R>
126where
127    P: ParThreadPool,
128    R: ParallelExecutor,
129{
130    /// Converts the runner into the wrapped underlying pool.
131    ///
132    /// Note that a `RunnerWithPool` can always be created from owned `pool`, but also from
133    /// * `&pool` in most cases,
134    /// * `&mut pool` in others.
135    ///
136    /// This function is only relevant when the runner is created from owned pool, in which case
137    /// `into_inner_pool` can be used to get back ownership of the pool.
138    ///
139    /// # Example
140    ///
141    /// The following example demonstrates the use case for rayon-core thread pool; however, it
142    /// holds for all thread pool implementations.
143    ///
144    /// ```
145    /// use orx_parallel::*;
146    ///
147    /// let vec: Vec<_> = (0..42).collect();
148    /// let input = vec.as_slice();
149    ///
150    /// #[cfg(feature = "rayon-core")]
151    /// {
152    ///     let pool = rayon_core::ThreadPoolBuilder::new()
153    ///         .num_threads(8)
154    ///         .build()
155    ///         .unwrap();
156    ///
157    ///     // create runner owning the pool
158    ///     let mut runner = RunnerWithPool::from(pool);
159    ///
160    ///     // use runner, and hence the pool, in parallel computations
161    ///     let sum = input.par().with_runner(&mut runner).sum();
162    ///     let max = input.par().with_runner(&mut runner).max();
163    ///     let txt: Vec<_> = input
164    ///         .par()
165    ///         .with_runner(&mut runner)
166    ///         .map(|x| x.to_string())
167    ///         .collect();
168    ///
169    ///     // get back ownership of the pool
170    ///     let pool: rayon_core::ThreadPool = runner.into_inner_pool();
171    /// }
172    /// ```
173    pub fn into_inner_pool(self) -> P {
174        self.pool
175    }
176
177    /// Converts the runner into one using the [`ParallelExecutor`] `Q` rather than `R`.
178    pub fn with_executor<Q: ParallelExecutor>(self) -> RunnerWithPool<P, Q> {
179        RunnerWithPool {
180            pool: self.pool,
181            runner: PhantomData,
182        }
183    }
184}
185
186impl<P, R> ParallelRunner for RunnerWithPool<P, R>
187where
188    P: ParThreadPool,
189    R: ParallelExecutor,
190{
191    type Executor = R;
192
193    type ThreadPool = P;
194
195    fn thread_pool(&self) -> &Self::ThreadPool {
196        &self.pool
197    }
198
199    fn thread_pool_mut(&mut self) -> &mut Self::ThreadPool {
200        &mut self.pool
201    }
202}