1use std::any::Any;
19use std::marker::PhantomData;
20use std::sync::Arc;
21
22use arrow_buffer::{Buffer, NullBufferBuilder, ScalarBuffer};
23use arrow_data::{ByteView, MAX_INLINE_VIEW_LEN};
24use arrow_schema::ArrowError;
25use hashbrown::HashTable;
26use hashbrown::hash_table::Entry;
27
28use crate::builder::ArrayBuilder;
29use crate::types::bytes::ByteArrayNativeType;
30use crate::types::{BinaryViewType, ByteViewType, StringViewType};
31use crate::{Array, ArrayRef, GenericByteViewArray};
32
/// Seed value for exponential block growth (8 KiB).
///
/// `next_size` doubles before returning, so the first block requested under
/// exponential growth is twice this value.
const STARTING_BLOCK_SIZE: u32 = 8 * 1024;
/// Ceiling for exponential block growth (2 MiB).
const MAX_BLOCK_SIZE: u32 = 2 * 1024 * 1024;

/// Policy deciding how large the next data block should be.
enum BlockSizeGrowthStrategy {
    /// Every block is requested with the same size.
    Fixed { size: u32 },
    /// Block sizes double on each request until they reach `MAX_BLOCK_SIZE`.
    Exponential { current_size: u32 },
}

