orx_parallel/computational_variants/
par.rs

1use super::{map::ParMap, xap::ParXap, xap_filter_xap::ParXapFilterXap};
2use crate::{
3    ChunkSize, IterationOrder, NumThreads, ParCollectInto, ParIter, Params,
4    computations::{M, Vector, map_self, map_self_atom},
5    runner::{DefaultRunner, ParallelRunner},
6};
7use orx_concurrent_iter::ConcurrentIter;
8use std::marker::PhantomData;
9
10/// A parallel iterator.
11pub struct Par<I, R = DefaultRunner>
12where
13    R: ParallelRunner,
14    I: ConcurrentIter,
15{
16    iter: I,
17    params: Params,
18    phantom: PhantomData<R>,
19}
20
21impl<I, R> Par<I, R>
22where
23    R: ParallelRunner,
24    I: ConcurrentIter,
25{
26    pub(crate) fn new(params: Params, iter: I) -> Self {
27        Self {
28            iter,
29            params,
30            phantom: PhantomData,
31        }
32    }
33
34    fn destruct(self) -> (Params, I) {
35        (self.params, self.iter)
36    }
37
38    fn m(self) -> M<I, I::Item, impl Fn(I::Item) -> I::Item> {
39        let (params, iter) = self.destruct();
40        M::new(params, iter, map_self)
41    }
42}
43
44unsafe impl<I, R> Send for Par<I, R>
45where
46    R: ParallelRunner,
47    I: ConcurrentIter,
48{
49}
50
51unsafe impl<I, R> Sync for Par<I, R>
52where
53    R: ParallelRunner,
54    I: ConcurrentIter,
55{
56}
57
58impl<I, R> ParIter<R> for Par<I, R>
59where
60    R: ParallelRunner,
61    I: ConcurrentIter,
62{
63    type Item = I::Item;
64
65    fn con_iter(&self) -> &impl ConcurrentIter {
66        &self.iter
67    }
68
69    fn params(&self) -> &Params {
70        &self.params
71    }
72
73    // params transformations
74
75    fn num_threads(mut self, num_threads: impl Into<NumThreads>) -> Self {
76        self.params = self.params.with_num_threads(num_threads);
77        self
78    }
79
80    fn chunk_size(mut self, chunk_size: impl Into<ChunkSize>) -> Self {
81        self.params = self.params.with_chunk_size(chunk_size);
82        self
83    }
84
85    fn iteration_order(mut self, collect: IterationOrder) -> Self {
86        self.params = self.params.with_collect_ordering(collect);
87        self
88    }
89
90    fn with_runner<Q: ParallelRunner>(self) -> impl ParIter<Q, Item = Self::Item> {
91        Par::new(self.params, self.iter)
92    }
93
94    // computation transformations
95
96    fn map<Out, Map>(self, map: Map) -> impl ParIter<R, Item = Out>
97    where
98        Out: Send + Sync,
99        Map: Fn(Self::Item) -> Out + Send + Sync + Clone,
100    {
101        let (params, iter) = self.destruct();
102        ParMap::new(params, iter, map)
103    }
104
105    fn filter<Filter>(self, filter: Filter) -> impl ParIter<R, Item = Self::Item>
106    where
107        Filter: Fn(&Self::Item) -> bool + Send + Sync,
108    {
109        let (params, iter) = self.destruct();
110        ParXapFilterXap::new(params, iter, map_self_atom, filter, map_self_atom)
111    }
112
113    fn flat_map<IOut, FlatMap>(self, flat_map: FlatMap) -> impl ParIter<R, Item = IOut::Item>
114    where
115        IOut: IntoIterator + Send + Sync,
116        IOut::IntoIter: Send + Sync,
117        IOut::Item: Send + Sync,
118        FlatMap: Fn(Self::Item) -> IOut + Send + Sync,
119    {
120        let (params, iter) = self.destruct();
121        let x1 = move |i: Self::Item| Vector(flat_map(i)); // TODO: inline
122        ParXap::new(params, iter, x1)
123    }
124
125    fn filter_map<Out, FilterMap>(self, filter_map: FilterMap) -> impl ParIter<R, Item = Out>
126    where
127        Out: Send + Sync,
128        FilterMap: Fn(Self::Item) -> Option<Out> + Send + Sync + Clone,
129    {
130        let (params, iter) = self.destruct();
131        ParXap::new(params, iter, filter_map)
132    }
133
134    // collect
135
136    fn collect_into<C>(self, output: C) -> C
137    where
138        C: ParCollectInto<Self::Item>,
139    {
140        output.m_collect_into::<R, _, _>(self.m())
141    }
142
143    // reduce
144
145    fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
146    where
147        Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Send + Sync,
148    {
149        self.m().reduce::<R, _>(reduce).1
150    }
151
152    // early exit
153
154    fn first(self) -> Option<Self::Item> {
155        self.m().next()
156    }
157}