orx_parallel/computational_variants/
par.rs1use super::{map::ParMap, xap::ParXap, xap_filter_xap::ParXapFilterXap};
2use crate::{
3 ChunkSize, IterationOrder, NumThreads, ParCollectInto, ParIter, ParIterUsing, Params,
4 computations::{M, Vector, map_self, map_self_atom},
5 runner::{DefaultRunner, ParallelRunner},
6 using::computational_variants::UPar,
7 using::{UsingClone, UsingFun},
8};
9use orx_concurrent_iter::ConcurrentIter;
10use std::marker::PhantomData;
11
12pub struct Par<I, R = DefaultRunner>
14where
15 R: ParallelRunner,
16 I: ConcurrentIter,
17{
18 iter: I,
19 params: Params,
20 phantom: PhantomData<R>,
21}
22
23impl<I, R> Par<I, R>
24where
25 R: ParallelRunner,
26 I: ConcurrentIter,
27{
28 pub(crate) fn new(params: Params, iter: I) -> Self {
29 Self {
30 iter,
31 params,
32 phantom: PhantomData,
33 }
34 }
35
36 fn destruct(self) -> (Params, I) {
37 (self.params, self.iter)
38 }
39
40 fn m(self) -> M<I, I::Item, impl Fn(I::Item) -> I::Item> {
41 let (params, iter) = self.destruct();
42 M::new(params, iter, map_self)
43 }
44}
45
46unsafe impl<I, R> Send for Par<I, R>
47where
48 R: ParallelRunner,
49 I: ConcurrentIter,
50{
51}
52
53unsafe impl<I, R> Sync for Par<I, R>
54where
55 R: ParallelRunner,
56 I: ConcurrentIter,
57{
58}
59
60impl<I, R> ParIter<R> for Par<I, R>
61where
62 R: ParallelRunner,
63 I: ConcurrentIter,
64{
65 type Item = I::Item;
66
67 fn con_iter(&self) -> &impl ConcurrentIter {
68 &self.iter
69 }
70
71 fn params(&self) -> Params {
72 self.params
73 }
74
75 fn num_threads(mut self, num_threads: impl Into<NumThreads>) -> Self {
78 self.params = self.params.with_num_threads(num_threads);
79 self
80 }
81
82 fn chunk_size(mut self, chunk_size: impl Into<ChunkSize>) -> Self {
83 self.params = self.params.with_chunk_size(chunk_size);
84 self
85 }
86
87 fn iteration_order(mut self, collect: IterationOrder) -> Self {
88 self.params = self.params.with_collect_ordering(collect);
89 self
90 }
91
92 fn with_runner<Q: ParallelRunner>(self) -> impl ParIter<Q, Item = Self::Item> {
93 Par::new(self.params, self.iter)
94 }
95
96 fn using<U, F>(
99 self,
100 using: F,
101 ) -> impl ParIterUsing<UsingFun<F, U>, R, Item = <Self as ParIter<R>>::Item>
102 where
103 U: Send,
104 F: FnMut(usize) -> U,
105 {
106 let using = UsingFun::new(using);
107 UPar::new(using, self.params, self.iter)
108 }
109
110 fn using_clone<U>(
111 self,
112 using: U,
113 ) -> impl ParIterUsing<UsingClone<U>, R, Item = <Self as ParIter<R>>::Item>
114 where
115 U: Clone + Send,
116 {
117 let using = UsingClone::new(using);
118 UPar::new(using, self.params, self.iter)
119 }
120
121 fn map<Out, Map>(self, map: Map) -> impl ParIter<R, Item = Out>
124 where
125 Out: Send + Sync,
126 Map: Fn(Self::Item) -> Out + Send + Sync + Clone,
127 {
128 let (params, iter) = self.destruct();
129 ParMap::new(params, iter, map)
130 }
131
132 fn filter<Filter>(self, filter: Filter) -> impl ParIter<R, Item = Self::Item>
133 where
134 Filter: Fn(&Self::Item) -> bool + Send + Sync,
135 {
136 let (params, iter) = self.destruct();
137 ParXapFilterXap::new(params, iter, map_self_atom, filter, map_self_atom)
138 }
139
140 fn flat_map<IOut, FlatMap>(self, flat_map: FlatMap) -> impl ParIter<R, Item = IOut::Item>
141 where
142 IOut: IntoIterator + Send + Sync,
143 IOut::IntoIter: Send + Sync,
144 IOut::Item: Send + Sync,
145 FlatMap: Fn(Self::Item) -> IOut + Send + Sync,
146 {
147 let (params, iter) = self.destruct();
148 let x1 = move |i: Self::Item| Vector(flat_map(i)); ParXap::new(params, iter, x1)
150 }
151
152 fn filter_map<Out, FilterMap>(self, filter_map: FilterMap) -> impl ParIter<R, Item = Out>
153 where
154 Out: Send + Sync,
155 FilterMap: Fn(Self::Item) -> Option<Out> + Send + Sync + Clone,
156 {
157 let (params, iter) = self.destruct();
158 ParXap::new(params, iter, filter_map)
159 }
160
161 fn collect_into<C>(self, output: C) -> C
164 where
165 C: ParCollectInto<Self::Item>,
166 {
167 output.m_collect_into::<R, _, _>(self.m())
168 }
169
170 fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
173 where
174 Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Send + Sync,
175 {
176 self.m().reduce::<R, _>(reduce).1
177 }
178
179 fn first(self) -> Option<Self::Item> {
182 self.m().next()
183 }
184}