1use crate::{
2 ChunkSize, IterationOrder, NumThreads, ParCollectInto, Params,
3 computations::{Values, Vector},
4 runner::{DefaultRunner, ParallelRunner},
5 using::u_par_iter::ParIterUsing,
6 using::{
7 Using,
8 computational_variants::u_xap_filter_xap::UParXapFilterXap,
9 computations::{UX, u_map_self_atom},
10 },
11};
12use orx_concurrent_iter::ConcurrentIter;
13use std::marker::PhantomData;
14
15pub struct UParXap<U, I, Vo, M1, R = DefaultRunner>
19where
20 R: ParallelRunner,
21 U: Using,
22 I: ConcurrentIter,
23 Vo: Values + Send + Sync,
24 Vo::Item: Send + Sync,
25 M1: Fn(&mut U::Item, I::Item) -> Vo + Send + Sync,
26{
27 ux: UX<U, I, Vo, M1>,
28 phantom: PhantomData<R>,
29}
30
31impl<U, I, Vo, M1, R> UParXap<U, I, Vo, M1, R>
32where
33 R: ParallelRunner,
34 U: Using,
35 I: ConcurrentIter,
36 Vo: Values + Send + Sync,
37 Vo::Item: Send + Sync,
38 M1: Fn(&mut U::Item, I::Item) -> Vo + Send + Sync,
39{
40 pub(crate) fn new(using: U, params: Params, iter: I, x1: M1) -> Self {
41 Self {
42 ux: UX::new(using, params, iter, x1),
43 phantom: PhantomData,
44 }
45 }
46
47 fn destruct(self) -> (U, Params, I, M1) {
48 self.ux.destruct()
49 }
50}
51
52unsafe impl<U, I, Vo, M1, R> Send for UParXap<U, I, Vo, M1, R>
53where
54 R: ParallelRunner,
55 U: Using,
56 I: ConcurrentIter,
57 Vo: Values + Send + Sync,
58 Vo::Item: Send + Sync,
59 M1: Fn(&mut U::Item, I::Item) -> Vo + Send + Sync,
60{
61}
62
63unsafe impl<U, I, Vo, M1, R> Sync for UParXap<U, I, Vo, M1, R>
64where
65 R: ParallelRunner,
66 U: Using,
67 I: ConcurrentIter,
68 Vo: Values + Send + Sync,
69 Vo::Item: Send + Sync,
70 M1: Fn(&mut U::Item, I::Item) -> Vo + Send + Sync,
71{
72}
73
74impl<U, I, Vo, M1, R> ParIterUsing<U, R> for UParXap<U, I, Vo, M1, R>
75where
76 R: ParallelRunner,
77 U: Using,
78 I: ConcurrentIter,
79 Vo: Values + Send + Sync,
80 Vo::Item: Send + Sync,
81 M1: Fn(&mut U::Item, I::Item) -> Vo + Send + Sync,
82{
83 type Item = Vo::Item;
84
85 fn con_iter(&self) -> &impl ConcurrentIter {
86 self.ux.iter()
87 }
88
89 fn params(&self) -> Params {
90 self.ux.params()
91 }
92
93 fn num_threads(mut self, num_threads: impl Into<NumThreads>) -> Self {
96 self.ux.num_threads(num_threads);
97 self
98 }
99
100 fn chunk_size(mut self, chunk_size: impl Into<ChunkSize>) -> Self {
101 self.ux.chunk_size(chunk_size);
102 self
103 }
104
105 fn iteration_order(mut self, collect: IterationOrder) -> Self {
106 self.ux.iteration_order(collect);
107 self
108 }
109
110 fn with_runner<Q: ParallelRunner>(self) -> impl ParIterUsing<U, Q, Item = Self::Item> {
111 let (using, params, iter, map1) = self.destruct();
112 UParXap::new(using, params, iter, map1)
113 }
114
115 fn map<Out, Map>(self, map: Map) -> impl ParIterUsing<U, R, Item = Out>
118 where
119 Out: Send + Sync,
120 Map: Fn(&mut U::Item, Self::Item) -> Out + Send + Sync + Clone,
121 {
122 let (using, params, iter, x1) = self.destruct();
123 let x1 = move |u: &mut U::Item, i: I::Item| {
124 let vo: Vec<_> = x1(u, i).values().into_iter().map(|x| map(u, x)).collect();
126 Vector(vo)
127 };
128
129 UParXap::new(using, params, iter, x1)
130 }
131
132 fn filter<Filter>(self, filter: Filter) -> impl ParIterUsing<U, R, Item = Self::Item>
133 where
134 Filter: Fn(&mut U::Item, &Self::Item) -> bool + Send + Sync + Clone,
135 {
136 let (using, params, iter, x1) = self.destruct();
137 let filter = move |u: &mut U::Item, x: &Self::Item| filter(u, x);
138 UParXapFilterXap::new(using, params, iter, x1, filter, u_map_self_atom)
139 }
140
141 fn flat_map<IOut, FlatMap>(
142 self,
143 flat_map: FlatMap,
144 ) -> impl ParIterUsing<U, R, Item = IOut::Item>
145 where
146 IOut: IntoIterator + Send + Sync,
147 IOut::IntoIter: Send + Sync,
148 IOut::Item: Send + Sync,
149 FlatMap: Fn(&mut U::Item, Self::Item) -> IOut + Send + Sync + Clone,
150 {
151 let (using, params, iter, x1) = self.destruct();
152 let x1 = move |u: &mut U::Item, t: I::Item| {
153 let vo: Vec<_> = x1(u, t).values().into_iter().collect();
155 let vo: Vec<_> = vo.into_iter().flat_map(|x| flat_map(u, x)).collect();
156 Vector(vo)
157 };
158 UParXap::new(using, params, iter, x1)
159 }
160
161 fn filter_map<Out, FilterMap>(
162 self,
163 filter_map: FilterMap,
164 ) -> impl ParIterUsing<U, R, Item = Out>
165 where
166 Out: Send + Sync,
167 FilterMap: Fn(&mut U::Item, Self::Item) -> Option<Out> + Send + Sync + Clone,
168 {
169 let (using, params, iter, x1) = self.destruct();
170 let x1 = move |u: &mut U::Item, t: I::Item| {
171 let vo: Vec<_> = x1(u, t).values().into_iter().collect();
173 let vo: Vec<_> = vo.into_iter().filter_map(|x| filter_map(u, x)).collect();
174 Vector(vo)
175 };
176 UParXap::new(using, params, iter, x1)
177 }
178
179 fn collect_into<C>(self, output: C) -> C
182 where
183 C: ParCollectInto<Self::Item>,
184 {
185 output.u_x_collect_into::<R, _, _, _, _>(self.ux)
186 }
187
188 fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
191 where
192 Reduce: Fn(&mut U::Item, Self::Item, Self::Item) -> Self::Item + Send + Sync,
193 {
194 self.ux.reduce::<R, _>(reduce).1
195 }
196
197 fn first(self) -> Option<Self::Item> {
200 self.ux.next()
201 }
202}