orx_parallel/computational_variants/
xap.rs

1use crate::computational_variants::fallible_result::ParXapResult;
2use crate::executor::parallel_compute as prc;
3use crate::generic_values::TransformableValues;
4use crate::generic_values::runner_results::Infallible;
5use crate::par_iter_result::IntoResult;
6use crate::runner::{DefaultRunner, ParallelRunner};
7use crate::using::{UParXap, UsingClone, UsingFun};
8use crate::{ChunkSize, IterationOrder, NumThreads, ParCollectInto, ParIter, Params};
9use crate::{ParIterResult, ParIterUsing};
10use orx_concurrent_iter::ConcurrentIter;
11
/// A parallel iterator that xaps inputs.
///
/// *xap* is a generalization of one-to-one map, filter-map and flat-map operations.
///
/// The transformation is stored as a single function `X1` which takes an item of the
/// concurrent iterator `I` and returns a `Vo`; i.e., a [`TransformableValues`] whose
/// `Fallibility` is [`Infallible`]. The items of this parallel iterator are `Vo::Item`.
pub struct ParXap<I, Vo, X1, R = DefaultRunner>
where
    R: ParallelRunner,
    I: ConcurrentIter,
    Vo: TransformableValues<Fallibility = Infallible>,
    X1: Fn(I::Item) -> Vo + Sync,
{
    /// Parallel runner that is handed over to the computation once the iterator is consumed.
    orchestrator: R,
    /// Parallel execution parameters (number of threads, chunk size, iteration order).
    params: Params,
    /// Concurrent iterator providing the input items.
    iter: I,
    /// The xap function applied to every input item.
    xap1: X1,
}
27
28impl<I, Vo, X1, R> ParXap<I, Vo, X1, R>
29where
30    R: ParallelRunner,
31    I: ConcurrentIter,
32    Vo: TransformableValues<Fallibility = Infallible>,
33    X1: Fn(I::Item) -> Vo + Sync,
34{
35    pub(crate) fn new(orchestrator: R, params: Params, iter: I, xap1: X1) -> Self {
36        Self {
37            orchestrator,
38            params,
39            iter,
40            xap1,
41        }
42    }
43
44    pub(crate) fn destruct(self) -> (R, Params, I, X1) {
45        (self.orchestrator, self.params, self.iter, self.xap1)
46    }
47}
48
// Marks `ParXap` as `Send` for every combination of type parameters satisfying the
// bounds below; the impl is unconditional, no additional `Send` bound is requested.
//
// NOTE(review): the only thread-safety bound visible in this block is `X1: Sync`.
// Whether `R: ParallelRunner` and `I: ConcurrentIter` already guarantee `Send`, and
// why moving a non-`Send` `X1` across threads is acceptable, is not established
// here - TODO confirm against the trait definitions and record the invariant as a
// `SAFETY` note.
unsafe impl<I, Vo, X1, R> Send for ParXap<I, Vo, X1, R>
where
    R: ParallelRunner,
    I: ConcurrentIter,
    Vo: TransformableValues<Fallibility = Infallible>,
    X1: Fn(I::Item) -> Vo + Sync,
{
}
57
// Marks `ParXap` as `Sync` for every combination of type parameters satisfying the
// bounds below; the impl is unconditional, no additional `Sync` bound is requested.
//
// NOTE(review): `X1: Sync` is required by the bounds, but nothing visible in this
// block requires `R`, `I` or `Params` to be `Sync`. Presumably `ParallelRunner` and
// `ConcurrentIter` provide this guarantee - TODO confirm against the trait
// definitions and record the invariant as a `SAFETY` note.
unsafe impl<I, Vo, X1, R> Sync for ParXap<I, Vo, X1, R>
where
    R: ParallelRunner,
    I: ConcurrentIter,
    Vo: TransformableValues<Fallibility = Infallible>,
    X1: Fn(I::Item) -> Vo + Sync,
{
}
66
67impl<I, Vo, X1, R> ParIter<R> for ParXap<I, Vo, X1, R>
68where
69    R: ParallelRunner,
70    I: ConcurrentIter,
71    Vo: TransformableValues<Fallibility = Infallible>,
72    X1: Fn(I::Item) -> Vo + Sync,
73{
74    type Item = Vo::Item;
75
76    fn con_iter(&self) -> &impl ConcurrentIter {
77        &self.iter
78    }
79
80    fn params(&self) -> Params {
81        self.params
82    }
83
84    // params transformations
85
86    fn num_threads(mut self, num_threads: impl Into<NumThreads>) -> Self {
87        self.params = self.params.with_num_threads(num_threads);
88        self
89    }
90
91    fn chunk_size(mut self, chunk_size: impl Into<ChunkSize>) -> Self {
92        self.params = self.params.with_chunk_size(chunk_size);
93        self
94    }
95
96    fn iteration_order(mut self, collect: IterationOrder) -> Self {
97        self.params = self.params.with_collect_ordering(collect);
98        self
99    }
100
101    fn with_runner<Q: ParallelRunner>(self, orchestrator: Q) -> impl ParIter<Q, Item = Self::Item> {
102        let (_, params, iter, x1) = self.destruct();
103        ParXap::new(orchestrator, params, iter, x1)
104    }
105
106    // using transformations
107
108    fn using<U, F>(
109        self,
110        using: F,
111    ) -> impl ParIterUsing<UsingFun<F, U>, R, Item = <Self as ParIter<R>>::Item>
112    where
113        U: 'static,
114        F: Fn(usize) -> U + Sync,
115    {
116        let using = UsingFun::new(using);
117        let (orchestrator, params, iter, x1) = self.destruct();
118        let m1 = move |_: &mut U, t: I::Item| x1(t);
119        UParXap::new(using, orchestrator, params, iter, m1)
120    }
121
122    fn using_clone<U>(
123        self,
124        value: U,
125    ) -> impl ParIterUsing<UsingClone<U>, R, Item = <Self as ParIter<R>>::Item>
126    where
127        U: Clone + 'static,
128    {
129        let using = UsingClone::new(value);
130        let (orchestrator, params, iter, x1) = self.destruct();
131        let m1 = move |_: &mut U, t: I::Item| x1(t);
132        UParXap::new(using, orchestrator, params, iter, m1)
133    }
134
135    // computation transformations
136
137    fn map<Out, Map>(self, map: Map) -> impl ParIter<R, Item = Out>
138    where
139        Map: Fn(Self::Item) -> Out + Sync + Clone,
140    {
141        let (orchestrator, params, iter, x1) = self.destruct();
142        let x1 = move |i: I::Item| {
143            let vo = x1(i);
144            vo.map(map.clone())
145        };
146
147        ParXap::new(orchestrator, params, iter, x1)
148    }
149
150    fn filter<Filter>(self, filter: Filter) -> impl ParIter<R, Item = Self::Item>
151    where
152        Filter: Fn(&Self::Item) -> bool + Sync + Clone,
153    {
154        let (orchestrator, params, iter, x1) = self.destruct();
155        let x1 = move |i: I::Item| {
156            let values = x1(i);
157            values.filter(filter.clone())
158        };
159        ParXap::new(orchestrator, params, iter, x1)
160    }
161
162    fn flat_map<IOut, FlatMap>(self, flat_map: FlatMap) -> impl ParIter<R, Item = IOut::Item>
163    where
164        IOut: IntoIterator,
165        FlatMap: Fn(Self::Item) -> IOut + Sync + Clone,
166    {
167        let (orchestrator, params, iter, x1) = self.destruct();
168        let x1 = move |i: I::Item| {
169            let vo = x1(i);
170            vo.flat_map(flat_map.clone())
171        };
172        ParXap::new(orchestrator, params, iter, x1)
173    }
174
175    fn filter_map<Out, FilterMap>(self, filter_map: FilterMap) -> impl ParIter<R, Item = Out>
176    where
177        FilterMap: Fn(Self::Item) -> Option<Out> + Sync + Clone,
178    {
179        let (orchestrator, params, iter, x1) = self.destruct();
180        let x1 = move |i: I::Item| {
181            let vo = x1(i);
182            vo.filter_map(filter_map.clone())
183        };
184        ParXap::new(orchestrator, params, iter, x1)
185    }
186
187    fn take_while<While>(self, take_while: While) -> impl ParIter<R, Item = Self::Item>
188    where
189        While: Fn(&Self::Item) -> bool + Sync + Clone,
190    {
191        let (orchestrator, params, iter, x1) = self.destruct();
192        let x1 = move |i: I::Item| {
193            let vo = x1(i);
194            vo.whilst(take_while.clone())
195        };
196        ParXap::new(orchestrator, params, iter, x1)
197    }
198
199    fn into_fallible_result<Out, Err>(self) -> impl ParIterResult<R, Item = Out, Err = Err>
200    where
201        Self::Item: IntoResult<Out, Err>,
202    {
203        let (orchestrator, params, iter, x1) = self.destruct();
204        ParXapResult::new(orchestrator, params, iter, x1)
205    }
206
207    // collect
208
209    fn collect_into<C>(self, output: C) -> C
210    where
211        C: ParCollectInto<Self::Item>,
212    {
213        let (orchestrator, params, iter, x1) = self.destruct();
214        output.x_collect_into(orchestrator, params, iter, x1)
215    }
216
217    // reduce
218
219    fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
220    where
221        Self::Item: Send,
222        Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Sync,
223    {
224        let (orchestrator, params, iter, x1) = self.destruct();
225        let (_, Ok(acc)) = prc::reduce::x(orchestrator, params, iter, x1, reduce);
226        acc
227    }
228
229    // early exit
230
231    fn first(self) -> Option<Self::Item>
232    where
233        Self::Item: Send,
234    {
235        let (orchestrator, params, iter, x1) = self.destruct();
236        match params.iteration_order {
237            IterationOrder::Ordered => {
238                let (_num_threads, Ok(result)) = prc::next::x(orchestrator, params, iter, x1);
239                result.map(|x| x.1)
240            }
241            IterationOrder::Arbitrary => {
242                let (_num_threads, Ok(result)) = prc::next_any::x(orchestrator, params, iter, x1);
243                result
244            }
245        }
246    }
247}