arrow_rayon/parallel_array/
parallel_boolean_array.rs1use std::any::Any;
2use std::borrow::Borrow;
3use std::sync::Arc;
4
5use arrow_array::{Array, ArrayRef, BooleanArray};
6use arrow_buffer::NullBuffer;
7use arrow_data::ArrayData;
8use arrow_schema::DataType;
9use rayon::iter::plumbing::{bridge, Consumer, ProducerCallback, UnindexedConsumer};
10use rayon::iter::{
11 FromParallelIterator, IndexedParallelIterator, IntoParallelIterator, IntoParallelRefIterator,
12 ParallelIterator,
13};
14
/// Owned wrapper around an arrow [`BooleanArray`] that rayon can iterate in parallel
/// (one `Option<bool>` per slot, `None` for null slots) and that parallel iterators of
/// `Option<bool>` can be collected into.
#[derive(Debug, Clone)]
pub struct ParallelBooleanArray {
    /// The wrapped array; every `Array` method of the wrapper forwards to it.
    inner: BooleanArray,
}
19
20impl ParallelBooleanArray {
21 pub fn new(inner: BooleanArray) -> Self {
22 Self { inner }
23 }
24
25 pub fn into_inner(self) -> BooleanArray {
26 self.inner
27 }
28}
29
30impl From<BooleanArray> for ParallelBooleanArray {
31 fn from(array: BooleanArray) -> Self {
32 Self::new(array)
33 }
34}
35
36impl Array for ParallelBooleanArray {
37 fn as_any(&self) -> &dyn Any {
38 self.inner.as_any()
39 }
40
41 fn to_data(&self) -> ArrayData {
42 self.inner.to_data()
43 }
44
45 fn into_data(self) -> ArrayData {
46 self.inner.into_data()
47 }
48
49 fn data_type(&self) -> &DataType {
50 self.inner.data_type()
51 }
52
53 fn slice(&self, offset: usize, length: usize) -> ArrayRef {
54 Arc::new(self.inner.slice(offset, length))
55 }
56
57 fn len(&self) -> usize {
58 self.inner.len()
59 }
60
61 fn is_empty(&self) -> bool {
62 self.inner.is_empty()
63 }
64
65 fn offset(&self) -> usize {
66 self.inner.offset()
67 }
68
69 fn nulls(&self) -> Option<&NullBuffer> {
70 self.inner.nulls()
71 }
72
73 fn get_buffer_memory_size(&self) -> usize {
74 self.inner.get_buffer_memory_size()
75 }
76
77 fn get_array_memory_size(&self) -> usize {
78 self.inner.get_array_memory_size()
79 }
80}
81
82impl IntoParallelIterator for ParallelBooleanArray {
83 type Item = Option<bool>;
84 type Iter = ParallelBooleanArrayIter;
85
86 fn into_par_iter(self) -> Self::Iter {
87 ParallelBooleanArrayIter::new(self.inner)
88 }
89}
90
/// Owning parallel iterator over a [`BooleanArray`], produced by
/// `ParallelBooleanArray::into_par_iter`. It yields one `Option<bool>` per slot
/// (`None` for null slots), in index order.
#[derive(Debug, Clone)]
pub struct ParallelBooleanArrayIter {
    inner: BooleanArray,
}
95
96impl ParallelBooleanArrayIter {
97 fn new(inner: BooleanArray) -> Self {
98 Self { inner }
99 }
100}
101
102impl ParallelIterator for ParallelBooleanArrayIter {
103 type Item = Option<bool>;
104
105 fn drive_unindexed<C>(self, consumer: C) -> C::Result
106 where
107 C: UnindexedConsumer<Self::Item>,
108 {
109 bridge(self, consumer)
110 }
111
112 fn opt_len(&self) -> Option<usize> {
113 Some(self.inner.len())
114 }
115}
116
117impl IndexedParallelIterator for ParallelBooleanArrayIter {
118 fn len(&self) -> usize {
119 self.inner.len()
120 }
121
122 fn drive<C: Consumer<Self::Item>>(self, consumer: C) -> C::Result {
123 (0..self.inner.len())
124 .into_par_iter()
125 .map(|i| {
126 if self.inner.is_null(i) {
127 None
128 } else {
129 Some(unsafe { self.inner.value_unchecked(i) })
130 }
131 })
132 .drive(consumer)
133 }
134
135 fn with_producer<CB: ProducerCallback<Self::Item>>(self, callback: CB) -> CB::Output {
136 (0..self.inner.len())
137 .into_par_iter()
138 .map(|i| {
139 if self.inner.is_null(i) {
140 None
141 } else {
142 Some(unsafe { self.inner.value_unchecked(i) })
143 }
144 })
145 .with_producer(callback)
146 }
147}
148
149impl<Ptr: Borrow<Option<bool>> + Send> FromParallelIterator<Ptr> for ParallelBooleanArray {
150 fn from_par_iter<I>(par_iter: I) -> Self
151 where
152 I: IntoParallelIterator<Item = Ptr>,
153 {
154 let vec = Vec::<Ptr>::from_par_iter(par_iter);
156 let iter = vec.into_iter();
157
158 Self::new(BooleanArray::from_iter(iter))
159 }
160}
161
/// Extension trait giving an owned [`BooleanArray`] an `into_par_iter` method.
///
/// `Iter` is only required to be `IntoParallelIterator`, not a `ParallelIterator` itself.
/// Depending on the implementation, a second `.into_par_iter()` call may be needed before
/// using iterator adapters.
pub trait BooleanArrayIntoParallelIterator {
    /// The value returned by `into_par_iter`; converts into a parallel iterator of `Option<bool>`.
    type Iter: IntoParallelIterator<Item = Option<bool>>;

