Skip to main content

vortex_array/builders/
varbinview.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::Any;
5use std::ops::Range;
6use std::sync::Arc;
7
8use itertools::Itertools;
9use vortex_buffer::Buffer;
10use vortex_buffer::BufferMut;
11use vortex_buffer::ByteBuffer;
12use vortex_buffer::ByteBufferMut;
13use vortex_error::VortexExpect;
14use vortex_error::VortexResult;
15use vortex_error::vortex_bail;
16use vortex_error::vortex_ensure;
17use vortex_mask::Mask;
18use vortex_utils::aliases::hash_map::Entry;
19use vortex_utils::aliases::hash_map::HashMap;
20
21use crate::ArrayRef;
22use crate::IntoArray;
23use crate::LEGACY_SESSION;
24use crate::VortexSessionExecute;
25use crate::arrays::VarBinViewArray;
26use crate::arrays::varbinview::build_views::BinaryView;
27use crate::arrays::varbinview::compact::BufferUtilization;
28use crate::builders::ArrayBuilder;
29use crate::builders::LazyBitBufferBuilder;
30use crate::canonical::Canonical;
31use crate::canonical::ToCanonical;
32use crate::dtype::DType;
33use crate::scalar::Scalar;
34
/// The builder for building a [`VarBinViewArray`].
pub struct VarBinViewBuilder {
    /// Element type of the array being built; `new` asserts Utf8 or Binary.
    dtype: DType,
    /// The binary views (inlined or buffer-referencing) accumulated so far.
    views_builder: BufferMut<BinaryView>,
    /// Lazily materialized validity bitmap; one bit per appended view.
    nulls: LazyBitBufferBuilder,
    /// Frozen data buffers, optionally deduplicated by buffer identity.
    completed: CompletedBuffers,
    /// Mutable byte buffer currently receiving non-inlined values.
    in_progress: ByteBufferMut,
    /// Controls the size of each fresh `in_progress` allocation.
    growth_strategy: BufferGrowthStrategy,
    /// Utilization threshold below which extended-from buffers are sliced or
    /// rewritten; 0.0 disables compaction (see `BuffersWithOffsets::from_array`).
    compaction_threshold: f64,
}
45
46impl VarBinViewBuilder {
47    pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
48        Self::new(dtype, capacity, Default::default(), Default::default(), 0.0)
49    }
50
51    pub fn with_buffer_deduplication(dtype: DType, capacity: usize) -> Self {
52        Self::new(
53            dtype,
54            capacity,
55            CompletedBuffers::Deduplicated(Default::default()),
56            Default::default(),
57            0.0,
58        )
59    }
60
61    pub fn with_compaction(dtype: DType, capacity: usize, compaction_threshold: f64) -> Self {
62        Self::new(
63            dtype,
64            capacity,
65            Default::default(),
66            Default::default(),
67            compaction_threshold,
68        )
69    }
70
71    pub fn new(
72        dtype: DType,
73        capacity: usize,
74        completed: CompletedBuffers,
75        growth_strategy: BufferGrowthStrategy,
76        compaction_threshold: f64,
77    ) -> Self {
78        assert!(
79            matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
80            "VarBinViewBuilder DType must be Utf8 or Binary."
81        );
82        Self {
83            views_builder: BufferMut::<BinaryView>::with_capacity(capacity),
84            nulls: LazyBitBufferBuilder::new(capacity),
85            completed,
86            in_progress: ByteBufferMut::empty(),
87            dtype,
88            growth_strategy,
89            compaction_threshold,
90        }
91    }
92
93    fn append_value_view(&mut self, value: &[u8]) {
94        let length =
95            u32::try_from(value.len()).vortex_expect("cannot have a single string >2^32 in length");
96        if length <= 12 {
97            self.views_builder.push(BinaryView::make_view(value, 0, 0));
98            return;
99        }
100
101        let (buffer_idx, offset) = self.append_value_to_buffer(value);
102        let view = BinaryView::make_view(value, buffer_idx, offset);
103        self.views_builder.push(view);
104    }
105
106    /// Appends a value to the builder.
107    pub fn append_value<S: AsRef<[u8]>>(&mut self, value: S) {
108        self.append_value_view(value.as_ref());
109        self.nulls.append_non_null();
110    }
111
112    /// Appends `n` copies of `value` as non-null entries.
113    pub fn append_n_values<S: AsRef<[u8]>>(&mut self, value: S, n: usize) {
114        if n == 0 {
115            return;
116        }
117        let bytes = value.as_ref();
118        let view = if bytes.len() <= BinaryView::MAX_INLINED_SIZE {
119            BinaryView::make_view(bytes, 0, 0)
120        } else {
121            let (buffer_idx, offset) = self.append_value_to_buffer(bytes);
122            BinaryView::make_view(bytes, buffer_idx, offset)
123        };
124        self.views_builder.push_n(view, n);
125        self.nulls.append_n_non_nulls(n);
126    }
127
128    fn flush_in_progress(&mut self) {
129        if self.in_progress.is_empty() {
130            return;
131        }
132        let block = std::mem::take(&mut self.in_progress).freeze();
133
134        assert!(block.len() < u32::MAX as usize, "Block too large");
135
136        let initial_len = self.completed.len();
137        self.completed.push(block);
138        assert_eq!(
139            self.completed.len(),
140            initial_len + 1,
141            "Invalid state, just completed block already exists"
142        );
143    }
144
145    /// append a non inlined value to self.in_progress.
146    fn append_value_to_buffer(&mut self, value: &[u8]) -> (u32, u32) {
147        assert!(
148            value.len() > BinaryView::MAX_INLINED_SIZE,
149            "must inline small strings"
150        );
151        let required_cap = self.in_progress.len() + value.len();
152        if self.in_progress.capacity() < required_cap {
153            self.flush_in_progress();
154            let next_buffer_size = self.growth_strategy.next_size() as usize;
155            let to_reserve = next_buffer_size.max(value.len());
156            self.in_progress.reserve(to_reserve);
157        }
158
159        let buffer_idx = self.completed.len();
160        let offset = u32::try_from(self.in_progress.len()).vortex_expect("too many buffers");
161        self.in_progress.extend_from_slice(value);
162
163        (buffer_idx, offset)
164    }
165
166    pub fn completed_block_count(&self) -> u32 {
167        self.completed.len()
168    }
169
170    /// Returns true if a non-empty in-progress buffer is staged (and would
171    /// become a completed buffer on the next flush), false otherwise.
172    pub fn in_progress(&self) -> bool {
173        !self.in_progress.is_empty()
174    }
175
176    /// Pushes buffers and pre-adjusted views into the builder.
177    ///
178    /// The provided `buffers` contain sections of data from a `VarBinViewArray`, and the
179    /// `views` are `BinaryView`s that have already been adjusted to reference the correct buffer
180    /// indices and offsets for this builder. All views must point to valid sections within the
181    /// provided buffers, and the validity length must match the view length.
182    ///
183    /// # Warning
184    ///
185    /// This method does not check utilization of the given buffers. Callers must provide
186    /// buffers that are fully utilized by the given adjusted views.
187    ///
188    /// # Panics
189    ///
190    /// Panics if this builder deduplicates buffers and any of the given buffers already
191    /// exist in this builder.
192    pub fn push_buffer_and_adjusted_views(
193        &mut self,
194        buffers: &[ByteBuffer],
195        views: &Buffer<BinaryView>,
196        validity_mask: Mask,
197    ) {
198        self.flush_in_progress();
199
200        let expected_completed_len = self.completed.len() as usize + buffers.len();
201        self.completed.extend_from_slice_unchecked(buffers);
202        assert_eq!(
203            self.completed.len() as usize,
204            expected_completed_len,
205            "Some buffers already exist",
206        );
207        self.views_builder.extend_trusted(views.iter().copied());
208        self.push_only_validity_mask(validity_mask);
209
210        debug_assert_eq!(self.nulls.len(), self.views_builder.len())
211    }
212
213    /// Finishes the builder directly into a [`VarBinViewArray`].
214    pub fn finish_into_varbinview(&mut self) -> VarBinViewArray {
215        self.flush_in_progress();
216        let buffers = std::mem::take(&mut self.completed);
217
218        assert_eq!(
219            self.views_builder.len(),
220            self.nulls.len(),
221            "View and validity length must match"
222        );
223
224        let validity = self.nulls.finish_with_nullability(self.dtype.nullability());
225
226        // SAFETY: the builder methods check safety at each step.
227        unsafe {
228            VarBinViewArray::new_unchecked(
229                std::mem::take(&mut self.views_builder).freeze(),
230                buffers.finish(),
231                self.dtype.clone(),
232                validity,
233            )
234        }
235    }
236
237    // Pushes a validity mask into the builder not affecting the views or buffers
238    fn push_only_validity_mask(&mut self, validity_mask: Mask) {
239        self.nulls.append_validity_mask(validity_mask);
240    }
241}
242
impl ArrayBuilder for VarBinViewBuilder {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }

    fn dtype(&self) -> &DType {
        &self.dtype
    }

    fn len(&self) -> usize {
        // Validity has one bit per view, so its length is the builder length.
        self.nulls.len()
    }

    // Appends `n` valid, zero-length entries as inline empty views.
    fn append_zeros(&mut self, n: usize) {
        self.views_builder.push_n(BinaryView::empty_view(), n);
        self.nulls.append_n_non_nulls(n);
    }

    // Appends `n` nulls; each null is represented by an empty view plus a
    // cleared validity bit.
    unsafe fn append_nulls_unchecked(&mut self, n: usize) {
        self.views_builder.push_n(BinaryView::empty_view(), n);
        self.nulls.append_n_nulls(n);
    }

    // Appends a single Utf8/Binary scalar, mapping a missing value to a null.
    // Errors if the scalar's dtype does not match the builder's dtype.
    fn append_scalar(&mut self, scalar: &Scalar) -> VortexResult<()> {
        vortex_ensure!(
            scalar.dtype() == self.dtype(),
            "VarBinViewBuilder expected scalar with dtype {}, got {}",
            self.dtype(),
            scalar.dtype()
        );

        match self.dtype() {
            DType::Utf8(_) => match scalar.as_utf8().value() {
                Some(value) => self.append_value(value),
                None => self.append_null(),
            },
            DType::Binary(_) => match scalar.as_binary().value() {
                Some(value) => self.append_value(value),
                None => self.append_null(),
            },
            // Unreachable in practice: `new` asserts Utf8/Binary.
            _ => vortex_bail!(
                "VarBinViewBuilder can only handle Utf8 or Binary scalars, got {:?}",
                scalar.dtype()
            ),
        }

        Ok(())
    }

    // Appends an entire array, reusing (or compacting) its data buffers and
    // remapping its views to this builder's buffer indices/offsets.
    unsafe fn extend_from_array_unchecked(&mut self, array: &ArrayRef) {
        let array = array.to_varbinview();
        // Freeze staged bytes first so incoming buffers are appended after all
        // currently completed buffers.
        self.flush_in_progress();

        self.push_only_validity_mask(
            array
                .as_ref()
                .validity()
                .vortex_expect("validity_mask")
                .to_mask(
                    array.as_ref().len(),
                    &mut LEGACY_SESSION.create_execution_ctx(),
                )
                .vortex_expect("Failed to compute validity mask"),
        );

        // Decide per incoming buffer whether to keep, slice, or rewrite it;
        // the returned adjustment maps old views onto the new buffer layout.
        let view_adjustment =
            self.completed
                .extend_from_compaction(BuffersWithOffsets::from_array(
                    &array,
                    self.compaction_threshold,
                ));

        match view_adjustment {
            // All buffers kept (possibly sliced): every view can be remapped
            // with a precomputed shift/lookup, no byte copying needed.
            ViewAdjustment::Precomputed(adjustment) => self.views_builder.extend_trusted(
                array
                    .views()
                    .iter()
                    .map(|view| adjustment.adjust_view(view)),
            ),
            // Some buffers dropped for rewrite: values in those buffers must
            // be copied into this builder's own data buffer, view by view.
            ViewAdjustment::Rewriting(adjustment) => {
                // NOTE(review): this recomputes the validity mask that was
                // already computed above for push_only_validity_mask —
                // consider computing it once and reusing it.
                match array
                    .as_ref()
                    .validity()
                    .vortex_expect("validity_mask")
                    .to_mask(
                        array.as_ref().len(),
                        &mut LEGACY_SESSION.create_execution_ctx(),
                    )
                    .vortex_expect("Failed to compute validity mask")
                {
                    Mask::AllTrue(_) => {
                        for (idx, &view) in array.views().iter().enumerate() {
                            let new_view = self.push_view(view, &adjustment, &array, idx);
                            self.views_builder.push(new_view);
                        }
                    }
                    // Entirely null: no bytes need copying at all.
                    Mask::AllFalse(_) => {
                        self.views_builder
                            .push_n(BinaryView::empty_view(), array.len());
                    }
                    // Mixed validity: only copy bytes for valid positions.
                    Mask::Values(v) => {
                        for (idx, (&view, is_valid)) in
                            array.views().iter().zip(v.bit_buffer().iter()).enumerate()
                        {
                            let new_view = if !is_valid {
                                BinaryView::empty_view()
                            } else {
                                self.push_view(view, &adjustment, &array, idx)
                            };
                            self.views_builder.push(new_view);
                        }
                    }
                }
            }
        }
    }

    fn reserve_exact(&mut self, additional: usize) {
        self.views_builder.reserve(additional);
        self.nulls.reserve_exact(additional);
    }

    // Replaces the entire validity with `validity`; caller must ensure its
    // length matches the number of views.
    unsafe fn set_validity_unchecked(&mut self, validity: Mask) {
        self.nulls = LazyBitBufferBuilder::new(validity.len());
        self.nulls.append_validity_mask(validity);
    }

    fn finish(&mut self) -> ArrayRef {
        self.finish_into_varbinview().into_array()
    }

    fn finish_into_canonical(&mut self) -> Canonical {
        Canonical::VarBinView(self.finish_into_varbinview())
    }
}
382
383impl VarBinViewBuilder {
384    fn push_view(
385        &mut self,
386        view: BinaryView,
387        adjustment: &RewritingViewAdjustment,
388        array: &VarBinViewArray,
389        idx: usize,
390    ) -> BinaryView {
391        if view.is_inlined() {
392            view
393        } else if let Some(adjusted) = adjustment.adjust_view(&view) {
394            adjusted
395        } else {
396            let bytes = array.bytes_at(idx);
397            let (new_buf_idx, new_offset) = self.append_value_to_buffer(&bytes);
398            BinaryView::make_view(bytes.as_slice(), new_buf_idx, new_offset)
399        }
400    }
401}
402
/// Storage for frozen data buffers, with or without deduplication.
pub enum CompletedBuffers {
    /// Plain storage: every pushed block is appended as-is.
    Default(Vec<ByteBuffer>),
    /// Deduplicated storage: identical blocks (by buffer identity) are stored once.
    Deduplicated(DeduplicatedBuffers),
}
407
408impl Default for CompletedBuffers {
409    fn default() -> Self {
410        Self::Default(Vec::new())
411    }
412}
413
414// Self::push enforces len < u32::max
415#[expect(clippy::cast_possible_truncation)]
416impl CompletedBuffers {
417    fn len(&self) -> u32 {
418        match self {
419            Self::Default(buffers) => buffers.len() as u32,
420            Self::Deduplicated(buffers) => buffers.len(),
421        }
422    }
423
424    fn push(&mut self, block: ByteBuffer) -> u32 {
425        match self {
426            Self::Default(buffers) => {
427                assert!(buffers.len() < u32::MAX as usize, "Too many blocks");
428                buffers.push(block);
429                self.len()
430            }
431            Self::Deduplicated(buffers) => buffers.push(block),
432        }
433    }
434
435    /// Does not compact buffers, bypasses utilization checks.
436    fn extend_from_slice_unchecked(&mut self, buffers: &[ByteBuffer]) {
437        for buffer in buffers {
438            self.push(buffer.clone());
439        }
440    }
441
442    fn extend_from_compaction(&mut self, buffers: BuffersWithOffsets) -> ViewAdjustment {
443        match (self, buffers) {
444            (
445                Self::Default(completed_buffers),
446                BuffersWithOffsets::AllKept { buffers, offsets },
447            ) => {
448                let buffer_offset = completed_buffers.len() as u32;
449                completed_buffers.extend_from_slice(&buffers);
450                ViewAdjustment::shift(buffer_offset, offsets)
451            }
452            (
453                Self::Default(completed_buffers),
454                BuffersWithOffsets::SomeCompacted { buffers, offsets },
455            ) => {
456                let lookup = buffers
457                    .iter()
458                    .map(|maybe_buffer| {
459                        maybe_buffer.as_ref().map(|buffer| {
460                            completed_buffers.push(buffer.clone());
461                            completed_buffers.len() as u32 - 1
462                        })
463                    })
464                    .collect();
465                ViewAdjustment::rewriting(lookup, offsets)
466            }
467
468            (
469                Self::Deduplicated(completed_buffers),
470                BuffersWithOffsets::AllKept { buffers, offsets },
471            ) => {
472                let buffer_lookup = completed_buffers.extend_from_iter(buffers.iter().cloned());
473                ViewAdjustment::lookup(buffer_lookup, offsets)
474            }
475            (
476                Self::Deduplicated(completed_buffers),
477                BuffersWithOffsets::SomeCompacted { buffers, offsets },
478            ) => {
479                let buffer_lookup = completed_buffers.extend_from_option_slice(&buffers);
480                ViewAdjustment::rewriting(buffer_lookup, offsets)
481            }
482        }
483    }
484
485    fn finish(self) -> Arc<[ByteBuffer]> {
486        match self {
487            Self::Default(buffers) => Arc::from(buffers),
488            Self::Deduplicated(buffers) => buffers.finish(),
489        }
490    }
491}
492
/// Buffer storage that stores each distinct buffer once, keyed by buffer
/// identity (pointer + length), with an index for O(1) duplicate detection.
#[derive(Default)]
pub struct DeduplicatedBuffers {
    // The distinct buffers, in first-seen order.
    buffers: Vec<ByteBuffer>,
    // Maps a buffer's identity to its index in `buffers`.
    buffer_to_idx: HashMap<BufferId, u32>,
}
498
499impl DeduplicatedBuffers {
500    // Self::push enforces len < u32::max
501    #[expect(clippy::cast_possible_truncation)]
502    fn len(&self) -> u32 {
503        self.buffers.len() as u32
504    }
505
506    /// Push a new block if not seen before. Returns the idx of the block.
507    pub(crate) fn push(&mut self, block: ByteBuffer) -> u32 {
508        assert!(self.buffers.len() < u32::MAX as usize, "Too many blocks");
509
510        let initial_len = self.len();
511        let id = BufferId::from(&block);
512        match self.buffer_to_idx.entry(id) {
513            Entry::Occupied(idx) => *idx.get(),
514            Entry::Vacant(entry) => {
515                let idx = initial_len;
516                entry.insert(idx);
517                self.buffers.push(block);
518                idx
519            }
520        }
521    }
522
523    pub(crate) fn extend_from_option_slice(
524        &mut self,
525        buffers: &[Option<ByteBuffer>],
526    ) -> Vec<Option<u32>> {
527        buffers
528            .iter()
529            .map(|buffer| buffer.as_ref().map(|buf| self.push(buf.clone())))
530            .collect()
531    }
532
533    pub(crate) fn extend_from_iter(
534        &mut self,
535        buffers: impl Iterator<Item = ByteBuffer>,
536    ) -> Vec<u32> {
537        buffers.map(|buffer| self.push(buffer)).collect()
538    }
539
540    pub(crate) fn finish(self) -> Arc<[ByteBuffer]> {
541        Arc::from(self.buffers)
542    }
543}
544
/// Identity key for a buffer: its backing slice's address and length.
///
/// Note this deduplicates by identity, not by content — two buffers holding
/// equal bytes at different addresses get distinct ids. Safe as a key here
/// because `DeduplicatedBuffers` retains every pushed buffer, keeping its
/// address alive and stable.
#[derive(PartialEq, Eq, Hash)]
struct BufferId {
    // *const u8 stored as usize for `Send`
    ptr: usize,
    len: usize,
}
551
552impl BufferId {
553    fn from(buffer: &ByteBuffer) -> Self {
554        let slice = buffer.as_slice();
555        Self {
556            ptr: slice.as_ptr() as usize,
557            len: slice.len(),
558        }
559    }
560}
561
/// Sizing policy for new in-progress buffer allocations.
#[derive(Debug, Clone)]
pub enum BufferGrowthStrategy {
    /// Use a fixed buffer size for all allocations.
    Fixed { size: u32 },
    /// Use exponential growth starting from initial_size, doubling until max_size.
    Exponential { current_size: u32, max_size: u32 },
}
569
570impl Default for BufferGrowthStrategy {
571    fn default() -> Self {
572        Self::Exponential {
573            current_size: 4 * 1024,    // 4KB starting size
574            max_size: 2 * 1024 * 1024, // 2MB max size
575        }
576    }
577}
578
579impl BufferGrowthStrategy {
580    pub fn fixed(size: u32) -> Self {
581        Self::Fixed { size }
582    }
583
584    pub fn exponential(initial_size: u32, max_size: u32) -> Self {
585        Self::Exponential {
586            current_size: initial_size,
587            max_size,
588        }
589    }
590
591    /// Returns the next buffer size to allocate and updates internal state.
592    pub fn next_size(&mut self) -> u32 {
593        match self {
594            Self::Fixed { size } => *size,
595            Self::Exponential {
596                current_size,
597                max_size,
598            } => {
599                let result = *current_size;
600                if *current_size < *max_size {
601                    *current_size = current_size.saturating_mul(2).min(*max_size);
602                }
603                result
604            }
605        }
606    }
607}
608
/// The outcome of compaction analysis on a source array's buffers.
///
/// `offsets`, when present, gives the per-buffer byte shift introduced by
/// slicing (so view offsets must be reduced by it); `None` means no buffer
/// was sliced.
enum BuffersWithOffsets {
    /// Every buffer is kept (possibly sliced to its utilized range).
    AllKept {
        buffers: Arc<[ByteBuffer]>,
        offsets: Option<Vec<u32>>,
    },
    /// Some buffers were dropped (`None`) and must be rewritten value-by-value.
    SomeCompacted {
        buffers: Vec<Option<ByteBuffer>>,
        offsets: Option<Vec<u32>>,
    },
}
619
impl BuffersWithOffsets {
    /// Analyzes `array`'s data buffers against `compaction_threshold` and
    /// classifies them: kept whole, sliced to the utilized range, or dropped
    /// for rewriting.
    pub fn from_array(array: &VarBinViewArray, compaction_threshold: f64) -> Self {
        // Threshold 0.0 disables compaction entirely: keep all buffers as-is.
        if compaction_threshold == 0.0 {
            return Self::AllKept {
                buffers: Arc::from(
                    array
                        .data_buffers()
                        .to_vec()
                        .into_iter()
                        .map(|b| b.unwrap_host())
                        .collect_vec(),
                ),
                offsets: None,
            };
        }

        let buffer_utilizations = array
            .buffer_utilizations()
            .vortex_expect("buffer_utilizations in BuffersWithOffsets::from_array");
        // First pass: determine which result variant/offset shape we need.
        let mut has_rewrite = false;
        let mut has_nonzero_offset = false;
        for utilization in buffer_utilizations.iter() {
            match compaction_strategy(utilization, compaction_threshold) {
                CompactionStrategy::KeepFull => continue,
                CompactionStrategy::Slice { .. } => has_nonzero_offset = true,
                CompactionStrategy::Rewrite => has_rewrite = true,
            }
        }

        // Second pass (lazy): pair each buffer with its kept/sliced/dropped
        // form and the byte offset introduced by slicing (0 when unsliced).
        let buffers_with_offsets_iter = buffer_utilizations
            .iter()
            .zip(array.data_buffers().iter())
            .map(|(utilization, buffer)| {
                match compaction_strategy(utilization, compaction_threshold) {
                    CompactionStrategy::KeepFull => (Some(buffer.as_host().clone()), 0),
                    CompactionStrategy::Slice { start, end } => (
                        Some(buffer.as_host().slice(start as usize..end as usize)),
                        start,
                    ),
                    CompactionStrategy::Rewrite => (None, 0),
                }
            });

        match (has_rewrite, has_nonzero_offset) {
            // keep all buffers
            (false, false) => {
                let buffers: Vec<_> = buffers_with_offsets_iter
                    .map(|(b, _)| b.vortex_expect("already checked for rewrite"))
                    .collect();
                Self::AllKept {
                    buffers: Arc::from(buffers),
                    offsets: None,
                }
            }
            // rewrite, all zero offsets
            (true, false) => {
                let buffers: Vec<_> = buffers_with_offsets_iter.map(|(b, _)| b).collect();
                Self::SomeCompacted {
                    buffers,
                    offsets: None,
                }
            }
            // keep all buffers, but some have offsets
            (false, true) => {
                let (buffers, offsets): (Vec<_>, _) = buffers_with_offsets_iter
                    .map(|(buffer, offset)| {
                        (buffer.vortex_expect("already checked for rewrite"), offset)
                    })
                    .collect();
                Self::AllKept {
                    buffers: Arc::from(buffers),
                    offsets: Some(offsets),
                }
            }
            // rewrite and some have offsets
            (true, true) => {
                let (buffers, offsets) = buffers_with_offsets_iter.collect();
                Self::SomeCompacted {
                    buffers,
                    offsets: Some(offsets),
                }
            }
        }
    }
}
705
/// Per-buffer decision made by `compaction_strategy`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CompactionStrategy {
    /// Keep the buffer unchanged.
    KeepFull,
    /// Slice the buffer to [start, end) range
    Slice {
        start: u32,
        end: u32,
    },
    /// Rewrite data into new compacted buffer
    Rewrite,
}
717
718fn compaction_strategy(
719    buffer_utilization: &BufferUtilization,
720    threshold: f64,
721) -> CompactionStrategy {
722    match buffer_utilization.overall_utilization() {
723        // rewrite empty or not used buffers TODO(os): maybe keep them
724        0.0 => CompactionStrategy::Rewrite,
725        utilised if utilised >= threshold => CompactionStrategy::KeepFull,
726        _ if buffer_utilization.range_utilization() >= threshold => {
727            let Range { start, end } = buffer_utilization.range();
728            CompactionStrategy::Slice { start, end }
729        }
730        _ => CompactionStrategy::Rewrite,
731    }
732}
733
/// How source views must be remapped after their buffers were merged in.
enum ViewAdjustment {
    /// Every view can be remapped arithmetically (shift or lookup).
    Precomputed(PrecomputedViewAdjustment),
    /// Some views point into dropped buffers and need their bytes rewritten.
    Rewriting(RewritingViewAdjustment),
}
738
739impl ViewAdjustment {
740    fn shift(buffer_offset: u32, offsets: Option<Vec<u32>>) -> Self {
741        Self::Precomputed(PrecomputedViewAdjustment::Shift {
742            buffer_offset,
743            offsets,
744        })
745    }
746
747    fn lookup(buffer_lookup: Vec<u32>, offsets: Option<Vec<u32>>) -> Self {
748        Self::Precomputed(PrecomputedViewAdjustment::Lookup {
749            buffer_lookup,
750            offsets,
751        })
752    }
753
754    fn rewriting(buffer_lookup: Vec<Option<u32>>, offsets: Option<Vec<u32>>) -> Self {
755        Self::Rewriting(RewritingViewAdjustment {
756            buffer_lookup,
757            offsets,
758        })
759    }
760}
761
// Care when adding new variants or fields in this enum, it will mess with inlining if it gets too big
enum PrecomputedViewAdjustment {
    /// Add `buffer_offset` to every view's buffer index.
    Shift {
        buffer_offset: u32,
        offsets: Option<Vec<u32>>,
    },
    /// Replace each view's buffer index via `buffer_lookup`.
    Lookup {
        buffer_lookup: Vec<u32>,
        offsets: Option<Vec<u32>>,
    },
}
773
impl PrecomputedViewAdjustment {
    /// Remaps `view` to this builder's buffer layout; inlined views pass
    /// through unchanged.
    fn adjust_view(&self, view: &BinaryView) -> BinaryView {
        if view.is_inlined() {
            return *view;
        }
        let view_ref = view.as_view();
        match self {
            Self::Shift {
                buffer_offset,
                offsets,
            } => {
                let b_idx = view_ref.buffer_index;
                // Slicing shifted the buffer's start; reduce view offsets by
                // the same amount (0 when the buffer wasn't sliced).
                let offset_shift = offsets
                    .as_ref()
                    .map(|o| o[b_idx as usize])
                    .unwrap_or_default();

                // If offset < offset_shift, this view was invalid and wasn't counted in buffer_utilizations.
                // Return an empty view to match how invalid views are handled in the Rewriting path.
                if view_ref.offset < offset_shift {
                    return BinaryView::empty_view();
                }

                view_ref
                    .with_buffer_and_offset(b_idx + buffer_offset, view_ref.offset - offset_shift)
            }
            Self::Lookup {
                buffer_lookup,
                offsets,
            } => {
                let b_idx = view_ref.buffer_index;
                let buffer = buffer_lookup[b_idx as usize];
                let offset_shift = offsets
                    .as_ref()
                    .map(|o| o[b_idx as usize])
                    .unwrap_or_default();

                // If offset < offset_shift, this view was invalid and wasn't counted in buffer_utilizations.
                // Return an empty view to match how invalid views are handled in the Rewriting path.
                if view_ref.offset < offset_shift {
                    return BinaryView::empty_view();
                }

                view_ref.with_buffer_and_offset(buffer, view_ref.offset - offset_shift)
            }
        }
        .into()
    }
}
823
/// View remapping where some source buffers were dropped: `None` entries in
/// `buffer_lookup` mark buffers whose values must be rewritten by the caller.
struct RewritingViewAdjustment {
    buffer_lookup: Vec<Option<u32>>,
    offsets: Option<Vec<u32>>,
}
828
829impl RewritingViewAdjustment {
830    /// Can return None if this view can't be adjusted, because there is no precomputed lookup
831    /// for the current buffer.
832    fn adjust_view(&self, view: &BinaryView) -> Option<BinaryView> {
833        if view.is_inlined() {
834            return Some(*view);
835        }
836
837        let view_ref = view.as_view();
838        self.buffer_lookup[view_ref.buffer_index as usize].map(|buffer| {
839            let offset_shift = self
840                .offsets
841                .as_ref()
842                .map(|o| o[view_ref.buffer_index as usize])
843                .unwrap_or_default();
844            view_ref
845                .with_buffer_and_offset(buffer, view_ref.offset - offset_shift)
846                .into()
847        })
848    }
849}
850
851#[cfg(test)]
852mod tests {
853    use vortex_error::VortexResult;
854
855    use crate::IntoArray;
856    use crate::LEGACY_SESSION;
857    use crate::VortexSessionExecute;
858    use crate::assert_arrays_eq;
859    use crate::builders::ArrayBuilder;
860    use crate::builders::VarBinViewBuilder;
861    use crate::builders::varbinview::VarBinViewArray;
862    use crate::dtype::DType;
863    use crate::dtype::Nullability;
864
    /// Builds a nullable Utf8 array via individual appends (values, nulls,
    /// zero-length entries) and checks the finished contents.
    #[test]
    fn test_utf8_builder() {
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_value("Hello");
        builder.append_null();
        builder.append_value("World");

        builder.append_nulls(2);

        // append_zeros produces valid, empty-string entries.
        builder.append_zeros(2);
        builder.append_value("test");

        let actual = builder.finish();
        let expected = <VarBinViewArray as FromIterator<_>>::from_iter([
            Some("Hello"),
            None,
            Some("World"),
            None,
            None,
            Some(""),
            Some(""),
            Some("test"),
        ]);
        assert_arrays_eq!(actual, expected);
    }
891
    /// Extends one builder from a finished array and verifies values and nulls
    /// are preserved alongside directly appended entries.
    #[test]
    fn test_utf8_builder_with_extend() {
        // Source array: [null, "Hello2"].
        let array = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_null();
            builder.append_value("Hello2");
            builder.finish()
        };
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_value("Hello1");
        builder.extend_from_array(&array);
        builder.append_nulls(2);
        builder.append_value("Hello3");

        let actual = builder.finish_into_canonical();
        let expected = <VarBinViewArray as FromIterator<_>>::from_iter([
            Some("Hello1"),
            None,
            Some("Hello2"),
            None,
            None,
            Some("Hello3"),
        ]);
        assert_arrays_eq!(actual.into_array(), expected.into_array());
    }