impl BlockSizeGrowthStrategy {
    /// Returns the size to request for the next data block, advancing the
    /// exponential state when applicable.
    fn next_size(&mut self) -> u32 {
        let current = match self {
            Self::Fixed { size } => return *size,
            Self::Exponential { current_size } => current_size,
        };

        // Already at (or beyond) the ceiling: stop growing.
        if *current >= MAX_BLOCK_SIZE {
            return MAX_BLOCK_SIZE;
        }

        // Start and end sizes are fixed constants, so doubling cannot overflow;
        // saturating arithmetic is kept purely as a safety net.
        *current = current.saturating_mul(2);
        *current
    }
}
57
/// A builder for [`GenericByteViewArray`].
///
/// Each appended slot gets one 16-byte (`u128`) view. Values of at most
/// `MAX_INLINE_VIEW_LEN` bytes are stored inline in the view itself; longer
/// values are referenced by a block index and offset into a data block.
pub struct GenericByteViewBuilder<T: ByteViewType + ?Sized> {
    /// One view per appended slot; null slots hold `0`.
    views_buffer: Vec<u128>,
    /// Validity of each appended slot.
    null_buffer_builder: NullBufferBuilder,
    /// Finished data blocks; non-inlined views refer to these by `buffer_index`.
    completed: Vec<Buffer>,
    /// Data block currently being filled by `try_append_value`; moved into
    /// `completed` by `flush_in_progress`.
    in_progress: Vec<u8>,
    /// Decides how much capacity to reserve for the next `in_progress` block.
    block_size: BlockSizeGrowthStrategy,
    /// Present only when deduplication is enabled: a table of indices into
    /// `views_buffer` for previously appended non-inlined values, together
    /// with the hasher used to place them.
    string_tracker: Option<(HashTable<usize>, ahash::RandomState)>,
    /// Binds the builder to the byte view type `T` without storing a value.
    phantom: PhantomData<T>,
}
92
93impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
94 pub fn new() -> Self {
96 Self::with_capacity(1024)
97 }
98
99 pub fn with_capacity(capacity: usize) -> Self {
101 Self {
102 views_buffer: Vec::with_capacity(capacity),
103 null_buffer_builder: NullBufferBuilder::new(capacity),
104 completed: vec![],
105 in_progress: vec![],
106 block_size: BlockSizeGrowthStrategy::Exponential {
107 current_size: STARTING_BLOCK_SIZE,
108 },
109 string_tracker: None,
110 phantom: Default::default(),
111 }
112 }
113
114 pub fn with_fixed_block_size(self, block_size: u32) -> Self {
130 debug_assert!(block_size > 0, "Block size must be greater than 0");
131 Self {
132 block_size: BlockSizeGrowthStrategy::Fixed { size: block_size },
133 ..self
134 }
135 }
136
137 pub fn with_deduplicate_strings(self) -> Self {
142 Self {
143 string_tracker: Some((
144 HashTable::with_capacity(self.views_buffer.capacity()),
145 Default::default(),
146 )),
147 ..self
148 }
149 }
150
151 pub fn append_block(&mut self, buffer: Buffer) -> u32 {
176 assert!(buffer.len() < u32::MAX as usize);
177
178 self.flush_in_progress();
179 let offset = self.completed.len();
180 self.push_completed(buffer);
181 offset as u32
182 }
183
184 pub unsafe fn append_view_unchecked(&mut self, block: u32, offset: u32, len: u32) {
191 let b = unsafe { self.completed.get_unchecked(block as usize) };
192 let start = offset as usize;
193 let end = start.saturating_add(len as usize);
194 let b = unsafe { b.get_unchecked(start..end) };
195
196 let view = make_view(b, block, offset);
197 self.views_buffer.push(view);
198 self.null_buffer_builder.append_non_null();
199 }
200
201 pub fn append_array(&mut self, array: &GenericByteViewArray<T>) {
205 self.flush_in_progress();
206 let keep_views = self.completed.is_empty() || array.data_buffers().is_empty();
208 let starting_buffer = self.completed.len() as u32;
209
210 self.completed.extend(array.data_buffers().iter().cloned());
211
212 if keep_views {
213 self.views_buffer.extend_from_slice(array.views());
214 } else {
215 self.views_buffer.extend(array.views().iter().map(|v| {
216 let mut byte_view = ByteView::from(*v);
217 if byte_view.length > MAX_INLINE_VIEW_LEN {
218 byte_view.buffer_index += starting_buffer;
220 };
221
222 byte_view.as_u128()
223 }));
224 }
225
226 if let Some(null_buffer) = array.nulls() {
227 self.null_buffer_builder.append_buffer(null_buffer);
228 } else {
229 self.null_buffer_builder.append_n_non_nulls(array.len());
230 }
231 }
232
233 pub fn try_append_view(&mut self, block: u32, offset: u32, len: u32) -> Result<(), ArrowError> {
237 let b = self.completed.get(block as usize).ok_or_else(|| {
238 ArrowError::InvalidArgumentError(format!("No block found with index {block}"))
239 })?;
240 let start = offset as usize;
241 let end = start.saturating_add(len as usize);
242
243 let b = b.get(start..end).ok_or_else(|| {
244 ArrowError::InvalidArgumentError(format!(
245 "Range {start}..{end} out of bounds for block of length {}",
246 b.len()
247 ))
248 })?;
249
250 if T::Native::from_bytes_checked(b).is_none() {
251 return Err(ArrowError::InvalidArgumentError(
252 "Invalid view data".to_string(),
253 ));
254 }
255
256 unsafe {
257 self.append_view_unchecked(block, offset, len);
258 }
259 Ok(())
260 }
261
262 #[inline]
264 fn flush_in_progress(&mut self) {
265 if !self.in_progress.is_empty() {
266 let f = Buffer::from_vec(std::mem::take(&mut self.in_progress));
267 self.push_completed(f)
268 }
269 }
270
271 #[inline]
273 fn push_completed(&mut self, block: Buffer) {
274 assert!(block.len() < u32::MAX as usize, "Block too large");
275 assert!(self.completed.len() < u32::MAX as usize, "Too many blocks");
276 self.completed.push(block);
277 }
278
279 pub fn get_value(&self, index: usize) -> &[u8] {
283 let view = self.views_buffer.as_slice().get(index).unwrap();
284 let len = *view as u32;
285 if len <= MAX_INLINE_VIEW_LEN {
286 unsafe { GenericByteViewArray::<T>::inline_value(view, len as usize) }
289 } else {
290 let view = ByteView::from(*view);
291 if view.buffer_index < self.completed.len() as u32 {
292 let block = &self.completed[view.buffer_index as usize];
293 &block[view.offset as usize..view.offset as usize + view.length as usize]
294 } else {
295 &self.in_progress[view.offset as usize..view.offset as usize + view.length as usize]
296 }
297 }
298 }
299
300 #[inline]
308 pub fn append_value(&mut self, value: impl AsRef<T::Native>) {
309 self.try_append_value(value).unwrap()
310 }
311
312 #[inline]
320 pub fn try_append_value(&mut self, value: impl AsRef<T::Native>) -> Result<(), ArrowError> {
321 let v: &[u8] = value.as_ref().as_ref();
322 let length: u32 = v.len().try_into().map_err(|_| {
323 ArrowError::InvalidArgumentError(format!("String length {} exceeds u32::MAX", v.len()))
324 })?;
325
326 if length <= MAX_INLINE_VIEW_LEN {
327 let mut view_buffer = [0; 16];
328 view_buffer[0..4].copy_from_slice(&length.to_le_bytes());
329 view_buffer[4..4 + v.len()].copy_from_slice(v);
330 self.views_buffer.push(u128::from_le_bytes(view_buffer));
331 self.null_buffer_builder.append_non_null();
332 return Ok(());
333 }
334
335 if let Some((mut ht, hasher)) = self.string_tracker.take() {
339 let hash_val = hasher.hash_one(v);
340 let hasher_fn = |v: &_| hasher.hash_one(v);
341
342 let entry = ht.entry(
343 hash_val,
344 |idx| {
345 let stored_value = self.get_value(*idx);
346 v == stored_value
347 },
348 hasher_fn,
349 );
350 match entry {
351 Entry::Occupied(occupied) => {
352 let idx = occupied.get();
354 self.views_buffer.push(self.views_buffer[*idx]);
355 self.null_buffer_builder.append_non_null();
356 self.string_tracker = Some((ht, hasher));
357 return Ok(());
358 }
359 Entry::Vacant(vacant) => {
360 vacant.insert(self.views_buffer.len());
363 }
364 }
365 self.string_tracker = Some((ht, hasher));
366 }
367
368 let required_cap = self.in_progress.len() + v.len();
369 if self.in_progress.capacity() < required_cap {
370 self.flush_in_progress();
371 let to_reserve = v.len().max(self.block_size.next_size() as usize);
372 self.in_progress.reserve(to_reserve);
373 };
374
375 let offset = self.in_progress.len() as u32;
376 self.in_progress.extend_from_slice(v);
377
378 let buffer_index: u32 = self.completed.len().try_into().map_err(|_| {
379 ArrowError::InvalidArgumentError(format!(
380 "Buffer count {} exceeds u32::MAX",
381 self.completed.len()
382 ))
383 })?;
384
385 let view = ByteView {
386 length,
387 prefix: u32::from_le_bytes(v[0..4].try_into().unwrap()),
389 buffer_index,
390 offset,
391 };
392 self.views_buffer.push(view.into());
393 self.null_buffer_builder.append_non_null();
394
395 Ok(())
396 }
397
398 #[inline]
400 pub fn append_option(&mut self, value: Option<impl AsRef<T::Native>>) {
401 match value {
402 None => self.append_null(),
403 Some(v) => self.append_value(v),
404 };
405 }
406
407 #[inline]
409 pub fn append_null(&mut self) {
410 self.null_buffer_builder.append_null();
411 self.views_buffer.push(0);
412 }
413
414 pub fn finish(&mut self) -> GenericByteViewArray<T> {
416 self.flush_in_progress();
417 let completed = std::mem::take(&mut self.completed);
418 let nulls = self.null_buffer_builder.finish();
419 if let Some((ht, _)) = self.string_tracker.as_mut() {
420 ht.clear();
421 }
422 let views = std::mem::take(&mut self.views_buffer);
423 unsafe { GenericByteViewArray::new_unchecked(views.into(), completed, nulls) }
425 }
426
427 pub fn finish_cloned(&self) -> GenericByteViewArray<T> {
429 let mut completed = self.completed.clone();
430 if !self.in_progress.is_empty() {
431 completed.push(Buffer::from_slice_ref(&self.in_progress));
432 }
433 let len = self.views_buffer.len();
434 let views = Buffer::from_slice_ref(self.views_buffer.as_slice());
435 let views = ScalarBuffer::new(views, 0, len);
436 let nulls = self.null_buffer_builder.finish_cloned();
437 unsafe { GenericByteViewArray::new_unchecked(views, completed, nulls) }
439 }
440
441 pub fn validity_slice(&self) -> Option<&[u8]> {
443 self.null_buffer_builder.as_slice()
444 }
445
446 pub fn allocated_size(&self) -> usize {
448 let views = self.views_buffer.capacity() * std::mem::size_of::<u128>();
449 let null = self.null_buffer_builder.allocated_size();
450 let buffer_size = self.completed.iter().map(|b| b.capacity()).sum::<usize>();
451 let in_progress = self.in_progress.capacity();
452 let tracker = match &self.string_tracker {
453 Some((ht, _)) => ht.capacity() * std::mem::size_of::<usize>(),
454 None => 0,
455 };
456 buffer_size + in_progress + tracker + views + null
457 }
458}
459
460impl<T: ByteViewType + ?Sized> Default for GenericByteViewBuilder<T> {
461 fn default() -> Self {
462 Self::new()
463 }
464}
465
466impl<T: ByteViewType + ?Sized> std::fmt::Debug for GenericByteViewBuilder<T> {
467 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
468 write!(f, "{}ViewBuilder", T::PREFIX)?;
469 f.debug_struct("")
470 .field("views_buffer", &self.views_buffer)
471 .field("in_progress", &self.in_progress)
472 .field("completed", &self.completed)
473 .field("null_buffer_builder", &self.null_buffer_builder)
474 .finish()
475 }
476}
477
478impl<T: ByteViewType + ?Sized> ArrayBuilder for GenericByteViewBuilder<T> {
479 fn len(&self) -> usize {
480 self.null_buffer_builder.len()
481 }
482
483 fn finish(&mut self) -> ArrayRef {
484 Arc::new(self.finish())
485 }
486
487 fn finish_cloned(&self) -> ArrayRef {
488 Arc::new(self.finish_cloned())
489 }
490
491 fn as_any(&self) -> &dyn Any {
492 self
493 }
494
495 fn as_any_mut(&mut self) -> &mut dyn Any {
496 self
497 }
498
499 fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
500 self
501 }
502}
503
504impl<T: ByteViewType + ?Sized, V: AsRef<T::Native>> Extend<Option<V>>
505 for GenericByteViewBuilder<T>
506{
507 #[inline]
508 fn extend<I: IntoIterator<Item = Option<V>>>(&mut self, iter: I) {
509 for v in iter {
510 self.append_option(v)
511 }
512 }
513}
514
/// A [`GenericByteViewBuilder`] for string views ([`StringViewType`]).
pub type StringViewBuilder = GenericByteViewBuilder<StringViewType>;

