orx_parallel/runner/parallel_runner.rs

1use crate::{
2    NumThreads, ParallelExecutor, Params,
3    generic_values::runner_results::{Fallibility, Infallible, Never},
4    par_thread_pool::{ParThreadPool, ParThreadPoolCompute},
5    runner::{ComputationKind, NumSpawned},
6};
7use alloc::vec::Vec;
8use core::num::NonZeroUsize;
9use orx_concurrent_iter::ConcurrentIter;
10
11/// Parallel runner defining how the threads must be spawned and job must be distributed.
12pub trait ParallelRunner {
13    /// Parallel executor responsible for distribution of tasks to the threads.
14    type Executor: ParallelExecutor;
15
16    /// Thread pool responsible for providing threads to the parallel computation.
17    type ThreadPool: ParThreadPool;
18
19    /// Creates a new parallel executor for a parallel computation.
20    fn new_executor(
21        &self,
22        kind: ComputationKind,
23        params: Params,
24        initial_input_len: Option<usize>,
25    ) -> Self::Executor {
26        let max_num_threads = self.max_num_threads_for_computation(params, initial_input_len);
27        <Self::Executor as ParallelExecutor>::new(kind, params, initial_input_len, max_num_threads)
28    }
29
30    /// Reference to the underlying thread pool.
31    fn thread_pool(&self) -> &Self::ThreadPool;
32
33    /// Mutable reference to the underlying thread pool.
34    fn thread_pool_mut(&mut self) -> &mut Self::ThreadPool;
35
36    // derived
37
38    /// Runs `thread_do` using threads provided by the thread pool.
39    fn run_all<I, F>(
40        &mut self,
41        params: Params,
42        iter: I,
43        kind: ComputationKind,
44        thread_do: F,
45    ) -> NumSpawned
46    where
47        I: ConcurrentIter,
48        F: Fn(NumSpawned, &I, &SharedStateOf<Self>, ThreadRunnerOf<Self>) + Sync,
49    {
50        let executor = self.new_executor(kind, params, iter.try_get_len());
51        let state = executor.new_shared_state();
52        let do_spawn = |num_spawned| executor.do_spawn_new(num_spawned, &state, &iter);
53        let work = |num_spawned: NumSpawned| {
54            let thread_idx = num_spawned.into_inner();
55            thread_do(
56                num_spawned,
57                &iter,
58                &state,
59                executor.new_thread_executor(thread_idx, &state),
60            );
61        };
62        let result = self.thread_pool_mut().run_in_pool(do_spawn, work);
63        executor.complete_task(state);
64        result
65    }
66
67    /// Runs `thread_map` using threads provided by the thread pool.
68    fn map_all<F, I, M, T>(
69        &mut self,
70        params: Params,
71        iter: I,
72        kind: ComputationKind,
73        thread_map: M,
74    ) -> (NumSpawned, Result<Vec<T>, F::Error>)
75    where
76        F: Fallibility,
77        I: ConcurrentIter,
78        M: Fn(NumSpawned, &I, &SharedStateOf<Self>, ThreadRunnerOf<Self>) -> Result<T, F::Error>
79            + Sync,
80        T: Send,
81        F::Error: Send,
82    {
83        let iter_len = iter.try_get_len();
84        let executor = self.new_executor(kind, params, iter_len);
85        let state = executor.new_shared_state();
86        let do_spawn = |num_spawned| executor.do_spawn_new(num_spawned, &state, &iter);
87        let work = |num_spawned: NumSpawned| {
88            let thread_idx = num_spawned.into_inner();
89            thread_map(
90                num_spawned,
91                &iter,
92                &state,
93                executor.new_thread_executor(thread_idx, &state),
94            )
95        };
96        let max_num_threads = self.max_num_threads_for_computation(params, iter_len);
97        let result =
98            self.thread_pool_mut()
99                .map_in_pool::<F, _, _, _>(do_spawn, work, max_num_threads);
100        executor.complete_task(state);
101        result
102    }
103
104    /// Runs infallible `thread_map` using threads provided by the thread pool.
105    fn map_infallible<I, M, T>(
106        &mut self,
107        params: Params,
108        iter: I,
109        kind: ComputationKind,
110        thread_map: M,
111    ) -> (NumSpawned, Result<Vec<T>, Never>)
112    where
113        I: ConcurrentIter,
114        M: Fn(NumSpawned, &I, &SharedStateOf<Self>, ThreadRunnerOf<Self>) -> Result<T, Never>
115            + Sync,
116        T: Send,
117    {
118        self.map_all::<Infallible, _, _, _>(params, iter, kind, thread_map)
119    }
120
121    /// Returns the maximum number of threads that can be used for the computation defined by
122    /// the `params` and input `iter_len`.
123    fn max_num_threads_for_computation(
124        &self,
125        params: Params,
126        iter_len: Option<usize>,
127    ) -> NonZeroUsize {
128        let pool = self.thread_pool().max_num_threads();
129
130        let env = crate::env::max_num_threads_by_env_variable().unwrap_or(NonZeroUsize::MAX);
131
132        let req = match (iter_len, params.num_threads) {
133            (Some(len), NumThreads::Auto) => NonZeroUsize::new(len.max(1)).expect(">0"),
134            (Some(len), NumThreads::Max(nt)) => NonZeroUsize::new(len.max(1)).expect(">0").min(nt),
135            (None, NumThreads::Auto) => NonZeroUsize::MAX,
136            (None, NumThreads::Max(nt)) => nt,
137        };
138
139        req.min(pool.min(env))
140    }
141}
142
143pub(crate) type SharedStateOf<C> =
144    <<C as ParallelRunner>::Executor as ParallelExecutor>::SharedState;
145pub(crate) type ThreadRunnerOf<C> =
146    <<C as ParallelRunner>::Executor as ParallelExecutor>::ThreadExecutor;
147
148// auto impl for &mut pool
149
150impl<O> ParallelRunner for &'_ mut O
151where
152    O: ParallelRunner,
153{
154    type Executor = O::Executor;
155
156    type ThreadPool = O::ThreadPool;
157
158    fn thread_pool(&self) -> &Self::ThreadPool {
159        O::thread_pool(self)
160    }
161
162    fn thread_pool_mut(&mut self) -> &mut Self::ThreadPool {
163        O::thread_pool_mut(self)
164    }
165}