orx_parallel/computational_variants/
xap.rs1use crate::{
2 ChunkSize, IterationOrder, NumThreads, ParCollectInto, ParIter, Params,
3 computational_variants::xap_filter_xap::ParXapFilterXap,
4 computations::{Values, X, map_self_atom},
5 runner::{DefaultRunner, ParallelRunner},
6};
7use orx_concurrent_iter::ConcurrentIter;
8use std::marker::PhantomData;
9
10pub struct ParXap<I, Vo, M1, R = DefaultRunner>
14where
15 R: ParallelRunner,
16 I: ConcurrentIter,
17 Vo: Values + Send + Sync,
18 Vo::Item: Send + Sync,
19 M1: Fn(I::Item) -> Vo + Send + Sync,
20{
21 x: X<I, Vo, M1>,
22 phantom: PhantomData<R>,
23}
24
25impl<I, Vo, M1, R> ParXap<I, Vo, M1, R>
26where
27 R: ParallelRunner,
28 I: ConcurrentIter,
29 Vo: Values + Send + Sync,
30 Vo::Item: Send + Sync,
31 M1: Fn(I::Item) -> Vo + Send + Sync,
32{
33 pub(crate) fn new(params: Params, iter: I, x1: M1) -> Self {
34 Self {
35 x: X::new(params, iter, x1),
36 phantom: PhantomData,
37 }
38 }
39
40 fn destruct(self) -> (Params, I, M1) {
41 self.x.destruct()
42 }
43}
44
45unsafe impl<I, Vo, M1, R> Send for ParXap<I, Vo, M1, R>
46where
47 R: ParallelRunner,
48 I: ConcurrentIter,
49 Vo: Values + Send + Sync,
50 Vo::Item: Send + Sync,
51 M1: Fn(I::Item) -> Vo + Send + Sync,
52{
53}
54
55unsafe impl<I, Vo, M1, R> Sync for ParXap<I, Vo, M1, R>
56where
57 R: ParallelRunner,
58 I: ConcurrentIter,
59 Vo: Values + Send + Sync,
60 Vo::Item: Send + Sync,
61 M1: Fn(I::Item) -> Vo + Send + Sync,
62{
63}
64
65impl<I, Vo, M1, R> ParIter<R> for ParXap<I, Vo, M1, R>
66where
67 R: ParallelRunner,
68 I: ConcurrentIter,
69 Vo: Values + Send + Sync,
70 Vo::Item: Send + Sync,
71 M1: Fn(I::Item) -> Vo + Send + Sync,
72{
73 type Item = Vo::Item;
74
75 fn con_iter(&self) -> &impl ConcurrentIter {
76 self.x.iter()
77 }
78
79 fn params(&self) -> &Params {
80 self.x.params()
81 }
82
83 fn num_threads(mut self, num_threads: impl Into<NumThreads>) -> Self {
86 self.x.num_threads(num_threads);
87 self
88 }
89
90 fn chunk_size(mut self, chunk_size: impl Into<ChunkSize>) -> Self {
91 self.x.chunk_size(chunk_size);
92 self
93 }
94
95 fn iteration_order(mut self, collect: IterationOrder) -> Self {
96 self.x.iteration_order(collect);
97 self
98 }
99
100 fn with_runner<Q: ParallelRunner>(self) -> impl ParIter<Q, Item = Self::Item> {
101 let (params, iter, map1) = self.destruct();
102 ParXap::new(params, iter, map1)
103 }
104
105 fn map<Out, Map>(self, map: Map) -> impl ParIter<R, Item = Out>
108 where
109 Out: Send + Sync,
110 Map: Fn(Self::Item) -> Out + Send + Sync + Clone,
111 {
112 let (params, iter, x1) = self.destruct();
113 let x1 = move |i: I::Item| {
114 let vo = x1(i);
115 vo.map(map.clone())
116 };
117
118 ParXap::new(params, iter, x1)
119 }
120
121 fn filter<Filter>(self, filter: Filter) -> impl ParIter<R, Item = Self::Item>
122 where
123 Filter: Fn(&Self::Item) -> bool + Send + Sync,
124 {
125 let (params, iter, x1) = self.destruct();
126 ParXapFilterXap::new(params, iter, x1, filter, map_self_atom)
127 }
128
129 fn flat_map<IOut, FlatMap>(self, flat_map: FlatMap) -> impl ParIter<R, Item = IOut::Item>
130 where
131 IOut: IntoIterator + Send + Sync,
132 IOut::IntoIter: Send + Sync,
133 IOut::Item: Send + Sync,
134 FlatMap: Fn(Self::Item) -> IOut + Send + Sync + Clone,
135 {
136 let (params, iter, x1) = self.destruct();
137 let x1 = move |i: I::Item| {
138 let vo = x1(i);
139 vo.flat_map(flat_map.clone())
140 };
141 ParXap::new(params, iter, x1)
142 }
143
144 fn filter_map<Out, FilterMap>(self, filter_map: FilterMap) -> impl ParIter<R, Item = Out>
145 where
146 Out: Send + Sync,
147 FilterMap: Fn(Self::Item) -> Option<Out> + Send + Sync + Clone,
148 {
149 let (params, iter, x1) = self.destruct();
150 let x1 = move |i: I::Item| {
151 let vo = x1(i);
152 vo.flat_map(filter_map.clone())
153 };
154 ParXap::new(params, iter, x1)
155 }
156
157 fn collect_into<C>(self, output: C) -> C
160 where
161 C: ParCollectInto<Self::Item>,
162 {
163 output.x_collect_into::<R, _, _, _>(self.x)
164 }
165
166 fn reduce<Reduce>(self, reduce: Reduce) -> Option<Self::Item>
169 where
170 Reduce: Fn(Self::Item, Self::Item) -> Self::Item + Send + Sync,
171 {
172 self.x.reduce::<R, _>(reduce).1
173 }
174
175 fn first(self) -> Option<Self::Item> {
178 self.x.next()
179 }
180}