Skip to main content

vortex_array/builders/
varbinview.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::Any;
5use std::ops::Range;
6use std::sync::Arc;
7
8use itertools::Itertools;
9use vortex_buffer::Buffer;
10use vortex_buffer::BufferMut;
11use vortex_buffer::ByteBuffer;
12use vortex_buffer::ByteBufferMut;
13use vortex_error::VortexExpect;
14use vortex_error::VortexResult;
15use vortex_error::vortex_bail;
16use vortex_error::vortex_ensure;
17use vortex_mask::Mask;
18use vortex_utils::aliases::hash_map::Entry;
19use vortex_utils::aliases::hash_map::HashMap;
20
21use crate::ArrayRef;
22use crate::IntoArray;
23use crate::LEGACY_SESSION;
24use crate::VortexSessionExecute;
25use crate::arrays::VarBinViewArray;
26use crate::arrays::varbinview::build_views::BinaryView;
27use crate::arrays::varbinview::compact::BufferUtilization;
28use crate::builders::ArrayBuilder;
29use crate::builders::LazyBitBufferBuilder;
30use crate::canonical::Canonical;
31#[expect(deprecated)]
32use crate::canonical::ToCanonical as _;
33use crate::dtype::DType;
34use crate::scalar::Scalar;
35
/// The builder for building a [`VarBinViewArray`].
pub struct VarBinViewBuilder {
    /// Element type; `new` asserts this is `Utf8` or `Binary`.
    dtype: DType,
    /// One `BinaryView` per appended element.
    views_builder: BufferMut<BinaryView>,
    /// Validity bits, one per appended element.
    nulls: LazyBitBufferBuilder,
    /// Finished data buffers referenced by non-inlined views.
    completed: CompletedBuffers,
    /// Data buffer currently being filled; flushed into `completed` when full.
    in_progress: ByteBufferMut,
    /// Controls how large each newly allocated `in_progress` buffer is.
    growth_strategy: BufferGrowthStrategy,
    /// Utilization threshold below which appended arrays' buffers are
    /// compacted; `0.0` disables compaction.
    compaction_threshold: f64,
}
46
47impl VarBinViewBuilder {
48    pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
49        Self::new(dtype, capacity, Default::default(), Default::default(), 0.0)
50    }
51
52    pub fn with_buffer_deduplication(dtype: DType, capacity: usize) -> Self {
53        Self::new(
54            dtype,
55            capacity,
56            CompletedBuffers::Deduplicated(Default::default()),
57            Default::default(),
58            0.0,
59        )
60    }
61
62    pub fn with_compaction(dtype: DType, capacity: usize, compaction_threshold: f64) -> Self {
63        Self::new(
64            dtype,
65            capacity,
66            Default::default(),
67            Default::default(),
68            compaction_threshold,
69        )
70    }
71
72    pub fn new(
73        dtype: DType,
74        capacity: usize,
75        completed: CompletedBuffers,
76        growth_strategy: BufferGrowthStrategy,
77        compaction_threshold: f64,
78    ) -> Self {
79        assert!(
80            matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
81            "VarBinViewBuilder DType must be Utf8 or Binary."
82        );
83        Self {
84            views_builder: BufferMut::<BinaryView>::with_capacity(capacity),
85            nulls: LazyBitBufferBuilder::new(capacity),
86            completed,
87            in_progress: ByteBufferMut::empty(),
88            dtype,
89            growth_strategy,
90            compaction_threshold,
91        }
92    }
93
94    fn append_value_view(&mut self, value: &[u8]) {
95        let length =
96            u32::try_from(value.len()).vortex_expect("cannot have a single string >2^32 in length");
97        if length <= 12 {
98            self.views_builder.push(BinaryView::make_view(value, 0, 0));
99            return;
100        }
101
102        let (buffer_idx, offset) = self.append_value_to_buffer(value);
103        let view = BinaryView::make_view(value, buffer_idx, offset);
104        self.views_builder.push(view);
105    }
106
107    /// Appends a value to the builder.
108    pub fn append_value<S: AsRef<[u8]>>(&mut self, value: S) {
109        self.append_value_view(value.as_ref());
110        self.nulls.append_non_null();
111    }
112
113    /// Appends `n` copies of `value` as non-null entries.
114    pub fn append_n_values<S: AsRef<[u8]>>(&mut self, value: S, n: usize) {
115        if n == 0 {
116            return;
117        }
118        let bytes = value.as_ref();
119        let view = if bytes.len() <= BinaryView::MAX_INLINED_SIZE {
120            BinaryView::make_view(bytes, 0, 0)
121        } else {
122            let (buffer_idx, offset) = self.append_value_to_buffer(bytes);
123            BinaryView::make_view(bytes, buffer_idx, offset)
124        };
125        self.views_builder.push_n(view, n);
126        self.nulls.append_n_non_nulls(n);
127    }
128
129    fn flush_in_progress(&mut self) {
130        if self.in_progress.is_empty() {
131            return;
132        }
133        let block = std::mem::take(&mut self.in_progress).freeze();
134
135        assert!(block.len() < u32::MAX as usize, "Block too large");
136
137        let initial_len = self.completed.len();
138        self.completed.push(block);
139        assert_eq!(
140            self.completed.len(),
141            initial_len + 1,
142            "Invalid state, just completed block already exists"
143        );
144    }
145
146    /// append a non inlined value to self.in_progress.
147    fn append_value_to_buffer(&mut self, value: &[u8]) -> (u32, u32) {
148        assert!(
149            value.len() > BinaryView::MAX_INLINED_SIZE,
150            "must inline small strings"
151        );
152        let required_cap = self.in_progress.len() + value.len();
153        if self.in_progress.capacity() < required_cap {
154            self.flush_in_progress();
155            let next_buffer_size = self.growth_strategy.next_size() as usize;
156            let to_reserve = next_buffer_size.max(value.len());
157            self.in_progress.reserve(to_reserve);
158        }
159
160        let buffer_idx = self.completed.len();
161        let offset = u32::try_from(self.in_progress.len()).vortex_expect("too many buffers");
162        self.in_progress.extend_from_slice(value);
163
164        (buffer_idx, offset)
165    }
166
167    pub fn completed_block_count(&self) -> u32 {
168        self.completed.len()
169    }
170
171    /// Returns true if a non-empty in-progress buffer is staged (and would
172    /// become a completed buffer on the next flush), false otherwise.
173    pub fn in_progress(&self) -> bool {
174        !self.in_progress.is_empty()
175    }
176
177    /// Pushes buffers and pre-adjusted views into the builder.
178    ///
179    /// The provided `buffers` contain sections of data from a `VarBinViewArray`, and the
180    /// `views` are `BinaryView`s that have already been adjusted to reference the correct buffer
181    /// indices and offsets for this builder. All views must point to valid sections within the
182    /// provided buffers, and the validity length must match the view length.
183    ///
184    /// # Warning
185    ///
186    /// This method does not check utilization of the given buffers. Callers must provide
187    /// buffers that are fully utilized by the given adjusted views.
188    ///
189    /// # Panics
190    ///
191    /// Panics if this builder deduplicates buffers and any of the given buffers already
192    /// exist in this builder.
193    pub fn push_buffer_and_adjusted_views(
194        &mut self,
195        buffers: &[ByteBuffer],
196        views: &Buffer<BinaryView>,
197        validity_mask: Mask,
198    ) {
199        self.flush_in_progress();
200
201        let expected_completed_len = self.completed.len() as usize + buffers.len();
202        self.completed.extend_from_slice_unchecked(buffers);
203        assert_eq!(
204            self.completed.len() as usize,
205            expected_completed_len,
206            "Some buffers already exist",
207        );
208        self.views_builder.extend_trusted(views.iter().copied());
209        self.push_only_validity_mask(validity_mask);
210
211        debug_assert_eq!(self.nulls.len(), self.views_builder.len())
212    }
213
214    /// Finishes the builder directly into a [`VarBinViewArray`].
215    pub fn finish_into_varbinview(&mut self) -> VarBinViewArray {
216        self.flush_in_progress();
217        let buffers = std::mem::take(&mut self.completed);
218
219        assert_eq!(
220            self.views_builder.len(),
221            self.nulls.len(),
222            "View and validity length must match"
223        );
224
225        let validity = self.nulls.finish_with_nullability(self.dtype.nullability());
226
227        // SAFETY: the builder methods check safety at each step.
228        unsafe {
229            VarBinViewArray::new_unchecked(
230                std::mem::take(&mut self.views_builder).freeze(),
231                buffers.finish(),
232                self.dtype.clone(),
233                validity,
234            )
235        }
236    }
237
238    // Pushes a validity mask into the builder not affecting the views or buffers
239    fn push_only_validity_mask(&mut self, validity_mask: Mask) {
240        self.nulls.append_validity_mask(validity_mask);
241    }
242}
243
244impl ArrayBuilder for VarBinViewBuilder {
245    fn as_any(&self) -> &dyn Any {
246        self
247    }
248
249    fn as_any_mut(&mut self) -> &mut dyn Any {
250        self
251    }
252
253    fn dtype(&self) -> &DType {
254        &self.dtype
255    }
256
257    fn len(&self) -> usize {
258        self.nulls.len()
259    }
260
261    fn append_zeros(&mut self, n: usize) {
262        self.views_builder.push_n(BinaryView::empty_view(), n);
263        self.nulls.append_n_non_nulls(n);
264    }
265
266    unsafe fn append_nulls_unchecked(&mut self, n: usize) {
267        self.views_builder.push_n(BinaryView::empty_view(), n);
268        self.nulls.append_n_nulls(n);
269    }
270
271    fn append_scalar(&mut self, scalar: &Scalar) -> VortexResult<()> {
272        vortex_ensure!(
273            scalar.dtype() == self.dtype(),
274            "VarBinViewBuilder expected scalar with dtype {}, got {}",
275            self.dtype(),
276            scalar.dtype()
277        );
278
279        match self.dtype() {
280            DType::Utf8(_) => match scalar.as_utf8().value() {
281                Some(value) => self.append_value(value),
282                None => self.append_null(),
283            },
284            DType::Binary(_) => match scalar.as_binary().value() {
285                Some(value) => self.append_value(value),
286                None => self.append_null(),
287            },
288            _ => vortex_bail!(
289                "VarBinViewBuilder can only handle Utf8 or Binary scalars, got {:?}",
290                scalar.dtype()
291            ),
292        }
293
294        Ok(())
295    }
296
297    unsafe fn extend_from_array_unchecked(&mut self, array: &ArrayRef) {
298        #[expect(deprecated)]
299        let array = array.to_varbinview();
300        self.flush_in_progress();
301
302        self.push_only_validity_mask(
303            array
304                .as_ref()
305                .validity()
306                .vortex_expect("validity_mask")
307                .execute_mask(
308                    array.as_ref().len(),
309                    &mut LEGACY_SESSION.create_execution_ctx(),
310                )
311                .vortex_expect("Failed to compute validity mask"),
312        );
313
314        let view_adjustment =
315            self.completed
316                .extend_from_compaction(BuffersWithOffsets::from_array(
317                    &array,
318                    self.compaction_threshold,
319                ));
320
321        match view_adjustment {
322            ViewAdjustment::Precomputed(adjustment) => self.views_builder.extend_trusted(
323                array
324                    .views()
325                    .iter()
326                    .map(|view| adjustment.adjust_view(view)),
327            ),
328            ViewAdjustment::Rewriting(adjustment) => {
329                match array
330                    .as_ref()
331                    .validity()
332                    .vortex_expect("validity_mask")
333                    .execute_mask(
334                        array.as_ref().len(),
335                        &mut LEGACY_SESSION.create_execution_ctx(),
336                    )
337                    .vortex_expect("Failed to compute validity mask")
338                {
339                    Mask::AllTrue(_) => {
340                        for (idx, &view) in array.views().iter().enumerate() {
341                            let new_view = self.push_view(view, &adjustment, &array, idx);
342                            self.views_builder.push(new_view);
343                        }
344                    }
345                    Mask::AllFalse(_) => {
346                        self.views_builder
347                            .push_n(BinaryView::empty_view(), array.len());
348                    }
349                    Mask::Values(v) => {
350                        for (idx, (&view, is_valid)) in
351                            array.views().iter().zip(v.bit_buffer().iter()).enumerate()
352                        {
353                            let new_view = if !is_valid {
354                                BinaryView::empty_view()
355                            } else {
356                                self.push_view(view, &adjustment, &array, idx)
357                            };
358                            self.views_builder.push(new_view);
359                        }
360                    }
361                }
362            }
363        }
364    }
365
366    fn reserve_exact(&mut self, additional: usize) {
367        self.views_builder.reserve(additional);
368        self.nulls.reserve_exact(additional);
369    }
370
371    unsafe fn set_validity_unchecked(&mut self, validity: Mask) {
372        self.nulls = LazyBitBufferBuilder::new(validity.len());
373        self.nulls.append_validity_mask(validity);
374    }
375
376    fn finish(&mut self) -> ArrayRef {
377        self.finish_into_varbinview().into_array()
378    }
379
380    fn finish_into_canonical(&mut self) -> Canonical {
381        Canonical::VarBinView(self.finish_into_varbinview())
382    }
383}
384
impl VarBinViewBuilder {
    /// Resolves a single view during a rewriting extend: inlined views pass
    /// through unchanged, views with a precomputed lookup entry are remapped,
    /// and views into buffers being rewritten have their bytes copied into the
    /// in-progress buffer.
    fn push_view(
        &mut self,
        view: BinaryView,
        adjustment: &RewritingViewAdjustment,
        array: &VarBinViewArray,
        idx: usize,
    ) -> BinaryView {
        if view.is_inlined() {
            view
        } else if let Some(adjusted) = adjustment.adjust_view(&view) {
            adjusted
        } else {
            // The source buffer is being compacted away: copy the value.
            let bytes = array.bytes_at(idx);
            let (new_buf_idx, new_offset) = self.append_value_to_buffer(&bytes);
            BinaryView::make_view(bytes.as_slice(), new_buf_idx, new_offset)
        }
    }
}
404
/// Storage for finished data buffers referenced by non-inlined views.
pub enum CompletedBuffers {
    /// Plain append-only list; identical buffers may be stored twice.
    Default(Vec<ByteBuffer>),
    /// Deduplicates buffers by identity (pointer + length) on insert.
    Deduplicated(DeduplicatedBuffers),
}
409
410impl Default for CompletedBuffers {
411    fn default() -> Self {
412        Self::Default(Vec::new())
413    }
414}
415
416// Self::push enforces len < u32::max
417#[expect(clippy::cast_possible_truncation)]
418impl CompletedBuffers {
419    fn len(&self) -> u32 {
420        match self {
421            Self::Default(buffers) => buffers.len() as u32,
422            Self::Deduplicated(buffers) => buffers.len(),
423        }
424    }
425
426    fn push(&mut self, block: ByteBuffer) -> u32 {
427        match self {
428            Self::Default(buffers) => {
429                assert!(buffers.len() < u32::MAX as usize, "Too many blocks");
430                buffers.push(block);
431                self.len()
432            }
433            Self::Deduplicated(buffers) => buffers.push(block),
434        }
435    }
436
437    /// Does not compact buffers, bypasses utilization checks.
438    fn extend_from_slice_unchecked(&mut self, buffers: &[ByteBuffer]) {
439        for buffer in buffers {
440            self.push(buffer.clone());
441        }
442    }
443
444    fn extend_from_compaction(&mut self, buffers: BuffersWithOffsets) -> ViewAdjustment {
445        match (self, buffers) {
446            (
447                Self::Default(completed_buffers),
448                BuffersWithOffsets::AllKept { buffers, offsets },
449            ) => {
450                let buffer_offset = completed_buffers.len() as u32;
451                completed_buffers.extend_from_slice(&buffers);
452                ViewAdjustment::shift(buffer_offset, offsets)
453            }
454            (
455                Self::Default(completed_buffers),
456                BuffersWithOffsets::SomeCompacted { buffers, offsets },
457            ) => {
458                let lookup = buffers
459                    .iter()
460                    .map(|maybe_buffer| {
461                        maybe_buffer.as_ref().map(|buffer| {
462                            completed_buffers.push(buffer.clone());
463                            completed_buffers.len() as u32 - 1
464                        })
465                    })
466                    .collect();
467                ViewAdjustment::rewriting(lookup, offsets)
468            }
469
470            (
471                Self::Deduplicated(completed_buffers),
472                BuffersWithOffsets::AllKept { buffers, offsets },
473            ) => {
474                let buffer_lookup = completed_buffers.extend_from_iter(buffers.iter().cloned());
475                ViewAdjustment::lookup(buffer_lookup, offsets)
476            }
477            (
478                Self::Deduplicated(completed_buffers),
479                BuffersWithOffsets::SomeCompacted { buffers, offsets },
480            ) => {
481                let buffer_lookup = completed_buffers.extend_from_option_slice(&buffers);
482                ViewAdjustment::rewriting(buffer_lookup, offsets)
483            }
484        }
485    }
486
487    fn finish(self) -> Arc<[ByteBuffer]> {
488        match self {
489            Self::Default(buffers) => Arc::from(buffers),
490            Self::Deduplicated(buffers) => buffers.finish(),
491        }
492    }
493}
494
/// Buffer store that assigns each distinct buffer (by identity) a single index.
#[derive(Default)]
pub struct DeduplicatedBuffers {
    /// Unique buffers in insertion order.
    buffers: Vec<ByteBuffer>,
    /// Maps buffer identity to its index in `buffers`.
    buffer_to_idx: HashMap<BufferId, u32>,
}
500
impl DeduplicatedBuffers {
    // Self::push enforces len < u32::max
    #[expect(clippy::cast_possible_truncation)]
    fn len(&self) -> u32 {
        self.buffers.len() as u32
    }

