Skip to main content

vortex_array/builders/
varbinview.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::Any;
5use std::ops::Range;
6use std::sync::Arc;
7
8use itertools::Itertools;
9use vortex_buffer::Buffer;
10use vortex_buffer::BufferMut;
11use vortex_buffer::ByteBuffer;
12use vortex_buffer::ByteBufferMut;
13use vortex_error::VortexExpect;
14use vortex_error::VortexResult;
15use vortex_error::vortex_bail;
16use vortex_error::vortex_ensure;
17use vortex_mask::Mask;
18use vortex_utils::aliases::hash_map::Entry;
19use vortex_utils::aliases::hash_map::HashMap;
20
21use crate::ArrayRef;
22use crate::IntoArray;
23use crate::LEGACY_SESSION;
24use crate::VortexSessionExecute;
25use crate::arrays::VarBinViewArray;
26use crate::arrays::varbinview::build_views::BinaryView;
27use crate::arrays::varbinview::compact::BufferUtilization;
28use crate::builders::ArrayBuilder;
29use crate::builders::LazyBitBufferBuilder;
30use crate::canonical::Canonical;
31#[expect(deprecated)]
32use crate::canonical::ToCanonical as _;
33use crate::dtype::DType;
34use crate::scalar::Scalar;
35
36/// The builder for building a [`VarBinViewArray`].
37pub struct VarBinViewBuilder {
38    dtype: DType,
39    views_builder: BufferMut<BinaryView>,
40    nulls: LazyBitBufferBuilder,
41    completed: CompletedBuffers,
42    in_progress: ByteBufferMut,
43    growth_strategy: BufferGrowthStrategy,
44    compaction_threshold: f64,
45}
46
47impl VarBinViewBuilder {
48    pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
49        Self::new(dtype, capacity, Default::default(), Default::default(), 0.0)
50    }
51
52    pub fn with_buffer_deduplication(dtype: DType, capacity: usize) -> Self {
53        Self::new(
54            dtype,
55            capacity,
56            CompletedBuffers::Deduplicated(Default::default()),
57            Default::default(),
58            0.0,
59        )
60    }
61
62    pub fn with_compaction(dtype: DType, capacity: usize, compaction_threshold: f64) -> Self {
63        Self::new(
64            dtype,
65            capacity,
66            Default::default(),
67            Default::default(),
68            compaction_threshold,
69        )
70    }
71
72    pub fn new(
73        dtype: DType,
74        capacity: usize,
75        completed: CompletedBuffers,
76        growth_strategy: BufferGrowthStrategy,
77        compaction_threshold: f64,
78    ) -> Self {
79        assert!(
80            matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
81            "VarBinViewBuilder DType must be Utf8 or Binary."
82        );
83        Self {
84            views_builder: BufferMut::<BinaryView>::with_capacity(capacity),
85            nulls: LazyBitBufferBuilder::new(capacity),
86            completed,
87            in_progress: ByteBufferMut::empty(),
88            dtype,
89            growth_strategy,
90            compaction_threshold,
91        }
92    }
93
94    fn append_value_view(&mut self, value: &[u8]) {
95        let length =
96            u32::try_from(value.len()).vortex_expect("cannot have a single string >2^32 in length");
97        if length <= 12 {
98            self.views_builder.push(BinaryView::make_view(value, 0, 0));
99            return;
100        }
101
102        let (buffer_idx, offset) = self.append_value_to_buffer(value);
103        let view = BinaryView::make_view(value, buffer_idx, offset);
104        self.views_builder.push(view);
105    }
106
107    /// Appends a value to the builder.
108    pub fn append_value<S: AsRef<[u8]>>(&mut self, value: S) {
109        self.append_value_view(value.as_ref());
110        self.nulls.append_non_null();
111    }
112
113    /// Appends `n` copies of `value` as non-null entries.
114    pub fn append_n_values<S: AsRef<[u8]>>(&mut self, value: S, n: usize) {
115        if n == 0 {
116            return;
117        }
118        let bytes = value.as_ref();
119        let view = if bytes.len() <= BinaryView::MAX_INLINED_SIZE {
120            BinaryView::make_view(bytes, 0, 0)
121        } else {
122            let (buffer_idx, offset) = self.append_value_to_buffer(bytes);
123            BinaryView::make_view(bytes, buffer_idx, offset)
124        };
125        self.views_builder.push_n(view, n);
126        self.nulls.append_n_non_nulls(n);
127    }
128
129    fn flush_in_progress(&mut self) {
130        if self.in_progress.is_empty() {
131            return;
132        }
133        let block = std::mem::take(&mut self.in_progress).freeze();
134
135        assert!(block.len() < u32::MAX as usize, "Block too large");
136
137        let initial_len = self.completed.len();
138        self.completed.push(block);
139        assert_eq!(
140            self.completed.len(),
141            initial_len + 1,
142            "Invalid state, just completed block already exists"
143        );
144    }
145
146    /// append a non inlined value to self.in_progress.
147    fn append_value_to_buffer(&mut self, value: &[u8]) -> (u32, u32) {
148        assert!(
149            value.len() > BinaryView::MAX_INLINED_SIZE,
150            "must inline small strings"
151        );
152        let required_cap = self.in_progress.len() + value.len();
153        if self.in_progress.capacity() < required_cap {
154            self.flush_in_progress();
155            let next_buffer_size = self.growth_strategy.next_size() as usize;
156            let to_reserve = next_buffer_size.max(value.len());
157            self.in_progress.reserve(to_reserve);
158        }
159
160        let buffer_idx = self.completed.len();
161        let offset = u32::try_from(self.in_progress.len()).vortex_expect("too many buffers");
162        self.in_progress.extend_from_slice(value);
163
164        (buffer_idx, offset)
165    }
166
167    pub fn completed_block_count(&self) -> u32 {
168        self.completed.len()
169    }
170
171    /// Returns true if a non-empty in-progress buffer is staged (and would
172    /// become a completed buffer on the next flush), false otherwise.
173    pub fn in_progress(&self) -> bool {
174        !self.in_progress.is_empty()
175    }
176
177    /// Pushes buffers and pre-adjusted views into the builder.
178    ///
179    /// The provided `buffers` contain sections of data from a `VarBinViewArray`, and the
180    /// `views` are `BinaryView`s that have already been adjusted to reference the correct buffer
181    /// indices and offsets for this builder. All views must point to valid sections within the
182    /// provided buffers, and the validity length must match the view length.
183    ///
184    /// # Warning
185    ///
186    /// This method does not check utilization of the given buffers. Callers must provide
187    /// buffers that are fully utilized by the given adjusted views.
188    ///
189    /// # Panics
190    ///
191    /// Panics if this builder deduplicates buffers and any of the given buffers already
192    /// exist in this builder.
193    pub fn push_buffer_and_adjusted_views(
194        &mut self,
195        buffers: &[ByteBuffer],
196        views: &Buffer<BinaryView>,
197        validity_mask: Mask,
198    ) {
199        self.flush_in_progress();
200
201        let expected_completed_len = self.completed.len() as usize + buffers.len();
202        self.completed.extend_from_slice_unchecked(buffers);
203        assert_eq!(
204            self.completed.len() as usize,
205            expected_completed_len,
206            "Some buffers already exist",
207        );
208        self.views_builder.extend_trusted(views.iter().copied());
209        self.push_only_validity_mask(&validity_mask);
210
211        debug_assert_eq!(self.nulls.len(), self.views_builder.len())
212    }
213
214    /// Finishes the builder directly into a [`VarBinViewArray`].
215    pub fn finish_into_varbinview(&mut self) -> VarBinViewArray {
216        self.flush_in_progress();
217        let buffers = std::mem::take(&mut self.completed);
218
219        assert_eq!(
220            self.views_builder.len(),
221            self.nulls.len(),
222            "View and validity length must match"
223        );
224
225        let validity = self.nulls.finish_with_nullability(self.dtype.nullability());
226
227        // SAFETY: the builder methods check safety at each step.
228        unsafe {
229            VarBinViewArray::new_unchecked(
230                std::mem::take(&mut self.views_builder).freeze(),
231                buffers.finish(),
232                self.dtype.clone(),
233                validity,
234            )
235        }
236    }
237
238    // Pushes a validity mask into the builder not affecting the views or buffers
239    fn push_only_validity_mask(&mut self, validity_mask: &Mask) {
240        self.nulls.append_validity_mask(validity_mask);
241    }
242}
243
244impl ArrayBuilder for VarBinViewBuilder {
245    fn as_any(&self) -> &dyn Any {
246        self
247    }
248
249    fn as_any_mut(&mut self) -> &mut dyn Any {
250        self
251    }
252
253    fn dtype(&self) -> &DType {
254        &self.dtype
255    }
256
257    fn len(&self) -> usize {
258        self.nulls.len()
259    }
260
261    fn append_zeros(&mut self, n: usize) {
262        self.views_builder.push_n(BinaryView::empty_view(), n);
263        self.nulls.append_n_non_nulls(n);
264    }
265
266    unsafe fn append_nulls_unchecked(&mut self, n: usize) {
267        self.views_builder.push_n(BinaryView::empty_view(), n);
268        self.nulls.append_n_nulls(n);
269    }
270
271    fn append_scalar(&mut self, scalar: &Scalar) -> VortexResult<()> {
272        vortex_ensure!(
273            scalar.dtype() == self.dtype(),
274            "VarBinViewBuilder expected scalar with dtype {}, got {}",
275            self.dtype(),
276            scalar.dtype()
277        );
278
279        match self.dtype() {
280            DType::Utf8(_) => match scalar.as_utf8().value() {
281                Some(value) => self.append_value(value),
282                None => self.append_null(),
283            },
284            DType::Binary(_) => match scalar.as_binary().value() {
285                Some(value) => self.append_value(value),
286                None => self.append_null(),
287            },
288            _ => vortex_bail!(
289                "VarBinViewBuilder can only handle Utf8 or Binary scalars, got {:?}",
290                scalar.dtype()
291            ),
292        }
293
294        Ok(())
295    }
296
297    unsafe fn extend_from_array_unchecked(&mut self, array: &ArrayRef) {
298        #[expect(deprecated)]
299        let array = array.to_varbinview();
300        self.flush_in_progress();
301
302        let mask = array
303            .validity()
304            .vortex_expect("validity_mask")
305            .execute_mask(array.len(), &mut LEGACY_SESSION.create_execution_ctx())
306            .vortex_expect("Failed to compute validity mask");
307
308        self.push_only_validity_mask(&mask);
309
310        let view_adjustment =
311            self.completed
312                .extend_from_compaction(BuffersWithOffsets::from_array(
313                    &array,
314                    self.compaction_threshold,
315                ));
316
317        match view_adjustment {
318            ViewAdjustment::Precomputed(adjustment) => self.views_builder.extend_trusted(
319                array
320                    .views()
321                    .iter()
322                    .map(|view| adjustment.adjust_view(view)),
323            ),
324            ViewAdjustment::Rewriting(adjustment) => match mask {
325                Mask::AllTrue(_) => {
326                    for (idx, &view) in array.views().iter().enumerate() {
327                        let new_view = self.push_view(view, &adjustment, &array, idx);
328                        self.views_builder.push(new_view);
329                    }
330                }
331                Mask::AllFalse(_) => {
332                    self.views_builder
333                        .push_n(BinaryView::empty_view(), array.len());
334                }
335                Mask::Values(v) => {
336                    for (idx, (&view, is_valid)) in
337                        array.views().iter().zip(v.bit_buffer().iter()).enumerate()
338                    {
339                        let new_view = if !is_valid {
340                            BinaryView::empty_view()
341                        } else {
342                            self.push_view(view, &adjustment, &array, idx)
343                        };
344                        self.views_builder.push(new_view);
345                    }
346                }
347            },
348        }
349    }
350
351    fn reserve_exact(&mut self, additional: usize) {
352        self.views_builder.reserve(additional);
353        self.nulls.reserve_exact(additional);
354    }
355
356    unsafe fn set_validity_unchecked(&mut self, validity: Mask) {
357        self.nulls = LazyBitBufferBuilder::from_validity_mask(validity);
358    }
359
360    fn finish(&mut self) -> ArrayRef {
361        self.finish_into_varbinview().into_array()
362    }
363
364    fn finish_into_canonical(&mut self) -> Canonical {
365        Canonical::VarBinView(self.finish_into_varbinview())
366    }
367}
368
369impl VarBinViewBuilder {
370    #[inline]
371    fn push_view(
372        &mut self,
373        view: BinaryView,
374        adjustment: &RewritingViewAdjustment,
375        array: &VarBinViewArray,
376        idx: usize,
377    ) -> BinaryView {
378        if view.is_inlined() {
379            view
380        } else if let Some(adjusted) = adjustment.adjust_view(&view) {
381            adjusted
382        } else {
383            let bytes = array.bytes_at(idx);
384            let (new_buf_idx, new_offset) = self.append_value_to_buffer(&bytes);
385            BinaryView::make_view(bytes.as_slice(), new_buf_idx, new_offset)
386        }
387    }
388}
389
390pub enum CompletedBuffers {
391    Default(Vec<ByteBuffer>),
392    Deduplicated(DeduplicatedBuffers),
393}
394
395impl Default for CompletedBuffers {
396    fn default() -> Self {
397        Self::Default(Vec::new())
398    }
399}
400
401// Self::push enforces len < u32::max
402#[expect(clippy::cast_possible_truncation)]
403impl CompletedBuffers {
404    fn len(&self) -> u32 {
405        match self {
406            Self::Default(buffers) => buffers.len() as u32,
407            Self::Deduplicated(buffers) => buffers.len(),
408        }
409    }
410
411    fn push(&mut self, block: ByteBuffer) -> u32 {
412        match self {
413            Self::Default(buffers) => {
414                assert!(buffers.len() < u32::MAX as usize, "Too many blocks");
415                buffers.push(block);
416                self.len()
417            }
418            Self::Deduplicated(buffers) => buffers.push(block),
419        }
420    }
421
422    /// Does not compact buffers, bypasses utilization checks.
423    fn extend_from_slice_unchecked(&mut self, buffers: &[ByteBuffer]) {
424        for buffer in buffers {
425            self.push(buffer.clone());
426        }
427    }
428
429    fn extend_from_compaction(&mut self, buffers: BuffersWithOffsets) -> ViewAdjustment {
430        match (self, buffers) {
431            (
432                Self::Default(completed_buffers),
433                BuffersWithOffsets::AllKept { buffers, offsets },
434            ) => {
435                let buffer_offset = completed_buffers.len() as u32;
436                completed_buffers.extend_from_slice(&buffers);
437                ViewAdjustment::shift(buffer_offset, offsets)
438            }
439            (
440                Self::Default(completed_buffers),
441                BuffersWithOffsets::SomeCompacted { buffers, offsets },
442            ) => {
443                let lookup = buffers
444                    .iter()
445                    .map(|maybe_buffer| {
446                        maybe_buffer.as_ref().map(|buffer| {
447                            completed_buffers.push(buffer.clone());
448                            completed_buffers.len() as u32 - 1
449                        })
450                    })
451                    .collect();
452                ViewAdjustment::rewriting(lookup, offsets)
453            }
454
455            (
456                Self::Deduplicated(completed_buffers),
457                BuffersWithOffsets::AllKept { buffers, offsets },
458            ) => {
459                let buffer_lookup = completed_buffers.extend_from_iter(buffers.iter().cloned());
460                ViewAdjustment::lookup(buffer_lookup, offsets)
461            }
462            (
463                Self::Deduplicated(completed_buffers),
464                BuffersWithOffsets::SomeCompacted { buffers, offsets },
465            ) => {
466                let buffer_lookup = completed_buffers.extend_from_option_slice(&buffers);
467                ViewAdjustment::rewriting(buffer_lookup, offsets)
468            }
469        }
470    }
471
472    fn finish(self) -> Arc<[ByteBuffer]> {
473        match self {
474            Self::Default(buffers) => Arc::from(buffers),
475            Self::Deduplicated(buffers) => buffers.finish(),
476        }
477    }
478}
479
480#[derive(Default)]
481pub struct DeduplicatedBuffers {
482    buffers: Vec<ByteBuffer>,
483    buffer_to_idx: HashMap<BufferId, u32>,
484}
485
486impl DeduplicatedBuffers {
487    // Self::push enforces len < u32::max
488    #[expect(clippy::cast_possible_truncation)]
489    fn len(&self) -> u32 {
490        self.buffers.len() as u32
491    }
492
493    /// Push a new block if not seen before. Returns the idx of the block.
494    pub(crate) fn push(&mut self, block: ByteBuffer) -> u32 {
495        assert!(self.buffers.len() < u32::MAX as usize, "Too many blocks");
496
497        let initial_len = self.len();
498        let id = BufferId::from(&block);
499        match self.buffer_to_idx.entry(id) {
500            Entry::Occupied(idx) => *idx.get(),
501            Entry::Vacant(entry) => {
502                let idx = initial_len;
503                entry.insert(idx);
504                self.buffers.push(block);
505                idx
506            }
507        }
508    }
509
510    pub(crate) fn extend_from_option_slice(
511        &mut self,
512        buffers: &[Option<ByteBuffer>],
513    ) -> Vec<Option<u32>> {
514        buffers
515            .iter()
516            .map(|buffer| buffer.as_ref().map(|buf| self.push(buf.clone())))
517            .collect()
518    }
519
520    pub(crate) fn extend_from_iter(
521        &mut self,
522        buffers: impl Iterator<Item = ByteBuffer>,
523    ) -> Vec<u32> {
524        buffers.map(|buffer| self.push(buffer)).collect()
525    }
526
527    pub(crate) fn finish(self) -> Arc<[ByteBuffer]> {
528        Arc::from(self.buffers)
529    }
530}
531
532#[derive(PartialEq, Eq, Hash)]
533struct BufferId {
534    // *const u8 stored as usize for `Send`
535    ptr: usize,
536    len: usize,
537}
538
539impl BufferId {
540    fn from(buffer: &ByteBuffer) -> Self {
541        let slice = buffer.as_slice();
542        Self {
543            ptr: slice.as_ptr() as usize,
544            len: slice.len(),
545        }
546    }
547}
548
/// Policy for choosing the size of each newly allocated data buffer.
#[derive(Debug, Clone)]
pub enum BufferGrowthStrategy {
    /// Use a fixed buffer size for all allocations.
    Fixed { size: u32 },
    /// Use exponential growth: `current_size` is handed out next and doubles after each
    /// request until it reaches `max_size`.
    Exponential { current_size: u32, max_size: u32 },
}

