1use super::{
2 computation_kind::ComputationKind,
3 parallel_task::{ParallelTask, ParallelTaskWithIdx},
4 thread_runner::{ThreadRunner, ThreadRunnerCompute},
5};
6use crate::{computations::Values, parameters::Params};
7use orx_concurrent_iter::ConcurrentIter;
8
9pub trait ParallelRunner: Sized + Sync {
13 type SharedState: Send + Sync;
15
16 type ThreadRunner: ThreadRunner<SharedState = Self::SharedState>;
18
19 fn new(kind: ComputationKind, params: Params, initial_input_len: Option<usize>) -> Self;
22
23 fn new_shared_state(&self) -> Self::SharedState;
25
26 fn do_spawn_new<I>(
31 &self,
32 num_spawned: usize,
33 shared_state: &Self::SharedState,
34 iter: &I,
35 ) -> bool
36 where
37 I: ConcurrentIter;
38
39 fn new_thread_runner(&self, shared_state: &Self::SharedState) -> Self::ThreadRunner;
42}
43
44pub trait ParallelRunnerCompute: ParallelRunner {
45 fn run<I, T>(&self, iter: &I, task: T) -> usize
48 where
49 I: ConcurrentIter,
50 T: ParallelTask<Item = I::Item> + Sync,
51 {
52 let state = self.new_shared_state();
53 let shared_state = &state;
54
55 let mut num_spawned = 0;
56 std::thread::scope(|s| {
57 while self.do_spawn_new(num_spawned, shared_state, iter) {
58 num_spawned += 1;
59 s.spawn(|| {
60 let thread_runner = self.new_thread_runner(shared_state);
61 thread_runner.run(iter, shared_state, &task);
62 });
63 }
64 });
65 num_spawned
66 }
67
68 fn run_with_idx<I, T>(&self, iter: &I, task: T) -> usize
69 where
70 I: ConcurrentIter,
71 T: ParallelTaskWithIdx<Item = I::Item> + Sync,
72 {
73 let state = self.new_shared_state();
74 let shared_state = &state;
75
76 let mut num_spawned = 0;
77 std::thread::scope(|s| {
78 while self.do_spawn_new(num_spawned, shared_state, iter) {
79 num_spawned += 1;
80 s.spawn(|| {
81 let thread_runner = self.new_thread_runner(shared_state);
82 thread_runner.run_with_idx(iter, shared_state, &task);
83 });
84 }
85 });
86 num_spawned
87 }
88
89 #[allow(clippy::type_complexity)]
92 fn x_collect_with_idx<I, Vo, M1>(
93 &self,
94 iter: &I,
95 map1: &M1,
96 ) -> (usize, Vec<Vec<(usize, Vo::Item)>>)
97 where
98 I: ConcurrentIter,
99 Vo: Values,
100 Vo::Item: Send + Sync,
101 M1: Fn(I::Item) -> Vo + Send + Sync,
102 {
103 let state = self.new_shared_state();
104 let shared_state = &state;
105
106 let mut num_spawned = 0;
107 let vectors = std::thread::scope(|s| {
108 let mut handles = vec![];
109
110 while self.do_spawn_new(num_spawned, shared_state, iter) {
111 num_spawned += 1;
112 handles.push(s.spawn(move || {
113 let thread_runner = self.new_thread_runner(shared_state);
114 thread_runner.x_collect_with_idx(iter, shared_state, map1)
115 }));
116 }
117
118 let mut vectors = Vec::with_capacity(handles.len());
119 for x in handles {
120 vectors.push(x.join().expect("failed to join the thread"));
121 }
122 vectors
123 });
124
125 (num_spawned, vectors)
126 }
127
128 #[allow(clippy::type_complexity)]
129 fn xfx_collect_with_idx<I, Vt, Vo, M1, F, M2>(
130 &self,
131 iter: &I,
132 map1: &M1,
133 filter: &F,
134 map2: &M2,
135 ) -> (usize, Vec<Vec<(usize, Vo::Item)>>)
136 where
137 I: ConcurrentIter,
138 Vt: Values,
139 Vo: Values,
140 Vo::Item: Send + Sync,
141 M1: Fn(I::Item) -> Vt + Send + Sync,
142 F: Fn(&Vt::Item) -> bool + Send + Sync,
143 M2: Fn(Vt::Item) -> Vo + Send + Sync,
144 {
145 let state = self.new_shared_state();
146 let shared_state = &state;
147
148 let mut num_spawned = 0;
149 let vectors = std::thread::scope(|s| {
150 let mut handles = vec![];
151
152 while self.do_spawn_new(num_spawned, shared_state, iter) {
153 num_spawned += 1;
154 handles.push(s.spawn(move || {
155 let thread_runner = self.new_thread_runner(shared_state);
156 thread_runner.xfx_collect_with_idx(iter, shared_state, map1, filter, map2)
157 }));
158 }
159
160 let mut vectors = Vec::with_capacity(handles.len());
161 for x in handles {
162 vectors.push(x.join().expect("failed to join the thread"));
163 }
164 vectors
165 });
166
167 (num_spawned, vectors)
168 }
169
170 fn x_reduce<I, Vo, M1, X>(&self, iter: &I, map1: &M1, reduce: &X) -> (usize, Option<Vo::Item>)
173 where
174 I: ConcurrentIter,
175 Vo: Values,
176 Vo::Item: Send + Sync,
177 M1: Fn(I::Item) -> Vo + Send + Sync,
178 X: Fn(Vo::Item, Vo::Item) -> Vo::Item + Send + Sync,
179 {
180 let state = self.new_shared_state();
181 let shared_state = &state;
182
183 let mut num_spawned = 0;
184 let results = std::thread::scope(|s| {
185 let mut handles = vec![];
186
187 while self.do_spawn_new(num_spawned, shared_state, iter) {
188 num_spawned += 1;
189 handles.push(s.spawn(move || {
190 let thread_runner = self.new_thread_runner(shared_state);
191 thread_runner.x_reduce(iter, shared_state, map1, reduce)
192 }));
193 }
194
195 let mut results = Vec::with_capacity(handles.len());
196 for x in handles {
197 if let Some(x) = x.join().expect("failed to join the thread") {
198 results.push(x);
199 }
200 }
201 results
202 });
203
204 let acc = results.into_iter().reduce(reduce);
205
206 (num_spawned, acc)
207 }
208
209 fn xfx_reduce<I, Vt, Vo, M1, F, M2, X>(
210 &self,
211 iter: &I,
212 map1: &M1,
213 filter: &F,
214 map2: &M2,
215 reduce: &X,
216 ) -> (usize, Option<Vo::Item>)
217 where
218 I: ConcurrentIter,
219 Vt: Values,
220 Vo: Values,
221 Vo::Item: Send + Sync,
222 M1: Fn(I::Item) -> Vt + Send + Sync,
223 F: Fn(&Vt::Item) -> bool + Send + Sync,
224 M2: Fn(Vt::Item) -> Vo + Send + Sync,
225 X: Fn(Vo::Item, Vo::Item) -> Vo::Item + Send + Sync,
226 {
227 let state = self.new_shared_state();
228 let shared_state = &state;
229
230 let mut num_spawned = 0;
231 let results = std::thread::scope(|s| {
232 let mut handles = vec![];
233
234 while self.do_spawn_new(num_spawned, shared_state, iter) {
235 num_spawned += 1;
236 handles.push(s.spawn(move || {
237 let thread_runner = self.new_thread_runner(shared_state);
238 thread_runner.xfx_reduce(iter, shared_state, map1, filter, map2, reduce)
239 }));
240 }
241
242 let mut results = Vec::with_capacity(handles.len());
243 for x in handles {
244 if let Some(x) = x.join().expect("failed to join the thread") {
245 results.push(x);
246 }
247 }
248 results
249 });
250
251 let acc = results.into_iter().reduce(reduce);
252
253 (num_spawned, acc)
254 }
255
256 fn xfx_next<I, Vt, Vo, M1, F, M2>(
259 &self,
260 iter: &I,
261 map1: &M1,
262 filter: &F,
263 map2: &M2,
264 ) -> (usize, Option<Vo::Item>)
265 where
266 I: ConcurrentIter,
267 Vt: Values,
268 Vo: Values,
269 Vo::Item: Send + Sync,
270 M1: Fn(I::Item) -> Vt + Send + Sync,
271 F: Fn(&Vt::Item) -> bool + Send + Sync,
272 M2: Fn(Vt::Item) -> Vo + Send + Sync,
273 {
274 let state = self.new_shared_state();
275 let shared_state = &state;
276
277 let mut num_spawned = 0;
278 let results = std::thread::scope(|s| {
279 let mut handles = vec![];
280
281 while self.do_spawn_new(num_spawned, shared_state, iter) {
282 num_spawned += 1;
283 handles.push(s.spawn(move || {
284 let thread_runner = self.new_thread_runner(shared_state);
285 thread_runner.xfx_next(iter, shared_state, map1, filter, map2)
286 }));
287 }
288
289 let mut results: Vec<(usize, Vo::Item)> = Vec::with_capacity(handles.len());
290 for x in handles {
291 if let Some(x) = x.join().expect("failed to join the thread") {
292 results.push(x);
293 }
294 }
295 results
296 });
297
298 let acc = results.into_iter().min_by_key(|x| x.0).map(|x| x.1);
299
300 (num_spawned, acc)
301 }
302
303 fn xfx_next_any<I, Vt, Vo, M1, F, M2>(
304 &self,
305 iter: &I,
306 map1: &M1,
307 filter: &F,
308 map2: &M2,
309 ) -> (usize, Option<Vo::Item>)
310 where
311 I: ConcurrentIter,
312 Vt: Values,
313 Vo: Values,
314 Vo::Item: Send + Sync,
315 M1: Fn(I::Item) -> Vt + Send + Sync,
316 F: Fn(&Vt::Item) -> bool + Send + Sync,
317 M2: Fn(Vt::Item) -> Vo + Send + Sync,
318 {
319 let state = self.new_shared_state();
320 let shared_state = &state;
321
322 let mut num_spawned = 0;
323 let result = std::thread::scope(|s| {
324 let mut handles = vec![];
325
326 while self.do_spawn_new(num_spawned, shared_state, iter) {
327 num_spawned += 1;
328 handles.push(s.spawn(move || {
329 let thread_runner = self.new_thread_runner(shared_state);
330 thread_runner.xfx_next_any(iter, shared_state, map1, filter, map2)
331 }));
332 }
333
334 handles
336 .into_iter()
337 .find_map(|x| x.join().expect("failed to join the thread"))
338 });
339
340 (num_spawned, result)
341 }
342}
343
344impl<X: ParallelRunner> ParallelRunnerCompute for X {}