orx_parallel/computations/xap/
reduce.rs1use super::x::X;
2use crate::computations::Values;
3use crate::runner::{ComputationKind, ParallelRunner, ParallelRunnerCompute};
4use orx_concurrent_iter::ConcurrentIter;
5
6impl<I, Vo, M1> X<I, Vo, M1>
7where
8 I: ConcurrentIter,
9 Vo: Values + Send + Sync,
10 Vo::Item: Send + Sync,
11 M1: Fn(I::Item) -> Vo + Send + Sync,
12{
13 pub fn reduce<R, Red>(self, reduce: Red) -> (usize, Option<Vo::Item>)
14 where
15 R: ParallelRunner,
16 Red: Fn(Vo::Item, Vo::Item) -> Vo::Item + Send + Sync,
17 {
18 XReduce::compute::<R>(self, reduce)
19 }
20}
21
22pub struct XReduce<I, Vo, M1, Red>
23where
24 I: ConcurrentIter,
25 Vo: Values + Send + Sync,
26 Vo::Item: Send + Sync,
27 M1: Fn(I::Item) -> Vo + Send + Sync,
28 Red: Fn(Vo::Item, Vo::Item) -> Vo::Item + Send + Sync,
29{
30 x: X<I, Vo, M1>,
31 reduce: Red,
32}
33
34impl<I, Vo, M1, Red> XReduce<I, Vo, M1, Red>
35where
36 I: ConcurrentIter,
37 Vo: Values + Send + Sync,
38 Vo::Item: Send + Sync,
39 M1: Fn(I::Item) -> Vo + Send + Sync,
40 Red: Fn(Vo::Item, Vo::Item) -> Vo::Item + Send + Sync,
41{
42 pub fn compute<R: ParallelRunner>(x: X<I, Vo, M1>, reduce: Red) -> (usize, Option<Vo::Item>) {
43 let x = Self { x, reduce };
44 let p = x.x.params();
45 match p.is_sequential() {
46 true => (0, x.sequential()),
47 false => x.parallel::<R>(),
48 }
49 }
50
51 fn sequential(self) -> Option<Vo::Item> {
52 let (x, reduce) = (self.x, self.reduce);
53 let (_, iter, xap1) = x.destruct();
54 iter.into_seq_iter()
55 .filter_map(|x| xap1(x).acc_reduce(None, &reduce))
56 .reduce(&reduce)
57 }
58
59 fn parallel<R: ParallelRunner>(self) -> (usize, Option<Vo::Item>) {
60 let (x, reduce) = (self.x, self.reduce);
61 let (params, iter, xap1) = x.destruct();
62
63 let runner = R::new(ComputationKind::Reduce, params, iter.try_get_len());
64 runner.x_reduce(&iter, &xap1, &reduce)
65 }
66}