orx_parallel/using/computational_variants/
u_par.rs

1use crate::{
2    ChunkSize, IterationOrder, NumThreads, ParCollectInto, Params,
3    generic_values::Vector,
4    runner::{DefaultRunner, ParallelRunner},
5    using::u_par_iter::ParIterUsing,
6    using::{
7        Using,
8        computational_variants::{u_map::UParMap, u_xap::UParXap},
9        computations::{UM, u_map_self},
10    },
11};
12use orx_concurrent_iter::ConcurrentIter;
13use std::marker::PhantomData;
14
15/// A parallel iterator.
16pub struct UPar<U, I, R = DefaultRunner>
17where
18    U: Using,
19    R: ParallelRunner,
20    I: ConcurrentIter,
21{
22    using: U,
23    iter: I,
24    params: Params,
25    phantom: PhantomData<R>,
26}
27
28impl<U, I, R> UPar<U, I, R>
29where
30    U: Using,
31    R: ParallelRunner,
32    I: ConcurrentIter,
33{
34    pub(crate) fn new(using: U, params: Params, iter: I) -> Self {
35        Self {
36            using,
37            iter,
38            params,
39            phantom: PhantomData,
40        }
41    }
42
43    fn destruct(self) -> (U, Params, I) {
44        (self.using, self.params, self.iter)
45    }
46
47    #[allow(clippy::type_complexity)]
48    fn u_m(self) -> UM<U, I, I::Item, impl Fn(&mut U::Item, I::Item) -> I::Item> {
49        let (using, params, iter) = self.destruct();
50        UM::new(using, params, iter, u_map_self)
51    }
52}
53
54unsafe impl<U, I, R> Send for UPar<U, I, R>
55where
56    U: Using,
57    R: ParallelRunner,
58    I: ConcurrentIter,
59{
60}
61
62unsafe impl<U, I, R> Sync for UPar<U, I, R>
63where
64    U: Using,
65    R: ParallelRunner,
66    I: ConcurrentIter,
67{
68}
69
70impl<U, I, R> ParIterUsing<U, R> for UPar<U, I, R>
71where
72    U: Using,
73    R: ParallelRunner,
74    I: ConcurrentIter,
75{
76    type Item = I::Item;
77
78    fn con_iter(&self) -> &impl ConcurrentIter {
79        &self.iter
80    }
81
82    fn params(&self) -> Params {
83        self.params
84    }
85
86    // params transformations
87
88    fn num_threads(mut self, num_threads: impl Into<NumThreads>) -> Self {
89        self.params = self.params.with_num_threads(num_threads);
90        self
91    }
92
93    fn chunk_size(mut self, chunk_size: impl Into<ChunkSize>) -> Self {
94        self.params = self.params.with_chunk_size(chunk_size);
95        self
96    }
97
98    fn iteration_order(mut self, collect: IterationOrder) -> Self {
99        self.params = self.params.with_collect_ordering(collect);
100        self
101    }
102
103    fn with_runner<Q: ParallelRunner>(self) -> impl ParIterUsing<U, Q, Item = Self::Item> {
104        UPar::new(self.using, self.params, self.iter)
105    }
106
107    // computational transformations
108
109    fn map<Out, Map>(self, map: Map) -> impl ParIterUsing<U, R, Item = Out>
110    where
111        Map: Fn(&mut <U as Using>::Item, Self::Item) -> Out + Sync + Clone,
112    {
113        let (using, params, iter) = self.destruct();
114        let map = move |u: &mut U::Item, x: Self::Item| map(u, x);
115        UParMap::new(using, params, iter, map)
116    }
117
118    fn filter<Filter>(self, filter: Filter) -> impl ParIterUsing<U, R, Item = Self::Item>
119    where
120        Filter: Fn(&mut U::Item, &Self::Item) -> bool + Sync + Clone,
121    {
122        let (using, params, iter) = self.destruct();
123        let x1 = move |u: &mut U::Item, i: Self::Item| filter(u, &i).then_some(i);
124        UParXap::new(using, params, iter, x1)
125    }
126
127    fn flat_map<IOut, FlatMap>(
128        self,
129        flat_map: FlatMap,
130    ) -> impl ParIterUsing<U, R, Item = IOut::Item>
131    where
132        IOut: IntoIterator,
133        FlatMap: Fn(&mut U::Item, Self::Item) -> IOut + Sync + Clone,
134    {
135        let (using, params, iter) = self.destruct();
136        let x1 = move |u: &mut U::Item, i: Self::Item| Vector(flat_map(u, i));
137        UParXap::new(using, params, iter, x1)
138    }
139
140    fn filter_map<Out, FilterMap>(
141        self,
142        filter_map: FilterMap,
143    ) -> impl ParIterUsing<U, R, Item = Out>
144    where
145        FilterMap: Fn(&mut <U as Using>::Item, Self::Item) -> Option<Out> + Sync + Clone,
146    {
147        let (using, params, iter) = self.destruct();
148        let x1 = move |u: &mut U::Item, x: Self::Item| filter_map(u, x);
149        UParXap::new(using, params, iter, x1)
150    }
151
152    // collect
153
154    fn collect_into<C>(self, output: C) -> C
155    where
156        C: ParCollectInto<Self::Item>,
157    {
158        output.u_m_collect_into::<R, _, _, _>(self.u_m())
159    }
160
161    // reduce
162
163    fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
164    where
165        Self::Item: Send,
166        Reduce: Fn(&mut U::Item, Self::Item, Self::Item) -> Self::Item + Sync,
167    {
168        self.u_m().reduce::<R, _>(reduce).1
169    }
170
171    // early exit
172
173    fn first(self) -> Option<Self::Item>
174    where
175        Self::Item: Send,
176    {
177        self.u_m().next::<R>().1
178    }
179}