Skip to main content

vortex_array/builders/
varbinview.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::Any;
5use std::ops::Range;
6use std::sync::Arc;
7
8use itertools::Itertools;
9use vortex_buffer::Alignment;
10use vortex_buffer::Buffer;
11use vortex_buffer::BufferMut;
12use vortex_buffer::ByteBuffer;
13use vortex_buffer::ByteBufferMut;
14use vortex_error::VortexExpect;
15use vortex_error::VortexResult;
16use vortex_error::vortex_bail;
17use vortex_error::vortex_ensure;
18use vortex_mask::Mask;
19use vortex_utils::aliases::hash_map::Entry;
20use vortex_utils::aliases::hash_map::HashMap;
21
22use crate::ArrayRef;
23use crate::ExecutionCtx;
24use crate::IntoArray;
25use crate::LEGACY_SESSION;
26use crate::VortexSessionExecute;
27use crate::arrays::VarBinViewArray;
28use crate::arrays::varbinview::VarBinViewArrayExt;
29use crate::arrays::varbinview::build_views::BinaryView;
30use crate::arrays::varbinview::compact::BufferUtilization;
31use crate::builders::ArrayBuilder;
32use crate::builders::LazyBitBufferBuilder;
33use crate::canonical::Canonical;
34#[expect(deprecated)]
35use crate::canonical::ToCanonical as _;
36use crate::dtype::DType;
37use crate::scalar::Scalar;
38
39/// The builder for building a [`VarBinViewArray`].
40pub struct VarBinViewBuilder {
41    dtype: DType,
42    views_builder: BufferMut<BinaryView>,
43    nulls: LazyBitBufferBuilder,
44    completed: CompletedBuffers,
45    in_progress: Option<ByteBufferMut>,
46    growth_strategy: BufferGrowthStrategy,
47    compaction_threshold: f64,
48}
49
50impl VarBinViewBuilder {
51    pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
52        Self::new(dtype, capacity, Default::default(), Default::default(), 0.0)
53    }
54
55    pub fn with_buffer_deduplication(dtype: DType, capacity: usize) -> Self {
56        Self::new(
57            dtype,
58            capacity,
59            CompletedBuffers::Deduplicated(Default::default()),
60            Default::default(),
61            0.0,
62        )
63    }
64
65    pub fn with_compaction(dtype: DType, capacity: usize, compaction_threshold: f64) -> Self {
66        Self::new(
67            dtype,
68            capacity,
69            Default::default(),
70            Default::default(),
71            compaction_threshold,
72        )
73    }
74
75    pub fn new(
76        dtype: DType,
77        capacity: usize,
78        completed: CompletedBuffers,
79        growth_strategy: BufferGrowthStrategy,
80        compaction_threshold: f64,
81    ) -> Self {
82        assert!(
83            matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
84            "VarBinViewBuilder DType must be Utf8 or Binary."
85        );
86        Self {
87            views_builder: BufferMut::with_capacity_preferred_aligned(
88                capacity,
89                Alignment::of::<BinaryView>(),
90                None,
91            ),
92            nulls: LazyBitBufferBuilder::new(capacity),
93            completed,
94            in_progress: None,
95            dtype,
96            growth_strategy,
97            compaction_threshold,
98        }
99    }
100
101    fn append_value_view(&mut self, value: &[u8]) {
102        let length =
103            u32::try_from(value.len()).vortex_expect("cannot have a single string >2^32 in length");
104        if length <= 12 {
105            self.views_builder.push(BinaryView::make_view(value, 0, 0));
106            return;
107        }
108
109        let (buffer_idx, offset) = self.append_value_to_buffer(value);
110        let view = BinaryView::make_view(value, buffer_idx, offset);
111        self.views_builder.push(view);
112    }
113
114    /// Appends a value to the builder.
115    pub fn append_value<S: AsRef<[u8]>>(&mut self, value: S) {
116        self.append_value_view(value.as_ref());
117        self.nulls.append_non_null();
118    }
119
120    /// Appends `n` copies of `value` as non-null entries.
121    pub fn append_n_values<S: AsRef<[u8]>>(&mut self, value: S, n: usize) {
122        if n == 0 {
123            return;
124        }
125        let bytes = value.as_ref();
126        let view = if bytes.len() <= BinaryView::MAX_INLINED_SIZE {
127            BinaryView::make_view(bytes, 0, 0)
128        } else {
129            let (buffer_idx, offset) = self.append_value_to_buffer(bytes);
130            BinaryView::make_view(bytes, buffer_idx, offset)
131        };
132        self.views_builder.push_n(view, n);
133        self.nulls.append_n_non_nulls(n);
134    }
135
136    fn flush_in_progress(&mut self) {
137        let Some(block) = self.in_progress.take() else {
138            return;
139        };
140
141        assert!(block.len() < u32::MAX as usize, "Block too large");
142
143        let initial_len = self.completed.len();
144        self.completed.push(block.freeze());
145        assert_eq!(
146            self.completed.len(),
147            initial_len + 1,
148            "Invalid state, just completed block already exists"
149        );
150    }
151
152    fn init_in_progress(&mut self, min_len: usize) {
153        let next_buffer_size = self.growth_strategy.next_size() as usize;
154        let to_reserve = next_buffer_size.max(min_len);
155        self.in_progress = Some(ByteBufferMut::with_capacity_preferred_aligned(
156            to_reserve,
157            Alignment::of::<u8>(),
158            None,
159        ));
160    }
161
162    /// append a non inlined value to self.in_progress.
163    fn append_value_to_buffer(&mut self, value: &[u8]) -> (u32, u32) {
164        assert!(
165            value.len() > BinaryView::MAX_INLINED_SIZE,
166            "must inline small strings"
167        );
168
169        if let Some(in_progress) = &mut self.in_progress {
170            let required_cap = in_progress.len() + value.len();
171            if in_progress.capacity() < required_cap {
172                self.flush_in_progress();
173                self.init_in_progress(value.len());
174            }
175        } else {
176            self.init_in_progress(value.len())
177        };
178
179        let in_progress = self
180            .in_progress
181            .as_mut()
182            .vortex_expect("in_progress just set");
183
184        let buffer_idx = self.completed.len();
185        let offset = u32::try_from(in_progress.len()).vortex_expect("too many buffers");
186        in_progress.extend_from_slice(value);
187
188        (buffer_idx, offset)
189    }
190
191    pub fn completed_block_count(&self) -> u32 {
192        self.completed.len()
193    }
194
195    /// Returns true if a non-empty in-progress buffer is staged (and would
196    /// become a completed buffer on the next flush), false otherwise.
197    pub fn in_progress(&self) -> bool {
198        self.in_progress.is_some()
199    }
200
201    /// Pushes buffers and pre-adjusted views into the builder.
202    ///
203    /// The provided `buffers` contain sections of data from a `VarBinViewArray`, and the
204    /// `views` are `BinaryView`s that have already been adjusted to reference the correct buffer
205    /// indices and offsets for this builder. All views must point to valid sections within the
206    /// provided buffers, and the validity length must match the view length.
207    ///
208    /// # Warning
209    ///
210    /// This method does not check utilization of the given buffers. Callers must provide
211    /// buffers that are fully utilized by the given adjusted views.
212    ///
213    /// # Panics
214    ///
215    /// Panics if this builder deduplicates buffers and any of the given buffers already
216    /// exist in this builder.
217    pub fn push_buffer_and_adjusted_views(
218        &mut self,
219        buffers: &[ByteBuffer],
220        views: &Buffer<BinaryView>,
221        validity_mask: Mask,
222    ) {
223        self.flush_in_progress();
224
225        let expected_completed_len = self.completed.len() as usize + buffers.len();
226        self.completed.extend_from_slice_unchecked(buffers);
227        assert_eq!(
228            self.completed.len() as usize,
229            expected_completed_len,
230            "Some buffers already exist",
231        );
232        self.views_builder.extend_trusted(views.iter().copied());
233        self.push_only_validity_mask(&validity_mask);
234
235        debug_assert_eq!(self.nulls.len(), self.views_builder.len())
236    }
237
238    /// Finishes the builder directly into a [`VarBinViewArray`].
239    pub fn finish_into_varbinview(&mut self) -> VarBinViewArray {
240        self.flush_in_progress();
241        let buffers = std::mem::take(&mut self.completed);
242
243        assert_eq!(
244            self.views_builder.len(),
245            self.nulls.len(),
246            "View and validity length must match"
247        );
248
249        let validity = self.nulls.finish_with_nullability(self.dtype.nullability());
250
251        // SAFETY: the builder methods check safety at each step.
252        unsafe {
253            VarBinViewArray::new_unchecked(
254                std::mem::take(&mut self.views_builder).freeze(),
255                buffers.finish(),
256                self.dtype.clone(),
257                validity,
258            )
259        }
260    }
261
262    // Pushes a validity mask into the builder not affecting the views or buffers
263    fn push_only_validity_mask(&mut self, validity_mask: &Mask) {
264        self.nulls.append_validity_mask(validity_mask);
265    }
266
267    pub(crate) fn append_varbinview_array(
268        &mut self,
269        array: &VarBinViewArray,
270        ctx: &mut ExecutionCtx,
271    ) -> VortexResult<()> {
272        self.flush_in_progress();
273
274        let mask = array.varbinview_validity().execute_mask(array.len(), ctx)?;
275
276        self.push_only_validity_mask(&mask);
277
278        let view_adjustment =
279            self.completed
280                .extend_from_compaction(BuffersWithOffsets::from_array(
281                    array,
282                    self.compaction_threshold,
283                ));
284
285        match view_adjustment {
286            ViewAdjustment::Precomputed(adjustment) => self.views_builder.extend_trusted(
287                array
288                    .views()
289                    .iter()
290                    .map(|view| adjustment.adjust_view(view)),
291            ),
292            ViewAdjustment::Rewriting(adjustment) => match mask {
293                Mask::AllTrue(_) => {
294                    for (idx, &view) in array.views().iter().enumerate() {
295                        let new_view = self.push_view(view, &adjustment, array, idx);
296                        self.views_builder.push(new_view);
297                    }
298                }
299                Mask::AllFalse(_) => {
300                    self.views_builder
301                        .push_n(BinaryView::empty_view(), array.len());
302                }
303                Mask::Values(v) => {
304                    for (idx, (&view, is_valid)) in
305                        array.views().iter().zip(v.bit_buffer().iter()).enumerate()
306                    {
307                        let new_view = if !is_valid {
308                            BinaryView::empty_view()
309                        } else {
310                            self.push_view(view, &adjustment, array, idx)
311                        };
312                        self.views_builder.push(new_view);
313                    }
314                }
315            },
316        }
317
318        Ok(())
319    }
320}
321
322impl ArrayBuilder for VarBinViewBuilder {
323    fn as_any(&self) -> &dyn Any {
324        self
325    }
326
327    fn as_any_mut(&mut self) -> &mut dyn Any {
328        self
329    }
330
331    fn dtype(&self) -> &DType {
332        &self.dtype
333    }
334
335    fn len(&self) -> usize {
336        self.nulls.len()
337    }
338
339    fn append_zeros(&mut self, n: usize) {
340        self.views_builder.push_n(BinaryView::empty_view(), n);
341        self.nulls.append_n_non_nulls(n);
342    }
343
344    unsafe fn append_nulls_unchecked(&mut self, n: usize) {
345        self.views_builder.push_n(BinaryView::empty_view(), n);
346        self.nulls.append_n_nulls(n);
347    }
348
349    fn append_scalar(&mut self, scalar: &Scalar) -> VortexResult<()> {
350        vortex_ensure!(
351            scalar.dtype() == self.dtype(),
352            "VarBinViewBuilder expected scalar with dtype {}, got {}",
353            self.dtype(),
354            scalar.dtype()
355        );
356
357        match self.dtype() {
358            DType::Utf8(_) => match scalar.as_utf8().value() {
359                Some(value) => self.append_value(value),
360                None => self.append_null(),
361            },
362            DType::Binary(_) => match scalar.as_binary().value() {
363                Some(value) => self.append_value(value),
364                None => self.append_null(),
365            },
366            _ => vortex_bail!(
367                "VarBinViewBuilder can only handle Utf8 or Binary scalars, got {:?}",
368                scalar.dtype()
369            ),
370        }
371
372        Ok(())
373    }
374
375    unsafe fn extend_from_array_unchecked(&mut self, array: &ArrayRef) {
376        #[expect(deprecated)]
377        let array = array.to_varbinview();
378        self.append_varbinview_array(&array, &mut LEGACY_SESSION.create_execution_ctx())
379            .vortex_expect("Failed to append varbinview array");
380    }
381
382    fn reserve_exact(&mut self, additional: usize) {
383        self.views_builder.reserve(additional);
384        self.nulls.reserve_exact(additional);
385    }
386
387    unsafe fn set_validity_unchecked(&mut self, validity: Mask) {
388        self.nulls = LazyBitBufferBuilder::from_validity_mask(validity);
389    }
390
391    fn finish(&mut self) -> ArrayRef {
392        self.finish_into_varbinview().into_array()
393    }
394
395    fn finish_into_canonical(&mut self) -> Canonical {
396        Canonical::VarBinView(self.finish_into_varbinview())
397    }
398}
399
400impl VarBinViewBuilder {
401    #[inline]
402    fn push_view(
403        &mut self,
404        view: BinaryView,
405        adjustment: &RewritingViewAdjustment,
406        array: &VarBinViewArray,
407        idx: usize,
408    ) -> BinaryView {
409        if view.is_inlined() {
410            view
411        } else if let Some(adjusted) = adjustment.adjust_view(&view) {
412            adjusted
413        } else {
414            let bytes = array.bytes_at(idx);
415            let (new_buf_idx, new_offset) = self.append_value_to_buffer(&bytes);
416            BinaryView::make_view(bytes.as_slice(), new_buf_idx, new_offset)
417        }
418    }
419}
420
421pub enum CompletedBuffers {
422    Default(Vec<ByteBuffer>),
423    Deduplicated(DeduplicatedBuffers),
424}
425
426impl Default for CompletedBuffers {
427    fn default() -> Self {
428        Self::Default(Vec::new())
429    }
430}
431
432// Self::push enforces len < u32::max
433#[expect(clippy::cast_possible_truncation)]
434impl CompletedBuffers {
435    fn len(&self) -> u32 {
436        match self {
437            Self::Default(buffers) => buffers.len() as u32,
438            Self::Deduplicated(buffers) => buffers.len(),
439        }
440    }
441
442    fn push(&mut self, block: ByteBuffer) -> u32 {
443        match self {
444            Self::Default(buffers) => {
445                assert!(buffers.len() < u32::MAX as usize, "Too many blocks");
446                buffers.push(block);
447                self.len()
448            }
449            Self::Deduplicated(buffers) => buffers.push(block),
450        }
451    }
452
453    /// Does not compact buffers, bypasses utilization checks.
454    fn extend_from_slice_unchecked(&mut self, buffers: &[ByteBuffer]) {
455        for buffer in buffers {
456            self.push(buffer.clone());
457        }
458    }
459
460    fn extend_from_compaction(&mut self, buffers: BuffersWithOffsets) -> ViewAdjustment {
461        match (self, buffers) {
462            (
463                Self::Default(completed_buffers),
464                BuffersWithOffsets::AllKept { buffers, offsets },
465            ) => {
466                let buffer_offset = completed_buffers.len() as u32;
467                completed_buffers.extend_from_slice(&buffers);
468                ViewAdjustment::shift(buffer_offset, offsets)
469            }
470            (
471                Self::Default(completed_buffers),
472                BuffersWithOffsets::SomeCompacted { buffers, offsets },
473            ) => {
474                let lookup = buffers
475                    .iter()
476                    .map(|maybe_buffer| {
477                        maybe_buffer.as_ref().map(|buffer| {
478                            completed_buffers.push(buffer.clone());
479                            completed_buffers.len() as u32 - 1
480                        })
481                    })
482                    .collect();
483                ViewAdjustment::rewriting(lookup, offsets)
484            }
485
486            (
487                Self::Deduplicated(completed_buffers),
488                BuffersWithOffsets::AllKept { buffers, offsets },
489            ) => {
490                let buffer_lookup = completed_buffers.extend_from_iter(buffers.iter().cloned());
491                ViewAdjustment::lookup(buffer_lookup, offsets)
492            }
493            (
494                Self::Deduplicated(completed_buffers),
495                BuffersWithOffsets::SomeCompacted { buffers, offsets },
496            ) => {
497                let buffer_lookup = completed_buffers.extend_from_option_slice(&buffers);
498                ViewAdjustment::rewriting(buffer_lookup, offsets)
499            }
500        }
501    }
502
503    fn finish(self) -> Arc<[ByteBuffer]> {
504        match self {
505            Self::Default(buffers) => Arc::from(buffers),
506            Self::Deduplicated(buffers) => buffers.finish(),
507        }
508    }
509}
510
511#[derive(Default)]
512pub struct DeduplicatedBuffers {
513    buffers: Vec<ByteBuffer>,
514    buffer_to_idx: HashMap<BufferId, u32>,
515}
516
517impl DeduplicatedBuffers {
518    // Self::push enforces len < u32::max
519    #[expect(clippy::cast_possible_truncation)]
520    fn len(&self) -> u32 {
521        self.buffers.len() as u32
522    }
523
524    /// Push a new block if not seen before. Returns the idx of the block.
525    pub(crate) fn push(&mut self, block: ByteBuffer) -> u32 {
526        assert!(self.buffers.len() < u32::MAX as usize, "Too many blocks");
527
528        let initial_len = self.len();
529        let id = BufferId::from(&block);
530        match self.buffer_to_idx.entry(id) {
531            Entry::Occupied(idx) => *idx.get(),
532            Entry::Vacant(entry) => {
533                let idx = initial_len;
534                entry.insert(idx);
535                self.buffers.push(block);
536                idx
537            }
538        }
539    }
540
541    pub(crate) fn extend_from_option_slice(
542        &mut self,
543        buffers: &[Option<ByteBuffer>],
544    ) -> Vec<Option<u32>> {
545        buffers
546            .iter()
547            .map(|buffer| buffer.as_ref().map(|buf| self.push(buf.clone())))
548            .collect()
549    }
550
551    pub(crate) fn extend_from_iter(
552        &mut self,
553        buffers: impl Iterator<Item = ByteBuffer>,
554    ) -> Vec<u32> {
555        buffers.map(|buffer| self.push(buffer)).collect()
556    }
557
558    pub(crate) fn finish(self) -> Arc<[ByteBuffer]> {
559        Arc::from(self.buffers)
560    }
561}
562
563#[derive(PartialEq, Eq, Hash)]
564struct BufferId {
565    // *const u8 stored as usize for `Send`
566    ptr: usize,
567    len: usize,
568}
569
570impl BufferId {
571    fn from(buffer: &ByteBuffer) -> Self {
572        let slice = buffer.as_slice();
573        Self {
574            ptr: slice.as_ptr() as usize,
575            len: slice.len(),
576        }
577    }
578}
579
/// Decides the capacity of each newly started in-progress data buffer.
#[derive(Debug, Clone)]
pub enum BufferGrowthStrategy {
    /// Use a fixed buffer size for all allocations.
    Fixed { size: u32 },
    /// Start at `current_size` and double after every allocation until `max_size` is reached.
    ///
    /// A `current_size` already at or above `max_size` is returned unchanged on every call.
    Exponential { current_size: u32, max_size: u32 },
}

