1use std::marker::PhantomData;
19use std::mem::size_of;
20use std::sync::Arc;
21
22use datafusion_common::{Result, exec_datafusion_err, internal_err};
23
24use arrow::array::{
25 Array, ArrayAccessor, ArrayDataBuilder, ArrayRef, BinaryArray, ByteView,
26 GenericStringArray, LargeStringArray, OffsetSizeTrait, StringArray, StringViewArray,
27 make_view,
28};
29use arrow::buffer::{Buffer, MutableBuffer, NullBuffer, ScalarBuffer};
30use arrow::datatypes::DataType;
31
/// Row-by-row builder for a [`StringArray`] whose rows are produced by
/// concatenating column values.
///
/// Bytes for the current row are added with `write`, the row is closed with
/// `append_offset`, and `finish` assembles the array.
pub(crate) struct ConcatStringBuilder {
    /// `i32` end offsets. `with_capacity` seeds it with a leading `0`;
    /// `append_offset` pushes one entry per row.
    offsets_buffer: MutableBuffer,
    /// Concatenated value bytes of every row written so far.
    value_buffer: MutableBuffer,
    /// Set once bytes from a scalar or a binary array have been written.
    /// When set, `finish` uses the validating `build()` instead of
    /// `build_unchecked()`.
    tainted: bool,
}
48
49impl ConcatStringBuilder {
50 pub fn with_capacity(item_capacity: usize, data_capacity: usize) -> Self {
51 let capacity = item_capacity
52 .checked_add(1)
53 .map(|i| i.saturating_mul(size_of::<i32>()))
54 .expect("capacity integer overflow");
55
56 let mut offsets_buffer = MutableBuffer::with_capacity(capacity);
57 unsafe { offsets_buffer.push_unchecked(0_i32) };
59 Self {
60 offsets_buffer,
61 value_buffer: MutableBuffer::with_capacity(data_capacity),
62 tainted: false,
63 }
64 }
65
66 pub fn write<const CHECK_VALID: bool>(
67 &mut self,
68 column: &ColumnarValueRef,
69 i: usize,
70 ) {
71 match column {
72 ColumnarValueRef::Scalar(s) => {
73 self.value_buffer.extend_from_slice(s);
74 self.tainted = true;
75 }
76 ColumnarValueRef::NullableArray(array) => {
77 if !CHECK_VALID || array.is_valid(i) {
78 self.value_buffer
79 .extend_from_slice(array.value(i).as_bytes());
80 }
81 }
82 ColumnarValueRef::NullableLargeStringArray(array) => {
83 if !CHECK_VALID || array.is_valid(i) {
84 self.value_buffer
85 .extend_from_slice(array.value(i).as_bytes());
86 }
87 }
88 ColumnarValueRef::NullableStringViewArray(array) => {
89 if !CHECK_VALID || array.is_valid(i) {
90 self.value_buffer
91 .extend_from_slice(array.value(i).as_bytes());
92 }
93 }
94 ColumnarValueRef::NullableBinaryArray(array) => {
95 if !CHECK_VALID || array.is_valid(i) {
96 self.value_buffer.extend_from_slice(array.value(i));
97 }
98 self.tainted = true;
99 }
100 ColumnarValueRef::NonNullableArray(array) => {
101 self.value_buffer
102 .extend_from_slice(array.value(i).as_bytes());
103 }
104 ColumnarValueRef::NonNullableLargeStringArray(array) => {
105 self.value_buffer
106 .extend_from_slice(array.value(i).as_bytes());
107 }
108 ColumnarValueRef::NonNullableStringViewArray(array) => {
109 self.value_buffer
110 .extend_from_slice(array.value(i).as_bytes());
111 }
112 ColumnarValueRef::NonNullableBinaryArray(array) => {
113 self.value_buffer.extend_from_slice(array.value(i));
114 self.tainted = true;
115 }
116 }
117 }
118
119 pub fn append_offset(&mut self) -> Result<()> {
120 let next_offset: i32 = self
121 .value_buffer
122 .len()
123 .try_into()
124 .map_err(|_| exec_datafusion_err!("byte array offset overflow"))?;
125 self.offsets_buffer.push(next_offset);
126 Ok(())
127 }
128
129 pub fn finish(self, null_buffer: Option<NullBuffer>) -> Result<StringArray> {
137 let row_count = self.offsets_buffer.len() / size_of::<i32>() - 1;
138 if let Some(ref null_buffer) = null_buffer
139 && null_buffer.len() != row_count
140 {
141 return internal_err!(
142 "Null buffer and offsets buffer must be the same length"
143 );
144 }
145 let array_builder = ArrayDataBuilder::new(DataType::Utf8)
146 .len(row_count)
147 .add_buffer(self.offsets_buffer.into())
148 .add_buffer(self.value_buffer.into())
149 .nulls(null_buffer);
150 if self.tainted {
151 let array_data = array_builder.build()?;
154 Ok(StringArray::from(array_data))
155 } else {
156 let array_data = unsafe { array_builder.build_unchecked() };
159 Ok(StringArray::from(array_data))
160 }
161 }
162}
163
/// Row-by-row builder for a [`StringViewArray`] whose rows are produced by
/// concatenating column values.
///
/// Bytes for the current row are staged in `block` by `write`;
/// `append_offset` turns the staged bytes into a view and clears the stage.
pub(crate) struct ConcatStringViewBuilder {
    /// One view per finished row.
    views: Vec<u128>,
    /// Bytes of all rows longer than 12 bytes; becomes the single data buffer
    /// of the finished array (no buffer is emitted when it is empty).
    data: Vec<u8>,
    /// Staging area holding the bytes of the row currently being written.
    block: Vec<u8>,
    /// Set when the current row received bytes from a scalar or a binary
    /// array; `append_offset` then validates the row as UTF-8 and resets it.
    tainted: bool,
}
181
182impl ConcatStringViewBuilder {
183 pub fn with_capacity(item_capacity: usize, data_capacity: usize) -> Self {
184 Self {
185 views: Vec::with_capacity(item_capacity),
186 data: Vec::with_capacity(data_capacity),
187 block: vec![],
188 tainted: false,
189 }
190 }
191
192 pub fn write<const CHECK_VALID: bool>(
193 &mut self,
194 column: &ColumnarValueRef,
195 i: usize,
196 ) {
197 match column {
198 ColumnarValueRef::Scalar(s) => {
199 self.block.extend_from_slice(s);
200 self.tainted = true;
201 }
202 ColumnarValueRef::NullableArray(array) => {
203 if !CHECK_VALID || array.is_valid(i) {
204 self.block.extend_from_slice(array.value(i).as_bytes());
205 }
206 }
207 ColumnarValueRef::NullableLargeStringArray(array) => {
208 if !CHECK_VALID || array.is_valid(i) {
209 self.block.extend_from_slice(array.value(i).as_bytes());
210 }
211 }
212 ColumnarValueRef::NullableStringViewArray(array) => {
213 if !CHECK_VALID || array.is_valid(i) {
214 self.block.extend_from_slice(array.value(i).as_bytes());
215 }
216 }
217 ColumnarValueRef::NullableBinaryArray(array) => {
218 if !CHECK_VALID || array.is_valid(i) {
219 self.block.extend_from_slice(array.value(i));
220 }
221 self.tainted = true;
222 }
223 ColumnarValueRef::NonNullableArray(array) => {
224 self.block.extend_from_slice(array.value(i).as_bytes());
225 }
226 ColumnarValueRef::NonNullableLargeStringArray(array) => {
227 self.block.extend_from_slice(array.value(i).as_bytes());
228 }
229 ColumnarValueRef::NonNullableStringViewArray(array) => {
230 self.block.extend_from_slice(array.value(i).as_bytes());
231 }
232 ColumnarValueRef::NonNullableBinaryArray(array) => {
233 self.block.extend_from_slice(array.value(i));
234 self.tainted = true;
235 }
236 }
237 }
238
239 pub fn append_offset(&mut self) -> Result<()> {
242 if self.tainted {
243 std::str::from_utf8(&self.block)
244 .map_err(|_| exec_datafusion_err!("invalid UTF-8 in binary literal"))?;
245 }
246
247 let v = &self.block;
248 if v.len() > 12 {
249 let offset: u32 = self
250 .data
251 .len()
252 .try_into()
253 .map_err(|_| exec_datafusion_err!("byte array offset overflow"))?;
254 self.data.extend_from_slice(v);
255 self.views.push(make_view(v, 0, offset));
256 } else {
257 self.views.push(make_view(v, 0, 0));
258 }
259
260 self.block.clear();
261 self.tainted = false;
262 Ok(())
263 }
264
265 pub fn finish(self, null_buffer: Option<NullBuffer>) -> Result<StringViewArray> {
273 if let Some(ref nulls) = null_buffer
274 && nulls.len() != self.views.len()
275 {
276 return internal_err!(
277 "Null buffer length ({}) must match row count ({})",
278 nulls.len(),
279 self.views.len()
280 );
281 }
282
283 let buffers: Vec<Buffer> = if self.data.is_empty() {
284 vec![]
285 } else {
286 vec![Buffer::from(self.data)]
287 };
288
289 let array = unsafe {
293 StringViewArray::new_unchecked(
294 ScalarBuffer::from(self.views),
295 buffers,
296 null_buffer,
297 )
298 };
299 Ok(array)
300 }
301}
302
/// Row-by-row builder for a [`LargeStringArray`] whose rows are produced by
/// concatenating column values.
///
/// Bytes for the current row are added with `write`, the row is closed with
/// `append_offset`, and `finish` assembles the array.
pub(crate) struct ConcatLargeStringBuilder {
    /// `i64` end offsets. `with_capacity` seeds it with a leading `0`;
    /// `append_offset` pushes one entry per row.
    offsets_buffer: MutableBuffer,
    /// Concatenated value bytes of every row written so far.
    value_buffer: MutableBuffer,
    /// Set once bytes from a scalar or a binary array have been written.
    /// When set, `finish` uses the validating `build()` instead of
    /// `build_unchecked()`.
    tainted: bool,
}
315
316impl ConcatLargeStringBuilder {
317 pub fn with_capacity(item_capacity: usize, data_capacity: usize) -> Self {
318 let capacity = item_capacity
319 .checked_add(1)
320 .map(|i| i.saturating_mul(size_of::<i64>()))
321 .expect("capacity integer overflow");
322
323 let mut offsets_buffer = MutableBuffer::with_capacity(capacity);
324 unsafe { offsets_buffer.push_unchecked(0_i64) };
326 Self {
327 offsets_buffer,
328 value_buffer: MutableBuffer::with_capacity(data_capacity),
329 tainted: false,
330 }
331 }
332
333 pub fn write<const CHECK_VALID: bool>(
334 &mut self,
335 column: &ColumnarValueRef,
336 i: usize,
337 ) {
338 match column {
339 ColumnarValueRef::Scalar(s) => {
340 self.value_buffer.extend_from_slice(s);
341 self.tainted = true;
342 }
343 ColumnarValueRef::NullableArray(array) => {
344 if !CHECK_VALID || array.is_valid(i) {
345 self.value_buffer
346 .extend_from_slice(array.value(i).as_bytes());
347 }
348 }
349 ColumnarValueRef::NullableLargeStringArray(array) => {
350 if !CHECK_VALID || array.is_valid(i) {
351 self.value_buffer
352 .extend_from_slice(array.value(i).as_bytes());
353 }
354 }
355 ColumnarValueRef::NullableStringViewArray(array) => {
356 if !CHECK_VALID || array.is_valid(i) {
357 self.value_buffer
358 .extend_from_slice(array.value(i).as_bytes());
359 }
360 }
361 ColumnarValueRef::NullableBinaryArray(array) => {
362 if !CHECK_VALID || array.is_valid(i) {
363 self.value_buffer.extend_from_slice(array.value(i));
364 }
365 self.tainted = true;
366 }
367 ColumnarValueRef::NonNullableArray(array) => {
368 self.value_buffer
369 .extend_from_slice(array.value(i).as_bytes());
370 }
371 ColumnarValueRef::NonNullableLargeStringArray(array) => {
372 self.value_buffer
373 .extend_from_slice(array.value(i).as_bytes());
374 }
375 ColumnarValueRef::NonNullableStringViewArray(array) => {
376 self.value_buffer
377 .extend_from_slice(array.value(i).as_bytes());
378 }
379 ColumnarValueRef::NonNullableBinaryArray(array) => {
380 self.value_buffer.extend_from_slice(array.value(i));
381 self.tainted = true;
382 }
383 }
384 }
385
386 pub fn append_offset(&mut self) -> Result<()> {
387 let next_offset: i64 = self
388 .value_buffer
389 .len()
390 .try_into()
391 .map_err(|_| exec_datafusion_err!("byte array offset overflow"))?;
392 self.offsets_buffer.push(next_offset);
393 Ok(())
394 }
395
396 pub fn finish(self, null_buffer: Option<NullBuffer>) -> Result<LargeStringArray> {
404 let row_count = self.offsets_buffer.len() / size_of::<i64>() - 1;
405 if let Some(ref null_buffer) = null_buffer
406 && null_buffer.len() != row_count
407 {
408 return internal_err!(
409 "Null buffer and offsets buffer must be the same length"
410 );
411 }
412 let array_builder = ArrayDataBuilder::new(DataType::LargeUtf8)
413 .len(row_count)
414 .add_buffer(self.offsets_buffer.into())
415 .add_buffer(self.value_buffer.into())
416 .nulls(null_buffer);
417 if self.tainted {
418 let array_data = array_builder.build()?;
421 Ok(LargeStringArray::from(array_data))
422 } else {
423 let array_data = unsafe { array_builder.build_unchecked() };
426 Ok(LargeStringArray::from(array_data))
427 }
428 }
429}
430
/// Builder for a [`GenericStringArray`] that takes its null buffer in one
/// piece at `finish` time instead of tracking validity per row.
///
/// Rows are added with `append_value`, `append_with`, `append_byte_map`, or
/// `append_placeholder`.
pub(crate) struct GenericStringArrayBuilder<O: OffsetSizeTrait> {
    /// Offsets of type `O`; seeded with a leading zero, one entry pushed per
    /// appended row.
    offsets_buffer: MutableBuffer,
    /// Concatenated value bytes of every row.
    value_buffer: MutableBuffer,
    /// Number of `append_placeholder` calls; `finish` debug-asserts that the
    /// supplied null buffer contains at least this many nulls.
    placeholder_count: usize,
    /// Ties the builder to the offset type without storing a value of it.
    _phantom: PhantomData<O>,
}
455
456impl<O: OffsetSizeTrait> GenericStringArrayBuilder<O> {
457 pub fn with_capacity(item_capacity: usize, data_capacity: usize) -> Self {
458 let capacity = item_capacity
459 .checked_add(1)
460 .map(|i| i.saturating_mul(size_of::<O>()))
461 .expect("capacity integer overflow");
462
463 let mut offsets_buffer = MutableBuffer::with_capacity(capacity);
464 offsets_buffer.push(O::usize_as(0));
465 Self {
466 offsets_buffer,
467 value_buffer: MutableBuffer::with_capacity(data_capacity),
468 placeholder_count: 0,
469 _phantom: PhantomData,
470 }
471 }
472
473 #[inline]
479 pub fn append_value(&mut self, value: &str) {
480 self.value_buffer.extend_from_slice(value.as_bytes());
481 let next_offset =
482 O::from_usize(self.value_buffer.len()).expect("byte array offset overflow");
483 self.offsets_buffer.push(next_offset);
484 }
485
486 #[inline]
488 pub fn append_placeholder(&mut self) {
489 let next_offset =
490 O::from_usize(self.value_buffer.len()).expect("byte array offset overflow");
491 self.offsets_buffer.push(next_offset);
492 self.placeholder_count += 1;
493 }
494
495 #[inline]
506 pub unsafe fn append_byte_map<F: FnMut(u8) -> u8>(&mut self, src: &[u8], mut map: F) {
507 self.value_buffer.extend(src.iter().map(|&b| map(b)));
508 let next_offset =
509 O::from_usize(self.value_buffer.len()).expect("byte array offset overflow");
510 self.offsets_buffer.push(next_offset);
511 }
512
513 #[inline]
519 pub fn append_with<F>(&mut self, f: F)
520 where
521 F: FnOnce(&mut GenericStringWriter<'_>),
522 {
523 let mut writer = GenericStringWriter {
524 value_buffer: &mut self.value_buffer,
525 };
526 f(&mut writer);
527 let next_offset =
528 O::from_usize(self.value_buffer.len()).expect("byte array offset overflow");
529 self.offsets_buffer.push(next_offset);
530 }
531
532 pub fn finish(
540 self,
541 null_buffer: Option<NullBuffer>,
542 ) -> Result<GenericStringArray<O>> {
543 let row_count = self.offsets_buffer.len() / size_of::<O>() - 1;
544 if let Some(ref n) = null_buffer
545 && n.len() != row_count
546 {
547 return internal_err!(
548 "Null buffer length ({}) must match row count ({row_count})",
549 n.len()
550 );
551 }
552 let null_count = null_buffer.as_ref().map_or(0, |n| n.null_count());
553 debug_assert!(
554 null_count >= self.placeholder_count,
555 "{} placeholder rows but null buffer has {null_count} nulls",
556 self.placeholder_count,
557 );
558 let array_data = ArrayDataBuilder::new(GenericStringArray::<O>::DATA_TYPE)
559 .len(row_count)
560 .add_buffer(self.offsets_buffer.into())
561 .add_buffer(self.value_buffer.into())
562 .nulls(null_buffer);
563 let array_data = unsafe { array_data.build_unchecked() };
566 Ok(GenericStringArray::<O>::from(array_data))
567 }
568}
569
/// Starting value, in bytes, of [`StringViewArrayBuilder`]'s `block_size`.
/// `next_block_size` doubles the value before returning it, so the first
/// data block reserved is at least twice this size.
pub(crate) const STRING_VIEW_INIT_BLOCK_SIZE: u32 = 8 * 1024;
/// Threshold, in bytes, at which [`StringViewArrayBuilder`] stops doubling
/// its `block_size`.
pub(crate) const STRING_VIEW_MAX_BLOCK_SIZE: u32 = 2 * 1024 * 1024;
576
/// Sink for incrementally writing the text of a row.
///
/// Implemented in this file by [`GenericStringWriter`] and
/// [`StringViewWriter`], which the builders hand to the closure passed to
/// their `append_with` methods.
pub(crate) trait StringWriter {
    /// Appends the bytes of `s` to the row being written.
    fn write_str(&mut self, s: &str);
    /// Appends the UTF-8 encoding of `c` to the row being written.
    fn write_char(&mut self, c: char);
}
582
/// [`StringWriter`] that appends directly to the value buffer of a
/// [`GenericStringArrayBuilder`]; created by its `append_with` method.
pub(crate) struct GenericStringWriter<'a> {
    /// The builder's value buffer, borrowed for the duration of the write.
    value_buffer: &'a mut MutableBuffer,
}
588
/// Writes go straight to the borrowed value buffer through the
/// `push_*_to_mutable_buffer` helpers.
impl StringWriter for GenericStringWriter<'_> {
    /// Appends the bytes of `s` to the value buffer.
    #[inline(always)]
    fn write_str(&mut self, s: &str) {
        push_bytes_to_mutable_buffer(self.value_buffer, s.as_bytes());
    }

    /// Appends the UTF-8 encoding of `c` to the value buffer.
    #[inline(always)]
    fn write_char(&mut self, c: char) {
        push_char_to_mutable_buffer(self.value_buffer, c);
    }
}
600
/// Appends `bytes` to the end of `value_buffer`.
///
/// Reserves room for `bytes.len()` additional bytes, copies them directly
/// behind the current contents, and then extends the buffer's length.
#[inline(always)]
fn push_bytes_to_mutable_buffer(value_buffer: &mut MutableBuffer, bytes: &[u8]) {
    let n = bytes.len();
    let old_len = value_buffer.len();
    value_buffer.reserve(n);

    // SAFETY: `reserve(n)` above makes room for `n` bytes past `old_len`, so
    // `dst..dst + n` lies inside the allocation; `bytes` is a separate borrow
    // and cannot overlap the exclusively borrowed buffer; exactly `n` bytes
    // are written before the length is advanced by `n`.
    unsafe {
        let dst = value_buffer.as_mut_ptr().add(old_len);
        let src = bytes.as_ptr();
        // NOTE(review): arms 1..=8 perform the same copy as the fallback arm
        // but with a literal length — presumably so short writes compile to
        // fixed-size copies; confirm intent before simplifying.
        match n {
            0 => {}
            1 => std::ptr::copy_nonoverlapping(src, dst, 1),
            2 => std::ptr::copy_nonoverlapping(src, dst, 2),
            3 => std::ptr::copy_nonoverlapping(src, dst, 3),
            4 => std::ptr::copy_nonoverlapping(src, dst, 4),
            5 => std::ptr::copy_nonoverlapping(src, dst, 5),
            6 => std::ptr::copy_nonoverlapping(src, dst, 6),
            7 => std::ptr::copy_nonoverlapping(src, dst, 7),
            8 => std::ptr::copy_nonoverlapping(src, dst, 8),
            _ => std::ptr::copy_nonoverlapping(src, dst, n),
        }
        value_buffer.set_len(old_len + n);
    }
}
630
631#[inline(always)]
632fn push_char_to_mutable_buffer(value_buffer: &mut MutableBuffer, c: char) {
633 let len = c.len_utf8();
634 let old_len = value_buffer.len();
635 value_buffer.reserve(len);
636
637 unsafe {
640 let dst = value_buffer.as_mut_ptr().add(old_len);
641 if len == 1 {
642 *dst = c as u8;
643 } else {
644 c.encode_utf8(std::slice::from_raw_parts_mut(dst, len));
645 }
646 value_buffer.set_len(old_len + len);
647 }
648}
649
/// Builder for a [`StringViewArray`] that takes its null buffer in one piece
/// at `finish` time instead of tracking validity per row.
///
/// Rows of at most 12 bytes are stored inline in their view; longer rows are
/// written into data blocks.
pub(crate) struct StringViewArrayBuilder {
    /// One view per appended row.
    views: Vec<u128>,
    /// Data block currently being filled with long-row bytes.
    in_progress: Vec<u8>,
    /// Data blocks that have been filled and sealed, in buffer-index order.
    completed: Vec<Buffer>,
    /// Current block size in bytes; doubled by `next_block_size` while it is
    /// below [`STRING_VIEW_MAX_BLOCK_SIZE`].
    block_size: u32,
    /// Number of `append_placeholder` calls; `finish` debug-asserts that the
    /// supplied null buffer contains at least this many nulls.
    placeholder_count: usize,
}
663
664impl StringViewArrayBuilder {
665 pub fn with_capacity(item_capacity: usize) -> Self {
666 Self {
667 views: Vec::with_capacity(item_capacity),
668 in_progress: Vec::new(),
669 completed: Vec::new(),
670 block_size: STRING_VIEW_INIT_BLOCK_SIZE,
671 placeholder_count: 0,
672 }
673 }
674
675 fn next_block_size(&mut self) -> u32 {
677 if self.block_size < STRING_VIEW_MAX_BLOCK_SIZE {
678 self.block_size = self.block_size.saturating_mul(2);
679 }
680 self.block_size
681 }
682
683 #[inline]
693 pub fn append_value(&mut self, value: &str) {
694 let v = value.as_bytes();
695 let length: u32 =
696 i32::try_from(v.len()).expect("value length exceeds i32::MAX") as u32;
697 if length <= 12 {
698 self.views.push(make_view(v, 0, 0));
699 return;
700 }
701
702 let required_cap = self.in_progress.len() + length as usize;
703 if self.in_progress.capacity() < required_cap {
704 self.flush_in_progress();
705 let to_reserve = (length as usize).max(self.next_block_size() as usize);
706 self.in_progress.reserve(to_reserve);
707 }
708
709 let offset: u32 = i32::try_from(self.in_progress.len())
710 .expect("offset exceeds i32::MAX") as u32;
711 self.in_progress.extend_from_slice(v);
712 self.views.push(self.make_long_view(length, offset, v));
713 }
714
715 #[inline]
717 pub fn append_placeholder(&mut self) {
718 self.views.push(0);
720 self.placeholder_count += 1;
721 }
722
723 #[inline]
728 fn ensure_long_capacity(&mut self, length: u32) {
729 let required_cap = self.in_progress.len() + length as usize;
730 if self.in_progress.capacity() < required_cap {
731 self.flush_in_progress();
732 let to_reserve = (length as usize).max(self.next_block_size() as usize);
733 self.in_progress.reserve(to_reserve);
734 }
735 }
736
737 #[inline]
745 fn make_long_view(&self, length: u32, offset: u32, prefix_bytes: &[u8]) -> u128 {
746 let buffer_index: u32 = i32::try_from(self.completed.len())
747 .expect("buffer count exceeds i32::MAX")
748 as u32;
749 ByteView {
750 length,
751 prefix: u32::from_le_bytes(prefix_bytes[..4].try_into().unwrap()),
753 buffer_index,
754 offset,
755 }
756 .into()
757 }
758
759 #[inline]
772 pub unsafe fn append_byte_map<F: FnMut(u8) -> u8>(&mut self, src: &[u8], mut map: F) {
773 let length: u32 =
774 i32::try_from(src.len()).expect("value length exceeds i32::MAX") as u32;
775 if length <= 12 {
776 let mut bytes = [0u8; 12];
777 for (d, &b) in bytes[..src.len()].iter_mut().zip(src) {
778 *d = map(b);
779 }
780 self.views.push(make_view(&bytes[..src.len()], 0, 0));
781 return;
782 }
783
784 self.ensure_long_capacity(length);
785
786 let cursor = self.in_progress.len();
787 let offset: u32 = i32::try_from(cursor).expect("offset exceeds i32::MAX") as u32;
788 self.in_progress.extend(src.iter().map(|&b| map(b)));
789 self.views
790 .push(self.make_long_view(length, offset, &self.in_progress[cursor..]));
791 }
792
793 #[inline]
801 pub fn append_with<F>(&mut self, f: F)
802 where
803 F: FnOnce(&mut StringViewWriter<'_>),
804 {
805 let mut writer = StringViewWriter {
806 inline_buf: [0u8; 12],
807 inline_len: 0,
808 spill_cursor: None,
809 builder: self,
810 };
811 f(&mut writer);
812 let StringViewWriter {
816 inline_buf,
817 inline_len,
818 spill_cursor,
819 ..
820 } = writer;
821
822 match spill_cursor {
823 None => {
824 self.views
825 .push(make_view(&inline_buf[..inline_len as usize], 0, 0));
826 }
827 Some(start) => {
828 let end = self.in_progress.len();
829 let length: u32 = i32::try_from(end - start)
830 .expect("value length exceeds i32::MAX")
831 as u32;
832 let offset: u32 =
833 i32::try_from(start).expect("offset exceeds i32::MAX") as u32;
834 self.views.push(self.make_long_view(
835 length,
836 offset,
837 &self.in_progress[start..],
838 ));
839 }
840 }
841 }
842
843 fn flush_in_progress(&mut self) {
844 if !self.in_progress.is_empty() {
845 let block = std::mem::take(&mut self.in_progress);
846 self.completed.push(Buffer::from_vec(block));
847 }
848 }
849
850 pub fn finish(mut self, null_buffer: Option<NullBuffer>) -> Result<StringViewArray> {
858 if let Some(ref n) = null_buffer
859 && n.len() != self.views.len()
860 {
861 return internal_err!(
862 "Null buffer length ({}) must match row count ({})",
863 n.len(),
864 self.views.len()
865 );
866 }
867 let null_count = null_buffer.as_ref().map_or(0, |n| n.null_count());
868 debug_assert!(
869 null_count >= self.placeholder_count,
870 "{} placeholder rows but null buffer has {null_count} nulls",
871 self.placeholder_count,
872 );
873 self.flush_in_progress();
874 let array = unsafe {
879 StringViewArray::new_unchecked(
880 ScalarBuffer::from(self.views),
881 self.completed,
882 null_buffer,
883 )
884 };
885 Ok(array)
886 }
887}
888
/// [`StringWriter`] for one row of a [`StringViewArrayBuilder`]; created by
/// the builder's `append_with` method.
///
/// Bytes are collected in `inline_buf` while the row is at most 12 bytes
/// long. The first write that would exceed 12 bytes moves the row into the
/// builder's in-progress block, and later writes append there directly.
pub(crate) struct StringViewWriter<'a> {
    /// Bytes of the row while it still fits inline.
    inline_buf: [u8; 12],
    /// Number of bytes of `inline_buf` in use.
    inline_len: u8,
    /// Start of the row inside the builder's in-progress block once it has
    /// spilled; `None` while the row is still inline.
    spill_cursor: Option<usize>,
    /// The builder this row is being written for.
    builder: &'a mut StringViewArrayBuilder,
}
904
905impl StringWriter for StringViewWriter<'_> {
906 #[inline]
907 fn write_str(&mut self, s: &str) {
908 let bytes = s.as_bytes();
909 if self.spill_cursor.is_some() {
910 self.builder.in_progress.extend_from_slice(bytes);
911 return;
912 }
913
914 let inline_len = self.inline_len as usize;
915 let new_len = inline_len + bytes.len();
916 if new_len <= 12 {
917 self.inline_buf[inline_len..new_len].copy_from_slice(bytes);
918 self.inline_len = new_len as u8;
919 return;
920 }
921
922 self.builder.ensure_long_capacity(new_len as u32);
926 let cursor = self.builder.in_progress.len();
927 self.builder
928 .in_progress
929 .extend_from_slice(&self.inline_buf[..inline_len]);
930 self.builder.in_progress.extend_from_slice(bytes);
931 self.spill_cursor = Some(cursor);
932 }
933
934 #[inline]
935 fn write_char(&mut self, c: char) {
936 let len = c.len_utf8();
937 if self.spill_cursor.is_some() {
938 push_char_to_vec(&mut self.builder.in_progress, c);
939 return;
940 }
941
942 let inline_len = self.inline_len as usize;
943 let new_len = inline_len + len;
944 if new_len <= 12 {
945 c.encode_utf8(&mut self.inline_buf[inline_len..new_len]);
946 self.inline_len = new_len as u8;
947 return;
948 }
949
950 self.builder.ensure_long_capacity(new_len as u32);
951 let cursor = self.builder.in_progress.len();
952 self.builder
953 .in_progress
954 .extend_from_slice(&self.inline_buf[..inline_len]);
955 push_char_to_vec(&mut self.builder.in_progress, c);
956 self.spill_cursor = Some(cursor);
957 }
958}
959
/// Appends the UTF-8 encoding of `c` to `v`.
#[inline]
fn push_char_to_vec(v: &mut Vec<u8>, c: char) {
    // Four bytes is the longest UTF-8 encoding of a `char`.
    let mut scratch = [0u8; 4];
    let encoded: &str = c.encode_utf8(&mut scratch);
    v.extend_from_slice(encoded.as_bytes());
}
965
/// Common interface of string-array builders that receive their null buffer
/// in one piece at `finish` time instead of tracking validity per row.
///
/// Implemented in this file by [`GenericStringArrayBuilder`] and
/// [`StringViewArrayBuilder`].
pub(crate) trait BulkNullStringArrayBuilder {
    /// Writer type handed to the closure passed to `append_with`.
    type Writer<'a>: StringWriter
    where
        Self: 'a;

