orx_parallel/computational_variants/fallible_result/
xap_result.rs

1use crate::computational_variants::ParXap;
2use crate::executor::parallel_compute as prc;
3use crate::generic_values::TransformableValues;
4use crate::generic_values::runner_results::Infallible;
5use crate::par_iter_result::{IntoResult, ParIterResult};
6use crate::runner::{DefaultRunner, ParallelRunner};
7use crate::{IterationOrder, ParCollectInto, Params};
8use core::marker::PhantomData;
9use orx_concurrent_iter::ConcurrentIter;
10
11/// A parallel iterator for which the computation either completely succeeds,
12/// or fails and **early exits** with an error.
13pub struct ParXapResult<I, T, E, Vo, X1, R = DefaultRunner>
14where
15    R: ParallelRunner,
16    I: ConcurrentIter,
17    Vo: TransformableValues,
18    Vo::Item: IntoResult<T, E>,
19    X1: Fn(I::Item) -> Vo + Sync,
20{
21    orchestrator: R,
22    params: Params,
23    iter: I,
24    xap1: X1,
25    phantom: PhantomData<(T, E)>,
26}
27
28impl<I, T, E, Vo, X1, R> ParXapResult<I, T, E, Vo, X1, R>
29where
30    R: ParallelRunner,
31    I: ConcurrentIter,
32    Vo: TransformableValues,
33    Vo::Item: IntoResult<T, E>,
34    X1: Fn(I::Item) -> Vo + Sync,
35{
36    pub(crate) fn new(orchestrator: R, params: Params, iter: I, xap1: X1) -> Self {
37        Self {
38            orchestrator,
39            params,
40            iter,
41            xap1,
42            phantom: PhantomData,
43        }
44    }
45
46    fn destruct(self) -> (R, Params, I, X1) {
47        (self.orchestrator, self.params, self.iter, self.xap1)
48    }
49}
50
51impl<I, T, E, Vo, X1, R> ParIterResult<R> for ParXapResult<I, T, E, Vo, X1, R>
52where
53    R: ParallelRunner,
54    I: ConcurrentIter,
55    Vo: TransformableValues<Fallibility = Infallible>,
56    Vo::Item: IntoResult<T, E>,
57    X1: Fn(I::Item) -> Vo + Sync,
58{
59    type Item = T;
60
61    type Err = E;
62
63    type RegularItem = Vo::Item;
64
65    type RegularParIter = ParXap<I, Vo, X1, R>;
66
67    fn con_iter_len(&self) -> Option<usize> {
68        self.iter.try_get_len()
69    }
70
71    fn into_regular_par(self) -> Self::RegularParIter {
72        let (orchestrator, params, iter, x1) = self.destruct();
73        ParXap::new(orchestrator, params, iter, x1)
74    }
75
76    fn from_regular_par(regular_par: Self::RegularParIter) -> Self {
77        let (orchestrator, params, iter, x1) = regular_par.destruct();
78        Self::new(orchestrator, params, iter, x1)
79    }
80
81    // params transformations
82
83    fn with_runner<Q: ParallelRunner>(
84        self,
85        orchestrator: Q,
86    ) -> impl ParIterResult<Q, Item = Self::Item, Err = Self::Err> {
87        let (_, params, iter, x1) = self.destruct();
88        ParXapResult::new(orchestrator, params, iter, x1)
89    }
90
91    // collect
92
93    fn collect_into<C>(self, output: C) -> Result<C, Self::Err>
94    where
95        C: ParCollectInto<Self::Item>,
96        Self::Item: Send,
97        Self::Err: Send,
98    {
99        let (orchestrator, params, iter, x1) = self.destruct();
100        let x1 = |i: I::Item| x1(i).map_while_ok(|x| x.into_result());
101        output.x_try_collect_into(orchestrator, params, iter, x1)
102    }
103
104    // reduce
105
106    fn reduce<Reduce>(self, reduce: Reduce) -> Result<Option<Self::Item>, Self::Err>
107    where
108        Self::Item: Send,
109        Self::Err: Send,
110        Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Sync,
111    {
112        let (orchestrator, params, iter, x1) = self.destruct();
113        let x1 = |i: I::Item| x1(i).map_while_ok(|x| x.into_result());
114        prc::reduce::x(orchestrator, params, iter, x1, reduce).1
115    }
116
117    // early exit
118
119    fn first(self) -> Result<Option<Self::Item>, Self::Err>
120    where
121        Self::Item: Send,
122        Self::Err: Send,
123    {
124        let (orchestrator, params, iter, x1) = self.destruct();
125        let x1 = |i: I::Item| x1(i).map_while_ok(|x| x.into_result());
126        match params.iteration_order {
127            IterationOrder::Ordered => {
128                let (_, result) = prc::next::x(orchestrator, params, iter, x1);
129                result.map(|x| x.map(|y| y.1))
130            }
131            IterationOrder::Arbitrary => {
132                let (_, result) = prc::next_any::x(orchestrator, params, iter, x1);
133                result
134            }
135        }
136    }
137}