1use crate::{
2 NumThreads, ParallelExecutor, Params,
3 generic_values::runner_results::{Fallibility, Infallible, Never},
4 par_thread_pool::{ParThreadPool, ParThreadPoolCompute},
5 runner::{ComputationKind, NumSpawned},
6};
7use alloc::vec::Vec;
8use core::num::NonZeroUsize;
9use orx_concurrent_iter::ConcurrentIter;
10
11pub trait ParallelRunner {
13 type Executor: ParallelExecutor;
15
16 type ThreadPool: ParThreadPool;
18
19 fn new_executor(
21 &self,
22 kind: ComputationKind,
23 params: Params,
24 initial_input_len: Option<usize>,
25 ) -> Self::Executor {
26 let max_num_threads = self.max_num_threads_for_computation(params, initial_input_len);
27 <Self::Executor as ParallelExecutor>::new(kind, params, initial_input_len, max_num_threads)
28 }
29
30 fn thread_pool(&self) -> &Self::ThreadPool;
32
33 fn thread_pool_mut(&mut self) -> &mut Self::ThreadPool;
35
36 fn run_all<I, F>(
40 &mut self,
41 params: Params,
42 iter: I,
43 kind: ComputationKind,
44 thread_do: F,
45 ) -> NumSpawned
46 where
47 I: ConcurrentIter,
48 F: Fn(NumSpawned, &I, &SharedStateOf<Self>, ThreadRunnerOf<Self>) + Sync,
49 {
50 let executor = self.new_executor(kind, params, iter.try_get_len());
51 let state = executor.new_shared_state();
52 let do_spawn = |num_spawned| executor.do_spawn_new(num_spawned, &state, &iter);
53 let work = |num_spawned: NumSpawned| {
54 let thread_idx = num_spawned.into_inner();
55 thread_do(
56 num_spawned,
57 &iter,
58 &state,
59 executor.new_thread_executor(thread_idx, &state),
60 );
61 };
62 let result = self.thread_pool_mut().run_in_pool(do_spawn, work);
63 executor.complete_task(state);
64 result
65 }
66
67 fn map_all<F, I, M, T>(
69 &mut self,
70 params: Params,
71 iter: I,
72 kind: ComputationKind,
73 thread_map: M,
74 ) -> (NumSpawned, Result<Vec<T>, F::Error>)
75 where
76 F: Fallibility,
77 I: ConcurrentIter,
78 M: Fn(NumSpawned, &I, &SharedStateOf<Self>, ThreadRunnerOf<Self>) -> Result<T, F::Error>
79 + Sync,
80 T: Send,
81 F::Error: Send,
82 {
83 let iter_len = iter.try_get_len();
84 let executor = self.new_executor(kind, params, iter_len);
85 let state = executor.new_shared_state();
86 let do_spawn = |num_spawned| executor.do_spawn_new(num_spawned, &state, &iter);
87 let work = |num_spawned: NumSpawned| {
88 let thread_idx = num_spawned.into_inner();
89 thread_map(
90 num_spawned,
91 &iter,
92 &state,
93 executor.new_thread_executor(thread_idx, &state),
94 )
95 };
96 let max_num_threads = self.max_num_threads_for_computation(params, iter_len);
97 let result =
98 self.thread_pool_mut()
99 .map_in_pool::<F, _, _, _>(do_spawn, work, max_num_threads);
100 executor.complete_task(state);
101 result
102 }
103
104 fn map_infallible<I, M, T>(
106 &mut self,
107 params: Params,
108 iter: I,
109 kind: ComputationKind,
110 thread_map: M,
111 ) -> (NumSpawned, Result<Vec<T>, Never>)
112 where
113 I: ConcurrentIter,
114 M: Fn(NumSpawned, &I, &SharedStateOf<Self>, ThreadRunnerOf<Self>) -> Result<T, Never>
115 + Sync,
116 T: Send,
117 {
118 self.map_all::<Infallible, _, _, _>(params, iter, kind, thread_map)
119 }
120
121 fn max_num_threads_for_computation(
124 &self,
125 params: Params,
126 iter_len: Option<usize>,
127 ) -> NonZeroUsize {
128 let pool = self.thread_pool().max_num_threads();
129
130 let env = crate::env::max_num_threads_by_env_variable().unwrap_or(NonZeroUsize::MAX);
131
132 let req = match (iter_len, params.num_threads) {
133 (Some(len), NumThreads::Auto) => NonZeroUsize::new(len.max(1)).expect(">0"),
134 (Some(len), NumThreads::Max(nt)) => NonZeroUsize::new(len.max(1)).expect(">0").min(nt),
135 (None, NumThreads::Auto) => NonZeroUsize::MAX,
136 (None, NumThreads::Max(nt)) => nt,
137 };
138
139 req.min(pool.min(env))
140 }
141}
142
143pub(crate) type SharedStateOf<C> =
144 <<C as ParallelRunner>::Executor as ParallelExecutor>::SharedState;
145pub(crate) type ThreadRunnerOf<C> =
146 <<C as ParallelRunner>::Executor as ParallelExecutor>::ThreadExecutor;
147
148impl<O> ParallelRunner for &'_ mut O
151where
152 O: ParallelRunner,
153{
154 type Executor = O::Executor;
155
156 type ThreadPool = O::ThreadPool;
157
158 fn thread_pool(&self) -> &Self::ThreadPool {
159 O::thread_pool(self)
160 }
161
162 fn thread_pool_mut(&mut self) -> &mut Self::ThreadPool {
163 O::thread_pool_mut(self)
164 }
165}