// vortex_array/builders/varbinview.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::Any;
5use std::ops::Range;
6use std::sync::Arc;
7
8use itertools::Itertools;
9use vortex_buffer::Buffer;
10use vortex_buffer::BufferMut;
11use vortex_buffer::ByteBuffer;
12use vortex_buffer::ByteBufferMut;
13use vortex_dtype::DType;
14use vortex_error::VortexExpect;
15use vortex_error::VortexResult;
16use vortex_error::vortex_bail;
17use vortex_error::vortex_ensure;
18use vortex_mask::Mask;
19use vortex_utils::aliases::hash_map::Entry;
20use vortex_utils::aliases::hash_map::HashMap;
21
22use crate::Array;
23use crate::ArrayRef;
24use crate::IntoArray;
25use crate::arrays::BinaryView;
26use crate::arrays::VarBinViewArray;
27use crate::arrays::compact::BufferUtilization;
28use crate::builders::ArrayBuilder;
29use crate::builders::LazyBitBufferBuilder;
30use crate::canonical::Canonical;
31use crate::canonical::ToCanonical;
32use crate::scalar::Scalar;
33
34/// The builder for building a [`VarBinViewArray`].
/// The builder for building a [`VarBinViewArray`].
pub struct VarBinViewBuilder {
    // Element type; asserted to be Utf8 or Binary in `Self::new`.
    dtype: DType,
    // One `BinaryView` per appended element (inlined or buffer-referencing).
    views_builder: BufferMut<BinaryView>,
    // Validity bits, kept in lockstep with `views_builder`.
    nulls: LazyBitBufferBuilder,
    // Finished data blocks that non-inlined views reference by index.
    completed: CompletedBuffers,
    // Data block currently being filled; moved into `completed` on flush.
    in_progress: ByteBufferMut,
    // Controls the size of each new `in_progress` allocation.
    growth_strategy: BufferGrowthStrategy,
    // Utilization threshold below which extended-in buffers are sliced or
    // rewritten; 0.0 disables compaction.
    compaction_threshold: f64,
}
44
45impl VarBinViewBuilder {
46    pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
47        Self::new(dtype, capacity, Default::default(), Default::default(), 0.0)
48    }
49
50    pub fn with_buffer_deduplication(dtype: DType, capacity: usize) -> Self {
51        Self::new(
52            dtype,
53            capacity,
54            CompletedBuffers::Deduplicated(Default::default()),
55            Default::default(),
56            0.0,
57        )
58    }
59
60    pub fn with_compaction(dtype: DType, capacity: usize, compaction_threshold: f64) -> Self {
61        Self::new(
62            dtype,
63            capacity,
64            Default::default(),
65            Default::default(),
66            compaction_threshold,
67        )
68    }
69
70    pub fn new(
71        dtype: DType,
72        capacity: usize,
73        completed: CompletedBuffers,
74        growth_strategy: BufferGrowthStrategy,
75        compaction_threshold: f64,
76    ) -> Self {
77        assert!(
78            matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
79            "VarBinViewBuilder DType must be Utf8 or Binary."
80        );
81        Self {
82            views_builder: BufferMut::<BinaryView>::with_capacity(capacity),
83            nulls: LazyBitBufferBuilder::new(capacity),
84            completed,
85            in_progress: ByteBufferMut::empty(),
86            dtype,
87            growth_strategy,
88            compaction_threshold,
89        }
90    }
91
92    fn append_value_view(&mut self, value: &[u8]) {
93        let length =
94            u32::try_from(value.len()).vortex_expect("cannot have a single string >2^32 in length");
95        if length <= 12 {
96            self.views_builder.push(BinaryView::make_view(value, 0, 0));
97            return;
98        }
99
100        let (buffer_idx, offset) = self.append_value_to_buffer(value);
101        let view = BinaryView::make_view(value, buffer_idx, offset);
102        self.views_builder.push(view);
103    }
104
105    /// Appends a value to the builder.
106    pub fn append_value<S: AsRef<[u8]>>(&mut self, value: S) {
107        self.append_value_view(value.as_ref());
108        self.nulls.append_non_null();
109    }
110
111    fn flush_in_progress(&mut self) {
112        if self.in_progress.is_empty() {
113            return;
114        }
115        let block = std::mem::take(&mut self.in_progress).freeze();
116
117        assert!(block.len() < u32::MAX as usize, "Block too large");
118
119        let initial_len = self.completed.len();
120        self.completed.push(block);
121        assert_eq!(
122            self.completed.len(),
123            initial_len + 1,
124            "Invalid state, just completed block already exists"
125        );
126    }
127
128    /// append a non inlined value to self.in_progress.
129    fn append_value_to_buffer(&mut self, value: &[u8]) -> (u32, u32) {
130        assert!(value.len() > 12, "must inline small strings");
131        let required_cap = self.in_progress.len() + value.len();
132        if self.in_progress.capacity() < required_cap {
133            self.flush_in_progress();
134            let next_buffer_size = self.growth_strategy.next_size() as usize;
135            let to_reserve = next_buffer_size.max(value.len());
136            self.in_progress.reserve(to_reserve);
137        }
138
139        let buffer_idx = self.completed.len();
140        let offset = u32::try_from(self.in_progress.len()).vortex_expect("too many buffers");
141        self.in_progress.extend_from_slice(value);
142
143        (buffer_idx, offset)
144    }
145
146    pub fn completed_block_count(&self) -> u32 {
147        self.completed.len()
148    }
149
150    /// Pushes buffers and pre-adjusted views into the builder.
151    ///
152    /// The provided `buffers` contain sections of data from a `VarBinViewArray`, and the
153    /// `views` are `BinaryView`s that have already been adjusted to reference the correct buffer
154    /// indices and offsets for this builder. All views must point to valid sections within the
155    /// provided buffers, and the validity length must match the view length.
156    ///
157    /// # Warning
158    ///
159    /// This method does not check utilization of the given buffers. Callers must provide
160    /// buffers that are fully utilized by the given adjusted views.
161    ///
162    /// # Panics
163    ///
164    /// Panics if this builder deduplicates buffers and any of the given buffers already
165    /// exist in this builder.
166    pub fn push_buffer_and_adjusted_views(
167        &mut self,
168        buffers: &[ByteBuffer],
169        views: &Buffer<BinaryView>,
170        validity_mask: Mask,
171    ) {
172        self.flush_in_progress();
173
174        let expected_completed_len = self.completed.len() as usize + buffers.len();
175        self.completed.extend_from_slice_unchecked(buffers);
176        assert_eq!(
177            self.completed.len() as usize,
178            expected_completed_len,
179            "Some buffers already exist",
180        );
181        self.views_builder.extend_trusted(views.iter().copied());
182        self.push_only_validity_mask(validity_mask);
183
184        debug_assert_eq!(self.nulls.len(), self.views_builder.len())
185    }
186
187    /// Finishes the builder directly into a [`VarBinViewArray`].
188    pub fn finish_into_varbinview(&mut self) -> VarBinViewArray {
189        self.flush_in_progress();
190        let buffers = std::mem::take(&mut self.completed);
191
192        assert_eq!(
193            self.views_builder.len(),
194            self.nulls.len(),
195            "View and validity length must match"
196        );
197
198        let validity = self.nulls.finish_with_nullability(self.dtype.nullability());
199
200        // SAFETY: the builder methods check safety at each step.
201        unsafe {
202            VarBinViewArray::new_unchecked(
203                std::mem::take(&mut self.views_builder).freeze(),
204                buffers.finish(),
205                self.dtype.clone(),
206                validity,
207            )
208        }
209    }
210
211    // Pushes a validity mask into the builder not affecting the views or buffers
212    fn push_only_validity_mask(&mut self, validity_mask: Mask) {
213        self.nulls.append_validity_mask(validity_mask);
214    }
215}
216
impl ArrayBuilder for VarBinViewBuilder {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }

    fn dtype(&self) -> &DType {
        &self.dtype
    }

    /// Number of elements appended so far (views and validity stay in lockstep).
    fn len(&self) -> usize {
        self.nulls.len()
    }

    /// Appends `n` empty (zero-length), non-null values.
    fn append_zeros(&mut self, n: usize) {
        self.views_builder.push_n(BinaryView::empty_view(), n);
        self.nulls.append_n_non_nulls(n);
    }

    /// Appends `n` nulls; their views are placeholder empty views.
    unsafe fn append_nulls_unchecked(&mut self, n: usize) {
        self.views_builder.push_n(BinaryView::empty_view(), n);
        self.nulls.append_n_nulls(n);
    }

    /// Appends a single Utf8 or Binary scalar, mapping a null scalar to a null
    /// element. Errors if the scalar's dtype doesn't match the builder's.
    fn append_scalar(&mut self, scalar: &Scalar) -> VortexResult<()> {
        vortex_ensure!(
            scalar.dtype() == self.dtype(),
            "VarBinViewBuilder expected scalar with dtype {}, got {}",
            self.dtype(),
            scalar.dtype()
        );

        match self.dtype() {
            DType::Utf8(_) => match scalar.as_utf8().value() {
                Some(value) => self.append_value(value),
                None => self.append_null(),
            },
            DType::Binary(_) => match scalar.as_binary().value() {
                Some(value) => self.append_value(value),
                None => self.append_null(),
            },
            // Unreachable in practice: `Self::new` asserts Utf8/Binary.
            _ => vortex_bail!(
                "VarBinViewBuilder can only handle Utf8 or Binary scalars, got {:?}",
                scalar.dtype()
            ),
        }

        Ok(())
    }

    /// Appends all elements of `array`, canonicalizing it to VarBinView first.
    ///
    /// Buffers are carried over (possibly compacted per `compaction_threshold`)
    /// and views are adjusted to the new buffer indices/offsets; views into
    /// buffers marked for rewriting have their bytes copied into this builder.
    unsafe fn extend_from_array_unchecked(&mut self, array: &dyn Array) {
        let array = array.to_varbinview();
        // Flush so the completed buffer indices are stable before extending.
        self.flush_in_progress();

        self.push_only_validity_mask(
            array
                .validity_mask()
                .vortex_expect("validity_mask in extend_from_array_unchecked"),
        );

        // Classify source buffers (keep / slice / rewrite) and absorb the kept
        // ones, obtaining the adjustment to apply to each incoming view.
        let view_adjustment =
            self.completed
                .extend_from_compaction(BuffersWithOffsets::from_array(
                    &array,
                    self.compaction_threshold,
                ));

        match view_adjustment {
            // Every view can be fixed up with index/offset arithmetic.
            ViewAdjustment::Precomputed(adjustment) => self.views_builder.extend_trusted(
                array
                    .views()
                    .iter()
                    .map(|view| adjustment.adjust_view(view)),
            ),
            // Some buffers were dropped: views into them must be rewritten by
            // copying their bytes, so only valid elements are materialized.
            ViewAdjustment::Rewriting(adjustment) => match array
                .validity_mask()
                .vortex_expect("validity_mask in extend_from_array_unchecked")
            {
                Mask::AllTrue(_) => {
                    for (idx, &view) in array.views().iter().enumerate() {
                        let new_view = self.push_view(view, &adjustment, &array, idx);
                        self.views_builder.push(new_view);
                    }
                }
                Mask::AllFalse(_) => {
                    // All-null: placeholder views only; validity already pushed.
                    self.views_builder
                        .push_n(BinaryView::empty_view(), array.len());
                }
                Mask::Values(v) => {
                    for (idx, (&view, is_valid)) in
                        array.views().iter().zip(v.bit_buffer().iter()).enumerate()
                    {
                        let new_view = if !is_valid {
                            BinaryView::empty_view()
                        } else {
                            self.push_view(view, &adjustment, &array, idx)
                        };
                        self.views_builder.push(new_view);
                    }
                }
            },
        }
    }

    fn reserve_exact(&mut self, additional: usize) {
        self.views_builder.reserve(additional);
        self.nulls.reserve_exact(additional);
    }

    /// Replaces the builder's validity wholesale with `validity`.
    unsafe fn set_validity_unchecked(&mut self, validity: Mask) {
        self.nulls = LazyBitBufferBuilder::new(validity.len());
        self.nulls.append_validity_mask(validity);
    }

    fn finish(&mut self) -> ArrayRef {
        self.finish_into_varbinview().into_array()
    }

    fn finish_into_canonical(&mut self) -> Canonical {
        Canonical::VarBinView(self.finish_into_varbinview())
    }
}
342
343impl VarBinViewBuilder {
344    #[inline]
345    fn push_view(
346        &mut self,
347        view: BinaryView,
348        adjustment: &RewritingViewAdjustment,
349        array: &VarBinViewArray,
350        idx: usize,
351    ) -> BinaryView {
352        if view.is_inlined() {
353            view
354        } else if let Some(adjusted) = adjustment.adjust_view(&view) {
355            adjusted
356        } else {
357            let bytes = array.bytes_at(idx);
358            let (new_buf_idx, new_offset) = self.append_value_to_buffer(&bytes);
359            BinaryView::make_view(bytes.as_slice(), new_buf_idx, new_offset)
360        }
361    }
362}
363
/// Storage for the finished data blocks referenced by non-inlined views.
pub enum CompletedBuffers {
    /// Plain append-only list of buffers.
    Default(Vec<ByteBuffer>),
    /// Buffers deduplicated by data-pointer identity.
    Deduplicated(DeduplicatedBuffers),
}
368
369impl Default for CompletedBuffers {
370    fn default() -> Self {
371        Self::Default(Vec::new())
372    }
373}
374
375// Self::push enforces len < u32::max
376#[allow(clippy::cast_possible_truncation)]
377impl CompletedBuffers {
378    fn len(&self) -> u32 {
379        match self {
380            Self::Default(buffers) => buffers.len() as u32,
381            Self::Deduplicated(buffers) => buffers.len(),
382        }
383    }
384
385    fn push(&mut self, block: ByteBuffer) -> u32 {
386        match self {
387            Self::Default(buffers) => {
388                assert!(buffers.len() < u32::MAX as usize, "Too many blocks");
389                buffers.push(block);
390                self.len()
391            }
392            Self::Deduplicated(buffers) => buffers.push(block),
393        }
394    }
395
396    /// Does not compact buffers, bypasses utilization checks.
397    fn extend_from_slice_unchecked(&mut self, buffers: &[ByteBuffer]) {
398        for buffer in buffers {
399            self.push(buffer.clone());
400        }
401    }
402
403    fn extend_from_compaction(&mut self, buffers: BuffersWithOffsets) -> ViewAdjustment {
404        match (self, buffers) {
405            (
406                Self::Default(completed_buffers),
407                BuffersWithOffsets::AllKept { buffers, offsets },
408            ) => {
409                let buffer_offset = completed_buffers.len() as u32;
410                completed_buffers.extend_from_slice(&buffers);
411                ViewAdjustment::shift(buffer_offset, offsets)
412            }
413            (
414                Self::Default(completed_buffers),
415                BuffersWithOffsets::SomeCompacted { buffers, offsets },
416            ) => {
417                let lookup = buffers
418                    .iter()
419                    .map(|maybe_buffer| {
420                        maybe_buffer.as_ref().map(|buffer| {
421                            completed_buffers.push(buffer.clone());
422                            completed_buffers.len() as u32 - 1
423                        })
424                    })
425                    .collect();
426                ViewAdjustment::rewriting(lookup, offsets)
427            }
428
429            (
430                Self::Deduplicated(completed_buffers),
431                BuffersWithOffsets::AllKept { buffers, offsets },
432            ) => {
433                let buffer_lookup = completed_buffers.extend_from_iter(buffers.iter().cloned());
434                ViewAdjustment::lookup(buffer_lookup, offsets)
435            }
436            (
437                Self::Deduplicated(completed_buffers),
438                BuffersWithOffsets::SomeCompacted { buffers, offsets },
439            ) => {
440                let buffer_lookup = completed_buffers.extend_from_option_slice(&buffers);
441                ViewAdjustment::rewriting(buffer_lookup, offsets)
442            }
443        }
444    }
445
446    fn finish(self) -> Arc<[ByteBuffer]> {
447        match self {
448            Self::Default(buffers) => Arc::from(buffers),
449            Self::Deduplicated(buffers) => buffers.finish(),
450        }
451    }
452}
453
/// Append-only buffer store that deduplicates buffers by data-pointer identity.
#[derive(Default)]
pub struct DeduplicatedBuffers {
    // Distinct buffers in insertion order.
    buffers: Vec<ByteBuffer>,
    // Maps a buffer's identity (ptr + len) to its index in `buffers`.
    buffer_to_idx: HashMap<BufferId, u32>,
}
459
460impl DeduplicatedBuffers {
461    // Self::push enforces len < u32::max
462    #[allow(clippy::cast_possible_truncation)]
463    fn len(&self) -> u32 {
464        self.buffers.len() as u32
465    }
466
467    /// Push a new block if not seen before. Returns the idx of the block.
468    pub(crate) fn push(&mut self, block: ByteBuffer) -> u32 {
469        assert!(self.buffers.len() < u32::MAX as usize, "Too many blocks");
470
471        let initial_len = self.len();
472        let id = BufferId::from(&block);
473        match self.buffer_to_idx.entry(id) {
474            Entry::Occupied(idx) => *idx.get(),
475            Entry::Vacant(entry) => {
476                let idx = initial_len;
477                entry.insert(idx);
478                self.buffers.push(block);
479                idx
480            }
481        }
482    }
483
484    pub(crate) fn extend_from_option_slice(
485        &mut self,
486        buffers: &[Option<ByteBuffer>],
487    ) -> Vec<Option<u32>> {
488        buffers
489            .iter()
490            .map(|buffer| buffer.as_ref().map(|buf| self.push(buf.clone())))
491            .collect()
492    }
493
494    pub(crate) fn extend_from_iter(
495        &mut self,
496        buffers: impl Iterator<Item = ByteBuffer>,
497    ) -> Vec<u32> {
498        buffers.map(|buffer| self.push(buffer)).collect()
499    }
500
501    pub(crate) fn finish(self) -> Arc<[ByteBuffer]> {
502        Arc::from(self.buffers)
503    }
504}
505
/// Identity of a buffer for deduplication: the address and length of its data.
///
/// Two `ByteBuffer` handles sharing the same backing allocation and length
/// compare equal even if they are distinct handles.
#[derive(PartialEq, Eq, Hash)]
struct BufferId {
    // *const u8 stored as usize for `Send`
    ptr: usize,
    len: usize,
}
512
513impl BufferId {
514    fn from(buffer: &ByteBuffer) -> Self {
515        let slice = buffer.as_slice();
516        Self {
517            ptr: slice.as_ptr() as usize,
518            len: slice.len(),
519        }
520    }
521}
522
/// Policy for sizing each new in-progress data buffer.
#[derive(Debug, Clone)]
pub enum BufferGrowthStrategy {
    /// Use a fixed buffer size for all allocations.
    Fixed { size: u32 },
    /// Use exponential growth starting from initial_size, doubling until max_size.
    Exponential { current_size: u32, max_size: u32 },
}
530
531impl Default for BufferGrowthStrategy {
532    fn default() -> Self {
533        Self::Exponential {
534            current_size: 4 * 1024,    // 4KB starting size
535            max_size: 2 * 1024 * 1024, // 2MB max size
536        }
537    }
538}
539
540impl BufferGrowthStrategy {
541    pub fn fixed(size: u32) -> Self {
542        Self::Fixed { size }
543    }
544
545    pub fn exponential(initial_size: u32, max_size: u32) -> Self {
546        Self::Exponential {
547            current_size: initial_size,
548            max_size,
549        }
550    }
551
552    /// Returns the next buffer size to allocate and updates internal state.
553    pub fn next_size(&mut self) -> u32 {
554        match self {
555            Self::Fixed { size } => *size,
556            Self::Exponential {
557                current_size,
558                max_size,
559            } => {
560                let result = *current_size;
561                if *current_size < *max_size {
562                    *current_size = current_size.saturating_mul(2).min(*max_size);
563                }
564                result
565            }
566        }
567    }
568}
569
/// A source array's buffers classified for transfer into a builder.
///
/// `offsets` (when present) holds, per source buffer, the byte offset that
/// views into that buffer must be shifted down by (non-zero for buffers that
/// were sliced to their used range).
enum BuffersWithOffsets {
    /// Every buffer is carried over (whole or sliced).
    AllKept {
        buffers: Arc<[ByteBuffer]>,
        offsets: Option<Vec<u32>>,
    },
    /// Some buffers were dropped (`None`) and their data must be rewritten.
    SomeCompacted {
        buffers: Vec<Option<ByteBuffer>>,
        offsets: Option<Vec<u32>>,
    },
}
580
impl BuffersWithOffsets {
    /// Classifies `array`'s buffers for transfer into a builder.
    ///
    /// With a zero `compaction_threshold` every buffer is kept untouched.
    /// Otherwise each buffer is kept whole, sliced to its used range, or
    /// marked for rewriting (`None`) based on its utilization.
    pub fn from_array(array: &VarBinViewArray, compaction_threshold: f64) -> Self {
        if compaction_threshold == 0.0 {
            // Compaction disabled: keep every buffer as-is, no offsets needed.
            return Self::AllKept {
                buffers: Arc::from(
                    array
                        .buffers()
                        .to_vec()
                        .into_iter()
                        .map(|b| b.unwrap_host())
                        .collect_vec(),
                ),
                offsets: None,
            };
        }

        let buffer_utilizations = array
            .buffer_utilizations()
            .vortex_expect("buffer_utilizations in BuffersWithOffsets::from_array");
        // First pass: determine which representation is needed (any rewrites?
        // any sliced buffers that introduce non-zero offsets?).
        let mut has_rewrite = false;
        let mut has_nonzero_offset = false;
        for utilization in buffer_utilizations.iter() {
            match compaction_strategy(utilization, compaction_threshold) {
                CompactionStrategy::KeepFull => continue,
                CompactionStrategy::Slice { .. } => has_nonzero_offset = true,
                CompactionStrategy::Rewrite => has_rewrite = true,
            }
        }

        // Second pass: pair each buffer (or `None` for rewrites) with the
        // offset views into it must be shifted by. NOTE(review): this re-runs
        // `compaction_strategy` per buffer; assumed cheap and deterministic.
        let buffers_with_offsets_iter =
            buffer_utilizations
                .iter()
                .zip(array.buffers().iter())
                .map(|(utilization, buffer)| {
                    match compaction_strategy(utilization, compaction_threshold) {
                        CompactionStrategy::KeepFull => (Some(buffer.as_host().clone()), 0),
                        CompactionStrategy::Slice { start, end } => (
                            Some(buffer.as_host().slice(start as usize..end as usize)),
                            start,
                        ),
                        CompactionStrategy::Rewrite => (None, 0),
                    }
                });

        // Pick the leanest representation that covers what the passes found.
        match (has_rewrite, has_nonzero_offset) {
            // keep all buffers
            (false, false) => {
                let buffers: Vec<_> = buffers_with_offsets_iter
                    .map(|(b, _)| b.vortex_expect("already checked for rewrite"))
                    .collect();
                Self::AllKept {
                    buffers: Arc::from(buffers),
                    offsets: None,
                }
            }
            // rewrite, all zero offsets
            (true, false) => {
                let buffers: Vec<_> = buffers_with_offsets_iter.map(|(b, _)| b).collect();
                Self::SomeCompacted {
                    buffers,
                    offsets: None,
                }
            }
            // keep all buffers, but some have offsets
            (false, true) => {
                let (buffers, offsets): (Vec<_>, _) = buffers_with_offsets_iter
                    .map(|(buffer, offset)| {
                        (buffer.vortex_expect("already checked for rewrite"), offset)
                    })
                    .collect();
                Self::AllKept {
                    buffers: Arc::from(buffers),
                    offsets: Some(offsets),
                }
            }
            // rewrite and some have offsets
            (true, true) => {
                let (buffers, offsets) = buffers_with_offsets_iter.collect();
                Self::SomeCompacted {
                    buffers,
                    offsets: Some(offsets),
                }
            }
        }
    }
}
667
/// How a source buffer is carried into the builder during compaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CompactionStrategy {
    /// Keep the whole buffer unchanged.
    KeepFull,
    /// Slice the buffer to [start, end) range
    Slice {
        start: u32,
        end: u32,
    },
    /// Rewrite data into new compacted buffer
    Rewrite,
}
679
680fn compaction_strategy(
681    buffer_utilization: &BufferUtilization,
682    threshold: f64,
683) -> CompactionStrategy {
684    match buffer_utilization.overall_utilization() {
685        // rewrite empty or not used buffers TODO(os): maybe keep them
686        0.0 => CompactionStrategy::Rewrite,
687        utilised if utilised >= threshold => CompactionStrategy::KeepFull,
688        _ if buffer_utilization.range_utilization() >= threshold => {
689            let Range { start, end } = buffer_utilization.range();
690            CompactionStrategy::Slice { start, end }
691        }
692        _ => CompactionStrategy::Rewrite,
693    }
694}
695
/// How views from a source array are adjusted when extended into the builder.
enum ViewAdjustment {
    /// Every view can be adjusted with precomputed index/offset arithmetic.
    Precomputed(PrecomputedViewAdjustment),
    /// Some source buffers have no target; views into them must be rewritten.
    Rewriting(RewritingViewAdjustment),
}
700
701impl ViewAdjustment {
702    fn shift(buffer_offset: u32, offsets: Option<Vec<u32>>) -> Self {
703        Self::Precomputed(PrecomputedViewAdjustment::Shift {
704            buffer_offset,
705            offsets,
706        })
707    }
708
709    fn lookup(buffer_lookup: Vec<u32>, offsets: Option<Vec<u32>>) -> Self {
710        Self::Precomputed(PrecomputedViewAdjustment::Lookup {
711            buffer_lookup,
712            offsets,
713        })
714    }
715
716    fn rewriting(buffer_lookup: Vec<Option<u32>>, offsets: Option<Vec<u32>>) -> Self {
717        Self::Rewriting(RewritingViewAdjustment {
718            buffer_lookup,
719            offsets,
720        })
721    }
722}
723
// Care when adding new variants or fields in this enum, it will mess with inlining if it gets too big
enum PrecomputedViewAdjustment {
    /// Source buffers were appended contiguously: add a constant to each
    /// view's buffer index.
    Shift {
        buffer_offset: u32,
        // Per-source-buffer byte offset to subtract (present when buffers
        // were sliced to their used range).
        offsets: Option<Vec<u32>>,
    },
    /// Buffer indices are remapped through a per-source-buffer lookup table
    /// (used when deduplication may reuse existing buffer indices).
    Lookup {
        buffer_lookup: Vec<u32>,
        offsets: Option<Vec<u32>>,
    },
}
735
736impl PrecomputedViewAdjustment {
737    #[inline]
738    fn adjust_view(&self, view: &BinaryView) -> BinaryView {
739        if view.is_inlined() {
740            return *view;
741        }
742        let view_ref = view.as_view();
743        match self {
744            Self::Shift {
745                buffer_offset,
746                offsets,
747            } => {
748                let b_idx = view_ref.buffer_index;
749                let offset_shift = offsets
750                    .as_ref()
751                    .map(|o| o[b_idx as usize])
752                    .unwrap_or_default();
753
754                // If offset < offset_shift, this view was invalid and wasn't counted in buffer_utilizations.
755                // Return an empty view to match how invalid views are handled in the Rewriting path.
756                if view_ref.offset < offset_shift {
757                    return BinaryView::empty_view();
758                }
759
760                view_ref
761                    .with_buffer_and_offset(b_idx + buffer_offset, view_ref.offset - offset_shift)
762            }
763            Self::Lookup {
764                buffer_lookup,
765                offsets,
766            } => {
767                let b_idx = view_ref.buffer_index;
768                let buffer = buffer_lookup[b_idx as usize];
769                let offset_shift = offsets
770                    .as_ref()
771                    .map(|o| o[b_idx as usize])
772                    .unwrap_or_default();
773
774                // If offset < offset_shift, this view was invalid and wasn't counted in buffer_utilizations.
775                // Return an empty view to match how invalid views are handled in the Rewriting path.
776                if view_ref.offset < offset_shift {
777                    return BinaryView::empty_view();
778                }
779
780                view_ref.with_buffer_and_offset(buffer, view_ref.offset - offset_shift)
781            }
782        }
783        .into()
784    }
785}
786
/// View adjustment where some source buffers have no target index (`None`)
/// and views into them must be rewritten by copying their bytes.
struct RewritingViewAdjustment {
    // Maps source buffer index -> builder buffer index, or None if the source
    // buffer was dropped for rewriting.
    buffer_lookup: Vec<Option<u32>>,
    // Per-source-buffer byte offset to subtract (present when buffers were sliced).
    offsets: Option<Vec<u32>>,
}
791
impl RewritingViewAdjustment {
    /// Can return None if this view can't be adjusted, because there is no precomputed lookup
    /// for the current buffer.
    ///
    /// NOTE(review): unlike `PrecomputedViewAdjustment::adjust_view`, there is
    /// no `offset < offset_shift` guard here, so the `u32` subtraction below
    /// would underflow (panic in debug builds) if an invalid view reached this
    /// path — presumably callers only pass views of valid elements; confirm.
    fn adjust_view(&self, view: &BinaryView) -> Option<BinaryView> {
        if view.is_inlined() {
            return Some(*view);
        }

        let view_ref = view.as_view();
        self.buffer_lookup[view_ref.buffer_index as usize].map(|buffer| {
            let offset_shift = self
                .offsets
                .as_ref()
                .map(|o| o[view_ref.buffer_index as usize])
                .unwrap_or_default();
            view_ref
                .with_buffer_and_offset(buffer, view_ref.offset - offset_shift)
                .into()
        })
    }
}
813
814#[cfg(test)]
815mod tests {
816    use vortex_dtype::DType;
817    use vortex_dtype::Nullability;
818    use vortex_error::VortexResult;
819
820    use crate::IntoArray;
821    use crate::LEGACY_SESSION;
822    use crate::VortexSessionExecute;
823    use crate::arrays::VarBinViewArray;
824    use crate::assert_arrays_eq;
825    use crate::builders::ArrayBuilder;
826    use crate::builders::VarBinViewBuilder;
827
    // Exercises the basic append APIs: values, a single null, bulk nulls, and
    // zero-length ("") values, then checks the finished array element-wise.
    #[test]
    fn test_utf8_builder() {
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_value("Hello");
        builder.append_null();
        builder.append_value("World");

        builder.append_nulls(2);

        // append_zeros produces empty, non-null strings.
        builder.append_zeros(2);
        builder.append_value("test");

        let actual = builder.finish();
        let expected = <VarBinViewArray as FromIterator<_>>::from_iter([
            Some("Hello"),
            None,
            Some("World"),
            None,
            None,
            Some(""),
            Some(""),
            Some("test"),
        ]);
        assert_arrays_eq!(actual, expected);
    }
854
    // Verifies that extend_from_array splices another array's values (and
    // nulls) between locally appended values, preserving order.
    #[test]
    fn test_utf8_builder_with_extend() {
        let array = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_null();
            builder.append_value("Hello2");
            builder.finish()
        };
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_value("Hello1");
        builder.extend_from_array(&array);
        builder.append_nulls(2);
        builder.append_value("Hello3");

        let actual = builder.finish_into_canonical();
        let expected = <VarBinViewArray as FromIterator<_>>::from_iter([
            Some("Hello1"),
            None,
            Some("Hello2"),
            None,
            None,
            Some("Hello3"),
        ]);
        assert_arrays_eq!(actual.into_array(), expected.into_array());
    }
