orx_parallel/computations/generalized_values/
atom.rs1use super::{values::Values, vector::Vector};
2use orx_concurrent_bag::ConcurrentBag;
3use orx_concurrent_ordered_bag::ConcurrentOrderedBag;
4use orx_pinned_vec::{IntoConcurrentPinnedVec, PinnedVec};
5
6pub struct Atom<T>(pub T);
7
8impl<T> Values for Atom<T>
9where
10 T: Send + Sync,
11{
12 type Item = T;
13
14 type Mapped<M, O>
15 = Atom<O>
16 where
17 O: Send + Sync,
18 M: Fn(Self::Item) -> O + Send + Sync;
19
20 type Filtered<F>
21 = Option<Self::Item>
22 where
23 F: Fn(&Self::Item) -> bool + Send + Sync;
24
25 type FlatMapped<Fm, Vo>
26 = Vector<Vo>
27 where
28 Vo: IntoIterator + Send + Sync,
29 Vo::Item: Send + Sync,
30 Vo::IntoIter: Send + Sync,
31 Fm: Fn(Self::Item) -> Vo + Send + Sync;
32
33 type FilterMapped<Fm, O>
34 = Option<O>
35 where
36 O: Send + Sync,
37 Fm: Fn(Self::Item) -> Option<O> + Send + Sync;
38
39 fn values(self) -> impl IntoIterator<Item = T> {
40 core::iter::once(self.0)
41 }
42
43 #[inline(always)]
44 fn push_to_pinned_vec<P>(self, vector: &mut P)
45 where
46 P: PinnedVec<T>,
47 {
48 vector.push(self.0);
49 }
50
51 #[inline(always)]
52 fn push_to_bag<P>(self, bag: &ConcurrentBag<T, P>)
53 where
54 P: IntoConcurrentPinnedVec<T>,
55 T: Send + Sync,
56 {
57 bag.push(self.0);
58 }
59
60 #[inline(always)]
61 fn push_to_ordered_bag<P>(self, idx: usize, o_bag: &ConcurrentOrderedBag<Self::Item, P>)
62 where
63 P: IntoConcurrentPinnedVec<Self::Item>,
64 Self::Item: Send + Sync,
65 {
66 unsafe { o_bag.set_value(idx, self.0) };
67 }
68
69 #[inline(always)]
70 fn push_to_vec_with_idx(self, idx: usize, vec: &mut Vec<(usize, T)>) {
71 vec.push((idx, self.0))
72 }
73
74 #[inline(always)]
75 fn map<M, O>(self, map: M) -> Self::Mapped<M, O>
76 where
77 O: Send + Sync,
78 M: Fn(Self::Item) -> O + Send + Sync,
79 {
80 Atom(map(self.0))
81 }
82
83 #[inline(always)]
84 fn filter<F>(self, filter: F) -> Self::Filtered<F>
85 where
86 F: Fn(&Self::Item) -> bool + Send + Sync,
87 {
88 match filter(&self.0) {
89 true => Some(self.0),
90 false => None,
91 }
92 }
93
94 #[inline(always)]
95 fn flat_map<Fm, Vo>(self, flat_map: Fm) -> Self::FlatMapped<Fm, Vo>
96 where
97 Vo: IntoIterator + Send + Sync,
98 Vo::Item: Send + Sync,
99 Vo::IntoIter: Send + Sync,
100 Fm: Fn(Self::Item) -> Vo + Send + Sync,
101 {
102 Vector(flat_map(self.0))
103 }
104
105 #[inline(always)]
106 fn filter_map<Fm, O>(self, filter_map: Fm) -> Self::FilterMapped<Fm, O>
107 where
108 O: Send + Sync,
109 Fm: Fn(Self::Item) -> Option<O> + Send + Sync,
110 {
111 filter_map(self.0)
112 }
113
114 #[inline(always)]
115 fn acc_reduce<X>(self, acc: Option<Self::Item>, reduce: X) -> Option<Self::Item>
116 where
117 X: Fn(Self::Item, Self::Item) -> Self::Item + Send + Sync,
118 {
119 match acc {
120 Some(x) => Some(reduce(x, self.0)),
121 None => Some(self.0),
122 }
123 }
124
125 #[inline(always)]
126 fn fx_reduce<F, M2, Vo, X>(
127 self,
128 acc: Option<Vo::Item>,
129 filter: F,
130 map2: M2,
131 reduce: X,
132 ) -> Option<Vo::Item>
133 where
134 Self: Sized,
135 F: Fn(&Self::Item) -> bool + Send + Sync,
136 M2: Fn(Self::Item) -> Vo + Send + Sync,
137 Vo: Values,
138 Vo::Item: Send + Sync,
139 X: Fn(Vo::Item, Vo::Item) -> Vo::Item + Send + Sync,
140 {
141 match filter(&self.0) {
142 true => map2(self.0).acc_reduce(acc, reduce),
143 false => acc,
144 }
145 }
146
147 #[inline(always)]
148 fn first(self) -> Option<Self::Item> {
149 Some(self.0)
150 }
151
152 #[inline(always)]
153 fn fx_next<F, M2, Vo>(self, filter: F, map2: M2) -> Option<Vo::Item>
154 where
155 F: Fn(&Self::Item) -> bool + Send + Sync,
156 M2: Fn(Self::Item) -> Vo + Send + Sync,
157 Vo: Values,
158 Vo::Item: Send + Sync,
159 {
160 match filter(&self.0) {
161 true => map2(self.0).first(),
162 false => None,
163 }
164 }
165
166 #[inline(always)]
167 fn filter_map_collect_sequential<F, M2, P, Vo>(self, filter: F, map2: M2, vector: &mut P)
168 where
169 F: Fn(&Self::Item) -> bool + Send + Sync,
170 M2: Fn(Self::Item) -> Vo + Send + Sync,
171 Vo: Values,
172 P: IntoConcurrentPinnedVec<Vo::Item>,
173 {
174 if filter(&self.0) {
175 let vo = map2(self.0);
176 vo.push_to_pinned_vec(vector);
177 }
178 }
179
180 #[inline(always)]
181 fn filter_map_collect_arbitrary<F, M2, P, Vo>(
182 self,
183 filter: F,
184 map2: M2,
185 bag: &ConcurrentBag<Vo::Item, P>,
186 ) where
187 F: Fn(&Self::Item) -> bool + Send + Sync,
188 M2: Fn(Self::Item) -> Vo + Send + Sync,
189 Vo: Values,
190 Vo::Item: Send + Sync,
191 P: IntoConcurrentPinnedVec<Vo::Item>,
192 {
193 if filter(&self.0) {
194 let vo = map2(self.0);
195 vo.push_to_bag(bag);
196 }
197 }
198
199 #[inline(always)]
200 fn xfx_collect_heap<F, M2, Vo>(
201 self,
202 input_idx: usize,
203 filter: F,
204 map2: M2,
205 vec: &mut Vec<(usize, Vo::Item)>,
206 ) where
207 F: Fn(&Self::Item) -> bool + Send + Sync,
208 M2: Fn(Self::Item) -> Vo + Send + Sync,
209 Vo: Values,
210 Vo::Item: Send + Sync,
211 {
212 if filter(&self.0) {
213 let vo = map2(self.0);
214 vo.push_to_vec_with_idx(input_idx, vec);
215 }
216 }
217
218 fn filter_map_collect_in_input_order<F, M2, P, Vo>(
219 self,
220 input_idx: usize,
221 filter: F,
222 map2: M2,
223 o_bag: &ConcurrentOrderedBag<Vo::Item, P>,
224 ) where
225 F: Fn(&Self::Item) -> bool + Send + Sync,
226 M2: Fn(Self::Item) -> Vo + Send + Sync,
227 Vo: Values,
228 Vo::Item: Send + Sync,
229 P: IntoConcurrentPinnedVec<Vo::Item>,
230 {
231 if filter(&self.0) {
232 let vo = map2(self.0);
233 vo.push_to_ordered_bag(input_idx, o_bag);
234 }
235 }
236}