orx_parallel/computational_variants/
map.rs1use super::{xap::ParXap, xap_filter_xap::ParXapFilterXap};
2use crate::{
3 ChunkSize, IterationOrder, NumThreads, ParCollectInto, ParIter, Params,
4 computations::{Atom, M, Vector, map_self_atom},
5 runner::{DefaultRunner, ParallelRunner},
6};
7use orx_concurrent_iter::ConcurrentIter;
8use std::marker::PhantomData;
9
10pub struct ParMap<I, O, M1, R = DefaultRunner>
12where
13 R: ParallelRunner,
14 I: ConcurrentIter,
15 O: Send + Sync,
16 M1: Fn(I::Item) -> O + Send + Sync + Clone,
17{
18 m: M<I, O, M1>,
19 phantom: PhantomData<R>,
20}
21
22impl<I, O, M1, R> ParMap<I, O, M1, R>
23where
24 R: ParallelRunner,
25 I: ConcurrentIter,
26 O: Send + Sync,
27 M1: Fn(I::Item) -> O + Send + Sync + Clone,
28{
29 pub(crate) fn new(params: Params, iter: I, m1: M1) -> Self {
30 Self {
31 m: M::new(params, iter, m1),
32 phantom: PhantomData,
33 }
34 }
35
36 fn destruct(self) -> (Params, I, M1) {
37 self.m.destruct()
38 }
39}
40
41unsafe impl<I, O, M1, R> Send for ParMap<I, O, M1, R>
42where
43 R: ParallelRunner,
44 I: ConcurrentIter,
45 O: Send + Sync,
46 M1: Fn(I::Item) -> O + Send + Sync + Clone,
47{
48}
49
50unsafe impl<I, O, M1, R> Sync for ParMap<I, O, M1, R>
51where
52 R: ParallelRunner,
53 I: ConcurrentIter,
54 O: Send + Sync,
55 M1: Fn(I::Item) -> O + Send + Sync + Clone,
56{
57}
58
59impl<I, O, M1, R> ParIter<R> for ParMap<I, O, M1, R>
60where
61 R: ParallelRunner,
62 I: ConcurrentIter,
63 O: Send + Sync,
64 M1: Fn(I::Item) -> O + Send + Sync + Clone,
65{
66 type Item = O;
67
68 fn con_iter(&self) -> &impl ConcurrentIter {
69 self.m.iter()
70 }
71
72 fn params(&self) -> &Params {
73 self.m.params()
74 }
75
76 fn num_threads(mut self, num_threads: impl Into<NumThreads>) -> Self {
79 self.m.num_threads(num_threads);
80 self
81 }
82
83 fn chunk_size(mut self, chunk_size: impl Into<ChunkSize>) -> Self {
84 self.m.chunk_size(chunk_size);
85 self
86 }
87
88 fn iteration_order(mut self, collect: IterationOrder) -> Self {
89 self.m.iteration_order(collect);
90 self
91 }
92
93 fn with_runner<Q: ParallelRunner>(self) -> impl ParIter<Q, Item = Self::Item> {
94 let (params, iter, map) = self.destruct();
95 ParMap::new(params, iter, map)
96 }
97
98 fn map<Out, Map>(self, map: Map) -> impl ParIter<R, Item = Out>
101 where
102 Out: Send + Sync,
103 Map: Fn(Self::Item) -> Out + Send + Sync + Clone,
104 {
105 let (params, iter, m1) = self.destruct();
106 let m1 = move |x| map(m1(x));
107 ParMap::new(params, iter, m1)
108 }
109
110 fn filter<Filter>(self, filter: Filter) -> impl ParIter<R, Item = Self::Item>
111 where
112 Filter: Fn(&Self::Item) -> bool + Send + Sync,
113 {
114 let (params, iter, m1) = self.destruct();
115 let m1 = move |i: I::Item| Atom(m1(i));
116 ParXapFilterXap::new(params, iter, m1, filter, map_self_atom)
117 }
118
119 fn flat_map<IOut, FlatMap>(self, flat_map: FlatMap) -> impl ParIter<R, Item = IOut::Item>
120 where
121 IOut: IntoIterator + Send + Sync,
122 IOut::IntoIter: Send + Sync,
123 IOut::Item: Send + Sync,
124 FlatMap: Fn(Self::Item) -> IOut + Send + Sync,
125 {
126 let (params, iter, m1) = self.destruct();
127 let x1 = move |i: I::Item| Vector(flat_map(m1(i)));
128 ParXap::new(params, iter, x1)
129 }
130
131 fn filter_map<Out, FilterMap>(self, filter_map: FilterMap) -> impl ParIter<R, Item = Out>
132 where
133 Out: Send + Sync,
134 FilterMap: Fn(Self::Item) -> Option<Out> + Send + Sync + Clone,
135 {
136 let (params, iter, m1) = self.destruct();
137 let x1 = move |i: I::Item| filter_map(m1(i));
138 ParXap::new(params, iter, x1)
139 }
140
141 fn collect_into<C>(self, output: C) -> C
144 where
145 C: ParCollectInto<Self::Item>,
146 {
147 output.m_collect_into::<R, _, _>(self.m)
148 }
149
150 fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
153 where
154 Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Send + Sync,
155 {
156 self.m.reduce::<R, _>(reduce).1
157 }
158
159 fn first(self) -> Option<Self::Item> {
162 self.m.next()
163 }
164}