orx_parallel/runner/implementations/
std_runner.rs

1use crate::par_thread_pool::ParThreadPool;
2use core::num::NonZeroUsize;
3
/// Fallback upper bound on the number of threads, used by `StdDefaultPool::default`
/// when neither the environment variable nor the host's available parallelism
/// provides a value.
const MAX_UNSET_NUM_THREADS: NonZeroUsize = match NonZeroUsize::new(8) {
    Some(bound) => bound,
    // Unreachable for the literal above; kept so that the constant stays total.
    None => panic!(">0"),
};
5
/// Native standard thread pool.
///
/// This is the default thread pool used when the "std" feature is enabled.
/// Note that the thread pool to be used for a parallel computation can be set by the
/// [`with_runner`] transformation separately for each parallel iterator.
///
/// Uses `std::thread::scope` and `scope.spawn(..)` to distribute work to threads.
///
/// Value of [`max_num_threads`] is determined as the minimum of:
///
/// * the available parallelism of the host obtained via `std::thread::available_parallelism()`, and
/// * the upper bound set by the environment variable "ORX_PARALLEL_MAX_NUM_THREADS", when set.
///
/// When only one of the two bounds is available, that bound is used as is.
/// When neither is available, the value falls back to 8.
///
/// [`max_num_threads`]: ParThreadPool::max_num_threads
/// [`with_runner`]: crate::ParIter::with_runner
#[derive(Debug)]
pub struct StdDefaultPool {
    /// Upper bound on the number of threads, fixed when the pool is created.
    max_num_threads: NonZeroUsize,
}
24
25impl Default for StdDefaultPool {
26    fn default() -> Self {
27        let env_max_num_threads = crate::env::max_num_threads_by_env_variable();
28
29        let ava_max_num_threads = std::thread::available_parallelism().ok();
30
31        let max_num_threads = match (env_max_num_threads, ava_max_num_threads) {
32            (Some(env), Some(ava)) => env.min(ava),
33            (Some(env), None) => env,
34            (None, Some(ava)) => ava,
35            (None, None) => MAX_UNSET_NUM_THREADS,
36        };
37
38        Self { max_num_threads }
39    }
40}
41
42impl ParThreadPool for StdDefaultPool {
43    type ScopeRef<'s, 'env, 'scope>
44        = &'s std::thread::Scope<'s, 'env>
45    where
46        'scope: 's,
47        'env: 'scope + 's;
48
49    fn max_num_threads(&self) -> NonZeroUsize {
50        self.max_num_threads
51    }
52
53    fn scoped_computation<'env, 'scope, F>(&'env mut self, f: F)
54    where
55        'env: 'scope,
56        for<'s> F: FnOnce(&'s std::thread::Scope<'s, 'env>) + Send,
57    {
58        std::thread::scope(f)
59    }
60
61    fn run_in_scope<'s, 'env, 'scope, W>(s: &Self::ScopeRef<'s, 'env, 'scope>, work: W)
62    where
63        'scope: 's,
64        'env: 'scope + 's,
65        W: Fn() + Send + 'scope + 'env,
66    {
67        s.spawn(work);
68    }
69}
70
71impl ParThreadPool for &StdDefaultPool {
72    type ScopeRef<'s, 'env, 'scope>
73        = &'s std::thread::Scope<'s, 'env>
74    where
75        'scope: 's,
76        'env: 'scope + 's;
77
78    fn max_num_threads(&self) -> NonZeroUsize {
79        self.max_num_threads
80    }
81
82    fn scoped_computation<'env, 'scope, F>(&'env mut self, f: F)
83    where
84        'env: 'scope,
85        for<'s> F: FnOnce(&'s std::thread::Scope<'s, 'env>) + Send,
86    {
87        std::thread::scope(f)
88    }
89
90    fn run_in_scope<'s, 'env, 'scope, W>(s: &Self::ScopeRef<'s, 'env, 'scope>, work: W)
91    where
92        'scope: 's,
93        'env: 'scope + 's,
94        W: Fn() + Send + 'scope + 'env,
95    {
96        s.spawn(work);
97    }
98}