    /// Appends `value` as a new row.
    fn append_value(&mut self, value: &str);

    /// Appends a row with no content of its own. The implementations in this
    /// file count these rows and debug-assert in `finish` that the null
    /// buffer holds at least that many nulls.
    fn append_placeholder(&mut self);

    /// Appends a new row whose content is produced by `f` writing into a
    /// `Self::Writer`.
    fn append_with<F>(&mut self, f: F)
    where
        F: for<'a> FnOnce(&mut Self::Writer<'a>);

    /// Appends a new row made of each byte of `src` passed through `map`.
    ///
    /// # Safety
    ///
    /// The implementations in this file store the mapped bytes without UTF-8
    /// validation and build the array unchecked, so the caller must guarantee
    /// that the mapped output is valid UTF-8.
    unsafe fn append_byte_map<F: FnMut(u8) -> u8>(&mut self, src: &[u8], map: F);

    /// Consumes the builder and returns the finished array with `nulls`
    /// attached. The implementations in this file return an error when
    /// `nulls` is present and its length differs from the number of rows.
    fn finish(self, nulls: Option<NullBuffer>) -> Result<ArrayRef>;
}
1052
/// Forwards every trait method to the inherent method of the same name on
/// [`GenericStringArrayBuilder`]. The calls are written with the full type
/// path rather than method syntax.
impl<O: OffsetSizeTrait> BulkNullStringArrayBuilder for GenericStringArrayBuilder<O> {
    type Writer<'a> = GenericStringWriter<'a>;