/// A [`GenericByteViewBuilder`] for binary views ([`BinaryViewType`]).
pub type BinaryViewBuilder = GenericByteViewBuilder<BinaryViewType>;
557
/// Builds an inline view for exactly `LEN` bytes taken from the front of `data`.
///
/// The view is laid out little-endian: a 4-byte length followed by the value
/// bytes, zero-padded to 16 bytes. `LEN` is a const generic so each length
/// gets a fixed-size copy. Panics if `data` is shorter than `LEN` or if `LEN`
/// exceeds 12.
fn make_inlined_view<const LEN: usize>(data: &[u8]) -> u128 {
    let mut bytes = [0u8; 16];
    let (length_field, payload) = bytes.split_at_mut(4);
    length_field.copy_from_slice(&(LEN as u32).to_le_bytes());
    payload[..LEN].copy_from_slice(&data[..LEN]);
    u128::from_le_bytes(bytes)
}
566
567#[inline(never)]
573pub fn make_view(data: &[u8], block_id: u32, offset: u32) -> u128 {
574 let len = data.len();
575
576 match len {
579 0 => make_inlined_view::<0>(data),
580 1 => make_inlined_view::<1>(data),
581 2 => make_inlined_view::<2>(data),
582 3 => make_inlined_view::<3>(data),
583 4 => make_inlined_view::<4>(data),
584 5 => make_inlined_view::<5>(data),
585 6 => make_inlined_view::<6>(data),
586 7 => make_inlined_view::<7>(data),
587 8 => make_inlined_view::<8>(data),
588 9 => make_inlined_view::<9>(data),
589 10 => make_inlined_view::<10>(data),
590 11 => make_inlined_view::<11>(data),
591 12 => make_inlined_view::<12>(data),
592 _ => {
594 let view = ByteView {
595 length: len as u32,
596 prefix: u32::from_le_bytes(data[0..4].try_into().unwrap()),
597 buffer_index: block_id,
598 offset,
599 };
600 view.as_u128()
601 }
602 }
603}
604
#[cfg(test)]
mod tests {
    use core::str;

