orx_parallel/runner/implementations/
std_runner.rs

1use crate::par_thread_pool::ParThreadPool;
2use core::num::NonZeroUsize;
3
/// Thread limit used when neither the environment variable nor the host's
/// available parallelism provides a value.
const MAX_UNSET_NUM_THREADS: NonZeroUsize = match NonZeroUsize::new(8) {
    Some(limit) => limit,
    None => panic!(">0"),
};
5
/// Native standard thread pool.
///
/// This is the default thread pool used when "std" feature is enabled.
/// Note that the thread pool to be used for a parallel computation can be set by the
/// [`with_runner`] transformation separately for each parallel iterator.
///
/// Uses `std::thread::scope` and `scope.spawn(..)` to distribute work to threads.
///
/// Value of [`max_num_threads`] is fixed when the pool is created with [`Default`]
/// and is determined from two optional sources:
///
/// * the available parallelism of the host obtained via `std::thread::available_parallelism()`, and
/// * the upper bound set by the environment variable "ORX_PARALLEL_MAX_NUM_THREADS", when set.
///
/// The rules are:
///
/// * when both sources provide a value, the minimum of the two is used;
/// * when only one source provides a value, that value is used;
/// * when neither provides a value, a fixed fallback (currently 8) is used.
///
/// [`max_num_threads`]: ParThreadPool::max_num_threads
/// [`with_runner`]: crate::ParIter::with_runner
#[derive(Clone, Debug)]
pub struct StdDefaultPool {
    /// Upper bound on the number of threads, reported by `max_num_threads`.
    max_num_threads: NonZeroUsize,
}
25
26impl Default for StdDefaultPool {
27    fn default() -> Self {
28        let env_max_num_threads = crate::env::max_num_threads_by_env_variable();
29
30        let ava_max_num_threads = std::thread::available_parallelism().ok();
31
32        let max_num_threads = match (env_max_num_threads, ava_max_num_threads) {
33            (Some(env), Some(ava)) => env.min(ava),
34            (Some(env), None) => env,
35            (None, Some(ava)) => ava,
36            (None, None) => MAX_UNSET_NUM_THREADS,
37        };
38
39        Self { max_num_threads }
40    }
41}
42
43impl ParThreadPool for StdDefaultPool {
44    type ScopeRef<'s, 'env, 'scope>
45        = &'s std::thread::Scope<'s, 'env>
46    where
47        'scope: 's,
48        'env: 'scope + 's;
49
50    fn max_num_threads(&self) -> NonZeroUsize {
51        self.max_num_threads
52    }
53
54    fn scoped_computation<'env, 'scope, F>(&'env mut self, f: F)
55    where
56        'env: 'scope,
57        for<'s> F: FnOnce(&'s std::thread::Scope<'s, 'env>) + Send,
58    {
59        std::thread::scope(f)
60    }
61
62    fn run_in_scope<'s, 'env, 'scope, W>(s: &Self::ScopeRef<'s, 'env, 'scope>, work: W)
63    where
64        'scope: 's,
65        'env: 'scope + 's,
66        W: Fn() + Send + 'scope + 'env,
67    {
68        s.spawn(work);
69    }
70}
71
72impl ParThreadPool for &StdDefaultPool {
73    type ScopeRef<'s, 'env, 'scope>
74        = &'s std::thread::Scope<'s, 'env>
75    where
76        'scope: 's,
77        'env: 'scope + 's;
78
79    fn max_num_threads(&self) -> NonZeroUsize {
80        self.max_num_threads
81    }
82
83    fn scoped_computation<'env, 'scope, F>(&'env mut self, f: F)
84    where
85        'env: 'scope,
86        for<'s> F: FnOnce(&'s std::thread::Scope<'s, 'env>) + Send,
87    {
88        std::thread::scope(f)
89    }
90
91    fn run_in_scope<'s, 'env, 'scope, W>(s: &Self::ScopeRef<'s, 'env, 'scope>, work: W)
92    where
93        'scope: 's,
94        'env: 'scope + 's,
95        W: Fn() + Send + 'scope + 'env,
96    {
97        s.spawn(work);
98    }
99}