// orx_parallel/runner/parallel_runner.rs

1use super::{
2    computation_kind::ComputationKind,
3    parallel_task::{ParallelTask, ParallelTaskWithIdx},
4    thread_runner::{ThreadRunner, ThreadRunnerCompute},
5};
6use crate::{computations::Values, parameters::Params};
7use orx_concurrent_iter::ConcurrentIter;
8
9/// A parallel runner which is responsible for taking a computation defined as a composition
10/// of iterator methods, spawns threads, shares tasks and returns the result of the parallel
11/// execution.
/// A parallel runner which is responsible for taking a computation defined as a composition
/// of iterator methods, spawns threads, shares tasks and returns the result of the parallel
/// execution.
pub trait ParallelRunner: Sized + Sync {
    /// Data shared to the thread runners.
    ///
    /// One value of this state is created per parallel computation and a reference to it is
    /// passed to every thread runner.
    type SharedState: Send + Sync;

    /// Thread runner that is responsible for executing the tasks allocated to a thread.
    type ThreadRunner: ThreadRunner<SharedState = Self::SharedState>;

    /// Creates a new parallel runner for the given computation `kind`, parallelization `params`
    /// and `initial_input_len`.
    ///
    /// NOTE(review): `initial_input_len` is presumably `None` when the input length is not
    /// known in advance — confirm against implementations.
    fn new(kind: ComputationKind, params: Params, initial_input_len: Option<usize>) -> Self;

    /// Creates an initial shared state.
    fn new_shared_state(&self) -> Self::SharedState;

    /// Returns true if it is beneficial to spawn a new thread provided that:
    ///
    /// * `num_spawned` threads have already been spawned, and
    /// * `shared_state` is the current parallel execution state.
    ///
    /// Once this returns false, the spawning loop of the computation stops spawning
    /// further threads.
    fn do_spawn_new<I>(
        &self,
        num_spawned: usize,
        shared_state: &Self::SharedState,
        iter: &I,
    ) -> bool
    where
        I: ConcurrentIter;

