orx_parallel/computations/generalized_values/
atom.rs

1use super::{values::Values, vector::Vector};
2use orx_concurrent_bag::ConcurrentBag;
3use orx_concurrent_ordered_bag::ConcurrentOrderedBag;
4use orx_pinned_vec::{IntoConcurrentPinnedVec, PinnedVec};
5
/// A container that holds exactly one value of type `T`.
///
/// The wrapped value is publicly accessible as the single tuple field.
pub struct Atom<T>(
    /// The single wrapped value.
    pub T,
);
7
8impl<T> Values for Atom<T>
9where
10    T: Send + Sync,
11{
12    type Item = T;
13
14    type Mapped<M, O>
15        = Atom<O>
16    where
17        O: Send + Sync,
18        M: Fn(Self::Item) -> O + Send + Sync;
19
20    type Filtered<F>
21        = Option<Self::Item>
22    where
23        F: Fn(&Self::Item) -> bool + Send + Sync;
24
25    type FlatMapped<Fm, Vo>
26        = Vector<Vo>
27    where
28        Vo: IntoIterator + Send + Sync,
29        Vo::Item: Send + Sync,
30        Vo::IntoIter: Send + Sync,
31        Fm: Fn(Self::Item) -> Vo + Send + Sync;
32
33    type FilterMapped<Fm, O>
34        = Option<O>
35    where
36        O: Send + Sync,
37        Fm: Fn(Self::Item) -> Option<O> + Send + Sync;
38
39    fn values(self) -> impl IntoIterator<Item = T> {
40        core::iter::once(self.0)
41    }
42
43    #[inline(always)]
44    fn push_to_pinned_vec<P>(self, vector: &mut P)
45    where
46        P: PinnedVec<T>,
47    {
48        vector.push(self.0);
49    }
50
51    #[inline(always)]
52    fn push_to_bag<P>(self, bag: &ConcurrentBag<T, P>)
53    where
54        P: IntoConcurrentPinnedVec<T>,
55        T: Send + Sync,
56    {
57        bag.push(self.0);
58    }
59
60    #[inline(always)]
61    fn push_to_ordered_bag<P>(self, idx: usize, o_bag: &ConcurrentOrderedBag<Self::Item, P>)
62    where
63        P: IntoConcurrentPinnedVec<Self::Item>,
64        Self::Item: Send + Sync,
65    {
66        unsafe { o_bag.set_value(idx, self.0) };
67    }
68
69    #[inline(always)]
70    fn push_to_vec_with_idx(self, idx: usize, vec: &mut Vec<(usize, T)>) {
71        vec.push((idx, self.0))
72    }
73
74    #[inline(always)]
75    fn map<M, O>(self, map: M) -> Self::Mapped<M, O>
76    where
77        O: Send + Sync,
78        M: Fn(Self::Item) -> O + Send + Sync,
79    {
80        Atom(map(self.0))
81    }
82
83    #[inline(always)]
84    fn filter<F>(self, filter: F) -> Self::Filtered<F>
85    where
86        F: Fn(&Self::Item) -> bool + Send + Sync,
87    {
88        match filter(&self.0) {
89            true => Some(self.0),
90            false => None,
91        }
92    }
93
94    #[inline(always)]
95    fn flat_map<Fm, Vo>(self, flat_map: Fm) -> Self::FlatMapped<Fm, Vo>
96    where
97        Vo: IntoIterator + Send + Sync,
98        Vo::Item: Send + Sync,
99        Vo::IntoIter: Send + Sync,
100        Fm: Fn(Self::Item) -> Vo + Send + Sync,
101    {
102        Vector(flat_map(self.0))
103    }
104
105    #[inline(always)]
106    fn filter_map<Fm, O>(self, filter_map: Fm) -> Self::FilterMapped<Fm, O>
107    where
108        O: Send + Sync,
109        Fm: Fn(Self::Item) -> Option<O> + Send + Sync,
110    {
111        filter_map(self.0)
112    }
113
114    #[inline(always)]
115    fn acc_reduce<X>(self, acc: Option<Self::Item>, reduce: X) -> Option<Self::Item>
116    where
117        X: Fn(Self::Item, Self::Item) -> Self::Item + Send + Sync,
118    {
119        match acc {
120            Some(x) => Some(reduce(x, self.0)),
121            None => Some(self.0),
122        }
123    }
124
125    #[inline(always)]
126    fn fx_reduce<F, M2, Vo, X>(
127        self,
128        acc: Option<Vo::Item>,
129        filter: F,
130        map2: M2,
131        reduce: X,
132    ) -> Option<Vo::Item>
133    where
134        Self: Sized,
135        F: Fn(&Self::Item) -> bool + Send + Sync,
136        M2: Fn(Self::Item) -> Vo + Send + Sync,
137        Vo: Values,
138        Vo::Item: Send + Sync,
139        X: Fn(Vo::Item, Vo::Item) -> Vo::Item + Send + Sync,
140    {
141        match filter(&self.0) {
142            true => map2(self.0).acc_reduce(acc, reduce),
143            false => acc,
144        }
145    }
146
147    #[inline(always)]
148    fn first(self) -> Option<Self::Item> {
149        Some(self.0)
150    }
151
152    #[inline(always)]
153    fn fx_next<F, M2, Vo>(self, filter: F, map2: M2) -> Option<Vo::Item>
154    where
155        F: Fn(&Self::Item) -> bool + Send + Sync,
156        M2: Fn(Self::Item) -> Vo + Send + Sync,
157        Vo: Values,
158        Vo::Item: Send + Sync,
159    {
160        match filter(&self.0) {
161            true => map2(self.0).first(),
162            false => None,
163        }
164    }
165
166    #[inline(always)]
167    fn filter_map_collect_sequential<F, M2, P, Vo>(self, filter: F, map2: M2, vector: &mut P)
168    where
169        F: Fn(&Self::Item) -> bool + Send + Sync,
170        M2: Fn(Self::Item) -> Vo + Send + Sync,
171        Vo: Values,
172        P: IntoConcurrentPinnedVec<Vo::Item>,
173    {
174        if filter(&self.0) {
175            let vo = map2(self.0);
176            vo.push_to_pinned_vec(vector);
177        }
178    }
179
180    #[inline(always)]
181    fn filter_map_collect_arbitrary<F, M2, P, Vo>(
182        self,
183        filter: F,
184        map2: M2,
185        bag: &ConcurrentBag<Vo::Item, P>,
186    ) where
187        F: Fn(&Self::Item) -> bool + Send + Sync,
188        M2: Fn(Self::Item) -> Vo + Send + Sync,
189        Vo: Values,
190        Vo::Item: Send + Sync,
191        P: IntoConcurrentPinnedVec<Vo::Item>,
192    {
193        if filter(&self.0) {
194            let vo = map2(self.0);
195            vo.push_to_bag(bag);
196        }
197    }
198
199    #[inline(always)]
200    fn xfx_collect_heap<F, M2, Vo>(
201        self,
202        input_idx: usize,
203        filter: F,
204        map2: M2,
205        vec: &mut Vec<(usize, Vo::Item)>,
206    ) where
207        F: Fn(&Self::Item) -> bool + Send + Sync,
208        M2: Fn(Self::Item) -> Vo + Send + Sync,
209        Vo: Values,
210        Vo::Item: Send + Sync,
211    {
212        if filter(&self.0) {
213            let vo = map2(self.0);
214            vo.push_to_vec_with_idx(input_idx, vec);
215        }
216    }
217
218    fn filter_map_collect_in_input_order<F, M2, P, Vo>(
219        self,
220        input_idx: usize,
221        filter: F,
222        map2: M2,
223        o_bag: &ConcurrentOrderedBag<Vo::Item, P>,
224    ) where
225        F: Fn(&Self::Item) -> bool + Send + Sync,
226        M2: Fn(Self::Item) -> Vo + Send + Sync,
227        Vo: Values,
228        Vo::Item: Send + Sync,
229        P: IntoConcurrentPinnedVec<Vo::Item>,
230    {
231        if filter(&self.0) {
232            let vo = map2(self.0);
233            vo.push_to_ordered_bag(input_idx, o_bag);
234        }
235    }
236}