orx_parallel/computational_variants/
map.rs1use super::xap::ParXap;
2use crate::ParIterResult;
3use crate::computational_variants::fallible_result::ParMapResult;
4use crate::generic_values::{Vector, WhilstAtom};
5use crate::par_iter_result::IntoResult;
6use crate::{
7 ChunkSize, IterationOrder, NumThreads, ParCollectInto, ParIter, ParIterUsing, Params,
8 computations::M,
9 runner::{DefaultRunner, ParallelRunner},
10 using::{UsingClone, UsingFun, computational_variants::UParMap},
11};
12use orx_concurrent_iter::ConcurrentIter;
13use std::marker::PhantomData;
14
15pub struct ParMap<I, O, M1, R = DefaultRunner>
17where
18 R: ParallelRunner,
19 I: ConcurrentIter,
20 M1: Fn(I::Item) -> O + Sync,
21{
22 m: M<I, O, M1>,
23 phantom: PhantomData<R>,
24}
25
26impl<I, O, M1, R> ParMap<I, O, M1, R>
27where
28 R: ParallelRunner,
29 I: ConcurrentIter,
30 M1: Fn(I::Item) -> O + Sync,
31{
32 pub(crate) fn new(params: Params, iter: I, m1: M1) -> Self {
33 Self {
34 m: M::new(params, iter, m1),
35 phantom: PhantomData,
36 }
37 }
38
39 pub(crate) fn destruct(self) -> (Params, I, M1) {
40 self.m.destruct()
41 }
42}
43
44unsafe impl<I, O, M1, R> Send for ParMap<I, O, M1, R>
45where
46 R: ParallelRunner,
47 I: ConcurrentIter,
48 M1: Fn(I::Item) -> O + Sync,
49{
50}
51
52unsafe impl<I, O, M1, R> Sync for ParMap<I, O, M1, R>
53where
54 R: ParallelRunner,
55 I: ConcurrentIter,
56 M1: Fn(I::Item) -> O + Sync,
57{
58}
59
60impl<I, O, M1, R> ParIter<R> for ParMap<I, O, M1, R>
61where
62 R: ParallelRunner,
63 I: ConcurrentIter,
64 M1: Fn(I::Item) -> O + Sync,
65{
66 type Item = O;
67
68 fn con_iter(&self) -> &impl ConcurrentIter {
69 self.m.iter()
70 }
71
72 fn params(&self) -> Params {
73 self.m.params()
74 }
75
76 fn num_threads(mut self, num_threads: impl Into<NumThreads>) -> Self {
79 self.m.num_threads(num_threads);
80 self
81 }
82
83 fn chunk_size(mut self, chunk_size: impl Into<ChunkSize>) -> Self {
84 self.m.chunk_size(chunk_size);
85 self
86 }
87
88 fn iteration_order(mut self, collect: IterationOrder) -> Self {
89 self.m.iteration_order(collect);
90 self
91 }
92
93 fn with_runner<Q: ParallelRunner>(self) -> impl ParIter<Q, Item = Self::Item> {
94 let (params, iter, map) = self.destruct();
95 ParMap::new(params, iter, map)
96 }
97
98 fn using<U, F>(
101 self,
102 using: F,
103 ) -> impl ParIterUsing<UsingFun<F, U>, R, Item = <Self as ParIter<R>>::Item>
104 where
105 U: Send + 'static,
106 F: FnMut(usize) -> U,
107 {
108 let using = UsingFun::new(using);
109 let (params, iter, m1) = self.destruct();
110 let m1 = move |_: &mut U, t: I::Item| m1(t);
111 UParMap::new(using, params, iter, m1)
112 }
113
114 fn using_clone<U>(
115 self,
116 using: U,
117 ) -> impl ParIterUsing<UsingClone<U>, R, Item = <Self as ParIter<R>>::Item>
118 where
119 U: Clone + Send + 'static,
120 {
121 let using = UsingClone::new(using);
122 let (params, iter, m1) = self.destruct();
123 let m1 = move |_: &mut U, t: I::Item| m1(t);
124 UParMap::new(using, params, iter, m1)
125 }
126
127 fn map<Out, Map>(self, map: Map) -> impl ParIter<R, Item = Out>
130 where
131 Map: Fn(Self::Item) -> Out + Sync,
132 {
133 let (params, iter, m1) = self.destruct();
134 let m1 = move |x| map(m1(x));
135 ParMap::new(params, iter, m1)
136 }
137
138 fn filter<Filter>(self, filter: Filter) -> impl ParIter<R, Item = Self::Item>
139 where
140 Filter: Fn(&Self::Item) -> bool + Sync,
141 {
142 let (params, iter, m1) = self.destruct();
143
144 let x1 = move |i: I::Item| {
145 let value = m1(i);
146 filter(&value).then_some(value)
147 };
148 ParXap::new(params, iter, x1)
149 }
150
151 fn flat_map<IOut, FlatMap>(self, flat_map: FlatMap) -> impl ParIter<R, Item = IOut::Item>
152 where
153 IOut: IntoIterator,
154 FlatMap: Fn(Self::Item) -> IOut + Sync,
155 {
156 let (params, iter, m1) = self.destruct();
157 let x1 = move |i: I::Item| Vector(flat_map(m1(i)));
158 ParXap::new(params, iter, x1)
159 }
160
161 fn filter_map<Out, FilterMap>(self, filter_map: FilterMap) -> impl ParIter<R, Item = Out>
162 where
163 FilterMap: Fn(Self::Item) -> Option<Out> + Sync,
164 {
165 let (params, iter, m1) = self.destruct();
166 let x1 = move |i: I::Item| filter_map(m1(i));
167 ParXap::new(params, iter, x1)
168 }
169
170 fn take_while<While>(self, take_while: While) -> impl ParIter<R, Item = Self::Item>
171 where
172 While: Fn(&Self::Item) -> bool + Sync,
173 {
174 let (params, iter, m1) = self.destruct();
175 let x1 = move |value: I::Item| WhilstAtom::new(m1(value), &take_while);
176 ParXap::new(params, iter, x1)
177 }
178
179 fn into_fallible_result<Out, Err>(self) -> impl ParIterResult<R, Item = Out, Err = Err>
180 where
181 Self::Item: IntoResult<Out, Err>,
182 {
183 ParMapResult::new(self)
184 }
185
186 fn collect_into<C>(self, output: C) -> C
189 where
190 C: ParCollectInto<Self::Item>,
191 {
192 output.m_collect_into::<R, _, _>(self.m)
193 }
194
195 fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
198 where
199 Self::Item: Send,
200 Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Sync,
201 {
202 self.m.reduce::<R, _>(reduce).1
203 }
204
205 fn first(self) -> Option<Self::Item>
208 where
209 Self::Item: Send,
210 {
211 match self.params().iteration_order {
212 IterationOrder::Ordered => self.m.next::<R>().1,
213 IterationOrder::Arbitrary => self.m.next_any::<R>().1,
214 }
215 }
216}