882
    // Verifies that a deduplicating builder stores a shared data buffer only
    // once, no matter how many arrays/slices referencing it are appended,
    // while distinct buffers still get their own slot.
    #[test]
    fn test_buffer_deduplication() -> VortexResult<()> {
        let array = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_value("This is a long string that should not be inlined");
            builder.append_value("short string");
            builder.finish_into_varbinview()
        };

        // Only the long string needs a data buffer; the short one is inlined.
        assert_eq!(array.buffers().len(), 1);
        let mut builder =
            VarBinViewBuilder::with_buffer_deduplication(DType::Utf8(Nullability::Nullable), 10);

        let mut ctx = LEGACY_SESSION.create_execution_ctx();

        array.append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 1);

        // Re-appending slices of the same array must not duplicate its buffer.
        array
            .slice(1..2)?
            .append_to_builder(&mut builder, &mut ctx)?;
        array
            .slice(0..1)?
            .append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 1);

        // A second array with its own (distinct) buffer adds one new block.
        let array2 = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_value("This is a long string that should not be inlined");
            builder.finish_into_varbinview()
        };

        array2.append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 2);

        // Further appends from either source still deduplicate.
        array
            .slice(0..1)?
            .append_to_builder(&mut builder, &mut ctx)?;
        array2
            .slice(0..1)?
            .append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 2);
        Ok(())
    }
