// vortex_array/builders/varbinview.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::Any;
5use std::ops::Range;
6use std::sync::Arc;
7
8use vortex_buffer::{Buffer, BufferMut, ByteBuffer, ByteBufferMut};
9use vortex_dtype::DType;
10use vortex_error::{VortexExpect, VortexResult, vortex_bail, vortex_ensure};
11use vortex_mask::Mask;
12use vortex_scalar::{BinaryScalar, Scalar, Utf8Scalar};
13use vortex_utils::aliases::hash_map::{Entry, HashMap};
14
15use crate::arrays::VarBinViewArray;
16use crate::arrays::binary_view::BinaryView;
17use crate::arrays::compact::BufferUtilization;
18use crate::builders::{ArrayBuilder, LazyNullBufferBuilder};
19use crate::canonical::{Canonical, ToCanonical};
20use crate::{Array, ArrayRef, IntoArray};
21
22/// The builder for building a [`VarBinViewArray`].
23pub struct VarBinViewBuilder {
24    dtype: DType,
25    views_builder: BufferMut<BinaryView>,
26    nulls: LazyNullBufferBuilder,
27    completed: CompletedBuffers,
28    in_progress: ByteBufferMut,
29    growth_strategy: BufferGrowthStrategy,
30    compaction_threshold: f64,
31}
32
33impl VarBinViewBuilder {
34    pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
35        Self::new(dtype, capacity, Default::default(), Default::default(), 0.0)
36    }
37
38    pub fn with_buffer_deduplication(dtype: DType, capacity: usize) -> Self {
39        Self::new(
40            dtype,
41            capacity,
42            CompletedBuffers::Deduplicated(Default::default()),
43            Default::default(),
44            0.0,
45        )
46    }
47
48    pub fn with_compaction(dtype: DType, capacity: usize, compaction_threshold: f64) -> Self {
49        Self::new(
50            dtype,
51            capacity,
52            Default::default(),
53            Default::default(),
54            compaction_threshold,
55        )
56    }
57
58    pub fn new(
59        dtype: DType,
60        capacity: usize,
61        completed: CompletedBuffers,
62        growth_strategy: BufferGrowthStrategy,
63        compaction_threshold: f64,
64    ) -> Self {
65        assert!(
66            matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
67            "VarBinViewBuilder DType must be Utf8 or Binary."
68        );
69        Self {
70            views_builder: BufferMut::<BinaryView>::with_capacity(capacity),
71            nulls: LazyNullBufferBuilder::new(capacity),
72            completed,
73            in_progress: ByteBufferMut::empty(),
74            dtype,
75            growth_strategy,
76            compaction_threshold,
77        }
78    }
79
80    fn append_value_view(&mut self, value: &[u8]) {
81        let length =
82            u32::try_from(value.len()).vortex_expect("cannot have a single string >2^32 in length");
83        if length <= 12 {
84            self.views_builder.push(BinaryView::make_view(value, 0, 0));
85            return;
86        }
87
88        let (buffer_idx, offset) = self.append_value_to_buffer(value);
89        let view = BinaryView::make_view(value, buffer_idx, offset);
90        self.views_builder.push(view);
91    }
92
93    /// Appends a value to the builder.
94    pub fn append_value<S: AsRef<[u8]>>(&mut self, value: S) {
95        self.append_value_view(value.as_ref());
96        self.nulls.append_non_null();
97    }
98
99    fn flush_in_progress(&mut self) {
100        if self.in_progress.is_empty() {
101            return;
102        }
103        let block = std::mem::take(&mut self.in_progress).freeze();
104
105        assert!(block.len() < u32::MAX as usize, "Block too large");
106
107        let initial_len = self.completed.len();
108        self.completed.push(block);
109        assert_eq!(
110            self.completed.len(),
111            initial_len + 1,
112            "Invalid state, just completed block already exists"
113        );
114    }
115
116    /// append a non inlined value to self.in_progress.
117    fn append_value_to_buffer(&mut self, value: &[u8]) -> (u32, u32) {
118        assert!(value.len() > 12, "must inline small strings");
119        let required_cap = self.in_progress.len() + value.len();
120        if self.in_progress.capacity() < required_cap {
121            self.flush_in_progress();
122            let next_buffer_size = self.growth_strategy.next_size() as usize;
123            let to_reserve = next_buffer_size.max(value.len());
124            self.in_progress.reserve(to_reserve);
125        }
126
127        let buffer_idx = self.completed.len();
128        let offset = u32::try_from(self.in_progress.len()).vortex_expect("too many buffers");
129        self.in_progress.extend_from_slice(value);
130
131        (buffer_idx, offset)
132    }
133
134    pub fn completed_block_count(&self) -> u32 {
135        self.completed.len()
136    }
137
138    /// Pushes buffers and pre-adjusted views into the builder.
139    ///
140    /// The provided `buffer` slices contain sections of data from a `VarBinViewArray`, and the
141    /// `views` are `BinaryView`s that have already been adjusted to reference the correct buffer
142    /// indices and offsets for this builder. All views must point to valid sections within the
143    /// provided buffers, and the validity length must match the view length.
144    ///
145    /// # Warning
146    ///
147    /// This method does not check utilization of the given buffers. Callers must provide
148    /// buffers that are fully utilized by the given adjusted views.
149    ///
150    /// # Panics
151    ///
152    /// Panics if this builder deduplicates buffers and any of the given buffers already
153    /// exist in this builder.
154    pub fn push_buffer_and_adjusted_views(
155        &mut self,
156        buffer: &[ByteBuffer],
157        views: &Buffer<BinaryView>,
158        validity_mask: Mask,
159    ) {
160        self.flush_in_progress();
161
162        let expected_completed_len = self.completed.len() as usize + buffer.len();
163        self.completed.extend_from_slice_unchecked(buffer);
164        assert_eq!(
165            self.completed.len() as usize,
166            expected_completed_len,
167            "Some buffers already exist",
168        );
169        self.views_builder.extend_trusted(views.iter().copied());
170        self.push_only_validity_mask(validity_mask);
171
172        debug_assert_eq!(self.nulls.len(), self.views_builder.len())
173    }
174
175    /// Finishes the builder directly into a [`VarBinViewArray`].
176    pub fn finish_into_varbinview(&mut self) -> VarBinViewArray {
177        self.flush_in_progress();
178        let buffers = std::mem::take(&mut self.completed);
179
180        assert_eq!(
181            self.views_builder.len(),
182            self.nulls.len(),
183            "View and validity length must match"
184        );
185
186        let validity = self.nulls.finish_with_nullability(self.dtype.nullability());
187
188        // SAFETY: the builder methods check safety at each step.
189        unsafe {
190            VarBinViewArray::new_unchecked(
191                std::mem::take(&mut self.views_builder).freeze(),
192                buffers.finish(),
193                self.dtype.clone(),
194                validity,
195            )
196        }
197    }
198
199    // Pushes a validity mask into the builder not affecting the views or buffers
200    fn push_only_validity_mask(&mut self, validity_mask: Mask) {
201        self.nulls.append_validity_mask(validity_mask);
202    }
203}
204
205impl ArrayBuilder for VarBinViewBuilder {
206    fn as_any(&self) -> &dyn Any {
207        self
208    }
209
210    fn as_any_mut(&mut self) -> &mut dyn Any {
211        self
212    }
213
214    fn dtype(&self) -> &DType {
215        &self.dtype
216    }
217
218    fn len(&self) -> usize {
219        self.nulls.len()
220    }
221
222    fn append_zeros(&mut self, n: usize) {
223        self.views_builder.push_n(BinaryView::empty_view(), n);
224        self.nulls.append_n_non_nulls(n);
225    }
226
227    unsafe fn append_nulls_unchecked(&mut self, n: usize) {
228        self.views_builder.push_n(BinaryView::empty_view(), n);
229        self.nulls.append_n_nulls(n);
230    }
231
232    fn append_scalar(&mut self, scalar: &Scalar) -> VortexResult<()> {
233        vortex_ensure!(
234            scalar.dtype() == self.dtype(),
235            "VarBinViewBuilder expected scalar with dtype {:?}, got {:?}",
236            self.dtype(),
237            scalar.dtype()
238        );
239
240        match self.dtype() {
241            DType::Utf8(_) => {
242                let utf8_scalar = Utf8Scalar::try_from(scalar)?;
243                match utf8_scalar.value() {
244                    Some(value) => self.append_value(value),
245                    None => self.append_null(),
246                }
247            }
248            DType::Binary(_) => {
249                let binary_scalar = BinaryScalar::try_from(scalar)?;
250                match binary_scalar.value() {
251                    Some(value) => self.append_value(value),
252                    None => self.append_null(),
253                }
254            }
255            _ => vortex_bail!(
256                "VarBinViewBuilder can only handle Utf8 or Binary scalars, got {:?}",
257                scalar.dtype()
258            ),
259        }
260
261        Ok(())
262    }
263
264    unsafe fn extend_from_array_unchecked(&mut self, array: &dyn Array) {
265        let array = array.to_varbinview();
266        self.flush_in_progress();
267
268        self.push_only_validity_mask(array.validity_mask());
269
270        let view_adjustment =
271            self.completed
272                .extend_from_compaction(BuffersWithOffsets::from_array(
273                    &array,
274                    self.compaction_threshold,
275                ));
276
277        match view_adjustment {
278            ViewAdjustment::Precomputed(adjustment) => self.views_builder.extend_trusted(
279                array
280                    .views()
281                    .iter()
282                    .map(|view| adjustment.adjust_view(view)),
283            ),
284            ViewAdjustment::Rewriting(adjustment) => {
285                for (idx, &view) in array.views().iter().enumerate() {
286                    let new_view = if !array.is_valid(idx) {
287                        BinaryView::empty_view()
288                    } else if view.is_inlined() {
289                        view
290                    } else if let Some(adjusted) = adjustment.adjust_view(&view) {
291                        adjusted
292                    } else {
293                        let bytes = array.bytes_at(idx);
294                        let (new_buf_idx, new_offset) = self.append_value_to_buffer(&bytes);
295                        BinaryView::make_view(bytes.as_slice(), new_buf_idx, new_offset)
296                    };
297                    self.views_builder.push(new_view)
298                }
299            }
300        }
301    }
302
303    fn reserve_exact(&mut self, additional: usize) {
304        self.views_builder.reserve(additional);
305        self.nulls.reserve_exact(additional);
306    }
307
308    unsafe fn set_validity_unchecked(&mut self, validity: Mask) {
309        self.nulls = LazyNullBufferBuilder::new(validity.len());
310        self.nulls.append_validity_mask(validity);
311    }
312
313    fn finish(&mut self) -> ArrayRef {
314        self.finish_into_varbinview().into_array()
315    }
316
317    fn finish_into_canonical(&mut self) -> Canonical {
318        Canonical::VarBinView(self.finish_into_varbinview())
319    }
320}
321
322pub enum CompletedBuffers {
323    Default(Vec<ByteBuffer>),
324    Deduplicated(DeduplicatedBuffers),
325}
326
327impl Default for CompletedBuffers {
328    fn default() -> Self {
329        Self::Default(Vec::new())
330    }
331}
332
333// Self::push enforces len < u32::max
334#[allow(clippy::cast_possible_truncation)]
335impl CompletedBuffers {
336    fn len(&self) -> u32 {
337        match self {
338            Self::Default(buffers) => buffers.len() as u32,
339            Self::Deduplicated(buffers) => buffers.len(),
340        }
341    }
342
343    fn push(&mut self, block: ByteBuffer) -> u32 {
344        match self {
345            Self::Default(buffers) => {
346                assert!(buffers.len() < u32::MAX as usize, "Too many blocks");
347                buffers.push(block);
348                self.len()
349            }
350            Self::Deduplicated(buffers) => buffers.push(block),
351        }
352    }
353
354    /// Does not compact buffers, bypasses utilization checks.
355    fn extend_from_slice_unchecked(&mut self, buffers: &[ByteBuffer]) {
356        for buffer in buffers {
357            self.push(buffer.clone());
358        }
359    }
360
361    fn extend_from_compaction(&mut self, buffers: BuffersWithOffsets) -> ViewAdjustment {
362        match (self, buffers) {
363            (
364                Self::Default(completed_buffers),
365                BuffersWithOffsets::AllKept { buffers, offsets },
366            ) => {
367                let buffer_offset = completed_buffers.len() as u32;
368                completed_buffers.extend_from_slice(&buffers);
369                ViewAdjustment::shift(buffer_offset, offsets)
370            }
371            (
372                Self::Default(completed_buffers),
373                BuffersWithOffsets::SomeCompacted { buffers, offsets },
374            ) => {
375                let lookup = buffers
376                    .iter()
377                    .map(|maybe_buffer| {
378                        maybe_buffer.as_ref().map(|buffer| {
379                            completed_buffers.push(buffer.clone());
380                            completed_buffers.len() as u32 - 1
381                        })
382                    })
383                    .collect();
384                ViewAdjustment::rewriting(lookup, offsets)
385            }
386
387            (
388                Self::Deduplicated(completed_buffers),
389                BuffersWithOffsets::AllKept { buffers, offsets },
390            ) => {
391                let buffer_lookup = completed_buffers.extend_from_slice(&buffers);
392                ViewAdjustment::lookup(buffer_lookup, offsets)
393            }
394            (
395                Self::Deduplicated(completed_buffers),
396                BuffersWithOffsets::SomeCompacted { buffers, offsets },
397            ) => {
398                let buffer_lookup = completed_buffers.extend_from_option_slice(&buffers);
399                ViewAdjustment::rewriting(buffer_lookup, offsets)
400            }
401        }
402    }
403
404    fn finish(self) -> Arc<[ByteBuffer]> {
405        match self {
406            Self::Default(buffers) => Arc::from(buffers),
407            Self::Deduplicated(buffers) => buffers.finish(),
408        }
409    }
410}
411
412#[derive(Default)]
413pub struct DeduplicatedBuffers {
414    buffers: Vec<ByteBuffer>,
415    buffer_to_idx: HashMap<BufferId, u32>,
416}
417
418impl DeduplicatedBuffers {
419    // Self::push enforces len < u32::max
420    #[allow(clippy::cast_possible_truncation)]
421    fn len(&self) -> u32 {
422        self.buffers.len() as u32
423    }
424
425    /// Push a new block if not seen before. Returns the idx of the block.
426    fn push(&mut self, block: ByteBuffer) -> u32 {
427        assert!(self.buffers.len() < u32::MAX as usize, "Too many blocks");
428
429        let initial_len = self.len();
430        let id = BufferId::from(&block);
431        match self.buffer_to_idx.entry(id) {
432            Entry::Occupied(idx) => *idx.get(),
433            Entry::Vacant(entry) => {
434                let idx = initial_len;
435                entry.insert(idx);
436                self.buffers.push(block);
437                idx
438            }
439        }
440    }
441
442    fn extend_from_option_slice(&mut self, buffers: &[Option<ByteBuffer>]) -> Vec<Option<u32>> {
443        buffers
444            .iter()
445            .map(|buffer| buffer.as_ref().map(|buf| self.push(buf.clone())))
446            .collect()
447    }
448
449    fn extend_from_slice(&mut self, buffers: &[ByteBuffer]) -> Vec<u32> {
450        buffers
451            .iter()
452            .map(|buffer| self.push(buffer.clone()))
453            .collect()
454    }
455
456    fn finish(self) -> Arc<[ByteBuffer]> {
457        Arc::from(self.buffers)
458    }
459}
460
/// Identity of a buffer: the address of its first byte plus its length.
///
/// Two buffers with equal `BufferId`s cover exactly the same memory region.
#[derive(PartialEq, Eq, Hash)]
struct BufferId {
    // The data pointer is kept as `usize` rather than `*const u8` so the type stays `Send`.
    ptr: usize,
    len: usize,
}
467
468impl BufferId {
469    fn from(buffer: &ByteBuffer) -> Self {
470        let slice = buffer.as_slice();
471        Self {
472            ptr: slice.as_ptr() as usize,
473            len: slice.len(),
474        }
475    }
476}
477
/// Chooses the size of each new in-progress data buffer allocated by a [`VarBinViewBuilder`].
#[derive(Debug, Clone)]
pub enum BufferGrowthStrategy {
    /// Use a fixed buffer size for all allocations.
    Fixed { size: u32 },
    /// Hand out `current_size`, doubling it after each allocation until it reaches
    /// `max_size`. A `current_size` that already exceeds `max_size` is used unchanged.
    Exponential { current_size: u32, max_size: u32 },
}

