arrow_rayon/parallel_array/
parallel_byte_array.rs

1use std::any::Any;
2use std::sync::Arc;
3
4use arrow_array::types::ByteArrayType;
5use arrow_array::{Array, ArrayRef, GenericByteArray};
6use arrow_buffer::NullBuffer;
7use arrow_data::ArrayData;
8use arrow_schema::DataType;
9use rayon::iter::plumbing::{bridge, Consumer, ProducerCallback, UnindexedConsumer};
10use rayon::iter::{
11    FromParallelIterator, IndexedParallelIterator, IntoParallelIterator, IntoParallelRefIterator,
12    ParallelIterator,
13};
14
15#[derive(Clone)]
16pub struct ParallelGenericByteArray<T: ByteArrayType> {
17    inner: GenericByteArray<T>,
18}
19
20impl<T: ByteArrayType> ParallelGenericByteArray<T> {
21    pub fn new(inner: GenericByteArray<T>) -> Self {
22        Self { inner }
23    }
24
25    pub fn into_inner(self) -> GenericByteArray<T> {
26        self.inner
27    }
28}
29
30impl<T: ByteArrayType> From<GenericByteArray<T>> for ParallelGenericByteArray<T> {
31    fn from(array: GenericByteArray<T>) -> Self {
32        Self::new(array)
33    }
34}
35
36impl<T: ByteArrayType> std::fmt::Debug for ParallelGenericByteArray<T> {
37    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
38        self.inner.fmt(f)
39    }
40}
41
42impl<T: ByteArrayType> Array for ParallelGenericByteArray<T> {
43    fn as_any(&self) -> &dyn Any {
44        self.inner.as_any()
45    }
46
47    fn to_data(&self) -> ArrayData {
48        self.inner.to_data()
49    }
50
51    fn into_data(self) -> ArrayData {
52        self.inner.into_data()
53    }
54
55    fn data_type(&self) -> &DataType {
56        self.inner.data_type()
57    }
58
59    fn slice(&self, offset: usize, length: usize) -> ArrayRef {
60        Arc::new(self.inner.slice(offset, length))
61    }
62
63    fn len(&self) -> usize {
64        self.inner.len()
65    }
66
67    fn is_empty(&self) -> bool {
68        self.inner.is_empty()
69    }
70
71    fn offset(&self) -> usize {
72        self.inner.offset()
73    }
74
75    fn nulls(&self) -> Option<&NullBuffer> {
76        self.inner.nulls()
77    }
78
79    fn get_buffer_memory_size(&self) -> usize {
80        self.inner.get_buffer_memory_size()
81    }
82
83    fn get_array_memory_size(&self) -> usize {
84        self.inner.get_array_memory_size()
85    }
86}
87
88impl<T: ByteArrayType, Ptr: AsRef<T::Native> + Send> FromParallelIterator<Option<Ptr>>
89    for ParallelGenericByteArray<T>
90{
91    fn from_par_iter<I>(par_iter: I) -> Self
92    where
93        I: IntoParallelIterator<Item = Option<Ptr>>,
94    {
95        // HACK
96        let vec = Vec::<Option<Ptr>>::from_par_iter(par_iter);
97        let iter = vec.into_iter();
98
99        Self::new(GenericByteArray::from_iter(iter))
100    }
101}
102
103#[derive(Debug, Clone)]
104pub struct ParallelGenericByteArrayRef<'data, T: ByteArrayType> {
105    inner: &'data GenericByteArray<T>,
106}
107
108impl<'data, T: ByteArrayType> ParallelGenericByteArrayRef<'data, T> {
109    pub fn new(inner: &'data GenericByteArray<T>) -> Self {
110        Self { inner }
111    }
112}
113
114impl<'data, T: ByteArrayType> From<&'data GenericByteArray<T>>
115    for ParallelGenericByteArrayRef<'data, T>
116{
117    fn from(array: &'data GenericByteArray<T>) -> Self {
118        Self::new(array)
119    }
120}
121
122impl<'data, T: ByteArrayType> ParallelIterator for ParallelGenericByteArrayRef<'data, T> {
123    type Item = Option<&'data T::Native>;
124
125    fn drive_unindexed<C>(self, consumer: C) -> C::Result
126    where
127        C: UnindexedConsumer<Self::Item>,
128    {
129        bridge(self, consumer)
130    }
131
132    fn opt_len(&self) -> Option<usize> {
133        Some(self.inner.len())
134    }
135}
136
137impl<T: ByteArrayType> IndexedParallelIterator for ParallelGenericByteArrayRef<'_, T> {
138    fn len(&self) -> usize {
139        self.inner.len()
140    }
141
142    fn drive<C: Consumer<Self::Item>>(self, consumer: C) -> C::Result {
143        (0..self.inner.len())
144            .into_par_iter()
145            .map(|i| {
146                if self.inner.is_null(i) {
147                    None
148                } else {
149                    Some(unsafe { self.inner.value_unchecked(i) })
150                }
151            })
152            .drive(consumer)
153    }
154
155    fn with_producer<CB: ProducerCallback<Self::Item>>(self, callback: CB) -> CB::Output {
156        (0..self.inner.len())
157            .into_par_iter()
158            .map(|i| {
159                if self.inner.is_null(i) {
160                    None
161                } else {
162                    Some(unsafe { self.inner.value_unchecked(i) })
163                }
164            })
165            .with_producer(callback)
166    }
167}
168
169impl<'data, T: ByteArrayType> IntoParallelRefIterator<'data>
170    for ParallelGenericByteArrayRef<'data, T>
171{
172    type Item = Option<&'data T::Native>;
173    type Iter = ParallelGenericByteArrayRef<'data, T>;
174
175    fn par_iter(&'data self) -> Self::Iter {
176        ParallelGenericByteArrayRef::new(self.inner)
177    }
178}
179
180pub trait GenericByteArrayRefParallelIterator<'data, T: ByteArrayType> {
181    type Iter: ParallelIterator<Item = Option<&'data T::Native>>;
182
183    fn par_iter(&'data self) -> Self::Iter;
184}
185
186impl<'data, T: ByteArrayType> GenericByteArrayRefParallelIterator<'data, T>
187    for GenericByteArray<T>
188{
189    type Iter = ParallelGenericByteArrayRef<'data, T>;
190
191    fn par_iter(&'data self) -> Self::Iter {
192        ParallelGenericByteArrayRef::new(self)
193    }
194}