// vortex_array/builders/varbinview.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::Any;
5use std::ops::Range;
6use std::sync::Arc;
7
8use itertools::Itertools;
9use vortex_buffer::Buffer;
10use vortex_buffer::BufferMut;
11use vortex_buffer::ByteBuffer;
12use vortex_buffer::ByteBufferMut;
13use vortex_error::VortexExpect;
14use vortex_error::VortexResult;
15use vortex_error::vortex_bail;
16use vortex_error::vortex_ensure;
17use vortex_mask::Mask;
18use vortex_utils::aliases::hash_map::Entry;
19use vortex_utils::aliases::hash_map::HashMap;
20
21use crate::ArrayRef;
22use crate::IntoArray;
23use crate::arrays::BinaryView;
24use crate::arrays::VarBinViewArray;
25use crate::arrays::compact::BufferUtilization;
26use crate::builders::ArrayBuilder;
27use crate::builders::LazyBitBufferBuilder;
28use crate::canonical::Canonical;
29use crate::canonical::ToCanonical;
30use crate::dtype::DType;
31use crate::scalar::Scalar;
32
/// The builder for building a [`VarBinViewArray`].
pub struct VarBinViewBuilder {
    // Element dtype; always Utf8 or Binary (asserted in `new`).
    dtype: DType,
    // One `BinaryView` per element; values of <= 12 bytes are inlined in the view itself.
    views_builder: BufferMut<BinaryView>,
    // Per-element validity bits, kept in lockstep with `views_builder`.
    nulls: LazyBitBufferBuilder,
    // Data buffers already sealed; non-inlined views reference them by index.
    completed: CompletedBuffers,
    // Byte buffer currently being appended to; flushed into `completed` when full.
    in_progress: ByteBufferMut,
    // Controls the allocation size of each fresh `in_progress` buffer.
    growth_strategy: BufferGrowthStrategy,
    // Utilization threshold below which source buffers are compacted when
    // extending from another array; 0.0 disables compaction.
    compaction_threshold: f64,
}
43
44impl VarBinViewBuilder {
45    pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
46        Self::new(dtype, capacity, Default::default(), Default::default(), 0.0)
47    }
48
49    pub fn with_buffer_deduplication(dtype: DType, capacity: usize) -> Self {
50        Self::new(
51            dtype,
52            capacity,
53            CompletedBuffers::Deduplicated(Default::default()),
54            Default::default(),
55            0.0,
56        )
57    }
58
59    pub fn with_compaction(dtype: DType, capacity: usize, compaction_threshold: f64) -> Self {
60        Self::new(
61            dtype,
62            capacity,
63            Default::default(),
64            Default::default(),
65            compaction_threshold,
66        )
67    }
68
69    pub fn new(
70        dtype: DType,
71        capacity: usize,
72        completed: CompletedBuffers,
73        growth_strategy: BufferGrowthStrategy,
74        compaction_threshold: f64,
75    ) -> Self {
76        assert!(
77            matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
78            "VarBinViewBuilder DType must be Utf8 or Binary."
79        );
80        Self {
81            views_builder: BufferMut::<BinaryView>::with_capacity(capacity),
82            nulls: LazyBitBufferBuilder::new(capacity),
83            completed,
84            in_progress: ByteBufferMut::empty(),
85            dtype,
86            growth_strategy,
87            compaction_threshold,
88        }
89    }
90
91    fn append_value_view(&mut self, value: &[u8]) {
92        let length =
93            u32::try_from(value.len()).vortex_expect("cannot have a single string >2^32 in length");
94        if length <= 12 {
95            self.views_builder.push(BinaryView::make_view(value, 0, 0));
96            return;
97        }
98
99        let (buffer_idx, offset) = self.append_value_to_buffer(value);
100        let view = BinaryView::make_view(value, buffer_idx, offset);
101        self.views_builder.push(view);
102    }
103
104    /// Appends a value to the builder.
105    pub fn append_value<S: AsRef<[u8]>>(&mut self, value: S) {
106        self.append_value_view(value.as_ref());
107        self.nulls.append_non_null();
108    }
109
110    fn flush_in_progress(&mut self) {
111        if self.in_progress.is_empty() {
112            return;
113        }
114        let block = std::mem::take(&mut self.in_progress).freeze();
115
116        assert!(block.len() < u32::MAX as usize, "Block too large");
117
118        let initial_len = self.completed.len();
119        self.completed.push(block);
120        assert_eq!(
121            self.completed.len(),
122            initial_len + 1,
123            "Invalid state, just completed block already exists"
124        );
125    }
126
127    /// append a non inlined value to self.in_progress.
128    fn append_value_to_buffer(&mut self, value: &[u8]) -> (u32, u32) {
129        assert!(value.len() > 12, "must inline small strings");
130        let required_cap = self.in_progress.len() + value.len();
131        if self.in_progress.capacity() < required_cap {
132            self.flush_in_progress();
133            let next_buffer_size = self.growth_strategy.next_size() as usize;
134            let to_reserve = next_buffer_size.max(value.len());
135            self.in_progress.reserve(to_reserve);
136        }
137
138        let buffer_idx = self.completed.len();
139        let offset = u32::try_from(self.in_progress.len()).vortex_expect("too many buffers");
140        self.in_progress.extend_from_slice(value);
141
142        (buffer_idx, offset)
143    }
144
145    pub fn completed_block_count(&self) -> u32 {
146        self.completed.len()
147    }
148
149    /// Pushes buffers and pre-adjusted views into the builder.
150    ///
151    /// The provided `buffers` contain sections of data from a `VarBinViewArray`, and the
152    /// `views` are `BinaryView`s that have already been adjusted to reference the correct buffer
153    /// indices and offsets for this builder. All views must point to valid sections within the
154    /// provided buffers, and the validity length must match the view length.
155    ///
156    /// # Warning
157    ///
158    /// This method does not check utilization of the given buffers. Callers must provide
159    /// buffers that are fully utilized by the given adjusted views.
160    ///
161    /// # Panics
162    ///
163    /// Panics if this builder deduplicates buffers and any of the given buffers already
164    /// exist in this builder.
165    pub fn push_buffer_and_adjusted_views(
166        &mut self,
167        buffers: &[ByteBuffer],
168        views: &Buffer<BinaryView>,
169        validity_mask: Mask,
170    ) {
171        self.flush_in_progress();
172
173        let expected_completed_len = self.completed.len() as usize + buffers.len();
174        self.completed.extend_from_slice_unchecked(buffers);
175        assert_eq!(
176            self.completed.len() as usize,
177            expected_completed_len,
178            "Some buffers already exist",
179        );
180        self.views_builder.extend_trusted(views.iter().copied());
181        self.push_only_validity_mask(validity_mask);
182
183        debug_assert_eq!(self.nulls.len(), self.views_builder.len())
184    }
185
186    /// Finishes the builder directly into a [`VarBinViewArray`].
187    pub fn finish_into_varbinview(&mut self) -> VarBinViewArray {
188        self.flush_in_progress();
189        let buffers = std::mem::take(&mut self.completed);
190
191        assert_eq!(
192            self.views_builder.len(),
193            self.nulls.len(),
194            "View and validity length must match"
195        );
196
197        let validity = self.nulls.finish_with_nullability(self.dtype.nullability());
198
199        // SAFETY: the builder methods check safety at each step.
200        unsafe {
201            VarBinViewArray::new_unchecked(
202                std::mem::take(&mut self.views_builder).freeze(),
203                buffers.finish(),
204                self.dtype.clone(),
205                validity,
206            )
207        }
208    }
209
210    // Pushes a validity mask into the builder not affecting the views or buffers
211    fn push_only_validity_mask(&mut self, validity_mask: Mask) {
212        self.nulls.append_validity_mask(validity_mask);
213    }
214}
215
impl ArrayBuilder for VarBinViewBuilder {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }

    fn dtype(&self) -> &DType {
        &self.dtype
    }

    fn len(&self) -> usize {
        // Validity bits are appended once per element, so their count is the
        // element count.
        self.nulls.len()
    }

    fn append_zeros(&mut self, n: usize) {
        // "Zero" for strings/binary is the empty value: an inlined empty view.
        self.views_builder.push_n(BinaryView::empty_view(), n);
        self.nulls.append_n_non_nulls(n);
    }

    unsafe fn append_nulls_unchecked(&mut self, n: usize) {
        // Nulls still occupy a view slot; an empty view is the placeholder.
        self.views_builder.push_n(BinaryView::empty_view(), n);
        self.nulls.append_n_nulls(n);
    }

    fn append_scalar(&mut self, scalar: &Scalar) -> VortexResult<()> {
        vortex_ensure!(
            scalar.dtype() == self.dtype(),
            "VarBinViewBuilder expected scalar with dtype {}, got {}",
            self.dtype(),
            scalar.dtype()
        );

        match self.dtype() {
            DType::Utf8(_) => match scalar.as_utf8().value() {
                Some(value) => self.append_value(value),
                None => self.append_null(),
            },
            DType::Binary(_) => match scalar.as_binary().value() {
                Some(value) => self.append_value(value),
                None => self.append_null(),
            },
            // Unreachable in practice: `new` asserts the dtype is Utf8 or Binary.
            _ => vortex_bail!(
                "VarBinViewBuilder can only handle Utf8 or Binary scalars, got {:?}",
                scalar.dtype()
            ),
        }

        Ok(())
    }

    unsafe fn extend_from_array_unchecked(&mut self, array: &ArrayRef) {
        let array = array.to_varbinview();
        self.flush_in_progress();

        self.push_only_validity_mask(
            array
                .validity_mask()
                .vortex_expect("validity_mask in extend_from_array_unchecked"),
        );

        // Transfer (and possibly compact) the source buffers, obtaining the
        // remapping that the source views need to stay valid here.
        let view_adjustment =
            self.completed
                .extend_from_compaction(BuffersWithOffsets::from_array(
                    &array,
                    self.compaction_threshold,
                ));

        match view_adjustment {
            // All source buffers transferred: views only need an arithmetic
            // index/offset adjustment.
            ViewAdjustment::Precomputed(adjustment) => self.views_builder.extend_trusted(
                array
                    .views()
                    .iter()
                    .map(|view| adjustment.adjust_view(view)),
            ),
            // Some source buffers were dropped: views into them must be
            // rewritten by copying bytes; invalid rows become empty views.
            ViewAdjustment::Rewriting(adjustment) => match array
                .validity_mask()
                .vortex_expect("validity_mask in extend_from_array_unchecked")
            {
                Mask::AllTrue(_) => {
                    for (idx, &view) in array.views().iter().enumerate() {
                        let new_view = self.push_view(view, &adjustment, &array, idx);
                        self.views_builder.push(new_view);
                    }
                }
                Mask::AllFalse(_) => {
                    self.views_builder
                        .push_n(BinaryView::empty_view(), array.len());
                }
                Mask::Values(v) => {
                    for (idx, (&view, is_valid)) in
                        array.views().iter().zip(v.bit_buffer().iter()).enumerate()
                    {
                        let new_view = if !is_valid {
                            BinaryView::empty_view()
                        } else {
                            self.push_view(view, &adjustment, &array, idx)
                        };
                        self.views_builder.push(new_view);
                    }
                }
            },
        }
    }

    fn reserve_exact(&mut self, additional: usize) {
        // NOTE(review): views use `reserve` while nulls use `reserve_exact` —
        // confirm whether BufferMut offers an exact reservation here.
        self.views_builder.reserve(additional);
        self.nulls.reserve_exact(additional);
    }

    unsafe fn set_validity_unchecked(&mut self, validity: Mask) {
        // Replaces any previously recorded validity wholesale.
        self.nulls = LazyBitBufferBuilder::new(validity.len());
        self.nulls.append_validity_mask(validity);
    }

    fn finish(&mut self) -> ArrayRef {
        self.finish_into_varbinview().into_array()
    }

    fn finish_into_canonical(&mut self) -> Canonical {
        Canonical::VarBinView(self.finish_into_varbinview())
    }
}
341
342impl VarBinViewBuilder {
343    #[inline]
344    fn push_view(
345        &mut self,
346        view: BinaryView,
347        adjustment: &RewritingViewAdjustment,
348        array: &VarBinViewArray,
349        idx: usize,
350    ) -> BinaryView {
351        if view.is_inlined() {
352            view
353        } else if let Some(adjusted) = adjustment.adjust_view(&view) {
354            adjusted
355        } else {
356            let bytes = array.bytes_at(idx);
357            let (new_buf_idx, new_offset) = self.append_value_to_buffer(&bytes);
358            BinaryView::make_view(bytes.as_slice(), new_buf_idx, new_offset)
359        }
360    }
361}
362
/// Storage for sealed data buffers, optionally deduplicating repeated buffers.
pub enum CompletedBuffers {
    /// Plain append-only list of buffers.
    Default(Vec<ByteBuffer>),
    /// Deduplicates buffers by allocation identity (pointer + length).
    Deduplicated(DeduplicatedBuffers),
}
367
368impl Default for CompletedBuffers {
369    fn default() -> Self {
370        Self::Default(Vec::new())
371    }
372}
373
374// Self::push enforces len < u32::max
375#[allow(clippy::cast_possible_truncation)]
376impl CompletedBuffers {
377    fn len(&self) -> u32 {
378        match self {
379            Self::Default(buffers) => buffers.len() as u32,
380            Self::Deduplicated(buffers) => buffers.len(),
381        }
382    }
383
384    fn push(&mut self, block: ByteBuffer) -> u32 {
385        match self {
386            Self::Default(buffers) => {
387                assert!(buffers.len() < u32::MAX as usize, "Too many blocks");
388                buffers.push(block);
389                self.len()
390            }
391            Self::Deduplicated(buffers) => buffers.push(block),
392        }
393    }
394
395    /// Does not compact buffers, bypasses utilization checks.
396    fn extend_from_slice_unchecked(&mut self, buffers: &[ByteBuffer]) {
397        for buffer in buffers {
398            self.push(buffer.clone());
399        }
400    }
401
402    fn extend_from_compaction(&mut self, buffers: BuffersWithOffsets) -> ViewAdjustment {
403        match (self, buffers) {
404            (
405                Self::Default(completed_buffers),
406                BuffersWithOffsets::AllKept { buffers, offsets },
407            ) => {
408                let buffer_offset = completed_buffers.len() as u32;
409                completed_buffers.extend_from_slice(&buffers);
410                ViewAdjustment::shift(buffer_offset, offsets)
411            }
412            (
413                Self::Default(completed_buffers),
414                BuffersWithOffsets::SomeCompacted { buffers, offsets },
415            ) => {
416                let lookup = buffers
417                    .iter()
418                    .map(|maybe_buffer| {
419                        maybe_buffer.as_ref().map(|buffer| {
420                            completed_buffers.push(buffer.clone());
421                            completed_buffers.len() as u32 - 1
422                        })
423                    })
424                    .collect();
425                ViewAdjustment::rewriting(lookup, offsets)
426            }
427
428            (
429                Self::Deduplicated(completed_buffers),
430                BuffersWithOffsets::AllKept { buffers, offsets },
431            ) => {
432                let buffer_lookup = completed_buffers.extend_from_iter(buffers.iter().cloned());
433                ViewAdjustment::lookup(buffer_lookup, offsets)
434            }
435            (
436                Self::Deduplicated(completed_buffers),
437                BuffersWithOffsets::SomeCompacted { buffers, offsets },
438            ) => {
439                let buffer_lookup = completed_buffers.extend_from_option_slice(&buffers);
440                ViewAdjustment::rewriting(buffer_lookup, offsets)
441            }
442        }
443    }
444
445    fn finish(self) -> Arc<[ByteBuffer]> {
446        match self {
447            Self::Default(buffers) => Arc::from(buffers),
448            Self::Deduplicated(buffers) => buffers.finish(),
449        }
450    }
451}
452
/// Buffer list that collapses repeated pushes of the same allocation.
#[derive(Default)]
pub struct DeduplicatedBuffers {
    buffers: Vec<ByteBuffer>,
    // Maps a buffer's identity (pointer + length) to its index in `buffers`.
    buffer_to_idx: HashMap<BufferId, u32>,
}
458
459impl DeduplicatedBuffers {
460    // Self::push enforces len < u32::max
461    #[allow(clippy::cast_possible_truncation)]
462    fn len(&self) -> u32 {
463        self.buffers.len() as u32
464    }
465
466    /// Push a new block if not seen before. Returns the idx of the block.
467    pub(crate) fn push(&mut self, block: ByteBuffer) -> u32 {
468        assert!(self.buffers.len() < u32::MAX as usize, "Too many blocks");
469
470        let initial_len = self.len();
471        let id = BufferId::from(&block);
472        match self.buffer_to_idx.entry(id) {
473            Entry::Occupied(idx) => *idx.get(),
474            Entry::Vacant(entry) => {
475                let idx = initial_len;
476                entry.insert(idx);
477                self.buffers.push(block);
478                idx
479            }
480        }
481    }
482
483    pub(crate) fn extend_from_option_slice(
484        &mut self,
485        buffers: &[Option<ByteBuffer>],
486    ) -> Vec<Option<u32>> {
487        buffers
488            .iter()
489            .map(|buffer| buffer.as_ref().map(|buf| self.push(buf.clone())))
490            .collect()
491    }
492
493    pub(crate) fn extend_from_iter(
494        &mut self,
495        buffers: impl Iterator<Item = ByteBuffer>,
496    ) -> Vec<u32> {
497        buffers.map(|buffer| self.push(buffer)).collect()
498    }
499
500    pub(crate) fn finish(self) -> Arc<[ByteBuffer]> {
501        Arc::from(self.buffers)
502    }
503}
504
/// Identity of a buffer based on its allocation (pointer and length) rather
/// than its contents.
#[derive(PartialEq, Eq, Hash)]
struct BufferId {
    // *const u8 stored as usize for `Send`
    ptr: usize,
    len: usize,
}
511
512impl BufferId {
513    fn from(buffer: &ByteBuffer) -> Self {
514        let slice = buffer.as_slice();
515        Self {
516            ptr: slice.as_ptr() as usize,
517            len: slice.len(),
518        }
519    }
520}
521
/// Controls how large each freshly allocated data buffer should be.
#[derive(Debug, Clone)]
pub enum BufferGrowthStrategy {
    /// Use a fixed buffer size for all allocations.
    Fixed { size: u32 },
    /// Use exponential growth starting from initial_size, doubling until max_size.
    Exponential { current_size: u32, max_size: u32 },
}