impl Default for BufferGrowthStrategy {
    /// Exponential growth from 4 KiB up to 2 MiB.
    fn default() -> Self {
        Self::exponential(Self::DEFAULT_INITIAL_SIZE, Self::DEFAULT_MAX_SIZE)
    }
}

impl BufferGrowthStrategy {
    /// 4KB starting size used by `Default`.
    const DEFAULT_INITIAL_SIZE: u32 = 4 * 1024;
    /// 2MB maximum size used by `Default`.
    const DEFAULT_MAX_SIZE: u32 = 2 * 1024 * 1024;

    /// Every allocation uses `size` bytes.
    pub fn fixed(size: u32) -> Self {
        Self::Fixed { size }
    }

    /// Allocations start at `initial_size` bytes and double up to `max_size`.
    pub fn exponential(initial_size: u32, max_size: u32) -> Self {
        Self::Exponential {
            current_size: initial_size,
            max_size,
        }
    }

    /// Returns the size to allocate now and advances the internal state for the next call.
    pub fn next_size(&mut self) -> u32 {
        match *self {
            Self::Fixed { size } => size,
            Self::Exponential {
                ref mut current_size,
                max_size,
            } => {
                let handed_out = *current_size;
                if handed_out < max_size {
                    *current_size = handed_out.saturating_mul(2).min(max_size);
                }
                handed_out
            }
        }
    }
}
626
627enum BuffersWithOffsets {
628    AllKept {
629        buffers: Arc<[ByteBuffer]>,
630        offsets: Option<Vec<u32>>,
631    },
632    SomeCompacted {
633        buffers: Vec<Option<ByteBuffer>>,
634        offsets: Option<Vec<u32>>,
635    },
636}
637
638impl BuffersWithOffsets {
639    pub fn from_array(array: &VarBinViewArray, compaction_threshold: f64) -> Self {
640        if compaction_threshold == 0.0 {
641            return Self::AllKept {
642                buffers: Arc::from(
643                    array
644                        .data_buffers()
645                        .iter()
646                        .cloned()
647                        .map(|b| b.unwrap_host())
648                        .collect_vec(),
649                ),
650                offsets: None,
651            };
652        }
653
654        let buffer_utilizations = array
655            .buffer_utilizations()
656            .vortex_expect("buffer_utilizations in BuffersWithOffsets::from_array");
657        let mut has_rewrite = false;
658        let mut has_nonzero_offset = false;
659        for utilization in buffer_utilizations.iter() {
660            match compaction_strategy(utilization, compaction_threshold) {
661                CompactionStrategy::KeepFull => continue,
662                CompactionStrategy::Slice { .. } => has_nonzero_offset = true,
663                CompactionStrategy::Rewrite => has_rewrite = true,
664            }
665        }
666
667        let buffers_with_offsets_iter = buffer_utilizations
668            .iter()
669            .zip(array.data_buffers().iter())
670            .map(|(utilization, buffer)| {
671                match compaction_strategy(utilization, compaction_threshold) {
672                    CompactionStrategy::KeepFull => (Some(buffer.as_host().clone()), 0),
673                    CompactionStrategy::Slice { start, end } => (
674                        Some(buffer.as_host().slice(start as usize..end as usize)),
675                        start,
676                    ),
677                    CompactionStrategy::Rewrite => (None, 0),
678                }
679            });
680
681        match (has_rewrite, has_nonzero_offset) {
682            // keep all buffers
683            (false, false) => {
684                let buffers: Vec<_> = buffers_with_offsets_iter
685                    .map(|(b, _)| b.vortex_expect("already checked for rewrite"))
686                    .collect();
687                Self::AllKept {
688                    buffers: Arc::from(buffers),
689                    offsets: None,
690                }
691            }
692            // rewrite, all zero offsets
693            (true, false) => {
694                let buffers: Vec<_> = buffers_with_offsets_iter.map(|(b, _)| b).collect();
695                Self::SomeCompacted {
696                    buffers,
697                    offsets: None,
698                }
699            }
700            // keep all buffers, but some have offsets
701            (false, true) => {
702                let (buffers, offsets): (Vec<_>, _) = buffers_with_offsets_iter
703                    .map(|(buffer, offset)| {
704                        (buffer.vortex_expect("already checked for rewrite"), offset)
705                    })
706                    .collect();
707                Self::AllKept {
708                    buffers: Arc::from(buffers),
709                    offsets: Some(offsets),
710                }
711            }
712            // rewrite and some have offsets
713            (true, true) => {
714                let (buffers, offsets) = buffers_with_offsets_iter.collect();
715                Self::SomeCompacted {
716                    buffers,
717                    offsets: Some(offsets),
718                }
719            }
720        }
721    }
722}
723
724#[derive(Debug, Clone, Copy, PartialEq, Eq)]
725enum CompactionStrategy {
726    KeepFull,
727    /// Slice the buffer to [start, end) range
728    Slice {
729        start: u32,
730        end: u32,
731    },
732    /// Rewrite data into new compacted buffer
733    Rewrite,
734}
735
736fn compaction_strategy(
737    buffer_utilization: &BufferUtilization,
738    threshold: f64,
739) -> CompactionStrategy {
740    match buffer_utilization.overall_utilization() {
741        // rewrite empty or not used buffers TODO(os): maybe keep them
742        0.0 => CompactionStrategy::Rewrite,
743        utilised if utilised >= threshold => CompactionStrategy::KeepFull,
744        _ if buffer_utilization.range_utilization() >= threshold => {
745            let Range { start, end } = buffer_utilization.range();
746            CompactionStrategy::Slice { start, end }
747        }
748        _ => CompactionStrategy::Rewrite,
749    }
750}
751
752enum ViewAdjustment {
753    Precomputed(PrecomputedViewAdjustment),
754    Rewriting(RewritingViewAdjustment),
755}
756
757impl ViewAdjustment {
758    fn shift(buffer_offset: u32, offsets: Option<Vec<u32>>) -> Self {
759        Self::Precomputed(PrecomputedViewAdjustment::Shift {
760            buffer_offset,
761            offsets,
762        })
763    }
764
765    fn lookup(buffer_lookup: Vec<u32>, offsets: Option<Vec<u32>>) -> Self {
766        Self::Precomputed(PrecomputedViewAdjustment::Lookup {
767            buffer_lookup,
768            offsets,
769        })
770    }
771
772    fn rewriting(buffer_lookup: Vec<Option<u32>>, offsets: Option<Vec<u32>>) -> Self {
773        Self::Rewriting(RewritingViewAdjustment {
774            buffer_lookup,
775            offsets,
776        })
777    }
778}
779
780// Care when adding new variants or fields in this enum, it will mess with inlining if it gets too big
781enum PrecomputedViewAdjustment {
782    Shift {
783        buffer_offset: u32,
784        offsets: Option<Vec<u32>>,
785    },
786    Lookup {
787        buffer_lookup: Vec<u32>,
788        offsets: Option<Vec<u32>>,
789    },
790}
791
792impl PrecomputedViewAdjustment {
793    #[inline]
794    fn adjust_view(&self, view: &BinaryView) -> BinaryView {
795        if view.is_inlined() {
796            return *view;
797        }
798        let view_ref = view.as_view();
799        match self {
800            Self::Shift {
801                buffer_offset,
802                offsets,
803            } => {
804                let b_idx = view_ref.buffer_index;
805                let offset_shift = offsets
806                    .as_ref()
807                    .map(|o| o[b_idx as usize])
808                    .unwrap_or_default();
809
810                // If offset < offset_shift, this view was invalid and wasn't counted in buffer_utilizations.
811                // Return an empty view to match how invalid views are handled in the Rewriting path.
812                if view_ref.offset < offset_shift {
813                    return BinaryView::empty_view();
814                }
815
816                view_ref
817                    .with_buffer_and_offset(b_idx + buffer_offset, view_ref.offset - offset_shift)
818            }
819            Self::Lookup {
820                buffer_lookup,
821                offsets,
822            } => {
823                let b_idx = view_ref.buffer_index;
824                let buffer = buffer_lookup[b_idx as usize];
825                let offset_shift = offsets
826                    .as_ref()
827                    .map(|o| o[b_idx as usize])
828                    .unwrap_or_default();
829
830                // If offset < offset_shift, this view was invalid and wasn't counted in buffer_utilizations.
831                // Return an empty view to match how invalid views are handled in the Rewriting path.
832                if view_ref.offset < offset_shift {
833                    return BinaryView::empty_view();
834                }
835
836                view_ref.with_buffer_and_offset(buffer, view_ref.offset - offset_shift)
837            }
838        }
839        .into()
840    }
841}
842
843struct RewritingViewAdjustment {
844    buffer_lookup: Vec<Option<u32>>,
845    offsets: Option<Vec<u32>>,
846}
847
848impl RewritingViewAdjustment {
849    /// Can return None if this view can't be adjusted, because there is no precomputed lookup
850    /// for the current buffer.
851    #[inline]
852    fn adjust_view(&self, view: &BinaryView) -> Option<BinaryView> {
853        if view.is_inlined() {
854            return Some(*view);
855        }
856
857        let view_ref = view.as_view();
858        self.buffer_lookup[view_ref.buffer_index as usize].map(|buffer| {
859            let offset_shift = self
860                .offsets
861                .as_ref()
862                .map(|o| o[view_ref.buffer_index as usize])
863                .unwrap_or_default();
864            view_ref
865                .with_buffer_and_offset(buffer, view_ref.offset - offset_shift)
866                .into()
867        })
868    }
869}
870
#[cfg(test)]
mod tests {
    use vortex_error::VortexResult;