    use super::*;

    /// Long values appended more than once with deduplication enabled must
    /// reuse the view of their first occurrence.
    #[test]
    fn test_string_view_deduplicate() {
        let value_1 = "long string to test string view";
        let value_2 = "not so similar string but long";

        // Fixed blocks just large enough for value_1 and value_2 together.
        let mut builder = StringViewBuilder::new()
            .with_deduplicate_strings()
            .with_fixed_block_size(value_1.len() as u32 * 2);

        let values = vec![
            Some(value_1),
            Some(value_2),
            Some("short"),
            Some(value_1),
            None,
            Some(value_2),
            Some(value_1),
        ];
        builder.extend(values.clone());

        let array = builder.finish_cloned();
        array.to_data().validate_full().unwrap();
        // Duplicates add no bytes, so the two distinct long values share one block.
        assert_eq!(array.data_buffers().len(), 1);
        let actual: Vec<_> = array.iter().collect();
        assert_eq!(actual, values);

        let view0 = array.views().first().unwrap();
        let view3 = array.views().get(3).unwrap();
        let view6 = array.views().get(6).unwrap();

        // Every repeat of value_1 reuses view 0 ...
        assert_eq!(view0, view3);
        assert_eq!(view0, view6);

        // ... and the repeat of value_2 reuses view 1.
        assert_eq!(array.views().get(1), array.views().get(5));
    }

