1use std::any::Any;
19use std::marker::PhantomData;
20use std::sync::Arc;
21
22use arrow_buffer::{Buffer, NullBufferBuilder, ScalarBuffer};
23use arrow_data::{ByteView, MAX_INLINE_VIEW_LEN};
24use arrow_schema::ArrowError;
25use hashbrown::HashTable;
26use hashbrown::hash_table::Entry;
27
28use crate::builder::ArrayBuilder;
29use crate::types::bytes::ByteArrayNativeType;
30use crate::types::{BinaryViewType, ByteViewType, StringViewType};
31use crate::{Array, ArrayRef, GenericByteViewArray};
32
/// Size, in bytes, from which the exponential growth strategy starts.
const STARTING_BLOCK_SIZE: u32 = 8 * 1024;
/// Size, in bytes, reported by the exponential strategy once it stops doubling.
const MAX_BLOCK_SIZE: u32 = 2 * 1024 * 1024;

/// Policy that decides how many bytes to reserve for the next data block.
enum BlockSizeGrowthStrategy {
    /// Every request yields the same `size`.
    Fixed { size: u32 },
    /// Every request doubles `current_size` until it is no longer below
    /// [`MAX_BLOCK_SIZE`].
    Exponential { current_size: u32 },
}

impl BlockSizeGrowthStrategy {
    /// Returns the size to reserve for the next block.
    ///
    /// For [`Self::Fixed`] this is always the configured size. For
    /// [`Self::Exponential`] the stored size is doubled *before* it is returned
    /// while it is below [`MAX_BLOCK_SIZE`]; afterwards [`MAX_BLOCK_SIZE`] is
    /// returned and the stored size is left untouched.
    fn next_size(&mut self) -> u32 {
        let current = match self {
            Self::Fixed { size } => return *size,
            Self::Exponential { current_size } => current_size,
        };

        if *current >= MAX_BLOCK_SIZE {
            return MAX_BLOCK_SIZE;
        }

        // Saturating so that a doubling can never wrap around.
        *current = current.saturating_mul(2);
        *current
    }
}
57
/// Builder that accumulates byte values as 128-bit views plus backing data
/// blocks and turns them into a [`GenericByteViewArray`].
///
/// Values of at most `MAX_INLINE_VIEW_LEN` bytes are stored inside their view;
/// longer values are copied into `in_progress`, which is moved into `completed`
/// whenever it is flushed.
pub struct GenericByteViewBuilder<T: ByteViewType + ?Sized> {
    /// One 128-bit view per appended slot; null slots are stored as `0`.
    views_buffer: Vec<u128>,
    /// Validity information for the appended slots.
    null_buffer_builder: NullBufferBuilder,
    /// Finished data blocks; a view's `buffer_index` indexes into this list.
    completed: Vec<Buffer>,
    /// Block currently being filled; becomes the next `completed` entry on flush.
    in_progress: Vec<u8>,
    /// Policy used to size each newly reserved `in_progress` block.
    block_size: BlockSizeGrowthStrategy,
    /// Present only when deduplication is enabled: a table of indices into
    /// `views_buffer` together with the hasher used to key it.
    string_tracker: Option<(HashTable<usize>, ahash::RandomState)>,
    /// Ties the builder to its element type `T` without storing a `T`.
    phantom: PhantomData<T>,
}
92
93impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
94    pub fn new() -> Self {
96        Self::with_capacity(1024)
97    }
98
99    pub fn with_capacity(capacity: usize) -> Self {
101        Self {
102            views_buffer: Vec::with_capacity(capacity),
103            null_buffer_builder: NullBufferBuilder::new(capacity),
104            completed: vec![],
105            in_progress: vec![],
106            block_size: BlockSizeGrowthStrategy::Exponential {
107                current_size: STARTING_BLOCK_SIZE,
108            },
109            string_tracker: None,
110            phantom: Default::default(),
111        }
112    }
113
114    pub fn with_fixed_block_size(self, block_size: u32) -> Self {
130        debug_assert!(block_size > 0, "Block size must be greater than 0");
131        Self {
132            block_size: BlockSizeGrowthStrategy::Fixed { size: block_size },
133            ..self
134        }
135    }
136
137    pub fn with_deduplicate_strings(self) -> Self {
142        Self {
143            string_tracker: Some((
144                HashTable::with_capacity(self.views_buffer.capacity()),
145                Default::default(),
146            )),
147            ..self
148        }
149    }
150
151    pub fn append_block(&mut self, buffer: Buffer) -> u32 {
176        assert!(buffer.len() < u32::MAX as usize);
177
178        self.flush_in_progress();
179        let offset = self.completed.len();
180        self.push_completed(buffer);
181        offset as u32
182    }
183
184    pub unsafe fn append_view_unchecked(&mut self, block: u32, offset: u32, len: u32) {
191        let b = unsafe { self.completed.get_unchecked(block as usize) };
192        let start = offset as usize;
193        let end = start.saturating_add(len as usize);
194        let b = unsafe { b.get_unchecked(start..end) };
195
196        let view = make_view(b, block, offset);
197        self.views_buffer.push(view);
198        self.null_buffer_builder.append_non_null();
199    }
200
201    pub fn append_array(&mut self, array: &GenericByteViewArray<T>) {
205        self.flush_in_progress();
206        let keep_views = self.completed.is_empty() || array.data_buffers().is_empty();
208        let starting_buffer = self.completed.len() as u32;
209
210        self.completed.extend(array.data_buffers().iter().cloned());
211
212        if keep_views {
213            self.views_buffer.extend_from_slice(array.views());
214        } else {
215            self.views_buffer.extend(array.views().iter().map(|v| {
216                let mut byte_view = ByteView::from(*v);
217                if byte_view.length > MAX_INLINE_VIEW_LEN {
218                    byte_view.buffer_index += starting_buffer;
220                };
221
222                byte_view.as_u128()
223            }));
224        }
225
226        if let Some(null_buffer) = array.nulls() {
227            self.null_buffer_builder.append_buffer(null_buffer);
228        } else {
229            self.null_buffer_builder.append_n_non_nulls(array.len());
230        }
231    }
232
233    pub fn try_append_view(&mut self, block: u32, offset: u32, len: u32) -> Result<(), ArrowError> {
237        let b = self.completed.get(block as usize).ok_or_else(|| {
238            ArrowError::InvalidArgumentError(format!("No block found with index {block}"))
239        })?;
240        let start = offset as usize;
241        let end = start.saturating_add(len as usize);
242
243        let b = b.get(start..end).ok_or_else(|| {
244            ArrowError::InvalidArgumentError(format!(
245                "Range {start}..{end} out of bounds for block of length {}",
246                b.len()
247            ))
248        })?;
249
250        if T::Native::from_bytes_checked(b).is_none() {
251            return Err(ArrowError::InvalidArgumentError(
252                "Invalid view data".to_string(),
253            ));
254        }
255
256        unsafe {
257            self.append_view_unchecked(block, offset, len);
258        }
259        Ok(())
260    }
261
262    #[inline]
264    fn flush_in_progress(&mut self) {
265        if !self.in_progress.is_empty() {
266            let f = Buffer::from_vec(std::mem::take(&mut self.in_progress));
267            self.push_completed(f)
268        }
269    }
270
271    #[inline]
273    fn push_completed(&mut self, block: Buffer) {
274        assert!(block.len() < u32::MAX as usize, "Block too large");
275        assert!(self.completed.len() < u32::MAX as usize, "Too many blocks");
276        self.completed.push(block);
277    }
278
279    pub fn get_value(&self, index: usize) -> &[u8] {
283        let view = self.views_buffer.as_slice().get(index).unwrap();
284        let len = *view as u32;
285        if len <= MAX_INLINE_VIEW_LEN {
286            unsafe { GenericByteViewArray::<T>::inline_value(view, len as usize) }
289        } else {
290            let view = ByteView::from(*view);
291            if view.buffer_index < self.completed.len() as u32 {
292                let block = &self.completed[view.buffer_index as usize];
293                &block[view.offset as usize..view.offset as usize + view.length as usize]
294            } else {
295                &self.in_progress[view.offset as usize..view.offset as usize + view.length as usize]
296            }
297        }
298    }
299
300    #[inline]
308    pub fn append_value(&mut self, value: impl AsRef<T::Native>) {
309        self.try_append_value(value).unwrap()
310    }
311
312    #[inline]
320    pub fn try_append_value(&mut self, value: impl AsRef<T::Native>) -> Result<(), ArrowError> {
321        let v: &[u8] = value.as_ref().as_ref();
322        let length: u32 = v.len().try_into().map_err(|_| {
323            ArrowError::InvalidArgumentError(format!("String length {} exceeds u32::MAX", v.len()))
324        })?;
325
326        if length <= MAX_INLINE_VIEW_LEN {
327            let mut view_buffer = [0; 16];
328            view_buffer[0..4].copy_from_slice(&length.to_le_bytes());
329            view_buffer[4..4 + v.len()].copy_from_slice(v);
330            self.views_buffer.push(u128::from_le_bytes(view_buffer));
331            self.null_buffer_builder.append_non_null();
332            return Ok(());
333        }
334
335        if let Some((mut ht, hasher)) = self.string_tracker.take() {
339            let hash_val = hasher.hash_one(v);
340            let hasher_fn = |v: &_| hasher.hash_one(v);
341
342            let entry = ht.entry(
343                hash_val,
344                |idx| {
345                    let stored_value = self.get_value(*idx);
346                    v == stored_value
347                },
348                hasher_fn,
349            );
350            match entry {
351                Entry::Occupied(occupied) => {
352                    let idx = occupied.get();
354                    self.views_buffer.push(self.views_buffer[*idx]);
355                    self.null_buffer_builder.append_non_null();
356                    self.string_tracker = Some((ht, hasher));
357                    return Ok(());
358                }
359                Entry::Vacant(vacant) => {
360                    vacant.insert(self.views_buffer.len());
363                }
364            }
365            self.string_tracker = Some((ht, hasher));
366        }
367
368        let required_cap = self.in_progress.len() + v.len();
369        if self.in_progress.capacity() < required_cap {
370            self.flush_in_progress();
371            let to_reserve = v.len().max(self.block_size.next_size() as usize);
372            self.in_progress.reserve(to_reserve);
373        };
374
375        let offset = self.in_progress.len() as u32;
376        self.in_progress.extend_from_slice(v);
377
378        let buffer_index: u32 = self.completed.len().try_into().map_err(|_| {
379            ArrowError::InvalidArgumentError(format!(
380                "Buffer count {} exceeds u32::MAX",
381                self.completed.len()
382            ))
383        })?;
384
385        let view = ByteView {
386            length,
387            prefix: u32::from_le_bytes(v[0..4].try_into().unwrap()),
389            buffer_index,
390            offset,
391        };
392        self.views_buffer.push(view.into());
393        self.null_buffer_builder.append_non_null();
394
395        Ok(())
396    }
397
398    #[inline]
400    pub fn append_option(&mut self, value: Option<impl AsRef<T::Native>>) {
401        match value {
402            None => self.append_null(),
403            Some(v) => self.append_value(v),
404        };
405    }
406
407    #[inline]
409    pub fn append_null(&mut self) {
410        self.null_buffer_builder.append_null();
411        self.views_buffer.push(0);
412    }
413
414    pub fn finish(&mut self) -> GenericByteViewArray<T> {
416        self.flush_in_progress();
417        let completed = std::mem::take(&mut self.completed);
418        let nulls = self.null_buffer_builder.finish();
419        if let Some((ht, _)) = self.string_tracker.as_mut() {
420            ht.clear();
421        }
422        let views = std::mem::take(&mut self.views_buffer);
423        unsafe { GenericByteViewArray::new_unchecked(views.into(), completed, nulls) }
425    }
426
427    pub fn finish_cloned(&self) -> GenericByteViewArray<T> {
429        let mut completed = self.completed.clone();
430        if !self.in_progress.is_empty() {
431            completed.push(Buffer::from_slice_ref(&self.in_progress));
432        }
433        let len = self.views_buffer.len();
434        let views = Buffer::from_slice_ref(self.views_buffer.as_slice());
435        let views = ScalarBuffer::new(views, 0, len);
436        let nulls = self.null_buffer_builder.finish_cloned();
437        unsafe { GenericByteViewArray::new_unchecked(views, completed, nulls) }
439    }
440
441    pub fn validity_slice(&self) -> Option<&[u8]> {
443        self.null_buffer_builder.as_slice()
444    }
445
446    pub fn allocated_size(&self) -> usize {
448        let views = self.views_buffer.capacity() * std::mem::size_of::<u128>();
449        let null = self.null_buffer_builder.allocated_size();
450        let buffer_size = self.completed.iter().map(|b| b.capacity()).sum::<usize>();
451        let in_progress = self.in_progress.capacity();
452        let tracker = match &self.string_tracker {
453            Some((ht, _)) => ht.capacity() * std::mem::size_of::<usize>(),
454            None => 0,
455        };
456        buffer_size + in_progress + tracker + views + null
457    }
458}
459
/// The default builder is the one produced by [`GenericByteViewBuilder::new`].
impl<T: ByteViewType + ?Sized> Default for GenericByteViewBuilder<T> {
    /// Equivalent to calling `Self::new()`.
    fn default() -> Self {
        Self::new()
    }
}
465
466impl<T: ByteViewType + ?Sized> std::fmt::Debug for GenericByteViewBuilder<T> {
467    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
468        write!(f, "{}ViewBuilder", T::PREFIX)?;
469        f.debug_struct("")
470            .field("views_buffer", &self.views_buffer)
471            .field("in_progress", &self.in_progress)
472            .field("completed", &self.completed)
473            .field("null_buffer_builder", &self.null_buffer_builder)
474            .finish()
475    }
476}
477
478impl<T: ByteViewType + ?Sized> ArrayBuilder for GenericByteViewBuilder<T> {
479    fn len(&self) -> usize {
480        self.null_buffer_builder.len()
481    }
482
483    fn finish(&mut self) -> ArrayRef {
484        Arc::new(self.finish())
485    }
486
487    fn finish_cloned(&self) -> ArrayRef {
488        Arc::new(self.finish_cloned())
489    }
490
491    fn as_any(&self) -> &dyn Any {
492        self
493    }
494
495    fn as_any_mut(&mut self) -> &mut dyn Any {
496        self
497    }
498
499    fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
500        self
501    }
502}
503
504impl<T: ByteViewType + ?Sized, V: AsRef<T::Native>> Extend<Option<V>>
505    for GenericByteViewBuilder<T>
506{
507    #[inline]
508    fn extend<I: IntoIterator<Item = Option<V>>>(&mut self, iter: I) {
509        for v in iter {
510            self.append_option(v)
511        }
512    }
513}
514
/// [`GenericByteViewBuilder`] specialised for [`StringViewType`].
pub type StringViewBuilder = GenericByteViewBuilder<StringViewType>;

