orx_parallel/computational_variants/fallible_result/
xap_result.rs1use crate::computational_variants::ParXap;
2use crate::executor::parallel_compute as prc;
3use crate::generic_values::TransformableValues;
4use crate::generic_values::runner_results::Infallible;
5use crate::par_iter_result::{IntoResult, ParIterResult};
6use crate::runner::{DefaultRunner, ParallelRunner};
7use crate::{IterationOrder, ParCollectInto, Params};
8use core::marker::PhantomData;
9use orx_concurrent_iter::ConcurrentIter;
10
11pub struct ParXapResult<I, T, E, Vo, X1, R = DefaultRunner>
14where
15 R: ParallelRunner,
16 I: ConcurrentIter,
17 Vo: TransformableValues,
18 Vo::Item: IntoResult<T, E>,
19 X1: Fn(I::Item) -> Vo + Sync,
20{
21 orchestrator: R,
22 params: Params,
23 iter: I,
24 xap1: X1,
25 phantom: PhantomData<(T, E)>,
26}
27
28impl<I, T, E, Vo, X1, R> ParXapResult<I, T, E, Vo, X1, R>
29where
30 R: ParallelRunner,
31 I: ConcurrentIter,
32 Vo: TransformableValues,
33 Vo::Item: IntoResult<T, E>,
34 X1: Fn(I::Item) -> Vo + Sync,
35{
36 pub(crate) fn new(orchestrator: R, params: Params, iter: I, xap1: X1) -> Self {
37 Self {
38 orchestrator,
39 params,
40 iter,
41 xap1,
42 phantom: PhantomData,
43 }
44 }
45
46 fn destruct(self) -> (R, Params, I, X1) {
47 (self.orchestrator, self.params, self.iter, self.xap1)
48 }
49}
50
51impl<I, T, E, Vo, X1, R> ParIterResult<R> for ParXapResult<I, T, E, Vo, X1, R>
52where
53 R: ParallelRunner,
54 I: ConcurrentIter,
55 Vo: TransformableValues<Fallibility = Infallible>,
56 Vo::Item: IntoResult<T, E>,
57 X1: Fn(I::Item) -> Vo + Sync,
58{
59 type Item = T;
60
61 type Err = E;
62
63 type RegularItem = Vo::Item;
64
65 type RegularParIter = ParXap<I, Vo, X1, R>;
66
67 fn con_iter_len(&self) -> Option<usize> {
68 self.iter.try_get_len()
69 }
70
71 fn into_regular_par(self) -> Self::RegularParIter {
72 let (orchestrator, params, iter, x1) = self.destruct();
73 ParXap::new(orchestrator, params, iter, x1)
74 }
75
76 fn from_regular_par(regular_par: Self::RegularParIter) -> Self {
77 let (orchestrator, params, iter, x1) = regular_par.destruct();
78 Self::new(orchestrator, params, iter, x1)
79 }
80
81 fn with_runner<Q: ParallelRunner>(
84 self,
85 orchestrator: Q,
86 ) -> impl ParIterResult<Q, Item = Self::Item, Err = Self::Err> {
87 let (_, params, iter, x1) = self.destruct();
88 ParXapResult::new(orchestrator, params, iter, x1)
89 }
90
91 fn collect_into<C>(self, output: C) -> Result<C, Self::Err>
94 where
95 C: ParCollectInto<Self::Item>,
96 Self::Item: Send,
97 Self::Err: Send,
98 {
99 let (orchestrator, params, iter, x1) = self.destruct();
100 let x1 = |i: I::Item| x1(i).map_while_ok(|x| x.into_result());
101 output.x_try_collect_into(orchestrator, params, iter, x1)
102 }
103
104 fn reduce<Reduce>(self, reduce: Reduce) -> Result<Option<Self::Item>, Self::Err>
107 where
108 Self::Item: Send,
109 Self::Err: Send,
110 Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Sync,
111 {
112 let (orchestrator, params, iter, x1) = self.destruct();
113 let x1 = |i: I::Item| x1(i).map_while_ok(|x| x.into_result());
114 prc::reduce::x(orchestrator, params, iter, x1, reduce).1
115 }
116
117 fn first(self) -> Result<Option<Self::Item>, Self::Err>
120 where
121 Self::Item: Send,
122 Self::Err: Send,
123 {
124 let (orchestrator, params, iter, x1) = self.destruct();
125 let x1 = |i: I::Item| x1(i).map_while_ok(|x| x.into_result());
126 match params.iteration_order {
127 IterationOrder::Ordered => {
128 let (_, result) = prc::next::x(orchestrator, params, iter, x1);
129 result.map(|x| x.map(|y| y.1))
130 }
131 IterationOrder::Arbitrary => {
132 let (_, result) = prc::next_any::x(orchestrator, params, iter, x1);
133 result
134 }
135 }
136 }
137}