    /// Appending with deduplication enabled after `finish` must not panic:
    /// `finish` moves the views out, so the tracker must not keep referring
    /// to them.
    #[test]
    fn test_string_view_deduplicate_after_finish() {
        let mut builder = StringViewBuilder::new().with_deduplicate_strings();

        let value_1 = "long string to test string view";
        let value_2 = "not so similar string but long";
        builder.append_value(value_1);
        let _array = builder.finish();
        builder.append_value(value_2);
        let _array = builder.finish();
        builder.append_value(value_1);
        let _array = builder.finish();
    }

    /// Mixes user-supplied blocks, builder-owned blocks and explicit views,
    /// then checks the view validation errors.
    #[test]
    fn test_string_view() {
        // 17 bytes: "world", an invalid UTF-8 byte, "bananas", a 4-byte emoji.
        let b1 = Buffer::from(b"world\xFFbananas\xF0\x9F\x98\x81");
        let b2 = Buffer::from(b"cupcakes");
        let b3 = Buffer::from(b"Many strings are here contained of great length and verbosity");

        let mut v = StringViewBuilder::new();
        assert_eq!(v.append_block(b1), 0);

        // These land in the in-progress block, which the next `append_block`
        // flushes as block 1.
        v.append_value("This is a very long string that exceeds the inline length");
        v.append_value("This is another very long string that exceeds the inline length");

        assert_eq!(v.append_block(b2), 2);
        assert_eq!(v.append_block(b3), 3);

        v.try_append_view(0, 0, 5).unwrap(); // world
        v.try_append_view(0, 6, 7).unwrap(); // bananas
        v.try_append_view(2, 3, 5).unwrap(); // cakes
        v.try_append_view(2, 0, 3).unwrap(); // cup
        v.try_append_view(2, 0, 8).unwrap(); // cupcakes
        v.try_append_view(0, 13, 4).unwrap(); // 😁
        v.try_append_view(0, 13, 0).unwrap(); // empty string
        v.try_append_view(3, 0, 16).unwrap(); // Many strings are
        v.try_append_view(1, 0, 19).unwrap(); // This is a very long
        v.try_append_view(3, 13, 27).unwrap(); // are here contained of great
        v.append_value("I do so like long strings");

        let array = v.finish_cloned();
        array.to_data().validate_full().unwrap();
        // Blocks 0-3 plus the in-progress block holding the last long value.
        assert_eq!(array.data_buffers().len(), 5);
        let actual: Vec<_> = array.iter().flatten().collect();
        assert_eq!(
            actual,
            &[
                "This is a very long string that exceeds the inline length",
                "This is another very long string that exceeds the inline length",
                "world",
                "bananas",
                "cakes",
                "cup",
                "cupcakes",
                "😁",
                "",
                "Many strings are",
                "This is a very long",
                "are here contained of great",
                "I do so like long strings"
            ]
        );

        // offset + len past u32::MAX is reported, not wrapped.
        let err = v.try_append_view(0, u32::MAX, 1).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid argument error: Range 4294967295..4294967296 out of bounds for block of length 17"
        );