    /// Consumes the array and returns something convertible into a parallel iterator.
    fn into_par_iter(self) -> Self::Iter;
}
167
168impl BooleanArrayIntoParallelIterator for BooleanArray {
169 type Iter = ParallelBooleanArray;
170
171 fn into_par_iter(self) -> Self::Iter {
172 ParallelBooleanArray::new(self)
173 }
174}
175
/// Borrowing parallel iterator over a [`BooleanArray`]. It yields one `Option<bool>` per slot
/// (`None` for null slots), in index order, without taking ownership of the array.
#[derive(Debug, Clone)]
pub struct ParallelBooleanArrayRef<'data> {
    inner: &'data BooleanArray,
}
180
181impl<'data> ParallelBooleanArrayRef<'data> {
182 pub fn new(inner: &'data BooleanArray) -> Self {
183 Self { inner }
184 }
185}
186
187impl<'data> From<&'data BooleanArray> for ParallelBooleanArrayRef<'data> {
188 fn from(array: &'data BooleanArray) -> Self {
189 Self::new(array)
190 }
191}
192
193impl ParallelIterator for ParallelBooleanArrayRef<'_> {
194 type Item = Option<bool>;
195
196 fn drive_unindexed<C>(self, consumer: C) -> C::Result
197 where
198 C: UnindexedConsumer<Self::Item>,
199 {
200 bridge(self, consumer)
201 }
202
203 fn opt_len(&self) -> Option<usize> {
204 Some(self.inner.len())
205 }
206}
207
208impl IndexedParallelIterator for ParallelBooleanArrayRef<'_> {
209 fn len(&self) -> usize {
210 self.inner.len()
211 }
212
213 fn drive<C: Consumer<Self::Item>>(self, consumer: C) -> C::Result {
214 (0..self.inner.len())
215 .into_par_iter()
216 .map(|i| {
217 if self.inner.is_null(i) {
218 None
219 } else {
220 Some(unsafe { self.inner.value_unchecked(i) })
221 }
222 })
223 .drive(consumer)
224 }
225
226 fn with_producer<CB: ProducerCallback<Self::Item>>(self, callback: CB) -> CB::Output {
227 (0..self.inner.len())
228 .into_par_iter()
229 .map(|i| {
230 if self.inner.is_null(i) {
231 None
232 } else {
233 Some(unsafe { self.inner.value_unchecked(i) })
234 }
235 })
236 .with_producer(callback)
237 }
238}
239
240impl<'data> IntoParallelRefIterator<'data> for ParallelBooleanArrayRef<'data> {
241 type Item = Option<bool>;
242 type Iter = ParallelBooleanArrayRef<'data>;
243
244 fn par_iter(&'data self) -> Self::Iter {
245 ParallelBooleanArrayRef::new(self.inner)
246 }
247}
248
/// Extension trait giving a borrowed [`BooleanArray`] a `par_iter` method that yields
/// `Option<bool>` items without taking ownership of the array.
pub trait BooleanArrayRefParallelIterator<'data> {
    /// The parallel iterator returned by `par_iter`.
    type Iter: ParallelIterator<Item = Option<bool>>;

    /// Returns a parallel iterator borrowing `self` for `'data`.
    fn par_iter(&'data self) -> Self::Iter;
}
254
255impl<'data> BooleanArrayRefParallelIterator<'data> for BooleanArray {
256 type Iter = ParallelBooleanArrayRef<'data>;
257
258 fn par_iter(&'data self) -> Self::Iter {
259 ParallelBooleanArrayRef::new(self)
260 }
261}
262
#[cfg(test)]
mod tests {
    use super::*;

