orx_parallel/computations/xap/
reduce.rs

1use super::x::X;
2use crate::computations::Values;
3use crate::runner::{ComputationKind, ParallelRunner, ParallelRunnerCompute};
4use orx_concurrent_iter::ConcurrentIter;
5
6impl<I, Vo, M1> X<I, Vo, M1>
7where
8    I: ConcurrentIter,
9    Vo: Values + Send + Sync,
10    Vo::Item: Send + Sync,
11    M1: Fn(I::Item) -> Vo + Send + Sync,
12{
13    pub fn reduce<R, Red>(self, reduce: Red) -> (usize, Option<Vo::Item>)
14    where
15        R: ParallelRunner,
16        Red: Fn(Vo::Item, Vo::Item) -> Vo::Item + Send + Sync,
17    {
18        XReduce::compute::<R>(self, reduce)
19    }
20}
21
22pub struct XReduce<I, Vo, M1, Red>
23where
24    I: ConcurrentIter,
25    Vo: Values + Send + Sync,
26    Vo::Item: Send + Sync,
27    M1: Fn(I::Item) -> Vo + Send + Sync,
28    Red: Fn(Vo::Item, Vo::Item) -> Vo::Item + Send + Sync,
29{
30    x: X<I, Vo, M1>,
31    reduce: Red,
32}
33
34impl<I, Vo, M1, Red> XReduce<I, Vo, M1, Red>
35where
36    I: ConcurrentIter,
37    Vo: Values + Send + Sync,
38    Vo::Item: Send + Sync,
39    M1: Fn(I::Item) -> Vo + Send + Sync,
40    Red: Fn(Vo::Item, Vo::Item) -> Vo::Item + Send + Sync,
41{
42    pub fn compute<R: ParallelRunner>(x: X<I, Vo, M1>, reduce: Red) -> (usize, Option<Vo::Item>) {
43        let x = Self { x, reduce };
44        let p = x.x.params();
45        match p.is_sequential() {
46            true => (0, x.sequential()),
47            false => x.parallel::<R>(),
48        }
49    }
50
51    fn sequential(self) -> Option<Vo::Item> {
52        let (x, reduce) = (self.x, self.reduce);
53        let (_, iter, xap1) = x.destruct();
54        iter.into_seq_iter()
55            .filter_map(|x| xap1(x).acc_reduce(None, &reduce))
56            .reduce(&reduce)
57    }
58
59    fn parallel<R: ParallelRunner>(self) -> (usize, Option<Vo::Item>) {
60        let (x, reduce) = (self.x, self.reduce);
61        let (params, iter, xap1) = x.destruct();
62
63        let runner = R::new(ComputationKind::Reduce, params, iter.try_get_len());
64        runner.x_reduce(&iter, &xap1, &reduce)
65    }
66}