Skip to main content

datafusion_functions/
strings.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::mem::size_of;
19
20use arrow::array::{
21    Array, ArrayAccessor, ArrayDataBuilder, ByteView, LargeStringArray,
22    NullBufferBuilder, StringArray, StringViewArray, StringViewBuilder, make_view,
23};
24use arrow::buffer::{MutableBuffer, NullBuffer};
25use arrow::datatypes::DataType;
26
/// Optimized version of the `StringBuilder` in Arrow that:
/// 1. Precalculates the expected length of the result, avoiding reallocations.
/// 2. Avoids creating / incrementally creating a `NullBufferBuilder`
///
/// A row is assembled piecewise: `write` appends bytes for the current row,
/// `append_offset` closes the row, and `finish` produces the [`StringArray`]
/// using a null buffer supplied by the caller.
pub struct StringArrayBuilder {
    /// Raw `i32` offsets; `with_capacity` seeds it with the leading `0`.
    offsets_buffer: MutableBuffer,
    /// Concatenated value bytes of every row written so far.
    value_buffer: MutableBuffer,
}
34
35impl StringArrayBuilder {
36    pub fn with_capacity(item_capacity: usize, data_capacity: usize) -> Self {
37        let capacity = item_capacity
38            .checked_add(1)
39            .map(|i| i.saturating_mul(size_of::<i32>()))
40            .expect("capacity integer overflow");
41
42        let mut offsets_buffer = MutableBuffer::with_capacity(capacity);
43        // SAFETY: the first offset value is definitely not going to exceed the bounds.
44        unsafe { offsets_buffer.push_unchecked(0_i32) };
45        Self {
46            offsets_buffer,
47            value_buffer: MutableBuffer::with_capacity(data_capacity),
48        }
49    }
50
51    pub fn write<const CHECK_VALID: bool>(
52        &mut self,
53        column: &ColumnarValueRef,
54        i: usize,
55    ) {
56        match column {
57            ColumnarValueRef::Scalar(s) => {
58                self.value_buffer.extend_from_slice(s);
59            }
60            ColumnarValueRef::NullableArray(array) => {
61                if !CHECK_VALID || array.is_valid(i) {
62                    self.value_buffer
63                        .extend_from_slice(array.value(i).as_bytes());
64                }
65            }
66            ColumnarValueRef::NullableLargeStringArray(array) => {
67                if !CHECK_VALID || array.is_valid(i) {
68                    self.value_buffer
69                        .extend_from_slice(array.value(i).as_bytes());
70                }
71            }
72            ColumnarValueRef::NullableStringViewArray(array) => {
73                if !CHECK_VALID || array.is_valid(i) {
74                    self.value_buffer
75                        .extend_from_slice(array.value(i).as_bytes());
76                }
77            }
78            ColumnarValueRef::NonNullableArray(array) => {
79                self.value_buffer
80                    .extend_from_slice(array.value(i).as_bytes());
81            }
82            ColumnarValueRef::NonNullableLargeStringArray(array) => {
83                self.value_buffer
84                    .extend_from_slice(array.value(i).as_bytes());
85            }
86            ColumnarValueRef::NonNullableStringViewArray(array) => {
87                self.value_buffer
88                    .extend_from_slice(array.value(i).as_bytes());
89            }
90        }
91    }
92
93    pub fn append_offset(&mut self) {
94        let next_offset: i32 = self
95            .value_buffer
96            .len()
97            .try_into()
98            .expect("byte array offset overflow");
99        self.offsets_buffer.push(next_offset);
100    }
101
102    /// Finalize the builder into a concrete [`StringArray`].
103    ///
104    /// # Panics
105    ///
106    /// This method can panic when:
107    ///
108    /// - the provided `null_buffer` is not the same length as the `offsets_buffer`.
109    pub fn finish(self, null_buffer: Option<NullBuffer>) -> StringArray {
110        let row_count = self.offsets_buffer.len() / size_of::<i32>() - 1;
111        if let Some(ref null_buffer) = null_buffer {
112            assert_eq!(
113                null_buffer.len(),
114                row_count,
115                "Null buffer and offsets buffer must be the same length"
116            );
117        }
118        let array_builder = ArrayDataBuilder::new(DataType::Utf8)
119            .len(row_count)
120            .add_buffer(self.offsets_buffer.into())
121            .add_buffer(self.value_buffer.into())
122            .nulls(null_buffer);
123        // SAFETY: all data that was appended was valid UTF8 and the values
124        // and offsets were created correctly
125        let array_data = unsafe { array_builder.build_unchecked() };
126        StringArray::from(array_data)
127    }
128}
129
/// Builds a [`StringViewArray`] one row at a time.
///
/// `write` accumulates the pieces of the current row in `block`;
/// `append_offset` hands the completed row to the inner builder and clears
/// `block` for the next row.
pub struct StringViewArrayBuilder {
    /// Arrow builder that receives each completed row.
    builder: StringViewBuilder,
    /// Scratch string holding the row currently being assembled.
    block: String,
}
134
135impl StringViewArrayBuilder {
136    pub fn with_capacity(_item_capacity: usize, data_capacity: usize) -> Self {
137        let builder = StringViewBuilder::with_capacity(data_capacity);
138        Self {
139            builder,
140            block: String::new(),
141        }
142    }
143
144    pub fn write<const CHECK_VALID: bool>(
145        &mut self,
146        column: &ColumnarValueRef,
147        i: usize,
148    ) {
149        match column {
150            ColumnarValueRef::Scalar(s) => {
151                self.block.push_str(std::str::from_utf8(s).unwrap());
152            }
153            ColumnarValueRef::NullableArray(array) => {
154                if !CHECK_VALID || array.is_valid(i) {
155                    self.block.push_str(array.value(i));
156                }
157            }
158            ColumnarValueRef::NullableLargeStringArray(array) => {
159                if !CHECK_VALID || array.is_valid(i) {
160                    self.block.push_str(array.value(i));
161                }
162            }
163            ColumnarValueRef::NullableStringViewArray(array) => {
164                if !CHECK_VALID || array.is_valid(i) {
165                    self.block.push_str(array.value(i));
166                }
167            }
168            ColumnarValueRef::NonNullableArray(array) => {
169                self.block.push_str(array.value(i));
170            }
171            ColumnarValueRef::NonNullableLargeStringArray(array) => {
172                self.block.push_str(array.value(i));
173            }
174            ColumnarValueRef::NonNullableStringViewArray(array) => {
175                self.block.push_str(array.value(i));
176            }
177        }
178    }
179
180    pub fn append_offset(&mut self) {
181        self.builder.append_value(&self.block);
182        self.block.clear();
183    }
184
185    pub fn finish(mut self) -> StringViewArray {
186        self.builder.finish()
187    }
188}
189
/// Builds a [`LargeStringArray`] (64-bit offsets) one row at a time.
///
/// A row is assembled piecewise: `write` appends bytes for the current row,
/// `append_offset` closes the row, and `finish` produces the array using a
/// null buffer supplied by the caller.
pub struct LargeStringArrayBuilder {
    /// Raw `i64` offsets; `with_capacity` seeds it with the leading `0`.
    offsets_buffer: MutableBuffer,
    /// Concatenated value bytes of every row written so far.
    value_buffer: MutableBuffer,
}
194
195impl LargeStringArrayBuilder {
196    pub fn with_capacity(item_capacity: usize, data_capacity: usize) -> Self {
197        let capacity = item_capacity
198            .checked_add(1)
199            .map(|i| i.saturating_mul(size_of::<i64>()))
200            .expect("capacity integer overflow");
201
202        let mut offsets_buffer = MutableBuffer::with_capacity(capacity);
203        // SAFETY: the first offset value is definitely not going to exceed the bounds.
204        unsafe { offsets_buffer.push_unchecked(0_i64) };
205        Self {
206            offsets_buffer,
207            value_buffer: MutableBuffer::with_capacity(data_capacity),
208        }
209    }
210
211    pub fn write<const CHECK_VALID: bool>(
212        &mut self,
213        column: &ColumnarValueRef,
214        i: usize,
215    ) {
216        match column {
217            ColumnarValueRef::Scalar(s) => {
218                self.value_buffer.extend_from_slice(s);
219            }
220            ColumnarValueRef::NullableArray(array) => {
221                if !CHECK_VALID || array.is_valid(i) {
222                    self.value_buffer
223                        .extend_from_slice(array.value(i).as_bytes());
224                }
225            }
226            ColumnarValueRef::NullableLargeStringArray(array) => {
227                if !CHECK_VALID || array.is_valid(i) {
228                    self.value_buffer
229                        .extend_from_slice(array.value(i).as_bytes());
230                }
231            }
232            ColumnarValueRef::NullableStringViewArray(array) => {
233                if !CHECK_VALID || array.is_valid(i) {
234                    self.value_buffer
235                        .extend_from_slice(array.value(i).as_bytes());
236                }
237            }
238            ColumnarValueRef::NonNullableArray(array) => {
239                self.value_buffer
240                    .extend_from_slice(array.value(i).as_bytes());
241            }
242            ColumnarValueRef::NonNullableLargeStringArray(array) => {
243                self.value_buffer
244                    .extend_from_slice(array.value(i).as_bytes());
245            }
246            ColumnarValueRef::NonNullableStringViewArray(array) => {
247                self.value_buffer
248                    .extend_from_slice(array.value(i).as_bytes());
249            }
250        }
251    }
252
253    pub fn append_offset(&mut self) {
254        let next_offset: i64 = self
255            .value_buffer
256            .len()
257            .try_into()
258            .expect("byte array offset overflow");
259        self.offsets_buffer.push(next_offset);
260    }
261
262    /// Finalize the builder into a concrete [`LargeStringArray`].
263    ///
264    /// # Panics
265    ///
266    /// This method can panic when:
267    ///
268    /// - the provided `null_buffer` is not the same length as the `offsets_buffer`.
269    pub fn finish(self, null_buffer: Option<NullBuffer>) -> LargeStringArray {
270        let row_count = self.offsets_buffer.len() / size_of::<i64>() - 1;
271        if let Some(ref null_buffer) = null_buffer {
272            assert_eq!(
273                null_buffer.len(),
274                row_count,
275                "Null buffer and offsets buffer must be the same length"
276            );
277        }
278        let array_builder = ArrayDataBuilder::new(DataType::LargeUtf8)
279            .len(row_count)
280            .add_buffer(self.offsets_buffer.into())
281            .add_buffer(self.value_buffer.into())
282            .nulls(null_buffer);
283        // SAFETY: all data that was appended was valid Large UTF8 and the values
284        // and offsets were created correctly
285        let array_data = unsafe { array_builder.build_unchecked() };
286        LargeStringArray::from(array_data)
287    }
288}
289
290/// Append a new view to the views buffer with the given substr
291///
292/// # Safety
293///
294/// original_view must be a valid view (the format described on
295/// [`GenericByteViewArray`](arrow::array::GenericByteViewArray).
296///
297/// # Arguments
298/// - views_buffer: The buffer to append the new view to
299/// - null_builder: The buffer to append the null value to
300/// - original_view: The original view value
301/// - substr: The substring to append. Must be a valid substring of the original view
302/// - start_offset: The start offset of the substring in the view
303pub fn make_and_append_view(
304    views_buffer: &mut Vec<u128>,
305    null_builder: &mut NullBufferBuilder,
306    original_view: &u128,
307    substr: &str,
308    start_offset: u32,
309) {
310    let substr_len = substr.len();
311    let sub_view = if substr_len > 12 {
312        let view = ByteView::from(*original_view);
313        make_view(
314            substr.as_bytes(),
315            view.buffer_index,
316            view.offset + start_offset,
317        )
318    } else {
319        // inline value does not need block id or offset
320        make_view(substr.as_bytes(), 0, 0)
321    };
322    views_buffer.push(sub_view);
323    null_builder.append_non_null();
324}
325
/// Borrowed view of one input column, either a single scalar value or one of
/// the supported string array types.
///
/// Each array type has a `Nullable*` and a `NonNullable*` variant. For the
/// `NonNullable*` variants (and `Scalar`), `is_valid` returns `true` and
/// `nulls` returns `None` without consulting the array.
#[derive(Debug)]
pub enum ColumnarValueRef<'a> {
    /// Raw bytes of a scalar value; the same bytes are used for every row.
    Scalar(&'a [u8]),
    /// `StringArray` whose validity is checked per row.
    NullableArray(&'a StringArray),
    /// `StringArray` treated as having no nulls.
    NonNullableArray(&'a StringArray),
    /// `LargeStringArray` whose validity is checked per row.
    NullableLargeStringArray(&'a LargeStringArray),
    /// `LargeStringArray` treated as having no nulls.
    NonNullableLargeStringArray(&'a LargeStringArray),
    /// `StringViewArray` whose validity is checked per row.
    NullableStringViewArray(&'a StringViewArray),
    /// `StringViewArray` treated as having no nulls.
    NonNullableStringViewArray(&'a StringViewArray),
}
336
337impl ColumnarValueRef<'_> {
338    #[inline]
339    pub fn is_valid(&self, i: usize) -> bool {
340        match &self {
341            Self::Scalar(_)
342            | Self::NonNullableArray(_)
343            | Self::NonNullableLargeStringArray(_)
344            | Self::NonNullableStringViewArray(_) => true,
345            Self::NullableArray(array) => array.is_valid(i),
346            Self::NullableStringViewArray(array) => array.is_valid(i),
347            Self::NullableLargeStringArray(array) => array.is_valid(i),
348        }
349    }
350
351    #[inline]
352    pub fn nulls(&self) -> Option<NullBuffer> {
353        match &self {
354            Self::Scalar(_)
355            | Self::NonNullableArray(_)
356            | Self::NonNullableStringViewArray(_)
357            | Self::NonNullableLargeStringArray(_) => None,
358            Self::NullableArray(array) => array.nulls().cloned(),
359            Self::NullableStringViewArray(array) => array.nulls().cloned(),
360            Self::NullableLargeStringArray(array) => array.nulls().cloned(),
361        }
362    }
363}
364
#[cfg(test)]
mod tests {
    use super::*;

    /// Requesting `usize::MAX` rows overflows when the extra leading offset
    /// slot is added, so construction must panic.
    #[test]
    #[should_panic(expected = "capacity integer overflow")]
    fn test_overflow_string_array_builder() {
        let (rows, bytes) = (usize::MAX, usize::MAX);
        let _ = StringArrayBuilder::with_capacity(rows, bytes);
    }

    /// Same overflow check for the 64-bit offset builder.
    #[test]
    #[should_panic(expected = "capacity integer overflow")]
    fn test_overflow_large_string_array_builder() {
        let (rows, bytes) = (usize::MAX, usize::MAX);
        let _ = LargeStringArrayBuilder::with_capacity(rows, bytes);
    }
}