// arrow_rayon/parallel_array/parallel_primitive_array.rs

1use std::any::Any;
2use std::sync::Arc;
3
4use arrow_array::{Array, ArrayRef, ArrowPrimitiveType, NativeAdapter, PrimitiveArray};
5use arrow_buffer::NullBuffer;
6use arrow_data::ArrayData;
7use arrow_schema::DataType;
8use rayon::iter::plumbing::{bridge, Consumer, ProducerCallback, UnindexedConsumer};
9use rayon::iter::{
10    FromParallelIterator, IndexedParallelIterator, IntoParallelIterator, IntoParallelRefIterator,
11    ParallelIterator,
12};
13
14#[derive(Clone)]
15pub struct ParallelPrimitiveArray<T: ArrowPrimitiveType> {
16    inner: PrimitiveArray<T>,
17}
18
19impl<T: ArrowPrimitiveType> ParallelPrimitiveArray<T> {
20    pub fn new(inner: PrimitiveArray<T>) -> Self {
21        Self { inner }
22    }
23
24    pub fn into_inner(self) -> PrimitiveArray<T> {
25        self.inner
26    }
27}
28
29impl<T: ArrowPrimitiveType> From<PrimitiveArray<T>> for ParallelPrimitiveArray<T> {
30    fn from(array: PrimitiveArray<T>) -> Self {
31        Self::new(array)
32    }
33}
34
35impl<T: ArrowPrimitiveType> std::fmt::Debug for ParallelPrimitiveArray<T> {
36    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
37        self.inner.fmt(f)
38    }
39}
40
41impl<T: ArrowPrimitiveType> Array for ParallelPrimitiveArray<T> {
42    fn as_any(&self) -> &dyn Any {
43        self.inner.as_any()
44    }
45
46    fn to_data(&self) -> ArrayData {
47        self.inner.to_data()
48    }
49
50    fn into_data(self) -> ArrayData {
51        self.inner.into_data()
52    }
53
54    fn data_type(&self) -> &DataType {
55        self.inner.data_type()
56    }
57
58    fn slice(&self, offset: usize, length: usize) -> ArrayRef {
59        Arc::new(self.inner.slice(offset, length))
60    }
61
62    fn len(&self) -> usize {
63        self.inner.len()
64    }
65
66    fn is_empty(&self) -> bool {
67        self.inner.is_empty()
68    }
69
70    fn offset(&self) -> usize {
71        self.inner.offset()
72    }
73
74    fn nulls(&self) -> Option<&NullBuffer> {
75        self.inner.nulls()
76    }
77
78    fn get_buffer_memory_size(&self) -> usize {
79        self.inner.get_buffer_memory_size()
80    }
81
82    fn get_array_memory_size(&self) -> usize {
83        self.inner.get_array_memory_size()
84    }
85}
86
87impl<T: ArrowPrimitiveType> IntoParallelIterator for ParallelPrimitiveArray<T> {
88    type Item = Option<T::Native>;
89    type Iter = ParallelPrimitiveArrayIter<T>;
90
91    fn into_par_iter(self) -> Self::Iter {
92        ParallelPrimitiveArrayIter::new(self.inner)
93    }
94}
95
96#[derive(Debug, Clone)]
97pub struct ParallelPrimitiveArrayIter<T: ArrowPrimitiveType> {
98    inner: PrimitiveArray<T>,
99}
100
101impl<T: ArrowPrimitiveType> ParallelPrimitiveArrayIter<T> {
102    fn new(inner: PrimitiveArray<T>) -> Self {
103        Self { inner }
104    }
105}
106
107impl<T: ArrowPrimitiveType> ParallelIterator for ParallelPrimitiveArrayIter<T> {
108    type Item = Option<T::Native>;
109
110    fn drive_unindexed<C>(self, consumer: C) -> C::Result
111    where
112        C: UnindexedConsumer<Self::Item>,
113    {
114        bridge(self, consumer)
115    }
116
117    fn opt_len(&self) -> Option<usize> {
118        Some(self.inner.len())
119    }
120}
121
122impl<T: ArrowPrimitiveType> IndexedParallelIterator for ParallelPrimitiveArrayIter<T> {
123    fn len(&self) -> usize {
124        self.inner.len()
125    }
126
127    fn drive<C: Consumer<Self::Item>>(self, consumer: C) -> C::Result {
128        (0..self.inner.len())
129            .into_par_iter()
130            .map(|i| {
131                if self.inner.is_null(i) {
132                    None
133                } else {
134                    Some(unsafe { self.inner.value_unchecked(i) })
135                }
136            })
137            .drive(consumer)
138    }
139
140    fn with_producer<CB: ProducerCallback<Self::Item>>(self, callback: CB) -> CB::Output {
141        (0..self.inner.len())
142            .into_par_iter()
143            .map(|i| {
144                if self.inner.is_null(i) {
145                    None
146                } else {
147                    Some(unsafe { self.inner.value_unchecked(i) })
148                }
149            })
150            .with_producer(callback)
151    }
152}
153
154impl<T: ArrowPrimitiveType, Ptr: Into<NativeAdapter<T>> + Send> FromParallelIterator<Ptr>
155    for ParallelPrimitiveArray<T>
156{
157    fn from_par_iter<I>(par_iter: I) -> Self
158    where
159        I: IntoParallelIterator<Item = Ptr>,
160    {
161        // HACK
162        let vec = Vec::<Ptr>::from_par_iter(par_iter);
163        let iter = vec.into_iter();
164
165        Self::new(PrimitiveArray::from_iter(iter))
166    }
167}
168
169#[derive(Debug, Clone)]
170pub struct ParallelPrimitiveArrayRef<'data, T: ArrowPrimitiveType> {
171    inner: &'data PrimitiveArray<T>,
172}
173
174impl<'data, T: ArrowPrimitiveType> ParallelPrimitiveArrayRef<'data, T> {
175    pub fn new(inner: &'data PrimitiveArray<T>) -> Self {
176        Self { inner }
177    }
178}
179
180impl<'data, T: ArrowPrimitiveType> From<&'data PrimitiveArray<T>>
181    for ParallelPrimitiveArrayRef<'data, T>
182{
183    fn from(array: &'data PrimitiveArray<T>) -> Self {
184        Self::new(array)
185    }
186}
187
188impl<T: ArrowPrimitiveType> ParallelIterator for ParallelPrimitiveArrayRef<'_, T> {
189    type Item = Option<T::Native>;
190
191    fn drive_unindexed<C>(self, consumer: C) -> C::Result
192    where
193        C: UnindexedConsumer<Self::Item>,
194    {
195        bridge(self, consumer)
196    }
197
198    fn opt_len(&self) -> Option<usize> {
199        Some(self.inner.len())
200    }
201}
202
203impl<T: ArrowPrimitiveType> IndexedParallelIterator for ParallelPrimitiveArrayRef<'_, T> {
204    fn len(&self) -> usize {
205        self.inner.len()
206    }
207
208    fn drive<C: Consumer<Self::Item>>(self, consumer: C) -> C::Result {
209        (0..self.inner.len())
210            .into_par_iter()
211            .map(|i| {
212                if self.inner.is_null(i) {
213                    None
214                } else {
215                    Some(unsafe { self.inner.value_unchecked(i) })
216                }
217            })
218            .drive(consumer)
219    }
220
221    fn with_producer<CB: ProducerCallback<Self::Item>>(self, callback: CB) -> CB::Output {
222        (0..self.inner.len())
223            .into_par_iter()
224            .map(|i| {
225                if self.inner.is_null(i) {
226                    None
227                } else {
228                    Some(unsafe { self.inner.value_unchecked(i) })
229                }
230            })
231            .with_producer(callback)
232    }
233}
234
235impl<'data, T: ArrowPrimitiveType> IntoParallelRefIterator<'data>
236    for ParallelPrimitiveArrayRef<'data, T>
237{
238    type Item = Option<T::Native>;
239    type Iter = ParallelPrimitiveArrayRef<'data, T>;
240
241    fn par_iter(&'data self) -> Self::Iter {
242        ParallelPrimitiveArrayRef::new(self.inner)
243    }
244}
245
246pub trait PrimitiveArrayRefParallelIterator<'data, T: ArrowPrimitiveType> {
247    type Iter: ParallelIterator<Item = Option<T::Native>>;
248
249    fn par_iter(&'data self) -> Self::Iter;
250}
251
252impl<'data, T: ArrowPrimitiveType> PrimitiveArrayRefParallelIterator<'data, T>
253    for PrimitiveArray<T>
254{
255    type Iter = ParallelPrimitiveArrayRef<'data, T>;
256
257    fn par_iter(&'data self) -> Self::Iter {
258        ParallelPrimitiveArrayRef::new(self)
259    }
260}