vortex_array/builders/varbinview.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::any::Any;
use std::ops::Range;
use std::sync::Arc;

use vortex_buffer::Buffer;
use vortex_buffer::BufferMut;
use vortex_buffer::ByteBuffer;
use vortex_buffer::ByteBufferMut;
use vortex_dtype::DType;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_ensure;
use vortex_mask::Mask;
use vortex_scalar::BinaryScalar;
use vortex_scalar::Scalar;
use vortex_scalar::Utf8Scalar;
use vortex_utils::aliases::hash_map::Entry;
use vortex_utils::aliases::hash_map::HashMap;
use vortex_vector::binaryview::BinaryView;

use crate::Array;
use crate::ArrayRef;
use crate::IntoArray;
use crate::arrays::VarBinViewArray;
use crate::arrays::compact::BufferUtilization;
use crate::builders::ArrayBuilder;
use crate::builders::LazyBitBufferBuilder;
use crate::canonical::Canonical;
use crate::canonical::ToCanonical;

/// A builder for [`VarBinViewArray`].
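///
/// A minimal usage sketch (mirroring the unit tests in this module):
///
/// ```ignore
/// use vortex_dtype::{DType, Nullability};
///
/// let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 4);
/// builder.append_value("Hello");
/// builder.append_null();
/// builder.append_value("World");
/// let array = builder.finish_into_varbinview();
/// assert_eq!(array.len(), 3);
/// ```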
pub struct VarBinViewBuilder {
    dtype: DType,
    views_builder: BufferMut<BinaryView>,
    nulls: LazyBitBufferBuilder,
    completed: CompletedBuffers,
    in_progress: ByteBufferMut,
    growth_strategy: BufferGrowthStrategy,
    compaction_threshold: f64,
}

impl VarBinViewBuilder {
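    /// Creates a builder with room for `capacity` views. Completed buffers are stored
    /// as-is (no deduplication) and buffer compaction is disabled.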
    pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
        Self::new(dtype, capacity, Default::default(), Default::default(), 0.0)
    }

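    /// Like [`Self::with_capacity`], but completed buffers are deduplicated by identity
    /// (pointer and length), so extending from slices of the same source array reuses a
    /// single underlying buffer.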
    pub fn with_buffer_deduplication(dtype: DType, capacity: usize) -> Self {
        Self::new(
            dtype,
            capacity,
            CompletedBuffers::Deduplicated(Default::default()),
            Default::default(),
            0.0,
        )
    }

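    /// Like [`Self::with_capacity`], but when extending from an existing array, buffers
    /// whose utilization falls below `compaction_threshold` are sliced down to their used
    /// range or rewritten instead of being kept in full.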
    pub fn with_compaction(dtype: DType, capacity: usize, compaction_threshold: f64) -> Self {
        Self::new(
            dtype,
            capacity,
            Default::default(),
            Default::default(),
            compaction_threshold,
        )
    }

    pub fn new(
        dtype: DType,
        capacity: usize,
        completed: CompletedBuffers,
        growth_strategy: BufferGrowthStrategy,
        compaction_threshold: f64,
    ) -> Self {
        assert!(
            matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
            "VarBinViewBuilder DType must be Utf8 or Binary."
        );
        Self {
            views_builder: BufferMut::<BinaryView>::with_capacity(capacity),
            nulls: LazyBitBufferBuilder::new(capacity),
            completed,
            in_progress: ByteBufferMut::empty(),
            dtype,
            growth_strategy,
            compaction_threshold,
        }
    }

    fn append_value_view(&mut self, value: &[u8]) {
        let length =
            u32::try_from(value.len()).vortex_expect("cannot have a single string >2^32 in length");
        if length <= 12 {
            self.views_builder.push(BinaryView::make_view(value, 0, 0));
            return;
        }

        let (buffer_idx, offset) = self.append_value_to_buffer(value);
        let view = BinaryView::make_view(value, buffer_idx, offset);
        self.views_builder.push(view);
    }

    /// Appends a value to the builder.
    pub fn append_value<S: AsRef<[u8]>>(&mut self, value: S) {
        self.append_value_view(value.as_ref());
        self.nulls.append_non_null();
    }

    fn flush_in_progress(&mut self) {
        if self.in_progress.is_empty() {
            return;
        }
        let block = std::mem::take(&mut self.in_progress).freeze();

        assert!(block.len() < u32::MAX as usize, "Block too large");

        let initial_len = self.completed.len();
        self.completed.push(block);
        assert_eq!(
            self.completed.len(),
            initial_len + 1,
            "Invalid state, just completed block already exists"
        );
    }

    /// Appends a non-inlined value to `self.in_progress`.
    fn append_value_to_buffer(&mut self, value: &[u8]) -> (u32, u32) {
        assert!(value.len() > 12, "must inline small strings");
        let required_cap = self.in_progress.len() + value.len();
        if self.in_progress.capacity() < required_cap {
            self.flush_in_progress();
            let next_buffer_size = self.growth_strategy.next_size() as usize;
            let to_reserve = next_buffer_size.max(value.len());
            self.in_progress.reserve(to_reserve);
        }

        let buffer_idx = self.completed.len();
        let offset = u32::try_from(self.in_progress.len())
            .vortex_expect("in-progress buffer offset must fit in u32");
        self.in_progress.extend_from_slice(value);

        (buffer_idx, offset)
    }

    pub fn completed_block_count(&self) -> u32 {
        self.completed.len()
    }

    /// Pushes buffers and pre-adjusted views into the builder.
    ///
    /// The provided `buffer` slice contains data buffers taken from a `VarBinViewArray`, and the
    /// `views` are `BinaryView`s that have already been adjusted to reference the correct buffer
    /// indices and offsets within this builder. All views must point to valid ranges within the
    /// provided buffers, and the length of the `validity_mask` must match the number of views.
    ///
    /// # Warning
    ///
    /// This method does not check utilization of the given buffers. Callers must provide
    /// buffers that are fully utilized by the given adjusted views.
    ///
    /// # Panics
    ///
    /// Panics if this builder deduplicates buffers and any of the given buffers already
    /// exist in this builder.
    pub fn push_buffer_and_adjusted_views(
        &mut self,
        buffer: &[ByteBuffer],
        views: &Buffer<BinaryView>,
        validity_mask: Mask,
    ) {
        self.flush_in_progress();

        let expected_completed_len = self.completed.len() as usize + buffer.len();
        self.completed.extend_from_slice_unchecked(buffer);
        assert_eq!(
            self.completed.len() as usize,
            expected_completed_len,
            "Some buffers already exist",
        );
        self.views_builder.extend_trusted(views.iter().copied());
        self.push_only_validity_mask(validity_mask);

        debug_assert_eq!(self.nulls.len(), self.views_builder.len());
    }

    /// Finishes the builder directly into a [`VarBinViewArray`].
    pub fn finish_into_varbinview(&mut self) -> VarBinViewArray {
        self.flush_in_progress();
        let buffers = std::mem::take(&mut self.completed);

        assert_eq!(
            self.views_builder.len(),
            self.nulls.len(),
            "View and validity length must match"
        );

        let validity = self.nulls.finish_with_nullability(self.dtype.nullability());

        // SAFETY: the builder methods check safety at each step.
        unsafe {
            VarBinViewArray::new_unchecked(
                std::mem::take(&mut self.views_builder).freeze(),
                buffers.finish(),
                self.dtype.clone(),
                validity,
            )
        }
    }

    // Pushes a validity mask into the builder without affecting the views or buffers.
    fn push_only_validity_mask(&mut self, validity_mask: Mask) {
        self.nulls.append_validity_mask(validity_mask);
    }
}

impl ArrayBuilder for VarBinViewBuilder {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }

    fn dtype(&self) -> &DType {
        &self.dtype
    }

    fn len(&self) -> usize {
        self.nulls.len()
    }

    fn append_zeros(&mut self, n: usize) {
        self.views_builder.push_n(BinaryView::empty_view(), n);
        self.nulls.append_n_non_nulls(n);
    }

    unsafe fn append_nulls_unchecked(&mut self, n: usize) {
        self.views_builder.push_n(BinaryView::empty_view(), n);
        self.nulls.append_n_nulls(n);
    }

    fn append_scalar(&mut self, scalar: &Scalar) -> VortexResult<()> {
        vortex_ensure!(
            scalar.dtype() == self.dtype(),
            "VarBinViewBuilder expected scalar with dtype {:?}, got {:?}",
            self.dtype(),
            scalar.dtype()
        );

        match self.dtype() {
            DType::Utf8(_) => {
                let utf8_scalar = Utf8Scalar::try_from(scalar)?;
                match utf8_scalar.value() {
                    Some(value) => self.append_value(value),
                    None => self.append_null(),
                }
            }
            DType::Binary(_) => {
                let binary_scalar = BinaryScalar::try_from(scalar)?;
                match binary_scalar.value() {
                    Some(value) => self.append_value(value),
                    None => self.append_null(),
                }
            }
            _ => vortex_bail!(
                "VarBinViewBuilder can only handle Utf8 or Binary scalars, got {:?}",
                scalar.dtype()
            ),
        }

        Ok(())
    }

    unsafe fn extend_from_array_unchecked(&mut self, array: &dyn Array) {
        let array = array.to_varbinview();
        self.flush_in_progress();

        self.push_only_validity_mask(array.validity_mask());

        let view_adjustment =
            self.completed
                .extend_from_compaction(BuffersWithOffsets::from_array(
                    &array,
                    self.compaction_threshold,
                ));

        match view_adjustment {
            ViewAdjustment::Precomputed(adjustment) => self.views_builder.extend_trusted(
                array
                    .views()
                    .iter()
                    .map(|view| adjustment.adjust_view(view)),
            ),
            ViewAdjustment::Rewriting(adjustment) => {
                for (idx, &view) in array.views().iter().enumerate() {
                    let new_view = if !array.is_valid(idx) {
                        BinaryView::empty_view()
                    } else if view.is_inlined() {
                        view
                    } else if let Some(adjusted) = adjustment.adjust_view(&view) {
                        adjusted
                    } else {
                        let bytes = array.bytes_at(idx);
                        let (new_buf_idx, new_offset) = self.append_value_to_buffer(&bytes);
                        BinaryView::make_view(bytes.as_slice(), new_buf_idx, new_offset)
                    };
                    self.views_builder.push(new_view);
                }
            }
        }
    }

    fn reserve_exact(&mut self, additional: usize) {
        self.views_builder.reserve(additional);
        self.nulls.reserve_exact(additional);
    }

    unsafe fn set_validity_unchecked(&mut self, validity: Mask) {
        self.nulls = LazyBitBufferBuilder::new(validity.len());
        self.nulls.append_validity_mask(validity);
    }

    fn finish(&mut self) -> ArrayRef {
        self.finish_into_varbinview().into_array()
    }

    fn finish_into_canonical(&mut self) -> Canonical {
        Canonical::VarBinView(self.finish_into_varbinview())
    }
}

pub enum CompletedBuffers {
    Default(Vec<ByteBuffer>),
    Deduplicated(DeduplicatedBuffers),
}

impl Default for CompletedBuffers {
    fn default() -> Self {
        Self::Default(Vec::new())
    }
}

// Self::push enforces len < u32::MAX
#[allow(clippy::cast_possible_truncation)]
impl CompletedBuffers {
    fn len(&self) -> u32 {
        match self {
            Self::Default(buffers) => buffers.len() as u32,
            Self::Deduplicated(buffers) => buffers.len(),
        }
    }

    fn push(&mut self, block: ByteBuffer) -> u32 {
        match self {
            Self::Default(buffers) => {
                assert!(buffers.len() < u32::MAX as usize, "Too many blocks");
                buffers.push(block);
                self.len()
            }
            Self::Deduplicated(buffers) => buffers.push(block),
        }
    }

    /// Does not compact buffers and bypasses utilization checks.
    fn extend_from_slice_unchecked(&mut self, buffers: &[ByteBuffer]) {
        for buffer in buffers {
            self.push(buffer.clone());
        }
    }

    fn extend_from_compaction(&mut self, buffers: BuffersWithOffsets) -> ViewAdjustment {
        match (self, buffers) {
            (
                Self::Default(completed_buffers),
                BuffersWithOffsets::AllKept { buffers, offsets },
            ) => {
                let buffer_offset = completed_buffers.len() as u32;
                completed_buffers.extend_from_slice(&buffers);
                ViewAdjustment::shift(buffer_offset, offsets)
            }
            (
                Self::Default(completed_buffers),
                BuffersWithOffsets::SomeCompacted { buffers, offsets },
            ) => {
                let lookup = buffers
                    .iter()
                    .map(|maybe_buffer| {
                        maybe_buffer.as_ref().map(|buffer| {
                            completed_buffers.push(buffer.clone());
                            completed_buffers.len() as u32 - 1
                        })
                    })
                    .collect();
                ViewAdjustment::rewriting(lookup, offsets)
            }

            (
                Self::Deduplicated(completed_buffers),
                BuffersWithOffsets::AllKept { buffers, offsets },
            ) => {
                let buffer_lookup = completed_buffers.extend_from_slice(&buffers);
                ViewAdjustment::lookup(buffer_lookup, offsets)
            }
            (
                Self::Deduplicated(completed_buffers),
                BuffersWithOffsets::SomeCompacted { buffers, offsets },
            ) => {
                let buffer_lookup = completed_buffers.extend_from_option_slice(&buffers);
                ViewAdjustment::rewriting(buffer_lookup, offsets)
            }
        }
    }

    fn finish(self) -> Arc<[ByteBuffer]> {
        match self {
            Self::Default(buffers) => Arc::from(buffers),
            Self::Deduplicated(buffers) => buffers.finish(),
        }
    }
}

#[derive(Default)]
pub struct DeduplicatedBuffers {
    buffers: Vec<ByteBuffer>,
    buffer_to_idx: HashMap<BufferId, u32>,
}

impl DeduplicatedBuffers {
    // Self::push enforces len < u32::MAX
    #[allow(clippy::cast_possible_truncation)]
    fn len(&self) -> u32 {
        self.buffers.len() as u32
    }

    /// Pushes a new block if it has not been seen before. Returns the index of the block.
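    ///
    /// Deduplication is by buffer identity (pointer and length), so cloned handles to the
    /// same allocation map to the same index. A minimal sketch, using only APIs that appear
    /// in this module:
    ///
    /// ```ignore
    /// let mut bytes = ByteBufferMut::empty();
    /// bytes.extend_from_slice(b"a block of string data");
    /// let block = bytes.freeze();
    ///
    /// let mut dedup = DeduplicatedBuffers::default();
    /// let idx = dedup.push(block.clone());
    /// assert_eq!(dedup.push(block), idx); // same allocation => same index
    /// ```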
    pub(crate) fn push(&mut self, block: ByteBuffer) -> u32 {
        assert!(self.buffers.len() < u32::MAX as usize, "Too many blocks");

        let initial_len = self.len();
        let id = BufferId::from(&block);
        match self.buffer_to_idx.entry(id) {
            Entry::Occupied(idx) => *idx.get(),
            Entry::Vacant(entry) => {
                let idx = initial_len;
                entry.insert(idx);
                self.buffers.push(block);
                idx
            }
        }
    }

    pub(crate) fn extend_from_option_slice(
        &mut self,
        buffers: &[Option<ByteBuffer>],
    ) -> Vec<Option<u32>> {
        buffers
            .iter()
            .map(|buffer| buffer.as_ref().map(|buf| self.push(buf.clone())))
            .collect()
    }

    pub(crate) fn extend_from_slice(&mut self, buffers: &[ByteBuffer]) -> Vec<u32> {
        buffers
            .iter()
            .map(|buffer| self.push(buffer.clone()))
            .collect()
    }

    pub(crate) fn finish(self) -> Arc<[ByteBuffer]> {
        Arc::from(self.buffers)
    }
}

#[derive(PartialEq, Eq, Hash)]
struct BufferId {
    // *const u8 stored as usize for `Send`
    ptr: usize,
    len: usize,
}

impl BufferId {
    fn from(buffer: &ByteBuffer) -> Self {
        let slice = buffer.as_slice();
        Self {
            ptr: slice.as_ptr() as usize,
            len: slice.len(),
        }
    }
}

#[derive(Debug, Clone)]
pub enum BufferGrowthStrategy {
    /// Use a fixed buffer size for all allocations.
    Fixed { size: u32 },
    /// Use exponential growth, doubling from the initial size until reaching `max_size`.
    Exponential { current_size: u32, max_size: u32 },
}

impl Default for BufferGrowthStrategy {
    fn default() -> Self {
        Self::Exponential {
            current_size: 4 * 1024,    // 4KB starting size
            max_size: 2 * 1024 * 1024, // 2MB max size
        }
    }
}

impl BufferGrowthStrategy {
    pub fn fixed(size: u32) -> Self {
        Self::Fixed { size }
    }

    pub fn exponential(initial_size: u32, max_size: u32) -> Self {
        Self::Exponential {
            current_size: initial_size,
            max_size,
        }
    }

    /// Returns the next buffer size to allocate and updates internal state.
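    ///
    /// For example (mirroring the unit tests below), an exponential strategy doubles until
    /// it reaches its cap:
    ///
    /// ```ignore
    /// let mut strategy = BufferGrowthStrategy::exponential(1024, 8192);
    /// assert_eq!(strategy.next_size(), 1024);
    /// assert_eq!(strategy.next_size(), 2048);
    /// assert_eq!(strategy.next_size(), 4096);
    /// assert_eq!(strategy.next_size(), 8192);
    /// assert_eq!(strategy.next_size(), 8192); // capped at max_size
    /// ```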
    pub fn next_size(&mut self) -> u32 {
        match self {
            Self::Fixed { size } => *size,
            Self::Exponential {
                current_size,
                max_size,
            } => {
                let result = *current_size;
                if *current_size < *max_size {
                    *current_size = current_size.saturating_mul(2).min(*max_size);
                }
                result
            }
        }
    }
}

enum BuffersWithOffsets {
    AllKept {
        buffers: Arc<[ByteBuffer]>,
        offsets: Option<Vec<u32>>,
    },
    SomeCompacted {
        buffers: Vec<Option<ByteBuffer>>,
        offsets: Option<Vec<u32>>,
    },
}

impl BuffersWithOffsets {
    pub fn from_array(array: &VarBinViewArray, compaction_threshold: f64) -> Self {
        if compaction_threshold == 0.0 {
            return Self::AllKept {
                buffers: array.buffers().clone(),
                offsets: None,
            };
        }

        let buffer_utilizations = array.buffer_utilizations();
        let mut has_rewrite = false;
        let mut has_nonzero_offset = false;
        for utilization in buffer_utilizations.iter() {
            match compaction_strategy(utilization, compaction_threshold) {
                CompactionStrategy::KeepFull => continue,
                CompactionStrategy::Slice { .. } => has_nonzero_offset = true,
                CompactionStrategy::Rewrite => has_rewrite = true,
            }
        }

        let buffers_with_offsets_iter =
            buffer_utilizations
                .iter()
                .zip(array.buffers().iter())
                .map(|(utilization, buffer)| {
                    match compaction_strategy(utilization, compaction_threshold) {
                        CompactionStrategy::KeepFull => (Some(buffer.clone()), 0),
                        CompactionStrategy::Slice { start, end } => {
                            (Some(buffer.slice(start as usize..end as usize)), start)
                        }
                        CompactionStrategy::Rewrite => (None, 0),
                    }
                });

        match (has_rewrite, has_nonzero_offset) {
            // keep all buffers
            (false, false) => {
                let buffers: Vec<_> = buffers_with_offsets_iter
                    .map(|(b, _)| b.vortex_expect("already checked for rewrite"))
                    .collect();
                Self::AllKept {
                    buffers: Arc::from(buffers),
                    offsets: None,
                }
            }
            // rewrite, all zero offsets
            (true, false) => {
                let buffers: Vec<_> = buffers_with_offsets_iter.map(|(b, _)| b).collect();
                Self::SomeCompacted {
                    buffers,
                    offsets: None,
                }
            }
            // keep all buffers, but some have offsets
            (false, true) => {
                let (buffers, offsets): (Vec<_>, _) = buffers_with_offsets_iter
                    .map(|(buffer, offset)| {
                        (buffer.vortex_expect("already checked for rewrite"), offset)
                    })
                    .collect();
                Self::AllKept {
                    buffers: Arc::from(buffers),
                    offsets: Some(offsets),
                }
            }
            // rewrite and some have offsets
            (true, true) => {
                let (buffers, offsets) = buffers_with_offsets_iter.collect();
                Self::SomeCompacted {
                    buffers,
                    offsets: Some(offsets),
                }
            }
        }
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CompactionStrategy {
    KeepFull,
    /// Slice the buffer to the `[start, end)` range
    Slice {
        start: u32,
        end: u32,
    },
    /// Rewrite data into a new compacted buffer
    Rewrite,
}

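/// Decides how to carry a source buffer over into the builder. For example, with a
/// threshold of `0.8`: a buffer that is 90% utilized overall is kept in full; one that is
/// only 50% utilized overall but whose used range is 85% dense is sliced down to that
/// range; anything sparser (or completely unused) is rewritten value-by-value into a
/// fresh buffer.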
fn compaction_strategy(
    buffer_utilization: &BufferUtilization,
    threshold: f64,
) -> CompactionStrategy {
    match buffer_utilization.overall_utilization() {
        // rewrite empty or unused buffers. TODO(os): maybe keep them
        0.0 => CompactionStrategy::Rewrite,
        utilised if utilised >= threshold => CompactionStrategy::KeepFull,
        _ if buffer_utilization.range_utilization() >= threshold => {
            let Range { start, end } = buffer_utilization.range();
            CompactionStrategy::Slice { start, end }
        }
        _ => CompactionStrategy::Rewrite,
    }
}

enum ViewAdjustment {
    Precomputed(PrecomputedViewAdjustment),
    Rewriting(RewritingViewAdjustment),
}

impl ViewAdjustment {
    fn shift(buffer_offset: u32, offsets: Option<Vec<u32>>) -> Self {
        Self::Precomputed(PrecomputedViewAdjustment::Shift {
            buffer_offset,
            offsets,
        })
    }

    fn lookup(buffer_lookup: Vec<u32>, offsets: Option<Vec<u32>>) -> Self {
        Self::Precomputed(PrecomputedViewAdjustment::Lookup {
            buffer_lookup,
            offsets,
        })
    }

    fn rewriting(buffer_lookup: Vec<Option<u32>>, offsets: Option<Vec<u32>>) -> Self {
        Self::Rewriting(RewritingViewAdjustment {
            buffer_lookup,
            offsets,
        })
    }
}

// Take care when adding new variants or fields to this enum; if it gets too big, it will interfere with inlining.
enum PrecomputedViewAdjustment {
    Shift {
        buffer_offset: u32,
        offsets: Option<Vec<u32>>,
    },
    Lookup {
        buffer_lookup: Vec<u32>,
        offsets: Option<Vec<u32>>,
    },
}

impl PrecomputedViewAdjustment {
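    /// Worked example for the `Shift` case: with `buffer_offset = 3` and
    /// `offsets = Some(vec![0, 40])`, a non-inlined view referencing buffer 1 at offset 100
    /// is rewritten to reference buffer 4 at offset 60. Inlined views pass through unchanged.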
    #[inline]
    fn adjust_view(&self, view: &BinaryView) -> BinaryView {
        if view.is_inlined() {
            return *view;
        }
        let view_ref = view.as_view();
        match self {
            Self::Shift {
                buffer_offset,
                offsets,
            } => {
                let b_idx = view_ref.buffer_index;
                let offset_shift = offsets
                    .as_ref()
                    .map(|o| o[b_idx as usize])
                    .unwrap_or_default();

                // If offset < offset_shift, this view was invalid and wasn't counted in buffer_utilizations.
                // Return an empty view to match how invalid views are handled in the Rewriting path.
                if view_ref.offset < offset_shift {
                    return BinaryView::empty_view();
                }

                view_ref
                    .with_buffer_and_offset(b_idx + buffer_offset, view_ref.offset - offset_shift)
            }
            Self::Lookup {
                buffer_lookup,
                offsets,
            } => {
                let b_idx = view_ref.buffer_index;
                let buffer = buffer_lookup[b_idx as usize];
                let offset_shift = offsets
                    .as_ref()
                    .map(|o| o[b_idx as usize])
                    .unwrap_or_default();

                // If offset < offset_shift, this view was invalid and wasn't counted in buffer_utilizations.
                // Return an empty view to match how invalid views are handled in the Rewriting path.
                if view_ref.offset < offset_shift {
                    return BinaryView::empty_view();
                }

                view_ref.with_buffer_and_offset(buffer, view_ref.offset - offset_shift)
            }
        }
        .into()
    }
}

struct RewritingViewAdjustment {
    buffer_lookup: Vec<Option<u32>>,
    offsets: Option<Vec<u32>>,
}

impl RewritingViewAdjustment {
    /// Returns `None` if this view can't be adjusted because there is no precomputed lookup
    /// for its buffer.
    fn adjust_view(&self, view: &BinaryView) -> Option<BinaryView> {
        if view.is_inlined() {
            return Some(*view);
        }

        let view_ref = view.as_view();
        self.buffer_lookup[view_ref.buffer_index as usize].map(|buffer| {
            let offset_shift = self
                .offsets
                .as_ref()
                .map(|o| o[view_ref.buffer_index as usize])
                .unwrap_or_default();
            view_ref
                .with_buffer_and_offset(buffer, view_ref.offset - offset_shift)
                .into()
        })
    }
}

#[cfg(test)]
mod tests {
    use vortex_dtype::DType;
    use vortex_dtype::Nullability;

    use crate::IntoArray;
    use crate::arrays::VarBinViewArray;
    use crate::assert_arrays_eq;
    use crate::builders::ArrayBuilder;
    use crate::builders::VarBinViewBuilder;

    #[test]
    fn test_utf8_builder() {
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_value("Hello");
        builder.append_null();
        builder.append_value("World");

        builder.append_nulls(2);

        builder.append_zeros(2);
        builder.append_value("test");

        let actual = builder.finish();
        let expected = <VarBinViewArray as FromIterator<_>>::from_iter([
            Some("Hello"),
            None,
            Some("World"),
            None,
            None,
            Some(""),
            Some(""),
            Some("test"),
        ]);
        assert_arrays_eq!(actual, expected);
    }

    #[test]
    fn test_utf8_builder_with_extend() {
        let array = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_null();
            builder.append_value("Hello2");
            builder.finish()
        };
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_value("Hello1");
        builder.extend_from_array(&array);
        builder.append_nulls(2);
        builder.append_value("Hello3");

        let actual = builder.finish_into_canonical();
        let expected = <VarBinViewArray as FromIterator<_>>::from_iter([
            Some("Hello1"),
            None,
            Some("Hello2"),
            None,
            None,
            Some("Hello3"),
        ]);
        assert_arrays_eq!(actual.into_array(), expected.into_array());
    }

    #[test]
    fn test_buffer_deduplication() {
        let array = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_value("This is a long string that should not be inlined");
            builder.append_value("short string");
            builder.finish_into_varbinview()
        };

        assert_eq!(array.buffers().len(), 1);
        let mut builder =
            VarBinViewBuilder::with_buffer_deduplication(DType::Utf8(Nullability::Nullable), 10);

        array.append_to_builder(&mut builder);
        assert_eq!(builder.completed_block_count(), 1);

        array.slice(1..2).append_to_builder(&mut builder);
        array.slice(0..1).append_to_builder(&mut builder);
        assert_eq!(builder.completed_block_count(), 1);

        let array2 = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_value("This is a long string that should not be inlined");
            builder.finish_into_varbinview()
        };

        array2.append_to_builder(&mut builder);
        assert_eq!(builder.completed_block_count(), 2);

        array.slice(0..1).append_to_builder(&mut builder);
        array2.slice(0..1).append_to_builder(&mut builder);
        assert_eq!(builder.completed_block_count(), 2);
    }

    #[test]
    fn test_append_scalar() {
        use vortex_scalar::Scalar;

        // Test with Utf8 builder.
        let mut utf8_builder =
            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        // Test appending a valid utf8 value.
        let utf8_scalar1 = Scalar::utf8("hello", Nullability::Nullable);
        utf8_builder.append_scalar(&utf8_scalar1).unwrap();

        // Test appending another value.
        let utf8_scalar2 = Scalar::utf8("world", Nullability::Nullable);
        utf8_builder.append_scalar(&utf8_scalar2).unwrap();

        // Test appending null value.
        let null_scalar = Scalar::null(DType::Utf8(Nullability::Nullable));
        utf8_builder.append_scalar(&null_scalar).unwrap();

        let array = utf8_builder.finish();
        let expected =
            <VarBinViewArray as FromIterator<_>>::from_iter([Some("hello"), Some("world"), None]);
        assert_arrays_eq!(&array, &expected);

        // Test with Binary builder.
        let mut binary_builder =
            VarBinViewBuilder::with_capacity(DType::Binary(Nullability::Nullable), 10);

        let binary_scalar = Scalar::binary(vec![1u8, 2, 3], Nullability::Nullable);
        binary_builder.append_scalar(&binary_scalar).unwrap();

        let binary_null = Scalar::null(DType::Binary(Nullability::Nullable));
        binary_builder.append_scalar(&binary_null).unwrap();

        let binary_array = binary_builder.finish();
        let expected =
            <VarBinViewArray as FromIterator<_>>::from_iter([Some(vec![1u8, 2, 3]), None]);
        assert_arrays_eq!(&binary_array, &expected);

        // Test wrong dtype error.
        let mut builder =
            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::NonNullable), 10);
        let wrong_scalar = Scalar::from(42i32);
        assert!(builder.append_scalar(&wrong_scalar).is_err());
    }

    #[test]
    fn test_buffer_growth_strategies() {
        use super::BufferGrowthStrategy;

        // Test Fixed strategy
        let mut strategy = BufferGrowthStrategy::fixed(1024);

        // Should always return the fixed size
        assert_eq!(strategy.next_size(), 1024);
        assert_eq!(strategy.next_size(), 1024);
        assert_eq!(strategy.next_size(), 1024);

        // Test Exponential strategy
        let mut strategy = BufferGrowthStrategy::exponential(1024, 8192);

        // Should double each time until hitting max_size
        assert_eq!(strategy.next_size(), 1024); // First: 1024
        assert_eq!(strategy.next_size(), 2048); // Second: 2048
        assert_eq!(strategy.next_size(), 4096); // Third: 4096
        assert_eq!(strategy.next_size(), 8192); // Fourth: 8192 (max)
        assert_eq!(strategy.next_size(), 8192); // Fifth: 8192 (capped)
    }

    #[test]
    fn test_large_value_allocation() {
        use super::BufferGrowthStrategy;
        use super::VarBinViewBuilder;

        let mut builder = VarBinViewBuilder::new(
            DType::Binary(Nullability::Nullable),
            10,
            Default::default(),
            BufferGrowthStrategy::exponential(1024, 4096),
            0.0,
        );

        // Create a value larger than max_size
        let large_value = vec![0u8; 8192];

        // Should successfully append the large value
        builder.append_value(&large_value);

        let array = builder.finish_into_varbinview();
        assert_eq!(array.len(), 1);

        // Verify the value was stored correctly
        let retrieved = array.scalar_at(0).as_binary().value().unwrap();
        assert_eq!(retrieved.len(), 8192);
        assert_eq!(retrieved.as_slice(), &large_value);
    }
}