orx_parallel/computational_variants/
map.rs

1use super::{xap::ParXap, xap_filter_xap::ParXapFilterXap};
2use crate::{
3    ChunkSize, IterationOrder, NumThreads, ParCollectInto, ParIter, Params,
4    computations::{Atom, M, Vector, map_self_atom},
5    runner::{DefaultRunner, ParallelRunner},
6};
7use orx_concurrent_iter::ConcurrentIter;
8use std::marker::PhantomData;
9
10/// A parallel iterator that maps inputs.
11pub struct ParMap<I, O, M1, R = DefaultRunner>
12where
13    R: ParallelRunner,
14    I: ConcurrentIter,
15    O: Send + Sync,
16    M1: Fn(I::Item) -> O + Send + Sync + Clone,
17{
18    m: M<I, O, M1>,
19    phantom: PhantomData<R>,
20}
21
22impl<I, O, M1, R> ParMap<I, O, M1, R>
23where
24    R: ParallelRunner,
25    I: ConcurrentIter,
26    O: Send + Sync,
27    M1: Fn(I::Item) -> O + Send + Sync + Clone,
28{
29    pub(crate) fn new(params: Params, iter: I, m1: M1) -> Self {
30        Self {
31            m: M::new(params, iter, m1),
32            phantom: PhantomData,
33        }
34    }
35
36    fn destruct(self) -> (Params, I, M1) {
37        self.m.destruct()
38    }
39}
40
41unsafe impl<I, O, M1, R> Send for ParMap<I, O, M1, R>
42where
43    R: ParallelRunner,
44    I: ConcurrentIter,
45    O: Send + Sync,
46    M1: Fn(I::Item) -> O + Send + Sync + Clone,
47{
48}
49
50unsafe impl<I, O, M1, R> Sync for ParMap<I, O, M1, R>
51where
52    R: ParallelRunner,
53    I: ConcurrentIter,
54    O: Send + Sync,
55    M1: Fn(I::Item) -> O + Send + Sync + Clone,
56{
57}
58
59impl<I, O, M1, R> ParIter<R> for ParMap<I, O, M1, R>
60where
61    R: ParallelRunner,
62    I: ConcurrentIter,
63    O: Send + Sync,
64    M1: Fn(I::Item) -> O + Send + Sync + Clone,
65{
66    type Item = O;
67
68    fn con_iter(&self) -> &impl ConcurrentIter {
69        self.m.iter()
70    }
71
72    fn params(&self) -> &Params {
73        self.m.params()
74    }
75
76    // params transformations
77
78    fn num_threads(mut self, num_threads: impl Into<NumThreads>) -> Self {
79        self.m.num_threads(num_threads);
80        self
81    }
82
83    fn chunk_size(mut self, chunk_size: impl Into<ChunkSize>) -> Self {
84        self.m.chunk_size(chunk_size);
85        self
86    }
87
88    fn iteration_order(mut self, collect: IterationOrder) -> Self {
89        self.m.iteration_order(collect);
90        self
91    }
92
93    fn with_runner<Q: ParallelRunner>(self) -> impl ParIter<Q, Item = Self::Item> {
94        let (params, iter, map) = self.destruct();
95        ParMap::new(params, iter, map)
96    }
97
98    // computation transformations
99
100    fn map<Out, Map>(self, map: Map) -> impl ParIter<R, Item = Out>
101    where
102        Out: Send + Sync,
103        Map: Fn(Self::Item) -> Out + Send + Sync + Clone,
104    {
105        let (params, iter, m1) = self.destruct();
106        let m1 = move |x| map(m1(x));
107        ParMap::new(params, iter, m1)
108    }
109
110    fn filter<Filter>(self, filter: Filter) -> impl ParIter<R, Item = Self::Item>
111    where
112        Filter: Fn(&Self::Item) -> bool + Send + Sync,
113    {
114        let (params, iter, m1) = self.destruct();
115        let m1 = move |i: I::Item| Atom(m1(i));
116        ParXapFilterXap::new(params, iter, m1, filter, map_self_atom)
117    }
118
119    fn flat_map<IOut, FlatMap>(self, flat_map: FlatMap) -> impl ParIter<R, Item = IOut::Item>
120    where
121        IOut: IntoIterator + Send + Sync,
122        IOut::IntoIter: Send + Sync,
123        IOut::Item: Send + Sync,
124        FlatMap: Fn(Self::Item) -> IOut + Send + Sync,
125    {
126        let (params, iter, m1) = self.destruct();
127        let x1 = move |i: I::Item| Vector(flat_map(m1(i)));
128        ParXap::new(params, iter, x1)
129    }
130
131    fn filter_map<Out, FilterMap>(self, filter_map: FilterMap) -> impl ParIter<R, Item = Out>
132    where
133        Out: Send + Sync,
134        FilterMap: Fn(Self::Item) -> Option<Out> + Send + Sync + Clone,
135    {
136        let (params, iter, m1) = self.destruct();
137        let x1 = move |i: I::Item| filter_map(m1(i));
138        ParXap::new(params, iter, x1)
139    }
140
141    // collect
142
143    fn collect_into<C>(self, output: C) -> C
144    where
145        C: ParCollectInto<Self::Item>,
146    {
147        output.m_collect_into::<R, _, _>(self.m)
148    }
149
150    // reduce
151
152    fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
153    where
154        Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Send + Sync,
155    {
156        self.m.reduce::<R, _>(reduce).1
157    }
158
159    // early exit
160
161    fn first(self) -> Option<Self::Item> {
162        self.m.next()
163    }
164}