orx_parallel/computational_variants/fallible_result/
map_result.rs

1use crate::computational_variants::ParMap;
2use crate::executor::parallel_compute as prc;
3use crate::par_iter_result::{IntoResult, ParIterResult};
4use crate::runner::{DefaultRunner, ParallelRunner};
5use crate::{IterationOrder, ParCollectInto, ParIter};
6use core::marker::PhantomData;
7use orx_concurrent_iter::ConcurrentIter;
8
9/// A parallel iterator for which the computation either completely succeeds,
10/// or fails and **early exits** with an error.
11pub struct ParMapResult<I, T, E, O, M1, R = DefaultRunner>
12where
13    R: ParallelRunner,
14    I: ConcurrentIter,
15    O: IntoResult<T, E>,
16    M1: Fn(I::Item) -> O + Sync,
17{
18    par: ParMap<I, O, M1, R>,
19    phantom: PhantomData<(T, E)>,
20}
21
22impl<I, T, E, O, M1, R> ParMapResult<I, T, E, O, M1, R>
23where
24    R: ParallelRunner,
25    I: ConcurrentIter,
26    O: IntoResult<T, E>,
27    M1: Fn(I::Item) -> O + Sync,
28{
29    pub(crate) fn new(par: ParMap<I, O, M1, R>) -> Self {
30        Self {
31            par,
32            phantom: PhantomData,
33        }
34    }
35}
36
37impl<I, T, E, O, M1, R> ParIterResult<R> for ParMapResult<I, T, E, O, M1, R>
38where
39    R: ParallelRunner,
40    I: ConcurrentIter,
41    O: IntoResult<T, E>,
42    M1: Fn(I::Item) -> O + Sync,
43{
44    type Item = T;
45
46    type Err = E;
47
48    type RegularItem = O;
49
50    type RegularParIter = ParMap<I, O, M1, R>;
51
52    fn con_iter_len(&self) -> Option<usize> {
53        self.par.con_iter().try_get_len()
54    }
55
56    fn into_regular_par(self) -> Self::RegularParIter {
57        self.par
58    }
59
60    fn from_regular_par(regular_par: Self::RegularParIter) -> Self {
61        Self {
62            par: regular_par,
63            phantom: PhantomData,
64        }
65    }
66
67    // params transformations
68
69    fn with_runner<Q: ParallelRunner>(
70        self,
71        orchestrator: Q,
72    ) -> impl ParIterResult<Q, Item = Self::Item, Err = Self::Err> {
73        let (_, params, iter, m1) = self.par.destruct();
74        ParMapResult {
75            par: ParMap::new(orchestrator, params, iter, m1),
76            phantom: PhantomData,
77        }
78    }
79
80    // collect
81
82    fn collect_into<C>(self, output: C) -> Result<C, Self::Err>
83    where
84        C: ParCollectInto<Self::Item>,
85        Self::Item: Send,
86        Self::Err: Send,
87    {
88        let (orchestrator, params, iter, m1) = self.par.destruct();
89        let x1 = |i: I::Item| m1(i).into_result();
90        output.x_try_collect_into(orchestrator, params, iter, x1)
91    }
92
93    // reduce
94
95    fn reduce<Reduce>(self, reduce: Reduce) -> Result<Option<Self::Item>, Self::Err>
96    where
97        Self::Item: Send,
98        Self::Err: Send,
99        Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Sync,
100    {
101        let (orchestrator, params, iter, m1) = self.par.destruct();
102        let x1 = |i: I::Item| m1(i).into_result();
103        prc::reduce::x(orchestrator, params, iter, x1, reduce).1
104    }
105
106    // early exit
107
108    fn first(self) -> Result<Option<Self::Item>, Self::Err>
109    where
110        Self::Item: Send,
111        Self::Err: Send,
112    {
113        let (orchestrator, params, iter, m1) = self.par.destruct();
114        let x1 = |i: I::Item| m1(i).into_result();
115        match params.iteration_order {
116            IterationOrder::Ordered => {
117                let (_, result) = prc::next::x(orchestrator, params, iter, x1);
118                result.map(|x| x.map(|y| y.1))
119            }
120            IterationOrder::Arbitrary => {
121                let (_, result) = prc::next_any::x(orchestrator, params, iter, x1);
122                result
123            }
124        }
125    }
126}