orx_parallel/using/computational_variants/
u_par.rs1use crate::{
2 ChunkSize, IterationOrder, NumThreads, ParCollectInto, Params,
3 generic_values::Vector,
4 runner::{DefaultRunner, ParallelRunner},
5 using::u_par_iter::ParIterUsing,
6 using::{
7 Using,
8 computational_variants::{u_map::UParMap, u_xap::UParXap},
9 computations::{UM, u_map_self},
10 },
11};
12use orx_concurrent_iter::ConcurrentIter;
13use std::marker::PhantomData;
14
15pub struct UPar<U, I, R = DefaultRunner>
17where
18 U: Using,
19 R: ParallelRunner,
20 I: ConcurrentIter,
21{
22 using: U,
23 iter: I,
24 params: Params,
25 phantom: PhantomData<R>,
26}
27
28impl<U, I, R> UPar<U, I, R>
29where
30 U: Using,
31 R: ParallelRunner,
32 I: ConcurrentIter,
33{
34 pub(crate) fn new(using: U, params: Params, iter: I) -> Self {
35 Self {
36 using,
37 iter,
38 params,
39 phantom: PhantomData,
40 }
41 }
42
43 fn destruct(self) -> (U, Params, I) {
44 (self.using, self.params, self.iter)
45 }
46
47 #[allow(clippy::type_complexity)]
48 fn u_m(self) -> UM<U, I, I::Item, impl Fn(&mut U::Item, I::Item) -> I::Item> {
49 let (using, params, iter) = self.destruct();
50 UM::new(using, params, iter, u_map_self)
51 }
52}
53
54unsafe impl<U, I, R> Send for UPar<U, I, R>
55where
56 U: Using,
57 R: ParallelRunner,
58 I: ConcurrentIter,
59{
60}
61
62unsafe impl<U, I, R> Sync for UPar<U, I, R>
63where
64 U: Using,
65 R: ParallelRunner,
66 I: ConcurrentIter,
67{
68}
69
70impl<U, I, R> ParIterUsing<U, R> for UPar<U, I, R>
71where
72 U: Using,
73 R: ParallelRunner,
74 I: ConcurrentIter,
75{
76 type Item = I::Item;
77
78 fn con_iter(&self) -> &impl ConcurrentIter {
79 &self.iter
80 }
81
82 fn params(&self) -> Params {
83 self.params
84 }
85
86 fn num_threads(mut self, num_threads: impl Into<NumThreads>) -> Self {
89 self.params = self.params.with_num_threads(num_threads);
90 self
91 }
92
93 fn chunk_size(mut self, chunk_size: impl Into<ChunkSize>) -> Self {
94 self.params = self.params.with_chunk_size(chunk_size);
95 self
96 }
97
98 fn iteration_order(mut self, collect: IterationOrder) -> Self {
99 self.params = self.params.with_collect_ordering(collect);
100 self
101 }
102
103 fn with_runner<Q: ParallelRunner>(self) -> impl ParIterUsing<U, Q, Item = Self::Item> {
104 UPar::new(self.using, self.params, self.iter)
105 }
106
107 fn map<Out, Map>(self, map: Map) -> impl ParIterUsing<U, R, Item = Out>
110 where
111 Map: Fn(&mut <U as Using>::Item, Self::Item) -> Out + Sync + Clone,
112 {
113 let (using, params, iter) = self.destruct();
114 let map = move |u: &mut U::Item, x: Self::Item| map(u, x);
115 UParMap::new(using, params, iter, map)
116 }
117
118 fn filter<Filter>(self, filter: Filter) -> impl ParIterUsing<U, R, Item = Self::Item>
119 where
120 Filter: Fn(&mut U::Item, &Self::Item) -> bool + Sync + Clone,
121 {
122 let (using, params, iter) = self.destruct();
123 let x1 = move |u: &mut U::Item, i: Self::Item| filter(u, &i).then_some(i);
124 UParXap::new(using, params, iter, x1)
125 }
126
127 fn flat_map<IOut, FlatMap>(
128 self,
129 flat_map: FlatMap,
130 ) -> impl ParIterUsing<U, R, Item = IOut::Item>
131 where
132 IOut: IntoIterator,
133 FlatMap: Fn(&mut U::Item, Self::Item) -> IOut + Sync + Clone,
134 {
135 let (using, params, iter) = self.destruct();
136 let x1 = move |u: &mut U::Item, i: Self::Item| Vector(flat_map(u, i));
137 UParXap::new(using, params, iter, x1)
138 }
139
140 fn filter_map<Out, FilterMap>(
141 self,
142 filter_map: FilterMap,
143 ) -> impl ParIterUsing<U, R, Item = Out>
144 where
145 FilterMap: Fn(&mut <U as Using>::Item, Self::Item) -> Option<Out> + Sync + Clone,
146 {
147 let (using, params, iter) = self.destruct();
148 let x1 = move |u: &mut U::Item, x: Self::Item| filter_map(u, x);
149 UParXap::new(using, params, iter, x1)
150 }
151
152 fn collect_into<C>(self, output: C) -> C
155 where
156 C: ParCollectInto<Self::Item>,
157 {
158 output.u_m_collect_into::<R, _, _, _>(self.u_m())
159 }
160
161 fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
164 where
165 Self::Item: Send,
166 Reduce: Fn(&mut U::Item, Self::Item, Self::Item) -> Self::Item + Sync,
167 {
168 self.u_m().reduce::<R, _>(reduce).1
169 }
170
171 fn first(self) -> Option<Self::Item>
174 where
175 Self::Item: Send,
176 {
177 self.u_m().next::<R>().1
178 }
179}