datafusion_functions/
strings.rs1use std::mem::size_of;
19
20use arrow::array::{
21 Array, ArrayAccessor, ArrayDataBuilder, ByteView, LargeStringArray,
22 NullBufferBuilder, StringArray, StringViewArray, StringViewBuilder, make_view,
23};
24use arrow::buffer::{MutableBuffer, NullBuffer};
25use arrow::datatypes::DataType;
26
27pub struct StringArrayBuilder {
31 offsets_buffer: MutableBuffer,
32 value_buffer: MutableBuffer,
33}
34
35impl StringArrayBuilder {
36 pub fn with_capacity(item_capacity: usize, data_capacity: usize) -> Self {
37 let capacity = item_capacity
38 .checked_add(1)
39 .map(|i| i.saturating_mul(size_of::<i32>()))
40 .expect("capacity integer overflow");
41
42 let mut offsets_buffer = MutableBuffer::with_capacity(capacity);
43 unsafe { offsets_buffer.push_unchecked(0_i32) };
45 Self {
46 offsets_buffer,
47 value_buffer: MutableBuffer::with_capacity(data_capacity),
48 }
49 }
50
51 pub fn write<const CHECK_VALID: bool>(
52 &mut self,
53 column: &ColumnarValueRef,
54 i: usize,
55 ) {
56 match column {
57 ColumnarValueRef::Scalar(s) => {
58 self.value_buffer.extend_from_slice(s);
59 }
60 ColumnarValueRef::NullableArray(array) => {
61 if !CHECK_VALID || array.is_valid(i) {
62 self.value_buffer
63 .extend_from_slice(array.value(i).as_bytes());
64 }
65 }
66 ColumnarValueRef::NullableLargeStringArray(array) => {
67 if !CHECK_VALID || array.is_valid(i) {
68 self.value_buffer
69 .extend_from_slice(array.value(i).as_bytes());
70 }
71 }
72 ColumnarValueRef::NullableStringViewArray(array) => {
73 if !CHECK_VALID || array.is_valid(i) {
74 self.value_buffer
75 .extend_from_slice(array.value(i).as_bytes());
76 }
77 }
78 ColumnarValueRef::NonNullableArray(array) => {
79 self.value_buffer
80 .extend_from_slice(array.value(i).as_bytes());
81 }
82 ColumnarValueRef::NonNullableLargeStringArray(array) => {
83 self.value_buffer
84 .extend_from_slice(array.value(i).as_bytes());
85 }
86 ColumnarValueRef::NonNullableStringViewArray(array) => {
87 self.value_buffer
88 .extend_from_slice(array.value(i).as_bytes());
89 }
90 }
91 }
92
93 pub fn append_offset(&mut self) {
94 let next_offset: i32 = self
95 .value_buffer
96 .len()
97 .try_into()
98 .expect("byte array offset overflow");
99 self.offsets_buffer.push(next_offset);
100 }
101
102 pub fn finish(self, null_buffer: Option<NullBuffer>) -> StringArray {
110 let row_count = self.offsets_buffer.len() / size_of::<i32>() - 1;
111 if let Some(ref null_buffer) = null_buffer {
112 assert_eq!(
113 null_buffer.len(),
114 row_count,
115 "Null buffer and offsets buffer must be the same length"
116 );
117 }
118 let array_builder = ArrayDataBuilder::new(DataType::Utf8)
119 .len(row_count)
120 .add_buffer(self.offsets_buffer.into())
121 .add_buffer(self.value_buffer.into())
122 .nulls(null_buffer);
123 let array_data = unsafe { array_builder.build_unchecked() };
126 StringArray::from(array_data)
127 }
128}
129
130pub struct StringViewArrayBuilder {
131 builder: StringViewBuilder,
132 block: String,
133}
134
135impl StringViewArrayBuilder {
136 pub fn with_capacity(_item_capacity: usize, data_capacity: usize) -> Self {
137 let builder = StringViewBuilder::with_capacity(data_capacity);
138 Self {
139 builder,
140 block: String::new(),
141 }
142 }
143
144 pub fn write<const CHECK_VALID: bool>(
145 &mut self,
146 column: &ColumnarValueRef,
147 i: usize,
148 ) {
149 match column {
150 ColumnarValueRef::Scalar(s) => {
151 self.block.push_str(std::str::from_utf8(s).unwrap());
152 }
153 ColumnarValueRef::NullableArray(array) => {
154 if !CHECK_VALID || array.is_valid(i) {
155 self.block.push_str(array.value(i));
156 }
157 }
158 ColumnarValueRef::NullableLargeStringArray(array) => {
159 if !CHECK_VALID || array.is_valid(i) {
160 self.block.push_str(array.value(i));
161 }
162 }
163 ColumnarValueRef::NullableStringViewArray(array) => {
164 if !CHECK_VALID || array.is_valid(i) {
165 self.block.push_str(array.value(i));
166 }
167 }
168 ColumnarValueRef::NonNullableArray(array) => {
169 self.block.push_str(array.value(i));
170 }
171 ColumnarValueRef::NonNullableLargeStringArray(array) => {
172 self.block.push_str(array.value(i));
173 }
174 ColumnarValueRef::NonNullableStringViewArray(array) => {
175 self.block.push_str(array.value(i));
176 }
177 }
178 }
179
180 pub fn append_offset(&mut self) {
181 self.builder.append_value(&self.block);
182 self.block.clear();
183 }
184
185 pub fn finish(mut self) -> StringViewArray {
186 self.builder.finish()
187 }
188}
189
190pub struct LargeStringArrayBuilder {
191 offsets_buffer: MutableBuffer,
192 value_buffer: MutableBuffer,
193}
194
195impl LargeStringArrayBuilder {
196 pub fn with_capacity(item_capacity: usize, data_capacity: usize) -> Self {
197 let capacity = item_capacity
198 .checked_add(1)
199 .map(|i| i.saturating_mul(size_of::<i64>()))
200 .expect("capacity integer overflow");
201
202 let mut offsets_buffer = MutableBuffer::with_capacity(capacity);
203 unsafe { offsets_buffer.push_unchecked(0_i64) };
205 Self {
206 offsets_buffer,
207 value_buffer: MutableBuffer::with_capacity(data_capacity),
208 }
209 }
210
211 pub fn write<const CHECK_VALID: bool>(
212 &mut self,
213 column: &ColumnarValueRef,
214 i: usize,
215 ) {
216 match column {
217 ColumnarValueRef::Scalar(s) => {
218 self.value_buffer.extend_from_slice(s);
219 }
220 ColumnarValueRef::NullableArray(array) => {
221 if !CHECK_VALID || array.is_valid(i) {
222 self.value_buffer
223 .extend_from_slice(array.value(i).as_bytes());
224 }
225 }
226 ColumnarValueRef::NullableLargeStringArray(array) => {
227 if !CHECK_VALID || array.is_valid(i) {
228 self.value_buffer
229 .extend_from_slice(array.value(i).as_bytes());
230 }
231 }
232 ColumnarValueRef::NullableStringViewArray(array) => {
233 if !CHECK_VALID || array.is_valid(i) {
234 self.value_buffer
235 .extend_from_slice(array.value(i).as_bytes());
236 }
237 }
238 ColumnarValueRef::NonNullableArray(array) => {
239 self.value_buffer
240 .extend_from_slice(array.value(i).as_bytes());
241 }
242 ColumnarValueRef::NonNullableLargeStringArray(array) => {
243 self.value_buffer
244 .extend_from_slice(array.value(i).as_bytes());
245 }
246 ColumnarValueRef::NonNullableStringViewArray(array) => {
247 self.value_buffer
248 .extend_from_slice(array.value(i).as_bytes());
249 }
250 }
251 }
252
253 pub fn append_offset(&mut self) {
254 let next_offset: i64 = self
255 .value_buffer
256 .len()
257 .try_into()
258 .expect("byte array offset overflow");
259 self.offsets_buffer.push(next_offset);
260 }
261
262 pub fn finish(self, null_buffer: Option<NullBuffer>) -> LargeStringArray {
270 let row_count = self.offsets_buffer.len() / size_of::<i64>() - 1;
271 if let Some(ref null_buffer) = null_buffer {
272 assert_eq!(
273 null_buffer.len(),
274 row_count,
275 "Null buffer and offsets buffer must be the same length"
276 );
277 }
278 let array_builder = ArrayDataBuilder::new(DataType::LargeUtf8)
279 .len(row_count)
280 .add_buffer(self.offsets_buffer.into())
281 .add_buffer(self.value_buffer.into())
282 .nulls(null_buffer);
283 let array_data = unsafe { array_builder.build_unchecked() };
286 LargeStringArray::from(array_data)
287 }
288}
289
290pub fn make_and_append_view(
304 views_buffer: &mut Vec<u128>,
305 null_builder: &mut NullBufferBuilder,
306 original_view: &u128,
307 substr: &str,
308 start_offset: u32,
309) {
310 let substr_len = substr.len();
311 let sub_view = if substr_len > 12 {
312 let view = ByteView::from(*original_view);
313 make_view(
314 substr.as_bytes(),
315 view.buffer_index,
316 view.offset + start_offset,
317 )
318 } else {
319 make_view(substr.as_bytes(), 0, 0)
321 };
322 views_buffer.push(sub_view);
323 null_builder.append_non_null();
324}
325
326#[derive(Debug)]
327pub enum ColumnarValueRef<'a> {
328 Scalar(&'a [u8]),
329 NullableArray(&'a StringArray),
330 NonNullableArray(&'a StringArray),
331 NullableLargeStringArray(&'a LargeStringArray),
332 NonNullableLargeStringArray(&'a LargeStringArray),
333 NullableStringViewArray(&'a StringViewArray),
334 NonNullableStringViewArray(&'a StringViewArray),
335}
336
337impl ColumnarValueRef<'_> {
338 #[inline]
339 pub fn is_valid(&self, i: usize) -> bool {
340 match &self {
341 Self::Scalar(_)
342 | Self::NonNullableArray(_)
343 | Self::NonNullableLargeStringArray(_)
344 | Self::NonNullableStringViewArray(_) => true,
345 Self::NullableArray(array) => array.is_valid(i),
346 Self::NullableStringViewArray(array) => array.is_valid(i),
347 Self::NullableLargeStringArray(array) => array.is_valid(i),
348 }
349 }
350
351 #[inline]
352 pub fn nulls(&self) -> Option<NullBuffer> {
353 match &self {
354 Self::Scalar(_)
355 | Self::NonNullableArray(_)
356 | Self::NonNullableStringViewArray(_)
357 | Self::NonNullableLargeStringArray(_) => None,
358 Self::NullableArray(array) => array.nulls().cloned(),
359 Self::NullableStringViewArray(array) => array.nulls().cloned(),
360 Self::NullableLargeStringArray(array) => array.nulls().cloned(),
361 }
362 }
363}
364
#[cfg(test)]
mod tests {
    use super::*;

    /// Requesting the maximum number of items must be rejected with the
    /// builder's own overflow message.
    #[test]
    #[should_panic(expected = "capacity integer overflow")]
    fn test_overflow_string_array_builder() {
        let (items, bytes) = (usize::MAX, usize::MAX);
        let _builder = StringArrayBuilder::with_capacity(items, bytes);
    }

    /// Same check for the `i64`-offset builder.
    #[test]
    #[should_panic(expected = "capacity integer overflow")]
    fn test_overflow_large_string_array_builder() {
        let (items, bytes) = (usize::MAX, usize::MAX);
        let _builder = LargeStringArrayBuilder::with_capacity(items, bytes);
    }
}