orx_parallel/computational_variants/
xap.rs

1use crate::{
2    ChunkSize, IterationOrder, NumThreads, ParCollectInto, ParIter, ParIterUsing, Params,
3    computational_variants::xap_filter_xap::ParXapFilterXap,
4    computations::{Values, X, map_self_atom},
5    runner::{DefaultRunner, ParallelRunner},
6    using::{UsingClone, UsingFun, computational_variants::UParXap},
7};
8use orx_concurrent_iter::ConcurrentIter;
9use std::marker::PhantomData;
10
11/// A parallel iterator that xaps inputs.
12///
13/// *xap* is a generalization of  one-to-one map, filter-map and flat-map operations.
14pub struct ParXap<I, Vo, M1, R = DefaultRunner>
15where
16    R: ParallelRunner,
17    I: ConcurrentIter,
18    Vo: Values + Send + Sync,
19    Vo::Item: Send + Sync,
20    M1: Fn(I::Item) -> Vo + Send + Sync,
21{
22    x: X<I, Vo, M1>,
23    phantom: PhantomData<R>,
24}
25
26impl<I, Vo, M1, R> ParXap<I, Vo, M1, R>
27where
28    R: ParallelRunner,
29    I: ConcurrentIter,
30    Vo: Values + Send + Sync,
31    Vo::Item: Send + Sync,
32    M1: Fn(I::Item) -> Vo + Send + Sync,
33{
34    pub(crate) fn new(params: Params, iter: I, x1: M1) -> Self {
35        Self {
36            x: X::new(params, iter, x1),
37            phantom: PhantomData,
38        }
39    }
40
41    fn destruct(self) -> (Params, I, M1) {
42        self.x.destruct()
43    }
44}
45
46unsafe impl<I, Vo, M1, R> Send for ParXap<I, Vo, M1, R>
47where
48    R: ParallelRunner,
49    I: ConcurrentIter,
50    Vo: Values + Send + Sync,
51    Vo::Item: Send + Sync,
52    M1: Fn(I::Item) -> Vo + Send + Sync,
53{
54}
55
56unsafe impl<I, Vo, M1, R> Sync for ParXap<I, Vo, M1, R>
57where
58    R: ParallelRunner,
59    I: ConcurrentIter,
60    Vo: Values + Send + Sync,
61    Vo::Item: Send + Sync,
62    M1: Fn(I::Item) -> Vo + Send + Sync,
63{
64}
65
66impl<I, Vo, M1, R> ParIter<R> for ParXap<I, Vo, M1, R>
67where
68    R: ParallelRunner,
69    I: ConcurrentIter,
70    Vo: Values + Send + Sync,
71    Vo::Item: Send + Sync,
72    M1: Fn(I::Item) -> Vo + Send + Sync,
73{
74    type Item = Vo::Item;
75
76    fn con_iter(&self) -> &impl ConcurrentIter {
77        self.x.iter()
78    }
79
80    fn params(&self) -> Params {
81        self.x.params()
82    }
83
84    // params transformations
85
86    fn num_threads(mut self, num_threads: impl Into<NumThreads>) -> Self {
87        self.x.num_threads(num_threads);
88        self
89    }
90
91    fn chunk_size(mut self, chunk_size: impl Into<ChunkSize>) -> Self {
92        self.x.chunk_size(chunk_size);
93        self
94    }
95
96    fn iteration_order(mut self, collect: IterationOrder) -> Self {
97        self.x.iteration_order(collect);
98        self
99    }
100
101    fn with_runner<Q: ParallelRunner>(self) -> impl ParIter<Q, Item = Self::Item> {
102        let (params, iter, map1) = self.destruct();
103        ParXap::new(params, iter, map1)
104    }
105
106    // using transformations
107
108    fn using<U, F>(
109        self,
110        using: F,
111    ) -> impl ParIterUsing<UsingFun<F, U>, R, Item = <Self as ParIter<R>>::Item>
112    where
113        U: Send,
114        F: FnMut(usize) -> U,
115    {
116        let using = UsingFun::new(using);
117        let (params, iter, x1) = self.destruct();
118        let m1 = move |_: &mut U, t: I::Item| x1(t);
119        UParXap::new(using, params, iter, m1)
120    }
121
122    fn using_clone<U>(
123        self,
124        using: U,
125    ) -> impl ParIterUsing<UsingClone<U>, R, Item = <Self as ParIter<R>>::Item>
126    where
127        U: Clone + Send,
128    {
129        let using = UsingClone::new(using);
130        let (params, iter, x1) = self.destruct();
131        let m1 = move |_: &mut U, t: I::Item| x1(t);
132        UParXap::new(using, params, iter, m1)
133    }
134
135    // computation transformations
136
137    fn map<Out, Map>(self, map: Map) -> impl ParIter<R, Item = Out>
138    where
139        Out: Send + Sync,
140        Map: Fn(Self::Item) -> Out + Send + Sync + Clone,
141    {
142        let (params, iter, x1) = self.destruct();
143        let x1 = move |i: I::Item| {
144            let vo = x1(i);
145            vo.map(map.clone())
146        };
147
148        ParXap::new(params, iter, x1)
149    }
150
151    fn filter<Filter>(self, filter: Filter) -> impl ParIter<R, Item = Self::Item>
152    where
153        Filter: Fn(&Self::Item) -> bool + Send + Sync,
154    {
155        let (params, iter, x1) = self.destruct();
156        ParXapFilterXap::new(params, iter, x1, filter, map_self_atom)
157    }
158
159    fn flat_map<IOut, FlatMap>(self, flat_map: FlatMap) -> impl ParIter<R, Item = IOut::Item>
160    where
161        IOut: IntoIterator + Send + Sync,
162        IOut::IntoIter: Send + Sync,
163        IOut::Item: Send + Sync,
164        FlatMap: Fn(Self::Item) -> IOut + Send + Sync + Clone,
165    {
166        let (params, iter, x1) = self.destruct();
167        let x1 = move |i: I::Item| {
168            let vo = x1(i);
169            vo.flat_map(flat_map.clone())
170        };
171        ParXap::new(params, iter, x1)
172    }
173
174    fn filter_map<Out, FilterMap>(self, filter_map: FilterMap) -> impl ParIter<R, Item = Out>
175    where
176        Out: Send + Sync,
177        FilterMap: Fn(Self::Item) -> Option<Out> + Send + Sync + Clone,
178    {
179        let (params, iter, x1) = self.destruct();
180        let x1 = move |i: I::Item| {
181            let vo = x1(i);
182            vo.flat_map(filter_map.clone())
183        };
184        ParXap::new(params, iter, x1)
185    }
186
187    // collect
188
189    fn collect_into<C>(self, output: C) -> C
190    where
191        C: ParCollectInto<Self::Item>,
192    {
193        output.x_collect_into::<R, _, _, _>(self.x)
194    }
195
196    // reduce
197
198    fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
199    where
200        Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Send + Sync,
201    {
202        self.x.reduce::<R, _>(reduce).1
203    }
204
205    // early exit
206
207    fn first(self) -> Option<Self::Item> {
208        self.x.next()
209    }
210}