orx_parallel/computational_variants/
par.rs

1use super::{map::ParMap, xap::ParXap, xap_filter_xap::ParXapFilterXap};
2use crate::{
3    ChunkSize, IterationOrder, NumThreads, ParCollectInto, ParIter, ParIterUsing, Params,
4    computations::{M, Vector, map_self, map_self_atom},
5    runner::{DefaultRunner, ParallelRunner},
6    using::computational_variants::UPar,
7    using::{UsingClone, UsingFun},
8};
9use orx_concurrent_iter::ConcurrentIter;
10use std::marker::PhantomData;
11
12/// A parallel iterator.
13pub struct Par<I, R = DefaultRunner>
14where
15    R: ParallelRunner,
16    I: ConcurrentIter,
17{
18    iter: I,
19    params: Params,
20    phantom: PhantomData<R>,
21}
22
23impl<I, R> Par<I, R>
24where
25    R: ParallelRunner,
26    I: ConcurrentIter,
27{
28    pub(crate) fn new(params: Params, iter: I) -> Self {
29        Self {
30            iter,
31            params,
32            phantom: PhantomData,
33        }
34    }
35
36    fn destruct(self) -> (Params, I) {
37        (self.params, self.iter)
38    }
39
40    fn m(self) -> M<I, I::Item, impl Fn(I::Item) -> I::Item> {
41        let (params, iter) = self.destruct();
42        M::new(params, iter, map_self)
43    }
44}
45
46unsafe impl<I, R> Send for Par<I, R>
47where
48    R: ParallelRunner,
49    I: ConcurrentIter,
50{
51}
52
53unsafe impl<I, R> Sync for Par<I, R>
54where
55    R: ParallelRunner,
56    I: ConcurrentIter,
57{
58}
59
60impl<I, R> ParIter<R> for Par<I, R>
61where
62    R: ParallelRunner,
63    I: ConcurrentIter,
64{
65    type Item = I::Item;
66
67    fn con_iter(&self) -> &impl ConcurrentIter {
68        &self.iter
69    }
70
71    fn params(&self) -> Params {
72        self.params
73    }
74
75    // params transformations
76
77    fn num_threads(mut self, num_threads: impl Into<NumThreads>) -> Self {
78        self.params = self.params.with_num_threads(num_threads);
79        self
80    }
81
82    fn chunk_size(mut self, chunk_size: impl Into<ChunkSize>) -> Self {
83        self.params = self.params.with_chunk_size(chunk_size);
84        self
85    }
86
87    fn iteration_order(mut self, collect: IterationOrder) -> Self {
88        self.params = self.params.with_collect_ordering(collect);
89        self
90    }
91
92    fn with_runner<Q: ParallelRunner>(self) -> impl ParIter<Q, Item = Self::Item> {
93        Par::new(self.params, self.iter)
94    }
95
96    // using transformations
97
98    fn using<U, F>(
99        self,
100        using: F,
101    ) -> impl ParIterUsing<UsingFun<F, U>, R, Item = <Self as ParIter<R>>::Item>
102    where
103        U: Send,
104        F: FnMut(usize) -> U,
105    {
106        let using = UsingFun::new(using);
107        UPar::new(using, self.params, self.iter)
108    }
109
110    fn using_clone<U>(
111        self,
112        using: U,
113    ) -> impl ParIterUsing<UsingClone<U>, R, Item = <Self as ParIter<R>>::Item>
114    where
115        U: Clone + Send,
116    {
117        let using = UsingClone::new(using);
118        UPar::new(using, self.params, self.iter)
119    }
120
121    // computation transformations
122
123    fn map<Out, Map>(self, map: Map) -> impl ParIter<R, Item = Out>
124    where
125        Out: Send + Sync,
126        Map: Fn(Self::Item) -> Out + Send + Sync + Clone,
127    {
128        let (params, iter) = self.destruct();
129        ParMap::new(params, iter, map)
130    }
131
132    fn filter<Filter>(self, filter: Filter) -> impl ParIter<R, Item = Self::Item>
133    where
134        Filter: Fn(&Self::Item) -> bool + Send + Sync,
135    {
136        let (params, iter) = self.destruct();
137        ParXapFilterXap::new(params, iter, map_self_atom, filter, map_self_atom)
138    }
139
140    fn flat_map<IOut, FlatMap>(self, flat_map: FlatMap) -> impl ParIter<R, Item = IOut::Item>
141    where
142        IOut: IntoIterator + Send + Sync,
143        IOut::IntoIter: Send + Sync,
144        IOut::Item: Send + Sync,
145        FlatMap: Fn(Self::Item) -> IOut + Send + Sync,
146    {
147        let (params, iter) = self.destruct();
148        let x1 = move |i: Self::Item| Vector(flat_map(i)); // TODO: inline
149        ParXap::new(params, iter, x1)
150    }
151
152    fn filter_map<Out, FilterMap>(self, filter_map: FilterMap) -> impl ParIter<R, Item = Out>
153    where
154        Out: Send + Sync,
155        FilterMap: Fn(Self::Item) -> Option<Out> + Send + Sync + Clone,
156    {
157        let (params, iter) = self.destruct();
158        ParXap::new(params, iter, filter_map)
159    }
160
161    // collect
162
163    fn collect_into<C>(self, output: C) -> C
164    where
165        C: ParCollectInto<Self::Item>,
166    {
167        output.m_collect_into::<R, _, _>(self.m())
168    }
169
170    // reduce
171
172    fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
173    where
174        Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Send + Sync,
175    {
176        self.m().reduce::<R, _>(reduce).1
177    }
178
179    // early exit
180
181    fn first(self) -> Option<Self::Item> {
182        self.m().next()
183    }
184}