orx_parallel/computational_variants/
map.rs1use super::{xap::ParXap, xap_filter_xap::ParXapFilterXap};
2use crate::{
3 ChunkSize, IterationOrder, NumThreads, ParCollectInto, ParIter, ParIterUsing, Params,
4 computations::{Atom, M, Vector, map_self_atom},
5 runner::{DefaultRunner, ParallelRunner},
6 using::computational_variants::UParMap,
7 using::{UsingClone, UsingFun},
8};
9use orx_concurrent_iter::ConcurrentIter;
10use std::marker::PhantomData;
11
12pub struct ParMap<I, O, M1, R = DefaultRunner>
14where
15 R: ParallelRunner,
16 I: ConcurrentIter,
17 O: Send + Sync,
18 M1: Fn(I::Item) -> O + Send + Sync + Clone,
19{
20 m: M<I, O, M1>,
21 phantom: PhantomData<R>,
22}
23
24impl<I, O, M1, R> ParMap<I, O, M1, R>
25where
26 R: ParallelRunner,
27 I: ConcurrentIter,
28 O: Send + Sync,
29 M1: Fn(I::Item) -> O + Send + Sync + Clone,
30{
31 pub(crate) fn new(params: Params, iter: I, m1: M1) -> Self {
32 Self {
33 m: M::new(params, iter, m1),
34 phantom: PhantomData,
35 }
36 }
37
38 fn destruct(self) -> (Params, I, M1) {
39 self.m.destruct()
40 }
41}
42
43unsafe impl<I, O, M1, R> Send for ParMap<I, O, M1, R>
44where
45 R: ParallelRunner,
46 I: ConcurrentIter,
47 O: Send + Sync,
48 M1: Fn(I::Item) -> O + Send + Sync + Clone,
49{
50}
51
52unsafe impl<I, O, M1, R> Sync for ParMap<I, O, M1, R>
53where
54 R: ParallelRunner,
55 I: ConcurrentIter,
56 O: Send + Sync,
57 M1: Fn(I::Item) -> O + Send + Sync + Clone,
58{
59}
60
61impl<I, O, M1, R> ParIter<R> for ParMap<I, O, M1, R>
62where
63 R: ParallelRunner,
64 I: ConcurrentIter,
65 O: Send + Sync,
66 M1: Fn(I::Item) -> O + Send + Sync + Clone,
67{
68 type Item = O;
69
70 fn con_iter(&self) -> &impl ConcurrentIter {
71 self.m.iter()
72 }
73
74 fn params(&self) -> Params {
75 self.m.params()
76 }
77
78 fn num_threads(mut self, num_threads: impl Into<NumThreads>) -> Self {
81 self.m.num_threads(num_threads);
82 self
83 }
84
85 fn chunk_size(mut self, chunk_size: impl Into<ChunkSize>) -> Self {
86 self.m.chunk_size(chunk_size);
87 self
88 }
89
90 fn iteration_order(mut self, collect: IterationOrder) -> Self {
91 self.m.iteration_order(collect);
92 self
93 }
94
95 fn with_runner<Q: ParallelRunner>(self) -> impl ParIter<Q, Item = Self::Item> {
96 let (params, iter, map) = self.destruct();
97 ParMap::new(params, iter, map)
98 }
99
100 fn using<U, F>(
103 self,
104 using: F,
105 ) -> impl ParIterUsing<UsingFun<F, U>, R, Item = <Self as ParIter<R>>::Item>
106 where
107 U: Send,
108 F: FnMut(usize) -> U,
109 {
110 let using = UsingFun::new(using);
111 let (params, iter, m1) = self.destruct();
112 let m1 = move |_: &mut U, t: I::Item| m1(t);
113 UParMap::new(using, params, iter, m1)
114 }
115
116 fn using_clone<U>(
117 self,
118 using: U,
119 ) -> impl ParIterUsing<UsingClone<U>, R, Item = <Self as ParIter<R>>::Item>
120 where
121 U: Clone + Send,
122 {
123 let using = UsingClone::new(using);
124 let (params, iter, m1) = self.destruct();
125 let m1 = move |_: &mut U, t: I::Item| m1(t);
126 UParMap::new(using, params, iter, m1)
127 }
128
129 fn map<Out, Map>(self, map: Map) -> impl ParIter<R, Item = Out>
132 where
133 Out: Send + Sync,
134 Map: Fn(Self::Item) -> Out + Send + Sync + Clone,
135 {
136 let (params, iter, m1) = self.destruct();
137 let m1 = move |x| map(m1(x));
138 ParMap::new(params, iter, m1)
139 }
140
141 fn filter<Filter>(self, filter: Filter) -> impl ParIter<R, Item = Self::Item>
142 where
143 Filter: Fn(&Self::Item) -> bool + Send + Sync,
144 {
145 let (params, iter, m1) = self.destruct();
146 let m1 = move |i: I::Item| Atom(m1(i));
147 ParXapFilterXap::new(params, iter, m1, filter, map_self_atom)
148 }
149
150 fn flat_map<IOut, FlatMap>(self, flat_map: FlatMap) -> impl ParIter<R, Item = IOut::Item>
151 where
152 IOut: IntoIterator + Send + Sync,
153 IOut::IntoIter: Send + Sync,
154 IOut::Item: Send + Sync,
155 FlatMap: Fn(Self::Item) -> IOut + Send + Sync,
156 {
157 let (params, iter, m1) = self.destruct();
158 let x1 = move |i: I::Item| Vector(flat_map(m1(i)));
159 ParXap::new(params, iter, x1)
160 }
161
162 fn filter_map<Out, FilterMap>(self, filter_map: FilterMap) -> impl ParIter<R, Item = Out>
163 where
164 Out: Send + Sync,
165 FilterMap: Fn(Self::Item) -> Option<Out> + Send + Sync + Clone,
166 {
167 let (params, iter, m1) = self.destruct();
168 let x1 = move |i: I::Item| filter_map(m1(i));
169 ParXap::new(params, iter, x1)
170 }
171
172 fn collect_into<C>(self, output: C) -> C
175 where
176 C: ParCollectInto<Self::Item>,
177 {
178 output.m_collect_into::<R, _, _>(self.m)
179 }
180
181 fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
184 where
185 Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Send + Sync,
186 {
187 self.m.reduce::<R, _>(reduce).1
188 }
189
190 fn first(self) -> Option<Self::Item> {
193 self.m.next()
194 }
195}