    use crate::IntoArray;
    use crate::VortexSessionExecute;
    use crate::array_session;
    use crate::assert_arrays_eq;
    use crate::builders::ArrayBuilder;
    use crate::builders::VarBinViewBuilder;
    use crate::builders::varbinview::BufferGrowthStrategy;
    use crate::builders::varbinview::VarBinViewArray;
    use crate::dtype::DType;
    use crate::dtype::Nullability;
    use crate::scalar::Scalar;

    /// Values, nulls and zeros appended in sequence come back in insertion order; zeros are
    /// materialized as empty strings.
    #[test]
    fn test_utf8_builder() {
        let mut ctx = array_session().create_execution_ctx();
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_value("Hello");
        builder.append_null();
        builder.append_value("World");

        builder.append_nulls(2);

        builder.append_zeros(2);
        builder.append_value("test");

        let actual = builder.finish();
        let expected = <VarBinViewArray as FromIterator<_>>::from_iter([
            Some("Hello"),
            None,
            Some("World"),
            None,
            None,
            Some(""),
            Some(""),
            Some("test"),
        ]);
        assert_arrays_eq!(actual, expected, &mut ctx);
    }

    /// `extend_from_array` splices another array (including its nulls) between direct appends.
    #[test]
    fn test_utf8_builder_with_extend() {
        let mut ctx = array_session().create_execution_ctx();
        let array = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_null();
            builder.append_value("Hello2");
            builder.finish()
        };
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_value("Hello1");
        builder.extend_from_array(&array);
        builder.append_nulls(2);
        builder.append_value("Hello3");

