datafusion_functions/
strings.rs1use std::mem::size_of;
19
20use arrow::array::{
21 make_view, Array, ArrayAccessor, ArrayDataBuilder, ByteView, LargeStringArray,
22 NullBufferBuilder, StringArray, StringViewArray, StringViewBuilder,
23};
24use arrow::buffer::{MutableBuffer, NullBuffer};
25use arrow::datatypes::DataType;
26
27pub struct StringArrayBuilder {
31 offsets_buffer: MutableBuffer,
32 value_buffer: MutableBuffer,
33}
34
35impl StringArrayBuilder {
36 pub fn with_capacity(item_capacity: usize, data_capacity: usize) -> Self {
37 let capacity = item_capacity
38 .checked_add(1)
39 .map(|i| i.saturating_mul(size_of::<i32>()))
40 .expect("capacity integer overflow");
41
42 let mut offsets_buffer = MutableBuffer::with_capacity(capacity);
43 unsafe { offsets_buffer.push_unchecked(0_i32) };
45 Self {
46 offsets_buffer,
47 value_buffer: MutableBuffer::with_capacity(data_capacity),
48 }
49 }
50
51 pub fn write<const CHECK_VALID: bool>(
52 &mut self,
53 column: &ColumnarValueRef,
54 i: usize,
55 ) {
56 match column {
57 ColumnarValueRef::Scalar(s) => {
58 self.value_buffer.extend_from_slice(s);
59 }
60 ColumnarValueRef::NullableArray(array) => {
61 if !CHECK_VALID || array.is_valid(i) {
62 self.value_buffer
63 .extend_from_slice(array.value(i).as_bytes());
64 }
65 }
66 ColumnarValueRef::NullableLargeStringArray(array) => {
67 if !CHECK_VALID || array.is_valid(i) {
68 self.value_buffer
69 .extend_from_slice(array.value(i).as_bytes());
70 }
71 }
72 ColumnarValueRef::NullableStringViewArray(array) => {
73 if !CHECK_VALID || array.is_valid(i) {
74 self.value_buffer
75 .extend_from_slice(array.value(i).as_bytes());
76 }
77 }
78 ColumnarValueRef::NonNullableArray(array) => {
79 self.value_buffer
80 .extend_from_slice(array.value(i).as_bytes());
81 }
82 ColumnarValueRef::NonNullableLargeStringArray(array) => {
83 self.value_buffer
84 .extend_from_slice(array.value(i).as_bytes());
85 }
86 ColumnarValueRef::NonNullableStringViewArray(array) => {
87 self.value_buffer
88 .extend_from_slice(array.value(i).as_bytes());
89 }
90 }
91 }
92
93 pub fn append_offset(&mut self) {
94 let next_offset: i32 = self
95 .value_buffer
96 .len()
97 .try_into()
98 .expect("byte array offset overflow");
99 self.offsets_buffer.push(next_offset);
100 }
101
102 pub fn finish(self, null_buffer: Option<NullBuffer>) -> StringArray {
110 let row_count = self.offsets_buffer.len() / size_of::<i32>() - 1;
111 if let Some(ref null_buffer) = null_buffer {
112 assert_eq!(
113 null_buffer.len(),
114 row_count,
115 "Null buffer and offsets buffer must be the same length"
116 );
117 }
118 let array_builder = ArrayDataBuilder::new(DataType::Utf8)
119 .len(row_count)
120 .add_buffer(self.offsets_buffer.into())
121 .add_buffer(self.value_buffer.into())
122 .nulls(null_buffer);
123 let array_data = unsafe { array_builder.build_unchecked() };
126 StringArray::from(array_data)
127 }
128}
129
130pub struct StringViewArrayBuilder {
131 builder: StringViewBuilder,
132 block: String,
133}
134
135impl StringViewArrayBuilder {
136 pub fn with_capacity(_item_capacity: usize, data_capacity: usize) -> Self {
137 let builder = StringViewBuilder::with_capacity(data_capacity);
138 Self {
139 builder,
140 block: String::new(),
141 }
142 }
143
144 pub fn write<const CHECK_VALID: bool>(
145 &mut self,
146 column: &ColumnarValueRef,
147 i: usize,
148 ) {
149 match column {
150 ColumnarValueRef::Scalar(s) => {
151 self.block.push_str(std::str::from_utf8(s).unwrap());
152 }
153 ColumnarValueRef::NullableArray(array) => {
154 if !CHECK_VALID || array.is_valid(i) {
155 self.block.push_str(
156 std::str::from_utf8(array.value(i).as_bytes()).unwrap(),
157 );
158 }
159 }
160 ColumnarValueRef::NullableLargeStringArray(array) => {
161 if !CHECK_VALID || array.is_valid(i) {
162 self.block.push_str(
163 std::str::from_utf8(array.value(i).as_bytes()).unwrap(),
164 );
165 }
166 }
167 ColumnarValueRef::NullableStringViewArray(array) => {
168 if !CHECK_VALID || array.is_valid(i) {
169 self.block.push_str(
170 std::str::from_utf8(array.value(i).as_bytes()).unwrap(),
171 );
172 }
173 }
174 ColumnarValueRef::NonNullableArray(array) => {
175 self.block
176 .push_str(std::str::from_utf8(array.value(i).as_bytes()).unwrap());
177 }
178 ColumnarValueRef::NonNullableLargeStringArray(array) => {
179 self.block
180 .push_str(std::str::from_utf8(array.value(i).as_bytes()).unwrap());
181 }
182 ColumnarValueRef::NonNullableStringViewArray(array) => {
183 self.block
184 .push_str(std::str::from_utf8(array.value(i).as_bytes()).unwrap());
185 }
186 }
187 }
188
189 pub fn append_offset(&mut self) {
190 self.builder.append_value(&self.block);
191 self.block = String::new();
192 }
193
194 pub fn finish(mut self) -> StringViewArray {
195 self.builder.finish()
196 }
197}
198
199pub struct LargeStringArrayBuilder {
200 offsets_buffer: MutableBuffer,
201 value_buffer: MutableBuffer,
202}
203
204impl LargeStringArrayBuilder {
205 pub fn with_capacity(item_capacity: usize, data_capacity: usize) -> Self {
206 let capacity = item_capacity
207 .checked_add(1)
208 .map(|i| i.saturating_mul(size_of::<i64>()))
209 .expect("capacity integer overflow");
210
211 let mut offsets_buffer = MutableBuffer::with_capacity(capacity);
212 unsafe { offsets_buffer.push_unchecked(0_i64) };
214 Self {
215 offsets_buffer,
216 value_buffer: MutableBuffer::with_capacity(data_capacity),
217 }
218 }
219
220 pub fn write<const CHECK_VALID: bool>(
221 &mut self,
222 column: &ColumnarValueRef,
223 i: usize,
224 ) {
225 match column {
226 ColumnarValueRef::Scalar(s) => {
227 self.value_buffer.extend_from_slice(s);
228 }
229 ColumnarValueRef::NullableArray(array) => {
230 if !CHECK_VALID || array.is_valid(i) {
231 self.value_buffer
232 .extend_from_slice(array.value(i).as_bytes());
233 }
234 }
235 ColumnarValueRef::NullableLargeStringArray(array) => {
236 if !CHECK_VALID || array.is_valid(i) {
237 self.value_buffer
238 .extend_from_slice(array.value(i).as_bytes());
239 }
240 }
241 ColumnarValueRef::NullableStringViewArray(array) => {
242 if !CHECK_VALID || array.is_valid(i) {
243 self.value_buffer
244 .extend_from_slice(array.value(i).as_bytes());
245 }
246 }
247 ColumnarValueRef::NonNullableArray(array) => {
248 self.value_buffer
249 .extend_from_slice(array.value(i).as_bytes());
250 }
251 ColumnarValueRef::NonNullableLargeStringArray(array) => {
252 self.value_buffer
253 .extend_from_slice(array.value(i).as_bytes());
254 }
255 ColumnarValueRef::NonNullableStringViewArray(array) => {
256 self.value_buffer
257 .extend_from_slice(array.value(i).as_bytes());
258 }
259 }
260 }
261
262 pub fn append_offset(&mut self) {
263 let next_offset: i64 = self
264 .value_buffer
265 .len()
266 .try_into()
267 .expect("byte array offset overflow");
268 self.offsets_buffer.push(next_offset);
269 }
270
271 pub fn finish(self, null_buffer: Option<NullBuffer>) -> LargeStringArray {
279 let row_count = self.offsets_buffer.len() / size_of::<i64>() - 1;
280 if let Some(ref null_buffer) = null_buffer {
281 assert_eq!(
282 null_buffer.len(),
283 row_count,
284 "Null buffer and offsets buffer must be the same length"
285 );
286 }
287 let array_builder = ArrayDataBuilder::new(DataType::LargeUtf8)
288 .len(row_count)
289 .add_buffer(self.offsets_buffer.into())
290 .add_buffer(self.value_buffer.into())
291 .nulls(null_buffer);
292 let array_data = unsafe { array_builder.build_unchecked() };
295 LargeStringArray::from(array_data)
296 }
297}
298
299pub fn make_and_append_view(
313 views_buffer: &mut Vec<u128>,
314 null_builder: &mut NullBufferBuilder,
315 original_view: &u128,
316 substr: &str,
317 start_offset: u32,
318) {
319 let substr_len = substr.len();
320 let sub_view = if substr_len > 12 {
321 let view = ByteView::from(*original_view);
322 make_view(
323 substr.as_bytes(),
324 view.buffer_index,
325 view.offset + start_offset,
326 )
327 } else {
328 make_view(substr.as_bytes(), 0, 0)
330 };
331 views_buffer.push(sub_view);
332 null_builder.append_non_null();
333}
334
335#[derive(Debug)]
336pub enum ColumnarValueRef<'a> {
337 Scalar(&'a [u8]),
338 NullableArray(&'a StringArray),
339 NonNullableArray(&'a StringArray),
340 NullableLargeStringArray(&'a LargeStringArray),
341 NonNullableLargeStringArray(&'a LargeStringArray),
342 NullableStringViewArray(&'a StringViewArray),
343 NonNullableStringViewArray(&'a StringViewArray),
344}
345
346impl ColumnarValueRef<'_> {
347 #[inline]
348 pub fn is_valid(&self, i: usize) -> bool {
349 match &self {
350 Self::Scalar(_)
351 | Self::NonNullableArray(_)
352 | Self::NonNullableLargeStringArray(_)
353 | Self::NonNullableStringViewArray(_) => true,
354 Self::NullableArray(array) => array.is_valid(i),
355 Self::NullableStringViewArray(array) => array.is_valid(i),
356 Self::NullableLargeStringArray(array) => array.is_valid(i),
357 }
358 }
359
360 #[inline]
361 pub fn nulls(&self) -> Option<NullBuffer> {
362 match &self {
363 Self::Scalar(_)
364 | Self::NonNullableArray(_)
365 | Self::NonNullableStringViewArray(_)
366 | Self::NonNullableLargeStringArray(_) => None,
367 Self::NullableArray(array) => array.nulls().cloned(),
368 Self::NullableStringViewArray(array) => array.nulls().cloned(),
369 Self::NullableLargeStringArray(array) => array.nulls().cloned(),
370 }
371 }
372}
373
#[cfg(test)]
mod tests {
    use super::{LargeStringArrayBuilder, StringArrayBuilder};

    /// Asking for `usize::MAX` items must panic with the builder's own
    /// overflow message.
    #[test]
    #[should_panic(expected = "capacity integer overflow")]
    fn test_overflow_string_array_builder() {
        let (item_capacity, data_capacity) = (usize::MAX, usize::MAX);
        let _ = StringArrayBuilder::with_capacity(item_capacity, data_capacity);
    }

    /// Same check for the builder with 64-bit offsets.
    #[test]
    #[should_panic(expected = "capacity integer overflow")]
    fn test_overflow_large_string_array_builder() {
        let (item_capacity, data_capacity) = (usize::MAX, usize::MAX);
        let _ = LargeStringArrayBuilder::with_capacity(item_capacity, data_capacity);
    }
}