orx_parallel/using/computational_variants/
u_par.rs

1use crate::{
2    ChunkSize, IterationOrder, NumThreads, ParCollectInto, Params,
3    computations::Vector,
4    runner::{DefaultRunner, ParallelRunner},
5    using::u_par_iter::ParIterUsing,
6    using::{
7        Using,
8        computational_variants::{
9            u_map::UParMap, u_xap::UParXap, u_xap_filter_xap::UParXapFilterXap,
10        },
11        computations::{UM, u_map_self, u_map_self_atom},
12    },
13};
14use orx_concurrent_iter::ConcurrentIter;
15use std::marker::PhantomData;
16
17/// A parallel iterator.
18pub struct UPar<U, I, R = DefaultRunner>
19where
20    U: Using,
21    R: ParallelRunner,
22    I: ConcurrentIter,
23{
24    using: U,
25    iter: I,
26    params: Params,
27    phantom: PhantomData<R>,
28}
29
30impl<U, I, R> UPar<U, I, R>
31where
32    U: Using,
33    R: ParallelRunner,
34    I: ConcurrentIter,
35{
36    pub(crate) fn new(using: U, params: Params, iter: I) -> Self {
37        Self {
38            using,
39            iter,
40            params,
41            phantom: PhantomData,
42        }
43    }
44
45    fn destruct(self) -> (U, Params, I) {
46        (self.using, self.params, self.iter)
47    }
48
49    #[allow(clippy::type_complexity)]
50    fn u_m(self) -> UM<U, I, I::Item, impl Fn(&mut U::Item, I::Item) -> I::Item> {
51        let (using, params, iter) = self.destruct();
52        UM::new(using, params, iter, u_map_self)
53    }
54}
55
56unsafe impl<U, I, R> Send for UPar<U, I, R>
57where
58    U: Using,
59    R: ParallelRunner,
60    I: ConcurrentIter,
61{
62}
63
64unsafe impl<U, I, R> Sync for UPar<U, I, R>
65where
66    U: Using,
67    R: ParallelRunner,
68    I: ConcurrentIter,
69{
70}
71
72impl<U, I, R> ParIterUsing<U, R> for UPar<U, I, R>
73where
74    U: Using,
75    R: ParallelRunner,
76    I: ConcurrentIter,
77{
78    type Item = I::Item;
79
80    fn con_iter(&self) -> &impl ConcurrentIter {
81        &self.iter
82    }
83
84    fn params(&self) -> Params {
85        self.params
86    }
87
88    // params transformations
89
90    fn num_threads(mut self, num_threads: impl Into<NumThreads>) -> Self {
91        self.params = self.params.with_num_threads(num_threads);
92        self
93    }
94
95    fn chunk_size(mut self, chunk_size: impl Into<ChunkSize>) -> Self {
96        self.params = self.params.with_chunk_size(chunk_size);
97        self
98    }
99
100    fn iteration_order(mut self, collect: IterationOrder) -> Self {
101        self.params = self.params.with_collect_ordering(collect);
102        self
103    }
104
105    fn with_runner<Q: ParallelRunner>(self) -> impl ParIterUsing<U, Q, Item = Self::Item> {
106        UPar::new(self.using, self.params, self.iter)
107    }
108
109    // computational transformations
110
111    fn map<Out, Map>(self, map: Map) -> impl ParIterUsing<U, R, Item = Out>
112    where
113        Out: Send + Sync,
114        Map: Fn(&mut <U as Using>::Item, Self::Item) -> Out + Send + Sync + Clone,
115    {
116        let (using, params, iter) = self.destruct();
117        let map = move |u: &mut U::Item, x: Self::Item| map(u, x);
118        UParMap::new(using, params, iter, map)
119    }
120
121    fn filter<Filter>(self, filter: Filter) -> impl ParIterUsing<U, R, Item = Self::Item>
122    where
123        Filter: Fn(&mut U::Item, &Self::Item) -> bool + Send + Sync + Clone,
124    {
125        let (using, params, iter) = self.destruct();
126        let filter = move |u: &mut U::Item, x: &Self::Item| filter(u, x);
127        UParXapFilterXap::new(
128            using,
129            params,
130            iter,
131            u_map_self_atom,
132            filter,
133            u_map_self_atom,
134        )
135    }
136
137    fn flat_map<IOut, FlatMap>(
138        self,
139        flat_map: FlatMap,
140    ) -> impl ParIterUsing<U, R, Item = IOut::Item>
141    where
142        IOut: IntoIterator + Send + Sync,
143        IOut::IntoIter: Send + Sync,
144        IOut::Item: Send + Sync,
145        FlatMap: Fn(&mut U::Item, Self::Item) -> IOut + Send + Sync + Clone,
146    {
147        let (using, params, iter) = self.destruct();
148        let x1 = move |u: &mut U::Item, i: Self::Item| Vector(flat_map(u, i));
149        UParXap::new(using, params, iter, x1)
150    }
151
152    fn filter_map<Out, FilterMap>(
153        self,
154        filter_map: FilterMap,
155    ) -> impl ParIterUsing<U, R, Item = Out>
156    where
157        Out: Send + Sync,
158        FilterMap: Fn(&mut <U as Using>::Item, Self::Item) -> Option<Out> + Send + Sync + Clone,
159    {
160        let (using, params, iter) = self.destruct();
161        let x1 = move |u: &mut U::Item, x: Self::Item| filter_map(u, x);
162        UParXap::new(using, params, iter, x1)
163    }
164
165    // collect
166
167    fn collect_into<C>(self, output: C) -> C
168    where
169        C: ParCollectInto<Self::Item>,
170    {
171        output.u_m_collect_into::<R, _, _, _>(self.u_m())
172    }
173
174    // reduce
175
176    fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
177    where
178        Reduce: Fn(&mut U::Item, Self::Item, Self::Item) -> Self::Item + Send + Sync,
179    {
180        self.u_m().reduce::<R, _>(reduce).1
181    }
182
183    // early exit
184
185    fn first(self) -> Option<Self::Item> {
186        self.u_m().next()
187    }
188}