1use crate::{
2 ChunkSize, IterationOrder, NumThreads, ParCollectInto, Params,
3 computations::{Atom, Vector},
4 runner::{DefaultRunner, ParallelRunner},
5 using::u_par_iter::ParIterUsing,
6 using::{
7 Using,
8 computational_variants::{u_xap::UParXap, u_xap_filter_xap::UParXapFilterXap},
9 computations::{UM, u_map_self_atom},
10 },
11};
12use orx_concurrent_iter::ConcurrentIter;
13use std::marker::PhantomData;
14
15pub struct UParMap<U, I, O, M1, R = DefaultRunner>
17where
18 R: ParallelRunner,
19 U: Using,
20 I: ConcurrentIter,
21 O: Send + Sync,
22 M1: Fn(&mut U::Item, I::Item) -> O + Send + Sync + Clone,
23{
24 um: UM<U, I, O, M1>,
25 phantom: PhantomData<R>,
26}
27
28impl<U, I, O, M1, R> UParMap<U, I, O, M1, R>
29where
30 R: ParallelRunner,
31 U: Using,
32 I: ConcurrentIter,
33 O: Send + Sync,
34 M1: Fn(&mut U::Item, I::Item) -> O + Send + Sync + Clone,
35{
36 pub(crate) fn new(using: U, params: Params, iter: I, m1: M1) -> Self {
37 Self {
38 um: UM::new(using, params, iter, m1),
39 phantom: PhantomData,
40 }
41 }
42
43 fn destruct(self) -> (U, Params, I, M1) {
44 self.um.destruct()
45 }
46}
47
48unsafe impl<U, I, O, M1, R> Send for UParMap<U, I, O, M1, R>
49where
50 R: ParallelRunner,
51 U: Using,
52 I: ConcurrentIter,
53 O: Send + Sync,
54 M1: Fn(&mut U::Item, I::Item) -> O + Send + Sync + Clone,
55{
56}
57
58unsafe impl<U, I, O, M1, R> Sync for UParMap<U, I, O, M1, R>
59where
60 R: ParallelRunner,
61 U: Using,
62 I: ConcurrentIter,
63 O: Send + Sync,
64 M1: Fn(&mut U::Item, I::Item) -> O + Send + Sync + Clone,
65{
66}
67
68impl<U, I, O, M1, R> ParIterUsing<U, R> for UParMap<U, I, O, M1, R>
69where
70 R: ParallelRunner,
71 U: Using,
72 I: ConcurrentIter,
73 O: Send + Sync,
74 M1: Fn(&mut U::Item, I::Item) -> O + Send + Sync + Clone,
75{
76 type Item = O;
77
78 fn con_iter(&self) -> &impl ConcurrentIter {
79 self.um.iter()
80 }
81
82 fn params(&self) -> Params {
83 self.um.params()
84 }
85
86 fn num_threads(mut self, num_threads: impl Into<NumThreads>) -> Self {
89 self.um.num_threads(num_threads);
90 self
91 }
92
93 fn chunk_size(mut self, chunk_size: impl Into<ChunkSize>) -> Self {
94 self.um.chunk_size(chunk_size);
95 self
96 }
97
98 fn iteration_order(mut self, collect: IterationOrder) -> Self {
99 self.um.iteration_order(collect);
100 self
101 }
102
103 fn with_runner<Q: ParallelRunner>(self) -> impl ParIterUsing<U, Q, Item = Self::Item> {
104 let (using, params, iter, map) = self.destruct();
105 UParMap::new(using, params, iter, map)
106 }
107
108 fn map<Out, Map>(self, map: Map) -> impl ParIterUsing<U, R, Item = Out>
111 where
112 Out: Send + Sync,
113 Map: Fn(&mut U::Item, Self::Item) -> Out + Send + Sync + Clone,
114 {
115 let (using, params, iter, m1) = self.destruct();
116 let m1 = move |u: &mut U::Item, x: I::Item| {
117 let v1 = m1(u, x);
118 map(u, v1)
119 };
120 UParMap::new(using, params, iter, m1)
121 }
122
123 fn filter<Filter>(self, filter: Filter) -> impl ParIterUsing<U, R, Item = Self::Item>
124 where
125 Filter: Fn(&mut U::Item, &Self::Item) -> bool + Send + Sync + Clone,
126 {
127 let (using, params, iter, m1) = self.destruct();
128 let m1 = move |u: &mut U::Item, i: I::Item| Atom(m1(u, i));
129 let filter = move |u: &mut U::Item, i: &Self::Item| filter(u, i);
130 UParXapFilterXap::new(using, params, iter, m1, filter, u_map_self_atom)
131 }
132
133 fn flat_map<IOut, FlatMap>(
134 self,
135 flat_map: FlatMap,
136 ) -> impl ParIterUsing<U, R, Item = IOut::Item>
137 where
138 IOut: IntoIterator + Send + Sync,
139 IOut::IntoIter: Send + Sync,
140 IOut::Item: Send + Sync,
141 FlatMap: Fn(&mut U::Item, Self::Item) -> IOut + Send + Sync + Clone,
142 {
143 let (using, params, iter, m1) = self.destruct();
144 let x1 = move |u: &mut U::Item, i: I::Item| {
145 let a = m1(u, i);
146 Vector(flat_map(u, a))
147 };
148 UParXap::new(using, params, iter, x1)
149 }
150
151 fn filter_map<Out, FilterMap>(
152 self,
153 filter_map: FilterMap,
154 ) -> impl ParIterUsing<U, R, Item = Out>
155 where
156 Out: Send + Sync,
157 FilterMap: Fn(&mut U::Item, Self::Item) -> Option<Out> + Send + Sync + Clone,
158 {
159 let (using, params, iter, m1) = self.destruct();
160 let x1 = move |u: &mut U::Item, i: I::Item| {
161 let a = m1(u, i);
162 filter_map(u, a)
163 };
164 UParXap::new(using, params, iter, x1)
165 }
166
167 fn collect_into<C>(self, output: C) -> C
170 where
171 C: ParCollectInto<Self::Item>,
172 {
173 output.u_m_collect_into::<R, _, _, _>(self.um)
174 }
175
176 fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
179 where
180 Reduce: Fn(&mut U::Item, Self::Item, Self::Item) -> Self::Item + Send + Sync,
181 {
182 self.um.reduce::<R, _>(reduce).1
183 }
184
185 fn first(self) -> Option<Self::Item> {
188 self.um.next()
189 }
190}