orx_parallel/computational_variants/fallible_result/
par_result.rs

1use crate::computational_variants::Par;
2use crate::executor::parallel_compute as prc;
3use crate::par_iter_result::{IntoResult, ParIterResult};
4use crate::runner::{DefaultRunner, ParallelRunner};
5use crate::{IterationOrder, ParCollectInto, ParIter};
6use core::marker::PhantomData;
7use orx_concurrent_iter::ConcurrentIter;
8
9/// A parallel iterator for which the computation either completely succeeds,
10/// or fails and **early exits** with an error.
11pub struct ParResult<I, T, E, R = DefaultRunner>
12where
13    R: ParallelRunner,
14    I: ConcurrentIter,
15    I::Item: IntoResult<T, E>,
16{
17    par: Par<I, R>,
18    phantom: PhantomData<(T, E)>,
19}
20
21impl<I, T, E, R> ParResult<I, T, E, R>
22where
23    R: ParallelRunner,
24    I: ConcurrentIter,
25    I::Item: IntoResult<T, E>,
26{
27    pub(crate) fn new(par: Par<I, R>) -> Self {
28        Self {
29            par,
30            phantom: PhantomData,
31        }
32    }
33}
34
35impl<I, T, E, R> ParIterResult<R> for ParResult<I, T, E, R>
36where
37    R: ParallelRunner,
38    I: ConcurrentIter,
39    I::Item: IntoResult<T, E>,
40{
41    type Item = T;
42
43    type Err = E;
44
45    type RegularItem = I::Item;
46
47    type RegularParIter = Par<I, R>;
48
49    fn con_iter_len(&self) -> Option<usize> {
50        self.par.con_iter().try_get_len()
51    }
52
53    fn into_regular_par(self) -> Self::RegularParIter {
54        self.par
55    }
56
57    fn from_regular_par(regular_par: Self::RegularParIter) -> Self {
58        Self {
59            par: regular_par,
60            phantom: PhantomData,
61        }
62    }
63
64    // params transformations
65
66    fn with_runner<Q: ParallelRunner>(
67        self,
68        orchestrator: Q,
69    ) -> impl ParIterResult<Q, Item = Self::Item, Err = Self::Err> {
70        let (_, params, iter) = self.par.destruct();
71        ParResult {
72            par: Par::new(orchestrator, params, iter),
73            phantom: PhantomData,
74        }
75    }
76
77    // collect
78
79    fn collect_into<C>(self, output: C) -> Result<C, Self::Err>
80    where
81        C: ParCollectInto<Self::Item>,
82        Self::Item: Send,
83        Self::Err: Send,
84    {
85        let (orchestrator, params, iter) = self.par.destruct();
86        let x1 = |i: I::Item| i.into_result();
87        output.x_try_collect_into(orchestrator, params, iter, x1)
88    }
89
90    // reduce
91
92    fn reduce<Reduce>(self, reduce: Reduce) -> Result<Option<Self::Item>, Self::Err>
93    where
94        Self::Item: Send,
95        Self::Err: Send,
96        Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Sync,
97    {
98        let (orchestrator, params, iter) = self.par.destruct();
99        let x1 = |i: I::Item| i.into_result();
100        prc::reduce::x(orchestrator, params, iter, x1, reduce).1
101    }
102
103    // early exit
104
105    fn first(self) -> Result<Option<Self::Item>, Self::Err>
106    where
107        Self::Item: Send,
108        Self::Err: Send,
109    {
110        let (orchestrator, params, iter) = self.par.destruct();
111        let x1 = |i: I::Item| i.into_result();
112        match params.iteration_order {
113            IterationOrder::Ordered => {
114                let (_, result) = prc::next::x(orchestrator, params, iter, x1);
115                result.map(|x| x.map(|y| y.1))
116            }
117            IterationOrder::Arbitrary => {
118                let (_, result) = prc::next_any::x(orchestrator, params, iter, x1);
119                result
120            }
121        }
122    }
123}