        let actual = builder.finish_into_canonical();
        let expected = <VarBinViewArray as FromIterator<_>>::from_iter([
            Some("Hello1"),
            None,
            Some("Hello2"),
            None,
            None,
            Some("Hello3"),
        ]);
        assert_arrays_eq!(actual.into_array(), expected.into_array(), &mut ctx);
    }

    /// With buffer deduplication enabled, re-appending (slices of) an array whose data buffer
    /// was already added must not register a new completed block, while a separately built
    /// array contributes one new block.
    #[test]
    fn test_buffer_deduplication() -> VortexResult<()> {
        let array = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_value("This is a long string that should not be inlined");
            builder.append_value("short string");
            builder.finish_into_varbinview()
        };

        assert_eq!(array.data_buffers().len(), 1);
        let mut builder =
            VarBinViewBuilder::with_buffer_deduplication(DType::Utf8(Nullability::Nullable), 10);

        let mut ctx = array_session().create_execution_ctx();

        array.append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 1);

        // Slices share the original data buffer, so the block count must not grow.
        array
            .slice(1..2)?
            .append_to_builder(&mut builder, &mut ctx)?;
        array
            .slice(0..1)?
            .append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 1);

        // An independently built array owns its own buffer, which counts as a new block.
        let array2 = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_value("This is a long string that should not be inlined");
            builder.finish_into_varbinview()
        };

        array2.append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 2);

        // Both buffers are now known; appending from either must not add further blocks.
        array
            .slice(0..1)?
            .append_to_builder(&mut builder, &mut ctx)?;
        array2
            .slice(0..1)?
            .append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 2);
        Ok(())
    }

    /// `append_scalar` accepts utf8/binary scalars (including typed nulls) matching the builder's
    /// dtype, and returns an error for a scalar of an unrelated dtype.
    #[test]
    fn test_append_scalar() -> VortexResult<()> {
        let mut ctx = array_session().create_execution_ctx();

        // Utf8 builder: two values followed by a null.
        let mut utf8_builder =
            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        let utf8_scalar1 = Scalar::utf8("hello", Nullability::Nullable);
        utf8_builder.append_scalar(&utf8_scalar1)?;

        let utf8_scalar2 = Scalar::utf8("world", Nullability::Nullable);
        utf8_builder.append_scalar(&utf8_scalar2)?;

        let null_scalar = Scalar::null(DType::Utf8(Nullability::Nullable));
        utf8_builder.append_scalar(&null_scalar)?;

        let array = utf8_builder.finish();
        let expected =
            <VarBinViewArray as FromIterator<_>>::from_iter([Some("hello"), Some("world"), None]);
        assert_arrays_eq!(&array, &expected, &mut ctx);

        // Binary builder: one value followed by a null.
        let mut binary_builder =
            VarBinViewBuilder::with_capacity(DType::Binary(Nullability::Nullable), 10);

        let binary_scalar = Scalar::binary(vec![1u8, 2, 3], Nullability::Nullable);
        binary_builder.append_scalar(&binary_scalar)?;

        let binary_null = Scalar::null(DType::Binary(Nullability::Nullable));
        binary_builder.append_scalar(&binary_null)?;

        let binary_array = binary_builder.finish();
        let expected =
            <VarBinViewArray as FromIterator<_>>::from_iter([Some(vec![1u8, 2, 3]), None]);
        assert_arrays_eq!(&binary_array, &expected, &mut ctx);

        // A scalar whose dtype does not match the builder must be rejected, not coerced.
        let mut builder =
            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::NonNullable), 10);
        let wrong_scalar = Scalar::from(42i32);
        assert!(builder.append_scalar(&wrong_scalar).is_err());
        Ok(())
    }

    /// `fixed` always yields the same size; `exponential` doubles from the initial size until it
    /// is capped at the maximum.
    #[test]
    fn test_buffer_growth_strategies() {
        // Fixed strategy: always the configured size.
        let mut strategy = BufferGrowthStrategy::fixed(1024);

        assert_eq!(strategy.next_size(), 1024);
        assert_eq!(strategy.next_size(), 1024);
        assert_eq!(strategy.next_size(), 1024);

        // Exponential strategy: doubles each time until reaching max_size.
        let mut strategy = BufferGrowthStrategy::exponential(1024, 8192);

        assert_eq!(strategy.next_size(), 1024); // First: 1024
        assert_eq!(strategy.next_size(), 2048); // Second: 2048
        assert_eq!(strategy.next_size(), 4096); // Third: 4096
        assert_eq!(strategy.next_size(), 8192); // Fourth: 8192 (max)
        assert_eq!(strategy.next_size(), 8192); // Fifth: 8192 (capped)
    }

    /// A single value larger than the growth strategy's maximum buffer size must still be stored
    /// and read back intact.
    #[test]
    fn test_large_value_allocation() -> VortexResult<()> {
        let mut ctx = array_session().create_execution_ctx();
        let mut builder = VarBinViewBuilder::new(
            DType::Binary(Nullability::Nullable),
            10,
            Default::default(),
            BufferGrowthStrategy::exponential(1024, 4096),
            0.0,
        );

        // Twice the strategy's max_size (4096).
        let large_value = vec![0u8; 8192];
        builder.append_value(&large_value);

        let array = builder.finish_into_varbinview();
        assert_eq!(array.len(), 1);

        let retrieved = array
            .execute_scalar(0, &mut ctx)?
            .as_binary()
            .value()
            .cloned()
            .expect("appended value must not be null");
        assert_eq!(retrieved.len(), 8192);
        assert_eq!(retrieved.as_slice(), &large_value);
        Ok(())
    }
}