1use super::{map::ParMap, xap::ParXap};
2use crate::computational_variants::fallible_result::ParResult;
3use crate::executor::parallel_compute as prc;
4use crate::generic_values::{Vector, WhilstAtom};
5use crate::par_iter_result::IntoResult;
6use crate::runner::{DefaultRunner, ParallelRunner};
7use crate::using::{UPar, UsingClone, UsingFun};
8use crate::{
9 ChunkSize, IterationOrder, NumThreads, ParCollectInto, ParIter, Params, default_fns::map_self,
10};
11use crate::{IntoParIter, ParIterResult, ParIterUsing};
12use orx_concurrent_iter::chain::ChainKnownLenI;
13use orx_concurrent_iter::{ConcurrentIter, ExactSizeConcurrentIter};
14
15pub struct Par<I, R = DefaultRunner>
17where
18 R: ParallelRunner,
19 I: ConcurrentIter,
20{
21 orchestrator: R,
22 params: Params,
23 iter: I,
24}
25
26impl<I, R> Par<I, R>
27where
28 R: ParallelRunner,
29 I: ConcurrentIter,
30{
31 pub(crate) fn new(orchestrator: R, params: Params, iter: I) -> Self {
32 Self {
33 orchestrator,
34 iter,
35 params,
36 }
37 }
38
39 pub(crate) fn destruct(self) -> (R, Params, I) {
40 (self.orchestrator, self.params, self.iter)
41 }
42
43 pub(crate) fn orchestrator(&self) -> &R {
44 &self.orchestrator
45 }
46}
47
48unsafe impl<I, R> Send for Par<I, R>
49where
50 R: ParallelRunner,
51 I: ConcurrentIter,
52{
53}
54
55unsafe impl<I, R> Sync for Par<I, R>
56where
57 R: ParallelRunner,
58 I: ConcurrentIter,
59{
60}
61
62impl<I, R> ParIter<R> for Par<I, R>
63where
64 R: ParallelRunner,
65 I: ConcurrentIter,
66{
67 type Item = I::Item;
68
69 fn con_iter(&self) -> &impl ConcurrentIter {
70 &self.iter
71 }
72
73 fn params(&self) -> Params {
74 self.params
75 }
76
77 fn num_threads(mut self, num_threads: impl Into<NumThreads>) -> Self {
80 self.params = self.params.with_num_threads(num_threads);
81 self
82 }
83
84 fn chunk_size(mut self, chunk_size: impl Into<ChunkSize>) -> Self {
85 self.params = self.params.with_chunk_size(chunk_size);
86 self
87 }
88
89 fn iteration_order(mut self, collect: IterationOrder) -> Self {
90 self.params = self.params.with_collect_ordering(collect);
91 self
92 }
93
94 fn with_runner<Q: ParallelRunner>(self, orchestrator: Q) -> impl ParIter<Q, Item = Self::Item> {
95 Par::new(orchestrator, self.params, self.iter)
96 }
97
98 fn using<U, F>(
101 self,
102 using: F,
103 ) -> impl ParIterUsing<UsingFun<F, U>, R, Item = <Self as ParIter<R>>::Item>
104 where
105 U: 'static,
106 F: Fn(usize) -> U + Sync,
107 {
108 let using = UsingFun::new(using);
109 let (orchestrator, params, iter) = self.destruct();
110 UPar::new(using, orchestrator, params, iter)
111 }
112
113 fn using_clone<U>(
114 self,
115 value: U,
116 ) -> impl ParIterUsing<UsingClone<U>, R, Item = <Self as ParIter<R>>::Item>
117 where
118 U: Clone + 'static,
119 {
120 let using = UsingClone::new(value);
121 let (orchestrator, params, iter) = self.destruct();
122 UPar::new(using, orchestrator, params, iter)
123 }
124
125 fn map<Out, Map>(self, map: Map) -> impl ParIter<R, Item = Out>
128 where
129 Map: Fn(Self::Item) -> Out + Sync,
130 {
131 let (orchestrator, params, iter) = self.destruct();
132 ParMap::new(orchestrator, params, iter, map)
133 }
134
135 fn filter<Filter>(self, filter: Filter) -> impl ParIter<R, Item = Self::Item>
136 where
137 Filter: Fn(&Self::Item) -> bool + Sync,
138 {
139 let (orchestrator, params, iter) = self.destruct();
140 let x1 = move |i: Self::Item| filter(&i).then_some(i);
141 ParXap::new(orchestrator, params, iter, x1)
142 }
143
144 fn flat_map<IOut, FlatMap>(self, flat_map: FlatMap) -> impl ParIter<R, Item = IOut::Item>
145 where
146 IOut: IntoIterator,
147 FlatMap: Fn(Self::Item) -> IOut + Sync,
148 {
149 let (orchestrator, params, iter) = self.destruct();
150 let x1 = move |i: Self::Item| Vector(flat_map(i));
151 ParXap::new(orchestrator, params, iter, x1)
152 }
153
154 fn filter_map<Out, FilterMap>(self, filter_map: FilterMap) -> impl ParIter<R, Item = Out>
155 where
156 FilterMap: Fn(Self::Item) -> Option<Out> + Sync,
157 {
158 let (orchestrator, params, iter) = self.destruct();
159 ParXap::new(orchestrator, params, iter, filter_map)
160 }
161
162 fn take_while<While>(self, take_while: While) -> impl ParIter<R, Item = Self::Item>
163 where
164 While: Fn(&Self::Item) -> bool + Sync,
165 {
166 let (orchestrator, params, iter) = self.destruct();
167 let x1 = move |value: Self::Item| WhilstAtom::new(value, &take_while);
168 ParXap::new(orchestrator, params, iter, x1)
169 }
170
171 fn into_fallible_result<Out, Err>(self) -> impl ParIterResult<R, Item = Out, Err = Err>
172 where
173 Self::Item: IntoResult<Out, Err>,
174 {
175 ParResult::new(self)
176 }
177
178 fn collect_into<C>(self, output: C) -> C
181 where
182 C: ParCollectInto<Self::Item>,
183 {
184 let (orchestrator, params, iter) = self.destruct();
185 output.m_collect_into(orchestrator, params, iter, map_self)
186 }
187
188 fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
191 where
192 Self::Item: Send,
193 Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Sync,
194 {
195 let (orchestrator, params, iter) = self.destruct();
196 prc::reduce::m(orchestrator, params, iter, map_self, reduce).1
197 }
198
199 fn first(self) -> Option<Self::Item> {
202 let (orchestrator, params, iter) = self.destruct();
203 match params.iteration_order {
204 IterationOrder::Ordered => prc::next::m(orchestrator, params, iter, map_self).1,
205 IterationOrder::Arbitrary => prc::next_any::m(orchestrator, params, iter, map_self).1,
206 }
207 }
208}
209
210impl<I, R> Par<I, R>
211where
212 R: ParallelRunner,
213 I: ConcurrentIter,
214{
215 pub fn chain<C>(self, other: C) -> Par<ChainKnownLenI<I, C::IntoIter>, R>
234 where
235 I: ExactSizeConcurrentIter,
236 C: IntoParIter<Item = I::Item>,
237 {
238 let (orchestrator, params, iter) = self.destruct();
239 let iter = iter.chain(other.into_con_iter());
240 Par::new(orchestrator, params, iter)
241 }
242}