1use std::any::Any;
19use std::marker::PhantomData;
20use std::sync::Arc;
21
22use arrow_buffer::{Buffer, NullBufferBuilder, ScalarBuffer};
23use arrow_data::{ByteView, MAX_INLINE_VIEW_LEN};
24use arrow_schema::ArrowError;
25use hashbrown::HashTable;
26use hashbrown::hash_table::Entry;
27
28use crate::builder::{ArrayBuilder, BinaryLikeArrayBuilder, StringLikeArrayBuilder};
29use crate::types::bytes::ByteArrayNativeType;
30use crate::types::{BinaryViewType, ByteViewType, StringViewType};
31use crate::{Array, ArrayRef, GenericByteViewArray};
32
/// Initial `current_size` of the exponential growth strategy (8 KiB).
const STARTING_BLOCK_SIZE: u32 = 8 * 1024;
/// Size at which the exponential growth strategy stops doubling (2 MiB).
const MAX_BLOCK_SIZE: u32 = 2 * 1024 * 1024;

/// Policy deciding how many bytes to request for each new data block.
enum BlockSizeGrowthStrategy {
    /// Every request yields the same size.
    Fixed { size: u32 },
    /// The size doubles on every request until it reaches [`MAX_BLOCK_SIZE`].
    Exponential { current_size: u32 },
}

