orx_parallel/computational_variants/
map.rs

1use super::{xap::ParXap, xap_filter_xap::ParXapFilterXap};
2use crate::{
3    ChunkSize, IterationOrder, NumThreads, ParCollectInto, ParIter, ParIterUsing, Params,
4    computations::{Atom, M, Vector, map_self_atom},
5    runner::{DefaultRunner, ParallelRunner},
6    using::computational_variants::UParMap,
7    using::{UsingClone, UsingFun},
8};
9use orx_concurrent_iter::ConcurrentIter;
10use std::marker::PhantomData;
11
12/// A parallel iterator that maps inputs.
13pub struct ParMap<I, O, M1, R = DefaultRunner>
14where
15    R: ParallelRunner,
16    I: ConcurrentIter,
17    O: Send + Sync,
18    M1: Fn(I::Item) -> O + Send + Sync + Clone,
19{
20    m: M<I, O, M1>,
21    phantom: PhantomData<R>,
22}
23
24impl<I, O, M1, R> ParMap<I, O, M1, R>
25where
26    R: ParallelRunner,
27    I: ConcurrentIter,
28    O: Send + Sync,
29    M1: Fn(I::Item) -> O + Send + Sync + Clone,
30{
31    pub(crate) fn new(params: Params, iter: I, m1: M1) -> Self {
32        Self {
33            m: M::new(params, iter, m1),
34            phantom: PhantomData,
35        }
36    }
37
38    fn destruct(self) -> (Params, I, M1) {
39        self.m.destruct()
40    }
41}
42
43unsafe impl<I, O, M1, R> Send for ParMap<I, O, M1, R>
44where
45    R: ParallelRunner,
46    I: ConcurrentIter,
47    O: Send + Sync,
48    M1: Fn(I::Item) -> O + Send + Sync + Clone,
49{
50}
51
52unsafe impl<I, O, M1, R> Sync for ParMap<I, O, M1, R>
53where
54    R: ParallelRunner,
55    I: ConcurrentIter,
56    O: Send + Sync,
57    M1: Fn(I::Item) -> O + Send + Sync + Clone,
58{
59}
60
61impl<I, O, M1, R> ParIter<R> for ParMap<I, O, M1, R>
62where
63    R: ParallelRunner,
64    I: ConcurrentIter,
65    O: Send + Sync,
66    M1: Fn(I::Item) -> O + Send + Sync + Clone,
67{
68    type Item = O;
69
70    fn con_iter(&self) -> &impl ConcurrentIter {
71        self.m.iter()
72    }
73
74    fn params(&self) -> Params {
75        self.m.params()
76    }
77
78    // params transformations
79
80    fn num_threads(mut self, num_threads: impl Into<NumThreads>) -> Self {
81        self.m.num_threads(num_threads);
82        self
83    }
84
85    fn chunk_size(mut self, chunk_size: impl Into<ChunkSize>) -> Self {
86        self.m.chunk_size(chunk_size);
87        self
88    }
89
90    fn iteration_order(mut self, collect: IterationOrder) -> Self {
91        self.m.iteration_order(collect);
92        self
93    }
94
95    fn with_runner<Q: ParallelRunner>(self) -> impl ParIter<Q, Item = Self::Item> {
96        let (params, iter, map) = self.destruct();
97        ParMap::new(params, iter, map)
98    }
99
100    // using transformations
101
102    fn using<U, F>(
103        self,
104        using: F,
105    ) -> impl ParIterUsing<UsingFun<F, U>, R, Item = <Self as ParIter<R>>::Item>
106    where
107        U: Send,
108        F: FnMut(usize) -> U,
109    {
110        let using = UsingFun::new(using);
111        let (params, iter, m1) = self.destruct();
112        let m1 = move |_: &mut U, t: I::Item| m1(t);
113        UParMap::new(using, params, iter, m1)
114    }
115
116    fn using_clone<U>(
117        self,
118        using: U,
119    ) -> impl ParIterUsing<UsingClone<U>, R, Item = <Self as ParIter<R>>::Item>
120    where
121        U: Clone + Send,
122    {
123        let using = UsingClone::new(using);
124        let (params, iter, m1) = self.destruct();
125        let m1 = move |_: &mut U, t: I::Item| m1(t);
126        UParMap::new(using, params, iter, m1)
127    }
128
129    // computation transformations
130
131    fn map<Out, Map>(self, map: Map) -> impl ParIter<R, Item = Out>
132    where
133        Out: Send + Sync,
134        Map: Fn(Self::Item) -> Out + Send + Sync + Clone,
135    {
136        let (params, iter, m1) = self.destruct();
137        let m1 = move |x| map(m1(x));
138        ParMap::new(params, iter, m1)
139    }
140
141    fn filter<Filter>(self, filter: Filter) -> impl ParIter<R, Item = Self::Item>
142    where
143        Filter: Fn(&Self::Item) -> bool + Send + Sync,
144    {
145        let (params, iter, m1) = self.destruct();
146        let m1 = move |i: I::Item| Atom(m1(i));
147        ParXapFilterXap::new(params, iter, m1, filter, map_self_atom)
148    }
149
150    fn flat_map<IOut, FlatMap>(self, flat_map: FlatMap) -> impl ParIter<R, Item = IOut::Item>
151    where
152        IOut: IntoIterator + Send + Sync,
153        IOut::IntoIter: Send + Sync,
154        IOut::Item: Send + Sync,
155        FlatMap: Fn(Self::Item) -> IOut + Send + Sync,
156    {
157        let (params, iter, m1) = self.destruct();
158        let x1 = move |i: I::Item| Vector(flat_map(m1(i)));
159        ParXap::new(params, iter, x1)
160    }
161
162    fn filter_map<Out, FilterMap>(self, filter_map: FilterMap) -> impl ParIter<R, Item = Out>
163    where
164        Out: Send + Sync,
165        FilterMap: Fn(Self::Item) -> Option<Out> + Send + Sync + Clone,
166    {
167        let (params, iter, m1) = self.destruct();
168        let x1 = move |i: I::Item| filter_map(m1(i));
169        ParXap::new(params, iter, x1)
170    }
171
172    // collect
173
174    fn collect_into<C>(self, output: C) -> C
175    where
176        C: ParCollectInto<Self::Item>,
177    {
178        output.m_collect_into::<R, _, _>(self.m)
179    }
180
181    // reduce
182
183    fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
184    where
185        Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Send + Sync,
186    {
187        self.m.reduce::<R, _>(reduce).1
188    }
189
190    // early exit
191
192    fn first(self) -> Option<Self::Item> {
193        self.m.next()
194    }
195}