    /// Push a new block if not seen before. Returns the idx of the block.
    pub(crate) fn push(&mut self, block: ByteBuffer) -> u32 {
        assert!(self.buffers.len() < u32::MAX as usize, "Too many blocks");

        let initial_len = self.len();
        // Identity is (data pointer, length): dedup catches re-appended clones
        // of the same allocation, not byte-equal copies of different allocations.
        let id = BufferId::from(&block);
        match self.buffer_to_idx.entry(id) {
            Entry::Occupied(idx) => *idx.get(),
            Entry::Vacant(entry) => {
                let idx = initial_len;
                entry.insert(idx);
                self.buffers.push(block);
                idx
            }
        }
    }

    /// Pushes every `Some` buffer, returning each input's assigned index
    /// (`None` entries stay `None`).
    pub(crate) fn extend_from_option_slice(
        &mut self,
        buffers: &[Option<ByteBuffer>],
    ) -> Vec<Option<u32>> {
        buffers
            .iter()
            .map(|buffer| buffer.as_ref().map(|buf| self.push(buf.clone())))
            .collect()
    }

    /// Pushes every buffer, returning each input's assigned index.
    pub(crate) fn extend_from_iter(
        &mut self,
        buffers: impl Iterator<Item = ByteBuffer>,
    ) -> Vec<u32> {
        buffers.map(|buffer| self.push(buffer)).collect()
    }

