orx_parallel/computations/map/
reduce.rs

1use super::m::M;
2use crate::computations::Atom;
3use crate::runner::{ComputationKind, ParallelRunner, ParallelRunnerCompute};
4use orx_concurrent_iter::ConcurrentIter;
5
6impl<I, O, M1> M<I, O, M1>
7where
8    I: ConcurrentIter,
9    O: Send + Sync,
10    M1: Fn(I::Item) -> O + Send + Sync,
11{
12    pub fn reduce<R, X>(self, reduce: X) -> (usize, Option<O>)
13    where
14        R: ParallelRunner,
15        X: Fn(O, O) -> O + Send + Sync,
16    {
17        MReduce::compute::<R>(self, reduce)
18    }
19}
20
21pub struct MReduce<I, O, M1, X>
22where
23    I: ConcurrentIter,
24    O: Send + Sync,
25    M1: Fn(I::Item) -> O + Send + Sync,
26    X: Fn(O, O) -> O + Send + Sync,
27{
28    m: M<I, O, M1>,
29    reduce: X,
30}
31
32impl<I, O, M1, X> MReduce<I, O, M1, X>
33where
34    I: ConcurrentIter,
35    O: Send + Sync,
36    M1: Fn(I::Item) -> O + Send + Sync,
37    X: Fn(O, O) -> O + Send + Sync,
38{
39    pub fn compute<R: ParallelRunner>(m: M<I, O, M1>, reduce: X) -> (usize, Option<O>) {
40        let x = Self { m, reduce };
41        let p = x.m.params();
42        match p.is_sequential() {
43            true => (0, x.sequential()),
44            false => x.parallel::<R>(),
45        }
46    }
47
48    fn sequential(self) -> Option<O> {
49        let (m, reduce) = (self.m, self.reduce);
50        let (_, iter, map1) = m.destruct();
51        iter.into_seq_iter().map(map1).reduce(reduce)
52    }
53
54    fn parallel<R: ParallelRunner>(self) -> (usize, Option<O>) {
55        let (m, reduce) = (self.m, self.reduce);
56        let (params, iter, map1) = m.destruct();
57
58        let runner = R::new(ComputationKind::Reduce, params, iter.try_get_len());
59        let xap1 = |i: I::Item| Atom(map1(i));
60        runner.x_reduce(&iter, &xap1, &reduce)
61    }
62}