orx_parallel/computational_variants/fallible_result/
par_result.rs1use crate::computational_variants::Par;
2use crate::executor::parallel_compute as prc;
3use crate::par_iter_result::{IntoResult, ParIterResult};
4use crate::runner::{DefaultRunner, ParallelRunner};
5use crate::{IterationOrder, ParCollectInto, ParIter};
6use core::marker::PhantomData;
7use orx_concurrent_iter::ConcurrentIter;
8
9pub struct ParResult<I, T, E, R = DefaultRunner>
12where
13 R: ParallelRunner,
14 I: ConcurrentIter,
15 I::Item: IntoResult<T, E>,
16{
17 par: Par<I, R>,
18 phantom: PhantomData<(T, E)>,
19}
20
21impl<I, T, E, R> ParResult<I, T, E, R>
22where
23 R: ParallelRunner,
24 I: ConcurrentIter,
25 I::Item: IntoResult<T, E>,
26{
27 pub(crate) fn new(par: Par<I, R>) -> Self {
28 Self {
29 par,
30 phantom: PhantomData,
31 }
32 }
33}
34
35impl<I, T, E, R> ParIterResult<R> for ParResult<I, T, E, R>
36where
37 R: ParallelRunner,
38 I: ConcurrentIter,
39 I::Item: IntoResult<T, E>,
40{
41 type Item = T;
42
43 type Err = E;
44
45 type RegularItem = I::Item;
46
47 type RegularParIter = Par<I, R>;
48
49 fn con_iter_len(&self) -> Option<usize> {
50 self.par.con_iter().try_get_len()
51 }
52
53 fn into_regular_par(self) -> Self::RegularParIter {
54 self.par
55 }
56
57 fn from_regular_par(regular_par: Self::RegularParIter) -> Self {
58 Self {
59 par: regular_par,
60 phantom: PhantomData,
61 }
62 }
63
64 fn with_runner<Q: ParallelRunner>(
67 self,
68 orchestrator: Q,
69 ) -> impl ParIterResult<Q, Item = Self::Item, Err = Self::Err> {
70 let (_, params, iter) = self.par.destruct();
71 ParResult {
72 par: Par::new(orchestrator, params, iter),
73 phantom: PhantomData,
74 }
75 }
76
77 fn collect_into<C>(self, output: C) -> Result<C, Self::Err>
80 where
81 C: ParCollectInto<Self::Item>,
82 Self::Item: Send,
83 Self::Err: Send,
84 {
85 let (orchestrator, params, iter) = self.par.destruct();
86 let x1 = |i: I::Item| i.into_result();
87 output.x_try_collect_into(orchestrator, params, iter, x1)
88 }
89
90 fn reduce<Reduce>(self, reduce: Reduce) -> Result<Option<Self::Item>, Self::Err>
93 where
94 Self::Item: Send,
95 Self::Err: Send,
96 Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Sync,
97 {
98 let (orchestrator, params, iter) = self.par.destruct();
99 let x1 = |i: I::Item| i.into_result();
100 prc::reduce::x(orchestrator, params, iter, x1, reduce).1
101 }
102
103 fn first(self) -> Result<Option<Self::Item>, Self::Err>
106 where
107 Self::Item: Send,
108 Self::Err: Send,
109 {
110 let (orchestrator, params, iter) = self.par.destruct();
111 let x1 = |i: I::Item| i.into_result();
112 match params.iteration_order {
113 IterationOrder::Ordered => {
114 let (_, result) = prc::next::x(orchestrator, params, iter, x1);
115 result.map(|x| x.map(|y| y.1))
116 }
117 IterationOrder::Arbitrary => {
118 let (_, result) = prc::next_any::x(orchestrator, params, iter, x1);
119 result
120 }
121 }
122 }
123}