// arrow_rayon/parallel_array/parallel_boolean_array.rs

1use std::any::Any;
2use std::borrow::Borrow;
3use std::sync::Arc;
4
5use arrow_array::{Array, ArrayRef, BooleanArray};
6use arrow_buffer::NullBuffer;
7use arrow_data::ArrayData;
8use arrow_schema::DataType;
9use rayon::iter::plumbing::{bridge, Consumer, ProducerCallback, UnindexedConsumer};
10use rayon::iter::{
11    FromParallelIterator, IndexedParallelIterator, IntoParallelIterator, IntoParallelRefIterator,
12    ParallelIterator,
13};
14
15#[derive(Debug, Clone)]
16pub struct ParallelBooleanArray {
17    inner: BooleanArray,
18}
19
20impl ParallelBooleanArray {
21    pub fn new(inner: BooleanArray) -> Self {
22        Self { inner }
23    }
24
25    pub fn into_inner(self) -> BooleanArray {
26        self.inner
27    }
28}
29
30impl From<BooleanArray> for ParallelBooleanArray {
31    fn from(array: BooleanArray) -> Self {
32        Self::new(array)
33    }
34}
35
36impl Array for ParallelBooleanArray {
37    fn as_any(&self) -> &dyn Any {
38        self.inner.as_any()
39    }
40
41    fn to_data(&self) -> ArrayData {
42        self.inner.to_data()
43    }
44
45    fn into_data(self) -> ArrayData {
46        self.inner.into_data()
47    }
48
49    fn data_type(&self) -> &DataType {
50        self.inner.data_type()
51    }
52
53    fn slice(&self, offset: usize, length: usize) -> ArrayRef {
54        Arc::new(self.inner.slice(offset, length))
55    }
56
57    fn len(&self) -> usize {
58        self.inner.len()
59    }
60
61    fn is_empty(&self) -> bool {
62        self.inner.is_empty()
63    }
64
65    fn offset(&self) -> usize {
66        self.inner.offset()
67    }
68
69    fn nulls(&self) -> Option<&NullBuffer> {
70        self.inner.nulls()
71    }
72
73    fn get_buffer_memory_size(&self) -> usize {
74        self.inner.get_buffer_memory_size()
75    }
76
77    fn get_array_memory_size(&self) -> usize {
78        self.inner.get_array_memory_size()
79    }
80}
81
82impl IntoParallelIterator for ParallelBooleanArray {
83    type Item = Option<bool>;
84    type Iter = ParallelBooleanArrayIter;
85
86    fn into_par_iter(self) -> Self::Iter {
87        ParallelBooleanArrayIter::new(self.inner)
88    }
89}
90
91#[derive(Debug, Clone)]
92pub struct ParallelBooleanArrayIter {
93    inner: BooleanArray,
94}
95
96impl ParallelBooleanArrayIter {
97    fn new(inner: BooleanArray) -> Self {
98        Self { inner }
99    }
100}
101
102impl ParallelIterator for ParallelBooleanArrayIter {
103    type Item = Option<bool>;
104
105    fn drive_unindexed<C>(self, consumer: C) -> C::Result
106    where
107        C: UnindexedConsumer<Self::Item>,
108    {
109        bridge(self, consumer)
110    }
111
112    fn opt_len(&self) -> Option<usize> {
113        Some(self.inner.len())
114    }
115}
116
117impl IndexedParallelIterator for ParallelBooleanArrayIter {
118    fn len(&self) -> usize {
119        self.inner.len()
120    }
121
122    fn drive<C: Consumer<Self::Item>>(self, consumer: C) -> C::Result {
123        (0..self.inner.len())
124            .into_par_iter()
125            .map(|i| {
126                if self.inner.is_null(i) {
127                    None
128                } else {
129                    Some(unsafe { self.inner.value_unchecked(i) })
130                }
131            })
132            .drive(consumer)
133    }
134
135    fn with_producer<CB: ProducerCallback<Self::Item>>(self, callback: CB) -> CB::Output {
136        (0..self.inner.len())
137            .into_par_iter()
138            .map(|i| {
139                if self.inner.is_null(i) {
140                    None
141                } else {
142                    Some(unsafe { self.inner.value_unchecked(i) })
143                }
144            })
145            .with_producer(callback)
146    }
147}
148
149impl<Ptr: Borrow<Option<bool>> + Send> FromParallelIterator<Ptr> for ParallelBooleanArray {
150    fn from_par_iter<I>(par_iter: I) -> Self
151    where
152        I: IntoParallelIterator<Item = Ptr>,
153    {
154        // HACK
155        let vec = Vec::<Ptr>::from_par_iter(par_iter);
156        let iter = vec.into_iter();
157
158        Self::new(BooleanArray::from_iter(iter))
159    }
160}
161
162pub trait BooleanArrayIntoParallelIterator {
163    type Iter: IntoParallelIterator<Item = Option<bool>>;
164
165    fn into_par_iter(self) -> Self::Iter;
166}
167
168impl BooleanArrayIntoParallelIterator for BooleanArray {
169    type Iter = ParallelBooleanArray;
170
171    fn into_par_iter(self) -> Self::Iter {
172        ParallelBooleanArray::new(self)
173    }
174}
175
176#[derive(Debug, Clone)]
177pub struct ParallelBooleanArrayRef<'data> {
178    inner: &'data BooleanArray,
179}
180
181impl<'data> ParallelBooleanArrayRef<'data> {
182    pub fn new(inner: &'data BooleanArray) -> Self {
183        Self { inner }
184    }
185}
186
187impl<'data> From<&'data BooleanArray> for ParallelBooleanArrayRef<'data> {
188    fn from(array: &'data BooleanArray) -> Self {
189        Self::new(array)
190    }
191}
192
193impl ParallelIterator for ParallelBooleanArrayRef<'_> {
194    type Item = Option<bool>;
195
196    fn drive_unindexed<C>(self, consumer: C) -> C::Result
197    where
198        C: UnindexedConsumer<Self::Item>,
199    {
200        bridge(self, consumer)
201    }
202
203    fn opt_len(&self) -> Option<usize> {
204        Some(self.inner.len())
205    }
206}
207
208impl IndexedParallelIterator for ParallelBooleanArrayRef<'_> {
209    fn len(&self) -> usize {
210        self.inner.len()
211    }
212
213    fn drive<C: Consumer<Self::Item>>(self, consumer: C) -> C::Result {
214        (0..self.inner.len())
215            .into_par_iter()
216            .map(|i| {
217                if self.inner.is_null(i) {
218                    None
219                } else {
220                    Some(unsafe { self.inner.value_unchecked(i) })
221                }
222            })
223            .drive(consumer)
224    }
225
226    fn with_producer<CB: ProducerCallback<Self::Item>>(self, callback: CB) -> CB::Output {
227        (0..self.inner.len())
228            .into_par_iter()
229            .map(|i| {
230                if self.inner.is_null(i) {
231                    None
232                } else {
233                    Some(unsafe { self.inner.value_unchecked(i) })
234                }
235            })
236            .with_producer(callback)
237    }
238}
239
240impl<'data> IntoParallelRefIterator<'data> for ParallelBooleanArrayRef<'data> {
241    type Item = Option<bool>;
242    type Iter = ParallelBooleanArrayRef<'data>;
243
244    fn par_iter(&'data self) -> Self::Iter {
245        ParallelBooleanArrayRef::new(self.inner)
246    }
247}
248
249pub trait BooleanArrayRefParallelIterator<'data> {
250    type Iter: ParallelIterator<Item = Option<bool>>;
251
252    fn par_iter(&'data self) -> Self::Iter;
253}
254
255impl<'data> BooleanArrayRefParallelIterator<'data> for BooleanArray {
256    type Iter = ParallelBooleanArrayRef<'data>;
257
258    fn par_iter(&'data self) -> Self::Iter {
259        ParallelBooleanArrayRef::new(self)
260    }
261}
262
#[cfg(test)]
mod tests {
    use super::*;

    /// Borrowing iteration visits every slot in order; nulls arrive as `None`.
    #[test]
    fn test_par_iter() {
        let source = BooleanArray::from(vec![Some(true), None, Some(false)]);

        // A null slot maps to `true`; a present value is negated.
        let negated: Vec<bool> = source
            .par_iter()
            .map(|slot| !slot.unwrap_or(false))
            .collect();

        let expected = vec![false, true, true];
        assert_eq!(negated, expected);
    }

    /// Collecting `Option<bool>` items rebuilds an array that keeps null slots.
    #[test]
    fn test_collect_array() {
        let source = BooleanArray::from(vec![Some(true), None, Some(false)]);

        let collected: ParallelBooleanArray = source
            .par_iter()
            .map(|slot| slot.map(|flag| !flag))
            .collect();
        let rebuilt = collected.into_inner();

        assert!(!rebuilt.value(0));
        assert!(rebuilt.is_null(1));
        assert!(rebuilt.value(2));
    }
}