impl Default for BufferGrowthStrategy {
    fn default() -> Self {
        BufferGrowthStrategy::Exponential {
            current_size: 1 << 12, // 4KB starting size
            max_size: 1 << 21,     // 2MB max size
        }
    }
}

impl BufferGrowthStrategy {
    /// A strategy that always allocates `size` bytes.
    pub fn fixed(size: u32) -> Self {
        BufferGrowthStrategy::Fixed { size }
    }

    /// A strategy that doubles from `initial_size` until reaching `max_size`.
    pub fn exponential(initial_size: u32, max_size: u32) -> Self {
        BufferGrowthStrategy::Exponential {
            current_size: initial_size,
            max_size,
        }
    }

    /// Returns the next buffer size to allocate and updates internal state.
    pub fn next_size(&mut self) -> u32 {
        match self {
            BufferGrowthStrategy::Fixed { size } => *size,
            BufferGrowthStrategy::Exponential {
                current_size,
                max_size,
            } => {
                let result = *current_size;
                let cap = *max_size;
                // Only grow while strictly below the cap; saturating_mul
                // guards against u32 overflow before the min-clamp.
                if result < cap {
                    *current_size = cap.min(result.saturating_mul(2));
                }
                result
            }
        }
    }
}
568
/// Source buffers prepared for transfer into the builder, together with the
/// slice start applied to each kept buffer (if any).
enum BuffersWithOffsets {
    /// Every source buffer is kept (possibly sliced). When present,
    /// `offsets[i]` is the amount to subtract from view offsets into buffer `i`.
    AllKept {
        buffers: Arc<[ByteBuffer]>,
        offsets: Option<Vec<u32>>,
    },
    /// Some buffers were dropped (`None`); their views' data must be rewritten.
    SomeCompacted {
        buffers: Vec<Option<ByteBuffer>>,
        offsets: Option<Vec<u32>>,
    },
}
579
impl BuffersWithOffsets {
    /// Prepares `array`'s buffers for transfer, deciding per buffer whether to
    /// keep it whole, keep only its utilized slice, or drop it for rewriting.
    pub fn from_array(array: &VarBinViewArray, compaction_threshold: f64) -> Self {
        // A threshold of exactly 0.0 is the sentinel for "compaction disabled"
        // (see VarBinViewBuilder::with_capacity): keep every buffer untouched.
        if compaction_threshold == 0.0 {
            return Self::AllKept {
                buffers: Arc::from(
                    array
                        .buffers()
                        .to_vec()
                        .into_iter()
                        .map(|b| b.unwrap_host())
                        .collect_vec(),
                ),
                offsets: None,
            };
        }

        let buffer_utilizations = array
            .buffer_utilizations()
            .vortex_expect("buffer_utilizations in BuffersWithOffsets::from_array");
        // First pass: classify each buffer to learn which representation
        // (AllKept vs SomeCompacted, with or without offsets) is required.
        let mut has_rewrite = false;
        let mut has_nonzero_offset = false;
        for utilization in buffer_utilizations.iter() {
            match compaction_strategy(utilization, compaction_threshold) {
                CompactionStrategy::KeepFull => continue,
                CompactionStrategy::Slice { .. } => has_nonzero_offset = true,
                CompactionStrategy::Rewrite => has_rewrite = true,
            }
        }

        // Second pass (lazy): yields (kept-buffer-or-None, slice-start) pairs.
        let buffers_with_offsets_iter =
            buffer_utilizations
                .iter()
                .zip(array.buffers().iter())
                .map(|(utilization, buffer)| {
                    match compaction_strategy(utilization, compaction_threshold) {
                        CompactionStrategy::KeepFull => (Some(buffer.as_host().clone()), 0),
                        CompactionStrategy::Slice { start, end } => (
                            Some(buffer.as_host().slice(start as usize..end as usize)),
                            start,
                        ),
                        CompactionStrategy::Rewrite => (None, 0),
                    }
                });

        match (has_rewrite, has_nonzero_offset) {
            // keep all buffers
            (false, false) => {
                let buffers: Vec<_> = buffers_with_offsets_iter
                    .map(|(b, _)| b.vortex_expect("already checked for rewrite"))
                    .collect();
                Self::AllKept {
                    buffers: Arc::from(buffers),
                    offsets: None,
                }
            }
            // rewrite, all zero offsets
            (true, false) => {
                let buffers: Vec<_> = buffers_with_offsets_iter.map(|(b, _)| b).collect();
                Self::SomeCompacted {
                    buffers,
                    offsets: None,
                }
            }
            // keep all buffers, but some have offsets
            (false, true) => {
                let (buffers, offsets): (Vec<_>, _) = buffers_with_offsets_iter
                    .map(|(buffer, offset)| {
                        (buffer.vortex_expect("already checked for rewrite"), offset)
                    })
                    .collect();
                Self::AllKept {
                    buffers: Arc::from(buffers),
                    offsets: Some(offsets),
                }
            }
            // rewrite and some have offsets
            (true, true) => {
                let (buffers, offsets) = buffers_with_offsets_iter.collect();
                Self::SomeCompacted {
                    buffers,
                    offsets: Some(offsets),
                }
            }
        }
    }
}
666
/// Per-buffer decision produced by [`compaction_strategy`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CompactionStrategy {
    /// Keep the whole buffer unchanged.
    KeepFull,
    /// Slice the buffer to [start, end) range
    Slice {
        start: u32,
        end: u32,
    },
    /// Rewrite data into new compacted buffer
    Rewrite,
}
678
679fn compaction_strategy(
680    buffer_utilization: &BufferUtilization,
681    threshold: f64,
682) -> CompactionStrategy {
683    match buffer_utilization.overall_utilization() {
684        // rewrite empty or not used buffers TODO(os): maybe keep them
685        0.0 => CompactionStrategy::Rewrite,
686        utilised if utilised >= threshold => CompactionStrategy::KeepFull,
687        _ if buffer_utilization.range_utilization() >= threshold => {
688            let Range { start, end } = buffer_utilization.range();
689            CompactionStrategy::Slice { start, end }
690        }
691        _ => CompactionStrategy::Rewrite,
692    }
693}
694
/// How source views must be remapped after their buffers were transferred.
enum ViewAdjustment {
    /// Every view can be adjusted arithmetically (shift or table lookup).
    Precomputed(PrecomputedViewAdjustment),
    /// Some views reference dropped buffers and need their bytes rewritten.
    Rewriting(RewritingViewAdjustment),
}
699
700impl ViewAdjustment {
701    fn shift(buffer_offset: u32, offsets: Option<Vec<u32>>) -> Self {
702        Self::Precomputed(PrecomputedViewAdjustment::Shift {
703            buffer_offset,
704            offsets,
705        })
706    }
707
708    fn lookup(buffer_lookup: Vec<u32>, offsets: Option<Vec<u32>>) -> Self {
709        Self::Precomputed(PrecomputedViewAdjustment::Lookup {
710            buffer_lookup,
711            offsets,
712        })
713    }
714
715    fn rewriting(buffer_lookup: Vec<Option<u32>>, offsets: Option<Vec<u32>>) -> Self {
716        Self::Rewriting(RewritingViewAdjustment {
717            buffer_lookup,
718            offsets,
719        })
720    }
721}
722
// Care when adding new variants or fields in this enum, it will mess with inlining if it gets too big
enum PrecomputedViewAdjustment {
    /// Add `buffer_offset` to every buffer index; when `offsets` is present,
    /// subtract the per-buffer amount from each view's offset.
    Shift {
        buffer_offset: u32,
        offsets: Option<Vec<u32>>,
    },
    /// Remap buffer indices through `buffer_lookup`; when `offsets` is
    /// present, subtract the per-buffer amount from each view's offset.
    Lookup {
        buffer_lookup: Vec<u32>,
        offsets: Option<Vec<u32>>,
    },
}
734
735impl PrecomputedViewAdjustment {
736    #[inline]
737    fn adjust_view(&self, view: &BinaryView) -> BinaryView {
738        if view.is_inlined() {
739            return *view;
740        }
741        let view_ref = view.as_view();
742        match self {
743            Self::Shift {
744                buffer_offset,
745                offsets,
746            } => {
747                let b_idx = view_ref.buffer_index;
748                let offset_shift = offsets
749                    .as_ref()
750                    .map(|o| o[b_idx as usize])
751                    .unwrap_or_default();
752
753                // If offset < offset_shift, this view was invalid and wasn't counted in buffer_utilizations.
754                // Return an empty view to match how invalid views are handled in the Rewriting path.
755                if view_ref.offset < offset_shift {
756                    return BinaryView::empty_view();
757                }
758
759                view_ref
760                    .with_buffer_and_offset(b_idx + buffer_offset, view_ref.offset - offset_shift)
761            }
762            Self::Lookup {
763                buffer_lookup,
764                offsets,
765            } => {
766                let b_idx = view_ref.buffer_index;
767                let buffer = buffer_lookup[b_idx as usize];
768                let offset_shift = offsets
769                    .as_ref()
770                    .map(|o| o[b_idx as usize])
771                    .unwrap_or_default();
772
773                // If offset < offset_shift, this view was invalid and wasn't counted in buffer_utilizations.
774                // Return an empty view to match how invalid views are handled in the Rewriting path.
775                if view_ref.offset < offset_shift {
776                    return BinaryView::empty_view();
777                }
778
779                view_ref.with_buffer_and_offset(buffer, view_ref.offset - offset_shift)
780            }
781        }
782        .into()
783    }
784}
785
/// Adjustment for the case where some source buffers were not transferred:
/// `None` entries in `buffer_lookup` mark buffers whose views must be
/// rewritten by copying their bytes.
struct RewritingViewAdjustment {
    buffer_lookup: Vec<Option<u32>>,
    offsets: Option<Vec<u32>>,
}
790
impl RewritingViewAdjustment {
    /// Can return None if this view can't be adjusted, because there is no precomputed lookup
    /// for the current buffer.
    fn adjust_view(&self, view: &BinaryView) -> Option<BinaryView> {
        // Inlined views need no adjustment at all.
        if view.is_inlined() {
            return Some(*view);
        }

        let view_ref = view.as_view();
        self.buffer_lookup[view_ref.buffer_index as usize].map(|buffer| {
            let offset_shift = self
                .offsets
                .as_ref()
                .map(|o| o[view_ref.buffer_index as usize])
                .unwrap_or_default();
            // NOTE(review): unlike PrecomputedViewAdjustment::adjust_view there
            // is no `offset < offset_shift` guard here, so this u32 subtraction
            // could underflow for an invalid view; callers appear to filter
            // invalid views via the validity mask first — confirm that invariant.
            view_ref
                .with_buffer_and_offset(buffer, view_ref.offset - offset_shift)
                .into()
        })
    }
}
812
813#[cfg(test)]
814mod tests {
815    use vortex_error::VortexResult;
816
817    use crate::IntoArray;
818    use crate::LEGACY_SESSION;
819    use crate::VortexSessionExecute;
820    use crate::arrays::VarBinViewArray;
821    use crate::assert_arrays_eq;
822    use crate::builders::ArrayBuilder;
823    use crate::builders::VarBinViewBuilder;
824    use crate::dtype::DType;
825    use crate::dtype::Nullability;
826
    // End-to-end check of the basic append APIs on a nullable Utf8 builder.
    #[test]
    fn test_utf8_builder() {
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_value("Hello");
        builder.append_null();
        builder.append_value("World");

        builder.append_nulls(2);

        // append_zeros appends empty (non-null) strings.
        builder.append_zeros(2);
        builder.append_value("test");

        let actual = builder.finish();
        let expected = <VarBinViewArray as FromIterator<_>>::from_iter([
            Some("Hello"),
            None,
            Some("World"),
            None,
            None,
            Some(""),
            Some(""),
            Some("test"),
        ]);
        assert_arrays_eq!(actual, expected);
    }
853
    // Verifies extend_from_array interleaved with direct appends preserves
    // values and validity in order.
    #[test]
    fn test_utf8_builder_with_extend() {
        let array = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_null();
            builder.append_value("Hello2");
            builder.finish()
        };
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_value("Hello1");
        builder.extend_from_array(&array);
        builder.append_nulls(2);
        builder.append_value("Hello3");

        let actual = builder.finish_into_canonical();
        let expected = <VarBinViewArray as FromIterator<_>>::from_iter([
            Some("Hello1"),
            None,
            Some("Hello2"),
            None,
            None,
            Some("Hello3"),
        ]);
        assert_arrays_eq!(actual.into_array(), expected.into_array());
    }