impl Default for BufferGrowthStrategy {
    /// Exponential growth from 4KB up to 2MB.
    fn default() -> Self {
        const STARTING_SIZE: u32 = 4 * 1024;
        const MAX_SIZE: u32 = 2 * 1024 * 1024;
        Self::exponential(STARTING_SIZE, MAX_SIZE)
    }
}

impl BufferGrowthStrategy {
    /// A strategy that always yields `size`.
    pub fn fixed(size: u32) -> Self {
        Self::Fixed { size }
    }

    /// A strategy that yields `initial_size` first and doubles up to `max_size`.
    pub fn exponential(initial_size: u32, max_size: u32) -> Self {
        Self::Exponential {
            current_size: initial_size,
            max_size,
        }
    }

    /// Returns the next buffer size to allocate and updates internal state.
    ///
    /// For the exponential strategy the returned size is the current one; the stored
    /// size is then doubled (saturating) and capped at `max_size`, but only while it is
    /// still below `max_size`.
    pub fn next_size(&mut self) -> u32 {
        match self {
            Self::Fixed { size } => *size,
            Self::Exponential {
                current_size,
                max_size,
            } => {
                let granted = *current_size;
                let upcoming = if granted < *max_size {
                    granted.saturating_mul(2).min(*max_size)
                } else {
                    granted
                };
                std::mem::replace(current_size, upcoming)
            }
        }
    }
}
595
596enum BuffersWithOffsets {
597    AllKept {
598        buffers: Arc<[ByteBuffer]>,
599        offsets: Option<Vec<u32>>,
600    },
601    SomeCompacted {
602        buffers: Vec<Option<ByteBuffer>>,
603        offsets: Option<Vec<u32>>,
604    },
605}
606
607impl BuffersWithOffsets {
608    pub fn from_array(array: &VarBinViewArray, compaction_threshold: f64) -> Self {
609        if compaction_threshold == 0.0 {
610            return Self::AllKept {
611                buffers: Arc::from(
612                    array
613                        .data_buffers()
614                        .iter()
615                        .cloned()
616                        .map(|b| b.unwrap_host())
617                        .collect_vec(),
618                ),
619                offsets: None,
620            };
621        }
622
623        let buffer_utilizations = array
624            .buffer_utilizations()
625            .vortex_expect("buffer_utilizations in BuffersWithOffsets::from_array");
626        let mut has_rewrite = false;
627        let mut has_nonzero_offset = false;
628        for utilization in buffer_utilizations.iter() {
629            match compaction_strategy(utilization, compaction_threshold) {
630                CompactionStrategy::KeepFull => continue,
631                CompactionStrategy::Slice { .. } => has_nonzero_offset = true,
632                CompactionStrategy::Rewrite => has_rewrite = true,
633            }
634        }
635
636        let buffers_with_offsets_iter = buffer_utilizations
637            .iter()
638            .zip(array.data_buffers().iter())
639            .map(|(utilization, buffer)| {
640                match compaction_strategy(utilization, compaction_threshold) {
641                    CompactionStrategy::KeepFull => (Some(buffer.as_host().clone()), 0),
642                    CompactionStrategy::Slice { start, end } => (
643                        Some(buffer.as_host().slice(start as usize..end as usize)),
644                        start,
645                    ),
646                    CompactionStrategy::Rewrite => (None, 0),
647                }
648            });
649
650        match (has_rewrite, has_nonzero_offset) {
651            // keep all buffers
652            (false, false) => {
653                let buffers: Vec<_> = buffers_with_offsets_iter
654                    .map(|(b, _)| b.vortex_expect("already checked for rewrite"))
655                    .collect();
656                Self::AllKept {
657                    buffers: Arc::from(buffers),
658                    offsets: None,
659                }
660            }
661            // rewrite, all zero offsets
662            (true, false) => {
663                let buffers: Vec<_> = buffers_with_offsets_iter.map(|(b, _)| b).collect();
664                Self::SomeCompacted {
665                    buffers,
666                    offsets: None,
667                }
668            }
669            // keep all buffers, but some have offsets
670            (false, true) => {
671                let (buffers, offsets): (Vec<_>, _) = buffers_with_offsets_iter
672                    .map(|(buffer, offset)| {
673                        (buffer.vortex_expect("already checked for rewrite"), offset)
674                    })
675                    .collect();
676                Self::AllKept {
677                    buffers: Arc::from(buffers),
678                    offsets: Some(offsets),
679                }
680            }
681            // rewrite and some have offsets
682            (true, true) => {
683                let (buffers, offsets) = buffers_with_offsets_iter.collect();
684                Self::SomeCompacted {
685                    buffers,
686                    offsets: Some(offsets),
687                }
688            }
689        }
690    }
691}
692
/// What to do with one data buffer of a source array when extending a builder from it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CompactionStrategy {
    /// Carry the buffer over unchanged.
    KeepFull,
    /// Slice the buffer to [start, end) range
    Slice {
        /// First kept byte of the buffer.
        start: u32,
        /// One past the last kept byte of the buffer.
        end: u32,
    },
    /// Rewrite data into new compacted buffer
    Rewrite,
}
704
705fn compaction_strategy(
706    buffer_utilization: &BufferUtilization,
707    threshold: f64,
708) -> CompactionStrategy {
709    match buffer_utilization.overall_utilization() {
710        // rewrite empty or not used buffers TODO(os): maybe keep them
711        0.0 => CompactionStrategy::Rewrite,
712        utilised if utilised >= threshold => CompactionStrategy::KeepFull,
713        _ if buffer_utilization.range_utilization() >= threshold => {
714            let Range { start, end } = buffer_utilization.range();
715            CompactionStrategy::Slice { start, end }
716        }
717        _ => CompactionStrategy::Rewrite,
718    }
719}
720
721enum ViewAdjustment {
722    Precomputed(PrecomputedViewAdjustment),
723    Rewriting(RewritingViewAdjustment),
724}
725
726impl ViewAdjustment {
727    fn shift(buffer_offset: u32, offsets: Option<Vec<u32>>) -> Self {
728        Self::Precomputed(PrecomputedViewAdjustment::Shift {
729            buffer_offset,
730            offsets,
731        })
732    }
733
734    fn lookup(buffer_lookup: Vec<u32>, offsets: Option<Vec<u32>>) -> Self {
735        Self::Precomputed(PrecomputedViewAdjustment::Lookup {
736            buffer_lookup,
737            offsets,
738        })
739    }
740
741    fn rewriting(buffer_lookup: Vec<Option<u32>>, offsets: Option<Vec<u32>>) -> Self {
742        Self::Rewriting(RewritingViewAdjustment {
743            buffer_lookup,
744            offsets,
745        })
746    }
747}
748
749// Care when adding new variants or fields in this enum, it will mess with inlining if it gets too big
750enum PrecomputedViewAdjustment {
751    Shift {
752        buffer_offset: u32,
753        offsets: Option<Vec<u32>>,
754    },
755    Lookup {
756        buffer_lookup: Vec<u32>,
757        offsets: Option<Vec<u32>>,
758    },
759}
760
761impl PrecomputedViewAdjustment {
762    #[inline]
763    fn adjust_view(&self, view: &BinaryView) -> BinaryView {
764        if view.is_inlined() {
765            return *view;
766        }
767        let view_ref = view.as_view();
768        match self {
769            Self::Shift {
770                buffer_offset,
771                offsets,
772            } => {
773                let b_idx = view_ref.buffer_index;
774                let offset_shift = offsets
775                    .as_ref()
776                    .map(|o| o[b_idx as usize])
777                    .unwrap_or_default();
778
779                // If offset < offset_shift, this view was invalid and wasn't counted in buffer_utilizations.
780                // Return an empty view to match how invalid views are handled in the Rewriting path.
781                if view_ref.offset < offset_shift {
782                    return BinaryView::empty_view();
783                }
784
785                view_ref
786                    .with_buffer_and_offset(b_idx + buffer_offset, view_ref.offset - offset_shift)
787            }
788            Self::Lookup {
789                buffer_lookup,
790                offsets,
791            } => {
792                let b_idx = view_ref.buffer_index;
793                let buffer = buffer_lookup[b_idx as usize];
794                let offset_shift = offsets
795                    .as_ref()
796                    .map(|o| o[b_idx as usize])
797                    .unwrap_or_default();
798
799                // If offset < offset_shift, this view was invalid and wasn't counted in buffer_utilizations.
800                // Return an empty view to match how invalid views are handled in the Rewriting path.
801                if view_ref.offset < offset_shift {
802                    return BinaryView::empty_view();
803                }
804
805                view_ref.with_buffer_and_offset(buffer, view_ref.offset - offset_shift)
806            }
807        }
808        .into()
809    }
810}
811
812struct RewritingViewAdjustment {
813    buffer_lookup: Vec<Option<u32>>,
814    offsets: Option<Vec<u32>>,
815}
816
817impl RewritingViewAdjustment {
818    /// Can return None if this view can't be adjusted, because there is no precomputed lookup
819    /// for the current buffer.
820    #[inline]
821    fn adjust_view(&self, view: &BinaryView) -> Option<BinaryView> {
822        if view.is_inlined() {
823            return Some(*view);
824        }
825
826        let view_ref = view.as_view();
827        self.buffer_lookup[view_ref.buffer_index as usize].map(|buffer| {
828            let offset_shift = self
829                .offsets
830                .as_ref()
831                .map(|o| o[view_ref.buffer_index as usize])
832                .unwrap_or_default();
833            view_ref
834                .with_buffer_and_offset(buffer, view_ref.offset - offset_shift)
835                .into()
836        })
837    }
838}
839
840#[cfg(test)]
841mod tests {
842    use vortex_error::VortexResult;
843
844    use crate::IntoArray;
845    use crate::LEGACY_SESSION;
846    use crate::VortexSessionExecute;
847    use crate::assert_arrays_eq;
848    use crate::builders::ArrayBuilder;
849    use crate::builders::VarBinViewBuilder;
850    use crate::builders::varbinview::VarBinViewArray;
851    use crate::dtype::DType;
852    use crate::dtype::Nullability;
853
854    #[test]
855    fn test_utf8_builder() {
856        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
857
858        builder.append_value("Hello");
859        builder.append_null();
860        builder.append_value("World");
861
862        builder.append_nulls(2);
863
864        builder.append_zeros(2);
865        builder.append_value("test");
866
867        let actual = builder.finish();
868        let expected = <VarBinViewArray as FromIterator<_>>::from_iter([
869            Some("Hello"),
870            None,
871            Some("World"),
872            None,
873            None,
874            Some(""),
875            Some(""),
876            Some("test"),
877        ]);
878        assert_arrays_eq!(actual, expected);
879    }
880
881    #[test]
882    fn test_utf8_builder_with_extend() {
883        let array = {
884            let mut builder =
885                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
886            builder.append_null();
887            builder.append_value("Hello2");
888            builder.finish()
889        };
890        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
891
892        builder.append_value("Hello1");
893        builder.extend_from_array(&array);
894        builder.append_nulls(2);
895        builder.append_value("Hello3");
896
897        let actual = builder.finish_into_canonical();
898        let expected = <VarBinViewArray as FromIterator<_>>::from_iter([
899            Some("Hello1"),
900            None,
901            Some("Hello2"),
902            None,
903            None,
904            Some("Hello3"),
905        ]);
906        assert_arrays_eq!(actual.into_array(), expected.into_array());
907    }
908
909    #[test]
910    fn test_buffer_deduplication() -> VortexResult<()> {
911        let array = {
912            let mut builder =
913                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
914            builder.append_value("This is a long string that should not be inlined");
915            builder.append_value("short string");
916            builder.finish_into_varbinview()
917        };
918
919        assert_eq!(array.data_buffers().len(), 1);
920        let mut builder =
921            VarBinViewBuilder::with_buffer_deduplication(DType::Utf8(Nullability::Nullable), 10);
922
923        let mut ctx = LEGACY_SESSION.create_execution_ctx();
924
925        array.append_to_builder(&mut builder, &mut ctx)?;
926        assert_eq!(builder.completed_block_count(), 1);
927
928        array
929            .slice(1..2)?
930            .append_to_builder(&mut builder, &mut ctx)?;
931        array
932            .slice(0..1)?
933            .append_to_builder(&mut builder, &mut ctx)?;
934        assert_eq!(builder.completed_block_count(), 1);
935
936        let array2 = {
937            let mut builder =
938                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
939            builder.append_value("This is a long string that should not be inlined");
940            builder.finish_into_varbinview()
941        };
942
943        array2.append_to_builder(&mut builder, &mut ctx)?;
944        assert_eq!(builder.completed_block_count(), 2);
945
946        array
947            .slice(0..1)?
948            .append_to_builder(&mut builder, &mut ctx)?;
949        array2
950            .slice(0..1)?
951            .append_to_builder(&mut builder, &mut ctx)?;
952        assert_eq!(builder.completed_block_count(), 2);
953        Ok(())
954    }
955
956    #[test]
957    fn test_append_scalar() {
958        use crate::scalar::Scalar;
959
960        // Test with Utf8 builder.
961        let mut utf8_builder =
962            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
963
964        // Test appending a valid utf8 value.
965        let utf8_scalar1 = Scalar::utf8("hello", Nullability::Nullable);
966        utf8_builder.append_scalar(&utf8_scalar1).unwrap();
967
968        // Test appending another value.
969        let utf8_scalar2 = Scalar::utf8("world", Nullability::Nullable);
970        utf8_builder.append_scalar(&utf8_scalar2).unwrap();
971
972        // Test appending null value.
973        let null_scalar = Scalar::null(DType::Utf8(Nullability::Nullable));
974        utf8_builder.append_scalar(&null_scalar).unwrap();
975
976        let array = utf8_builder.finish();
977        let expected =
978            <VarBinViewArray as FromIterator<_>>::from_iter([Some("hello"), Some("world"), None]);
979        assert_arrays_eq!(&array, &expected);
980
981        // Test with Binary builder.
982        let mut binary_builder =
983            VarBinViewBuilder::with_capacity(DType::Binary(Nullability::Nullable), 10);
984
985        let binary_scalar = Scalar::binary(vec![1u8, 2, 3], Nullability::Nullable);
986        binary_builder.append_scalar(&binary_scalar).unwrap();
987
988        let binary_null = Scalar::null(DType::Binary(Nullability::Nullable));
989        binary_builder.append_scalar(&binary_null).unwrap();
990
991        let binary_array = binary_builder.finish();
992        let expected =
993            <VarBinViewArray as FromIterator<_>>::from_iter([Some(vec![1u8, 2, 3]), None]);
994        assert_arrays_eq!(&binary_array, &expected);
995
996        // Test wrong dtype error.
997        let mut builder =
998            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::NonNullable), 10);
999        let wrong_scalar = Scalar::from(42i32);
1000        assert!(builder.append_scalar(&wrong_scalar).is_err());
1001    }
1002
1003    #[test]
1004    fn test_buffer_growth_strategies() {
1005        use super::BufferGrowthStrategy;
1006
1007        // Test Fixed strategy
1008        let mut strategy = BufferGrowthStrategy::fixed(1024);
1009
1010        // Should always return the fixed size
1011        assert_eq!(strategy.next_size(), 1024);
1012        assert_eq!(strategy.next_size(), 1024);
1013        assert_eq!(strategy.next_size(), 1024);
1014
1015        // Test Exponential strategy
1016        let mut strategy = BufferGrowthStrategy::exponential(1024, 8192);
1017
1018        // Should double each time until hitting max_size
1019        assert_eq!(strategy.next_size(), 1024); // First: 1024
1020        assert_eq!(strategy.next_size(), 2048); // Second: 2048
1021        assert_eq!(strategy.next_size(), 4096); // Third: 4096
1022        assert_eq!(strategy.next_size(), 8192); // Fourth: 8192 (max)
1023        assert_eq!(strategy.next_size(), 8192); // Fifth: 8192 (capped)
1024    }
1025
1026    #[test]
1027    fn test_large_value_allocation() {
1028        use super::BufferGrowthStrategy;
1029        use super::VarBinViewBuilder;
1030
1031        let mut builder = VarBinViewBuilder::new(
1032            DType::Binary(Nullability::Nullable),
1033            10,
1034            Default::default(),
1035            BufferGrowthStrategy::exponential(1024, 4096),
1036            0.0,
1037        );
1038
1039        // Create a value larger than max_size
1040        let large_value = vec![0u8; 8192];
1041
1042        // Should successfully append the large value
1043        builder.append_value(&large_value);
1044
1045        let array = builder.finish_into_varbinview();
1046        assert_eq!(array.len(), 1);
1047
1048        // Verify the value was stored correctly
1049        let retrieved = array
1050            .execute_scalar(0, &mut LEGACY_SESSION.create_execution_ctx())
1051            .unwrap()
1052            .as_binary()
1053            .value()
1054            .cloned()
1055            .unwrap();
1056        assert_eq!(retrieved.len(), 8192);
1057        assert_eq!(retrieved.as_slice(), &large_value);
1058    }
1059}