orx_parallel/computational_variants/
par.rs

1use super::{map::ParMap, xap::ParXap};
2use crate::ParIterResult;
3use crate::computational_variants::fallible_result::ParResult;
4use crate::generic_values::{Vector, WhilstAtom};
5use crate::par_iter_result::IntoResult;
6use crate::{
7    ChunkSize, IterationOrder, NumThreads, ParCollectInto, ParIter, ParIterUsing, Params,
8    computations::{M, map_self},
9    runner::{DefaultRunner, ParallelRunner},
10    using::{UsingClone, UsingFun, computational_variants::UPar},
11};
12use orx_concurrent_iter::ConcurrentIter;
13use std::marker::PhantomData;
14
15/// A parallel iterator.
16pub struct Par<I, R = DefaultRunner>
17where
18    R: ParallelRunner,
19    I: ConcurrentIter,
20{
21    iter: I,
22    params: Params,
23    phantom: PhantomData<R>,
24}
25
26impl<I, R> Par<I, R>
27where
28    R: ParallelRunner,
29    I: ConcurrentIter,
30{
31    pub(crate) fn new(params: Params, iter: I) -> Self {
32        Self {
33            iter,
34            params,
35            phantom: PhantomData,
36        }
37    }
38
39    pub(crate) fn destruct(self) -> (Params, I) {
40        (self.params, self.iter)
41    }
42
43    fn m(self) -> M<I, I::Item, impl Fn(I::Item) -> I::Item> {
44        let (params, iter) = self.destruct();
45        M::new(params, iter, map_self)
46    }
47}
48
49unsafe impl<I, R> Send for Par<I, R>
50where
51    R: ParallelRunner,
52    I: ConcurrentIter,
53{
54}
55
56unsafe impl<I, R> Sync for Par<I, R>
57where
58    R: ParallelRunner,
59    I: ConcurrentIter,
60{
61}
62
63impl<I, R> ParIter<R> for Par<I, R>
64where
65    R: ParallelRunner,
66    I: ConcurrentIter,
67{
68    type Item = I::Item;
69
70    fn con_iter(&self) -> &impl ConcurrentIter {
71        &self.iter
72    }
73
74    fn params(&self) -> Params {
75        self.params
76    }
77
78    // params transformations
79
80    fn num_threads(mut self, num_threads: impl Into<NumThreads>) -> Self {
81        self.params = self.params.with_num_threads(num_threads);
82        self
83    }
84
85    fn chunk_size(mut self, chunk_size: impl Into<ChunkSize>) -> Self {
86        self.params = self.params.with_chunk_size(chunk_size);
87        self
88    }
89
90    fn iteration_order(mut self, collect: IterationOrder) -> Self {
91        self.params = self.params.with_collect_ordering(collect);
92        self
93    }
94
95    fn with_runner<Q: ParallelRunner>(self) -> impl ParIter<Q, Item = Self::Item> {
96        Par::new(self.params, self.iter)
97    }
98
99    // using transformations
100
101    fn using<U, F>(
102        self,
103        using: F,
104    ) -> impl ParIterUsing<UsingFun<F, U>, R, Item = <Self as ParIter<R>>::Item>
105    where
106        U: Send + 'static,
107        F: FnMut(usize) -> U,
108    {
109        let using = UsingFun::new(using);
110        UPar::new(using, self.params, self.iter)
111    }
112
113    fn using_clone<U>(
114        self,
115        using: U,
116    ) -> impl ParIterUsing<UsingClone<U>, R, Item = <Self as ParIter<R>>::Item>
117    where
118        U: Clone + Send + 'static,
119    {
120        let using = UsingClone::new(using);
121        UPar::new(using, self.params, self.iter)
122    }
123
124    // computation transformations
125
126    fn map<Out, Map>(self, map: Map) -> impl ParIter<R, Item = Out>
127    where
128        Map: Fn(Self::Item) -> Out + Sync,
129    {
130        let (params, iter) = self.destruct();
131        ParMap::new(params, iter, map)
132    }
133
134    fn filter<Filter>(self, filter: Filter) -> impl ParIter<R, Item = Self::Item>
135    where
136        Filter: Fn(&Self::Item) -> bool + Sync,
137    {
138        let (params, iter) = self.destruct();
139        let x1 = move |i: Self::Item| filter(&i).then_some(i);
140        ParXap::new(params, iter, x1)
141    }
142
143    fn flat_map<IOut, FlatMap>(self, flat_map: FlatMap) -> impl ParIter<R, Item = IOut::Item>
144    where
145        IOut: IntoIterator,
146        FlatMap: Fn(Self::Item) -> IOut + Sync,
147    {
148        let (params, iter) = self.destruct();
149        let x1 = move |i: Self::Item| Vector(flat_map(i)); // TODO: inline
150        ParXap::new(params, iter, x1)
151    }
152
153    fn filter_map<Out, FilterMap>(self, filter_map: FilterMap) -> impl ParIter<R, Item = Out>
154    where
155        FilterMap: Fn(Self::Item) -> Option<Out> + Sync,
156    {
157        let (params, iter) = self.destruct();
158        ParXap::new(params, iter, filter_map)
159    }
160
161    fn take_while<While>(self, take_while: While) -> impl ParIter<R, Item = Self::Item>
162    where
163        While: Fn(&Self::Item) -> bool + Sync,
164    {
165        let (params, iter) = self.destruct();
166        let x1 = move |value: Self::Item| WhilstAtom::new(value, &take_while);
167        ParXap::new(params, iter, x1)
168    }
169
170    fn into_fallible_result<Out, Err>(self) -> impl ParIterResult<R, Item = Out, Err = Err>
171    where
172        Self::Item: IntoResult<Out, Err>,
173    {
174        ParResult::new(self)
175    }
176
177    // collect
178
179    fn collect_into<C>(self, output: C) -> C
180    where
181        C: ParCollectInto<Self::Item>,
182    {
183        output.m_collect_into::<R, _, _>(self.m())
184    }
185
186    // reduce
187
188    fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
189    where
190        Self::Item: Send,
191        Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Sync,
192    {
193        self.m().reduce::<R, _>(reduce).1
194    }
195
196    // early exit
197
198    fn first(self) -> Option<Self::Item> {
199        match self.params().iteration_order {
200            IterationOrder::Ordered => self.m().next::<R>().1,
201            IterationOrder::Arbitrary => self.m().next_any::<R>().1,
202        }
203    }
204}