orx_parallel/computational_variants/
xap.rs

1use crate::{
2    ChunkSize, IterationOrder, NumThreads, ParCollectInto, ParIter, Params,
3    computational_variants::xap_filter_xap::ParXapFilterXap,
4    computations::{Values, X, map_self_atom},
5    runner::{DefaultRunner, ParallelRunner},
6};
7use orx_concurrent_iter::ConcurrentIter;
8use std::marker::PhantomData;
9
10/// A parallel iterator that xaps inputs.
11///
12/// *xap* is a generalization of  one-to-one map, filter-map and flat-map operations.
13pub struct ParXap<I, Vo, M1, R = DefaultRunner>
14where
15    R: ParallelRunner,
16    I: ConcurrentIter,
17    Vo: Values + Send + Sync,
18    Vo::Item: Send + Sync,
19    M1: Fn(I::Item) -> Vo + Send + Sync,
20{
21    x: X<I, Vo, M1>,
22    phantom: PhantomData<R>,
23}
24
25impl<I, Vo, M1, R> ParXap<I, Vo, M1, R>
26where
27    R: ParallelRunner,
28    I: ConcurrentIter,
29    Vo: Values + Send + Sync,
30    Vo::Item: Send + Sync,
31    M1: Fn(I::Item) -> Vo + Send + Sync,
32{
33    pub(crate) fn new(params: Params, iter: I, x1: M1) -> Self {
34        Self {
35            x: X::new(params, iter, x1),
36            phantom: PhantomData,
37        }
38    }
39
40    fn destruct(self) -> (Params, I, M1) {
41        self.x.destruct()
42    }
43}
44
45unsafe impl<I, Vo, M1, R> Send for ParXap<I, Vo, M1, R>
46where
47    R: ParallelRunner,
48    I: ConcurrentIter,
49    Vo: Values + Send + Sync,
50    Vo::Item: Send + Sync,
51    M1: Fn(I::Item) -> Vo + Send + Sync,
52{
53}
54
55unsafe impl<I, Vo, M1, R> Sync for ParXap<I, Vo, M1, R>
56where
57    R: ParallelRunner,
58    I: ConcurrentIter,
59    Vo: Values + Send + Sync,
60    Vo::Item: Send + Sync,
61    M1: Fn(I::Item) -> Vo + Send + Sync,
62{
63}
64
65impl<I, Vo, M1, R> ParIter<R> for ParXap<I, Vo, M1, R>
66where
67    R: ParallelRunner,
68    I: ConcurrentIter,
69    Vo: Values + Send + Sync,
70    Vo::Item: Send + Sync,
71    M1: Fn(I::Item) -> Vo + Send + Sync,
72{
73    type Item = Vo::Item;
74
75    fn con_iter(&self) -> &impl ConcurrentIter {
76        self.x.iter()
77    }
78
79    fn params(&self) -> &Params {
80        self.x.params()
81    }
82
83    // params transformations
84
85    fn num_threads(mut self, num_threads: impl Into<NumThreads>) -> Self {
86        self.x.num_threads(num_threads);
87        self
88    }
89
90    fn chunk_size(mut self, chunk_size: impl Into<ChunkSize>) -> Self {
91        self.x.chunk_size(chunk_size);
92        self
93    }
94
95    fn iteration_order(mut self, collect: IterationOrder) -> Self {
96        self.x.iteration_order(collect);
97        self
98    }
99
100    fn with_runner<Q: ParallelRunner>(self) -> impl ParIter<Q, Item = Self::Item> {
101        let (params, iter, map1) = self.destruct();
102        ParXap::new(params, iter, map1)
103    }
104
105    // computation transformations
106
107    fn map<Out, Map>(self, map: Map) -> impl ParIter<R, Item = Out>
108    where
109        Out: Send + Sync,
110        Map: Fn(Self::Item) -> Out + Send + Sync + Clone,
111    {
112        let (params, iter, x1) = self.destruct();
113        let x1 = move |i: I::Item| {
114            let vo = x1(i);
115            vo.map(map.clone())
116        };
117
118        ParXap::new(params, iter, x1)
119    }
120
121    fn filter<Filter>(self, filter: Filter) -> impl ParIter<R, Item = Self::Item>
122    where
123        Filter: Fn(&Self::Item) -> bool + Send + Sync,
124    {
125        let (params, iter, x1) = self.destruct();
126        ParXapFilterXap::new(params, iter, x1, filter, map_self_atom)
127    }
128
129    fn flat_map<IOut, FlatMap>(self, flat_map: FlatMap) -> impl ParIter<R, Item = IOut::Item>
130    where
131        IOut: IntoIterator + Send + Sync,
132        IOut::IntoIter: Send + Sync,
133        IOut::Item: Send + Sync,
134        FlatMap: Fn(Self::Item) -> IOut + Send + Sync + Clone,
135    {
136        let (params, iter, x1) = self.destruct();
137        let x1 = move |i: I::Item| {
138            let vo = x1(i);
139            vo.flat_map(flat_map.clone())
140        };
141        ParXap::new(params, iter, x1)
142    }
143
144    fn filter_map<Out, FilterMap>(self, filter_map: FilterMap) -> impl ParIter<R, Item = Out>
145    where
146        Out: Send + Sync,
147        FilterMap: Fn(Self::Item) -> Option<Out> + Send + Sync + Clone,
148    {
149        let (params, iter, x1) = self.destruct();
150        let x1 = move |i: I::Item| {
151            let vo = x1(i);
152            vo.flat_map(filter_map.clone())
153        };
154        ParXap::new(params, iter, x1)
155    }
156
157    // collect
158
159    fn collect_into<C>(self, output: C) -> C
160    where
161        C: ParCollectInto<Self::Item>,
162    {
163        output.x_collect_into::<R, _, _, _>(self.x)
164    }
165
166    // reduce
167
168    fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
169    where
170        Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Send + Sync,
171    {
172        self.x.reduce::<R, _>(reduce).1
173    }
174
175    // early exit
176
177    fn first(self) -> Option<Self::Item> {
178        self.x.next()
179    }
180}