1use crate::computational_variants::fallible_result::ParXapResult;
2use crate::executor::parallel_compute as prc;
3use crate::generic_values::TransformableValues;
4use crate::generic_values::runner_results::Infallible;
5use crate::par_iter_result::IntoResult;
6use crate::runner::{DefaultRunner, ParallelRunner};
7use crate::using::{UParXap, UsingClone, UsingFun};
8use crate::{ChunkSize, IterationOrder, NumThreads, ParCollectInto, ParIter, Params};
9use crate::{ParIterResult, ParIterUsing};
10use orx_concurrent_iter::ConcurrentIter;
11
12pub struct ParXap<I, Vo, X1, R = DefaultRunner>
16where
17 R: ParallelRunner,
18 I: ConcurrentIter,
19 Vo: TransformableValues<Fallibility = Infallible>,
20 X1: Fn(I::Item) -> Vo + Sync,
21{
22 orchestrator: R,
23 params: Params,
24 iter: I,
25 xap1: X1,
26}
27
28impl<I, Vo, X1, R> ParXap<I, Vo, X1, R>
29where
30 R: ParallelRunner,
31 I: ConcurrentIter,
32 Vo: TransformableValues<Fallibility = Infallible>,
33 X1: Fn(I::Item) -> Vo + Sync,
34{
35 pub(crate) fn new(orchestrator: R, params: Params, iter: I, xap1: X1) -> Self {
36 Self {
37 orchestrator,
38 params,
39 iter,
40 xap1,
41 }
42 }
43
44 pub(crate) fn destruct(self) -> (R, Params, I, X1) {
45 (self.orchestrator, self.params, self.iter, self.xap1)
46 }
47}
48
49unsafe impl<I, Vo, X1, R> Send for ParXap<I, Vo, X1, R>
50where
51 R: ParallelRunner,
52 I: ConcurrentIter,
53 Vo: TransformableValues<Fallibility = Infallible>,
54 X1: Fn(I::Item) -> Vo + Sync,
55{
56}
57
58unsafe impl<I, Vo, X1, R> Sync for ParXap<I, Vo, X1, R>
59where
60 R: ParallelRunner,
61 I: ConcurrentIter,
62 Vo: TransformableValues<Fallibility = Infallible>,
63 X1: Fn(I::Item) -> Vo + Sync,
64{
65}
66
67impl<I, Vo, X1, R> ParIter<R> for ParXap<I, Vo, X1, R>
68where
69 R: ParallelRunner,
70 I: ConcurrentIter,
71 Vo: TransformableValues<Fallibility = Infallible>,
72 X1: Fn(I::Item) -> Vo + Sync,
73{
74 type Item = Vo::Item;
75
76 fn con_iter(&self) -> &impl ConcurrentIter {
77 &self.iter
78 }
79
80 fn params(&self) -> Params {
81 self.params
82 }
83
84 fn num_threads(mut self, num_threads: impl Into<NumThreads>) -> Self {
87 self.params = self.params.with_num_threads(num_threads);
88 self
89 }
90
91 fn chunk_size(mut self, chunk_size: impl Into<ChunkSize>) -> Self {
92 self.params = self.params.with_chunk_size(chunk_size);
93 self
94 }
95
96 fn iteration_order(mut self, collect: IterationOrder) -> Self {
97 self.params = self.params.with_collect_ordering(collect);
98 self
99 }
100
101 fn with_runner<Q: ParallelRunner>(self, orchestrator: Q) -> impl ParIter<Q, Item = Self::Item> {
102 let (_, params, iter, x1) = self.destruct();
103 ParXap::new(orchestrator, params, iter, x1)
104 }
105
106 fn using<U, F>(
109 self,
110 using: F,
111 ) -> impl ParIterUsing<UsingFun<F, U>, R, Item = <Self as ParIter<R>>::Item>
112 where
113 U: 'static,
114 F: Fn(usize) -> U + Sync,
115 {
116 let using = UsingFun::new(using);
117 let (orchestrator, params, iter, x1) = self.destruct();
118 let m1 = move |_: &mut U, t: I::Item| x1(t);
119 UParXap::new(using, orchestrator, params, iter, m1)
120 }
121
122 fn using_clone<U>(
123 self,
124 value: U,
125 ) -> impl ParIterUsing<UsingClone<U>, R, Item = <Self as ParIter<R>>::Item>
126 where
127 U: Clone + 'static,
128 {
129 let using = UsingClone::new(value);
130 let (orchestrator, params, iter, x1) = self.destruct();
131 let m1 = move |_: &mut U, t: I::Item| x1(t);
132 UParXap::new(using, orchestrator, params, iter, m1)
133 }
134
135 fn map<Out, Map>(self, map: Map) -> impl ParIter<R, Item = Out>
138 where
139 Map: Fn(Self::Item) -> Out + Sync + Clone,
140 {
141 let (orchestrator, params, iter, x1) = self.destruct();
142 let x1 = move |i: I::Item| {
143 let vo = x1(i);
144 vo.map(map.clone())
145 };
146
147 ParXap::new(orchestrator, params, iter, x1)
148 }
149
150 fn filter<Filter>(self, filter: Filter) -> impl ParIter<R, Item = Self::Item>
151 where
152 Filter: Fn(&Self::Item) -> bool + Sync + Clone,
153 {
154 let (orchestrator, params, iter, x1) = self.destruct();
155 let x1 = move |i: I::Item| {
156 let values = x1(i);
157 values.filter(filter.clone())
158 };
159 ParXap::new(orchestrator, params, iter, x1)
160 }
161
162 fn flat_map<IOut, FlatMap>(self, flat_map: FlatMap) -> impl ParIter<R, Item = IOut::Item>
163 where
164 IOut: IntoIterator,
165 FlatMap: Fn(Self::Item) -> IOut + Sync + Clone,
166 {
167 let (orchestrator, params, iter, x1) = self.destruct();
168 let x1 = move |i: I::Item| {
169 let vo = x1(i);
170 vo.flat_map(flat_map.clone())
171 };
172 ParXap::new(orchestrator, params, iter, x1)
173 }
174
175 fn filter_map<Out, FilterMap>(self, filter_map: FilterMap) -> impl ParIter<R, Item = Out>
176 where
177 FilterMap: Fn(Self::Item) -> Option<Out> + Sync + Clone,
178 {
179 let (orchestrator, params, iter, x1) = self.destruct();
180 let x1 = move |i: I::Item| {
181 let vo = x1(i);
182 vo.filter_map(filter_map.clone())
183 };
184 ParXap::new(orchestrator, params, iter, x1)
185 }
186
187 fn take_while<While>(self, take_while: While) -> impl ParIter<R, Item = Self::Item>
188 where
189 While: Fn(&Self::Item) -> bool + Sync + Clone,
190 {
191 let (orchestrator, params, iter, x1) = self.destruct();
192 let x1 = move |i: I::Item| {
193 let vo = x1(i);
194 vo.whilst(take_while.clone())
195 };
196 ParXap::new(orchestrator, params, iter, x1)
197 }
198
199 fn into_fallible_result<Out, Err>(self) -> impl ParIterResult<R, Item = Out, Err = Err>
200 where
201 Self::Item: IntoResult<Out, Err>,
202 {
203 let (orchestrator, params, iter, x1) = self.destruct();
204 ParXapResult::new(orchestrator, params, iter, x1)
205 }
206
207 fn collect_into<C>(self, output: C) -> C
210 where
211 C: ParCollectInto<Self::Item>,
212 {
213 let (orchestrator, params, iter, x1) = self.destruct();
214 output.x_collect_into(orchestrator, params, iter, x1)
215 }
216
217 fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
220 where
221 Self::Item: Send,
222 Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Sync,
223 {
224 let (orchestrator, params, iter, x1) = self.destruct();
225 let (_, Ok(acc)) = prc::reduce::x(orchestrator, params, iter, x1, reduce);
226 acc
227 }
228
229 fn first(self) -> Option<Self::Item>
232 where
233 Self::Item: Send,
234 {
235 let (orchestrator, params, iter, x1) = self.destruct();
236 match params.iteration_order {
237 IterationOrder::Ordered => {
238 let (_num_threads, Ok(result)) = prc::next::x(orchestrator, params, iter, x1);
239 result.map(|x| x.1)
240 }
241 IterationOrder::Arbitrary => {
242 let (_num_threads, Ok(result)) = prc::next_any::x(orchestrator, params, iter, x1);
243 result
244 }
245 }
246 }
247}