impl Default for BufferGrowthStrategy {
    /// Exponential growth from 4 KiB up to 2 MiB.
    fn default() -> Self {
        Self::exponential(4 * 1024, 2 * 1024 * 1024)
    }
}

impl BufferGrowthStrategy {
    /// A strategy that always suggests `size` bytes.
    pub fn fixed(size: u32) -> Self {
        Self::Fixed { size }
    }

    /// A strategy that starts at `initial_size` and doubles up to `max_size`.
    pub fn exponential(initial_size: u32, max_size: u32) -> Self {
        Self::Exponential {
            current_size: initial_size,
            max_size,
        }
    }

    /// Returns the next buffer size to allocate and advances the internal state.
    pub fn next_size(&mut self) -> u32 {
        match self {
            Self::Fixed { size } => *size,
            Self::Exponential {
                current_size,
                max_size,
            } => {
                let upcoming = if *current_size < *max_size {
                    current_size.saturating_mul(2).min(*max_size)
                } else {
                    *current_size
                };
                std::mem::replace(current_size, upcoming)
            }
        }
    }
}
524
525enum BuffersWithOffsets {
526    AllKept {
527        buffers: Arc<[ByteBuffer]>,
528        offsets: Option<Vec<u32>>,
529    },
530    SomeCompacted {
531        buffers: Vec<Option<ByteBuffer>>,
532        offsets: Option<Vec<u32>>,
533    },
534}
535
536impl BuffersWithOffsets {
537    pub fn from_array(array: &VarBinViewArray, compaction_threshold: f64) -> Self {
538        if compaction_threshold == 0.0 {
539            return Self::AllKept {
540                buffers: array.buffers().clone(),
541                offsets: None,
542            };
543        }
544
545        let buffer_utilizations = array.buffer_utilizations();
546        let mut has_rewrite = false;
547        let mut has_nonzero_offset = false;
548        for utilization in buffer_utilizations.iter() {
549            match compaction_strategy(utilization, compaction_threshold) {
550                CompactionStrategy::KeepFull => continue,
551                CompactionStrategy::Slice { .. } => has_nonzero_offset = true,
552                CompactionStrategy::Rewrite => has_rewrite = true,
553            }
554        }
555
556        let buffers_with_offsets_iter =
557            buffer_utilizations
558                .iter()
559                .zip(array.buffers().iter())
560                .map(|(utilization, buffer)| {
561                    match compaction_strategy(utilization, compaction_threshold) {
562                        CompactionStrategy::KeepFull => (Some(buffer.clone()), 0),
563                        CompactionStrategy::Slice { start, end } => {
564                            (Some(buffer.slice(start as usize..end as usize)), start)
565                        }
566                        CompactionStrategy::Rewrite => (None, 0),
567                    }
568                });
569
570        match (has_rewrite, has_nonzero_offset) {
571            // keep all buffers
572            (false, false) => {
573                let buffers: Vec<_> = buffers_with_offsets_iter
574                    .map(|(b, _)| b.vortex_expect("already checked for rewrite"))
575                    .collect();
576                Self::AllKept {
577                    buffers: Arc::from(buffers),
578                    offsets: None,
579                }
580            }
581            // rewrite, all zero offsets
582            (true, false) => {
583                let buffers: Vec<_> = buffers_with_offsets_iter.map(|(b, _)| b).collect();
584                Self::SomeCompacted {
585                    buffers,
586                    offsets: None,
587                }
588            }
589            // keep all buffers, but some have offsets
590            (false, true) => {
591                let (buffers, offsets): (Vec<_>, _) = buffers_with_offsets_iter
592                    .map(|(buffer, offset)| {
593                        (buffer.vortex_expect("already checked for rewrite"), offset)
594                    })
595                    .collect();
596                Self::AllKept {
597                    buffers: Arc::from(buffers),
598                    offsets: Some(offsets),
599                }
600            }
601            // rewrite and some have offsets
602            (true, true) => {
603                let (buffers, offsets) = buffers_with_offsets_iter.collect();
604                Self::SomeCompacted {
605                    buffers,
606                    offsets: Some(offsets),
607                }
608            }
609        }
610    }
611}
612
/// What to do with a single data buffer of an appended array.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CompactionStrategy {
    /// Keep the buffer unchanged.
    KeepFull,
    /// Slice the buffer to the `[start, end)` byte range.
    Slice { start: u32, end: u32 },
    /// Drop the buffer and copy its live values into a new compacted buffer.
    Rewrite,
}
624
625fn compaction_strategy(
626    buffer_utilization: &BufferUtilization,
627    threshold: f64,
628) -> CompactionStrategy {
629    match buffer_utilization.overall_utilization() {
630        // rewrite empty or not used buffers TODO(os): maybe keep them
631        0.0 => CompactionStrategy::Rewrite,
632        utilised if utilised >= threshold => CompactionStrategy::KeepFull,
633        _ if buffer_utilization.range_utilization() >= threshold => {
634            let Range { start, end } = buffer_utilization.range();
635            CompactionStrategy::Slice { start, end }
636        }
637        _ => CompactionStrategy::Rewrite,
638    }
639}
640
641enum ViewAdjustment {
642    Precomputed(PrecomputedViewAdjustment),
643    Rewriting(RewritingViewAdjustment),
644}
645
646impl ViewAdjustment {
647    fn shift(buffer_offset: u32, offsets: Option<Vec<u32>>) -> Self {
648        Self::Precomputed(PrecomputedViewAdjustment::Shift {
649            buffer_offset,
650            offsets,
651        })
652    }
653
654    fn lookup(buffer_lookup: Vec<u32>, offsets: Option<Vec<u32>>) -> Self {
655        Self::Precomputed(PrecomputedViewAdjustment::Lookup {
656            buffer_lookup,
657            offsets,
658        })
659    }
660
661    fn rewriting(buffer_lookup: Vec<Option<u32>>, offsets: Option<Vec<u32>>) -> Self {
662        Self::Rewriting(RewritingViewAdjustment {
663            buffer_lookup,
664            offsets,
665        })
666    }
667}
668
// Take care when adding variants or fields to this enum: if it grows too large it can stop
// `adjust_view` from being inlined.
/// View remapping that needs no data copying, because every referenced buffer was kept.
enum PrecomputedViewAdjustment {
    /// Buffers were appended in source order: add `buffer_offset` to each buffer index.
    Shift {
        buffer_offset: u32,
        offsets: Option<Vec<u32>>,
    },
    /// Buffers may have been deduplicated: map each source index through `buffer_lookup`.
    Lookup {
        buffer_lookup: Vec<u32>,
        offsets: Option<Vec<u32>>,
    },
}
680
681impl PrecomputedViewAdjustment {
682    #[inline]
683    fn adjust_view(&self, view: &BinaryView) -> BinaryView {
684        if view.is_inlined() {
685            return *view;
686        }
687        let view_ref = view.as_view();
688        match self {
689            Self::Shift {
690                buffer_offset,
691                offsets,
692            } => {
693                let b_idx = view_ref.buffer_index();
694                let offset_shift = offsets
695                    .as_ref()
696                    .map(|o| o[b_idx as usize])
697                    .unwrap_or_default();
698                view_ref
699                    .with_buffer_and_offset(b_idx + buffer_offset, view_ref.offset() - offset_shift)
700            }
701            Self::Lookup {
702                buffer_lookup,
703                offsets,
704            } => {
705                let b_idx = view_ref.buffer_index();
706                let buffer = buffer_lookup[b_idx as usize];
707                let offset_shift = offsets
708                    .as_ref()
709                    .map(|o| o[b_idx as usize])
710                    .unwrap_or_default();
711                view_ref.with_buffer_and_offset(buffer, view_ref.offset() - offset_shift)
712            }
713        }
714        .into()
715    }
716}
717
/// View remapping for an array where some buffers were dropped by compaction.
struct RewritingViewAdjustment {
    /// New buffer index for each source buffer, or `None` if that buffer was dropped.
    buffer_lookup: Vec<Option<u32>>,
    /// Per-source-buffer byte shift to subtract from view offsets, if any buffer was sliced.
    offsets: Option<Vec<u32>>,
}
722
723impl RewritingViewAdjustment {
724    /// Can return None if this view can't be adjusted, because there is no precomputed lookup
725    /// for the current buffer.
726    fn adjust_view(&self, view: &BinaryView) -> Option<BinaryView> {
727        if view.is_inlined() {
728            return Some(*view);
729        }
730
731        let view_ref = view.as_view();
732        self.buffer_lookup[view_ref.buffer_index() as usize].map(|buffer| {
733            let offset_shift = self
734                .offsets
735                .as_ref()
736                .map(|o| o[view_ref.buffer_index() as usize])
737                .unwrap_or_default();
738            view_ref
739                .with_buffer_and_offset(buffer, view_ref.offset() - offset_shift)
740                .into()
741        })
742    }
743}
744
#[cfg(test)]
mod tests {
    use std::str::from_utf8;

