orx_parallel/runner/
parallel_runner.rs

1use crate::{
2    NumThreads, ParallelExecutor, Params,
3    generic_values::runner_results::{Fallibility, Infallible, Never},
4    par_thread_pool::{ParThreadPool, ParThreadPoolCompute},
5    runner::{ComputationKind, NumSpawned},
6};
7use alloc::vec::Vec;
8use core::num::NonZeroUsize;
9use orx_concurrent_iter::ConcurrentIter;
10
11/// Parallel runner defining how the threads must be spawned and job must be distributed.
12pub trait ParallelRunner {
13    /// Parallel executor responsible for distribution of tasks to the threads.
14    type Executor: ParallelExecutor;
15
16    /// Thread pool responsible for providing threads to the parallel computation.
17    type ThreadPool: ParThreadPool;
18
19    /// Creates a new parallel executor for a parallel computation.
20    fn new_executor(
21        &self,
22        kind: ComputationKind,
23        params: Params,
24        initial_input_len: Option<usize>,
25    ) -> Self::Executor {
26        let max_num_threads = self.max_num_threads_for_computation(params, initial_input_len);
27        <Self::Executor as ParallelExecutor>::new(kind, params, initial_input_len, max_num_threads)
28    }
29
30    /// Reference to the underlying thread pool.
31    fn thread_pool(&self) -> &Self::ThreadPool;
32
33    /// Mutable reference to the underlying thread pool.
34    fn thread_pool_mut(&mut self) -> &mut Self::ThreadPool;
35
36    // derived
37
38    /// Runs `thread_do` using threads provided by the thread pool.
39    fn run_all<I, F>(
40        &mut self,
41        params: Params,
42        iter: I,
43        kind: ComputationKind,
44        thread_do: F,
45    ) -> NumSpawned
46    where
47        I: ConcurrentIter,
48        F: Fn(NumSpawned, &I, &SharedStateOf<Self>, ThreadRunnerOf<Self>) + Sync,
49    {
50        let runner = self.new_executor(kind, params, iter.try_get_len());
51        let state = runner.new_shared_state();
52        let do_spawn = |num_spawned| runner.do_spawn_new(num_spawned, &state, &iter);
53        let work = |num_spawned| {
54            thread_do(
55                num_spawned,
56                &iter,
57                &state,
58                runner.new_thread_executor(&state),
59            );
60        };
61        self.thread_pool_mut().run_in_pool(do_spawn, work)
62    }
63
64    /// Runs `thread_map` using threads provided by the thread pool.
65    fn map_all<F, I, M, T>(
66        &mut self,
67        params: Params,
68        iter: I,
69        kind: ComputationKind,
70        thread_map: M,
71    ) -> (NumSpawned, Result<Vec<T>, F::Error>)
72    where
73        F: Fallibility,
74        I: ConcurrentIter,
75        M: Fn(NumSpawned, &I, &SharedStateOf<Self>, ThreadRunnerOf<Self>) -> Result<T, F::Error>
76            + Sync,
77        T: Send,
78        F::Error: Send,
79    {
80        let iter_len = iter.try_get_len();
81        let runner = self.new_executor(kind, params, iter_len);
82        let state = runner.new_shared_state();
83        let do_spawn = |num_spawned| runner.do_spawn_new(num_spawned, &state, &iter);
84        let work = |nt| thread_map(nt, &iter, &state, runner.new_thread_executor(&state));
85        let max_num_threads = self.max_num_threads_for_computation(params, iter_len);
86        self.thread_pool_mut()
87            .map_in_pool::<F, _, _, _>(do_spawn, work, max_num_threads)
88    }
89
90    /// Runs infallible `thread_map` using threads provided by the thread pool.
91    fn map_infallible<I, M, T>(
92        &mut self,
93        params: Params,
94        iter: I,
95        kind: ComputationKind,
96        thread_map: M,
97    ) -> (NumSpawned, Result<Vec<T>, Never>)
98    where
99        I: ConcurrentIter,
100        M: Fn(NumSpawned, &I, &SharedStateOf<Self>, ThreadRunnerOf<Self>) -> Result<T, Never>
101            + Sync,
102        T: Send,
103    {
104        self.map_all::<Infallible, _, _, _>(params, iter, kind, thread_map)
105    }
106
107    /// Returns the maximum number of threads that can be used for the computation defined by
108    /// the `params` and input `iter_len`.
109    fn max_num_threads_for_computation(
110        &self,
111        params: Params,
112        iter_len: Option<usize>,
113    ) -> NonZeroUsize {
114        let pool = self.thread_pool().max_num_threads();
115
116        let env = crate::env::max_num_threads_by_env_variable().unwrap_or(NonZeroUsize::MAX);
117
118        let req = match (iter_len, params.num_threads) {
119            (Some(len), NumThreads::Auto) => NonZeroUsize::new(len.max(1)).expect(">0"),
120            (Some(len), NumThreads::Max(nt)) => NonZeroUsize::new(len.max(1)).expect(">0").min(nt),
121            (None, NumThreads::Auto) => NonZeroUsize::MAX,
122            (None, NumThreads::Max(nt)) => nt,
123        };
124
125        req.min(pool.min(env))
126    }
127}
128
129pub(crate) type SharedStateOf<C> =
130    <<C as ParallelRunner>::Executor as ParallelExecutor>::SharedState;
131pub(crate) type ThreadRunnerOf<C> =
132    <<C as ParallelRunner>::Executor as ParallelExecutor>::ThreadExecutor;
133
134// auto impl for &mut pool
135
136impl<O> ParallelRunner for &'_ mut O
137where
138    O: ParallelRunner,
139{
140    type Executor = O::Executor;
141
142    type ThreadPool = O::ThreadPool;
143
144    fn thread_pool(&self) -> &Self::ThreadPool {
145        O::thread_pool(self)
146    }
147
148    fn thread_pool_mut(&mut self) -> &mut Self::ThreadPool {
149        O::thread_pool_mut(self)
150    }
151}