/// [`GenericByteViewBuilder`] specialised for [`BinaryViewType`].
pub type BinaryViewBuilder = GenericByteViewBuilder<BinaryViewType>;
557
/// Builds an inlined view from the first `LEN` bytes of `data`.
///
/// The 16-byte little-endian layout is: `LEN` as a `u32` in bytes `0..4`,
/// followed by the `LEN` value bytes, with the remaining bytes left as zero.
///
/// # Panics
/// Panics if `LEN` is greater than 12 or if `data` holds fewer than `LEN` bytes.
fn make_inlined_view<const LEN: usize>(data: &[u8]) -> u128 {
    let length_le = (LEN as u32).to_le_bytes();

    let mut raw = [0u8; 16];
    raw[0..4].copy_from_slice(&length_le);
    raw[4..4 + LEN].copy_from_slice(&data[..LEN]);

    u128::from_le_bytes(raw)
}
566
/// Creates the 128-bit view for `data`.
///
/// Data of at most 12 bytes is inlined into the view, in which case `block_id`
/// and `offset` are ignored. Longer data produces a view holding its length,
/// its first four bytes as prefix, `block_id` as the buffer index and `offset`.
///
/// NOTE(review): `data.len()` is narrowed to `u32` with `as`; callers are
/// presumably expected to pass slices shorter than `u32::MAX` — confirm.
#[inline(never)]
pub fn make_view(data: &[u8], block_id: u32, offset: u32) -> u128 {
    let len = data.len();

    // Dispatch to a const-generic helper per length so that each inlined copy
    // has a length known at compile time.
    match len {
        0 => make_inlined_view::<0>(data),
        1 => make_inlined_view::<1>(data),
        2 => make_inlined_view::<2>(data),
        3 => make_inlined_view::<3>(data),
        4 => make_inlined_view::<4>(data),
        5 => make_inlined_view::<5>(data),
        6 => make_inlined_view::<6>(data),
        7 => make_inlined_view::<7>(data),
        8 => make_inlined_view::<8>(data),
        9 => make_inlined_view::<9>(data),
        10 => make_inlined_view::<10>(data),
        11 => make_inlined_view::<11>(data),
        12 => make_inlined_view::<12>(data),
        // Too long to inline: reference the bytes in their data block instead.
        _ => {
            let view = ByteView {
                length: len as u32,
                // Cannot panic: this arm is only reached when `len > 12`.
                prefix: u32::from_le_bytes(data[0..4].try_into().unwrap()),
                buffer_index: block_id,
                offset,
            };
            view.as_u128()
        }
    }
}
604
605#[cfg(test)]
606mod tests {
607    use core::str;
608
609    use super::*;
610
611    #[test]
612    fn test_string_view_deduplicate() {
613        let value_1 = "long string to test string view";
614        let value_2 = "not so similar string but long";
615
616        let mut builder = StringViewBuilder::new()
617            .with_deduplicate_strings()
618            .with_fixed_block_size(value_1.len() as u32 * 2); let values = vec![
621            Some(value_1),
622            Some(value_2),
623            Some("short"),
624            Some(value_1),
625            None,
626            Some(value_2),
627            Some(value_1),
628        ];
629        builder.extend(values.clone());
630
631        let array = builder.finish_cloned();
632        array.to_data().validate_full().unwrap();
633        assert_eq!(array.data_buffers().len(), 1); let actual: Vec<_> = array.iter().collect();
635        assert_eq!(actual, values);
636
637        let view0 = array.views().first().unwrap();
638        let view3 = array.views().get(3).unwrap();
639        let view6 = array.views().get(6).unwrap();
640
641        assert_eq!(view0, view3);
642        assert_eq!(view0, view6);
643
644        assert_eq!(array.views().get(1), array.views().get(5));
645    }
646
647    #[test]
648    fn test_string_view_deduplicate_after_finish() {
649        let mut builder = StringViewBuilder::new().with_deduplicate_strings();
650
651        let value_1 = "long string to test string view";
652        let value_2 = "not so similar string but long";
653        builder.append_value(value_1);
654        let _array = builder.finish();
655        builder.append_value(value_2);
656        let _array = builder.finish();
657        builder.append_value(value_1);
658        let _array = builder.finish();
659    }
660
661    #[test]
662    fn test_string_view() {
663        let b1 = Buffer::from(b"world\xFFbananas\xF0\x9F\x98\x81");
664        let b2 = Buffer::from(b"cupcakes");
665        let b3 = Buffer::from(b"Many strings are here contained of great length and verbosity");
666
667        let mut v = StringViewBuilder::new();
668        assert_eq!(v.append_block(b1), 0);
669
670        v.append_value("This is a very long string that exceeds the inline length");
671        v.append_value("This is another very long string that exceeds the inline length");
672
673        assert_eq!(v.append_block(b2), 2);
674        assert_eq!(v.append_block(b3), 3);
675
676        v.try_append_view(0, 0, 5).unwrap(); v.try_append_view(0, 6, 7).unwrap(); v.try_append_view(2, 3, 5).unwrap(); v.try_append_view(2, 0, 3).unwrap(); v.try_append_view(2, 0, 8).unwrap(); v.try_append_view(0, 13, 4).unwrap(); v.try_append_view(0, 13, 0).unwrap(); v.try_append_view(3, 0, 16).unwrap(); v.try_append_view(1, 0, 19).unwrap(); v.try_append_view(3, 13, 27).unwrap(); v.append_value("I do so like long strings");
691
692        let array = v.finish_cloned();
693        array.to_data().validate_full().unwrap();
694        assert_eq!(array.data_buffers().len(), 5);
695        let actual: Vec<_> = array.iter().flatten().collect();
696        assert_eq!(
697            actual,
698            &[
699                "This is a very long string that exceeds the inline length",
700                "This is another very long string that exceeds the inline length",
701                "world",
702                "bananas",
703                "cakes",
704                "cup",
705                "cupcakes",
706                "😁",
707                "",
708                "Many strings are",
709                "This is a very long",
710                "are here contained of great",
711                "I do so like long strings"
712            ]
713        );
714
715        let err = v.try_append_view(0, u32::MAX, 1).unwrap_err();
716        assert_eq!(
717            err.to_string(),
718            "Invalid argument error: Range 4294967295..4294967296 out of bounds for block of length 17"
719        );
720
721        let err = v.try_append_view(0, 1, u32::MAX).unwrap_err();
722        assert_eq!(
723            err.to_string(),
724            "Invalid argument error: Range 1..4294967296 out of bounds for block of length 17"
725        );
726
727        let err = v.try_append_view(0, 13, 2).unwrap_err();
728        assert_eq!(err.to_string(), "Invalid argument error: Invalid view data");
729
730        let err = v.try_append_view(0, 40, 0).unwrap_err();
731        assert_eq!(
732            err.to_string(),
733            "Invalid argument error: Range 40..40 out of bounds for block of length 17"
734        );
735
736        let err = v.try_append_view(5, 0, 0).unwrap_err();
737        assert_eq!(
738            err.to_string(),
739            "Invalid argument error: No block found with index 5"
740        );
741    }
742
743    #[test]
744    fn test_string_view_with_block_size_growth() {
745        let mut exp_builder = StringViewBuilder::new();
746        let mut fixed_builder = StringViewBuilder::new().with_fixed_block_size(STARTING_BLOCK_SIZE);
747
748        let long_string = str::from_utf8(&[b'a'; STARTING_BLOCK_SIZE as usize]).unwrap();
749
750        for i in 0..9 {
751            for _ in 0..(2_u32.pow(i)) {
753                exp_builder.append_value(long_string);
754                fixed_builder.append_value(long_string);
755            }
756            exp_builder.flush_in_progress();
757            fixed_builder.flush_in_progress();
758
759            assert_eq!(exp_builder.completed.len(), i as usize + 1);
761            assert_eq!(
762                exp_builder.completed[i as usize].len(),
763                STARTING_BLOCK_SIZE as usize * 2_usize.pow(i)
764            );
765
766            assert_eq!(fixed_builder.completed.len(), 2_usize.pow(i + 1) - 1);
768
769            assert!(
771                fixed_builder
772                    .completed
773                    .iter()
774                    .all(|b| b.len() == STARTING_BLOCK_SIZE as usize)
775            );
776        }
777
778        exp_builder.append_value(long_string);
780        exp_builder.flush_in_progress();
781        assert_eq!(
782            exp_builder.completed.last().unwrap().capacity(),
783            MAX_BLOCK_SIZE as usize
784        );
785    }
786}