    /// Three slots covering `true`, null and `false`.
    fn sample() -> BooleanArray {
        BooleanArray::from(vec![Some(true), None, Some(false)])
    }

    #[test]
    fn test_par_iter() {
        // Maps null -> true and negates present values.
        let items: Vec<bool> = sample()
            .par_iter()
            .map(|item| item != Some(true))
            .collect();
        assert_eq!(items, vec![false, true, true]);
    }

    #[test]
    fn test_collect_array() {
        let collected_array: ParallelBooleanArray = sample()
            .par_iter()
            .map(|item| item.map(|value| !value))
            .collect();
        let boolean_array = collected_array.into_inner();
        assert_eq!(boolean_array.len(), 3);
        assert!(!boolean_array.value(0));
        assert!(boolean_array.is_null(1));
        assert!(boolean_array.value(2));
    }

    #[test]
    fn test_into_par_iter() {
        // The owned path: `BooleanArray` -> `ParallelBooleanArray` -> `ParallelBooleanArrayIter`.
        let items: Vec<Option<bool>> = sample().into_par_iter().into_par_iter().collect();
        assert_eq!(items, vec![Some(true), None, Some(false)]);
    }

    #[test]
    fn test_indexed_adapters() {
        // `enumerate` drives its base through `with_producer` rather than `drive`.
        let expected = vec![(0, Some(true)), (1, None), (2, Some(false))];

        let array = sample();
        let borrowed: Vec<(usize, Option<bool>)> = array.par_iter().enumerate().collect();
        assert_eq!(borrowed, expected);

        let owned: Vec<(usize, Option<bool>)> = sample()
            .into_par_iter()
            .into_par_iter()
            .enumerate()
            .collect();
        assert_eq!(owned, expected);
    }

    #[test]
    fn test_empty_array() {
        let empty = BooleanArray::from(Vec::<Option<bool>>::new());
        assert_eq!(empty.par_iter().count(), 0);

        let collected: ParallelBooleanArray = empty.par_iter().collect();
        assert!(collected.into_inner().is_empty());
    }

    #[test]
    fn test_array_delegation() {
        let wrapped = ParallelBooleanArray::from(sample());
        assert_eq!(wrapped.len(), 3);
        assert_eq!(wrapped.null_count(), 1);
        // `as_any` exposes the inner array, so downcasting to `BooleanArray` works.
        assert!(wrapped.as_any().downcast_ref::<BooleanArray>().is_some());
        assert_eq!(wrapped.slice(1, 2).len(), 2);
    }
}