    /// Consumes the store, yielding the deduplicated buffers.
    pub(crate) fn finish(self) -> Arc<[ByteBuffer]> {
        Arc::from(self.buffers)
    }
}
546
/// Identity key for buffer deduplication: the buffer's data pointer and length.
#[derive(PartialEq, Eq, Hash)]
struct BufferId {
    // *const u8 stored as usize for `Send`
    ptr: usize,
    len: usize,
}
553
554impl BufferId {
555    fn from(buffer: &ByteBuffer) -> Self {
556        let slice = buffer.as_slice();
557        Self {
558            ptr: slice.as_ptr() as usize,
559            len: slice.len(),
560        }
561    }
562}
563
/// Strategy for sizing newly allocated in-progress data buffers.
#[derive(Debug, Clone)]
pub enum BufferGrowthStrategy {
    /// Use a fixed buffer size for all allocations.
    Fixed { size: u32 },
    /// Use exponential growth starting from initial_size, doubling until max_size.
    Exponential { current_size: u32, max_size: u32 },
}
571
572impl Default for BufferGrowthStrategy {
573    fn default() -> Self {
574        Self::Exponential {
575            current_size: 4 * 1024,    // 4KB starting size
576            max_size: 2 * 1024 * 1024, // 2MB max size
577        }
578    }
579}
580
581impl BufferGrowthStrategy {
582    pub fn fixed(size: u32) -> Self {
583        Self::Fixed { size }
584    }
585
586    pub fn exponential(initial_size: u32, max_size: u32) -> Self {
587        Self::Exponential {
588            current_size: initial_size,
589            max_size,
590        }
591    }
592
593    /// Returns the next buffer size to allocate and updates internal state.
594    pub fn next_size(&mut self) -> u32 {
595        match self {
596            Self::Fixed { size } => *size,
597            Self::Exponential {
598                current_size,
599                max_size,
600            } => {
601                let result = *current_size;
602                if *current_size < *max_size {
603                    *current_size = current_size.saturating_mul(2).min(*max_size);
604                }
605                result
606            }
607        }
608    }
609}
610
/// Data buffers taken from an appended array, paired with the per-buffer start
/// offsets introduced by compaction slicing.
enum BuffersWithOffsets {
    /// Every source buffer is carried over (possibly sliced).
    AllKept {
        buffers: Arc<[ByteBuffer]>,
        /// Per-buffer amount to subtract from view offsets; `None` = all zero.
        offsets: Option<Vec<u32>>,
    },
    /// Some source buffers (the `None` entries) must be rewritten value-by-value.
    SomeCompacted {
        buffers: Vec<Option<ByteBuffer>>,
        /// Per-buffer amount to subtract from view offsets; `None` = all zero.
        offsets: Option<Vec<u32>>,
    },
}
621
impl BuffersWithOffsets {
    /// Examines `array`'s data buffers and decides, per buffer, whether to
    /// keep it whole, slice it, or mark it for rewriting, based on
    /// `compaction_threshold`.
    pub fn from_array(array: &VarBinViewArray, compaction_threshold: f64) -> Self {
        // A zero threshold disables compaction: keep every buffer whole.
        if compaction_threshold == 0.0 {
            return Self::AllKept {
                buffers: Arc::from(
                    array
                        .data_buffers()
                        .to_vec()
                        .into_iter()
                        .map(|b| b.unwrap_host())
                        .collect_vec(),
                ),
                offsets: None,
            };
        }

        let buffer_utilizations = array
            .buffer_utilizations()
            .vortex_expect("buffer_utilizations in BuffersWithOffsets::from_array");
        // First pass: determine which result shape is needed overall.
        let mut has_rewrite = false;
        let mut has_nonzero_offset = false;
        for utilization in buffer_utilizations.iter() {
            match compaction_strategy(utilization, compaction_threshold) {
                CompactionStrategy::KeepFull => continue,
                CompactionStrategy::Slice { .. } => has_nonzero_offset = true,
                CompactionStrategy::Rewrite => has_rewrite = true,
            }
        }

        // Second pass: produce each kept buffer (sliced if applicable, `None`
        // if it must be rewritten) plus the offset its views must shift by.
        let buffers_with_offsets_iter = buffer_utilizations
            .iter()
            .zip(array.data_buffers().iter())
            .map(|(utilization, buffer)| {
                match compaction_strategy(utilization, compaction_threshold) {
                    CompactionStrategy::KeepFull => (Some(buffer.as_host().clone()), 0),
                    CompactionStrategy::Slice { start, end } => (
                        Some(buffer.as_host().slice(start as usize..end as usize)),
                        start,
                    ),
                    CompactionStrategy::Rewrite => (None, 0),
                }
            });

        match (has_rewrite, has_nonzero_offset) {
            // keep all buffers
            (false, false) => {
                let buffers: Vec<_> = buffers_with_offsets_iter
                    .map(|(b, _)| b.vortex_expect("already checked for rewrite"))
                    .collect();
                Self::AllKept {
                    buffers: Arc::from(buffers),
                    offsets: None,
                }
            }
            // rewrite, all zero offsets
            (true, false) => {
                let buffers: Vec<_> = buffers_with_offsets_iter.map(|(b, _)| b).collect();
                Self::SomeCompacted {
                    buffers,
                    offsets: None,
                }
            }
            // keep all buffers, but some have offsets
            (false, true) => {
                let (buffers, offsets): (Vec<_>, _) = buffers_with_offsets_iter
                    .map(|(buffer, offset)| {
                        (buffer.vortex_expect("already checked for rewrite"), offset)
                    })
                    .collect();
                Self::AllKept {
                    buffers: Arc::from(buffers),
                    offsets: Some(offsets),
                }
            }
            // rewrite and some have offsets
            (true, true) => {
                let (buffers, offsets) = buffers_with_offsets_iter.collect();
                Self::SomeCompacted {
                    buffers,
                    offsets: Some(offsets),
                }
            }
        }
    }
}
707
/// Per-buffer decision made by [`compaction_strategy`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CompactionStrategy {
    /// Keep the whole buffer untouched.
    KeepFull,
    /// Slice the buffer to [start, end) range
    Slice {
        start: u32,
        end: u32,
    },
    /// Rewrite data into new compacted buffer
    Rewrite,
}
719
720fn compaction_strategy(
721    buffer_utilization: &BufferUtilization,
722    threshold: f64,
723) -> CompactionStrategy {
724    match buffer_utilization.overall_utilization() {
725        // rewrite empty or not used buffers TODO(os): maybe keep them
726        0.0 => CompactionStrategy::Rewrite,
727        utilised if utilised >= threshold => CompactionStrategy::KeepFull,
728        _ if buffer_utilization.range_utilization() >= threshold => {
729            let Range { start, end } = buffer_utilization.range();
730            CompactionStrategy::Slice { start, end }
731        }
732        _ => CompactionStrategy::Rewrite,
733    }
734}
735
/// How to translate views of an appended array into this builder's buffer space.
enum ViewAdjustment {
    /// Every view can be adjusted via a precomputed shift or lookup table.
    Precomputed(PrecomputedViewAdjustment),
    /// Some views reference buffers being rewritten and need their bytes copied.
    Rewriting(RewritingViewAdjustment),
}
740
741impl ViewAdjustment {
742    fn shift(buffer_offset: u32, offsets: Option<Vec<u32>>) -> Self {
743        Self::Precomputed(PrecomputedViewAdjustment::Shift {
744            buffer_offset,
745            offsets,
746        })
747    }
748
749    fn lookup(buffer_lookup: Vec<u32>, offsets: Option<Vec<u32>>) -> Self {
750        Self::Precomputed(PrecomputedViewAdjustment::Lookup {
751            buffer_lookup,
752            offsets,
753        })
754    }
755
756    fn rewriting(buffer_lookup: Vec<Option<u32>>, offsets: Option<Vec<u32>>) -> Self {
757        Self::Rewriting(RewritingViewAdjustment {
758            buffer_lookup,
759            offsets,
760        })
761    }
762}
763
// Care when adding new variants or fields in this enum, it will mess with inlining if it gets too big
enum PrecomputedViewAdjustment {
    /// Add `buffer_offset` to every view's buffer index.
    Shift {
        buffer_offset: u32,
        /// Per-source-buffer amount subtracted from view offsets.
        offsets: Option<Vec<u32>>,
    },
    /// Map each source buffer index through `buffer_lookup`.
    Lookup {
        buffer_lookup: Vec<u32>,
        /// Per-source-buffer amount subtracted from view offsets.
        offsets: Option<Vec<u32>>,
    },
}
775
impl PrecomputedViewAdjustment {
    /// Translates `view` into the destination buffer space: inlined views pass
    /// through unchanged; reference views get their buffer index remapped and
    /// their offset shifted.
    fn adjust_view(&self, view: &BinaryView) -> BinaryView {
        if view.is_inlined() {
            return *view;
        }
        let view_ref = view.as_view();
        match self {
            Self::Shift {
                buffer_offset,
                offsets,
            } => {
                let b_idx = view_ref.buffer_index;
                let offset_shift = offsets
                    .as_ref()
                    .map(|o| o[b_idx as usize])
                    .unwrap_or_default();

                // If offset < offset_shift, this view was invalid and wasn't counted in buffer_utilizations.
                // Return an empty view to match how invalid views are handled in the Rewriting path.
                if view_ref.offset < offset_shift {
                    return BinaryView::empty_view();
                }

                view_ref
                    .with_buffer_and_offset(b_idx + buffer_offset, view_ref.offset - offset_shift)
            }
            Self::Lookup {
                buffer_lookup,
                offsets,
            } => {
                let b_idx = view_ref.buffer_index;
                let buffer = buffer_lookup[b_idx as usize];
                let offset_shift = offsets
                    .as_ref()
                    .map(|o| o[b_idx as usize])
                    .unwrap_or_default();

                // If offset < offset_shift, this view was invalid and wasn't counted in buffer_utilizations.
                // Return an empty view to match how invalid views are handled in the Rewriting path.
                if view_ref.offset < offset_shift {
                    return BinaryView::empty_view();
                }

                view_ref.with_buffer_and_offset(buffer, view_ref.offset - offset_shift)
            }
        }
        .into()
    }
}
825
/// View adjustment where some source buffers have no destination (`None` in
/// `buffer_lookup`) and their values must be copied into new buffers.
struct RewritingViewAdjustment {
    /// Destination buffer index per source buffer; `None` = rewrite required.
    buffer_lookup: Vec<Option<u32>>,
    /// Per-source-buffer amount subtracted from view offsets.
    offsets: Option<Vec<u32>>,
}
830
impl RewritingViewAdjustment {
    /// Can return None if this view can't be adjusted, because there is no precomputed lookup
    /// for the current buffer.
    fn adjust_view(&self, view: &BinaryView) -> Option<BinaryView> {
        if view.is_inlined() {
            return Some(*view);
        }

        let view_ref = view.as_view();
        self.buffer_lookup[view_ref.buffer_index as usize].map(|buffer| {
            let offset_shift = self
                .offsets
                .as_ref()
                .map(|o| o[view_ref.buffer_index as usize])
                .unwrap_or_default();
            // NOTE(review): unlike the precomputed paths there is no
            // `offset < offset_shift` guard here, so `offset - offset_shift`
            // would underflow for an invalid view — presumably callers only
            // pass valid views on this path; confirm.
            view_ref
                .with_buffer_and_offset(buffer, view_ref.offset - offset_shift)
                .into()
        })
    }
}
852
#[cfg(test)]
mod tests {
    use vortex_error::VortexResult;