    use itertools::Itertools;
    use vortex_dtype::{DType, Nullability};

    use crate::accessor::ArrayAccessor;
    use crate::arrays::VarBinViewVTable;
    use crate::builders::{ArrayBuilder, VarBinViewBuilder};

    /// Turns optional string slices into the owned form produced by the decoders below.
    fn owned(values: &[Option<&str>]) -> Vec<Option<String>> {
        values.iter().map(|value| value.map(String::from)).collect()
    }

    #[test]
    fn test_utf8_builder() {
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_value("Hello");
        builder.append_null();
        builder.append_value("World");
        builder.append_nulls(2);
        builder.append_zeros(2);
        builder.append_value("test");

        let finished = builder.finish();
        let decoded = finished
            .as_::<VarBinViewVTable>()
            .with_iterator(|values| {
                values
                    .map(|value| value.map(|bytes| from_utf8(bytes).unwrap().to_string()))
                    .collect_vec()
            })
            .unwrap();

        assert_eq!(decoded.len(), 8);
        assert_eq!(
            decoded,
            owned(&[
                Some("Hello"),
                None,
                Some("World"),
                None,
                None,
                Some(""),
                Some(""),
                Some("test"),
            ])
        );
    }

    #[test]
    fn test_utf8_builder_with_extend() {
        let source = {
            let mut inner =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            inner.append_null();
            inner.append_value("Hello2");
            inner.finish()
        };

        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
        builder.append_value("Hello1");
        builder.extend_from_array(&source);
        builder.append_nulls(2);
        builder.append_value("Hello3");

        let canonical = builder.finish_into_canonical().into_varbinview();
        let decoded = canonical
            .with_iterator(|values| {
                values
                    .map(|value| value.map(|bytes| from_utf8(bytes).unwrap().to_string()))
                    .collect_vec()
            })
            .unwrap();

        assert_eq!(decoded.len(), 6);
        assert_eq!(
            decoded,
            owned(&[
                Some("Hello1"),
                None,
                Some("Hello2"),
                None,
                None,
                Some("Hello3"),
            ])
        );
    }

