1use crate::{
2 ChunkSize, IterationOrder, NumThreads, ParCollectInto, Params,
3 generic_values::{TransformableValues, runner_results::Infallible},
4 runner::{DefaultRunner, ParallelRunner},
5 using::{Using, computations::UX, u_par_iter::ParIterUsing},
6};
7use orx_concurrent_iter::ConcurrentIter;
8use std::marker::PhantomData;
9
10pub struct UParXap<U, I, Vo, M1, R = DefaultRunner>
14where
15 R: ParallelRunner,
16 U: Using,
17 I: ConcurrentIter,
18 Vo: TransformableValues<Fallibility = Infallible>,
19 M1: Fn(&mut U::Item, I::Item) -> Vo + Sync,
20{
21 ux: UX<U, I, Vo, M1>,
22 phantom: PhantomData<R>,
23}
24
25impl<U, I, Vo, M1, R> UParXap<U, I, Vo, M1, R>
26where
27 R: ParallelRunner,
28 U: Using,
29 I: ConcurrentIter,
30 Vo: TransformableValues<Fallibility = Infallible>,
31 M1: Fn(&mut U::Item, I::Item) -> Vo + Sync,
32{
33 pub(crate) fn new(using: U, params: Params, iter: I, x1: M1) -> Self {
34 Self {
35 ux: UX::new(using, params, iter, x1),
36 phantom: PhantomData,
37 }
38 }
39
40 fn destruct(self) -> (U, Params, I, M1) {
41 self.ux.destruct()
42 }
43}
44
45unsafe impl<U, I, Vo, M1, R> Send for UParXap<U, I, Vo, M1, R>
46where
47 R: ParallelRunner,
48 U: Using,
49 I: ConcurrentIter,
50 Vo: TransformableValues<Fallibility = Infallible>,
51 M1: Fn(&mut U::Item, I::Item) -> Vo + Sync,
52{
53}
54
55unsafe impl<U, I, Vo, M1, R> Sync for UParXap<U, I, Vo, M1, R>
56where
57 R: ParallelRunner,
58 U: Using,
59 I: ConcurrentIter,
60 Vo: TransformableValues<Fallibility = Infallible>,
61 M1: Fn(&mut U::Item, I::Item) -> Vo + Sync,
62{
63}
64
65impl<U, I, Vo, M1, R> ParIterUsing<U, R> for UParXap<U, I, Vo, M1, R>
66where
67 R: ParallelRunner,
68 U: Using,
69 I: ConcurrentIter,
70 Vo: TransformableValues<Fallibility = Infallible>,
71 M1: Fn(&mut U::Item, I::Item) -> Vo + Sync,
72{
73 type Item = Vo::Item;
74
75 fn con_iter(&self) -> &impl ConcurrentIter {
76 self.ux.iter()
77 }
78
79 fn params(&self) -> Params {
80 self.ux.params()
81 }
82
83 fn num_threads(mut self, num_threads: impl Into<NumThreads>) -> Self {
86 self.ux.num_threads(num_threads);
87 self
88 }
89
90 fn chunk_size(mut self, chunk_size: impl Into<ChunkSize>) -> Self {
91 self.ux.chunk_size(chunk_size);
92 self
93 }
94
95 fn iteration_order(mut self, collect: IterationOrder) -> Self {
96 self.ux.iteration_order(collect);
97 self
98 }
99
100 fn with_runner<Q: ParallelRunner>(self) -> impl ParIterUsing<U, Q, Item = Self::Item> {
101 let (using, params, iter, map1) = self.destruct();
102 UParXap::new(using, params, iter, map1)
103 }
104
105 fn map<Out, Map>(self, map: Map) -> impl ParIterUsing<U, R, Item = Out>
108 where
109 Map: Fn(&mut U::Item, Self::Item) -> Out + Sync + Clone,
110 {
111 let (using, params, iter, x1) = self.destruct();
112
113 let x1 = move |u: &mut U::Item, i: I::Item| {
114 let vo = x1(u, i);
115 let u = unsafe {
119 &mut *{
120 let p: *mut U::Item = u;
121 p
122 }
123 };
124 vo.u_map(u, map.clone())
125 };
126
127 UParXap::new(using, params, iter, x1)
128 }
129
130 fn filter<Filter>(self, filter: Filter) -> impl ParIterUsing<U, R, Item = Self::Item>
131 where
132 Filter: Fn(&mut U::Item, &Self::Item) -> bool + Sync + Clone,
133 {
134 let (using, params, iter, x1) = self.destruct();
135 let x1 = move |u: &mut U::Item, i: I::Item| {
136 let vo = x1(u, i);
137 let u = unsafe {
141 &mut *{
142 let p: *mut U::Item = u;
143 p
144 }
145 };
146 vo.u_filter(u, filter.clone())
147 };
148 UParXap::new(using, params, iter, x1)
149 }
150
151 fn flat_map<IOut, FlatMap>(
152 self,
153 flat_map: FlatMap,
154 ) -> impl ParIterUsing<U, R, Item = IOut::Item>
155 where
156 IOut: IntoIterator,
157 FlatMap: Fn(&mut U::Item, Self::Item) -> IOut + Sync + Clone,
158 {
159 let (using, params, iter, x1) = self.destruct();
160 let x1 = move |u: &mut U::Item, i: I::Item| {
161 let vo = x1(u, i);
162 let u = unsafe {
166 &mut *{
167 let p: *mut U::Item = u;
168 p
169 }
170 };
171 vo.u_flat_map(u, flat_map.clone())
172 };
173 UParXap::new(using, params, iter, x1)
174 }
175
176 fn filter_map<Out, FilterMap>(
177 self,
178 filter_map: FilterMap,
179 ) -> impl ParIterUsing<U, R, Item = Out>
180 where
181 FilterMap: Fn(&mut U::Item, Self::Item) -> Option<Out> + Sync + Clone,
182 {
183 let (using, params, iter, x1) = self.destruct();
184 let x1 = move |u: &mut U::Item, i: I::Item| {
185 let vo = x1(u, i);
186 let u = unsafe {
190 &mut *{
191 let p: *mut U::Item = u;
192 p
193 }
194 };
195 vo.u_filter_map(u, filter_map.clone())
196 };
197 UParXap::new(using, params, iter, x1)
198 }
199
200 fn collect_into<C>(self, output: C) -> C
203 where
204 C: ParCollectInto<Self::Item>,
205 {
206 output.u_x_collect_into::<R, _, _, _, _>(self.ux)
207 }
208
209 fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
212 where
213 Self::Item: Send,
214 Reduce: Fn(&mut U::Item, Self::Item, Self::Item) -> Self::Item + Sync,
215 {
216 self.ux.reduce::<R, _>(reduce).1
217 }
218
219 fn first(self) -> Option<Self::Item>
222 where
223 Self::Item: Send,
224 {
225 self.ux.next::<R>().1
226 }
227}