    use crate::IntoArray;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::assert_arrays_eq;
    use crate::builders::ArrayBuilder;
    use crate::builders::VarBinViewBuilder;
    use crate::builders::varbinview::VarBinViewArray;
    use crate::dtype::DType;
    use crate::dtype::Nullability;
    use crate::scalar::Scalar;
    // Hoisted from function-local `use` statements so all imports live in one place.
    use super::BufferGrowthStrategy;

    /// Builds a nullable utf8 array mixing values, explicit nulls, and zeroed
    /// (empty-string) slots, then compares the finished array element-for-element.
    #[test]
    fn test_utf8_builder() {
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_value("Hello");
        builder.append_null();
        builder.append_value("World");

        // Bulk-append two nulls, then two zeroed entries (which surface as Some("")).
        builder.append_nulls(2);

        builder.append_zeros(2);
        builder.append_value("test");

        let actual = builder.finish();
        let expected = <VarBinViewArray as FromIterator<_>>::from_iter([
            Some("Hello"),
            None,
            Some("World"),
            None,
            None,
            Some(""),
            Some(""),
            Some("test"),
        ]);
        assert_arrays_eq!(actual, expected);
    }

    /// Interleaves direct appends with `extend_from_array` of a previously
    /// finished array, and finishes via `finish_into_canonical`.
    #[test]
    fn test_utf8_builder_with_extend() {
        let array = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_null();
            builder.append_value("Hello2");
            builder.finish()
        };
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_value("Hello1");
        builder.extend_from_array(&array);
        builder.append_nulls(2);
        builder.append_value("Hello3");

