orx_parallel/computations/map/
reduce.rs1use super::m::M;
2use crate::computations::Atom;
3use crate::runner::{ComputationKind, ParallelRunner, ParallelRunnerCompute};
4use orx_concurrent_iter::ConcurrentIter;
5
6impl<I, O, M1> M<I, O, M1>
7where
8 I: ConcurrentIter,
9 O: Send + Sync,
10 M1: Fn(I::Item) -> O + Send + Sync,
11{
12 pub fn reduce<R, X>(self, reduce: X) -> (usize, Option<O>)
13 where
14 R: ParallelRunner,
15 X: Fn(O, O) -> O + Send + Sync,
16 {
17 MReduce::compute::<R>(self, reduce)
18 }
19}
20
21pub struct MReduce<I, O, M1, X>
22where
23 I: ConcurrentIter,
24 O: Send + Sync,
25 M1: Fn(I::Item) -> O + Send + Sync,
26 X: Fn(O, O) -> O + Send + Sync,
27{
28 m: M<I, O, M1>,
29 reduce: X,
30}
31
32impl<I, O, M1, X> MReduce<I, O, M1, X>
33where
34 I: ConcurrentIter,
35 O: Send + Sync,
36 M1: Fn(I::Item) -> O + Send + Sync,
37 X: Fn(O, O) -> O + Send + Sync,
38{
39 pub fn compute<R: ParallelRunner>(m: M<I, O, M1>, reduce: X) -> (usize, Option<O>) {
40 let x = Self { m, reduce };
41 let p = x.m.params();
42 match p.is_sequential() {
43 true => (0, x.sequential()),
44 false => x.parallel::<R>(),
45 }
46 }
47
48 fn sequential(self) -> Option<O> {
49 let (m, reduce) = (self.m, self.reduce);
50 let (_, iter, map1) = m.destruct();
51 iter.into_seq_iter().map(map1).reduce(reduce)
52 }
53
54 fn parallel<R: ParallelRunner>(self) -> (usize, Option<O>) {
55 let (m, reduce) = (self.m, self.reduce);
56 let (params, iter, map1) = m.destruct();
57
58 let runner = R::new(ComputationKind::Reduce, params, iter.try_get_len());
59 let xap1 = |i: I::Item| Atom(map1(i));
60 runner.x_reduce(&iter, &xap1, &reduce)
61 }
62}