orx_parallel/computational_variants/
xap.rs1use crate::{
2 ChunkSize, IterationOrder, NumThreads, ParCollectInto, ParIter, ParIterUsing, Params,
3 computational_variants::xap_filter_xap::ParXapFilterXap,
4 computations::{Values, X, map_self_atom},
5 runner::{DefaultRunner, ParallelRunner},
6 using::{UsingClone, UsingFun, computational_variants::UParXap},
7};
8use orx_concurrent_iter::ConcurrentIter;
9use std::marker::PhantomData;
10
11pub struct ParXap<I, Vo, M1, R = DefaultRunner>
15where
16 R: ParallelRunner,
17 I: ConcurrentIter,
18 Vo: Values + Send + Sync,
19 Vo::Item: Send + Sync,
20 M1: Fn(I::Item) -> Vo + Send + Sync,
21{
22 x: X<I, Vo, M1>,
23 phantom: PhantomData<R>,
24}
25
26impl<I, Vo, M1, R> ParXap<I, Vo, M1, R>
27where
28 R: ParallelRunner,
29 I: ConcurrentIter,
30 Vo: Values + Send + Sync,
31 Vo::Item: Send + Sync,
32 M1: Fn(I::Item) -> Vo + Send + Sync,
33{
34 pub(crate) fn new(params: Params, iter: I, x1: M1) -> Self {
35 Self {
36 x: X::new(params, iter, x1),
37 phantom: PhantomData,
38 }
39 }
40
41 fn destruct(self) -> (Params, I, M1) {
42 self.x.destruct()
43 }
44}
45
46unsafe impl<I, Vo, M1, R> Send for ParXap<I, Vo, M1, R>
47where
48 R: ParallelRunner,
49 I: ConcurrentIter,
50 Vo: Values + Send + Sync,
51 Vo::Item: Send + Sync,
52 M1: Fn(I::Item) -> Vo + Send + Sync,
53{
54}
55
56unsafe impl<I, Vo, M1, R> Sync for ParXap<I, Vo, M1, R>
57where
58 R: ParallelRunner,
59 I: ConcurrentIter,
60 Vo: Values + Send + Sync,
61 Vo::Item: Send + Sync,
62 M1: Fn(I::Item) -> Vo + Send + Sync,
63{
64}
65
66impl<I, Vo, M1, R> ParIter<R> for ParXap<I, Vo, M1, R>
67where
68 R: ParallelRunner,
69 I: ConcurrentIter,
70 Vo: Values + Send + Sync,
71 Vo::Item: Send + Sync,
72 M1: Fn(I::Item) -> Vo + Send + Sync,
73{
74 type Item = Vo::Item;
75
76 fn con_iter(&self) -> &impl ConcurrentIter {
77 self.x.iter()
78 }
79
80 fn params(&self) -> Params {
81 self.x.params()
82 }
83
84 fn num_threads(mut self, num_threads: impl Into<NumThreads>) -> Self {
87 self.x.num_threads(num_threads);
88 self
89 }
90
91 fn chunk_size(mut self, chunk_size: impl Into<ChunkSize>) -> Self {
92 self.x.chunk_size(chunk_size);
93 self
94 }
95
96 fn iteration_order(mut self, collect: IterationOrder) -> Self {
97 self.x.iteration_order(collect);
98 self
99 }
100
101 fn with_runner<Q: ParallelRunner>(self) -> impl ParIter<Q, Item = Self::Item> {
102 let (params, iter, map1) = self.destruct();
103 ParXap::new(params, iter, map1)
104 }
105
106 fn using<U, F>(
109 self,
110 using: F,
111 ) -> impl ParIterUsing<UsingFun<F, U>, R, Item = <Self as ParIter<R>>::Item>
112 where
113 U: Send,
114 F: FnMut(usize) -> U,
115 {
116 let using = UsingFun::new(using);
117 let (params, iter, x1) = self.destruct();
118 let m1 = move |_: &mut U, t: I::Item| x1(t);
119 UParXap::new(using, params, iter, m1)
120 }
121
122 fn using_clone<U>(
123 self,
124 using: U,
125 ) -> impl ParIterUsing<UsingClone<U>, R, Item = <Self as ParIter<R>>::Item>
126 where
127 U: Clone + Send,
128 {
129 let using = UsingClone::new(using);
130 let (params, iter, x1) = self.destruct();
131 let m1 = move |_: &mut U, t: I::Item| x1(t);
132 UParXap::new(using, params, iter, m1)
133 }
134
135 fn map<Out, Map>(self, map: Map) -> impl ParIter<R, Item = Out>
138 where
139 Out: Send + Sync,
140 Map: Fn(Self::Item) -> Out + Send + Sync + Clone,
141 {
142 let (params, iter, x1) = self.destruct();
143 let x1 = move |i: I::Item| {
144 let vo = x1(i);
145 vo.map(map.clone())
146 };
147
148 ParXap::new(params, iter, x1)
149 }
150
151 fn filter<Filter>(self, filter: Filter) -> impl ParIter<R, Item = Self::Item>
152 where
153 Filter: Fn(&Self::Item) -> bool + Send + Sync,
154 {
155 let (params, iter, x1) = self.destruct();
156 ParXapFilterXap::new(params, iter, x1, filter, map_self_atom)
157 }
158
159 fn flat_map<IOut, FlatMap>(self, flat_map: FlatMap) -> impl ParIter<R, Item = IOut::Item>
160 where
161 IOut: IntoIterator + Send + Sync,
162 IOut::IntoIter: Send + Sync,
163 IOut::Item: Send + Sync,
164 FlatMap: Fn(Self::Item) -> IOut + Send + Sync + Clone,
165 {
166 let (params, iter, x1) = self.destruct();
167 let x1 = move |i: I::Item| {
168 let vo = x1(i);
169 vo.flat_map(flat_map.clone())
170 };
171 ParXap::new(params, iter, x1)
172 }
173
174 fn filter_map<Out, FilterMap>(self, filter_map: FilterMap) -> impl ParIter<R, Item = Out>
175 where
176 Out: Send + Sync,
177 FilterMap: Fn(Self::Item) -> Option<Out> + Send + Sync + Clone,
178 {
179 let (params, iter, x1) = self.destruct();
180 let x1 = move |i: I::Item| {
181 let vo = x1(i);
182 vo.flat_map(filter_map.clone())
183 };
184 ParXap::new(params, iter, x1)
185 }
186
187 fn collect_into<C>(self, output: C) -> C
190 where
191 C: ParCollectInto<Self::Item>,
192 {
193 output.x_collect_into::<R, _, _, _>(self.x)
194 }
195
196 fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
199 where
200 Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Send + Sync,
201 {
202 self.x.reduce::<R, _>(reduce).1
203 }
204
205 fn first(self) -> Option<Self::Item> {
208 self.x.next()
209 }
210}