929
930    #[test]
931    fn test_append_scalar() {
932        use crate::scalar::Scalar;
933
934        // Test with Utf8 builder.
935        let mut utf8_builder =
936            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
937
938        // Test appending a valid utf8 value.
939        let utf8_scalar1 = Scalar::utf8("hello", Nullability::Nullable);
940        utf8_builder.append_scalar(&utf8_scalar1).unwrap();
941
942        // Test appending another value.
943        let utf8_scalar2 = Scalar::utf8("world", Nullability::Nullable);
944        utf8_builder.append_scalar(&utf8_scalar2).unwrap();
945
946        // Test appending null value.
947        let null_scalar = Scalar::null(DType::Utf8(Nullability::Nullable));
948        utf8_builder.append_scalar(&null_scalar).unwrap();
949
950        let array = utf8_builder.finish();
951        let expected =
952            <VarBinViewArray as FromIterator<_>>::from_iter([Some("hello"), Some("world"), None]);
953        assert_arrays_eq!(&array, &expected);
954
955        // Test with Binary builder.
956        let mut binary_builder =
957            VarBinViewBuilder::with_capacity(DType::Binary(Nullability::Nullable), 10);
958
959        let binary_scalar = Scalar::binary(vec![1u8, 2, 3], Nullability::Nullable);
960        binary_builder.append_scalar(&binary_scalar).unwrap();
961
962        let binary_null = Scalar::null(DType::Binary(Nullability::Nullable));
963        binary_builder.append_scalar(&binary_null).unwrap();
964
965        let binary_array = binary_builder.finish();
966        let expected =
967            <VarBinViewArray as FromIterator<_>>::from_iter([Some(vec![1u8, 2, 3]), None]);
968        assert_arrays_eq!(&binary_array, &expected);
969
970        // Test wrong dtype error.
971        let mut builder =
972            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::NonNullable), 10);
973        let wrong_scalar = Scalar::from(42i32);
974        assert!(builder.append_scalar(&wrong_scalar).is_err());
975    }
976
977    #[test]
978    fn test_buffer_growth_strategies() {
979        use super::BufferGrowthStrategy;
980
981        // Test Fixed strategy
982        let mut strategy = BufferGrowthStrategy::fixed(1024);
983
984        // Should always return the fixed size
985        assert_eq!(strategy.next_size(), 1024);
986        assert_eq!(strategy.next_size(), 1024);
987        assert_eq!(strategy.next_size(), 1024);
988
989        // Test Exponential strategy
990        let mut strategy = BufferGrowthStrategy::exponential(1024, 8192);
991
992        // Should double each time until hitting max_size
993        assert_eq!(strategy.next_size(), 1024); // First: 1024
994        assert_eq!(strategy.next_size(), 2048); // Second: 2048
995        assert_eq!(strategy.next_size(), 4096); // Third: 4096
996        assert_eq!(strategy.next_size(), 8192); // Fourth: 8192 (max)
997        assert_eq!(strategy.next_size(), 8192); // Fifth: 8192 (capped)
998    }
999
1000    #[test]
1001    fn test_large_value_allocation() {
1002        use super::BufferGrowthStrategy;
1003        use super::VarBinViewBuilder;
1004
1005        let mut builder = VarBinViewBuilder::new(
1006            DType::Binary(Nullability::Nullable),
1007            10,
1008            Default::default(),
1009            BufferGrowthStrategy::exponential(1024, 4096),
1010            0.0,
1011        );
1012
1013        // Create a value larger than max_size
1014        let large_value = vec![0u8; 8192];
1015
1016        // Should successfully append the large value
1017        builder.append_value(&large_value);
1018
1019        let array = builder.finish_into_varbinview();
1020        assert_eq!(array.len(), 1);
1021
1022        // Verify the value was stored correctly
1023        let retrieved = array
1024            .scalar_at(0)
1025            .unwrap()
1026            .as_binary()
1027            .value()
1028            .cloned()
1029            .unwrap();
1030        assert_eq!(retrieved.len(), 8192);
1031        assert_eq!(retrieved.as_slice(), &large_value);
1032    }
1033}