orx_parallel/using/computational_variants/
u_xap.rs

1use crate::{
2    ChunkSize, IterationOrder, NumThreads, ParCollectInto, Params,
3    computations::{Values, Vector},
4    runner::{DefaultRunner, ParallelRunner},
5    using::u_par_iter::ParIterUsing,
6    using::{
7        Using,
8        computational_variants::u_xap_filter_xap::UParXapFilterXap,
9        computations::{UX, u_map_self_atom},
10    },
11};
12use orx_concurrent_iter::ConcurrentIter;
13use std::marker::PhantomData;
14
15/// A parallel iterator that xaps inputs.
16///
17/// *xap* is a generalization of  one-to-one map, filter-map and flat-map operations.
18pub struct UParXap<U, I, Vo, M1, R = DefaultRunner>
19where
20    R: ParallelRunner,
21    U: Using,
22    I: ConcurrentIter,
23    Vo: Values + Send + Sync,
24    Vo::Item: Send + Sync,
25    M1: Fn(&mut U::Item, I::Item) -> Vo + Send + Sync,
26{
27    ux: UX<U, I, Vo, M1>,
28    phantom: PhantomData<R>,
29}
30
31impl<U, I, Vo, M1, R> UParXap<U, I, Vo, M1, R>
32where
33    R: ParallelRunner,
34    U: Using,
35    I: ConcurrentIter,
36    Vo: Values + Send + Sync,
37    Vo::Item: Send + Sync,
38    M1: Fn(&mut U::Item, I::Item) -> Vo + Send + Sync,
39{
40    pub(crate) fn new(using: U, params: Params, iter: I, x1: M1) -> Self {
41        Self {
42            ux: UX::new(using, params, iter, x1),
43            phantom: PhantomData,
44        }
45    }
46
47    fn destruct(self) -> (U, Params, I, M1) {
48        self.ux.destruct()
49    }
50}
51
52unsafe impl<U, I, Vo, M1, R> Send for UParXap<U, I, Vo, M1, R>
53where
54    R: ParallelRunner,
55    U: Using,
56    I: ConcurrentIter,
57    Vo: Values + Send + Sync,
58    Vo::Item: Send + Sync,
59    M1: Fn(&mut U::Item, I::Item) -> Vo + Send + Sync,
60{
61}
62
63unsafe impl<U, I, Vo, M1, R> Sync for UParXap<U, I, Vo, M1, R>
64where
65    R: ParallelRunner,
66    U: Using,
67    I: ConcurrentIter,
68    Vo: Values + Send + Sync,
69    Vo::Item: Send + Sync,
70    M1: Fn(&mut U::Item, I::Item) -> Vo + Send + Sync,
71{
72}
73
74impl<U, I, Vo, M1, R> ParIterUsing<U, R> for UParXap<U, I, Vo, M1, R>
75where
76    R: ParallelRunner,
77    U: Using,
78    I: ConcurrentIter,
79    Vo: Values + Send + Sync,
80    Vo::Item: Send + Sync,
81    M1: Fn(&mut U::Item, I::Item) -> Vo + Send + Sync,
82{
83    type Item = Vo::Item;
84
85    fn con_iter(&self) -> &impl ConcurrentIter {
86        self.ux.iter()
87    }
88
89    fn params(&self) -> Params {
90        self.ux.params()
91    }
92
93    // params transformations
94
95    fn num_threads(mut self, num_threads: impl Into<NumThreads>) -> Self {
96        self.ux.num_threads(num_threads);
97        self
98    }
99
100    fn chunk_size(mut self, chunk_size: impl Into<ChunkSize>) -> Self {
101        self.ux.chunk_size(chunk_size);
102        self
103    }
104
105    fn iteration_order(mut self, collect: IterationOrder) -> Self {
106        self.ux.iteration_order(collect);
107        self
108    }
109
110    fn with_runner<Q: ParallelRunner>(self) -> impl ParIterUsing<U, Q, Item = Self::Item> {
111        let (using, params, iter, map1) = self.destruct();
112        UParXap::new(using, params, iter, map1)
113    }
114
115    // computation transformations
116
117    fn map<Out, Map>(self, map: Map) -> impl ParIterUsing<U, R, Item = Out>
118    where
119        Out: Send + Sync,
120        Map: Fn(&mut U::Item, Self::Item) -> Out + Send + Sync + Clone,
121    {
122        let (using, params, iter, x1) = self.destruct();
123        let x1 = move |u: &mut U::Item, i: I::Item| {
124            // TODO: avoid allocation
125            let vo: Vec<_> = x1(u, i).values().into_iter().map(|x| map(u, x)).collect();
126            Vector(vo)
127        };
128
129        UParXap::new(using, params, iter, x1)
130    }
131
132    fn filter<Filter>(self, filter: Filter) -> impl ParIterUsing<U, R, Item = Self::Item>
133    where
134        Filter: Fn(&mut U::Item, &Self::Item) -> bool + Send + Sync + Clone,
135    {
136        let (using, params, iter, x1) = self.destruct();
137        let filter = move |u: &mut U::Item, x: &Self::Item| filter(u, x);
138        UParXapFilterXap::new(using, params, iter, x1, filter, u_map_self_atom)
139    }
140
141    fn flat_map<IOut, FlatMap>(
142        self,
143        flat_map: FlatMap,
144    ) -> impl ParIterUsing<U, R, Item = IOut::Item>
145    where
146        IOut: IntoIterator + Send + Sync,
147        IOut::IntoIter: Send + Sync,
148        IOut::Item: Send + Sync,
149        FlatMap: Fn(&mut U::Item, Self::Item) -> IOut + Send + Sync + Clone,
150    {
151        let (using, params, iter, x1) = self.destruct();
152        let x1 = move |u: &mut U::Item, t: I::Item| {
153            // TODO: avoid allocation
154            let vo: Vec<_> = x1(u, t).values().into_iter().collect();
155            let vo: Vec<_> = vo.into_iter().flat_map(|x| flat_map(u, x)).collect();
156            Vector(vo)
157        };
158        UParXap::new(using, params, iter, x1)
159    }
160
161    fn filter_map<Out, FilterMap>(
162        self,
163        filter_map: FilterMap,
164    ) -> impl ParIterUsing<U, R, Item = Out>
165    where
166        Out: Send + Sync,
167        FilterMap: Fn(&mut U::Item, Self::Item) -> Option<Out> + Send + Sync + Clone,
168    {
169        let (using, params, iter, x1) = self.destruct();
170        let x1 = move |u: &mut U::Item, t: I::Item| {
171            // TODO: avoid allocation
172            let vo: Vec<_> = x1(u, t).values().into_iter().collect();
173            let vo: Vec<_> = vo.into_iter().filter_map(|x| filter_map(u, x)).collect();
174            Vector(vo)
175        };
176        UParXap::new(using, params, iter, x1)
177    }
178
179    // collect
180
181    fn collect_into<C>(self, output: C) -> C
182    where
183        C: ParCollectInto<Self::Item>,
184    {
185        output.u_x_collect_into::<R, _, _, _, _>(self.ux)
186    }
187
188    // reduce
189
190    fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
191    where
192        Reduce: Fn(&mut U::Item, Self::Item, Self::Item) -> Self::Item + Send + Sync,
193    {
194        self.ux.reduce::<R, _>(reduce).1
195    }
196
197    // early exit
198
199    fn first(self) -> Option<Self::Item> {
200        self.ux.next()
201    }
202}