1use crate::{
2 ChunkSize, IterationOrder, NumThreads, ParCollectInto, Params,
3 generic_values::Vector,
4 runner::{DefaultRunner, ParallelRunner},
5 using::u_par_iter::ParIterUsing,
6 using::{Using, computational_variants::u_xap::UParXap, computations::UM},
7};
8use orx_concurrent_iter::ConcurrentIter;
9use std::marker::PhantomData;
10
11pub struct UParMap<U, I, O, M1, R = DefaultRunner>
13where
14 R: ParallelRunner,
15 U: Using,
16 I: ConcurrentIter,
17 M1: Fn(&mut U::Item, I::Item) -> O + Sync,
18{
19 um: UM<U, I, O, M1>,
20 phantom: PhantomData<R>,
21}
22
23impl<U, I, O, M1, R> UParMap<U, I, O, M1, R>
24where
25 R: ParallelRunner,
26 U: Using,
27 I: ConcurrentIter,
28 M1: Fn(&mut U::Item, I::Item) -> O + Sync,
29{
30 pub(crate) fn new(using: U, params: Params, iter: I, m1: M1) -> Self {
31 Self {
32 um: UM::new(using, params, iter, m1),
33 phantom: PhantomData,
34 }
35 }
36
37 fn destruct(self) -> (U, Params, I, M1) {
38 self.um.destruct()
39 }
40}
41
42unsafe impl<U, I, O, M1, R> Send for UParMap<U, I, O, M1, R>
43where
44 R: ParallelRunner,
45 U: Using,
46 I: ConcurrentIter,
47 M1: Fn(&mut U::Item, I::Item) -> O + Sync,
48{
49}
50
51unsafe impl<U, I, O, M1, R> Sync for UParMap<U, I, O, M1, R>
52where
53 R: ParallelRunner,
54 U: Using,
55 I: ConcurrentIter,
56 M1: Fn(&mut U::Item, I::Item) -> O + Sync,
57{
58}
59
60impl<U, I, O, M1, R> ParIterUsing<U, R> for UParMap<U, I, O, M1, R>
61where
62 R: ParallelRunner,
63 U: Using,
64 I: ConcurrentIter,
65 M1: Fn(&mut U::Item, I::Item) -> O + Sync,
66{
67 type Item = O;
68
69 fn con_iter(&self) -> &impl ConcurrentIter {
70 self.um.iter()
71 }
72
73 fn params(&self) -> Params {
74 self.um.params()
75 }
76
77 fn num_threads(mut self, num_threads: impl Into<NumThreads>) -> Self {
80 self.um.num_threads(num_threads);
81 self
82 }
83
84 fn chunk_size(mut self, chunk_size: impl Into<ChunkSize>) -> Self {
85 self.um.chunk_size(chunk_size);
86 self
87 }
88
89 fn iteration_order(mut self, collect: IterationOrder) -> Self {
90 self.um.iteration_order(collect);
91 self
92 }
93
94 fn with_runner<Q: ParallelRunner>(self) -> impl ParIterUsing<U, Q, Item = Self::Item> {
95 let (using, params, iter, map) = self.destruct();
96 UParMap::new(using, params, iter, map)
97 }
98
99 fn map<Out, Map>(self, map: Map) -> impl ParIterUsing<U, R, Item = Out>
102 where
103 Map: Fn(&mut U::Item, Self::Item) -> Out + Sync + Clone,
104 {
105 let (using, params, iter, m1) = self.destruct();
106 let m1 = move |u: &mut U::Item, x: I::Item| {
107 let v1 = m1(u, x);
108 map(u, v1)
109 };
110 UParMap::new(using, params, iter, m1)
111 }
112
113 fn filter<Filter>(self, filter: Filter) -> impl ParIterUsing<U, R, Item = Self::Item>
114 where
115 Filter: Fn(&mut U::Item, &Self::Item) -> bool + Sync + Clone,
116 {
117 let (using, params, iter, m1) = self.destruct();
118
119 let x1 = move |u: &mut U::Item, i: I::Item| {
120 let value = m1(u, i);
121 filter(u, &value).then_some(value)
122 };
123
124 UParXap::new(using, params, iter, x1)
125 }
126
127 fn flat_map<IOut, FlatMap>(
128 self,
129 flat_map: FlatMap,
130 ) -> impl ParIterUsing<U, R, Item = IOut::Item>
131 where
132 IOut: IntoIterator,
133 FlatMap: Fn(&mut U::Item, Self::Item) -> IOut + Sync + Clone,
134 {
135 let (using, params, iter, m1) = self.destruct();
136 let x1 = move |u: &mut U::Item, i: I::Item| {
137 let a = m1(u, i);
138 Vector(flat_map(u, a))
139 };
140 UParXap::new(using, params, iter, x1)
141 }
142
143 fn filter_map<Out, FilterMap>(
144 self,
145 filter_map: FilterMap,
146 ) -> impl ParIterUsing<U, R, Item = Out>
147 where
148 FilterMap: Fn(&mut U::Item, Self::Item) -> Option<Out> + Sync + Clone,
149 {
150 let (using, params, iter, m1) = self.destruct();
151 let x1 = move |u: &mut U::Item, i: I::Item| {
152 let a = m1(u, i);
153 filter_map(u, a)
154 };
155 UParXap::new(using, params, iter, x1)
156 }
157
158 fn collect_into<C>(self, output: C) -> C
161 where
162 C: ParCollectInto<Self::Item>,
163 {
164 output.u_m_collect_into::<R, _, _, _>(self.um)
165 }
166
167 fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
170 where
171 Self::Item: Send,
172 Reduce: Fn(&mut U::Item, Self::Item, Self::Item) -> Self::Item + Sync,
173 {
174 self.um.reduce::<R, _>(reduce).1
175 }
176
177 fn first(self) -> Option<Self::Item>
180 where
181 Self::Item: Send,
182 {
183 self.um.next::<R>().1
184 }
185}