    #[test]
    fn test_buffer_deduplication() {
        const LONG: &str = "This is a long string that should not be inlined";

        let first = {
            let mut inner =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            inner.append_value(LONG);
            inner.append_value("short string");
            inner.finish_into_varbinview()
        };
        assert_eq!(first.buffers().len(), 1);

        let mut builder =
            VarBinViewBuilder::with_buffer_deduplication(DType::Utf8(Nullability::Nullable), 10);

        // Appending the same array, or slices of it, must not duplicate its single buffer.
        first.append_to_builder(&mut builder);
        assert_eq!(builder.completed_block_count(), 1);

        first.slice(1..2).append_to_builder(&mut builder);
        first.slice(0..1).append_to_builder(&mut builder);
        assert_eq!(builder.completed_block_count(), 1);

        // A separately built array owns a distinct buffer, even with identical contents.
        let second = {
            let mut inner =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            inner.append_value(LONG);
            inner.finish_into_varbinview()
        };

        second.append_to_builder(&mut builder);
        assert_eq!(builder.completed_block_count(), 2);

        first.slice(0..1).append_to_builder(&mut builder);
        second.slice(0..1).append_to_builder(&mut builder);
        assert_eq!(builder.completed_block_count(), 2);
    }

    #[test]
    fn test_append_scalar() {
        use vortex_scalar::Scalar;

        use crate::array::Array;

        // Utf8 builder: two values followed by a null.
        let mut utf8_builder =
            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
        for text in ["hello", "world"] {
            utf8_builder
                .append_scalar(&Scalar::utf8(text, Nullability::Nullable))
                .unwrap();
        }
        utf8_builder
            .append_scalar(&Scalar::null(DType::Utf8(Nullability::Nullable)))
            .unwrap();

        let utf8_array = utf8_builder.finish();
        assert_eq!(utf8_array.len(), 3);

        let first = utf8_array.scalar_at(0).as_utf8().value();
        assert_eq!(first.as_ref().map(|s| s.as_str()), Some("hello"));

        let second = utf8_array.scalar_at(1).as_utf8().value();
        assert_eq!(second.as_ref().map(|s| s.as_str()), Some("world"));

        let third = utf8_array.scalar_at(2).as_utf8().value();
        assert_eq!(third, None);

        // Binary builder: one value followed by a null.
        let mut binary_builder =
            VarBinViewBuilder::with_capacity(DType::Binary(Nullability::Nullable), 10);
        binary_builder
            .append_scalar(&Scalar::binary(vec![1u8, 2, 3], Nullability::Nullable))
            .unwrap();
        binary_builder
            .append_scalar(&Scalar::null(DType::Binary(Nullability::Nullable)))
            .unwrap();

        let binary_array = binary_builder.finish();
        assert_eq!(binary_array.len(), 2);

        let bytes = binary_array.scalar_at(0).as_binary().value();
        assert_eq!(bytes.as_ref().map(|b| b.as_slice()), Some(&[1u8, 2, 3][..]));

        let missing = binary_array.scalar_at(1).as_binary().value();
        assert_eq!(missing, None);

        // A scalar of a different dtype is rejected.
        let mut strict_builder =
            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::NonNullable), 10);
        assert!(strict_builder.append_scalar(&Scalar::from(42i32)).is_err());
    }

    #[test]
    fn test_buffer_growth_strategies() {
        use super::BufferGrowthStrategy;

        // A fixed strategy always suggests the same size.
        let mut fixed = BufferGrowthStrategy::fixed(1024);
        for _ in 0..3 {
            assert_eq!(fixed.next_size(), 1024);
        }

        // An exponential strategy doubles until it reaches its cap, then stays there.
        let mut exponential = BufferGrowthStrategy::exponential(1024, 8192);
        for expected_size in [1024, 2048, 4096, 8192, 8192] {
            assert_eq!(exponential.next_size(), expected_size);
        }
    }

    #[test]
    fn test_large_value_allocation() {
        use super::{BufferGrowthStrategy, VarBinViewBuilder};

        // The value is larger than the growth strategy's maximum buffer size.
        let large_value = vec![0u8; 8192];

        let mut builder = VarBinViewBuilder::new(
            DType::Binary(Nullability::Nullable),
            10,
            Default::default(),
            BufferGrowthStrategy::exponential(1024, 4096),
            0.0,
        );
        builder.append_value(&large_value);

        let array = builder.finish_into_varbinview();
        assert_eq!(array.len(), 1);

        let retrieved = array.scalar_at(0).as_binary().value().unwrap();
        assert_eq!(retrieved.len(), 8192);
        assert_eq!(retrieved.as_slice(), &large_value);
    }
}