orx_parallel/computational_variants/
map.rs

1use super::xap::ParXap;
2use crate::ParIterResult;
3use crate::computational_variants::fallible_result::ParMapResult;
4use crate::generic_values::{Vector, WhilstAtom};
5use crate::par_iter_result::IntoResult;
6use crate::{
7    ChunkSize, IterationOrder, NumThreads, ParCollectInto, ParIter, ParIterUsing, Params,
8    computations::M,
9    runner::{DefaultRunner, ParallelRunner},
10    using::{UsingClone, UsingFun, computational_variants::UParMap},
11};
12use orx_concurrent_iter::ConcurrentIter;
13use std::marker::PhantomData;
14
15/// A parallel iterator that maps inputs.
16pub struct ParMap<I, O, M1, R = DefaultRunner>
17where
18    R: ParallelRunner,
19    I: ConcurrentIter,
20    M1: Fn(I::Item) -> O + Sync,
21{
22    m: M<I, O, M1>,
23    phantom: PhantomData<R>,
24}
25
26impl<I, O, M1, R> ParMap<I, O, M1, R>
27where
28    R: ParallelRunner,
29    I: ConcurrentIter,
30    M1: Fn(I::Item) -> O + Sync,
31{
32    pub(crate) fn new(params: Params, iter: I, m1: M1) -> Self {
33        Self {
34            m: M::new(params, iter, m1),
35            phantom: PhantomData,
36        }
37    }
38
39    pub(crate) fn destruct(self) -> (Params, I, M1) {
40        self.m.destruct()
41    }
42}
43
44unsafe impl<I, O, M1, R> Send for ParMap<I, O, M1, R>
45where
46    R: ParallelRunner,
47    I: ConcurrentIter,
48    M1: Fn(I::Item) -> O + Sync,
49{
50}
51
52unsafe impl<I, O, M1, R> Sync for ParMap<I, O, M1, R>
53where
54    R: ParallelRunner,
55    I: ConcurrentIter,
56    M1: Fn(I::Item) -> O + Sync,
57{
58}
59
60impl<I, O, M1, R> ParIter<R> for ParMap<I, O, M1, R>
61where
62    R: ParallelRunner,
63    I: ConcurrentIter,
64    M1: Fn(I::Item) -> O + Sync,
65{
66    type Item = O;
67
68    fn con_iter(&self) -> &impl ConcurrentIter {
69        self.m.iter()
70    }
71
72    fn params(&self) -> Params {
73        self.m.params()
74    }
75
76    // params transformations
77
78    fn num_threads(mut self, num_threads: impl Into<NumThreads>) -> Self {
79        self.m.num_threads(num_threads);
80        self
81    }
82
83    fn chunk_size(mut self, chunk_size: impl Into<ChunkSize>) -> Self {
84        self.m.chunk_size(chunk_size);
85        self
86    }
87
88    fn iteration_order(mut self, collect: IterationOrder) -> Self {
89        self.m.iteration_order(collect);
90        self
91    }
92
93    fn with_runner<Q: ParallelRunner>(self) -> impl ParIter<Q, Item = Self::Item> {
94        let (params, iter, map) = self.destruct();
95        ParMap::new(params, iter, map)
96    }
97
98    // using transformations
99
100    fn using<U, F>(
101        self,
102        using: F,
103    ) -> impl ParIterUsing<UsingFun<F, U>, R, Item = <Self as ParIter<R>>::Item>
104    where
105        U: Send + 'static,
106        F: FnMut(usize) -> U,
107    {
108        let using = UsingFun::new(using);
109        let (params, iter, m1) = self.destruct();
110        let m1 = move |_: &mut U, t: I::Item| m1(t);
111        UParMap::new(using, params, iter, m1)
112    }
113
114    fn using_clone<U>(
115        self,
116        using: U,
117    ) -> impl ParIterUsing<UsingClone<U>, R, Item = <Self as ParIter<R>>::Item>
118    where
119        U: Clone + Send + 'static,
120    {
121        let using = UsingClone::new(using);
122        let (params, iter, m1) = self.destruct();
123        let m1 = move |_: &mut U, t: I::Item| m1(t);
124        UParMap::new(using, params, iter, m1)
125    }
126
127    // computation transformations
128
129    fn map<Out, Map>(self, map: Map) -> impl ParIter<R, Item = Out>
130    where
131        Map: Fn(Self::Item) -> Out + Sync,
132    {
133        let (params, iter, m1) = self.destruct();
134        let m1 = move |x| map(m1(x));
135        ParMap::new(params, iter, m1)
136    }
137
138    fn filter<Filter>(self, filter: Filter) -> impl ParIter<R, Item = Self::Item>
139    where
140        Filter: Fn(&Self::Item) -> bool + Sync,
141    {
142        let (params, iter, m1) = self.destruct();
143
144        let x1 = move |i: I::Item| {
145            let value = m1(i);
146            filter(&value).then_some(value)
147        };
148        ParXap::new(params, iter, x1)
149    }
150
151    fn flat_map<IOut, FlatMap>(self, flat_map: FlatMap) -> impl ParIter<R, Item = IOut::Item>
152    where
153        IOut: IntoIterator,
154        FlatMap: Fn(Self::Item) -> IOut + Sync,
155    {
156        let (params, iter, m1) = self.destruct();
157        let x1 = move |i: I::Item| Vector(flat_map(m1(i)));
158        ParXap::new(params, iter, x1)
159    }
160
161    fn filter_map<Out, FilterMap>(self, filter_map: FilterMap) -> impl ParIter<R, Item = Out>
162    where
163        FilterMap: Fn(Self::Item) -> Option<Out> + Sync,
164    {
165        let (params, iter, m1) = self.destruct();
166        let x1 = move |i: I::Item| filter_map(m1(i));
167        ParXap::new(params, iter, x1)
168    }
169
170    fn take_while<While>(self, take_while: While) -> impl ParIter<R, Item = Self::Item>
171    where
172        While: Fn(&Self::Item) -> bool + Sync,
173    {
174        let (params, iter, m1) = self.destruct();
175        let x1 = move |value: I::Item| WhilstAtom::new(m1(value), &take_while);
176        ParXap::new(params, iter, x1)
177    }
178
179    fn into_fallible_result<Out, Err>(self) -> impl ParIterResult<R, Item = Out, Err = Err>
180    where
181        Self::Item: IntoResult<Out, Err>,
182    {
183        ParMapResult::new(self)
184    }
185
186    // collect
187
188    fn collect_into<C>(self, output: C) -> C
189    where
190        C: ParCollectInto<Self::Item>,
191    {
192        output.m_collect_into::<R, _, _>(self.m)
193    }
194
195    // reduce
196
197    fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
198    where
199        Self::Item: Send,
200        Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Sync,
201    {
202        self.m.reduce::<R, _>(reduce).1
203    }
204
205    // early exit
206
207    fn first(self) -> Option<Self::Item>
208    where
209        Self::Item: Send,
210    {
211        match self.params().iteration_order {
212            IterationOrder::Ordered => self.m.next::<R>().1,
213            IterationOrder::Arbitrary => self.m.next_any::<R>().1,
214        }
215    }
216}