// orx_parallel/computational_variants/fallible_result/map_result.rs
use crate::computational_variants::ParMap;
2use crate::executor::parallel_compute as prc;
3use crate::par_iter_result::{IntoResult, ParIterResult};
4use crate::runner::{DefaultRunner, ParallelRunner};
5use crate::{IterationOrder, ParCollectInto, ParIter};
6use core::marker::PhantomData;
7use orx_concurrent_iter::ConcurrentIter;
8
9pub struct ParMapResult<I, T, E, O, M1, R = DefaultRunner>
12where
13 R: ParallelRunner,
14 I: ConcurrentIter,
15 O: IntoResult<T, E>,
16 M1: Fn(I::Item) -> O + Sync,
17{
18 par: ParMap<I, O, M1, R>,
19 phantom: PhantomData<(T, E)>,
20}
21
22impl<I, T, E, O, M1, R> ParMapResult<I, T, E, O, M1, R>
23where
24 R: ParallelRunner,
25 I: ConcurrentIter,
26 O: IntoResult<T, E>,
27 M1: Fn(I::Item) -> O + Sync,
28{
29 pub(crate) fn new(par: ParMap<I, O, M1, R>) -> Self {
30 Self {
31 par,
32 phantom: PhantomData,
33 }
34 }
35}
36
37impl<I, T, E, O, M1, R> ParIterResult<R> for ParMapResult<I, T, E, O, M1, R>
38where
39 R: ParallelRunner,
40 I: ConcurrentIter,
41 O: IntoResult<T, E>,
42 M1: Fn(I::Item) -> O + Sync,
43{
44 type Item = T;
45
46 type Err = E;
47
48 type RegularItem = O;
49
50 type RegularParIter = ParMap<I, O, M1, R>;
51
52 fn con_iter_len(&self) -> Option<usize> {
53 self.par.con_iter().try_get_len()
54 }
55
56 fn into_regular_par(self) -> Self::RegularParIter {
57 self.par
58 }
59
60 fn from_regular_par(regular_par: Self::RegularParIter) -> Self {
61 Self {
62 par: regular_par,
63 phantom: PhantomData,
64 }
65 }
66
67 fn with_runner<Q: ParallelRunner>(
70 self,
71 orchestrator: Q,
72 ) -> impl ParIterResult<Q, Item = Self::Item, Err = Self::Err> {
73 let (_, params, iter, m1) = self.par.destruct();
74 ParMapResult {
75 par: ParMap::new(orchestrator, params, iter, m1),
76 phantom: PhantomData,
77 }
78 }
79
80 fn collect_into<C>(self, output: C) -> Result<C, Self::Err>
83 where
84 C: ParCollectInto<Self::Item>,
85 Self::Item: Send,
86 Self::Err: Send,
87 {
88 let (orchestrator, params, iter, m1) = self.par.destruct();
89 let x1 = |i: I::Item| m1(i).into_result();
90 output.x_try_collect_into(orchestrator, params, iter, x1)
91 }
92
93 fn reduce<Reduce>(self, reduce: Reduce) -> Result<Option<Self::Item>, Self::Err>
96 where
97 Self::Item: Send,
98 Self::Err: Send,
99 Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Sync,
100 {
101 let (orchestrator, params, iter, m1) = self.par.destruct();
102 let x1 = |i: I::Item| m1(i).into_result();
103 prc::reduce::x(orchestrator, params, iter, x1, reduce).1
104 }
105
106 fn first(self) -> Result<Option<Self::Item>, Self::Err>
109 where
110 Self::Item: Send,
111 Self::Err: Send,
112 {
113 let (orchestrator, params, iter, m1) = self.par.destruct();
114 let x1 = |i: I::Item| m1(i).into_result();
115 match params.iteration_order {
116 IterationOrder::Ordered => {
117 let (_, result) = prc::next::x(orchestrator, params, iter, x1);
118 result.map(|x| x.map(|y| y.1))
119 }
120 IterationOrder::Arbitrary => {
121 let (_, result) = prc::next_any::x(orchestrator, params, iter, x1);
122 result
123 }
124 }
125 }
126}