orx_parallel/using/computational_variants/
u_map.rs

1use crate::{
2    ChunkSize, IterationOrder, NumThreads, ParCollectInto, Params,
3    generic_values::Vector,
4    runner::{DefaultRunner, ParallelRunner},
5    using::u_par_iter::ParIterUsing,
6    using::{Using, computational_variants::u_xap::UParXap, computations::UM},
7};
8use orx_concurrent_iter::ConcurrentIter;
9use std::marker::PhantomData;
10
11/// A parallel iterator that maps inputs.
12pub struct UParMap<U, I, O, M1, R = DefaultRunner>
13where
14    R: ParallelRunner,
15    U: Using,
16    I: ConcurrentIter,
17    M1: Fn(&mut U::Item, I::Item) -> O + Sync,
18{
19    um: UM<U, I, O, M1>,
20    phantom: PhantomData<R>,
21}
22
23impl<U, I, O, M1, R> UParMap<U, I, O, M1, R>
24where
25    R: ParallelRunner,
26    U: Using,
27    I: ConcurrentIter,
28    M1: Fn(&mut U::Item, I::Item) -> O + Sync,
29{
30    pub(crate) fn new(using: U, params: Params, iter: I, m1: M1) -> Self {
31        Self {
32            um: UM::new(using, params, iter, m1),
33            phantom: PhantomData,
34        }
35    }
36
37    fn destruct(self) -> (U, Params, I, M1) {
38        self.um.destruct()
39    }
40}
41
42unsafe impl<U, I, O, M1, R> Send for UParMap<U, I, O, M1, R>
43where
44    R: ParallelRunner,
45    U: Using,
46    I: ConcurrentIter,
47    M1: Fn(&mut U::Item, I::Item) -> O + Sync,
48{
49}
50
51unsafe impl<U, I, O, M1, R> Sync for UParMap<U, I, O, M1, R>
52where
53    R: ParallelRunner,
54    U: Using,
55    I: ConcurrentIter,
56    M1: Fn(&mut U::Item, I::Item) -> O + Sync,
57{
58}
59
60impl<U, I, O, M1, R> ParIterUsing<U, R> for UParMap<U, I, O, M1, R>
61where
62    R: ParallelRunner,
63    U: Using,
64    I: ConcurrentIter,
65    M1: Fn(&mut U::Item, I::Item) -> O + Sync,
66{
67    type Item = O;
68
69    fn con_iter(&self) -> &impl ConcurrentIter {
70        self.um.iter()
71    }
72
73    fn params(&self) -> Params {
74        self.um.params()
75    }
76
77    // parameter transformations
78
79    fn num_threads(mut self, num_threads: impl Into<NumThreads>) -> Self {
80        self.um.num_threads(num_threads);
81        self
82    }
83
84    fn chunk_size(mut self, chunk_size: impl Into<ChunkSize>) -> Self {
85        self.um.chunk_size(chunk_size);
86        self
87    }
88
89    fn iteration_order(mut self, collect: IterationOrder) -> Self {
90        self.um.iteration_order(collect);
91        self
92    }
93
94    fn with_runner<Q: ParallelRunner>(self) -> impl ParIterUsing<U, Q, Item = Self::Item> {
95        let (using, params, iter, map) = self.destruct();
96        UParMap::new(using, params, iter, map)
97    }
98
99    // computation transformations
100
101    fn map<Out, Map>(self, map: Map) -> impl ParIterUsing<U, R, Item = Out>
102    where
103        Map: Fn(&mut U::Item, Self::Item) -> Out + Sync + Clone,
104    {
105        let (using, params, iter, m1) = self.destruct();
106        let m1 = move |u: &mut U::Item, x: I::Item| {
107            let v1 = m1(u, x);
108            map(u, v1)
109        };
110        UParMap::new(using, params, iter, m1)
111    }
112
113    fn filter<Filter>(self, filter: Filter) -> impl ParIterUsing<U, R, Item = Self::Item>
114    where
115        Filter: Fn(&mut U::Item, &Self::Item) -> bool + Sync + Clone,
116    {
117        let (using, params, iter, m1) = self.destruct();
118
119        let x1 = move |u: &mut U::Item, i: I::Item| {
120            let value = m1(u, i);
121            filter(u, &value).then_some(value)
122        };
123
124        UParXap::new(using, params, iter, x1)
125    }
126
127    fn flat_map<IOut, FlatMap>(
128        self,
129        flat_map: FlatMap,
130    ) -> impl ParIterUsing<U, R, Item = IOut::Item>
131    where
132        IOut: IntoIterator,
133        FlatMap: Fn(&mut U::Item, Self::Item) -> IOut + Sync + Clone,
134    {
135        let (using, params, iter, m1) = self.destruct();
136        let x1 = move |u: &mut U::Item, i: I::Item| {
137            let a = m1(u, i);
138            Vector(flat_map(u, a))
139        };
140        UParXap::new(using, params, iter, x1)
141    }
142
143    fn filter_map<Out, FilterMap>(
144        self,
145        filter_map: FilterMap,
146    ) -> impl ParIterUsing<U, R, Item = Out>
147    where
148        FilterMap: Fn(&mut U::Item, Self::Item) -> Option<Out> + Sync + Clone,
149    {
150        let (using, params, iter, m1) = self.destruct();
151        let x1 = move |u: &mut U::Item, i: I::Item| {
152            let a = m1(u, i);
153            filter_map(u, a)
154        };
155        UParXap::new(using, params, iter, x1)
156    }
157
158    // collect
159
160    fn collect_into<C>(self, output: C) -> C
161    where
162        C: ParCollectInto<Self::Item>,
163    {
164        output.u_m_collect_into::<R, _, _, _>(self.um)
165    }
166
167    // reduce
168
169    fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
170    where
171        Self::Item: Send,
172        Reduce: Fn(&mut U::Item, Self::Item, Self::Item) -> Self::Item + Sync,
173    {
174        self.um.reduce::<R, _>(reduce).1
175    }
176
177    // early exit
178
179    fn first(self) -> Option<Self::Item>
180    where
181        Self::Item: Send,
182    {
183        self.um.next::<R>().1
184    }
185}