Skip to main content

vortex_array/builders/
varbinview.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::Any;
5use std::ops::Range;
6use std::sync::Arc;
7
8use itertools::Itertools;
9use vortex_buffer::Buffer;
10use vortex_buffer::BufferMut;
11use vortex_buffer::ByteBuffer;
12use vortex_buffer::ByteBufferMut;
13use vortex_error::VortexExpect;
14use vortex_error::VortexResult;
15use vortex_error::vortex_bail;
16use vortex_error::vortex_ensure;
17use vortex_mask::Mask;
18use vortex_utils::aliases::hash_map::Entry;
19use vortex_utils::aliases::hash_map::HashMap;
20
21use crate::ArrayRef;
22use crate::IntoArray;
23use crate::arrays::VarBinViewArray;
24use crate::arrays::varbinview::build_views::BinaryView;
25use crate::arrays::varbinview::compact::BufferUtilization;
26use crate::builders::ArrayBuilder;
27use crate::builders::LazyBitBufferBuilder;
28use crate::canonical::Canonical;
29use crate::canonical::ToCanonical;
30use crate::dtype::DType;
31use crate::scalar::Scalar;
32
33/// The builder for building a [`VarBinViewArray`].
34pub struct VarBinViewBuilder {
35    dtype: DType,
36    views_builder: BufferMut<BinaryView>,
37    nulls: LazyBitBufferBuilder,
38    completed: CompletedBuffers,
39    in_progress: ByteBufferMut,
40    growth_strategy: BufferGrowthStrategy,
41    compaction_threshold: f64,
42}
43
44impl VarBinViewBuilder {
45    pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
46        Self::new(dtype, capacity, Default::default(), Default::default(), 0.0)
47    }
48
49    pub fn with_buffer_deduplication(dtype: DType, capacity: usize) -> Self {
50        Self::new(
51            dtype,
52            capacity,
53            CompletedBuffers::Deduplicated(Default::default()),
54            Default::default(),
55            0.0,
56        )
57    }
58
59    pub fn with_compaction(dtype: DType, capacity: usize, compaction_threshold: f64) -> Self {
60        Self::new(
61            dtype,
62            capacity,
63            Default::default(),
64            Default::default(),
65            compaction_threshold,
66        )
67    }
68
69    pub fn new(
70        dtype: DType,
71        capacity: usize,
72        completed: CompletedBuffers,
73        growth_strategy: BufferGrowthStrategy,
74        compaction_threshold: f64,
75    ) -> Self {
76        assert!(
77            matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
78            "VarBinViewBuilder DType must be Utf8 or Binary."
79        );
80        Self {
81            views_builder: BufferMut::<BinaryView>::with_capacity(capacity),
82            nulls: LazyBitBufferBuilder::new(capacity),
83            completed,
84            in_progress: ByteBufferMut::empty(),
85            dtype,
86            growth_strategy,
87            compaction_threshold,
88        }
89    }
90
91    fn append_value_view(&mut self, value: &[u8]) {
92        let length =
93            u32::try_from(value.len()).vortex_expect("cannot have a single string >2^32 in length");
94        if length <= 12 {
95            self.views_builder.push(BinaryView::make_view(value, 0, 0));
96            return;
97        }
98
99        let (buffer_idx, offset) = self.append_value_to_buffer(value);
100        let view = BinaryView::make_view(value, buffer_idx, offset);
101        self.views_builder.push(view);
102    }
103
104    /// Appends a value to the builder.
105    pub fn append_value<S: AsRef<[u8]>>(&mut self, value: S) {
106        self.append_value_view(value.as_ref());
107        self.nulls.append_non_null();
108    }
109
110    /// Appends `n` copies of `value` as non-null entries.
111    pub fn append_n_values<S: AsRef<[u8]>>(&mut self, value: S, n: usize) {
112        if n == 0 {
113            return;
114        }
115        let bytes = value.as_ref();
116        let view = if bytes.len() <= BinaryView::MAX_INLINED_SIZE {
117            BinaryView::make_view(bytes, 0, 0)
118        } else {
119            let (buffer_idx, offset) = self.append_value_to_buffer(bytes);
120            BinaryView::make_view(bytes, buffer_idx, offset)
121        };
122        self.views_builder.push_n(view, n);
123        self.nulls.append_n_non_nulls(n);
124    }
125
126    fn flush_in_progress(&mut self) {
127        if self.in_progress.is_empty() {
128            return;
129        }
130        let block = std::mem::take(&mut self.in_progress).freeze();
131
132        assert!(block.len() < u32::MAX as usize, "Block too large");
133
134        let initial_len = self.completed.len();
135        self.completed.push(block);
136        assert_eq!(
137            self.completed.len(),
138            initial_len + 1,
139            "Invalid state, just completed block already exists"
140        );
141    }
142
143    /// append a non inlined value to self.in_progress.
144    fn append_value_to_buffer(&mut self, value: &[u8]) -> (u32, u32) {
145        assert!(
146            value.len() > BinaryView::MAX_INLINED_SIZE,
147            "must inline small strings"
148        );
149        let required_cap = self.in_progress.len() + value.len();
150        if self.in_progress.capacity() < required_cap {
151            self.flush_in_progress();
152            let next_buffer_size = self.growth_strategy.next_size() as usize;
153            let to_reserve = next_buffer_size.max(value.len());
154            self.in_progress.reserve(to_reserve);
155        }
156
157        let buffer_idx = self.completed.len();
158        let offset = u32::try_from(self.in_progress.len()).vortex_expect("too many buffers");
159        self.in_progress.extend_from_slice(value);
160
161        (buffer_idx, offset)
162    }
163
164    pub fn completed_block_count(&self) -> u32 {
165        self.completed.len()
166    }
167
168    /// Pushes buffers and pre-adjusted views into the builder.
169    ///
170    /// The provided `buffers` contain sections of data from a `VarBinViewArray`, and the
171    /// `views` are `BinaryView`s that have already been adjusted to reference the correct buffer
172    /// indices and offsets for this builder. All views must point to valid sections within the
173    /// provided buffers, and the validity length must match the view length.
174    ///
175    /// # Warning
176    ///
177    /// This method does not check utilization of the given buffers. Callers must provide
178    /// buffers that are fully utilized by the given adjusted views.
179    ///
180    /// # Panics
181    ///
182    /// Panics if this builder deduplicates buffers and any of the given buffers already
183    /// exist in this builder.
184    pub fn push_buffer_and_adjusted_views(
185        &mut self,
186        buffers: &[ByteBuffer],
187        views: &Buffer<BinaryView>,
188        validity_mask: Mask,
189    ) {
190        self.flush_in_progress();
191
192        let expected_completed_len = self.completed.len() as usize + buffers.len();
193        self.completed.extend_from_slice_unchecked(buffers);
194        assert_eq!(
195            self.completed.len() as usize,
196            expected_completed_len,
197            "Some buffers already exist",
198        );
199        self.views_builder.extend_trusted(views.iter().copied());
200        self.push_only_validity_mask(validity_mask);
201
202        debug_assert_eq!(self.nulls.len(), self.views_builder.len())
203    }
204
205    /// Finishes the builder directly into a [`VarBinViewArray`].
206    pub fn finish_into_varbinview(&mut self) -> VarBinViewArray {
207        self.flush_in_progress();
208        let buffers = std::mem::take(&mut self.completed);
209
210        assert_eq!(
211            self.views_builder.len(),
212            self.nulls.len(),
213            "View and validity length must match"
214        );
215
216        let validity = self.nulls.finish_with_nullability(self.dtype.nullability());
217
218        // SAFETY: the builder methods check safety at each step.
219        unsafe {
220            VarBinViewArray::new_unchecked(
221                std::mem::take(&mut self.views_builder).freeze(),
222                buffers.finish(),
223                self.dtype.clone(),
224                validity,
225            )
226        }
227    }
228
229    // Pushes a validity mask into the builder not affecting the views or buffers
230    fn push_only_validity_mask(&mut self, validity_mask: Mask) {
231        self.nulls.append_validity_mask(validity_mask);
232    }
233}
234
235impl ArrayBuilder for VarBinViewBuilder {
236    fn as_any(&self) -> &dyn Any {
237        self
238    }
239
240    fn as_any_mut(&mut self) -> &mut dyn Any {
241        self
242    }
243
244    fn dtype(&self) -> &DType {
245        &self.dtype
246    }
247
248    fn len(&self) -> usize {
249        self.nulls.len()
250    }
251
252    fn append_zeros(&mut self, n: usize) {
253        self.views_builder.push_n(BinaryView::empty_view(), n);
254        self.nulls.append_n_non_nulls(n);
255    }
256
257    unsafe fn append_nulls_unchecked(&mut self, n: usize) {
258        self.views_builder.push_n(BinaryView::empty_view(), n);
259        self.nulls.append_n_nulls(n);
260    }
261
262    fn append_scalar(&mut self, scalar: &Scalar) -> VortexResult<()> {
263        vortex_ensure!(
264            scalar.dtype() == self.dtype(),
265            "VarBinViewBuilder expected scalar with dtype {}, got {}",
266            self.dtype(),
267            scalar.dtype()
268        );
269
270        match self.dtype() {
271            DType::Utf8(_) => match scalar.as_utf8().value() {
272                Some(value) => self.append_value(value),
273                None => self.append_null(),
274            },
275            DType::Binary(_) => match scalar.as_binary().value() {
276                Some(value) => self.append_value(value),
277                None => self.append_null(),
278            },
279            _ => vortex_bail!(
280                "VarBinViewBuilder can only handle Utf8 or Binary scalars, got {:?}",
281                scalar.dtype()
282            ),
283        }
284
285        Ok(())
286    }
287
288    unsafe fn extend_from_array_unchecked(&mut self, array: &ArrayRef) {
289        let array = array.to_varbinview();
290        self.flush_in_progress();
291
292        self.push_only_validity_mask(
293            array
294                .validity_mask()
295                .vortex_expect("validity_mask in extend_from_array_unchecked"),
296        );
297
298        let view_adjustment =
299            self.completed
300                .extend_from_compaction(BuffersWithOffsets::from_array(
301                    &array,
302                    self.compaction_threshold,
303                ));
304
305        match view_adjustment {
306            ViewAdjustment::Precomputed(adjustment) => self.views_builder.extend_trusted(
307                array
308                    .views()
309                    .iter()
310                    .map(|view| adjustment.adjust_view(view)),
311            ),
312            ViewAdjustment::Rewriting(adjustment) => match array
313                .validity_mask()
314                .vortex_expect("validity_mask in extend_from_array_unchecked")
315            {
316                Mask::AllTrue(_) => {
317                    for (idx, &view) in array.views().iter().enumerate() {
318                        let new_view = self.push_view(view, &adjustment, &array, idx);
319                        self.views_builder.push(new_view);
320                    }
321                }
322                Mask::AllFalse(_) => {
323                    self.views_builder
324                        .push_n(BinaryView::empty_view(), array.len());
325                }
326                Mask::Values(v) => {
327                    for (idx, (&view, is_valid)) in
328                        array.views().iter().zip(v.bit_buffer().iter()).enumerate()
329                    {
330                        let new_view = if !is_valid {
331                            BinaryView::empty_view()
332                        } else {
333                            self.push_view(view, &adjustment, &array, idx)
334                        };
335                        self.views_builder.push(new_view);
336                    }
337                }
338            },
339        }
340    }
341
342    fn reserve_exact(&mut self, additional: usize) {
343        self.views_builder.reserve(additional);
344        self.nulls.reserve_exact(additional);
345    }
346
347    unsafe fn set_validity_unchecked(&mut self, validity: Mask) {
348        self.nulls = LazyBitBufferBuilder::new(validity.len());
349        self.nulls.append_validity_mask(validity);
350    }
351
352    fn finish(&mut self) -> ArrayRef {
353        self.finish_into_varbinview().into_array()
354    }
355
356    fn finish_into_canonical(&mut self) -> Canonical {
357        Canonical::VarBinView(self.finish_into_varbinview())
358    }
359}
360
361impl VarBinViewBuilder {
362    #[inline]
363    fn push_view(
364        &mut self,
365        view: BinaryView,
366        adjustment: &RewritingViewAdjustment,
367        array: &VarBinViewArray,
368        idx: usize,
369    ) -> BinaryView {
370        if view.is_inlined() {
371            view
372        } else if let Some(adjusted) = adjustment.adjust_view(&view) {
373            adjusted
374        } else {
375            let bytes = array.bytes_at(idx);
376            let (new_buf_idx, new_offset) = self.append_value_to_buffer(&bytes);
377            BinaryView::make_view(bytes.as_slice(), new_buf_idx, new_offset)
378        }
379    }
380}
381
382pub enum CompletedBuffers {
383    Default(Vec<ByteBuffer>),
384    Deduplicated(DeduplicatedBuffers),
385}
386
387impl Default for CompletedBuffers {
388    fn default() -> Self {
389        Self::Default(Vec::new())
390    }
391}
392
393// Self::push enforces len < u32::max
394#[allow(clippy::cast_possible_truncation)]
395impl CompletedBuffers {
396    fn len(&self) -> u32 {
397        match self {
398            Self::Default(buffers) => buffers.len() as u32,
399            Self::Deduplicated(buffers) => buffers.len(),
400        }
401    }
402
403    fn push(&mut self, block: ByteBuffer) -> u32 {
404        match self {
405            Self::Default(buffers) => {
406                assert!(buffers.len() < u32::MAX as usize, "Too many blocks");
407                buffers.push(block);
408                self.len()
409            }
410            Self::Deduplicated(buffers) => buffers.push(block),
411        }
412    }
413
414    /// Does not compact buffers, bypasses utilization checks.
415    fn extend_from_slice_unchecked(&mut self, buffers: &[ByteBuffer]) {
416        for buffer in buffers {
417            self.push(buffer.clone());
418        }
419    }
420
421    fn extend_from_compaction(&mut self, buffers: BuffersWithOffsets) -> ViewAdjustment {
422        match (self, buffers) {
423            (
424                Self::Default(completed_buffers),
425                BuffersWithOffsets::AllKept { buffers, offsets },
426            ) => {
427                let buffer_offset = completed_buffers.len() as u32;
428                completed_buffers.extend_from_slice(&buffers);
429                ViewAdjustment::shift(buffer_offset, offsets)
430            }
431            (
432                Self::Default(completed_buffers),
433                BuffersWithOffsets::SomeCompacted { buffers, offsets },
434            ) => {
435                let lookup = buffers
436                    .iter()
437                    .map(|maybe_buffer| {
438                        maybe_buffer.as_ref().map(|buffer| {
439                            completed_buffers.push(buffer.clone());
440                            completed_buffers.len() as u32 - 1
441                        })
442                    })
443                    .collect();
444                ViewAdjustment::rewriting(lookup, offsets)
445            }
446
447            (
448                Self::Deduplicated(completed_buffers),
449                BuffersWithOffsets::AllKept { buffers, offsets },
450            ) => {
451                let buffer_lookup = completed_buffers.extend_from_iter(buffers.iter().cloned());
452                ViewAdjustment::lookup(buffer_lookup, offsets)
453            }
454            (
455                Self::Deduplicated(completed_buffers),
456                BuffersWithOffsets::SomeCompacted { buffers, offsets },
457            ) => {
458                let buffer_lookup = completed_buffers.extend_from_option_slice(&buffers);
459                ViewAdjustment::rewriting(buffer_lookup, offsets)
460            }
461        }
462    }
463
464    fn finish(self) -> Arc<[ByteBuffer]> {
465        match self {
466            Self::Default(buffers) => Arc::from(buffers),
467            Self::Deduplicated(buffers) => buffers.finish(),
468        }
469    }
470}
471
472#[derive(Default)]
473pub struct DeduplicatedBuffers {
474    buffers: Vec<ByteBuffer>,
475    buffer_to_idx: HashMap<BufferId, u32>,
476}
477
478impl DeduplicatedBuffers {
479    // Self::push enforces len < u32::max
480    #[allow(clippy::cast_possible_truncation)]
481    fn len(&self) -> u32 {
482        self.buffers.len() as u32
483    }
484
485    /// Push a new block if not seen before. Returns the idx of the block.
486    pub(crate) fn push(&mut self, block: ByteBuffer) -> u32 {
487        assert!(self.buffers.len() < u32::MAX as usize, "Too many blocks");
488
489        let initial_len = self.len();
490        let id = BufferId::from(&block);
491        match self.buffer_to_idx.entry(id) {
492            Entry::Occupied(idx) => *idx.get(),
493            Entry::Vacant(entry) => {
494                let idx = initial_len;
495                entry.insert(idx);
496                self.buffers.push(block);
497                idx
498            }
499        }
500    }
501
502    pub(crate) fn extend_from_option_slice(
503        &mut self,
504        buffers: &[Option<ByteBuffer>],
505    ) -> Vec<Option<u32>> {
506        buffers
507            .iter()
508            .map(|buffer| buffer.as_ref().map(|buf| self.push(buf.clone())))
509            .collect()
510    }
511
512    pub(crate) fn extend_from_iter(
513        &mut self,
514        buffers: impl Iterator<Item = ByteBuffer>,
515    ) -> Vec<u32> {
516        buffers.map(|buffer| self.push(buffer)).collect()
517    }
518
519    pub(crate) fn finish(self) -> Arc<[ByteBuffer]> {
520        Arc::from(self.buffers)
521    }
522}
523
524#[derive(PartialEq, Eq, Hash)]
525struct BufferId {
526    // *const u8 stored as usize for `Send`
527    ptr: usize,
528    len: usize,
529}
530
531impl BufferId {
532    fn from(buffer: &ByteBuffer) -> Self {
533        let slice = buffer.as_slice();
534        Self {
535            ptr: slice.as_ptr() as usize,
536            len: slice.len(),
537        }
538    }
539}
540
/// Policy for choosing the size of each newly allocated data buffer.
#[derive(Debug, Clone)]
pub enum BufferGrowthStrategy {
    /// Use a fixed buffer size for all allocations.
    Fixed { size: u32 },
    /// Hand out `current_size`, then double it for the next request, never growing past
    /// `max_size`.
    Exponential { current_size: u32, max_size: u32 },
}