impl BlockSizeGrowthStrategy {
    /// Returns the size to use for the next block and advances the strategy.
    ///
    /// In exponential mode the stored size is doubled *before* it is returned, so the
    /// first value handed out is twice the initial `current_size`.
    fn next_size(&mut self) -> u32 {
        match self {
            Self::Fixed { size } => *size,
            // Growth is finished: keep answering with the cap and leave the state alone.
            Self::Exponential { current_size } if *current_size >= MAX_BLOCK_SIZE => MAX_BLOCK_SIZE,
            Self::Exponential { current_size } => {
                *current_size = current_size.saturating_mul(2);
                *current_size
            }
        }
    }
}
57
58pub struct GenericByteViewBuilder<T: ByteViewType + ?Sized> {
82 views_buffer: Vec<u128>,
83 null_buffer_builder: NullBufferBuilder,
84 completed: Vec<Buffer>,
85 in_progress: Vec<u8>,
86 block_size: BlockSizeGrowthStrategy,
87 string_tracker: Option<(HashTable<usize>, ahash::RandomState)>,
90 phantom: PhantomData<T>,
91}
92
93impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
94 pub fn new() -> Self {
96 Self::with_capacity(1024)
97 }
98
99 pub fn with_capacity(capacity: usize) -> Self {
101 Self {
102 views_buffer: Vec::with_capacity(capacity),
103 null_buffer_builder: NullBufferBuilder::new(capacity),
104 completed: vec![],
105 in_progress: vec![],
106 block_size: BlockSizeGrowthStrategy::Exponential {
107 current_size: STARTING_BLOCK_SIZE,
108 },
109 string_tracker: None,
110 phantom: Default::default(),
111 }
112 }
113
114 pub fn with_fixed_block_size(self, block_size: u32) -> Self {
130 debug_assert!(block_size > 0, "Block size must be greater than 0");
131 Self {
132 block_size: BlockSizeGrowthStrategy::Fixed { size: block_size },
133 ..self
134 }
135 }
136
137 pub fn with_deduplicate_strings(self) -> Self {
142 Self {
143 string_tracker: Some((
144 HashTable::with_capacity(self.views_buffer.capacity()),
145 Default::default(),
146 )),
147 ..self
148 }
149 }
150
151 pub fn append_block(&mut self, buffer: Buffer) -> u32 {
176 assert!(buffer.len() < u32::MAX as usize);
177
178 self.flush_in_progress();
179 let offset = self.completed.len();
180 self.push_completed(buffer);
181 offset as u32
182 }
183
184 pub unsafe fn append_view_unchecked(&mut self, block: u32, offset: u32, len: u32) {
191 let b = unsafe { self.completed.get_unchecked(block as usize) };
192 let start = offset as usize;
193 let end = start.saturating_add(len as usize);
194 let b = unsafe { b.get_unchecked(start..end) };
195
196 let view = make_view(b, block, offset);
197 self.views_buffer.push(view);
198 self.null_buffer_builder.append_non_null();
199 }
200
201 pub fn append_array(&mut self, array: &GenericByteViewArray<T>) {
205 self.flush_in_progress();
206 let keep_views = self.completed.is_empty() || array.data_buffers().is_empty();
208 let starting_buffer = self.completed.len() as u32;
209
210 self.completed.extend(array.data_buffers().iter().cloned());
211
212 if keep_views {
213 self.views_buffer.extend_from_slice(array.views());
214 } else {
215 self.views_buffer.extend(array.views().iter().map(|v| {
216 let mut byte_view = ByteView::from(*v);
217 if byte_view.length > MAX_INLINE_VIEW_LEN {
218 byte_view.buffer_index += starting_buffer;
220 };
221
222 byte_view.as_u128()
223 }));
224 }
225
226 if let Some(null_buffer) = array.nulls() {
227 self.null_buffer_builder.append_buffer(null_buffer);
228 } else {
229 self.null_buffer_builder.append_n_non_nulls(array.len());
230 }
231 }
232
233 pub fn try_append_view(&mut self, block: u32, offset: u32, len: u32) -> Result<(), ArrowError> {
237 let b = self.completed.get(block as usize).ok_or_else(|| {
238 ArrowError::InvalidArgumentError(format!("No block found with index {block}"))
239 })?;
240 let start = offset as usize;
241 let end = start.saturating_add(len as usize);
242
243 let b = b.get(start..end).ok_or_else(|| {
244 ArrowError::InvalidArgumentError(format!(
245 "Range {start}..{end} out of bounds for block of length {}",
246 b.len()
247 ))
248 })?;
249
250 if T::Native::from_bytes_checked(b).is_none() {
251 return Err(ArrowError::InvalidArgumentError(
252 "Invalid view data".to_string(),
253 ));
254 }
255
256 unsafe {
257 self.append_view_unchecked(block, offset, len);
258 }
259 Ok(())
260 }
261
262 #[inline]
264 fn flush_in_progress(&mut self) {
265 if !self.in_progress.is_empty() {
266 let f = Buffer::from_vec(std::mem::take(&mut self.in_progress));
267 self.push_completed(f)
268 }
269 }
270
271 #[inline]
273 fn push_completed(&mut self, block: Buffer) {
274 assert!(block.len() < u32::MAX as usize, "Block too large");
275 assert!(self.completed.len() < u32::MAX as usize, "Too many blocks");
276 self.completed.push(block);
277 }
278
279 pub fn get_value(&self, index: usize) -> &[u8] {
283 let view = self.views_buffer.as_slice().get(index).unwrap();
284 let len = *view as u32;
285 if len <= MAX_INLINE_VIEW_LEN {
286 unsafe { GenericByteViewArray::<T>::inline_value(view, len as usize) }
289 } else {
290 let view = ByteView::from(*view);
291 if view.buffer_index < self.completed.len() as u32 {
292 let block = &self.completed[view.buffer_index as usize];
293 &block[view.offset as usize..view.offset as usize + view.length as usize]
294 } else {
295 &self.in_progress[view.offset as usize..view.offset as usize + view.length as usize]
296 }
297 }
298 }
299
300 #[inline]
308 pub fn append_value(&mut self, value: impl AsRef<T::Native>) {
309 self.try_append_value(value).unwrap()
310 }
311
312 #[inline]
320 pub fn try_append_value(&mut self, value: impl AsRef<T::Native>) -> Result<(), ArrowError> {
321 let v: &[u8] = value.as_ref().as_ref();
322 let length: u32 = v.len().try_into().map_err(|_| {
323 ArrowError::InvalidArgumentError(format!("String length {} exceeds u32::MAX", v.len()))
324 })?;
325
326 if length <= MAX_INLINE_VIEW_LEN {
327 let mut view_buffer = [0; 16];
328 view_buffer[0..4].copy_from_slice(&length.to_le_bytes());
329 view_buffer[4..4 + v.len()].copy_from_slice(v);
330 self.views_buffer.push(u128::from_le_bytes(view_buffer));
331 self.null_buffer_builder.append_non_null();
332 return Ok(());
333 }
334
335 if let Some((mut ht, hasher)) = self.string_tracker.take() {
339 let hash_val = hasher.hash_one(v);
340 let hasher_fn = |v: &_| hasher.hash_one(v);
341
342 let entry = ht.entry(
343 hash_val,
344 |idx| {
345 let stored_value = self.get_value(*idx);
346 v == stored_value
347 },
348 hasher_fn,
349 );
350 match entry {
351 Entry::Occupied(occupied) => {
352 let idx = occupied.get();
354 self.views_buffer.push(self.views_buffer[*idx]);
355 self.null_buffer_builder.append_non_null();
356 self.string_tracker = Some((ht, hasher));
357 return Ok(());
358 }
359 Entry::Vacant(vacant) => {
360 vacant.insert(self.views_buffer.len());
363 }
364 }
365 self.string_tracker = Some((ht, hasher));
366 }
367
368 let required_cap = self.in_progress.len() + v.len();
369 if self.in_progress.capacity() < required_cap {
370 self.flush_in_progress();
371 let to_reserve = v.len().max(self.block_size.next_size() as usize);
372 self.in_progress.reserve(to_reserve);
373 };
374
375 let offset = self.in_progress.len() as u32;
376 self.in_progress.extend_from_slice(v);
377
378 let buffer_index: u32 = self.completed.len().try_into().map_err(|_| {
379 ArrowError::InvalidArgumentError(format!(
380 "Buffer count {} exceeds u32::MAX",
381 self.completed.len()
382 ))
383 })?;
384
385 let view = ByteView {
386 length,
387 prefix: u32::from_le_bytes(v[0..4].try_into().unwrap()),
389 buffer_index,
390 offset,
391 };
392 self.views_buffer.push(view.into());
393 self.null_buffer_builder.append_non_null();
394
395 Ok(())
396 }
397
398 #[inline]
400 pub fn append_option(&mut self, value: Option<impl AsRef<T::Native>>) {
401 match value {
402 None => self.append_null(),
403 Some(v) => self.append_value(v),
404 };
405 }
406
407 #[inline]
409 pub fn append_null(&mut self) {
410 self.null_buffer_builder.append_null();
411 self.views_buffer.push(0);
412 }
413
414 pub fn finish(&mut self) -> GenericByteViewArray<T> {
416 self.flush_in_progress();
417 let completed = std::mem::take(&mut self.completed);
418 let nulls = self.null_buffer_builder.finish();
419 if let Some((ht, _)) = self.string_tracker.as_mut() {
420 ht.clear();
421 }
422 let views = std::mem::take(&mut self.views_buffer);
423 unsafe { GenericByteViewArray::new_unchecked(views.into(), completed, nulls) }
425 }
426
427 pub fn finish_cloned(&self) -> GenericByteViewArray<T> {
429 let mut completed = self.completed.clone();
430 if !self.in_progress.is_empty() {
431 completed.push(Buffer::from_slice_ref(&self.in_progress));
432 }
433 let len = self.views_buffer.len();
434 let views = Buffer::from_slice_ref(self.views_buffer.as_slice());
435 let views = ScalarBuffer::new(views, 0, len);
436 let nulls = self.null_buffer_builder.finish_cloned();
437 unsafe { GenericByteViewArray::new_unchecked(views, completed, nulls) }
439 }
440
441 pub fn validity_slice(&self) -> Option<&[u8]> {
443 self.null_buffer_builder.as_slice()
444 }
445
446 pub fn allocated_size(&self) -> usize {
448 let views = self.views_buffer.capacity() * std::mem::size_of::<u128>();
449 let null = self.null_buffer_builder.allocated_size();
450 let buffer_size = self.completed.iter().map(|b| b.capacity()).sum::<usize>();
451 let in_progress = self.in_progress.capacity();
452 let tracker = match &self.string_tracker {
453 Some((ht, _)) => ht.capacity() * std::mem::size_of::<usize>(),
454 None => 0,
455 };
456 buffer_size + in_progress + tracker + views + null
457 }
458}
459
460impl<T: ByteViewType + ?Sized> Default for GenericByteViewBuilder<T> {
461 fn default() -> Self {
462 Self::new()
463 }
464}
465
466impl<T: ByteViewType + ?Sized> std::fmt::Debug for GenericByteViewBuilder<T> {
467 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
468 write!(f, "{}ViewBuilder", T::PREFIX)?;
469 f.debug_struct("")
470 .field("views_buffer", &self.views_buffer)
471 .field("in_progress", &self.in_progress)
472 .field("completed", &self.completed)
473 .field("null_buffer_builder", &self.null_buffer_builder)
474 .finish()
475 }
476}
477
478impl<T: ByteViewType + ?Sized> ArrayBuilder for GenericByteViewBuilder<T> {
479 fn len(&self) -> usize {
480 self.null_buffer_builder.len()
481 }
482
483 fn finish(&mut self) -> ArrayRef {
484 Arc::new(self.finish())
485 }
486
487 fn finish_cloned(&self) -> ArrayRef {
488 Arc::new(self.finish_cloned())
489 }
490
491 fn as_any(&self) -> &dyn Any {
492 self
493 }
494
495 fn as_any_mut(&mut self) -> &mut dyn Any {
496 self
497 }
498
499 fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
500 self
501 }
502}
503
504impl<T: ByteViewType + ?Sized, V: AsRef<T::Native>> Extend<Option<V>>
505 for GenericByteViewBuilder<T>
506{
507 #[inline]
508 fn extend<I: IntoIterator<Item = Option<V>>>(&mut self, iter: I) {
509 for v in iter {
510 self.append_option(v)
511 }
512 }
513}
514
515pub type StringViewBuilder = GenericByteViewBuilder<StringViewType>;
535
536impl StringLikeArrayBuilder for StringViewBuilder {
537 fn type_name() -> &'static str {
538 std::any::type_name::<StringViewBuilder>()
539 }
540 fn with_capacity(capacity: usize) -> Self {
541 Self::with_capacity(capacity)
542 }
543 fn append_value(&mut self, value: &str) {
544 Self::append_value(self, value);
545 }
546 fn append_null(&mut self) {
547 Self::append_null(self);
548 }
549}
550
551pub type BinaryViewBuilder = GenericByteViewBuilder<BinaryViewType>;
572
573impl BinaryLikeArrayBuilder for BinaryViewBuilder {
574 fn type_name() -> &'static str {
575 std::any::type_name::<BinaryViewBuilder>()
576 }
577 fn with_capacity(capacity: usize) -> Self {
578 Self::with_capacity(capacity)
579 }
580 fn append_value(&mut self, value: &[u8]) {
581 Self::append_value(self, value);
582 }
583 fn append_null(&mut self) {
584 Self::append_null(self);
585 }
586}
587
/// Builds an inline view from the first `LEN` bytes of `data`.
///
/// Layout (little endian): 4 bytes of length, then the `LEN` value bytes, zero padded
/// to 16 bytes in total.
///
/// # Panics
///
/// Panics if `LEN` exceeds 12 or `data` holds fewer than `LEN` bytes.
fn make_inlined_view<const LEN: usize>(data: &[u8]) -> u128 {
    let length_prefix = (LEN as u32).to_le_bytes();
    let mut raw = [0u8; 16];
    raw[..4].copy_from_slice(&length_prefix);
    raw[4..4 + LEN].copy_from_slice(&data[..LEN]);
    u128::from_le_bytes(raw)
}
596
597#[inline(never)]
603pub fn make_view(data: &[u8], block_id: u32, offset: u32) -> u128 {
604 let len = data.len();
605
606 match len {
609 0 => make_inlined_view::<0>(data),
610 1 => make_inlined_view::<1>(data),
611 2 => make_inlined_view::<2>(data),
612 3 => make_inlined_view::<3>(data),
613 4 => make_inlined_view::<4>(data),
614 5 => make_inlined_view::<5>(data),
615 6 => make_inlined_view::<6>(data),
616 7 => make_inlined_view::<7>(data),
617 8 => make_inlined_view::<8>(data),
618 9 => make_inlined_view::<9>(data),
619 10 => make_inlined_view::<10>(data),
620 11 => make_inlined_view::<11>(data),
621 12 => make_inlined_view::<12>(data),
622 _ => {
624 let view = ByteView {
625 length: len as u32,
626 prefix: u32::from_le_bytes(data[0..4].try_into().unwrap()),
627 buffer_index: block_id,
628 offset,
629 };
630 view.as_u128()
631 }
632 }
633}
634
635#[cfg(test)]
636mod tests {
637 use core::str;
638
639 use super::*;
640
/// Repeated long strings must share one view and must not be copied again.
#[test]
fn test_string_view_deduplicate() {
    let value_1 = "long string to test string view";
    let value_2 = "not so similar string but long";

    // The block is sized to hold exactly one copy of each long value.
    let block_size = value_1.len() as u32 * 2;
    let mut builder = StringViewBuilder::new()
        .with_deduplicate_strings()
        .with_fixed_block_size(block_size);

    let values = vec![
        Some(value_1),
        Some(value_2),
        Some("short"),
        Some(value_1),
        None,
        Some(value_2),
        Some(value_1),
    ];
    builder.extend(values.clone());

    let array = builder.finish_cloned();
    array.to_data().validate_full().unwrap();
    // Duplicates were not copied again, so a single data buffer is enough.
    assert_eq!(array.data_buffers().len(), 1);
    let actual: Vec<_> = array.iter().collect();
    assert_eq!(actual, values);

    let views: &[u128] = array.views();

    // Every occurrence of `value_1` shares the same view ...
    assert_eq!(views[0], views[3]);
    assert_eq!(views[0], views[6]);

    // ... and so does every occurrence of `value_2`.
    assert_eq!(views[1], views[5]);
}
676
/// Reusing a deduplicating builder after `finish` must not panic.
#[test]
fn test_string_view_deduplicate_after_finish() {
    let value_1 = "long string to test string view";
    let value_2 = "not so similar string but long";

    let mut builder = StringViewBuilder::new().with_deduplicate_strings();

    // `value_1` is appended again at the end, after the builder has been reset twice.
    for value in [value_1, value_2, value_1].iter().copied() {
        builder.append_value(value);
        let _array = builder.finish();
    }
}
690
/// Exercises views into user-supplied blocks, mixed with regular appends, and the
/// validation errors of `try_append_view`.
#[test]
fn test_string_view() {
    let b1 = Buffer::from(b"world\xFFbananas\xF0\x9F\x98\x81");
    let b2 = Buffer::from(b"cupcakes");
    let b3 = Buffer::from(b"Many strings are here contained of great length and verbosity");

    let mut v = StringViewBuilder::new();
    assert_eq!(v.append_block(b1), 0);

    v.append_value("This is a very long string that exceeds the inline length");
    v.append_value("This is another very long string that exceeds the inline length");

    // The two long values above were flushed into block 1 by this call.
    assert_eq!(v.append_block(b2), 2);
    assert_eq!(v.append_block(b3), 3);

    // (block, offset, len)
    let valid_views: [(u32, u32, u32); 10] = [
        (0, 0, 5),   // world
        (0, 6, 7),   // bananas
        (2, 3, 5),   // cakes
        (2, 0, 3),   // cup
        (2, 0, 8),   // cupcakes
        (0, 13, 4),  // 😁
        (0, 13, 0),  // empty string
        (3, 0, 16),  // Many strings are
        (1, 0, 19),  // This is a very long
        (3, 13, 27), // are here contained of great
    ];
    for (block, offset, len) in valid_views.iter().copied() {
        v.try_append_view(block, offset, len).unwrap();
    }

    v.append_value("I do so like long strings");

    let array = v.finish_cloned();
    array.to_data().validate_full().unwrap();
    assert_eq!(array.data_buffers().len(), 5);
    let actual: Vec<_> = array.iter().flatten().collect();
    assert_eq!(
        actual,
        &[
            "This is a very long string that exceeds the inline length",
            "This is another very long string that exceeds the inline length",
            "world",
            "bananas",
            "cakes",
            "cup",
            "cupcakes",
            "😁",
            "",
            "Many strings are",
            "This is a very long",
            "are here contained of great",
            "I do so like long strings"
        ]
    );

    // (block, offset, len, expected error)
    let rejected_views: [(u32, u32, u32, &str); 5] = [
        (
            0,
            u32::MAX,
            1,
            "Invalid argument error: Range 4294967295..4294967296 out of bounds for block of length 17",
        ),
        (
            0,
            1,
            u32::MAX,
            "Invalid argument error: Range 1..4294967296 out of bounds for block of length 17",
        ),
        // Splits the four-byte emoji in half, which is not valid UTF-8.
        (0, 13, 2, "Invalid argument error: Invalid view data"),
        (
            0,
            40,
            0,
            "Invalid argument error: Range 40..40 out of bounds for block of length 17",
        ),
        (
            5,
            0,
            0,
            "Invalid argument error: No block found with index 5",
        ),
    ];
    for (block, offset, len, message) in rejected_views.iter().copied() {
        let err = v.try_append_view(block, offset, len).unwrap_err();
        assert_eq!(err.to_string(), message);
    }
}
772
/// Compares block sizes produced by the exponential and the fixed growth strategies.
#[test]
fn test_string_view_with_block_size_growth() {
    let mut exp_builder = StringViewBuilder::new();
    let mut fixed_builder = StringViewBuilder::new().with_fixed_block_size(STARTING_BLOCK_SIZE);

    // One value fills exactly one block of the starting size.
    let payload = [b'a'; STARTING_BLOCK_SIZE as usize];
    let long_string = str::from_utf8(&payload).unwrap();

    for round in 0..9u32 {
        let repeats = 2_u32.pow(round);
        for _ in 0..repeats {
            exp_builder.append_value(long_string);
            fixed_builder.append_value(long_string);
        }
        exp_builder.flush_in_progress();
        fixed_builder.flush_in_progress();

        let newest = round as usize;

        // Exponential growth: one block per round, each twice as long as the previous.
        assert_eq!(exp_builder.completed.len(), newest + 1);
        assert_eq!(
            exp_builder.completed[newest].len(),
            STARTING_BLOCK_SIZE as usize * 2_usize.pow(round)
        );

        // Fixed size: one block per appended value, all of the same length.
        assert_eq!(fixed_builder.completed.len(), 2_usize.pow(round + 1) - 1);
        assert!(
            fixed_builder
                .completed
                .iter()
                .all(|block| block.len() == STARTING_BLOCK_SIZE as usize)
        );
    }

    // Growth has reached its cap, so the next block is reserved at the maximum size.
    exp_builder.append_value(long_string);
    exp_builder.flush_in_progress();
    let last_block = exp_builder.completed.last().unwrap();
    assert_eq!(last_block.capacity(), MAX_BLOCK_SIZE as usize);
}
816}