919
920    #[test]
921    fn test_buffer_deduplication() -> VortexResult<()> {
922        let array = {
923            let mut builder =
924                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
925            builder.append_value("This is a long string that should not be inlined");
926            builder.append_value("short string");
927            builder.finish_into_varbinview()
928        };
929
930        assert_eq!(array.data_buffers().len(), 1);
931        let mut builder =
932            VarBinViewBuilder::with_buffer_deduplication(DType::Utf8(Nullability::Nullable), 10);
933
934        let mut ctx = LEGACY_SESSION.create_execution_ctx();
935
936        array.append_to_builder(&mut builder, &mut ctx)?;
937        assert_eq!(builder.completed_block_count(), 1);
938
939        array
940            .slice(1..2)?
941            .append_to_builder(&mut builder, &mut ctx)?;
942        array
943            .slice(0..1)?
944            .append_to_builder(&mut builder, &mut ctx)?;
945        assert_eq!(builder.completed_block_count(), 1);
946
947        let array2 = {
948            let mut builder =
949                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
950            builder.append_value("This is a long string that should not be inlined");
951            builder.finish_into_varbinview()
952        };
953
954        array2.append_to_builder(&mut builder, &mut ctx)?;
955        assert_eq!(builder.completed_block_count(), 2);
956
957        array
958            .slice(0..1)?
959            .append_to_builder(&mut builder, &mut ctx)?;
960        array2
961            .slice(0..1)?
962            .append_to_builder(&mut builder, &mut ctx)?;
963        assert_eq!(builder.completed_block_count(), 2);
964        Ok(())
965    }
966
967    #[test]
968    fn test_append_scalar() {
969        use crate::scalar::Scalar;
970
971        // Test with Utf8 builder.
972        let mut utf8_builder =
973            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
974
975        // Test appending a valid utf8 value.
976        let utf8_scalar1 = Scalar::utf8("hello", Nullability::Nullable);
977        utf8_builder.append_scalar(&utf8_scalar1).unwrap();
978
979        // Test appending another value.
980        let utf8_scalar2 = Scalar::utf8("world", Nullability::Nullable);
981        utf8_builder.append_scalar(&utf8_scalar2).unwrap();
982
983        // Test appending null value.
984        let null_scalar = Scalar::null(DType::Utf8(Nullability::Nullable));
985        utf8_builder.append_scalar(&null_scalar).unwrap();
986
987        let array = utf8_builder.finish();
988        let expected =
989            <VarBinViewArray as FromIterator<_>>::from_iter([Some("hello"), Some("world"), None]);
990        assert_arrays_eq!(&array, &expected);
991
992        // Test with Binary builder.
993        let mut binary_builder =
994            VarBinViewBuilder::with_capacity(DType::Binary(Nullability::Nullable), 10);
995
996        let binary_scalar = Scalar::binary(vec![1u8, 2, 3], Nullability::Nullable);
997        binary_builder.append_scalar(&binary_scalar).unwrap();
998
999        let binary_null = Scalar::null(DType::Binary(Nullability::Nullable));
1000        binary_builder.append_scalar(&binary_null).unwrap();
1001
1002        let binary_array = binary_builder.finish();
1003        let expected =
1004            <VarBinViewArray as FromIterator<_>>::from_iter([Some(vec![1u8, 2, 3]), None]);
1005        assert_arrays_eq!(&binary_array, &expected);
1006
1007        // Test wrong dtype error.
1008        let mut builder =
1009            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::NonNullable), 10);
1010        let wrong_scalar = Scalar::from(42i32);
1011        assert!(builder.append_scalar(&wrong_scalar).is_err());
1012    }
1013
1014    #[test]
1015    fn test_buffer_growth_strategies() {
1016        use super::BufferGrowthStrategy;
1017
1018        // Test Fixed strategy
1019        let mut strategy = BufferGrowthStrategy::fixed(1024);
1020
1021        // Should always return the fixed size
1022        assert_eq!(strategy.next_size(), 1024);
1023        assert_eq!(strategy.next_size(), 1024);
1024        assert_eq!(strategy.next_size(), 1024);
1025
1026        // Test Exponential strategy
1027        let mut strategy = BufferGrowthStrategy::exponential(1024, 8192);
1028
1029        // Should double each time until hitting max_size
1030        assert_eq!(strategy.next_size(), 1024); // First: 1024
1031        assert_eq!(strategy.next_size(), 2048); // Second: 2048
1032        assert_eq!(strategy.next_size(), 4096); // Third: 4096
1033        assert_eq!(strategy.next_size(), 8192); // Fourth: 8192 (max)
1034        assert_eq!(strategy.next_size(), 8192); // Fifth: 8192 (capped)
1035    }
1036
1037    #[test]
1038    fn test_large_value_allocation() {
1039        use super::BufferGrowthStrategy;
1040        use super::VarBinViewBuilder;
1041
1042        let mut builder = VarBinViewBuilder::new(
1043            DType::Binary(Nullability::Nullable),
1044            10,
1045            Default::default(),
1046            BufferGrowthStrategy::exponential(1024, 4096),
1047            0.0,
1048        );
1049
1050        // Create a value larger than max_size
1051        let large_value = vec![0u8; 8192];
1052
1053        // Should successfully append the large value
1054        builder.append_value(&large_value);
1055
1056        let array = builder.finish_into_varbinview();
1057        assert_eq!(array.len(), 1);
1058
1059        // Verify the value was stored correctly
1060        let retrieved = array
1061            .execute_scalar(0, &mut LEGACY_SESSION.create_execution_ctx())
1062            .unwrap()
1063            .as_binary()
1064            .value()
1065            .cloned()
1066            .unwrap();
1067        assert_eq!(retrieved.len(), 8192);
1068        assert_eq!(retrieved.as_slice(), &large_value);
1069    }
1070}