1use crate::{
2 NumThreads, ParallelExecutor, Params,
3 generic_values::runner_results::{Fallibility, Infallible, Never},
4 par_thread_pool::{ParThreadPool, ParThreadPoolCompute},
5 runner::{ComputationKind, NumSpawned},
6};
7use alloc::vec::Vec;
8use core::num::NonZeroUsize;
9use orx_concurrent_iter::ConcurrentIter;
10
11pub trait ParallelRunner {
13 type Executor: ParallelExecutor;
15
16 type ThreadPool: ParThreadPool;
18
19 fn new_executor(
21 &self,
22 kind: ComputationKind,
23 params: Params,
24 initial_input_len: Option<usize>,
25 ) -> Self::Executor {
26 let max_num_threads = self.max_num_threads_for_computation(params, initial_input_len);
27 <Self::Executor as ParallelExecutor>::new(kind, params, initial_input_len, max_num_threads)
28 }
29
30 fn thread_pool(&self) -> &Self::ThreadPool;
32
33 fn thread_pool_mut(&mut self) -> &mut Self::ThreadPool;
35
36 fn run_all<I, F>(
40 &mut self,
41 params: Params,
42 iter: I,
43 kind: ComputationKind,
44 thread_do: F,
45 ) -> NumSpawned
46 where
47 I: ConcurrentIter,
48 F: Fn(NumSpawned, &I, &SharedStateOf<Self>, ThreadRunnerOf<Self>) + Sync,
49 {
50 let runner = self.new_executor(kind, params, iter.try_get_len());
51 let state = runner.new_shared_state();
52 let do_spawn = |num_spawned| runner.do_spawn_new(num_spawned, &state, &iter);
53 let work = |num_spawned| {
54 thread_do(
55 num_spawned,
56 &iter,
57 &state,
58 runner.new_thread_executor(&state),
59 );
60 };
61 self.thread_pool_mut().run_in_pool(do_spawn, work)
62 }
63
64 fn map_all<F, I, M, T>(
66 &mut self,
67 params: Params,
68 iter: I,
69 kind: ComputationKind,
70 thread_map: M,
71 ) -> (NumSpawned, Result<Vec<T>, F::Error>)
72 where
73 F: Fallibility,
74 I: ConcurrentIter,
75 M: Fn(NumSpawned, &I, &SharedStateOf<Self>, ThreadRunnerOf<Self>) -> Result<T, F::Error>
76 + Sync,
77 T: Send,
78 F::Error: Send,
79 {
80 let iter_len = iter.try_get_len();
81 let runner = self.new_executor(kind, params, iter_len);
82 let state = runner.new_shared_state();
83 let do_spawn = |num_spawned| runner.do_spawn_new(num_spawned, &state, &iter);
84 let work = |nt| thread_map(nt, &iter, &state, runner.new_thread_executor(&state));
85 let max_num_threads = self.max_num_threads_for_computation(params, iter_len);
86 self.thread_pool_mut()
87 .map_in_pool::<F, _, _, _>(do_spawn, work, max_num_threads)
88 }
89
90 fn map_infallible<I, M, T>(
92 &mut self,
93 params: Params,
94 iter: I,
95 kind: ComputationKind,
96 thread_map: M,
97 ) -> (NumSpawned, Result<Vec<T>, Never>)
98 where
99 I: ConcurrentIter,
100 M: Fn(NumSpawned, &I, &SharedStateOf<Self>, ThreadRunnerOf<Self>) -> Result<T, Never>
101 + Sync,
102 T: Send,
103 {
104 self.map_all::<Infallible, _, _, _>(params, iter, kind, thread_map)
105 }
106
107 fn max_num_threads_for_computation(
110 &self,
111 params: Params,
112 iter_len: Option<usize>,
113 ) -> NonZeroUsize {
114 let pool = self.thread_pool().max_num_threads();
115
116 let env = crate::env::max_num_threads_by_env_variable().unwrap_or(NonZeroUsize::MAX);
117
118 let req = match (iter_len, params.num_threads) {
119 (Some(len), NumThreads::Auto) => NonZeroUsize::new(len.max(1)).expect(">0"),
120 (Some(len), NumThreads::Max(nt)) => NonZeroUsize::new(len.max(1)).expect(">0").min(nt),
121 (None, NumThreads::Auto) => NonZeroUsize::MAX,
122 (None, NumThreads::Max(nt)) => nt,
123 };
124
125 req.min(pool.min(env))
126 }
127}
128
129pub(crate) type SharedStateOf<C> =
130 <<C as ParallelRunner>::Executor as ParallelExecutor>::SharedState;
131pub(crate) type ThreadRunnerOf<C> =
132 <<C as ParallelRunner>::Executor as ParallelExecutor>::ThreadExecutor;
133
134impl<O> ParallelRunner for &'_ mut O
137where
138 O: ParallelRunner,
139{
140 type Executor = O::Executor;
141
142 type ThreadPool = O::ThreadPool;
143
144 fn thread_pool(&self) -> &Self::ThreadPool {
145 O::thread_pool(self)
146 }
147
148 fn thread_pool_mut(&mut self) -> &mut Self::ThreadPool {
149 O::thread_pool_mut(self)
150 }
151}