arrow_rayon/parallel_array/
parallel_primitive_array.rs1use std::any::Any;
2use std::sync::Arc;
3
4use arrow_array::{Array, ArrayRef, ArrowPrimitiveType, NativeAdapter, PrimitiveArray};
5use arrow_buffer::NullBuffer;
6use arrow_data::ArrayData;
7use arrow_schema::DataType;
8use rayon::iter::plumbing::{bridge, Consumer, ProducerCallback, UnindexedConsumer};
9use rayon::iter::{
10 FromParallelIterator, IndexedParallelIterator, IntoParallelIterator, IntoParallelRefIterator,
11 ParallelIterator,
12};
13
14#[derive(Clone)]
15pub struct ParallelPrimitiveArray<T: ArrowPrimitiveType> {
16 inner: PrimitiveArray<T>,
17}
18
19impl<T: ArrowPrimitiveType> ParallelPrimitiveArray<T> {
20 pub fn new(inner: PrimitiveArray<T>) -> Self {
21 Self { inner }
22 }
23
24 pub fn into_inner(self) -> PrimitiveArray<T> {
25 self.inner
26 }
27}
28
29impl<T: ArrowPrimitiveType> From<PrimitiveArray<T>> for ParallelPrimitiveArray<T> {
30 fn from(array: PrimitiveArray<T>) -> Self {
31 Self::new(array)
32 }
33}
34
35impl<T: ArrowPrimitiveType> std::fmt::Debug for ParallelPrimitiveArray<T> {
36 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
37 self.inner.fmt(f)
38 }
39}
40
41impl<T: ArrowPrimitiveType> Array for ParallelPrimitiveArray<T> {
42 fn as_any(&self) -> &dyn Any {
43 self.inner.as_any()
44 }
45
46 fn to_data(&self) -> ArrayData {
47 self.inner.to_data()
48 }
49
50 fn into_data(self) -> ArrayData {
51 self.inner.into_data()
52 }
53
54 fn data_type(&self) -> &DataType {
55 self.inner.data_type()
56 }
57
58 fn slice(&self, offset: usize, length: usize) -> ArrayRef {
59 Arc::new(self.inner.slice(offset, length))
60 }
61
62 fn len(&self) -> usize {
63 self.inner.len()
64 }
65
66 fn is_empty(&self) -> bool {
67 self.inner.is_empty()
68 }
69
70 fn offset(&self) -> usize {
71 self.inner.offset()
72 }
73
74 fn nulls(&self) -> Option<&NullBuffer> {
75 self.inner.nulls()
76 }
77
78 fn get_buffer_memory_size(&self) -> usize {
79 self.inner.get_buffer_memory_size()
80 }
81
82 fn get_array_memory_size(&self) -> usize {
83 self.inner.get_array_memory_size()
84 }
85}
86
87impl<T: ArrowPrimitiveType> IntoParallelIterator for ParallelPrimitiveArray<T> {
88 type Item = Option<T::Native>;
89 type Iter = ParallelPrimitiveArrayIter<T>;
90
91 fn into_par_iter(self) -> Self::Iter {
92 ParallelPrimitiveArrayIter::new(self.inner)
93 }
94}
95
96#[derive(Debug, Clone)]
97pub struct ParallelPrimitiveArrayIter<T: ArrowPrimitiveType> {
98 inner: PrimitiveArray<T>,
99}
100
101impl<T: ArrowPrimitiveType> ParallelPrimitiveArrayIter<T> {
102 fn new(inner: PrimitiveArray<T>) -> Self {
103 Self { inner }
104 }
105}
106
107impl<T: ArrowPrimitiveType> ParallelIterator for ParallelPrimitiveArrayIter<T> {
108 type Item = Option<T::Native>;
109
110 fn drive_unindexed<C>(self, consumer: C) -> C::Result
111 where
112 C: UnindexedConsumer<Self::Item>,
113 {
114 bridge(self, consumer)
115 }
116
117 fn opt_len(&self) -> Option<usize> {
118 Some(self.inner.len())
119 }
120}
121
122impl<T: ArrowPrimitiveType> IndexedParallelIterator for ParallelPrimitiveArrayIter<T> {
123 fn len(&self) -> usize {
124 self.inner.len()
125 }
126
127 fn drive<C: Consumer<Self::Item>>(self, consumer: C) -> C::Result {
128 (0..self.inner.len())
129 .into_par_iter()
130 .map(|i| {
131 if self.inner.is_null(i) {
132 None
133 } else {
134 Some(unsafe { self.inner.value_unchecked(i) })
135 }
136 })
137 .drive(consumer)
138 }
139
140 fn with_producer<CB: ProducerCallback<Self::Item>>(self, callback: CB) -> CB::Output {
141 (0..self.inner.len())
142 .into_par_iter()
143 .map(|i| {
144 if self.inner.is_null(i) {
145 None
146 } else {
147 Some(unsafe { self.inner.value_unchecked(i) })
148 }
149 })
150 .with_producer(callback)
151 }
152}
153
154impl<T: ArrowPrimitiveType, Ptr: Into<NativeAdapter<T>> + Send> FromParallelIterator<Ptr>
155 for ParallelPrimitiveArray<T>
156{
157 fn from_par_iter<I>(par_iter: I) -> Self
158 where
159 I: IntoParallelIterator<Item = Ptr>,
160 {
161 let vec = Vec::<Ptr>::from_par_iter(par_iter);
163 let iter = vec.into_iter();
164
165 Self::new(PrimitiveArray::from_iter(iter))
166 }
167}
168
169#[derive(Debug, Clone)]
170pub struct ParallelPrimitiveArrayRef<'data, T: ArrowPrimitiveType> {
171 inner: &'data PrimitiveArray<T>,
172}
173
174impl<'data, T: ArrowPrimitiveType> ParallelPrimitiveArrayRef<'data, T> {
175 pub fn new(inner: &'data PrimitiveArray<T>) -> Self {
176 Self { inner }
177 }
178}
179
180impl<'data, T: ArrowPrimitiveType> From<&'data PrimitiveArray<T>>
181 for ParallelPrimitiveArrayRef<'data, T>
182{
183 fn from(array: &'data PrimitiveArray<T>) -> Self {
184 Self::new(array)
185 }
186}
187
188impl<T: ArrowPrimitiveType> ParallelIterator for ParallelPrimitiveArrayRef<'_, T> {
189 type Item = Option<T::Native>;
190
191 fn drive_unindexed<C>(self, consumer: C) -> C::Result
192 where
193 C: UnindexedConsumer<Self::Item>,
194 {
195 bridge(self, consumer)
196 }
197
198 fn opt_len(&self) -> Option<usize> {
199 Some(self.inner.len())
200 }
201}
202
203impl<T: ArrowPrimitiveType> IndexedParallelIterator for ParallelPrimitiveArrayRef<'_, T> {
204 fn len(&self) -> usize {
205 self.inner.len()
206 }
207
208 fn drive<C: Consumer<Self::Item>>(self, consumer: C) -> C::Result {
209 (0..self.inner.len())
210 .into_par_iter()
211 .map(|i| {
212 if self.inner.is_null(i) {
213 None
214 } else {
215 Some(unsafe { self.inner.value_unchecked(i) })
216 }
217 })
218 .drive(consumer)
219 }
220
221 fn with_producer<CB: ProducerCallback<Self::Item>>(self, callback: CB) -> CB::Output {
222 (0..self.inner.len())
223 .into_par_iter()
224 .map(|i| {
225 if self.inner.is_null(i) {
226 None
227 } else {
228 Some(unsafe { self.inner.value_unchecked(i) })
229 }
230 })
231 .with_producer(callback)
232 }
233}
234
235impl<'data, T: ArrowPrimitiveType> IntoParallelRefIterator<'data>
236 for ParallelPrimitiveArrayRef<'data, T>
237{
238 type Item = Option<T::Native>;
239 type Iter = ParallelPrimitiveArrayRef<'data, T>;
240
241 fn par_iter(&'data self) -> Self::Iter {
242 ParallelPrimitiveArrayRef::new(self.inner)
243 }
244}
245
246pub trait PrimitiveArrayRefParallelIterator<'data, T: ArrowPrimitiveType> {
247 type Iter: ParallelIterator<Item = Option<T::Native>>;
248
249 fn par_iter(&'data self) -> Self::Iter;
250}
251
252impl<'data, T: ArrowPrimitiveType> PrimitiveArrayRefParallelIterator<'data, T>
253 for PrimitiveArray<T>
254{
255 type Iter = ParallelPrimitiveArrayRef<'data, T>;
256
257 fn par_iter(&'data self) -> Self::Iter {
258 ParallelPrimitiveArrayRef::new(self)
259 }
260}