orx_parallel/computational_variants/
par.rs

1use super::{map::ParMap, xap::ParXap};
2use crate::computational_variants::fallible_result::ParResult;
3use crate::executor::parallel_compute as prc;
4use crate::generic_values::{Vector, WhilstAtom};
5use crate::par_iter_result::IntoResult;
6use crate::runner::{DefaultRunner, ParallelRunner};
7use crate::using::{UPar, UsingClone, UsingFun};
8use crate::{
9    ChunkSize, IterationOrder, NumThreads, ParCollectInto, ParIter, Params, default_fns::map_self,
10};
11use crate::{IntoParIter, ParIterResult, ParIterUsing};
12use orx_concurrent_iter::chain::ChainKnownLenI;
13use orx_concurrent_iter::{ConcurrentIter, ExactSizeConcurrentIter};
14
15/// A parallel iterator.
16pub struct Par<I, R = DefaultRunner>
17where
18    R: ParallelRunner,
19    I: ConcurrentIter,
20{
21    orchestrator: R,
22    params: Params,
23    iter: I,
24}
25
26impl<I, R> Par<I, R>
27where
28    R: ParallelRunner,
29    I: ConcurrentIter,
30{
31    pub(crate) fn new(orchestrator: R, params: Params, iter: I) -> Self {
32        Self {
33            orchestrator,
34            iter,
35            params,
36        }
37    }
38
39    pub(crate) fn destruct(self) -> (R, Params, I) {
40        (self.orchestrator, self.params, self.iter)
41    }
42
43    pub(crate) fn orchestrator(&self) -> &R {
44        &self.orchestrator
45    }
46}
47
48unsafe impl<I, R> Send for Par<I, R>
49where
50    R: ParallelRunner,
51    I: ConcurrentIter,
52{
53}
54
55unsafe impl<I, R> Sync for Par<I, R>
56where
57    R: ParallelRunner,
58    I: ConcurrentIter,
59{
60}
61
62impl<I, R> ParIter<R> for Par<I, R>
63where
64    R: ParallelRunner,
65    I: ConcurrentIter,
66{
67    type Item = I::Item;
68
69    fn con_iter(&self) -> &impl ConcurrentIter {
70        &self.iter
71    }
72
73    fn params(&self) -> Params {
74        self.params
75    }
76
77    // params transformations
78
79    fn num_threads(mut self, num_threads: impl Into<NumThreads>) -> Self {
80        self.params = self.params.with_num_threads(num_threads);
81        self
82    }
83
84    fn chunk_size(mut self, chunk_size: impl Into<ChunkSize>) -> Self {
85        self.params = self.params.with_chunk_size(chunk_size);
86        self
87    }
88
89    fn iteration_order(mut self, collect: IterationOrder) -> Self {
90        self.params = self.params.with_collect_ordering(collect);
91        self
92    }
93
94    fn with_runner<Q: ParallelRunner>(self, orchestrator: Q) -> impl ParIter<Q, Item = Self::Item> {
95        Par::new(orchestrator, self.params, self.iter)
96    }
97
98    // using transformations
99
100    fn using<U, F>(
101        self,
102        using: F,
103    ) -> impl ParIterUsing<UsingFun<F, U>, R, Item = <Self as ParIter<R>>::Item>
104    where
105        U: 'static,
106        F: Fn(usize) -> U + Sync,
107    {
108        let using = UsingFun::new(using);
109        let (orchestrator, params, iter) = self.destruct();
110        UPar::new(using, orchestrator, params, iter)
111    }
112
113    fn using_clone<U>(
114        self,
115        value: U,
116    ) -> impl ParIterUsing<UsingClone<U>, R, Item = <Self as ParIter<R>>::Item>
117    where
118        U: Clone + 'static,
119    {
120        let using = UsingClone::new(value);
121        let (orchestrator, params, iter) = self.destruct();
122        UPar::new(using, orchestrator, params, iter)
123    }
124
125    // computation transformations
126
127    fn map<Out, Map>(self, map: Map) -> impl ParIter<R, Item = Out>
128    where
129        Map: Fn(Self::Item) -> Out + Sync,
130    {
131        let (orchestrator, params, iter) = self.destruct();
132        ParMap::new(orchestrator, params, iter, map)
133    }
134
135    fn filter<Filter>(self, filter: Filter) -> impl ParIter<R, Item = Self::Item>
136    where
137        Filter: Fn(&Self::Item) -> bool + Sync,
138    {
139        let (orchestrator, params, iter) = self.destruct();
140        let x1 = move |i: Self::Item| filter(&i).then_some(i);
141        ParXap::new(orchestrator, params, iter, x1)
142    }
143
144    fn flat_map<IOut, FlatMap>(self, flat_map: FlatMap) -> impl ParIter<R, Item = IOut::Item>
145    where
146        IOut: IntoIterator,
147        FlatMap: Fn(Self::Item) -> IOut + Sync,
148    {
149        let (orchestrator, params, iter) = self.destruct();
150        let x1 = move |i: Self::Item| Vector(flat_map(i));
151        ParXap::new(orchestrator, params, iter, x1)
152    }
153
154    fn filter_map<Out, FilterMap>(self, filter_map: FilterMap) -> impl ParIter<R, Item = Out>
155    where
156        FilterMap: Fn(Self::Item) -> Option<Out> + Sync,
157    {
158        let (orchestrator, params, iter) = self.destruct();
159        ParXap::new(orchestrator, params, iter, filter_map)
160    }
161
162    fn take_while<While>(self, take_while: While) -> impl ParIter<R, Item = Self::Item>
163    where
164        While: Fn(&Self::Item) -> bool + Sync,
165    {
166        let (orchestrator, params, iter) = self.destruct();
167        let x1 = move |value: Self::Item| WhilstAtom::new(value, &take_while);
168        ParXap::new(orchestrator, params, iter, x1)
169    }
170
171    fn into_fallible_result<Out, Err>(self) -> impl ParIterResult<R, Item = Out, Err = Err>
172    where
173        Self::Item: IntoResult<Out, Err>,
174    {
175        ParResult::new(self)
176    }
177
178    // collect
179
180    fn collect_into<C>(self, output: C) -> C
181    where
182        C: ParCollectInto<Self::Item>,
183    {
184        let (orchestrator, params, iter) = self.destruct();
185        output.m_collect_into(orchestrator, params, iter, map_self)
186    }
187
188    // reduce
189
190    fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
191    where
192        Self::Item: Send,
193        Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Sync,
194    {
195        let (orchestrator, params, iter) = self.destruct();
196        prc::reduce::m(orchestrator, params, iter, map_self, reduce).1
197    }
198
199    // early exit
200
201    fn first(self) -> Option<Self::Item> {
202        let (orchestrator, params, iter) = self.destruct();
203        match params.iteration_order {
204            IterationOrder::Ordered => prc::next::m(orchestrator, params, iter, map_self).1,
205            IterationOrder::Arbitrary => prc::next_any::m(orchestrator, params, iter, map_self).1,
206        }
207    }
208}
209
210impl<I, R> Par<I, R>
211where
212    R: ParallelRunner,
213    I: ConcurrentIter,
214{
215    /// Creates a chain of this and `other` parallel iterators.
216    ///
217    /// The first iterator is required to have a known length for chaining.
218    ///
219    /// # Examples
220    ///
221    /// ```
222    /// use orx_parallel::*;
223    ///
224    /// let a = vec!['a', 'b', 'c']; // with exact len
225    /// let b = vec!['d', 'e', 'f'].into_iter().filter(|x| *x != 'x');
226    ///
227    /// let chain = a.into_par().chain(b.iter_into_par());
228    /// assert_eq!(
229    ///     chain.collect::<Vec<_>>(),
230    ///     vec!['a', 'b', 'c', 'd', 'e', 'f'],
231    /// );
232    /// ```
233    pub fn chain<C>(self, other: C) -> Par<ChainKnownLenI<I, C::IntoIter>, R>
234    where
235        I: ExactSizeConcurrentIter,
236        C: IntoParIter<Item = I::Item>,
237    {
238        let (orchestrator, params, iter) = self.destruct();
239        let iter = iter.chain(other.into_con_iter());
240        Par::new(orchestrator, params, iter)
241    }
242}