    /// Creates a new thread runner provided that the current parallel execution state is
    /// `shared_state`.
    fn new_thread_runner(&self, shared_state: &Self::SharedState) -> Self::ThreadRunner;
}
43
44pub trait ParallelRunnerCompute: ParallelRunner {
45    // run
46
47    fn run<I, T>(&self, iter: &I, task: T) -> usize
48    where
49        I: ConcurrentIter,
50        T: ParallelTask<Item = I::Item> + Sync,
51    {
52        let state = self.new_shared_state();
53        let shared_state = &state;
54
55        let mut num_spawned = 0;
56        std::thread::scope(|s| {
57            while self.do_spawn_new(num_spawned, shared_state, iter) {
58                num_spawned += 1;
59                s.spawn(|| {
60                    let thread_runner = self.new_thread_runner(shared_state);
61                    thread_runner.run(iter, shared_state, &task);
62                });
63            }
64        });
65        num_spawned
66    }
67
68    fn run_with_idx<I, T>(&self, iter: &I, task: T) -> usize
69    where
70        I: ConcurrentIter,
71        T: ParallelTaskWithIdx<Item = I::Item> + Sync,
72    {
73        let state = self.new_shared_state();
74        let shared_state = &state;
75
76        let mut num_spawned = 0;
77        std::thread::scope(|s| {
78            while self.do_spawn_new(num_spawned, shared_state, iter) {
79                num_spawned += 1;
80                s.spawn(|| {
81                    let thread_runner = self.new_thread_runner(shared_state);
82                    thread_runner.run_with_idx(iter, shared_state, &task);
83                });
84            }
85        });
86        num_spawned
87    }
88
89    // collect
90
91    #[allow(clippy::type_complexity)]
92    fn x_collect_with_idx<I, Vo, M1>(
93        &self,
94        iter: &I,
95        map1: &M1,
96    ) -> (usize, Vec<Vec<(usize, Vo::Item)>>)
97    where
98        I: ConcurrentIter,
99        Vo: Values,
100        Vo::Item: Send + Sync,
101        M1: Fn(I::Item) -> Vo + Send + Sync,
102    {
103        let state = self.new_shared_state();
104        let shared_state = &state;
105
106        let mut num_spawned = 0;
107        let vectors = std::thread::scope(|s| {
108            let mut handles = vec![];
109
110            while self.do_spawn_new(num_spawned, shared_state, iter) {
111                num_spawned += 1;
112                handles.push(s.spawn(move || {
113                    let thread_runner = self.new_thread_runner(shared_state);
114                    thread_runner.x_collect_with_idx(iter, shared_state, map1)
115                }));
116            }
117
118            let mut vectors = Vec::with_capacity(handles.len());
119            for x in handles {
120                vectors.push(x.join().expect("failed to join the thread"));
121            }
122            vectors
123        });
124
125        (num_spawned, vectors)
126    }
127
128    #[allow(clippy::type_complexity)]
129    fn xfx_collect_with_idx<I, Vt, Vo, M1, F, M2>(
130        &self,
131        iter: &I,
132        map1: &M1,
133        filter: &F,
134        map2: &M2,
135    ) -> (usize, Vec<Vec<(usize, Vo::Item)>>)
136    where
137        I: ConcurrentIter,
138        Vt: Values,
139        Vo: Values,
140        Vo::Item: Send + Sync,
141        M1: Fn(I::Item) -> Vt + Send + Sync,
142        F: Fn(&Vt::Item) -> bool + Send + Sync,
143        M2: Fn(Vt::Item) -> Vo + Send + Sync,
144    {
145        let state = self.new_shared_state();
146        let shared_state = &state;
147
148        let mut num_spawned = 0;
149        let vectors = std::thread::scope(|s| {
150            let mut handles = vec![];
151
152            while self.do_spawn_new(num_spawned, shared_state, iter) {
153                num_spawned += 1;
154                handles.push(s.spawn(move || {
155                    let thread_runner = self.new_thread_runner(shared_state);
156                    thread_runner.xfx_collect_with_idx(iter, shared_state, map1, filter, map2)
157                }));
158            }
159
160            let mut vectors = Vec::with_capacity(handles.len());
161            for x in handles {
162                vectors.push(x.join().expect("failed to join the thread"));
163            }
164            vectors
165        });
166
167        (num_spawned, vectors)
168    }
169
170    // reduce
171
172    fn x_reduce<I, Vo, M1, X>(&self, iter: &I, map1: &M1, reduce: &X) -> (usize, Option<Vo::Item>)
173    where
174        I: ConcurrentIter,
175        Vo: Values,
176        Vo::Item: Send + Sync,
177        M1: Fn(I::Item) -> Vo + Send + Sync,
178        X: Fn(Vo::Item, Vo::Item) -> Vo::Item + Send + Sync,
179    {
180        let state = self.new_shared_state();
181        let shared_state = &state;
182
183        let mut num_spawned = 0;
184        let results = std::thread::scope(|s| {
185            let mut handles = vec![];
186
187            while self.do_spawn_new(num_spawned, shared_state, iter) {
188                num_spawned += 1;
189                handles.push(s.spawn(move || {
190                    let thread_runner = self.new_thread_runner(shared_state);
191                    thread_runner.x_reduce(iter, shared_state, map1, reduce)
192                }));
193            }
194
195            let mut results = Vec::with_capacity(handles.len());
196            for x in handles {
197                if let Some(x) = x.join().expect("failed to join the thread") {
198                    results.push(x);
199                }
200            }
201            results
202        });
203
204        let acc = results.into_iter().reduce(reduce);
205
206        (num_spawned, acc)
207    }
208
209    fn xfx_reduce<I, Vt, Vo, M1, F, M2, X>(
210        &self,
211        iter: &I,
212        map1: &M1,
213        filter: &F,
214        map2: &M2,
215        reduce: &X,
216    ) -> (usize, Option<Vo::Item>)
217    where
218        I: ConcurrentIter,
219        Vt: Values,
220        Vo: Values,
221        Vo::Item: Send + Sync,
222        M1: Fn(I::Item) -> Vt + Send + Sync,
223        F: Fn(&Vt::Item) -> bool + Send + Sync,
224        M2: Fn(Vt::Item) -> Vo + Send + Sync,
225        X: Fn(Vo::Item, Vo::Item) -> Vo::Item + Send + Sync,
226    {
227        let state = self.new_shared_state();
228        let shared_state = &state;
229
230        let mut num_spawned = 0;
231        let results = std::thread::scope(|s| {
232            let mut handles = vec![];
233
234            while self.do_spawn_new(num_spawned, shared_state, iter) {
235                num_spawned += 1;
236                handles.push(s.spawn(move || {
237                    let thread_runner = self.new_thread_runner(shared_state);
238                    thread_runner.xfx_reduce(iter, shared_state, map1, filter, map2, reduce)
239                }));
240            }
241
242            let mut results = Vec::with_capacity(handles.len());
243            for x in handles {
244                if let Some(x) = x.join().expect("failed to join the thread") {
245                    results.push(x);
246                }
247            }
248            results
249        });
250
251        let acc = results.into_iter().reduce(reduce);
252
253        (num_spawned, acc)
254    }
255
256    // next
257
258    fn xfx_next<I, Vt, Vo, M1, F, M2>(
259        &self,
260        iter: &I,
261        map1: &M1,
262        filter: &F,
263        map2: &M2,
264    ) -> (usize, Option<Vo::Item>)
265    where
266        I: ConcurrentIter,
267        Vt: Values,
268        Vo: Values,
269        Vo::Item: Send + Sync,
270        M1: Fn(I::Item) -> Vt + Send + Sync,
271        F: Fn(&Vt::Item) -> bool + Send + Sync,
272        M2: Fn(Vt::Item) -> Vo + Send + Sync,
273    {
274        let state = self.new_shared_state();
275        let shared_state = &state;
276
277        let mut num_spawned = 0;
278        let results = std::thread::scope(|s| {
279            let mut handles = vec![];
280
281            while self.do_spawn_new(num_spawned, shared_state, iter) {
282                num_spawned += 1;
283                handles.push(s.spawn(move || {
284                    let thread_runner = self.new_thread_runner(shared_state);
285                    thread_runner.xfx_next(iter, shared_state, map1, filter, map2)
286                }));
287            }
288
289            let mut results: Vec<(usize, Vo::Item)> = Vec::with_capacity(handles.len());
290            for x in handles {
291                if let Some(x) = x.join().expect("failed to join the thread") {
292                    results.push(x);
293                }
294            }
295            results
296        });
297
298        let acc = results.into_iter().min_by_key(|x| x.0).map(|x| x.1);
299
300        (num_spawned, acc)
301    }
302
303    fn xfx_next_any<I, Vt, Vo, M1, F, M2>(
304        &self,
305        iter: &I,
306        map1: &M1,
307        filter: &F,
308        map2: &M2,
309    ) -> (usize, Option<Vo::Item>)
310    where
311        I: ConcurrentIter,
312        Vt: Values,
313        Vo: Values,
314        Vo::Item: Send + Sync,
315        M1: Fn(I::Item) -> Vt + Send + Sync,
316        F: Fn(&Vt::Item) -> bool + Send + Sync,
317        M2: Fn(Vt::Item) -> Vo + Send + Sync,
318    {
319        let state = self.new_shared_state();
320        let shared_state = &state;
321
322        let mut num_spawned = 0;
323        let result = std::thread::scope(|s| {
324            let mut handles = vec![];
325
326            while self.do_spawn_new(num_spawned, shared_state, iter) {
327                num_spawned += 1;
328                handles.push(s.spawn(move || {
329                    let thread_runner = self.new_thread_runner(shared_state);
330                    thread_runner.xfx_next_any(iter, shared_state, map1, filter, map2)
331                }));
332            }
333
334            // do not wait to join other threads
335            handles
336                .into_iter()
337                .find_map(|x| x.join().expect("failed to join the thread"))
338        });
339
340        (num_spawned, result)
341    }
342}
343
344impl<X: ParallelRunner> ParallelRunnerCompute for X {}