        let actual = builder.finish_into_canonical();
        let expected = <VarBinViewArray as FromIterator<_>>::from_iter([
            Some("Hello1"),
            None,
            Some("Hello2"),
            None,
            None,
            Some("Hello3"),
        ]);
        assert_arrays_eq!(actual.into_array(), expected.into_array());
    }

    /// Appending the same source buffer (or slices of it) repeatedly to a
    /// deduplicating builder must not grow the completed-block count; a
    /// distinct buffer from a second array must add exactly one block.
    #[test]
    fn test_buffer_deduplication() -> VortexResult<()> {
        let array = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_value("This is a long string that should not be inlined");
            builder.append_value("short string");
            builder.finish_into_varbinview()
        };

        assert_eq!(array.data_buffers().len(), 1);
        let mut builder =
            VarBinViewBuilder::with_buffer_deduplication(DType::Utf8(Nullability::Nullable), 10);

        let mut ctx = LEGACY_SESSION.create_execution_ctx();

        array.append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 1);

        // Slices share the original data buffer, so no new blocks are added.
        array
            .slice(1..2)?
            .append_to_builder(&mut builder, &mut ctx)?;
        array
            .slice(0..1)?
            .append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 1);

        let array2 = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_value("This is a long string that should not be inlined");
            builder.finish_into_varbinview()
        };

        // A second array carries its own buffer: one new block, even though the
        // string contents are identical (dedup is per-buffer, not per-value).
        array2.append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 2);

        // Re-appending slices of both already-seen buffers stays at two blocks.
        array
            .slice(0..1)?
            .append_to_builder(&mut builder, &mut ctx)?;
        array2
            .slice(0..1)?
            .append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 2);
        Ok(())
    }

    /// `append_scalar` accepts matching utf8/binary scalars (including nulls)
    /// and rejects scalars of a mismatched dtype.
    #[test]
    fn test_append_scalar() {
        // Test with Utf8 builder.
        let mut utf8_builder =
            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        // Test appending a valid utf8 value.
        let utf8_scalar1 = Scalar::utf8("hello", Nullability::Nullable);
        utf8_builder.append_scalar(&utf8_scalar1).unwrap();

        // Test appending another value.
        let utf8_scalar2 = Scalar::utf8("world", Nullability::Nullable);
        utf8_builder.append_scalar(&utf8_scalar2).unwrap();

        // Test appending null value.
        let null_scalar = Scalar::null(DType::Utf8(Nullability::Nullable));
        utf8_builder.append_scalar(&null_scalar).unwrap();

        let array = utf8_builder.finish();
        let expected =
            <VarBinViewArray as FromIterator<_>>::from_iter([Some("hello"), Some("world"), None]);
        assert_arrays_eq!(&array, &expected);

        // Test with Binary builder.
        let mut binary_builder =
            VarBinViewBuilder::with_capacity(DType::Binary(Nullability::Nullable), 10);

        let binary_scalar = Scalar::binary(vec![1u8, 2, 3], Nullability::Nullable);
        binary_builder.append_scalar(&binary_scalar).unwrap();

        let binary_null = Scalar::null(DType::Binary(Nullability::Nullable));
        binary_builder.append_scalar(&binary_null).unwrap();

        let binary_array = binary_builder.finish();
        let expected =
            <VarBinViewArray as FromIterator<_>>::from_iter([Some(vec![1u8, 2, 3]), None]);
        assert_arrays_eq!(&binary_array, &expected);

        // Test wrong dtype error: an i32 scalar must be rejected by a utf8 builder.
        let mut builder =
            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::NonNullable), 10);
        let wrong_scalar = Scalar::from(42i32);
        assert!(builder.append_scalar(&wrong_scalar).is_err());
    }

    /// Fixed strategy always yields the same size; exponential doubles from the
    /// initial size and saturates at the configured maximum.
    #[test]
    fn test_buffer_growth_strategies() {
        // Test Fixed strategy
        let mut strategy = BufferGrowthStrategy::fixed(1024);

        // Should always return the fixed size
        assert_eq!(strategy.next_size(), 1024);
        assert_eq!(strategy.next_size(), 1024);
        assert_eq!(strategy.next_size(), 1024);

        // Test Exponential strategy
        let mut strategy = BufferGrowthStrategy::exponential(1024, 8192);

        // Should double each time until hitting max_size
        assert_eq!(strategy.next_size(), 1024); // First: 1024
        assert_eq!(strategy.next_size(), 2048); // Second: 2048
        assert_eq!(strategy.next_size(), 4096); // Third: 4096
        assert_eq!(strategy.next_size(), 8192); // Fourth: 8192 (max)
        assert_eq!(strategy.next_size(), 8192); // Fifth: 8192 (capped)
    }

    /// A value larger than the growth strategy's `max_size` must still be
    /// stored intact and be readable back from the finished array.
    #[test]
    fn test_large_value_allocation() {
        let mut builder = VarBinViewBuilder::new(
            DType::Binary(Nullability::Nullable),
            10,
            Default::default(),
            BufferGrowthStrategy::exponential(1024, 4096),
            0.0,
        );

        // Create a value larger than max_size
        let large_value = vec![0u8; 8192];

        // Should successfully append the large value
        builder.append_value(&large_value);

        let array = builder.finish_into_varbinview();
        assert_eq!(array.len(), 1);

        // Verify the value was stored correctly
        let retrieved = array
            .execute_scalar(0, &mut LEGACY_SESSION.create_execution_ctx())
            .unwrap()
            .as_binary()
            .value()
            .cloned()
            .unwrap();
        assert_eq!(retrieved.len(), 8192);
        assert_eq!(retrieved.as_slice(), &large_value);
    }
}