arrow_rayon/parallel_array/
parallel_byte_array.rs1use std::any::Any;
2use std::sync::Arc;
3
4use arrow_array::types::ByteArrayType;
5use arrow_array::{Array, ArrayRef, GenericByteArray};
6use arrow_buffer::NullBuffer;
7use arrow_data::ArrayData;
8use arrow_schema::DataType;
9use rayon::iter::plumbing::{bridge, Consumer, ProducerCallback, UnindexedConsumer};
10use rayon::iter::{
11 FromParallelIterator, IndexedParallelIterator, IntoParallelIterator, IntoParallelRefIterator,
12 ParallelIterator,
13};
14
15#[derive(Clone)]
16pub struct ParallelGenericByteArray<T: ByteArrayType> {
17 inner: GenericByteArray<T>,
18}
19
20impl<T: ByteArrayType> ParallelGenericByteArray<T> {
21 pub fn new(inner: GenericByteArray<T>) -> Self {
22 Self { inner }
23 }
24
25 pub fn into_inner(self) -> GenericByteArray<T> {
26 self.inner
27 }
28}
29
30impl<T: ByteArrayType> From<GenericByteArray<T>> for ParallelGenericByteArray<T> {
31 fn from(array: GenericByteArray<T>) -> Self {
32 Self::new(array)
33 }
34}
35
36impl<T: ByteArrayType> std::fmt::Debug for ParallelGenericByteArray<T> {
37 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
38 self.inner.fmt(f)
39 }
40}
41
42impl<T: ByteArrayType> Array for ParallelGenericByteArray<T> {
43 fn as_any(&self) -> &dyn Any {
44 self.inner.as_any()
45 }
46
47 fn to_data(&self) -> ArrayData {
48 self.inner.to_data()
49 }
50
51 fn into_data(self) -> ArrayData {
52 self.inner.into_data()
53 }
54
55 fn data_type(&self) -> &DataType {
56 self.inner.data_type()
57 }
58
59 fn slice(&self, offset: usize, length: usize) -> ArrayRef {
60 Arc::new(self.inner.slice(offset, length))
61 }
62
63 fn len(&self) -> usize {
64 self.inner.len()
65 }
66
67 fn is_empty(&self) -> bool {
68 self.inner.is_empty()
69 }
70
71 fn offset(&self) -> usize {
72 self.inner.offset()
73 }
74
75 fn nulls(&self) -> Option<&NullBuffer> {
76 self.inner.nulls()
77 }
78
79 fn get_buffer_memory_size(&self) -> usize {
80 self.inner.get_buffer_memory_size()
81 }
82
83 fn get_array_memory_size(&self) -> usize {
84 self.inner.get_array_memory_size()
85 }
86}
87
88impl<T: ByteArrayType, Ptr: AsRef<T::Native> + Send> FromParallelIterator<Option<Ptr>>
89 for ParallelGenericByteArray<T>
90{
91 fn from_par_iter<I>(par_iter: I) -> Self
92 where
93 I: IntoParallelIterator<Item = Option<Ptr>>,
94 {
95 let vec = Vec::<Option<Ptr>>::from_par_iter(par_iter);
97 let iter = vec.into_iter();
98
99 Self::new(GenericByteArray::from_iter(iter))
100 }
101}
102
103#[derive(Debug, Clone)]
104pub struct ParallelGenericByteArrayRef<'data, T: ByteArrayType> {
105 inner: &'data GenericByteArray<T>,
106}
107
108impl<'data, T: ByteArrayType> ParallelGenericByteArrayRef<'data, T> {
109 pub fn new(inner: &'data GenericByteArray<T>) -> Self {
110 Self { inner }
111 }
112}
113
114impl<'data, T: ByteArrayType> From<&'data GenericByteArray<T>>
115 for ParallelGenericByteArrayRef<'data, T>
116{
117 fn from(array: &'data GenericByteArray<T>) -> Self {
118 Self::new(array)
119 }
120}
121
122impl<'data, T: ByteArrayType> ParallelIterator for ParallelGenericByteArrayRef<'data, T> {
123 type Item = Option<&'data T::Native>;
124
125 fn drive_unindexed<C>(self, consumer: C) -> C::Result
126 where
127 C: UnindexedConsumer<Self::Item>,
128 {
129 bridge(self, consumer)
130 }
131
132 fn opt_len(&self) -> Option<usize> {
133 Some(self.inner.len())
134 }
135}
136
137impl<T: ByteArrayType> IndexedParallelIterator for ParallelGenericByteArrayRef<'_, T> {
138 fn len(&self) -> usize {
139 self.inner.len()
140 }
141
142 fn drive<C: Consumer<Self::Item>>(self, consumer: C) -> C::Result {
143 (0..self.inner.len())
144 .into_par_iter()
145 .map(|i| {
146 if self.inner.is_null(i) {
147 None
148 } else {
149 Some(unsafe { self.inner.value_unchecked(i) })
150 }
151 })
152 .drive(consumer)
153 }
154
155 fn with_producer<CB: ProducerCallback<Self::Item>>(self, callback: CB) -> CB::Output {
156 (0..self.inner.len())
157 .into_par_iter()
158 .map(|i| {
159 if self.inner.is_null(i) {
160 None
161 } else {
162 Some(unsafe { self.inner.value_unchecked(i) })
163 }
164 })
165 .with_producer(callback)
166 }
167}
168
169impl<'data, T: ByteArrayType> IntoParallelRefIterator<'data>
170 for ParallelGenericByteArrayRef<'data, T>
171{
172 type Item = Option<&'data T::Native>;
173 type Iter = ParallelGenericByteArrayRef<'data, T>;
174
175 fn par_iter(&'data self) -> Self::Iter {
176 ParallelGenericByteArrayRef::new(self.inner)
177 }
178}
179
180pub trait GenericByteArrayRefParallelIterator<'data, T: ByteArrayType> {
181 type Iter: ParallelIterator<Item = Option<&'data T::Native>>;
182
183 fn par_iter(&'data self) -> Self::Iter;
184}
185
186impl<'data, T: ByteArrayType> GenericByteArrayRefParallelIterator<'data, T>
187 for GenericByteArray<T>
188{
189 type Iter = ParallelGenericByteArrayRef<'data, T>;
190
191 fn par_iter(&'data self) -> Self::Iter {
192 ParallelGenericByteArrayRef::new(self)
193 }
194}