datafusion_functions/
strings.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::mem::size_of;
19
20use arrow::array::{
21    make_view, Array, ArrayAccessor, ArrayDataBuilder, ByteView, LargeStringArray,
22    NullBufferBuilder, StringArray, StringViewArray, StringViewBuilder,
23};
24use arrow::buffer::{MutableBuffer, NullBuffer};
25use arrow::datatypes::DataType;
26
27/// Optimized version of the StringBuilder in Arrow that:
28/// 1. Precalculating the expected length of the result, avoiding reallocations.
29/// 2. Avoids creating / incrementally creating a `NullBufferBuilder`
30pub struct StringArrayBuilder {
31    offsets_buffer: MutableBuffer,
32    value_buffer: MutableBuffer,
33}
34
35impl StringArrayBuilder {
36    pub fn with_capacity(item_capacity: usize, data_capacity: usize) -> Self {
37        let capacity = item_capacity
38            .checked_add(1)
39            .map(|i| i.saturating_mul(size_of::<i32>()))
40            .expect("capacity integer overflow");
41
42        let mut offsets_buffer = MutableBuffer::with_capacity(capacity);
43        // SAFETY: the first offset value is definitely not going to exceed the bounds.
44        unsafe { offsets_buffer.push_unchecked(0_i32) };
45        Self {
46            offsets_buffer,
47            value_buffer: MutableBuffer::with_capacity(data_capacity),
48        }
49    }
50
51    pub fn write<const CHECK_VALID: bool>(
52        &mut self,
53        column: &ColumnarValueRef,
54        i: usize,
55    ) {
56        match column {
57            ColumnarValueRef::Scalar(s) => {
58                self.value_buffer.extend_from_slice(s);
59            }
60            ColumnarValueRef::NullableArray(array) => {
61                if !CHECK_VALID || array.is_valid(i) {
62                    self.value_buffer
63                        .extend_from_slice(array.value(i).as_bytes());
64                }
65            }
66            ColumnarValueRef::NullableLargeStringArray(array) => {
67                if !CHECK_VALID || array.is_valid(i) {
68                    self.value_buffer
69                        .extend_from_slice(array.value(i).as_bytes());
70                }
71            }
72            ColumnarValueRef::NullableStringViewArray(array) => {
73                if !CHECK_VALID || array.is_valid(i) {
74                    self.value_buffer
75                        .extend_from_slice(array.value(i).as_bytes());
76                }
77            }
78            ColumnarValueRef::NonNullableArray(array) => {
79                self.value_buffer
80                    .extend_from_slice(array.value(i).as_bytes());
81            }
82            ColumnarValueRef::NonNullableLargeStringArray(array) => {
83                self.value_buffer
84                    .extend_from_slice(array.value(i).as_bytes());
85            }
86            ColumnarValueRef::NonNullableStringViewArray(array) => {
87                self.value_buffer
88                    .extend_from_slice(array.value(i).as_bytes());
89            }
90        }
91    }
92
93    pub fn append_offset(&mut self) {
94        let next_offset: i32 = self
95            .value_buffer
96            .len()
97            .try_into()
98            .expect("byte array offset overflow");
99        self.offsets_buffer.push(next_offset);
100    }
101
102    /// Finalize the builder into a concrete [`StringArray`].
103    ///
104    /// # Panics
105    ///
106    /// This method can panic when:
107    ///
108    /// - the provided `null_buffer` is not the same length as the `offsets_buffer`.
109    pub fn finish(self, null_buffer: Option<NullBuffer>) -> StringArray {
110        let row_count = self.offsets_buffer.len() / size_of::<i32>() - 1;
111        if let Some(ref null_buffer) = null_buffer {
112            assert_eq!(
113                null_buffer.len(),
114                row_count,
115                "Null buffer and offsets buffer must be the same length"
116            );
117        }
118        let array_builder = ArrayDataBuilder::new(DataType::Utf8)
119            .len(row_count)
120            .add_buffer(self.offsets_buffer.into())
121            .add_buffer(self.value_buffer.into())
122            .nulls(null_buffer);
123        // SAFETY: all data that was appended was valid UTF8 and the values
124        // and offsets were created correctly
125        let array_data = unsafe { array_builder.build_unchecked() };
126        StringArray::from(array_data)
127    }
128}
129
130pub struct StringViewArrayBuilder {
131    builder: StringViewBuilder,
132    block: String,
133}
134
135impl StringViewArrayBuilder {
136    pub fn with_capacity(_item_capacity: usize, data_capacity: usize) -> Self {
137        let builder = StringViewBuilder::with_capacity(data_capacity);
138        Self {
139            builder,
140            block: String::new(),
141        }
142    }
143
144    pub fn write<const CHECK_VALID: bool>(
145        &mut self,
146        column: &ColumnarValueRef,
147        i: usize,
148    ) {
149        match column {
150            ColumnarValueRef::Scalar(s) => {
151                self.block.push_str(std::str::from_utf8(s).unwrap());
152            }
153            ColumnarValueRef::NullableArray(array) => {
154                if !CHECK_VALID || array.is_valid(i) {
155                    self.block.push_str(
156                        std::str::from_utf8(array.value(i).as_bytes()).unwrap(),
157                    );
158                }
159            }
160            ColumnarValueRef::NullableLargeStringArray(array) => {
161                if !CHECK_VALID || array.is_valid(i) {
162                    self.block.push_str(
163                        std::str::from_utf8(array.value(i).as_bytes()).unwrap(),
164                    );
165                }
166            }
167            ColumnarValueRef::NullableStringViewArray(array) => {
168                if !CHECK_VALID || array.is_valid(i) {
169                    self.block.push_str(
170                        std::str::from_utf8(array.value(i).as_bytes()).unwrap(),
171                    );
172                }
173            }
174            ColumnarValueRef::NonNullableArray(array) => {
175                self.block
176                    .push_str(std::str::from_utf8(array.value(i).as_bytes()).unwrap());
177            }
178            ColumnarValueRef::NonNullableLargeStringArray(array) => {
179                self.block
180                    .push_str(std::str::from_utf8(array.value(i).as_bytes()).unwrap());
181            }
182            ColumnarValueRef::NonNullableStringViewArray(array) => {
183                self.block
184                    .push_str(std::str::from_utf8(array.value(i).as_bytes()).unwrap());
185            }
186        }
187    }
188
189    pub fn append_offset(&mut self) {
190        self.builder.append_value(&self.block);
191        self.block = String::new();
192    }
193
194    pub fn finish(mut self) -> StringViewArray {
195        self.builder.finish()
196    }
197}
198
199pub struct LargeStringArrayBuilder {
200    offsets_buffer: MutableBuffer,
201    value_buffer: MutableBuffer,
202}
203
204impl LargeStringArrayBuilder {
205    pub fn with_capacity(item_capacity: usize, data_capacity: usize) -> Self {
206        let capacity = item_capacity
207            .checked_add(1)
208            .map(|i| i.saturating_mul(size_of::<i64>()))
209            .expect("capacity integer overflow");
210
211        let mut offsets_buffer = MutableBuffer::with_capacity(capacity);
212        // SAFETY: the first offset value is definitely not going to exceed the bounds.
213        unsafe { offsets_buffer.push_unchecked(0_i64) };
214        Self {
215            offsets_buffer,
216            value_buffer: MutableBuffer::with_capacity(data_capacity),
217        }
218    }
219
220    pub fn write<const CHECK_VALID: bool>(
221        &mut self,
222        column: &ColumnarValueRef,
223        i: usize,
224    ) {
225        match column {
226            ColumnarValueRef::Scalar(s) => {
227                self.value_buffer.extend_from_slice(s);
228            }
229            ColumnarValueRef::NullableArray(array) => {
230                if !CHECK_VALID || array.is_valid(i) {
231                    self.value_buffer
232                        .extend_from_slice(array.value(i).as_bytes());
233                }
234            }
235            ColumnarValueRef::NullableLargeStringArray(array) => {
236                if !CHECK_VALID || array.is_valid(i) {
237                    self.value_buffer
238                        .extend_from_slice(array.value(i).as_bytes());
239                }
240            }
241            ColumnarValueRef::NullableStringViewArray(array) => {
242                if !CHECK_VALID || array.is_valid(i) {
243                    self.value_buffer
244                        .extend_from_slice(array.value(i).as_bytes());
245                }
246            }
247            ColumnarValueRef::NonNullableArray(array) => {
248                self.value_buffer
249                    .extend_from_slice(array.value(i).as_bytes());
250            }
251            ColumnarValueRef::NonNullableLargeStringArray(array) => {
252                self.value_buffer
253                    .extend_from_slice(array.value(i).as_bytes());
254            }
255            ColumnarValueRef::NonNullableStringViewArray(array) => {
256                self.value_buffer
257                    .extend_from_slice(array.value(i).as_bytes());
258            }
259        }
260    }
261
262    pub fn append_offset(&mut self) {
263        let next_offset: i64 = self
264            .value_buffer
265            .len()
266            .try_into()
267            .expect("byte array offset overflow");
268        self.offsets_buffer.push(next_offset);
269    }
270
271    /// Finalize the builder into a concrete [`LargeStringArray`].
272    ///
273    /// # Panics
274    ///
275    /// This method can panic when:
276    ///
277    /// - the provided `null_buffer` is not the same length as the `offsets_buffer`.
278    pub fn finish(self, null_buffer: Option<NullBuffer>) -> LargeStringArray {
279        let row_count = self.offsets_buffer.len() / size_of::<i64>() - 1;
280        if let Some(ref null_buffer) = null_buffer {
281            assert_eq!(
282                null_buffer.len(),
283                row_count,
284                "Null buffer and offsets buffer must be the same length"
285            );
286        }
287        let array_builder = ArrayDataBuilder::new(DataType::LargeUtf8)
288            .len(row_count)
289            .add_buffer(self.offsets_buffer.into())
290            .add_buffer(self.value_buffer.into())
291            .nulls(null_buffer);
292        // SAFETY: all data that was appended was valid Large UTF8 and the values
293        // and offsets were created correctly
294        let array_data = unsafe { array_builder.build_unchecked() };
295        LargeStringArray::from(array_data)
296    }
297}
298
299/// Append a new view to the views buffer with the given substr
300///
301/// # Safety
302///
303/// original_view must be a valid view (the format described on
304/// [`GenericByteViewArray`](arrow::array::GenericByteViewArray).
305///
306/// # Arguments
307/// - views_buffer: The buffer to append the new view to
308/// - null_builder: The buffer to append the null value to
309/// - original_view: The original view value
310/// - substr: The substring to append. Must be a valid substring of the original view
311/// - start_offset: The start offset of the substring in the view
312pub fn make_and_append_view(
313    views_buffer: &mut Vec<u128>,
314    null_builder: &mut NullBufferBuilder,
315    original_view: &u128,
316    substr: &str,
317    start_offset: u32,
318) {
319    let substr_len = substr.len();
320    let sub_view = if substr_len > 12 {
321        let view = ByteView::from(*original_view);
322        make_view(
323            substr.as_bytes(),
324            view.buffer_index,
325            view.offset + start_offset,
326        )
327    } else {
328        // inline value does not need block id or offset
329        make_view(substr.as_bytes(), 0, 0)
330    };
331    views_buffer.push(sub_view);
332    null_builder.append_non_null();
333}
334
335#[derive(Debug)]
336pub enum ColumnarValueRef<'a> {
337    Scalar(&'a [u8]),
338    NullableArray(&'a StringArray),
339    NonNullableArray(&'a StringArray),
340    NullableLargeStringArray(&'a LargeStringArray),
341    NonNullableLargeStringArray(&'a LargeStringArray),
342    NullableStringViewArray(&'a StringViewArray),
343    NonNullableStringViewArray(&'a StringViewArray),
344}
345
346impl ColumnarValueRef<'_> {
347    #[inline]
348    pub fn is_valid(&self, i: usize) -> bool {
349        match &self {
350            Self::Scalar(_)
351            | Self::NonNullableArray(_)
352            | Self::NonNullableLargeStringArray(_)
353            | Self::NonNullableStringViewArray(_) => true,
354            Self::NullableArray(array) => array.is_valid(i),
355            Self::NullableStringViewArray(array) => array.is_valid(i),
356            Self::NullableLargeStringArray(array) => array.is_valid(i),
357        }
358    }
359
360    #[inline]
361    pub fn nulls(&self) -> Option<NullBuffer> {
362        match &self {
363            Self::Scalar(_)
364            | Self::NonNullableArray(_)
365            | Self::NonNullableStringViewArray(_)
366            | Self::NonNullableLargeStringArray(_) => None,
367            Self::NullableArray(array) => array.nulls().cloned(),
368            Self::NullableStringViewArray(array) => array.nulls().cloned(),
369            Self::NullableLargeStringArray(array) => array.nulls().cloned(),
370        }
371    }
372}
373
#[cfg(test)]
mod tests {
    use super::{LargeStringArrayBuilder, StringArrayBuilder};

    /// `usize::MAX` rows cannot take the extra leading offset, so
    /// construction must panic with the overflow message.
    #[test]
    #[should_panic(expected = "capacity integer overflow")]
    fn test_overflow_string_array_builder() {
        let _ = StringArrayBuilder::with_capacity(usize::MAX, usize::MAX);
    }

    /// Same overflow check for the `i64`-offset builder.
    #[test]
    #[should_panic(expected = "capacity integer overflow")]
    fn test_overflow_large_string_array_builder() {
        let _ = LargeStringArrayBuilder::with_capacity(usize::MAX, usize::MAX);
    }
}