    #[inline]
    fn append_value(&mut self, value: &str) {
        GenericStringArrayBuilder::<O>::append_value(self, value)
    }
    #[inline]
    fn append_placeholder(&mut self) {
        GenericStringArrayBuilder::<O>::append_placeholder(self)
    }
    #[inline]
    fn append_with<F>(&mut self, f: F)
    where
        F: for<'a> FnOnce(&mut Self::Writer<'a>),
    {
        GenericStringArrayBuilder::<O>::append_with(self, f)
    }
    #[inline]
    unsafe fn append_byte_map<F: FnMut(u8) -> u8>(&mut self, src: &[u8], map: F) {
        // SAFETY: the caller of this trait method upholds the same contract
        // that the inherent `append_byte_map` requires; it is passed through
        // unchanged.
        unsafe { GenericStringArrayBuilder::<O>::append_byte_map(self, src, map) }
    }
    /// Finishes the inherent builder and wraps the array in an `Arc` to
    /// produce an `ArrayRef`.
    fn finish(self, nulls: Option<NullBuffer>) -> Result<ArrayRef> {
        Ok(Arc::new(GenericStringArrayBuilder::<O>::finish(
            self, nulls,
        )?))
    }
}
1082
/// Forwards every trait method to the inherent method of the same name on
/// [`StringViewArrayBuilder`]. The calls are written with the full type path
/// rather than method syntax.
impl BulkNullStringArrayBuilder for StringViewArrayBuilder {
    type Writer<'a> = StringViewWriter<'a>;

