orx_parallel/computational_variants/
map.rs

1use super::xap::ParXap;
2use crate::computational_variants::fallible_result::ParMapResult;
3use crate::executor::parallel_compute as prc;
4use crate::generic_values::{Vector, WhilstAtom};
5use crate::par_iter_result::IntoResult;
6use crate::runner::{DefaultRunner, ParallelRunner};
7use crate::using::{UParMap, UsingClone, UsingFun};
8use crate::{ChunkSize, IterationOrder, NumThreads, ParCollectInto, ParIter, Params};
9use crate::{ParIterResult, ParIterUsing};
10use orx_concurrent_iter::ConcurrentIter;
11
12/// A parallel iterator that maps inputs.
13pub struct ParMap<I, O, M1, R = DefaultRunner>
14where
15    R: ParallelRunner,
16    I: ConcurrentIter,
17    M1: Fn(I::Item) -> O + Sync,
18{
19    orchestrator: R,
20    params: Params,
21    iter: I,
22    map1: M1,
23}
24
25impl<I, O, M1, R> ParMap<I, O, M1, R>
26where
27    R: ParallelRunner,
28    I: ConcurrentIter,
29    M1: Fn(I::Item) -> O + Sync,
30{
31    pub(crate) fn new(orchestrator: R, params: Params, iter: I, map1: M1) -> Self {
32        Self {
33            orchestrator,
34            params,
35            iter,
36            map1,
37        }
38    }
39
40    pub(crate) fn destruct(self) -> (R, Params, I, M1) {
41        (self.orchestrator, self.params, self.iter, self.map1)
42    }
43}
44
45unsafe impl<I, O, M1, R> Send for ParMap<I, O, M1, R>
46where
47    R: ParallelRunner,
48    I: ConcurrentIter,
49    M1: Fn(I::Item) -> O + Sync,
50{
51}
52
53unsafe impl<I, O, M1, R> Sync for ParMap<I, O, M1, R>
54where
55    R: ParallelRunner,
56    I: ConcurrentIter,
57    M1: Fn(I::Item) -> O + Sync,
58{
59}
60
61impl<I, O, M1, R> ParIter<R> for ParMap<I, O, M1, R>
62where
63    R: ParallelRunner,
64    I: ConcurrentIter,
65    M1: Fn(I::Item) -> O + Sync,
66{
67    type Item = O;
68
69    fn con_iter(&self) -> &impl ConcurrentIter {
70        &self.iter
71    }
72
73    fn params(&self) -> Params {
74        self.params
75    }
76
77    // params transformations
78
79    fn num_threads(mut self, num_threads: impl Into<NumThreads>) -> Self {
80        self.params = self.params.with_num_threads(num_threads);
81        self
82    }
83
84    fn chunk_size(mut self, chunk_size: impl Into<ChunkSize>) -> Self {
85        self.params = self.params.with_chunk_size(chunk_size);
86        self
87    }
88
89    fn iteration_order(mut self, collect: IterationOrder) -> Self {
90        self.params = self.params.with_collect_ordering(collect);
91        self
92    }
93
94    fn with_runner<Q: ParallelRunner>(self, orchestrator: Q) -> impl ParIter<Q, Item = Self::Item> {
95        let (_, params, iter, map) = self.destruct();
96        ParMap::new(orchestrator, params, iter, map)
97    }
98
99    // using transformations
100
101    fn using<U, F>(
102        self,
103        using: F,
104    ) -> impl ParIterUsing<UsingFun<F, U>, R, Item = <Self as ParIter<R>>::Item>
105    where
106        U: 'static,
107        F: Fn(usize) -> U + Sync,
108    {
109        let using = UsingFun::new(using);
110        let (orchestrator, params, iter, x1) = self.destruct();
111        let m1 = move |_: &mut U, t: I::Item| x1(t);
112        UParMap::new(using, orchestrator, params, iter, m1)
113    }
114
115    fn using_clone<U>(
116        self,
117        value: U,
118    ) -> impl ParIterUsing<UsingClone<U>, R, Item = <Self as ParIter<R>>::Item>
119    where
120        U: Clone + 'static,
121    {
122        let using = UsingClone::new(value);
123        let (orchestrator, params, iter, x1) = self.destruct();
124        let m1 = move |_: &mut U, t: I::Item| x1(t);
125        UParMap::new(using, orchestrator, params, iter, m1)
126    }
127
128    // computation transformations
129
130    fn map<Out, Map>(self, map: Map) -> impl ParIter<R, Item = Out>
131    where
132        Map: Fn(Self::Item) -> Out + Sync,
133    {
134        let (orchestrator, params, iter, m1) = self.destruct();
135        let m1 = move |x| map(m1(x));
136        ParMap::new(orchestrator, params, iter, m1)
137    }
138
139    fn filter<Filter>(self, filter: Filter) -> impl ParIter<R, Item = Self::Item>
140    where
141        Filter: Fn(&Self::Item) -> bool + Sync,
142    {
143        let (orchestrator, params, iter, m1) = self.destruct();
144
145        let x1 = move |i: I::Item| {
146            let value = m1(i);
147            filter(&value).then_some(value)
148        };
149        ParXap::new(orchestrator, params, iter, x1)
150    }
151
152    fn flat_map<IOut, FlatMap>(self, flat_map: FlatMap) -> impl ParIter<R, Item = IOut::Item>
153    where
154        IOut: IntoIterator,
155        FlatMap: Fn(Self::Item) -> IOut + Sync,
156    {
157        let (orchestrator, params, iter, m1) = self.destruct();
158        let x1 = move |i: I::Item| Vector(flat_map(m1(i)));
159        ParXap::new(orchestrator, params, iter, x1)
160    }
161
162    fn filter_map<Out, FilterMap>(self, filter_map: FilterMap) -> impl ParIter<R, Item = Out>
163    where
164        FilterMap: Fn(Self::Item) -> Option<Out> + Sync,
165    {
166        let (orchestrator, params, iter, m1) = self.destruct();
167        let x1 = move |i: I::Item| filter_map(m1(i));
168        ParXap::new(orchestrator, params, iter, x1)
169    }
170
171    fn take_while<While>(self, take_while: While) -> impl ParIter<R, Item = Self::Item>
172    where
173        While: Fn(&Self::Item) -> bool + Sync,
174    {
175        let (orchestrator, params, iter, m1) = self.destruct();
176        let x1 = move |value: I::Item| WhilstAtom::new(m1(value), &take_while);
177        ParXap::new(orchestrator, params, iter, x1)
178    }
179
180    fn into_fallible_result<Out, Err>(self) -> impl ParIterResult<R, Item = Out, Err = Err>
181    where
182        Self::Item: IntoResult<Out, Err>,
183    {
184        ParMapResult::new(self)
185    }
186
187    // collect
188
189    fn collect_into<C>(self, output: C) -> C
190    where
191        C: ParCollectInto<Self::Item>,
192    {
193        let (orchestrator, params, iter, m1) = self.destruct();
194        output.m_collect_into(orchestrator, params, iter, m1)
195    }
196
197    // reduce
198
199    fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
200    where
201        Self::Item: Send,
202        Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Sync,
203    {
204        let (orchestrator, params, iter, m1) = self.destruct();
205        prc::reduce::m(orchestrator, params, iter, m1, reduce).1
206    }
207
208    // early exit
209
210    fn first(self) -> Option<Self::Item>
211    where
212        Self::Item: Send,
213    {
214        let (orchestrator, params, iter, m1) = self.destruct();
215        match params.iteration_order {
216            IterationOrder::Ordered => prc::next::m(orchestrator, params, iter, m1).1,
217            IterationOrder::Arbitrary => prc::next_any::m(orchestrator, params, iter, m1).1,
218        }
219    }
220}