orx_parallel/using/computational_variants/
u_xap.rs

1use crate::{
2    ChunkSize, IterationOrder, NumThreads, ParCollectInto, Params,
3    generic_values::{TransformableValues, runner_results::Infallible},
4    runner::{DefaultRunner, ParallelRunner},
5    using::{Using, computations::UX, u_par_iter::ParIterUsing},
6};
7use orx_concurrent_iter::ConcurrentIter;
8use std::marker::PhantomData;
9
/// A parallel iterator that xaps inputs.
///
/// *xap* is a generalization of one-to-one map, filter-map and flat-map operations.
///
/// Type parameters, as constrained by the `where` clause below:
///
/// * `U`: the [`Using`] implementation; a `&mut U::Item` is handed to `M1` as its first argument.
/// * `I`: the [`ConcurrentIter`] that yields the inputs (`I::Item`).
/// * `Vo`: the infallible [`TransformableValues`] produced by `M1` for each input.
/// * `M1`: the xap function, `Fn(&mut U::Item, I::Item) -> Vo`, required to be `Sync`.
/// * `R`: the [`ParallelRunner`]; it appears only as a marker and defaults to [`DefaultRunner`].
pub struct UParXap<U, I, Vo, M1, R = DefaultRunner>
where
    R: ParallelRunner,
    U: Using,
    I: ConcurrentIter,
    Vo: TransformableValues<Fallibility = Infallible>,
    M1: Fn(&mut U::Item, I::Item) -> Vo + Sync,
{
    // The underlying computation; it is built from the using value, the parameters,
    // the input iterator and the xap function.
    ux: UX<U, I, Vo, M1>,
    // Marker that carries the runner type `R` without storing a value of it.
    phantom: PhantomData<R>,
}
24
25impl<U, I, Vo, M1, R> UParXap<U, I, Vo, M1, R>
26where
27    R: ParallelRunner,
28    U: Using,
29    I: ConcurrentIter,
30    Vo: TransformableValues<Fallibility = Infallible>,
31    M1: Fn(&mut U::Item, I::Item) -> Vo + Sync,
32{
33    pub(crate) fn new(using: U, params: Params, iter: I, x1: M1) -> Self {
34        Self {
35            ux: UX::new(using, params, iter, x1),
36            phantom: PhantomData,
37        }
38    }
39
40    fn destruct(self) -> (U, Params, I, M1) {
41        self.ux.destruct()
42    }
43}
44
// Manually marks `UParXap` as `Send` for every combination of type parameters accepted by
// the `where` clause.
//
// NOTE(review): the `where` clause only repeats the bounds of the struct; it adds no `Send`
// requirement on `U`, `I` or `M1` (`M1` is only required to be `Sync`). The soundness of this
// impl therefore rests on guarantees that are not visible in this block — TODO confirm them
// and record them here as a `SAFETY` justification.
unsafe impl<U, I, Vo, M1, R> Send for UParXap<U, I, Vo, M1, R>
where
    R: ParallelRunner,
    U: Using,
    I: ConcurrentIter,
    Vo: TransformableValues<Fallibility = Infallible>,
    M1: Fn(&mut U::Item, I::Item) -> Vo + Sync,
{
}
54
// Manually marks `UParXap` as `Sync` for every combination of type parameters accepted by
// the `where` clause.
//
// NOTE(review): the `where` clause only repeats the bounds of the struct; the only thread-safety
// bound it states is `M1: Sync`, and nothing is required of `U` or `R` here. The soundness of
// this impl therefore rests on guarantees that are not visible in this block — TODO confirm
// them and record them here as a `SAFETY` justification.
unsafe impl<U, I, Vo, M1, R> Sync for UParXap<U, I, Vo, M1, R>
where
    R: ParallelRunner,
    U: Using,
    I: ConcurrentIter,
    Vo: TransformableValues<Fallibility = Infallible>,
    M1: Fn(&mut U::Item, I::Item) -> Vo + Sync,
{
}
64
65impl<U, I, Vo, M1, R> ParIterUsing<U, R> for UParXap<U, I, Vo, M1, R>
66where
67    R: ParallelRunner,
68    U: Using,
69    I: ConcurrentIter,
70    Vo: TransformableValues<Fallibility = Infallible>,
71    M1: Fn(&mut U::Item, I::Item) -> Vo + Sync,
72{
73    type Item = Vo::Item;
74
75    fn con_iter(&self) -> &impl ConcurrentIter {
76        self.ux.iter()
77    }
78
79    fn params(&self) -> Params {
80        self.ux.params()
81    }
82
83    // params transformations
84
85    fn num_threads(mut self, num_threads: impl Into<NumThreads>) -> Self {
86        self.ux.num_threads(num_threads);
87        self
88    }
89
90    fn chunk_size(mut self, chunk_size: impl Into<ChunkSize>) -> Self {
91        self.ux.chunk_size(chunk_size);
92        self
93    }
94
95    fn iteration_order(mut self, collect: IterationOrder) -> Self {
96        self.ux.iteration_order(collect);
97        self
98    }
99
100    fn with_runner<Q: ParallelRunner>(self) -> impl ParIterUsing<U, Q, Item = Self::Item> {
101        let (using, params, iter, map1) = self.destruct();
102        UParXap::new(using, params, iter, map1)
103    }
104
105    // computation transformations
106
107    fn map<Out, Map>(self, map: Map) -> impl ParIterUsing<U, R, Item = Out>
108    where
109        Map: Fn(&mut U::Item, Self::Item) -> Out + Sync + Clone,
110    {
111        let (using, params, iter, x1) = self.destruct();
112
113        let x1 = move |u: &mut U::Item, i: I::Item| {
114            let vo = x1(u, i);
115            // SAFETY: all threads are guaranteed to have its own Using::Item value that is not shared with other threads.
116            // This guarantees that there will be no race conditions.
117            // TODO: the reason to have this unsafe block is the complication in lifetimes, which must be possible to fix; however with a large refactoring.
118            let u = unsafe {
119                &mut *{
120                    let p: *mut U::Item = u;
121                    p
122                }
123            };
124            vo.u_map(u, map.clone())
125        };
126
127        UParXap::new(using, params, iter, x1)
128    }
129
130    fn filter<Filter>(self, filter: Filter) -> impl ParIterUsing<U, R, Item = Self::Item>
131    where
132        Filter: Fn(&mut U::Item, &Self::Item) -> bool + Sync + Clone,
133    {
134        let (using, params, iter, x1) = self.destruct();
135        let x1 = move |u: &mut U::Item, i: I::Item| {
136            let vo = x1(u, i);
137            // SAFETY: all threads are guaranteed to have its own Using::Item value that is not shared with other threads.
138            // This guarantees that there will be no race conditions.
139            // TODO: the reason to have this unsafe block is the complication in lifetimes, which must be possible to fix; however with a large refactoring.
140            let u = unsafe {
141                &mut *{
142                    let p: *mut U::Item = u;
143                    p
144                }
145            };
146            vo.u_filter(u, filter.clone())
147        };
148        UParXap::new(using, params, iter, x1)
149    }
150
151    fn flat_map<IOut, FlatMap>(
152        self,
153        flat_map: FlatMap,
154    ) -> impl ParIterUsing<U, R, Item = IOut::Item>
155    where
156        IOut: IntoIterator,
157        FlatMap: Fn(&mut U::Item, Self::Item) -> IOut + Sync + Clone,
158    {
159        let (using, params, iter, x1) = self.destruct();
160        let x1 = move |u: &mut U::Item, i: I::Item| {
161            let vo = x1(u, i);
162            // SAFETY: all threads are guaranteed to have its own Using::Item value that is not shared with other threads.
163            // This guarantees that there will be no race conditions.
164            // TODO: the reason to have this unsafe block is the complication in lifetimes, which must be possible to fix; however with a large refactoring.
165            let u = unsafe {
166                &mut *{
167                    let p: *mut U::Item = u;
168                    p
169                }
170            };
171            vo.u_flat_map(u, flat_map.clone())
172        };
173        UParXap::new(using, params, iter, x1)
174    }
175
176    fn filter_map<Out, FilterMap>(
177        self,
178        filter_map: FilterMap,
179    ) -> impl ParIterUsing<U, R, Item = Out>
180    where
181        FilterMap: Fn(&mut U::Item, Self::Item) -> Option<Out> + Sync + Clone,
182    {
183        let (using, params, iter, x1) = self.destruct();
184        let x1 = move |u: &mut U::Item, i: I::Item| {
185            let vo = x1(u, i);
186            // SAFETY: all threads are guaranteed to have its own Using::Item value that is not shared with other threads.
187            // This guarantees that there will be no race conditions.
188            // TODO: the reason to have this unsafe block is the complication in lifetimes, which must be possible to fix; however with a large refactoring.
189            let u = unsafe {
190                &mut *{
191                    let p: *mut U::Item = u;
192                    p
193                }
194            };
195            vo.u_filter_map(u, filter_map.clone())
196        };
197        UParXap::new(using, params, iter, x1)
198    }
199
200    // collect
201
202    fn collect_into<C>(self, output: C) -> C
203    where
204        C: ParCollectInto<Self::Item>,
205    {
206        output.u_x_collect_into::<R, _, _, _, _>(self.ux)
207    }
208
209    // reduce
210
211    fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
212    where
213        Self::Item: Send,
214        Reduce: Fn(&mut U::Item, Self::Item, Self::Item) -> Self::Item + Sync,
215    {
216        self.ux.reduce::<R, _>(reduce).1
217    }
218
219    // early exit
220
221    fn first(self) -> Option<Self::Item>
222    where
223        Self::Item: Send,
224    {
225        self.ux.next::<R>().1
226    }
227}