881
    // Verifies that a deduplicating builder counts the same source buffer only
    // once, no matter how often (or in which sliced form) it is appended.
    #[test]
    fn test_buffer_deduplication() -> VortexResult<()> {
        let array = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_value("This is a long string that should not be inlined");
            builder.append_value("short string");
            builder.finish_into_varbinview()
        };

        // Only the long value needed a data buffer; the short one is inlined.
        assert_eq!(array.buffers().len(), 1);
        let mut builder =
            VarBinViewBuilder::with_buffer_deduplication(DType::Utf8(Nullability::Nullable), 10);

        let mut ctx = LEGACY_SESSION.create_execution_ctx();

        array.append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 1);

        // Re-appending slices of the same array reuses the same buffer.
        array
            .slice(1..2)?
            .append_to_builder(&mut builder, &mut ctx)?;
        array
            .slice(0..1)?
            .append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 1);

        // A distinct array with equal contents is a different allocation and
        // therefore a new buffer.
        let array2 = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_value("This is a long string that should not be inlined");
            builder.finish_into_varbinview()
        };

        array2.append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 2);

        array
            .slice(0..1)?
            .append_to_builder(&mut builder, &mut ctx)?;
        array2
            .slice(0..1)?
            .append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 2);
        Ok(())
    }
928
929    #[test]
930    fn test_append_scalar() {
931        use crate::scalar::Scalar;
932
933        // Test with Utf8 builder.
934        let mut utf8_builder =
935            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
936
937        // Test appending a valid utf8 value.
938        let utf8_scalar1 = Scalar::utf8("hello", Nullability::Nullable);
939        utf8_builder.append_scalar(&utf8_scalar1).unwrap();
940
941        // Test appending another value.
942        let utf8_scalar2 = Scalar::utf8("world", Nullability::Nullable);
943        utf8_builder.append_scalar(&utf8_scalar2).unwrap();
944
945        // Test appending null value.
946        let null_scalar = Scalar::null(DType::Utf8(Nullability::Nullable));
947        utf8_builder.append_scalar(&null_scalar).unwrap();
948
949        let array = utf8_builder.finish();
950        let expected =
951            <VarBinViewArray as FromIterator<_>>::from_iter([Some("hello"), Some("world"), None]);
952        assert_arrays_eq!(&array, &expected);
953
954        // Test with Binary builder.
955        let mut binary_builder =
956            VarBinViewBuilder::with_capacity(DType::Binary(Nullability::Nullable), 10);
957
958        let binary_scalar = Scalar::binary(vec![1u8, 2, 3], Nullability::Nullable);
959        binary_builder.append_scalar(&binary_scalar).unwrap();
960
961        let binary_null = Scalar::null(DType::Binary(Nullability::Nullable));
962        binary_builder.append_scalar(&binary_null).unwrap();
963
964        let binary_array = binary_builder.finish();
965        let expected =
966            <VarBinViewArray as FromIterator<_>>::from_iter([Some(vec![1u8, 2, 3]), None]);
967        assert_arrays_eq!(&binary_array, &expected);
968
969        // Test wrong dtype error.
970        let mut builder =
971            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::NonNullable), 10);
972        let wrong_scalar = Scalar::from(42i32);
973        assert!(builder.append_scalar(&wrong_scalar).is_err());
974    }
975
976    #[test]
977    fn test_buffer_growth_strategies() {
978        use super::BufferGrowthStrategy;
979
980        // Test Fixed strategy
981        let mut strategy = BufferGrowthStrategy::fixed(1024);
982
983        // Should always return the fixed size
984        assert_eq!(strategy.next_size(), 1024);
985        assert_eq!(strategy.next_size(), 1024);
986        assert_eq!(strategy.next_size(), 1024);
987
988        // Test Exponential strategy
989        let mut strategy = BufferGrowthStrategy::exponential(1024, 8192);
990
991        // Should double each time until hitting max_size
992        assert_eq!(strategy.next_size(), 1024); // First: 1024
993        assert_eq!(strategy.next_size(), 2048); // Second: 2048
994        assert_eq!(strategy.next_size(), 4096); // Third: 4096
995        assert_eq!(strategy.next_size(), 8192); // Fourth: 8192 (max)
996        assert_eq!(strategy.next_size(), 8192); // Fifth: 8192 (capped)
997    }
998
999    #[test]
1000    fn test_large_value_allocation() {
1001        use super::BufferGrowthStrategy;
1002        use super::VarBinViewBuilder;
1003
1004        let mut builder = VarBinViewBuilder::new(
1005            DType::Binary(Nullability::Nullable),
1006            10,
1007            Default::default(),
1008            BufferGrowthStrategy::exponential(1024, 4096),
1009            0.0,
1010        );
1011
1012        // Create a value larger than max_size
1013        let large_value = vec![0u8; 8192];
1014
1015        // Should successfully append the large value
1016        builder.append_value(&large_value);
1017
1018        let array = builder.finish_into_varbinview();
1019        assert_eq!(array.len(), 1);
1020
1021        // Verify the value was stored correctly
1022        let retrieved = array
1023            .scalar_at(0)
1024            .unwrap()
1025            .as_binary()
1026            .value()
1027            .cloned()
1028            .unwrap();
1029        assert_eq!(retrieved.len(), 8192);
1030        assert_eq!(retrieved.as_slice(), &large_value);
1031    }
1032}