    #[inline]
    fn append_value(&mut self, value: &str) {
        StringViewArrayBuilder::append_value(self, value)
    }
    #[inline]
    fn append_placeholder(&mut self) {
        StringViewArrayBuilder::append_placeholder(self)
    }
    #[inline]
    fn append_with<F>(&mut self, f: F)
    where
        F: for<'a> FnOnce(&mut Self::Writer<'a>),
    {
        StringViewArrayBuilder::append_with(self, f)
    }
    #[inline]
    unsafe fn append_byte_map<F: FnMut(u8) -> u8>(&mut self, src: &[u8], map: F) {
        // SAFETY: the caller of this trait method upholds the same contract
        // that the inherent `append_byte_map` requires; it is passed through
        // unchanged.
        unsafe { StringViewArrayBuilder::append_byte_map(self, src, map) }
    }
    /// Finishes the inherent builder and wraps the array in an `Arc` to
    /// produce an `ArrayRef`.
    fn finish(self, nulls: Option<NullBuffer>) -> Result<ArrayRef> {
        Ok(Arc::new(StringViewArrayBuilder::finish(self, nulls)?))
    }
}
1110
1111#[inline(never)]
1129pub(crate) fn append_view(
1130 views_buffer: &mut Vec<u128>,
1131 original_view: &u128,
1132 substr: &str,
1133 start_offset: u32,
1134) {
1135 let substr_len = substr.len();
1136 let sub_view = if substr_len > 12 {
1137 let view = ByteView::from(*original_view);
1138 make_view(
1139 substr.as_bytes(),
1140 view.buffer_index,
1141 view.offset + start_offset,
1142 )
1143 } else {
1144 make_view(substr.as_bytes(), 0, 0)
1145 };
1146 views_buffer.push(sub_view);
1147}
1148
/// Borrowed view of one input column of a concatenation: either a scalar or
/// an array of one of the supported string/binary types.
///
/// Each array type has a `Nullable*` and a `NonNullable*` variant. For the
/// `NonNullable*` variants `is_valid` returns `true` and `nulls` returns
/// `None` without consulting the array.
/// NOTE(review): which variant a caller picks for a given array is decided
/// outside this file — presumably by whether the array contains nulls.
#[derive(Debug)]
pub(crate) enum ColumnarValueRef<'a> {
    /// Raw bytes of a scalar value, used for every row.
    Scalar(&'a [u8]),
    NullableArray(&'a StringArray),
    NonNullableArray(&'a StringArray),
    NullableLargeStringArray(&'a LargeStringArray),
    NonNullableLargeStringArray(&'a LargeStringArray),
    NullableStringViewArray(&'a StringViewArray),
    NonNullableStringViewArray(&'a StringViewArray),
    NullableBinaryArray(&'a BinaryArray),
    NonNullableBinaryArray(&'a BinaryArray),
}
1161
1162impl ColumnarValueRef<'_> {
1163 #[inline]
1164 pub fn is_valid(&self, i: usize) -> bool {
1165 match &self {
1166 Self::Scalar(_)
1167 | Self::NonNullableArray(_)
1168 | Self::NonNullableLargeStringArray(_)
1169 | Self::NonNullableStringViewArray(_)
1170 | Self::NonNullableBinaryArray(_) => true,
1171 Self::NullableArray(array) => array.is_valid(i),
1172 Self::NullableStringViewArray(array) => array.is_valid(i),
1173 Self::NullableLargeStringArray(array) => array.is_valid(i),
1174 Self::NullableBinaryArray(array) => array.is_valid(i),
1175 }
1176 }
1177
1178 #[inline]
1179 pub fn nulls(&self) -> Option<NullBuffer> {
1180 match &self {
1181 Self::Scalar(_)
1182 | Self::NonNullableArray(_)
1183 | Self::NonNullableStringViewArray(_)
1184 | Self::NonNullableLargeStringArray(_)
1185 | Self::NonNullableBinaryArray(_) => None,
1186 Self::NullableArray(array) => array.nulls().cloned(),
1187 Self::NullableStringViewArray(array) => array.nulls().cloned(),
1188 Self::NullableLargeStringArray(array) => array.nulls().cloned(),
1189 Self::NullableBinaryArray(array) => array.nulls().cloned(),
1190 }
1191 }
1192}
1193
1194#[cfg(test)]
1195mod tests {
1196 use super::*;
1197
1198 fn run_scenario<B, F>(mut builder: B, expected: &[Option<&str>], scenario: F)
1207 where
1208 B: BulkNullStringArrayBuilder,
1209 F: FnOnce(&mut B),
1210 {
1211 scenario(&mut builder);
1212 let bits: Vec<bool> = expected.iter().map(|x| x.is_some()).collect();
1213 let nulls = if bits.iter().any(|v| !v) {
1214 Some(NullBuffer::from(bits))
1215 } else {
1216 None
1217 };
1218 let array = builder.finish(nulls).unwrap();
1219 let owned: Vec<Option<&str>> = expected.to_vec();
1220 if let Some(a) = array.as_any().downcast_ref::<StringArray>() {
1221 assert_eq!(a, &StringArray::from(owned));
1222 } else if let Some(a) = array.as_any().downcast_ref::<LargeStringArray>() {
1223 assert_eq!(a, &LargeStringArray::from(owned));
1224 } else if let Some(a) = array.as_any().downcast_ref::<StringViewArray>() {
1225 assert_eq!(a, &StringViewArray::from(owned));
1226 } else {
1227 panic!("unexpected array type");
1228 }
1229 }
1230
1231 macro_rules! check_on_all_builders {
1237 ($expected:expr, $scenario:expr $(,)?) => {{
1238 let expected = $expected;
1239 run_scenario(
1240 GenericStringArrayBuilder::<i32>::with_capacity(0, 0),
1241 expected,
1242 $scenario,
1243 );
1244 run_scenario(
1245 GenericStringArrayBuilder::<i64>::with_capacity(0, 0),
1246 expected,
1247 $scenario,
1248 );
1249 run_scenario(
1250 StringViewArrayBuilder::with_capacity(0),
1251 expected,
1252 $scenario,
1253 );
1254 }};
1255 }
1256
1257 fn assert_finish_errs_on_length_mismatch<B>(mut builder: B)
1258 where
1259 B: BulkNullStringArrayBuilder,
1260 {
1261 builder.append_value("a");
1262 builder.append_value("b");
1263 let nulls = NullBuffer::from(vec![true, false, true]);
1264 assert!(builder.finish(Some(nulls)).is_err());
1265 }
1266
1267 #[test]
1268 #[should_panic(expected = "capacity integer overflow")]
1269 fn test_overflow_concat_string_builder() {
1270 let _builder = ConcatStringBuilder::with_capacity(usize::MAX, usize::MAX);
1271 }
1272
1273 #[test]
1274 #[should_panic(expected = "capacity integer overflow")]
1275 fn test_overflow_concat_large_string_builder() {
1276 let _builder = ConcatLargeStringBuilder::with_capacity(usize::MAX, usize::MAX);
1277 }
1278
1279 #[test]
1280 fn bulk_append_value_with_nulls() {
1281 check_on_all_builders!(
1282 &[
1283 Some("a string longer than twelve bytes"),
1284 None,
1285 Some("short"),
1286 None,
1287 ],
1288 |b| {
1289 b.append_value("a string longer than twelve bytes");
1290 b.append_placeholder();
1291 b.append_value("short");
1292 b.append_placeholder();
1293 },
1294 );
1295 }
1296
1297 #[test]
1298 fn bulk_empty_builder() {
1299 check_on_all_builders!(&[], |_b| {});
1300 }
1301
1302 #[test]
1303 fn bulk_all_placeholders() {
1304 check_on_all_builders!(&[None, None, None], |b| {
1305 b.append_placeholder();
1306 b.append_placeholder();
1307 b.append_placeholder();
1308 });
1309 }
1310
1311 #[test]
1312 fn bulk_append_value_no_nulls() {
1313 check_on_all_builders!(
1314 &[
1315 Some("foo"),
1316 Some(""),
1317 Some("a string longer than twelve bytes")
1318 ],
1319 |b| {
1320 b.append_value("foo");
1321 b.append_value("");
1322 b.append_value("a string longer than twelve bytes");
1323 },
1324 );
1325 }
1326
1327 #[test]
1328 fn bulk_append_with() {
1329 check_on_all_builders!(
1330 &[
1331 Some("hello"),
1332 None,
1333 Some("hello world"),
1334 Some("a long string of 25 bytes"),
1335 Some(""),
1336 ],
1337 |b| {
1338 b.append_with(|w| w.write_str("hello"));
1339 b.append_placeholder();
1340 b.append_with(|w| {
1341 w.write_str("hello ");
1342 w.write_str("world");
1343 });
1344 b.append_with(|w| w.write_str("a long string of 25 bytes"));
1345 b.append_with(|_w| {});
1346 },
1347 );
1348 }
1349
1350 #[test]
1351 fn bulk_append_with_chars() {
1352 check_on_all_builders!(&[Some("hé!"), Some("x")], |b| {
1353 b.append_with(|w| {
1354 w.write_char('h');
1355 w.write_char('é');
1356 w.write_char('!');
1357 });
1358 b.append_with(|w| w.write_char('x'));
1359 });
1360 }
1361
1362 #[test]
1363 fn bulk_append_byte_map() {
1364 check_on_all_builders!(&[Some("HELLO"), Some("aXcaX"), Some("")], |b| unsafe {
1366 b.append_byte_map(b"hello", |x| x.to_ascii_uppercase());
1367 b.append_byte_map(b"abcab", |x| if x == b'b' { b'X' } else { x });
1368 b.append_byte_map(b"", |x| x);
1369 },);
1370 }
1371
1372 #[test]
1373 fn bulk_finish_errors_on_null_buffer_length_mismatch() {
1374 assert_finish_errs_on_length_mismatch(
1375 GenericStringArrayBuilder::<i32>::with_capacity(2, 4),
1376 );
1377 assert_finish_errs_on_length_mismatch(
1378 GenericStringArrayBuilder::<i64>::with_capacity(2, 4),
1379 );
1380 assert_finish_errs_on_length_mismatch(StringViewArrayBuilder::with_capacity(2));
1381 }
1382
1383 #[test]
1384 #[cfg(debug_assertions)]
1385 #[should_panic(expected = "placeholder rows")]
1386 fn string_array_builder_placeholder_without_null_mask() {
1387 let mut builder = GenericStringArrayBuilder::<i32>::with_capacity(2, 4);
1388 builder.append_value("a");
1389 builder.append_placeholder();
1390 let nulls = NullBuffer::from(vec![true, true]);
1392 let _ = builder.finish(Some(nulls));
1393 }
1394
1395 #[test]
1396 #[cfg(debug_assertions)]
1397 #[should_panic(expected = "placeholder rows")]
1398 fn string_array_builder_placeholder_with_none_null_buffer() {
1399 let mut builder = GenericStringArrayBuilder::<i32>::with_capacity(1, 4);
1400 builder.append_placeholder();
1401 let _ = builder.finish(None);
1402 }
1403
1404 #[test]
1405 #[cfg(debug_assertions)]
1406 #[should_panic(expected = "placeholder rows")]
1407 fn string_view_array_builder_placeholder_without_null_mask() {
1408 let mut builder = StringViewArrayBuilder::with_capacity(2);
1409 builder.append_value("a");
1410 builder.append_placeholder();
1411 let nulls = NullBuffer::from(vec![true, true]);
1412 let _ = builder.finish(Some(nulls));
1413 }
1414
// String-view counterpart: a single placeholder row finished without any
// null buffer. The test passes only if that panics with a message containing
// "placeholder rows". Compiled only when debug assertions are enabled.
#[test]
#[cfg(debug_assertions)]
#[should_panic(expected = "placeholder rows")]
fn string_view_array_builder_placeholder_with_none_null_buffer() {
    let mut view_builder = StringViewArrayBuilder::with_capacity(1);
    view_builder.append_placeholder();
    // No null buffer at all, so nothing can mask the placeholder row.
    let no_nulls = None;
    let _ = view_builder.finish(no_nulls);
}
1423
// Writes four short values (0 to 12 bytes each) through `append_with`, one
// `write_str` call per row, and checks that they round-trip and that the
// finished array carries no data buffers at all.
#[test]
fn string_view_array_builder_append_with_inline() {
    let inputs = ["hello", "world!", "", "0123456789ab"];
    let mut view_builder = StringViewArrayBuilder::with_capacity(inputs.len());
    for text in inputs.iter().copied() {
        view_builder.append_with(|w| w.write_str(text));
    }

    let array = view_builder.finish(None).unwrap();
    assert_eq!(array.len(), inputs.len());
    for (row, expected) in inputs.iter().copied().enumerate() {
        assert_eq!(array.value(row), expected);
    }
    // Nothing was long enough to need out-of-line storage.
    assert_eq!(array.data_buffers().len(), 0);
}
1439
// Appends four byte-mapped rows (5, 25, 12 and 0 bytes long) and checks the
// mapped text of each. Only the 25-byte row is expected to land in a data
// buffer: the array must end up with exactly one buffer of 25 bytes.
#[test]
fn string_view_array_builder_append_byte_map() {
    let mut view_builder = StringViewArrayBuilder::with_capacity(4);
    // SAFETY: assumes `append_byte_map` requires the mapped bytes to be valid
    // UTF-8 (confirm against its safety docs). Every input below is ASCII and
    // every mapping yields ASCII, so the output is valid UTF-8.
    unsafe {
        view_builder.append_byte_map(b"hello", |byte| byte.to_ascii_uppercase());
        view_builder.append_byte_map(b"a long string of 25 bytes", |byte| match byte {
            b' ' => b'_',
            other => other,
        });
        view_builder.append_byte_map(b"abcdefghijkl", |byte| byte);
        view_builder.append_byte_map(b"", |byte| byte);
    }

    let array = view_builder.finish(None).unwrap();
    let expected = ["HELLO", "a_long_string_of_25_bytes", "abcdefghijkl", ""];
    for (row, want) in expected.iter().copied().enumerate() {
        assert_eq!(array.value(row), want);
    }

    let buffers = array.data_buffers();
    assert_eq!(buffers.len(), 1);
    assert_eq!(buffers[0].len(), 25);
}
1461
// Builds two rows of exactly 12 bytes each out of several `write_str` calls
// (5 + 7 bytes, then 6 x 2 bytes) and checks that neither row causes a data
// buffer to be created.
#[test]
fn string_view_array_builder_append_with_at_inline_boundary() {
    let mut view_builder = StringViewArrayBuilder::with_capacity(2);
    view_builder.append_with(|out| {
        for piece in ["hello", " world!"] {
            out.write_str(piece);
        }
    });
    view_builder.append_with(|out| {
        for piece in ["ab"; 6] {
            out.write_str(piece);
        }
    });

    let array = view_builder.finish(None).unwrap();
    assert_eq!(array.value(0), "hello world!");
    assert_eq!(array.value(1), "abababababab");
    assert_eq!(array.data_buffers().len(), 0);
}
1480
// Writes 12 bytes and then one more byte into the same row. The resulting
// 13-byte value is expected to be stored whole in a single data buffer.
#[test]
fn string_view_array_builder_append_with_spill_on_overflow() {
    let mut view_builder = StringViewArrayBuilder::with_capacity(1);
    view_builder.append_with(|out| {
        out.write_str("hello world!");
        // The thirteenth byte pushes the row past 12 bytes.
        out.write_str("X");
    });

    let array = view_builder.finish(None).unwrap();
    assert_eq!(array.value(0), "hello world!X");

    let buffers = array.data_buffers();
    assert_eq!(buffers.len(), 1);
    assert_eq!(buffers[0].len(), 13);
}
1494
// Writes one 25-byte value with a single `write_str` call and checks that it
// round-trips and occupies exactly one 25-byte data buffer.
#[test]
fn string_view_array_builder_append_with_long_single_write() {
    let long_value = "a long string of 25 bytes";
    let mut view_builder = StringViewArrayBuilder::with_capacity(1);
    view_builder.append_with(|w| w.write_str(long_value));

    let array = view_builder.finish(None).unwrap();
    assert_eq!(array.value(0), long_value);

    let buffers = array.data_buffers();
    assert_eq!(buffers.len(), 1);
    assert_eq!(buffers[0].len(), 25);
}
1506
// Assembles one 60-byte row from thirty 2-byte writes and checks that the
// pieces end up contiguous in a single 60-byte data buffer.
#[test]
fn string_view_array_builder_append_with_many_small_writes_spilling() {
    let mut view_builder = StringViewArrayBuilder::with_capacity(1);
    view_builder.append_with(|out| {
        for piece in ["ab"; 30] {
            out.write_str(piece);
        }
    });

    let array = view_builder.finish(None).unwrap();
    assert_eq!(array.value(0), "ab".repeat(30));

    let buffers = array.data_buffers();
    assert_eq!(buffers.len(), 1);
    assert_eq!(buffers[0].len(), 60);
}
1521
// Exercises `write_char` with multi-byte characters: a short row made of a
// 2-byte and a 1-byte char, and a 40-byte row made of ten 4-byte chars.
// Only the round-tripped text is checked.
#[test]
fn string_view_array_builder_append_with_chars() {
    let mut view_builder = StringViewArrayBuilder::with_capacity(2);
    view_builder.append_with(|out| {
        for ch in ['é', '!'] {
            out.write_char(ch);
        }
    });
    view_builder.append_with(|out| {
        for ch in ['🦀'; 10] {
            out.write_char(ch);
        }
    });

    let array = view_builder.finish(None).unwrap();
    assert_eq!(array.value(0), "é!");
    assert_eq!(array.value(1), "🦀".repeat(10));
}
1540
// Appends 40 rows of 500 bytes each (20,000 bytes in total) via `append_with`
// and expects the builder to have spread them over at least two data buffers
// without losing or duplicating any bytes.
// NOTE(review): the block size that forces the rotation is defined by the
// builder, not by this test — confirm 20,000 bytes still exceeds one block.
#[test]
fn string_view_array_builder_append_with_block_rotation() {
    const ROWS: usize = 40;
    const ROW_LEN: usize = 500;

    let payload = "x".repeat(ROW_LEN);
    let mut view_builder = StringViewArrayBuilder::with_capacity(ROWS);
    for _row in 0..ROWS {
        view_builder.append_with(|w| w.write_str(&payload));
    }

    let array = view_builder.finish(None).unwrap();
    assert_eq!(array.len(), ROWS);

    let buffers = array.data_buffers();
    assert!(
        buffers.len() >= 2,
        "expected multiple data buffers, got {}",
        buffers.len()
    );

    // Every payload byte must be stored exactly once across all buffers.
    let stored_bytes = buffers.iter().fold(0usize, |acc, buf| acc + buf.len());
    assert_eq!(stored_bytes, ROWS * ROW_LEN);

    for row in 0..ROWS {
        assert_eq!(array.value(row), payload);
    }
}
1565
// Appends 100 rows of 300 bytes each (30,000 bytes in total) via
// `append_value` and expects more than one data buffer in the finished
// array, with every row still reading back intact.
// NOTE(review): the block size that triggers a flush is defined by the
// builder, not by this test — confirm 30,000 bytes still exceeds one block.
#[test]
fn string_view_array_builder_flushes_full_blocks() {
    const ROWS: usize = 100;

    let payload = "x".repeat(300);
    let mut view_builder = StringViewArrayBuilder::with_capacity(ROWS);
    for _row in 0..ROWS {
        view_builder.append_value(&payload);
    }

    let array = view_builder.finish(None).unwrap();
    assert_eq!(array.len(), ROWS);

    let buffer_count = array.data_buffers().len();
    assert!(
        buffer_count > 1,
        "expected multiple data buffers, got {}",
        buffer_count
    );

    for row in 0..ROWS {
        assert_eq!(array.value(row), payload);
    }
}
1587}