orx_parallel/runner/
thread_runner.rs

1use super::parallel_task::{ParallelTask, ParallelTaskWithIdx};
2use crate::computations::Values;
3use orx_concurrent_iter::{ChunkPuller, ConcurrentIter};
4
5/// Thread runner responsible for executing the tasks assigned to the thread by the
6/// parallel runner.
7pub trait ThreadRunner: Sized {
8    /// Type of the shared state among threads.
9    type SharedState;
10
11    /// Returns the next chunks size to be pulled from the input `iter` for the given
12    /// current `shared_state`.
13    fn next_chunk_size<I>(&self, shared_state: &Self::SharedState, iter: &I) -> usize
14    where
15        I: ConcurrentIter;
16
17    /// Hook that will be called before starting to execute the chunk of the given `chunk_size`.
18    fn begin_chunk(&mut self, chunk_size: usize);
19
20    /// Hook that will be called after completing the chunk of the given `chunk_size`.
21    /// The `shared_state` is also provided so that it can be updated to send information to the
22    /// parallel runner and other thread runners.
23    fn complete_chunk(&mut self, shared_state: &Self::SharedState, chunk_size: usize);
24
25    /// Hook that will be called after completing the task.
26    /// The `shared_state` is also provided so that it can be updated to send information to the
27    /// parallel runner and other thread runners.
28    fn complete_task(&mut self, shared_state: &Self::SharedState);
29}
30
31pub(crate) trait ThreadRunnerCompute: ThreadRunner {
32    // run
33
34    fn run<I, T>(mut self, iter: &I, shared_state: &Self::SharedState, task: &T)
35    where
36        I: ConcurrentIter,
37        T: ParallelTask<Item = I::Item>,
38    {
39        let mut chunk_puller = iter.chunk_puller(0);
40        let mut item_puller = iter.item_puller();
41
42        loop {
43            let chunk_size = self.next_chunk_size(shared_state, iter);
44
45            self.begin_chunk(chunk_size);
46
47            match chunk_size {
48                0 | 1 => match item_puller.next() {
49                    Some(value) => task.f1(value),
50                    None => break,
51                },
52                c => {
53                    if c > chunk_puller.chunk_size() {
54                        chunk_puller = iter.chunk_puller(c);
55                    }
56
57                    match chunk_puller.pull() {
58                        Some(chunk) => task.fc(chunk),
59                        None => break,
60                    }
61                }
62            }
63
64            self.complete_chunk(shared_state, chunk_size);
65        }
66
67        self.complete_task(shared_state);
68    }
69
70    fn run_with_idx<I, T>(mut self, iter: &I, shared_state: &Self::SharedState, task: &T)
71    where
72        I: ConcurrentIter,
73        T: ParallelTaskWithIdx<Item = I::Item>,
74    {
75        let mut chunk_puller = iter.chunk_puller(0);
76        let mut item_puller = iter.item_puller_with_idx();
77
78        loop {
79            let chunk_size = self.next_chunk_size(shared_state, iter);
80
81            self.begin_chunk(chunk_size);
82
83            match chunk_size {
84                0 | 1 => match item_puller.next() {
85                    Some((idx, value)) => task.f1(idx, value),
86                    None => break,
87                },
88                c => {
89                    if c > chunk_puller.chunk_size() {
90                        chunk_puller = iter.chunk_puller(c);
91                    }
92
93                    match chunk_puller.pull_with_idx() {
94                        Some((begin_idx, chunk)) => task.fc(begin_idx, chunk),
95                        None => break,
96                    }
97                }
98            }
99
100            self.complete_chunk(shared_state, chunk_size);
101        }
102
103        self.complete_task(shared_state);
104    }
105
106    // collect
107
108    fn x_collect_with_idx<I, Vo, M1>(
109        mut self,
110        iter: &I,
111        shared_state: &Self::SharedState,
112        map1: &M1,
113    ) -> Vec<(usize, Vo::Item)>
114    where
115        I: ConcurrentIter,
116        Vo: Values,
117        Vo::Item: Send + Sync,
118        M1: Fn(I::Item) -> Vo + Send + Sync,
119    {
120        let mut collected = Vec::new();
121        let out_vec = &mut collected;
122
123        let mut chunk_puller = iter.chunk_puller(0);
124        let mut item_puller = iter.item_puller_with_idx();
125
126        loop {
127            let chunk_size = self.next_chunk_size(shared_state, iter);
128
129            self.begin_chunk(chunk_size);
130
131            match chunk_size {
132                0 | 1 => match item_puller.next() {
133                    Some((idx, i)) => {
134                        let vo = map1(i);
135                        vo.push_to_vec_with_idx(idx, out_vec);
136                    }
137                    None => break,
138                },
139                c => {
140                    if c > chunk_puller.chunk_size() {
141                        chunk_puller = iter.chunk_puller(c);
142                    }
143
144                    match chunk_puller.pull_with_idx() {
145                        Some((chunk_begin_idx, chunk)) => {
146                            for i in chunk {
147                                let vo = map1(i);
148                                vo.push_to_vec_with_idx(chunk_begin_idx, out_vec);
149                            }
150                        }
151                        None => break,
152                    }
153                }
154            }
155
156            self.complete_chunk(shared_state, chunk_size);
157        }
158
159        self.complete_task(shared_state);
160        collected
161    }
162
163    fn xfx_collect_with_idx<I, Vt, Vo, M1, F, M2>(
164        mut self,
165        iter: &I,
166        shared_state: &Self::SharedState,
167        map1: &M1,
168        filter: &F,
169        map2: &M2,
170    ) -> Vec<(usize, Vo::Item)>
171    where
172        I: ConcurrentIter,
173        Vt: Values,
174        Vo: Values,
175        Vo::Item: Send + Sync,
176        M1: Fn(I::Item) -> Vt + Send + Sync,
177        F: Fn(&Vt::Item) -> bool + Send + Sync,
178        M2: Fn(Vt::Item) -> Vo + Send + Sync,
179    {
180        let mut collected = Vec::new();
181        let out_vec = &mut collected;
182
183        let mut chunk_puller = iter.chunk_puller(0);
184        let mut item_puller = iter.item_puller_with_idx();
185
186        loop {
187            let chunk_size = self.next_chunk_size(shared_state, iter);
188
189            self.begin_chunk(chunk_size);
190
191            match chunk_size {
192                0 | 1 => match item_puller.next() {
193                    Some((i_idx, i)) => {
194                        let vt = map1(i);
195                        vt.xfx_collect_heap(i_idx, filter, map2, out_vec);
196                    }
197                    None => break,
198                },
199                c => {
200                    if c > chunk_puller.chunk_size() {
201                        chunk_puller = iter.chunk_puller(c);
202                    }
203
204                    match chunk_puller.pull_with_idx() {
205                        Some((chunk_begin_idx, chunk)) => {
206                            for i in chunk {
207                                let vt = map1(i);
208                                vt.xfx_collect_heap(chunk_begin_idx, filter, map2, out_vec);
209                            }
210                        }
211                        None => break,
212                    }
213                }
214            }
215
216            self.complete_chunk(shared_state, chunk_size);
217        }
218
219        self.complete_task(shared_state);
220        collected
221    }
222
223    // reduce
224
225    fn x_reduce<I, Vo, M1, X>(
226        mut self,
227        iter: &I,
228        shared_state: &Self::SharedState,
229        map1: &M1,
230        reduce: &X,
231    ) -> Option<Vo::Item>
232    where
233        I: ConcurrentIter,
234        Vo: Values,
235        Vo::Item: Send + Sync,
236        M1: Fn(I::Item) -> Vo + Send + Sync,
237        X: Fn(Vo::Item, Vo::Item) -> Vo::Item + Send + Sync,
238    {
239        let mut chunk_puller = iter.chunk_puller(0);
240        let mut item_puller = iter.item_puller();
241
242        let mut acc = None;
243        loop {
244            let chunk_size = self.next_chunk_size(shared_state, iter);
245
246            self.begin_chunk(chunk_size);
247
248            match chunk_size {
249                0 | 1 => match item_puller.next() {
250                    Some(i) => {
251                        let vo = map1(i);
252                        acc = vo.acc_reduce(acc, reduce);
253                    }
254                    None => break,
255                },
256                c => {
257                    if c > chunk_puller.chunk_size() {
258                        chunk_puller = iter.chunk_puller(c);
259                    }
260
261                    match chunk_puller.pull() {
262                        Some(chunk) => {
263                            for i in chunk {
264                                let vo = map1(i);
265                                acc = vo.acc_reduce(acc, reduce);
266                            }
267                        }
268                        None => break,
269                    }
270                }
271            }
272
273            self.complete_chunk(shared_state, chunk_size);
274        }
275
276        self.complete_task(shared_state);
277        acc
278    }
279
280    fn xfx_reduce<I, Vt, Vo, M1, F, M2, X>(
281        mut self,
282        iter: &I,
283        shared_state: &Self::SharedState,
284        map1: &M1,
285        filter: &F,
286        map2: &M2,
287        reduce: &X,
288    ) -> Option<Vo::Item>
289    where
290        I: ConcurrentIter,
291        Vt: Values,
292        Vo: Values,
293        Vo::Item: Send + Sync,
294        M1: Fn(I::Item) -> Vt + Send + Sync,
295        F: Fn(&Vt::Item) -> bool + Send + Sync,
296        M2: Fn(Vt::Item) -> Vo + Send + Sync,
297        X: Fn(Vo::Item, Vo::Item) -> Vo::Item + Send + Sync,
298    {
299        let mut chunk_puller = iter.chunk_puller(0);
300        let mut item_puller = iter.item_puller();
301
302        let mut acc = None;
303
304        loop {
305            let chunk_size = self.next_chunk_size(shared_state, iter);
306
307            self.begin_chunk(chunk_size);
308
309            match chunk_size {
310                0 | 1 => match item_puller.next() {
311                    Some(i) => {
312                        let vt = map1(i);
313                        acc = vt.fx_reduce(acc, filter, map2, reduce);
314                    }
315                    None => break,
316                },
317                c => {
318                    if c > chunk_puller.chunk_size() {
319                        chunk_puller = iter.chunk_puller(c);
320                    }
321
322                    match chunk_puller.pull() {
323                        Some(chunk) => {
324                            for i in chunk {
325                                let vt = map1(i);
326                                acc = vt.fx_reduce(acc, filter, map2, reduce);
327                            }
328                        }
329                        None => break,
330                    }
331                }
332            }
333
334            self.complete_chunk(shared_state, chunk_size);
335        }
336
337        self.complete_task(shared_state);
338        acc
339    }
340
341    // next
342
343    fn xfx_next<I, Vt, Vo, M1, F, M2>(
344        mut self,
345        iter: &I,
346        shared_state: &Self::SharedState,
347        map1: &M1,
348        filter: &F,
349        map2: &M2,
350    ) -> Option<(usize, Vo::Item)>
351    where
352        I: ConcurrentIter,
353        Vt: Values,
354        Vo: Values,
355        Vo::Item: Send + Sync,
356        M1: Fn(I::Item) -> Vt + Send + Sync,
357        F: Fn(&Vt::Item) -> bool + Send + Sync,
358        M2: Fn(Vt::Item) -> Vo + Send + Sync,
359    {
360        let mut chunk_puller = iter.chunk_puller(0);
361        let mut item_puller = iter.item_puller_with_idx();
362
363        loop {
364            let chunk_size = self.next_chunk_size(shared_state, iter);
365
366            self.begin_chunk(chunk_size);
367
368            match chunk_size {
369                0 | 1 => match item_puller.next() {
370                    Some((idx, i)) => {
371                        let vt = map1(i);
372                        if let Some(first) = vt.fx_next(filter, map2) {
373                            iter.skip_to_end();
374                            self.complete_chunk(shared_state, chunk_size);
375                            self.complete_task(shared_state);
376                            return Some((idx, first));
377                        }
378                    }
379                    None => break,
380                },
381                c => {
382                    if c > chunk_puller.chunk_size() {
383                        chunk_puller = iter.chunk_puller(c);
384                    }
385
386                    match chunk_puller.pull_with_idx() {
387                        Some((idx, chunk)) => {
388                            for i in chunk {
389                                let vt = map1(i);
390                                if let Some(first) = vt.fx_next(filter, map2) {
391                                    iter.skip_to_end();
392                                    self.complete_chunk(shared_state, chunk_size);
393                                    self.complete_task(shared_state);
394                                    return Some((idx, first));
395                                }
396                            }
397                        }
398                        None => break,
399                    }
400                }
401            }
402
403            self.complete_chunk(shared_state, chunk_size);
404        }
405
406        self.complete_task(shared_state);
407        None
408    }
409
410    fn xfx_next_any<I, Vt, Vo, M1, F, M2>(
411        mut self,
412        iter: &I,
413        shared_state: &Self::SharedState,
414        map1: &M1,
415        filter: &F,
416        map2: &M2,
417    ) -> Option<Vo::Item>
418    where
419        I: ConcurrentIter,
420        Vt: Values,
421        Vo: Values,
422        Vo::Item: Send + Sync,
423        M1: Fn(I::Item) -> Vt + Send + Sync,
424        F: Fn(&Vt::Item) -> bool + Send + Sync,
425        M2: Fn(Vt::Item) -> Vo + Send + Sync,
426    {
427        let mut chunk_puller = iter.chunk_puller(0);
428        let mut item_puller = iter.item_puller();
429
430        loop {
431            let chunk_size = self.next_chunk_size(shared_state, iter);
432
433            self.begin_chunk(chunk_size);
434
435            match chunk_size {
436                0 | 1 => match item_puller.next() {
437                    Some(i) => {
438                        let vt = map1(i);
439                        let maybe_next = vt.fx_next(filter, map2);
440                        if maybe_next.is_some() {
441                            iter.skip_to_end();
442                            self.complete_chunk(shared_state, chunk_size);
443                            self.complete_task(shared_state);
444                            return maybe_next;
445                        }
446                    }
447                    None => break,
448                },
449                c => {
450                    if c > chunk_puller.chunk_size() {
451                        chunk_puller = iter.chunk_puller(c);
452                    }
453
454                    match chunk_puller.pull() {
455                        Some(chunk) => {
456                            for i in chunk {
457                                let vt = map1(i);
458                                let maybe_next = vt.fx_next(filter, map2);
459                                if maybe_next.is_some() {
460                                    iter.skip_to_end();
461                                    self.complete_chunk(shared_state, chunk_size);
462                                    self.complete_task(shared_state);
463                                    return maybe_next;
464                                }
465                            }
466                        }
467                        None => break,
468                    }
469                }
470            }
471
472            self.complete_chunk(shared_state, chunk_size);
473        }
474
475        self.complete_task(shared_state);
476        None
477    }
478}
479
480impl<X: ThreadRunner> ThreadRunnerCompute for X {}