        let err = v.try_append_view(0, 1, u32::MAX).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid argument error: Range 1..4294967296 out of bounds for block of length 17"
        );

        // Half of the 4-byte emoji is not valid UTF-8.
        let err = v.try_append_view(0, 13, 2).unwrap_err();
        assert_eq!(err.to_string(), "Invalid argument error: Invalid view data");

        // Even an empty view must start within the block.
        let err = v.try_append_view(0, 40, 0).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid argument error: Range 40..40 out of bounds for block of length 17"
        );

        // The block index must exist.
        let err = v.try_append_view(5, 0, 0).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid argument error: No block found with index 5"
        );
    }

    /// Compares exponential and fixed block sizing when appending values of
    /// exactly STARTING_BLOCK_SIZE bytes.
    #[test]
    fn test_string_view_with_block_size_growth() {
        let mut exp_builder = StringViewBuilder::new();
        let mut fixed_builder = StringViewBuilder::new().with_fixed_block_size(STARTING_BLOCK_SIZE);

        let long_string = str::from_utf8(&[b'a'; STARTING_BLOCK_SIZE as usize]).unwrap();

        for i in 0..9 {
            // Round i appends 2^i values.
            for _ in 0..(2_u32.pow(i)) {
                exp_builder.append_value(long_string);
                fixed_builder.append_value(long_string);
            }
            exp_builder.flush_in_progress();
            fixed_builder.flush_in_progress();

            // Exponential: each round's values fit in one new, larger block.
            assert_eq!(exp_builder.completed.len(), i as usize + 1);
            assert_eq!(
                exp_builder.completed[i as usize].len(),
                STARTING_BLOCK_SIZE as usize * 2_usize.pow(i)
            );

            // Fixed: one block per value, 1 + 2 + ... + 2^i = 2^(i+1) - 1 blocks in total.
            assert_eq!(fixed_builder.completed.len(), 2_usize.pow(i + 1) - 1);

            // Each fixed block holds exactly one value.
            assert!(
                fixed_builder
                    .completed
                    .iter()
                    .all(|b| b.len() == STARTING_BLOCK_SIZE as usize)
            );
        }

        // Growth is capped: the next block is allocated at MAX_BLOCK_SIZE.
        exp_builder.append_value(long_string);
        exp_builder.flush_in_progress();
        assert_eq!(
            exp_builder.completed.last().unwrap().capacity(),
            MAX_BLOCK_SIZE as usize
        );
    }
}