orx_parallel/using/computational_variants/
u_par.rs1use crate::{
2 ChunkSize, IterationOrder, NumThreads, ParCollectInto, Params,
3 computations::Vector,
4 runner::{DefaultRunner, ParallelRunner},
5 using::u_par_iter::ParIterUsing,
6 using::{
7 Using,
8 computational_variants::{
9 u_map::UParMap, u_xap::UParXap, u_xap_filter_xap::UParXapFilterXap,
10 },
11 computations::{UM, u_map_self, u_map_self_atom},
12 },
13};
14use orx_concurrent_iter::ConcurrentIter;
15use std::marker::PhantomData;
16
17pub struct UPar<U, I, R = DefaultRunner>
19where
20 U: Using,
21 R: ParallelRunner,
22 I: ConcurrentIter,
23{
24 using: U,
25 iter: I,
26 params: Params,
27 phantom: PhantomData<R>,
28}
29
30impl<U, I, R> UPar<U, I, R>
31where
32 U: Using,
33 R: ParallelRunner,
34 I: ConcurrentIter,
35{
36 pub(crate) fn new(using: U, params: Params, iter: I) -> Self {
37 Self {
38 using,
39 iter,
40 params,
41 phantom: PhantomData,
42 }
43 }
44
45 fn destruct(self) -> (U, Params, I) {
46 (self.using, self.params, self.iter)
47 }
48
49 #[allow(clippy::type_complexity)]
50 fn u_m(self) -> UM<U, I, I::Item, impl Fn(&mut U::Item, I::Item) -> I::Item> {
51 let (using, params, iter) = self.destruct();
52 UM::new(using, params, iter, u_map_self)
53 }
54}
55
56unsafe impl<U, I, R> Send for UPar<U, I, R>
57where
58 U: Using,
59 R: ParallelRunner,
60 I: ConcurrentIter,
61{
62}
63
64unsafe impl<U, I, R> Sync for UPar<U, I, R>
65where
66 U: Using,
67 R: ParallelRunner,
68 I: ConcurrentIter,
69{
70}
71
72impl<U, I, R> ParIterUsing<U, R> for UPar<U, I, R>
73where
74 U: Using,
75 R: ParallelRunner,
76 I: ConcurrentIter,
77{
78 type Item = I::Item;
79
80 fn con_iter(&self) -> &impl ConcurrentIter {
81 &self.iter
82 }
83
84 fn params(&self) -> Params {
85 self.params
86 }
87
88 fn num_threads(mut self, num_threads: impl Into<NumThreads>) -> Self {
91 self.params = self.params.with_num_threads(num_threads);
92 self
93 }
94
95 fn chunk_size(mut self, chunk_size: impl Into<ChunkSize>) -> Self {
96 self.params = self.params.with_chunk_size(chunk_size);
97 self
98 }
99
100 fn iteration_order(mut self, collect: IterationOrder) -> Self {
101 self.params = self.params.with_collect_ordering(collect);
102 self
103 }
104
105 fn with_runner<Q: ParallelRunner>(self) -> impl ParIterUsing<U, Q, Item = Self::Item> {
106 UPar::new(self.using, self.params, self.iter)
107 }
108
109 fn map<Out, Map>(self, map: Map) -> impl ParIterUsing<U, R, Item = Out>
112 where
113 Out: Send + Sync,
114 Map: Fn(&mut <U as Using>::Item, Self::Item) -> Out + Send + Sync + Clone,
115 {
116 let (using, params, iter) = self.destruct();
117 let map = move |u: &mut U::Item, x: Self::Item| map(u, x);
118 UParMap::new(using, params, iter, map)
119 }
120
121 fn filter<Filter>(self, filter: Filter) -> impl ParIterUsing<U, R, Item = Self::Item>
122 where
123 Filter: Fn(&mut U::Item, &Self::Item) -> bool + Send + Sync + Clone,
124 {
125 let (using, params, iter) = self.destruct();
126 let filter = move |u: &mut U::Item, x: &Self::Item| filter(u, x);
127 UParXapFilterXap::new(
128 using,
129 params,
130 iter,
131 u_map_self_atom,
132 filter,
133 u_map_self_atom,
134 )
135 }
136
137 fn flat_map<IOut, FlatMap>(
138 self,
139 flat_map: FlatMap,
140 ) -> impl ParIterUsing<U, R, Item = IOut::Item>
141 where
142 IOut: IntoIterator + Send + Sync,
143 IOut::IntoIter: Send + Sync,
144 IOut::Item: Send + Sync,
145 FlatMap: Fn(&mut U::Item, Self::Item) -> IOut + Send + Sync + Clone,
146 {
147 let (using, params, iter) = self.destruct();
148 let x1 = move |u: &mut U::Item, i: Self::Item| Vector(flat_map(u, i));
149 UParXap::new(using, params, iter, x1)
150 }
151
152 fn filter_map<Out, FilterMap>(
153 self,
154 filter_map: FilterMap,
155 ) -> impl ParIterUsing<U, R, Item = Out>
156 where
157 Out: Send + Sync,
158 FilterMap: Fn(&mut <U as Using>::Item, Self::Item) -> Option<Out> + Send + Sync + Clone,
159 {
160 let (using, params, iter) = self.destruct();
161 let x1 = move |u: &mut U::Item, x: Self::Item| filter_map(u, x);
162 UParXap::new(using, params, iter, x1)
163 }
164
165 fn collect_into<C>(self, output: C) -> C
168 where
169 C: ParCollectInto<Self::Item>,
170 {
171 output.u_m_collect_into::<R, _, _, _>(self.u_m())
172 }
173
174 fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
177 where
178 Reduce: Fn(&mut U::Item, Self::Item, Self::Item) -> Self::Item + Send + Sync,
179 {
180 self.u_m().reduce::<R, _>(reduce).1
181 }
182
183 fn first(self) -> Option<Self::Item> {
186 self.u_m().next()
187 }
188}