orx_parallel/using/computational_variants/
u_map.rs

1use crate::{
2    ChunkSize, IterationOrder, NumThreads, ParCollectInto, Params,
3    computations::{Atom, Vector},
4    runner::{DefaultRunner, ParallelRunner},
5    using::u_par_iter::ParIterUsing,
6    using::{
7        Using,
8        computational_variants::{u_xap::UParXap, u_xap_filter_xap::UParXapFilterXap},
9        computations::{UM, u_map_self_atom},
10    },
11};
12use orx_concurrent_iter::ConcurrentIter;
13use std::marker::PhantomData;
14
15/// A parallel iterator that maps inputs.
16pub struct UParMap<U, I, O, M1, R = DefaultRunner>
17where
18    R: ParallelRunner,
19    U: Using,
20    I: ConcurrentIter,
21    O: Send + Sync,
22    M1: Fn(&mut U::Item, I::Item) -> O + Send + Sync + Clone,
23{
24    um: UM<U, I, O, M1>,
25    phantom: PhantomData<R>,
26}
27
28impl<U, I, O, M1, R> UParMap<U, I, O, M1, R>
29where
30    R: ParallelRunner,
31    U: Using,
32    I: ConcurrentIter,
33    O: Send + Sync,
34    M1: Fn(&mut U::Item, I::Item) -> O + Send + Sync + Clone,
35{
36    pub(crate) fn new(using: U, params: Params, iter: I, m1: M1) -> Self {
37        Self {
38            um: UM::new(using, params, iter, m1),
39            phantom: PhantomData,
40        }
41    }
42
43    fn destruct(self) -> (U, Params, I, M1) {
44        self.um.destruct()
45    }
46}
47
48unsafe impl<U, I, O, M1, R> Send for UParMap<U, I, O, M1, R>
49where
50    R: ParallelRunner,
51    U: Using,
52    I: ConcurrentIter,
53    O: Send + Sync,
54    M1: Fn(&mut U::Item, I::Item) -> O + Send + Sync + Clone,
55{
56}
57
58unsafe impl<U, I, O, M1, R> Sync for UParMap<U, I, O, M1, R>
59where
60    R: ParallelRunner,
61    U: Using,
62    I: ConcurrentIter,
63    O: Send + Sync,
64    M1: Fn(&mut U::Item, I::Item) -> O + Send + Sync + Clone,
65{
66}
67
68impl<U, I, O, M1, R> ParIterUsing<U, R> for UParMap<U, I, O, M1, R>
69where
70    R: ParallelRunner,
71    U: Using,
72    I: ConcurrentIter,
73    O: Send + Sync,
74    M1: Fn(&mut U::Item, I::Item) -> O + Send + Sync + Clone,
75{
76    type Item = O;
77
78    fn con_iter(&self) -> &impl ConcurrentIter {
79        self.um.iter()
80    }
81
82    fn params(&self) -> Params {
83        self.um.params()
84    }
85
86    // parameter transformations
87
88    fn num_threads(mut self, num_threads: impl Into<NumThreads>) -> Self {
89        self.um.num_threads(num_threads);
90        self
91    }
92
93    fn chunk_size(mut self, chunk_size: impl Into<ChunkSize>) -> Self {
94        self.um.chunk_size(chunk_size);
95        self
96    }
97
98    fn iteration_order(mut self, collect: IterationOrder) -> Self {
99        self.um.iteration_order(collect);
100        self
101    }
102
103    fn with_runner<Q: ParallelRunner>(self) -> impl ParIterUsing<U, Q, Item = Self::Item> {
104        let (using, params, iter, map) = self.destruct();
105        UParMap::new(using, params, iter, map)
106    }
107
108    // computation transformations
109
110    fn map<Out, Map>(self, map: Map) -> impl ParIterUsing<U, R, Item = Out>
111    where
112        Out: Send + Sync,
113        Map: Fn(&mut U::Item, Self::Item) -> Out + Send + Sync + Clone,
114    {
115        let (using, params, iter, m1) = self.destruct();
116        let m1 = move |u: &mut U::Item, x: I::Item| {
117            let v1 = m1(u, x);
118            map(u, v1)
119        };
120        UParMap::new(using, params, iter, m1)
121    }
122
123    fn filter<Filter>(self, filter: Filter) -> impl ParIterUsing<U, R, Item = Self::Item>
124    where
125        Filter: Fn(&mut U::Item, &Self::Item) -> bool + Send + Sync + Clone,
126    {
127        let (using, params, iter, m1) = self.destruct();
128        let m1 = move |u: &mut U::Item, i: I::Item| Atom(m1(u, i));
129        let filter = move |u: &mut U::Item, i: &Self::Item| filter(u, i);
130        UParXapFilterXap::new(using, params, iter, m1, filter, u_map_self_atom)
131    }
132
133    fn flat_map<IOut, FlatMap>(
134        self,
135        flat_map: FlatMap,
136    ) -> impl ParIterUsing<U, R, Item = IOut::Item>
137    where
138        IOut: IntoIterator + Send + Sync,
139        IOut::IntoIter: Send + Sync,
140        IOut::Item: Send + Sync,
141        FlatMap: Fn(&mut U::Item, Self::Item) -> IOut + Send + Sync + Clone,
142    {
143        let (using, params, iter, m1) = self.destruct();
144        let x1 = move |u: &mut U::Item, i: I::Item| {
145            let a = m1(u, i);
146            Vector(flat_map(u, a))
147        };
148        UParXap::new(using, params, iter, x1)
149    }
150
151    fn filter_map<Out, FilterMap>(
152        self,
153        filter_map: FilterMap,
154    ) -> impl ParIterUsing<U, R, Item = Out>
155    where
156        Out: Send + Sync,
157        FilterMap: Fn(&mut U::Item, Self::Item) -> Option<Out> + Send + Sync + Clone,
158    {
159        let (using, params, iter, m1) = self.destruct();
160        let x1 = move |u: &mut U::Item, i: I::Item| {
161            let a = m1(u, i);
162            filter_map(u, a)
163        };
164        UParXap::new(using, params, iter, x1)
165    }
166
167    // collect
168
169    fn collect_into<C>(self, output: C) -> C
170    where
171        C: ParCollectInto<Self::Item>,
172    {
173        output.u_m_collect_into::<R, _, _, _>(self.um)
174    }
175
176    // reduce
177
178    fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
179    where
180        Reduce: Fn(&mut U::Item, Self::Item, Self::Item) -> Self::Item + Send + Sync,
181    {
182        self.um.reduce::<R, _>(reduce).1
183    }
184
185    // early exit
186
187    fn first(self) -> Option<Self::Item> {
188        self.um.next()
189    }
190}