impl Default for BufferGrowthStrategy {
    /// Exponential growth from 4 KiB up to 2 MiB.
    fn default() -> Self {
        const INITIAL_SIZE: u32 = 4 * 1024;
        const MAX_SIZE: u32 = 2 * 1024 * 1024;
        Self::exponential(INITIAL_SIZE, MAX_SIZE)
    }
}

impl BufferGrowthStrategy {
    /// Strategy that always asks for `size` bytes.
    pub fn fixed(size: u32) -> Self {
        Self::Fixed { size }
    }

    /// Strategy that starts at `initial_size` bytes and doubles up to `max_size`.
    pub fn exponential(initial_size: u32, max_size: u32) -> Self {
        let current_size = initial_size;
        Self::Exponential {
            current_size,
            max_size,
        }
    }

    /// Returns the next buffer size to allocate and updates internal state.
    pub fn next_size(&mut self) -> u32 {
        match self {
            Self::Fixed { size } => *size,
            Self::Exponential {
                current_size,
                max_size,
            } => {
                // A size already at or above the cap is left untouched.
                let following = if *current_size < *max_size {
                    current_size.saturating_mul(2).min(*max_size)
                } else {
                    *current_size
                };
                // Store the size for the next call and hand out the one that was current.
                std::mem::replace(current_size, following)
            }
        }
    }
}
587
588enum BuffersWithOffsets {
589    AllKept {
590        buffers: Arc<[ByteBuffer]>,
591        offsets: Option<Vec<u32>>,
592    },
593    SomeCompacted {
594        buffers: Vec<Option<ByteBuffer>>,
595        offsets: Option<Vec<u32>>,
596    },
597}
598
599impl BuffersWithOffsets {
600    pub fn from_array(array: &VarBinViewArray, compaction_threshold: f64) -> Self {
601        if compaction_threshold == 0.0 {
602            return Self::AllKept {
603                buffers: Arc::from(
604                    array
605                        .buffers()
606                        .to_vec()
607                        .into_iter()
608                        .map(|b| b.unwrap_host())
609                        .collect_vec(),
610                ),
611                offsets: None,
612            };
613        }
614
615        let buffer_utilizations = array
616            .buffer_utilizations()
617            .vortex_expect("buffer_utilizations in BuffersWithOffsets::from_array");
618        let mut has_rewrite = false;
619        let mut has_nonzero_offset = false;
620        for utilization in buffer_utilizations.iter() {
621            match compaction_strategy(utilization, compaction_threshold) {
622                CompactionStrategy::KeepFull => continue,
623                CompactionStrategy::Slice { .. } => has_nonzero_offset = true,
624                CompactionStrategy::Rewrite => has_rewrite = true,
625            }
626        }
627
628        let buffers_with_offsets_iter =
629            buffer_utilizations
630                .iter()
631                .zip(array.buffers().iter())
632                .map(|(utilization, buffer)| {
633                    match compaction_strategy(utilization, compaction_threshold) {
634                        CompactionStrategy::KeepFull => (Some(buffer.as_host().clone()), 0),
635                        CompactionStrategy::Slice { start, end } => (
636                            Some(buffer.as_host().slice(start as usize..end as usize)),
637                            start,
638                        ),
639                        CompactionStrategy::Rewrite => (None, 0),
640                    }
641                });
642
643        match (has_rewrite, has_nonzero_offset) {
644            // keep all buffers
645            (false, false) => {
646                let buffers: Vec<_> = buffers_with_offsets_iter
647                    .map(|(b, _)| b.vortex_expect("already checked for rewrite"))
648                    .collect();
649                Self::AllKept {
650                    buffers: Arc::from(buffers),
651                    offsets: None,
652                }
653            }
654            // rewrite, all zero offsets
655            (true, false) => {
656                let buffers: Vec<_> = buffers_with_offsets_iter.map(|(b, _)| b).collect();
657                Self::SomeCompacted {
658                    buffers,
659                    offsets: None,
660                }
661            }
662            // keep all buffers, but some have offsets
663            (false, true) => {
664                let (buffers, offsets): (Vec<_>, _) = buffers_with_offsets_iter
665                    .map(|(buffer, offset)| {
666                        (buffer.vortex_expect("already checked for rewrite"), offset)
667                    })
668                    .collect();
669                Self::AllKept {
670                    buffers: Arc::from(buffers),
671                    offsets: Some(offsets),
672                }
673            }
674            // rewrite and some have offsets
675            (true, true) => {
676                let (buffers, offsets) = buffers_with_offsets_iter.collect();
677                Self::SomeCompacted {
678                    buffers,
679                    offsets: Some(offsets),
680                }
681            }
682        }
683    }
684}
685
686#[derive(Debug, Clone, Copy, PartialEq, Eq)]
687enum CompactionStrategy {
688    KeepFull,
689    /// Slice the buffer to [start, end) range
690    Slice {
691        start: u32,
692        end: u32,
693    },
694    /// Rewrite data into new compacted buffer
695    Rewrite,
696}
697
698fn compaction_strategy(
699    buffer_utilization: &BufferUtilization,
700    threshold: f64,
701) -> CompactionStrategy {
702    match buffer_utilization.overall_utilization() {
703        // rewrite empty or not used buffers TODO(os): maybe keep them
704        0.0 => CompactionStrategy::Rewrite,
705        utilised if utilised >= threshold => CompactionStrategy::KeepFull,
706        _ if buffer_utilization.range_utilization() >= threshold => {
707            let Range { start, end } = buffer_utilization.range();
708            CompactionStrategy::Slice { start, end }
709        }
710        _ => CompactionStrategy::Rewrite,
711    }
712}
713
714enum ViewAdjustment {
715    Precomputed(PrecomputedViewAdjustment),
716    Rewriting(RewritingViewAdjustment),
717}
718
719impl ViewAdjustment {
720    fn shift(buffer_offset: u32, offsets: Option<Vec<u32>>) -> Self {
721        Self::Precomputed(PrecomputedViewAdjustment::Shift {
722            buffer_offset,
723            offsets,
724        })
725    }
726
727    fn lookup(buffer_lookup: Vec<u32>, offsets: Option<Vec<u32>>) -> Self {
728        Self::Precomputed(PrecomputedViewAdjustment::Lookup {
729            buffer_lookup,
730            offsets,
731        })
732    }
733
734    fn rewriting(buffer_lookup: Vec<Option<u32>>, offsets: Option<Vec<u32>>) -> Self {
735        Self::Rewriting(RewritingViewAdjustment {
736            buffer_lookup,
737            offsets,
738        })
739    }
740}
741
742// Care when adding new variants or fields in this enum, it will mess with inlining if it gets too big
743enum PrecomputedViewAdjustment {
744    Shift {
745        buffer_offset: u32,
746        offsets: Option<Vec<u32>>,
747    },
748    Lookup {
749        buffer_lookup: Vec<u32>,
750        offsets: Option<Vec<u32>>,
751    },
752}
753
754impl PrecomputedViewAdjustment {
755    #[inline]
756    fn adjust_view(&self, view: &BinaryView) -> BinaryView {
757        if view.is_inlined() {
758            return *view;
759        }
760        let view_ref = view.as_view();
761        match self {
762            Self::Shift {
763                buffer_offset,
764                offsets,
765            } => {
766                let b_idx = view_ref.buffer_index;
767                let offset_shift = offsets
768                    .as_ref()
769                    .map(|o| o[b_idx as usize])
770                    .unwrap_or_default();
771
772                // If offset < offset_shift, this view was invalid and wasn't counted in buffer_utilizations.
773                // Return an empty view to match how invalid views are handled in the Rewriting path.
774                if view_ref.offset < offset_shift {
775                    return BinaryView::empty_view();
776                }
777
778                view_ref
779                    .with_buffer_and_offset(b_idx + buffer_offset, view_ref.offset - offset_shift)
780            }
781            Self::Lookup {
782                buffer_lookup,
783                offsets,
784            } => {
785                let b_idx = view_ref.buffer_index;
786                let buffer = buffer_lookup[b_idx as usize];
787                let offset_shift = offsets
788                    .as_ref()
789                    .map(|o| o[b_idx as usize])
790                    .unwrap_or_default();
791
792                // If offset < offset_shift, this view was invalid and wasn't counted in buffer_utilizations.
793                // Return an empty view to match how invalid views are handled in the Rewriting path.
794                if view_ref.offset < offset_shift {
795                    return BinaryView::empty_view();
796                }
797
798                view_ref.with_buffer_and_offset(buffer, view_ref.offset - offset_shift)
799            }
800        }
801        .into()
802    }
803}
804
805struct RewritingViewAdjustment {
806    buffer_lookup: Vec<Option<u32>>,
807    offsets: Option<Vec<u32>>,
808}
809
810impl RewritingViewAdjustment {
811    /// Can return None if this view can't be adjusted, because there is no precomputed lookup
812    /// for the current buffer.
813    fn adjust_view(&self, view: &BinaryView) -> Option<BinaryView> {
814        if view.is_inlined() {
815            return Some(*view);
816        }
817
818        let view_ref = view.as_view();
819        self.buffer_lookup[view_ref.buffer_index as usize].map(|buffer| {
820            let offset_shift = self
821                .offsets
822                .as_ref()
823                .map(|o| o[view_ref.buffer_index as usize])
824                .unwrap_or_default();
825            view_ref
826                .with_buffer_and_offset(buffer, view_ref.offset - offset_shift)
827                .into()
828        })
829    }
830}
831
832#[cfg(test)]
833mod tests {
834    use vortex_error::VortexResult;
835
836    use crate::IntoArray;
837    use crate::LEGACY_SESSION;
838    use crate::VortexSessionExecute;
839    use crate::assert_arrays_eq;
840    use crate::builders::ArrayBuilder;
841    use crate::builders::VarBinViewBuilder;
842    use crate::builders::varbinview::VarBinViewArray;
843    use crate::dtype::DType;
844    use crate::dtype::Nullability;
845
846    #[test]
847    fn test_utf8_builder() {
848        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
849
850        builder.append_value("Hello");
851        builder.append_null();
852        builder.append_value("World");
853
854        builder.append_nulls(2);
855
856        builder.append_zeros(2);
857        builder.append_value("test");
858
859        let actual = builder.finish();
860        let expected = <VarBinViewArray as FromIterator<_>>::from_iter([
861            Some("Hello"),
862            None,
863            Some("World"),
864            None,
865            None,
866            Some(""),
867            Some(""),
868            Some("test"),
869        ]);
870        assert_arrays_eq!(actual, expected);
871    }
872
873    #[test]
874    fn test_utf8_builder_with_extend() {
875        let array = {
876            let mut builder =
877                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
878            builder.append_null();
879            builder.append_value("Hello2");
880            builder.finish()
881        };
882        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
883
884        builder.append_value("Hello1");
885        builder.extend_from_array(&array);
886        builder.append_nulls(2);
887        builder.append_value("Hello3");
888
889        let actual = builder.finish_into_canonical();
890        let expected = <VarBinViewArray as FromIterator<_>>::from_iter([
891            Some("Hello1"),
892            None,
893            Some("Hello2"),
894            None,
895            None,
896            Some("Hello3"),
897        ]);
898        assert_arrays_eq!(actual.into_array(), expected.into_array());
899    }
900
/// Appends arrays (and slices of them) to a deduplicating builder and checks that
/// `completed_block_count` only grows when a previously unseen array is appended.
#[test]
fn test_buffer_deduplication() -> VortexResult<()> {
    const LONG_VALUE: &str = "This is a long string that should not be inlined";
    let nullable_utf8 = || DType::Utf8(Nullability::Nullable);

    let first = {
        let mut source = VarBinViewBuilder::with_capacity(nullable_utf8(), 10);
        source.append_value(LONG_VALUE);
        source.append_value("short string");
        source.finish_into_varbinview()
    };
    assert_eq!(first.buffers().len(), 1);

    let mut dedup = VarBinViewBuilder::with_buffer_deduplication(nullable_utf8(), 10);
    let mut ctx = LEGACY_SESSION.create_execution_ctx();

    // The first append registers exactly one completed block.
    first.append_to_builder(&mut dedup, &mut ctx)?;
    assert_eq!(dedup.completed_block_count(), 1);

    // Slices of the same array must not register any additional block.
    for range in [1..2, 0..1] {
        first.slice(range)?.append_to_builder(&mut dedup, &mut ctx)?;
    }
    assert_eq!(dedup.completed_block_count(), 1);

    // A separately built array with the same long value counts as a second block.
    let second = {
        let mut source = VarBinViewBuilder::with_capacity(nullable_utf8(), 10);
        source.append_value(LONG_VALUE);
        source.finish_into_varbinview()
    };
    second.append_to_builder(&mut dedup, &mut ctx)?;
    assert_eq!(dedup.completed_block_count(), 2);

    // Re-appending slices of either known array leaves the count unchanged.
    for known in [&first, &second] {
        known.slice(0..1)?.append_to_builder(&mut dedup, &mut ctx)?;
    }
    assert_eq!(dedup.completed_block_count(), 2);

    Ok(())
}
947
/// Exercises `append_scalar` on Utf8 and Binary builders (valid values and nulls) and checks
/// that a scalar of a mismatched dtype is rejected with an error.
#[test]
fn test_append_scalar() {
    use crate::scalar::Scalar;

    // Utf8 builder: two valid values followed by a null.
    let mut strings = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
    for text in ["hello", "world"] {
        strings
            .append_scalar(&Scalar::utf8(text, Nullability::Nullable))
            .unwrap();
    }
    strings
        .append_scalar(&Scalar::null(DType::Utf8(Nullability::Nullable)))
        .unwrap();

    let array = strings.finish();
    let expected = [Some("hello"), Some("world"), None]
        .into_iter()
        .collect::<VarBinViewArray>();
    assert_arrays_eq!(&array, &expected);

    // Binary builder: one valid value followed by a null.
    let mut blobs = VarBinViewBuilder::with_capacity(DType::Binary(Nullability::Nullable), 10);
    blobs
        .append_scalar(&Scalar::binary(vec![1u8, 2, 3], Nullability::Nullable))
        .unwrap();
    blobs
        .append_scalar(&Scalar::null(DType::Binary(Nullability::Nullable)))
        .unwrap();

    let binary_array = blobs.finish();
    let expected = [Some(vec![1u8, 2, 3]), None]
        .into_iter()
        .collect::<VarBinViewArray>();
    assert_arrays_eq!(&binary_array, &expected);

    // An i32 scalar does not match a Utf8 builder and must produce an error.
    let mut mismatched =
        VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::NonNullable), 10);
    assert!(mismatched.append_scalar(&Scalar::from(42i32)).is_err());
}
994
/// Checks the sequence of sizes produced by the fixed and exponential
/// `BufferGrowthStrategy` constructors.
#[test]
fn test_buffer_growth_strategies() {
    use super::BufferGrowthStrategy;

    // Fixed: every request yields the configured size.
    let mut fixed = BufferGrowthStrategy::fixed(1024);
    for _ in 0..3 {
        assert_eq!(fixed.next_size(), 1024);
    }

    // Exponential: sizes double from the initial size and stay at the maximum once reached.
    let mut exponential = BufferGrowthStrategy::exponential(1024, 8192);
    for expected_size in [1024, 2048, 4096, 8192, 8192] {
        assert_eq!(exponential.next_size(), expected_size);
    }
}
1017
/// Appends a single value larger than the exponential strategy's maximum buffer size and
/// checks that it can be read back unchanged.
#[test]
fn test_large_value_allocation() {
    use super::BufferGrowthStrategy;
    use super::VarBinViewBuilder;

    // Twice the strategy's 4096-byte maximum.
    const LARGE_LEN: usize = 8192;

    let growth = BufferGrowthStrategy::exponential(1024, 4096);
    let mut builder = VarBinViewBuilder::new(
        DType::Binary(Nullability::Nullable),
        10,
        Default::default(),
        growth,
        0.0,
    );

    let payload = vec![0u8; LARGE_LEN];
    builder.append_value(&payload);

    let array = builder.finish_into_varbinview();
    assert_eq!(array.len(), 1);

    // Read the single element back and compare it with what was appended.
    let scalar = array.scalar_at(0).unwrap();
    let retrieved = scalar.as_binary().value().cloned().unwrap();
    assert_eq!(retrieved.len(), LARGE_LEN);
    assert_eq!(retrieved.as_slice(), &payload);
}
1051}