1use super::parallel_task::{ParallelTask, ParallelTaskWithIdx};
2use crate::computations::Values;
3use orx_concurrent_iter::{ChunkPuller, ConcurrentIter};
4
5pub trait ThreadRunner: Sized {
8 type SharedState;
10
11 fn next_chunk_size<I>(&self, shared_state: &Self::SharedState, iter: &I) -> usize
14 where
15 I: ConcurrentIter;
16
17 fn begin_chunk(&mut self, chunk_size: usize);
19
20 fn complete_chunk(&mut self, shared_state: &Self::SharedState, chunk_size: usize);
24
25 fn complete_task(&mut self, shared_state: &Self::SharedState);
29}
30
31pub(crate) trait ThreadRunnerCompute: ThreadRunner {
32 fn run<I, T>(mut self, iter: &I, shared_state: &Self::SharedState, task: &T)
35 where
36 I: ConcurrentIter,
37 T: ParallelTask<Item = I::Item>,
38 {
39 let mut chunk_puller = iter.chunk_puller(0);
40 let mut item_puller = iter.item_puller();
41
42 loop {
43 let chunk_size = self.next_chunk_size(shared_state, iter);
44
45 self.begin_chunk(chunk_size);
46
47 match chunk_size {
48 0 | 1 => match item_puller.next() {
49 Some(value) => task.f1(value),
50 None => break,
51 },
52 c => {
53 if c > chunk_puller.chunk_size() {
54 chunk_puller = iter.chunk_puller(c);
55 }
56
57 match chunk_puller.pull() {
58 Some(chunk) => task.fc(chunk),
59 None => break,
60 }
61 }
62 }
63
64 self.complete_chunk(shared_state, chunk_size);
65 }
66
67 self.complete_task(shared_state);
68 }
69
70 fn run_with_idx<I, T>(mut self, iter: &I, shared_state: &Self::SharedState, task: &T)
71 where
72 I: ConcurrentIter,
73 T: ParallelTaskWithIdx<Item = I::Item>,
74 {
75 let mut chunk_puller = iter.chunk_puller(0);
76 let mut item_puller = iter.item_puller_with_idx();
77
78 loop {
79 let chunk_size = self.next_chunk_size(shared_state, iter);
80
81 self.begin_chunk(chunk_size);
82
83 match chunk_size {
84 0 | 1 => match item_puller.next() {
85 Some((idx, value)) => task.f1(idx, value),
86 None => break,
87 },
88 c => {
89 if c > chunk_puller.chunk_size() {
90 chunk_puller = iter.chunk_puller(c);
91 }
92
93 match chunk_puller.pull_with_idx() {
94 Some((begin_idx, chunk)) => task.fc(begin_idx, chunk),
95 None => break,
96 }
97 }
98 }
99
100 self.complete_chunk(shared_state, chunk_size);
101 }
102
103 self.complete_task(shared_state);
104 }
105
106 fn x_collect_with_idx<I, Vo, M1>(
109 mut self,
110 iter: &I,
111 shared_state: &Self::SharedState,
112 map1: &M1,
113 ) -> Vec<(usize, Vo::Item)>
114 where
115 I: ConcurrentIter,
116 Vo: Values,
117 Vo::Item: Send + Sync,
118 M1: Fn(I::Item) -> Vo + Send + Sync,
119 {
120 let mut collected = Vec::new();
121 let out_vec = &mut collected;
122
123 let mut chunk_puller = iter.chunk_puller(0);
124 let mut item_puller = iter.item_puller_with_idx();
125
126 loop {
127 let chunk_size = self.next_chunk_size(shared_state, iter);
128
129 self.begin_chunk(chunk_size);
130
131 match chunk_size {
132 0 | 1 => match item_puller.next() {
133 Some((idx, i)) => {
134 let vo = map1(i);
135 vo.push_to_vec_with_idx(idx, out_vec);
136 }
137 None => break,
138 },
139 c => {
140 if c > chunk_puller.chunk_size() {
141 chunk_puller = iter.chunk_puller(c);
142 }
143
144 match chunk_puller.pull_with_idx() {
145 Some((chunk_begin_idx, chunk)) => {
146 for i in chunk {
147 let vo = map1(i);
148 vo.push_to_vec_with_idx(chunk_begin_idx, out_vec);
149 }
150 }
151 None => break,
152 }
153 }
154 }
155
156 self.complete_chunk(shared_state, chunk_size);
157 }
158
159 self.complete_task(shared_state);
160 collected
161 }
162
163 fn xfx_collect_with_idx<I, Vt, Vo, M1, F, M2>(
164 mut self,
165 iter: &I,
166 shared_state: &Self::SharedState,
167 map1: &M1,
168 filter: &F,
169 map2: &M2,
170 ) -> Vec<(usize, Vo::Item)>
171 where
172 I: ConcurrentIter,
173 Vt: Values,
174 Vo: Values,
175 Vo::Item: Send + Sync,
176 M1: Fn(I::Item) -> Vt + Send + Sync,
177 F: Fn(&Vt::Item) -> bool + Send + Sync,
178 M2: Fn(Vt::Item) -> Vo + Send + Sync,
179 {
180 let mut collected = Vec::new();
181 let out_vec = &mut collected;
182
183 let mut chunk_puller = iter.chunk_puller(0);
184 let mut item_puller = iter.item_puller_with_idx();
185
186 loop {
187 let chunk_size = self.next_chunk_size(shared_state, iter);
188
189 self.begin_chunk(chunk_size);
190
191 match chunk_size {
192 0 | 1 => match item_puller.next() {
193 Some((i_idx, i)) => {
194 let vt = map1(i);
195 vt.xfx_collect_heap(i_idx, filter, map2, out_vec);
196 }
197 None => break,
198 },
199 c => {
200 if c > chunk_puller.chunk_size() {
201 chunk_puller = iter.chunk_puller(c);
202 }
203
204 match chunk_puller.pull_with_idx() {
205 Some((chunk_begin_idx, chunk)) => {
206 for i in chunk {
207 let vt = map1(i);
208 vt.xfx_collect_heap(chunk_begin_idx, filter, map2, out_vec);
209 }
210 }
211 None => break,
212 }
213 }
214 }
215
216 self.complete_chunk(shared_state, chunk_size);
217 }
218
219 self.complete_task(shared_state);
220 collected
221 }
222
223 fn x_reduce<I, Vo, M1, X>(
226 mut self,
227 iter: &I,
228 shared_state: &Self::SharedState,
229 map1: &M1,
230 reduce: &X,
231 ) -> Option<Vo::Item>
232 where
233 I: ConcurrentIter,
234 Vo: Values,
235 Vo::Item: Send + Sync,
236 M1: Fn(I::Item) -> Vo + Send + Sync,
237 X: Fn(Vo::Item, Vo::Item) -> Vo::Item + Send + Sync,
238 {
239 let mut chunk_puller = iter.chunk_puller(0);
240 let mut item_puller = iter.item_puller();
241
242 let mut acc = None;
243 loop {
244 let chunk_size = self.next_chunk_size(shared_state, iter);
245
246 self.begin_chunk(chunk_size);
247
248 match chunk_size {
249 0 | 1 => match item_puller.next() {
250 Some(i) => {
251 let vo = map1(i);
252 acc = vo.acc_reduce(acc, reduce);
253 }
254 None => break,
255 },
256 c => {
257 if c > chunk_puller.chunk_size() {
258 chunk_puller = iter.chunk_puller(c);
259 }
260
261 match chunk_puller.pull() {
262 Some(chunk) => {
263 for i in chunk {
264 let vo = map1(i);
265 acc = vo.acc_reduce(acc, reduce);
266 }
267 }
268 None => break,
269 }
270 }
271 }
272
273 self.complete_chunk(shared_state, chunk_size);
274 }
275
276 self.complete_task(shared_state);
277 acc
278 }
279
280 fn xfx_reduce<I, Vt, Vo, M1, F, M2, X>(
281 mut self,
282 iter: &I,
283 shared_state: &Self::SharedState,
284 map1: &M1,
285 filter: &F,
286 map2: &M2,
287 reduce: &X,
288 ) -> Option<Vo::Item>
289 where
290 I: ConcurrentIter,
291 Vt: Values,
292 Vo: Values,
293 Vo::Item: Send + Sync,
294 M1: Fn(I::Item) -> Vt + Send + Sync,
295 F: Fn(&Vt::Item) -> bool + Send + Sync,
296 M2: Fn(Vt::Item) -> Vo + Send + Sync,
297 X: Fn(Vo::Item, Vo::Item) -> Vo::Item + Send + Sync,
298 {
299 let mut chunk_puller = iter.chunk_puller(0);
300 let mut item_puller = iter.item_puller();
301
302 let mut acc = None;
303
304 loop {
305 let chunk_size = self.next_chunk_size(shared_state, iter);
306
307 self.begin_chunk(chunk_size);
308
309 match chunk_size {
310 0 | 1 => match item_puller.next() {
311 Some(i) => {
312 let vt = map1(i);
313 acc = vt.fx_reduce(acc, filter, map2, reduce);
314 }
315 None => break,
316 },
317 c => {
318 if c > chunk_puller.chunk_size() {
319 chunk_puller = iter.chunk_puller(c);
320 }
321
322 match chunk_puller.pull() {
323 Some(chunk) => {
324 for i in chunk {
325 let vt = map1(i);
326 acc = vt.fx_reduce(acc, filter, map2, reduce);
327 }
328 }
329 None => break,
330 }
331 }
332 }
333
334 self.complete_chunk(shared_state, chunk_size);
335 }
336
337 self.complete_task(shared_state);
338 acc
339 }
340
341 fn xfx_next<I, Vt, Vo, M1, F, M2>(
344 mut self,
345 iter: &I,
346 shared_state: &Self::SharedState,
347 map1: &M1,
348 filter: &F,
349 map2: &M2,
350 ) -> Option<(usize, Vo::Item)>
351 where
352 I: ConcurrentIter,
353 Vt: Values,
354 Vo: Values,
355 Vo::Item: Send + Sync,
356 M1: Fn(I::Item) -> Vt + Send + Sync,
357 F: Fn(&Vt::Item) -> bool + Send + Sync,
358 M2: Fn(Vt::Item) -> Vo + Send + Sync,
359 {
360 let mut chunk_puller = iter.chunk_puller(0);
361 let mut item_puller = iter.item_puller_with_idx();
362
363 loop {
364 let chunk_size = self.next_chunk_size(shared_state, iter);
365
366 self.begin_chunk(chunk_size);
367
368 match chunk_size {
369 0 | 1 => match item_puller.next() {
370 Some((idx, i)) => {
371 let vt = map1(i);
372 if let Some(first) = vt.fx_next(filter, map2) {
373 iter.skip_to_end();
374 self.complete_chunk(shared_state, chunk_size);
375 self.complete_task(shared_state);
376 return Some((idx, first));
377 }
378 }
379 None => break,
380 },
381 c => {
382 if c > chunk_puller.chunk_size() {
383 chunk_puller = iter.chunk_puller(c);
384 }
385
386 match chunk_puller.pull_with_idx() {
387 Some((idx, chunk)) => {
388 for i in chunk {
389 let vt = map1(i);
390 if let Some(first) = vt.fx_next(filter, map2) {
391 iter.skip_to_end();
392 self.complete_chunk(shared_state, chunk_size);
393 self.complete_task(shared_state);
394 return Some((idx, first));
395 }
396 }
397 }
398 None => break,
399 }
400 }
401 }
402
403 self.complete_chunk(shared_state, chunk_size);
404 }
405
406 self.complete_task(shared_state);
407 None
408 }
409
410 fn xfx_next_any<I, Vt, Vo, M1, F, M2>(
411 mut self,
412 iter: &I,
413 shared_state: &Self::SharedState,
414 map1: &M1,
415 filter: &F,
416 map2: &M2,
417 ) -> Option<Vo::Item>
418 where
419 I: ConcurrentIter,
420 Vt: Values,
421 Vo: Values,
422 Vo::Item: Send + Sync,
423 M1: Fn(I::Item) -> Vt + Send + Sync,
424 F: Fn(&Vt::Item) -> bool + Send + Sync,
425 M2: Fn(Vt::Item) -> Vo + Send + Sync,
426 {
427 let mut chunk_puller = iter.chunk_puller(0);
428 let mut item_puller = iter.item_puller();
429
430 loop {
431 let chunk_size = self.next_chunk_size(shared_state, iter);
432
433 self.begin_chunk(chunk_size);
434
435 match chunk_size {
436 0 | 1 => match item_puller.next() {
437 Some(i) => {
438 let vt = map1(i);
439 let maybe_next = vt.fx_next(filter, map2);
440 if maybe_next.is_some() {
441 iter.skip_to_end();
442 self.complete_chunk(shared_state, chunk_size);
443 self.complete_task(shared_state);
444 return maybe_next;
445 }
446 }
447 None => break,
448 },
449 c => {
450 if c > chunk_puller.chunk_size() {
451 chunk_puller = iter.chunk_puller(c);
452 }
453
454 match chunk_puller.pull() {
455 Some(chunk) => {
456 for i in chunk {
457 let vt = map1(i);
458 let maybe_next = vt.fx_next(filter, map2);
459 if maybe_next.is_some() {
460 iter.skip_to_end();
461 self.complete_chunk(shared_state, chunk_size);
462 self.complete_task(shared_state);
463 return maybe_next;
464 }
465 }
466 }
467 None => break,
468 }
469 }
470 }
471
472 self.complete_chunk(shared_state, chunk_size);
473 }
474
475 self.complete_task(shared_state);
476 None
477 }
478}
479
480impl<X: ThreadRunner> ThreadRunnerCompute for X {}