1use super::xap::ParXap;
2use crate::computational_variants::fallible_result::ParMapResult;
3use crate::executor::parallel_compute as prc;
4use crate::generic_values::{Vector, WhilstAtom};
5use crate::par_iter_result::IntoResult;
6use crate::runner::{DefaultRunner, ParallelRunner};
7use crate::using::{UParMap, UsingClone, UsingFun};
8use crate::{ChunkSize, IterationOrder, NumThreads, ParCollectInto, ParIter, Params};
9use crate::{ParIterResult, ParIterUsing};
10use orx_concurrent_iter::ConcurrentIter;
11
12pub struct ParMap<I, O, M1, R = DefaultRunner>
14where
15 R: ParallelRunner,
16 I: ConcurrentIter,
17 M1: Fn(I::Item) -> O + Sync,
18{
19 orchestrator: R,
20 params: Params,
21 iter: I,
22 map1: M1,
23}
24
25impl<I, O, M1, R> ParMap<I, O, M1, R>
26where
27 R: ParallelRunner,
28 I: ConcurrentIter,
29 M1: Fn(I::Item) -> O + Sync,
30{
31 pub(crate) fn new(orchestrator: R, params: Params, iter: I, map1: M1) -> Self {
32 Self {
33 orchestrator,
34 params,
35 iter,
36 map1,
37 }
38 }
39
40 pub(crate) fn destruct(self) -> (R, Params, I, M1) {
41 (self.orchestrator, self.params, self.iter, self.map1)
42 }
43}
44
45unsafe impl<I, O, M1, R> Send for ParMap<I, O, M1, R>
46where
47 R: ParallelRunner,
48 I: ConcurrentIter,
49 M1: Fn(I::Item) -> O + Sync,
50{
51}
52
53unsafe impl<I, O, M1, R> Sync for ParMap<I, O, M1, R>
54where
55 R: ParallelRunner,
56 I: ConcurrentIter,
57 M1: Fn(I::Item) -> O + Sync,
58{
59}
60
61impl<I, O, M1, R> ParIter<R> for ParMap<I, O, M1, R>
62where
63 R: ParallelRunner,
64 I: ConcurrentIter,
65 M1: Fn(I::Item) -> O + Sync,
66{
67 type Item = O;
68
69 fn con_iter(&self) -> &impl ConcurrentIter {
70 &self.iter
71 }
72
73 fn params(&self) -> Params {
74 self.params
75 }
76
77 fn num_threads(mut self, num_threads: impl Into<NumThreads>) -> Self {
80 self.params = self.params.with_num_threads(num_threads);
81 self
82 }
83
84 fn chunk_size(mut self, chunk_size: impl Into<ChunkSize>) -> Self {
85 self.params = self.params.with_chunk_size(chunk_size);
86 self
87 }
88
89 fn iteration_order(mut self, collect: IterationOrder) -> Self {
90 self.params = self.params.with_collect_ordering(collect);
91 self
92 }
93
94 fn with_runner<Q: ParallelRunner>(self, orchestrator: Q) -> impl ParIter<Q, Item = Self::Item> {
95 let (_, params, iter, map) = self.destruct();
96 ParMap::new(orchestrator, params, iter, map)
97 }
98
99 fn using<U, F>(
102 self,
103 using: F,
104 ) -> impl ParIterUsing<UsingFun<F, U>, R, Item = <Self as ParIter<R>>::Item>
105 where
106 U: 'static,
107 F: Fn(usize) -> U + Sync,
108 {
109 let using = UsingFun::new(using);
110 let (orchestrator, params, iter, x1) = self.destruct();
111 let m1 = move |_: &mut U, t: I::Item| x1(t);
112 UParMap::new(using, orchestrator, params, iter, m1)
113 }
114
115 fn using_clone<U>(
116 self,
117 value: U,
118 ) -> impl ParIterUsing<UsingClone<U>, R, Item = <Self as ParIter<R>>::Item>
119 where
120 U: Clone + 'static,
121 {
122 let using = UsingClone::new(value);
123 let (orchestrator, params, iter, x1) = self.destruct();
124 let m1 = move |_: &mut U, t: I::Item| x1(t);
125 UParMap::new(using, orchestrator, params, iter, m1)
126 }
127
128 fn map<Out, Map>(self, map: Map) -> impl ParIter<R, Item = Out>
131 where
132 Map: Fn(Self::Item) -> Out + Sync,
133 {
134 let (orchestrator, params, iter, m1) = self.destruct();
135 let m1 = move |x| map(m1(x));
136 ParMap::new(orchestrator, params, iter, m1)
137 }
138
139 fn filter<Filter>(self, filter: Filter) -> impl ParIter<R, Item = Self::Item>
140 where
141 Filter: Fn(&Self::Item) -> bool + Sync,
142 {
143 let (orchestrator, params, iter, m1) = self.destruct();
144
145 let x1 = move |i: I::Item| {
146 let value = m1(i);
147 filter(&value).then_some(value)
148 };
149 ParXap::new(orchestrator, params, iter, x1)
150 }
151
152 fn flat_map<IOut, FlatMap>(self, flat_map: FlatMap) -> impl ParIter<R, Item = IOut::Item>
153 where
154 IOut: IntoIterator,
155 FlatMap: Fn(Self::Item) -> IOut + Sync,
156 {
157 let (orchestrator, params, iter, m1) = self.destruct();
158 let x1 = move |i: I::Item| Vector(flat_map(m1(i)));
159 ParXap::new(orchestrator, params, iter, x1)
160 }
161
162 fn filter_map<Out, FilterMap>(self, filter_map: FilterMap) -> impl ParIter<R, Item = Out>
163 where
164 FilterMap: Fn(Self::Item) -> Option<Out> + Sync,
165 {
166 let (orchestrator, params, iter, m1) = self.destruct();
167 let x1 = move |i: I::Item| filter_map(m1(i));
168 ParXap::new(orchestrator, params, iter, x1)
169 }
170
171 fn take_while<While>(self, take_while: While) -> impl ParIter<R, Item = Self::Item>
172 where
173 While: Fn(&Self::Item) -> bool + Sync,
174 {
175 let (orchestrator, params, iter, m1) = self.destruct();
176 let x1 = move |value: I::Item| WhilstAtom::new(m1(value), &take_while);
177 ParXap::new(orchestrator, params, iter, x1)
178 }
179
180 fn into_fallible_result<Out, Err>(self) -> impl ParIterResult<R, Item = Out, Err = Err>
181 where
182 Self::Item: IntoResult<Out, Err>,
183 {
184 ParMapResult::new(self)
185 }
186
187 fn collect_into<C>(self, output: C) -> C
190 where
191 C: ParCollectInto<Self::Item>,
192 {
193 let (orchestrator, params, iter, m1) = self.destruct();
194 output.m_collect_into(orchestrator, params, iter, m1)
195 }
196
197 fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
200 where
201 Self::Item: Send,
202 Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Sync,
203 {
204 let (orchestrator, params, iter, m1) = self.destruct();
205 prc::reduce::m(orchestrator, params, iter, m1, reduce).1
206 }
207
208 fn first(self) -> Option<Self::Item>
211 where
212 Self::Item: Send,
213 {
214 let (orchestrator, params, iter, m1) = self.destruct();
215 match params.iteration_order {
216 IterationOrder::Ordered => prc::next::m(orchestrator, params, iter, m1).1,
217 IterationOrder::Arbitrary => prc::next_any::m(orchestrator, params, iter, m1).1,
218 }
219 }
220}