Skip to main content

vortex_array/builders/
varbinview.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::Any;
5use std::ops::Range;
6use std::sync::Arc;
7
8use itertools::Itertools;
9use vortex_buffer::Buffer;
10use vortex_buffer::BufferMut;
11use vortex_buffer::ByteBuffer;
12use vortex_buffer::ByteBufferMut;
13use vortex_error::VortexExpect;
14use vortex_error::VortexResult;
15use vortex_error::vortex_bail;
16use vortex_error::vortex_ensure;
17use vortex_mask::Mask;
18use vortex_utils::aliases::hash_map::Entry;
19use vortex_utils::aliases::hash_map::HashMap;
20
21use crate::ArrayRef;
22use crate::IntoArray;
23use crate::arrays::VarBinViewArray;
24use crate::arrays::varbinview::build_views::BinaryView;
25use crate::arrays::varbinview::compact::BufferUtilization;
26use crate::builders::ArrayBuilder;
27use crate::builders::LazyBitBufferBuilder;
28use crate::canonical::Canonical;
29use crate::canonical::ToCanonical;
30use crate::dtype::DType;
31use crate::scalar::Scalar;
32
33/// The builder for building a [`VarBinViewArray`].
34pub struct VarBinViewBuilder {
35    dtype: DType,
36    views_builder: BufferMut<BinaryView>,
37    nulls: LazyBitBufferBuilder,
38    completed: CompletedBuffers,
39    in_progress: ByteBufferMut,
40    growth_strategy: BufferGrowthStrategy,
41    compaction_threshold: f64,
42}
43
44impl VarBinViewBuilder {
45    pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
46        Self::new(dtype, capacity, Default::default(), Default::default(), 0.0)
47    }
48
49    pub fn with_buffer_deduplication(dtype: DType, capacity: usize) -> Self {
50        Self::new(
51            dtype,
52            capacity,
53            CompletedBuffers::Deduplicated(Default::default()),
54            Default::default(),
55            0.0,
56        )
57    }
58
59    pub fn with_compaction(dtype: DType, capacity: usize, compaction_threshold: f64) -> Self {
60        Self::new(
61            dtype,
62            capacity,
63            Default::default(),
64            Default::default(),
65            compaction_threshold,
66        )
67    }
68
69    pub fn new(
70        dtype: DType,
71        capacity: usize,
72        completed: CompletedBuffers,
73        growth_strategy: BufferGrowthStrategy,
74        compaction_threshold: f64,
75    ) -> Self {
76        assert!(
77            matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
78            "VarBinViewBuilder DType must be Utf8 or Binary."
79        );
80        Self {
81            views_builder: BufferMut::<BinaryView>::with_capacity(capacity),
82            nulls: LazyBitBufferBuilder::new(capacity),
83            completed,
84            in_progress: ByteBufferMut::empty(),
85            dtype,
86            growth_strategy,
87            compaction_threshold,
88        }
89    }
90
91    fn append_value_view(&mut self, value: &[u8]) {
92        let length =
93            u32::try_from(value.len()).vortex_expect("cannot have a single string >2^32 in length");
94        if length <= 12 {
95            self.views_builder.push(BinaryView::make_view(value, 0, 0));
96            return;
97        }
98
99        let (buffer_idx, offset) = self.append_value_to_buffer(value);
100        let view = BinaryView::make_view(value, buffer_idx, offset);
101        self.views_builder.push(view);
102    }
103
104    /// Appends a value to the builder.
105    pub fn append_value<S: AsRef<[u8]>>(&mut self, value: S) {
106        self.append_value_view(value.as_ref());
107        self.nulls.append_non_null();
108    }
109
110    /// Appends `n` copies of `value` as non-null entries.
111    pub fn append_n_values<S: AsRef<[u8]>>(&mut self, value: S, n: usize) {
112        if n == 0 {
113            return;
114        }
115        let bytes = value.as_ref();
116        let view = if bytes.len() <= BinaryView::MAX_INLINED_SIZE {
117            BinaryView::make_view(bytes, 0, 0)
118        } else {
119            let (buffer_idx, offset) = self.append_value_to_buffer(bytes);
120            BinaryView::make_view(bytes, buffer_idx, offset)
121        };
122        self.views_builder.push_n(view, n);
123        self.nulls.append_n_non_nulls(n);
124    }
125
126    fn flush_in_progress(&mut self) {
127        if self.in_progress.is_empty() {
128            return;
129        }
130        let block = std::mem::take(&mut self.in_progress).freeze();
131
132        assert!(block.len() < u32::MAX as usize, "Block too large");
133
134        let initial_len = self.completed.len();
135        self.completed.push(block);
136        assert_eq!(
137            self.completed.len(),
138            initial_len + 1,
139            "Invalid state, just completed block already exists"
140        );
141    }
142
143    /// append a non inlined value to self.in_progress.
144    fn append_value_to_buffer(&mut self, value: &[u8]) -> (u32, u32) {
145        assert!(
146            value.len() > BinaryView::MAX_INLINED_SIZE,
147            "must inline small strings"
148        );
149        let required_cap = self.in_progress.len() + value.len();
150        if self.in_progress.capacity() < required_cap {
151            self.flush_in_progress();
152            let next_buffer_size = self.growth_strategy.next_size() as usize;
153            let to_reserve = next_buffer_size.max(value.len());
154            self.in_progress.reserve(to_reserve);
155        }
156
157        let buffer_idx = self.completed.len();
158        let offset = u32::try_from(self.in_progress.len()).vortex_expect("too many buffers");
159        self.in_progress.extend_from_slice(value);
160
161        (buffer_idx, offset)
162    }
163
164    pub fn completed_block_count(&self) -> u32 {
165        self.completed.len()
166    }
167
168    /// Pushes buffers and pre-adjusted views into the builder.
169    ///
170    /// The provided `buffers` contain sections of data from a `VarBinViewArray`, and the
171    /// `views` are `BinaryView`s that have already been adjusted to reference the correct buffer
172    /// indices and offsets for this builder. All views must point to valid sections within the
173    /// provided buffers, and the validity length must match the view length.
174    ///
175    /// # Warning
176    ///
177    /// This method does not check utilization of the given buffers. Callers must provide
178    /// buffers that are fully utilized by the given adjusted views.
179    ///
180    /// # Panics
181    ///
182    /// Panics if this builder deduplicates buffers and any of the given buffers already
183    /// exist in this builder.
184    pub fn push_buffer_and_adjusted_views(
185        &mut self,
186        buffers: &[ByteBuffer],
187        views: &Buffer<BinaryView>,
188        validity_mask: Mask,
189    ) {
190        self.flush_in_progress();
191
192        let expected_completed_len = self.completed.len() as usize + buffers.len();
193        self.completed.extend_from_slice_unchecked(buffers);
194        assert_eq!(
195            self.completed.len() as usize,
196            expected_completed_len,
197            "Some buffers already exist",
198        );
199        self.views_builder.extend_trusted(views.iter().copied());
200        self.push_only_validity_mask(validity_mask);
201
202        debug_assert_eq!(self.nulls.len(), self.views_builder.len())
203    }
204
205    /// Finishes the builder directly into a [`VarBinViewArray`].
206    pub fn finish_into_varbinview(&mut self) -> VarBinViewArray {
207        self.flush_in_progress();
208        let buffers = std::mem::take(&mut self.completed);
209
210        assert_eq!(
211            self.views_builder.len(),
212            self.nulls.len(),
213            "View and validity length must match"
214        );
215
216        let validity = self.nulls.finish_with_nullability(self.dtype.nullability());
217
218        // SAFETY: the builder methods check safety at each step.
219        unsafe {
220            VarBinViewArray::new_unchecked(
221                std::mem::take(&mut self.views_builder).freeze(),
222                buffers.finish(),
223                self.dtype.clone(),
224                validity,
225            )
226        }
227    }
228
229    // Pushes a validity mask into the builder not affecting the views or buffers
230    fn push_only_validity_mask(&mut self, validity_mask: Mask) {
231        self.nulls.append_validity_mask(validity_mask);
232    }
233}
234
235impl ArrayBuilder for VarBinViewBuilder {
236    fn as_any(&self) -> &dyn Any {
237        self
238    }
239
240    fn as_any_mut(&mut self) -> &mut dyn Any {
241        self
242    }
243
244    fn dtype(&self) -> &DType {
245        &self.dtype
246    }
247
248    fn len(&self) -> usize {
249        self.nulls.len()
250    }
251
252    fn append_zeros(&mut self, n: usize) {
253        self.views_builder.push_n(BinaryView::empty_view(), n);
254        self.nulls.append_n_non_nulls(n);
255    }
256
257    unsafe fn append_nulls_unchecked(&mut self, n: usize) {
258        self.views_builder.push_n(BinaryView::empty_view(), n);
259        self.nulls.append_n_nulls(n);
260    }
261
262    fn append_scalar(&mut self, scalar: &Scalar) -> VortexResult<()> {
263        vortex_ensure!(
264            scalar.dtype() == self.dtype(),
265            "VarBinViewBuilder expected scalar with dtype {}, got {}",
266            self.dtype(),
267            scalar.dtype()
268        );
269
270        match self.dtype() {
271            DType::Utf8(_) => match scalar.as_utf8().value() {
272                Some(value) => self.append_value(value),
273                None => self.append_null(),
274            },
275            DType::Binary(_) => match scalar.as_binary().value() {
276                Some(value) => self.append_value(value),
277                None => self.append_null(),
278            },
279            _ => vortex_bail!(
280                "VarBinViewBuilder can only handle Utf8 or Binary scalars, got {:?}",
281                scalar.dtype()
282            ),
283        }
284
285        Ok(())
286    }
287
288    unsafe fn extend_from_array_unchecked(&mut self, array: &ArrayRef) {
289        let array = array.to_varbinview();
290        self.flush_in_progress();
291
292        self.push_only_validity_mask(
293            array
294                .validity_mask()
295                .vortex_expect("validity_mask in extend_from_array_unchecked"),
296        );
297
298        let view_adjustment =
299            self.completed
300                .extend_from_compaction(BuffersWithOffsets::from_array(
301                    &array,
302                    self.compaction_threshold,
303                ));
304
305        match view_adjustment {
306            ViewAdjustment::Precomputed(adjustment) => self.views_builder.extend_trusted(
307                array
308                    .views()
309                    .iter()
310                    .map(|view| adjustment.adjust_view(view)),
311            ),
312            ViewAdjustment::Rewriting(adjustment) => match array
313                .validity_mask()
314                .vortex_expect("validity_mask in extend_from_array_unchecked")
315            {
316                Mask::AllTrue(_) => {
317                    for (idx, &view) in array.views().iter().enumerate() {
318                        let new_view = self.push_view(view, &adjustment, &array, idx);
319                        self.views_builder.push(new_view);
320                    }
321                }
322                Mask::AllFalse(_) => {
323                    self.views_builder
324                        .push_n(BinaryView::empty_view(), array.len());
325                }
326                Mask::Values(v) => {
327                    for (idx, (&view, is_valid)) in
328                        array.views().iter().zip(v.bit_buffer().iter()).enumerate()
329                    {
330                        let new_view = if !is_valid {
331                            BinaryView::empty_view()
332                        } else {
333                            self.push_view(view, &adjustment, &array, idx)
334                        };
335                        self.views_builder.push(new_view);
336                    }
337                }
338            },
339        }
340    }
341
342    fn reserve_exact(&mut self, additional: usize) {
343        self.views_builder.reserve(additional);
344        self.nulls.reserve_exact(additional);
345    }
346
347    unsafe fn set_validity_unchecked(&mut self, validity: Mask) {
348        self.nulls = LazyBitBufferBuilder::new(validity.len());
349        self.nulls.append_validity_mask(validity);
350    }
351
352    fn finish(&mut self) -> ArrayRef {
353        self.finish_into_varbinview().into_array()
354    }
355
356    fn finish_into_canonical(&mut self) -> Canonical {
357        Canonical::VarBinView(self.finish_into_varbinview())
358    }
359}
360
361impl VarBinViewBuilder {
362    fn push_view(
363        &mut self,
364        view: BinaryView,
365        adjustment: &RewritingViewAdjustment,
366        array: &VarBinViewArray,
367        idx: usize,
368    ) -> BinaryView {
369        if view.is_inlined() {
370            view
371        } else if let Some(adjusted) = adjustment.adjust_view(&view) {
372            adjusted
373        } else {
374            let bytes = array.bytes_at(idx);
375            let (new_buf_idx, new_offset) = self.append_value_to_buffer(&bytes);
376            BinaryView::make_view(bytes.as_slice(), new_buf_idx, new_offset)
377        }
378    }
379}
380
381pub enum CompletedBuffers {
382    Default(Vec<ByteBuffer>),
383    Deduplicated(DeduplicatedBuffers),
384}
385
386impl Default for CompletedBuffers {
387    fn default() -> Self {
388        Self::Default(Vec::new())
389    }
390}
391
392// Self::push enforces len < u32::max
393#[allow(clippy::cast_possible_truncation)]
394impl CompletedBuffers {
395    fn len(&self) -> u32 {
396        match self {
397            Self::Default(buffers) => buffers.len() as u32,
398            Self::Deduplicated(buffers) => buffers.len(),
399        }
400    }
401
402    fn push(&mut self, block: ByteBuffer) -> u32 {
403        match self {
404            Self::Default(buffers) => {
405                assert!(buffers.len() < u32::MAX as usize, "Too many blocks");
406                buffers.push(block);
407                self.len()
408            }
409            Self::Deduplicated(buffers) => buffers.push(block),
410        }
411    }
412
413    /// Does not compact buffers, bypasses utilization checks.
414    fn extend_from_slice_unchecked(&mut self, buffers: &[ByteBuffer]) {
415        for buffer in buffers {
416            self.push(buffer.clone());
417        }
418    }
419
420    fn extend_from_compaction(&mut self, buffers: BuffersWithOffsets) -> ViewAdjustment {
421        match (self, buffers) {
422            (
423                Self::Default(completed_buffers),
424                BuffersWithOffsets::AllKept { buffers, offsets },
425            ) => {
426                let buffer_offset = completed_buffers.len() as u32;
427                completed_buffers.extend_from_slice(&buffers);
428                ViewAdjustment::shift(buffer_offset, offsets)
429            }
430            (
431                Self::Default(completed_buffers),
432                BuffersWithOffsets::SomeCompacted { buffers, offsets },
433            ) => {
434                let lookup = buffers
435                    .iter()
436                    .map(|maybe_buffer| {
437                        maybe_buffer.as_ref().map(|buffer| {
438                            completed_buffers.push(buffer.clone());
439                            completed_buffers.len() as u32 - 1
440                        })
441                    })
442                    .collect();
443                ViewAdjustment::rewriting(lookup, offsets)
444            }
445
446            (
447                Self::Deduplicated(completed_buffers),
448                BuffersWithOffsets::AllKept { buffers, offsets },
449            ) => {
450                let buffer_lookup = completed_buffers.extend_from_iter(buffers.iter().cloned());
451                ViewAdjustment::lookup(buffer_lookup, offsets)
452            }
453            (
454                Self::Deduplicated(completed_buffers),
455                BuffersWithOffsets::SomeCompacted { buffers, offsets },
456            ) => {
457                let buffer_lookup = completed_buffers.extend_from_option_slice(&buffers);
458                ViewAdjustment::rewriting(buffer_lookup, offsets)
459            }
460        }
461    }
462
463    fn finish(self) -> Arc<[ByteBuffer]> {
464        match self {
465            Self::Default(buffers) => Arc::from(buffers),
466            Self::Deduplicated(buffers) => buffers.finish(),
467        }
468    }
469}
470
471#[derive(Default)]
472pub struct DeduplicatedBuffers {
473    buffers: Vec<ByteBuffer>,
474    buffer_to_idx: HashMap<BufferId, u32>,
475}
476
477impl DeduplicatedBuffers {
478    // Self::push enforces len < u32::max
479    #[allow(clippy::cast_possible_truncation)]
480    fn len(&self) -> u32 {
481        self.buffers.len() as u32
482    }
483
484    /// Push a new block if not seen before. Returns the idx of the block.
485    pub(crate) fn push(&mut self, block: ByteBuffer) -> u32 {
486        assert!(self.buffers.len() < u32::MAX as usize, "Too many blocks");
487
488        let initial_len = self.len();
489        let id = BufferId::from(&block);
490        match self.buffer_to_idx.entry(id) {
491            Entry::Occupied(idx) => *idx.get(),
492            Entry::Vacant(entry) => {
493                let idx = initial_len;
494                entry.insert(idx);
495                self.buffers.push(block);
496                idx
497            }
498        }
499    }
500
501    pub(crate) fn extend_from_option_slice(
502        &mut self,
503        buffers: &[Option<ByteBuffer>],
504    ) -> Vec<Option<u32>> {
505        buffers
506            .iter()
507            .map(|buffer| buffer.as_ref().map(|buf| self.push(buf.clone())))
508            .collect()
509    }
510
511    pub(crate) fn extend_from_iter(
512        &mut self,
513        buffers: impl Iterator<Item = ByteBuffer>,
514    ) -> Vec<u32> {
515        buffers.map(|buffer| self.push(buffer)).collect()
516    }
517
518    pub(crate) fn finish(self) -> Arc<[ByteBuffer]> {
519        Arc::from(self.buffers)
520    }
521}
522
523#[derive(PartialEq, Eq, Hash)]
524struct BufferId {
525    // *const u8 stored as usize for `Send`
526    ptr: usize,
527    len: usize,
528}
529
530impl BufferId {
531    fn from(buffer: &ByteBuffer) -> Self {
532        let slice = buffer.as_slice();
533        Self {
534            ptr: slice.as_ptr() as usize,
535            len: slice.len(),
536        }
537    }
538}
539
/// Policy deciding how many bytes to reserve for each new in-progress data buffer.
#[derive(Debug, Clone)]
pub enum BufferGrowthStrategy {
    /// Use a fixed buffer size for all allocations.
    Fixed { size: u32 },
    /// Start at `current_size` and double after every allocation, capped at `max_size`.
    ///
    /// A starting size of zero never grows, and a starting size already at or above
    /// `max_size` is used unchanged.
    Exponential { current_size: u32, max_size: u32 },
}

impl Default for BufferGrowthStrategy {
    /// Exponential growth from 4 KiB up to 2 MiB.
    fn default() -> Self {
        const INITIAL_SIZE: u32 = 4 * 1024;
        const MAX_SIZE: u32 = 2 * 1024 * 1024;
        Self::exponential(INITIAL_SIZE, MAX_SIZE)
    }
}

impl BufferGrowthStrategy {
    /// Always reserve `size` bytes.
    pub fn fixed(size: u32) -> Self {
        Self::Fixed { size }
    }

    /// Reserve `initial_size` bytes first, doubling on each call up to `max_size`.
    pub fn exponential(initial_size: u32, max_size: u32) -> Self {
        Self::Exponential {
            current_size: initial_size,
            max_size,
        }
    }

    /// Returns the size to reserve for the next buffer and advances the strategy.
    pub fn next_size(&mut self) -> u32 {
        match *self {
            Self::Fixed { size } => size,
            Self::Exponential {
                ref mut current_size,
                max_size,
            } => {
                let size = *current_size;
                if size < max_size {
                    // Saturate so doubling near `u32::MAX` clamps instead of wrapping.
                    *current_size = size.saturating_mul(2).min(max_size);
                }
                size
            }
        }
    }
}
586
587enum BuffersWithOffsets {
588    AllKept {
589        buffers: Arc<[ByteBuffer]>,
590        offsets: Option<Vec<u32>>,
591    },
592    SomeCompacted {
593        buffers: Vec<Option<ByteBuffer>>,
594        offsets: Option<Vec<u32>>,
595    },
596}
597
598impl BuffersWithOffsets {
599    pub fn from_array(array: &VarBinViewArray, compaction_threshold: f64) -> Self {
600        if compaction_threshold == 0.0 {
601            return Self::AllKept {
602                buffers: Arc::from(
603                    array
604                        .buffers()
605                        .to_vec()
606                        .into_iter()
607                        .map(|b| b.unwrap_host())
608                        .collect_vec(),
609                ),
610                offsets: None,
611            };
612        }
613
614        let buffer_utilizations = array
615            .buffer_utilizations()
616            .vortex_expect("buffer_utilizations in BuffersWithOffsets::from_array");
617        let mut has_rewrite = false;
618        let mut has_nonzero_offset = false;
619        for utilization in buffer_utilizations.iter() {
620            match compaction_strategy(utilization, compaction_threshold) {
621                CompactionStrategy::KeepFull => continue,
622                CompactionStrategy::Slice { .. } => has_nonzero_offset = true,
623                CompactionStrategy::Rewrite => has_rewrite = true,
624            }
625        }
626
627        let buffers_with_offsets_iter =
628            buffer_utilizations
629                .iter()
630                .zip(array.buffers().iter())
631                .map(|(utilization, buffer)| {
632                    match compaction_strategy(utilization, compaction_threshold) {
633                        CompactionStrategy::KeepFull => (Some(buffer.as_host().clone()), 0),
634                        CompactionStrategy::Slice { start, end } => (
635                            Some(buffer.as_host().slice(start as usize..end as usize)),
636                            start,
637                        ),
638                        CompactionStrategy::Rewrite => (None, 0),
639                    }
640                });
641
642        match (has_rewrite, has_nonzero_offset) {
643            // keep all buffers
644            (false, false) => {
645                let buffers: Vec<_> = buffers_with_offsets_iter
646                    .map(|(b, _)| b.vortex_expect("already checked for rewrite"))
647                    .collect();
648                Self::AllKept {
649                    buffers: Arc::from(buffers),
650                    offsets: None,
651                }
652            }
653            // rewrite, all zero offsets
654            (true, false) => {
655                let buffers: Vec<_> = buffers_with_offsets_iter.map(|(b, _)| b).collect();
656                Self::SomeCompacted {
657                    buffers,
658                    offsets: None,
659                }
660            }
661            // keep all buffers, but some have offsets
662            (false, true) => {
663                let (buffers, offsets): (Vec<_>, _) = buffers_with_offsets_iter
664                    .map(|(buffer, offset)| {
665                        (buffer.vortex_expect("already checked for rewrite"), offset)
666                    })
667                    .collect();
668                Self::AllKept {
669                    buffers: Arc::from(buffers),
670                    offsets: Some(offsets),
671                }
672            }
673            // rewrite and some have offsets
674            (true, true) => {
675                let (buffers, offsets) = buffers_with_offsets_iter.collect();
676                Self::SomeCompacted {
677                    buffers,
678                    offsets: Some(offsets),
679                }
680            }
681        }
682    }
683}
684
685#[derive(Debug, Clone, Copy, PartialEq, Eq)]
686enum CompactionStrategy {
687    KeepFull,
688    /// Slice the buffer to [start, end) range
689    Slice {
690        start: u32,
691        end: u32,
692    },
693    /// Rewrite data into new compacted buffer
694    Rewrite,
695}
696
697fn compaction_strategy(
698    buffer_utilization: &BufferUtilization,
699    threshold: f64,
700) -> CompactionStrategy {
701    match buffer_utilization.overall_utilization() {
702        // rewrite empty or not used buffers TODO(os): maybe keep them
703        0.0 => CompactionStrategy::Rewrite,
704        utilised if utilised >= threshold => CompactionStrategy::KeepFull,
705        _ if buffer_utilization.range_utilization() >= threshold => {
706            let Range { start, end } = buffer_utilization.range();
707            CompactionStrategy::Slice { start, end }
708        }
709        _ => CompactionStrategy::Rewrite,
710    }
711}
712
713enum ViewAdjustment {
714    Precomputed(PrecomputedViewAdjustment),
715    Rewriting(RewritingViewAdjustment),
716}
717
718impl ViewAdjustment {
719    fn shift(buffer_offset: u32, offsets: Option<Vec<u32>>) -> Self {
720        Self::Precomputed(PrecomputedViewAdjustment::Shift {
721            buffer_offset,
722            offsets,
723        })
724    }
725
726    fn lookup(buffer_lookup: Vec<u32>, offsets: Option<Vec<u32>>) -> Self {
727        Self::Precomputed(PrecomputedViewAdjustment::Lookup {
728            buffer_lookup,
729            offsets,
730        })
731    }
732
733    fn rewriting(buffer_lookup: Vec<Option<u32>>, offsets: Option<Vec<u32>>) -> Self {
734        Self::Rewriting(RewritingViewAdjustment {
735            buffer_lookup,
736            offsets,
737        })
738    }
739}
740
741// Care when adding new variants or fields in this enum, it will mess with inlining if it gets too big
742enum PrecomputedViewAdjustment {
743    Shift {
744        buffer_offset: u32,
745        offsets: Option<Vec<u32>>,
746    },
747    Lookup {
748        buffer_lookup: Vec<u32>,
749        offsets: Option<Vec<u32>>,
750    },
751}
752
753impl PrecomputedViewAdjustment {
754    fn adjust_view(&self, view: &BinaryView) -> BinaryView {
755        if view.is_inlined() {
756            return *view;
757        }
758        let view_ref = view.as_view();
759        match self {
760            Self::Shift {
761                buffer_offset,
762                offsets,
763            } => {
764                let b_idx = view_ref.buffer_index;
765                let offset_shift = offsets
766                    .as_ref()
767                    .map(|o| o[b_idx as usize])
768                    .unwrap_or_default();
769
770                // If offset < offset_shift, this view was invalid and wasn't counted in buffer_utilizations.
771                // Return an empty view to match how invalid views are handled in the Rewriting path.
772                if view_ref.offset < offset_shift {
773                    return BinaryView::empty_view();
774                }
775
776                view_ref
777                    .with_buffer_and_offset(b_idx + buffer_offset, view_ref.offset - offset_shift)
778            }
779            Self::Lookup {
780                buffer_lookup,
781                offsets,
782            } => {
783                let b_idx = view_ref.buffer_index;
784                let buffer = buffer_lookup[b_idx as usize];
785                let offset_shift = offsets
786                    .as_ref()
787                    .map(|o| o[b_idx as usize])
788                    .unwrap_or_default();
789
790                // If offset < offset_shift, this view was invalid and wasn't counted in buffer_utilizations.
791                // Return an empty view to match how invalid views are handled in the Rewriting path.
792                if view_ref.offset < offset_shift {
793                    return BinaryView::empty_view();
794                }
795
796                view_ref.with_buffer_and_offset(buffer, view_ref.offset - offset_shift)
797            }
798        }
799        .into()
800    }
801}
802
803struct RewritingViewAdjustment {
804    buffer_lookup: Vec<Option<u32>>,
805    offsets: Option<Vec<u32>>,
806}
807
808impl RewritingViewAdjustment {
809    /// Can return None if this view can't be adjusted, because there is no precomputed lookup
810    /// for the current buffer.
811    fn adjust_view(&self, view: &BinaryView) -> Option<BinaryView> {
812        if view.is_inlined() {
813            return Some(*view);
814        }
815
816        let view_ref = view.as_view();
817        self.buffer_lookup[view_ref.buffer_index as usize].map(|buffer| {
818            let offset_shift = self
819                .offsets
820                .as_ref()
821                .map(|o| o[view_ref.buffer_index as usize])
822                .unwrap_or_default();
823            view_ref
824                .with_buffer_and_offset(buffer, view_ref.offset - offset_shift)
825                .into()
826        })
827    }
828}
829
830#[cfg(test)]
831mod tests {
832    use vortex_error::VortexResult;
833
834    use crate::IntoArray;
835    use crate::LEGACY_SESSION;
836    use crate::VortexSessionExecute;
837    use crate::assert_arrays_eq;
838    use crate::builders::ArrayBuilder;
839    use crate::builders::VarBinViewBuilder;
840    use crate::builders::varbinview::VarBinViewArray;
841    use crate::dtype::DType;
842    use crate::dtype::Nullability;
843
844    #[test]
845    fn test_utf8_builder() {
846        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
847
848        builder.append_value("Hello");
849        builder.append_null();
850        builder.append_value("World");
851
852        builder.append_nulls(2);
853
854        builder.append_zeros(2);
855        builder.append_value("test");
856
857        let actual = builder.finish();
858        let expected = <VarBinViewArray as FromIterator<_>>::from_iter([
859            Some("Hello"),
860            None,
861            Some("World"),
862            None,
863            None,
864            Some(""),
865            Some(""),
866            Some("test"),
867        ]);
868        assert_arrays_eq!(actual, expected);
869    }
870
871    #[test]
872    fn test_utf8_builder_with_extend() {
873        let array = {
874            let mut builder =
875                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
876            builder.append_null();
877            builder.append_value("Hello2");
878            builder.finish()
879        };
880        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
881
882        builder.append_value("Hello1");
883        builder.extend_from_array(&array);
884        builder.append_nulls(2);
885        builder.append_value("Hello3");
886
887        let actual = builder.finish_into_canonical();
888        let expected = <VarBinViewArray as FromIterator<_>>::from_iter([
889            Some("Hello1"),
890            None,
891            Some("Hello2"),
892            None,
893            None,
894            Some("Hello3"),
895        ]);
896        assert_arrays_eq!(actual.into_array(), expected.into_array());
897    }
898
899    #[test]
900    fn test_buffer_deduplication() -> VortexResult<()> {
901        let array = {
902            let mut builder =
903                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
904            builder.append_value("This is a long string that should not be inlined");
905            builder.append_value("short string");
906            builder.finish_into_varbinview()
907        };
908
909        assert_eq!(array.buffers().len(), 1);
910        let mut builder =
911            VarBinViewBuilder::with_buffer_deduplication(DType::Utf8(Nullability::Nullable), 10);
912
913        let mut ctx = LEGACY_SESSION.create_execution_ctx();
914
915        array.append_to_builder(&mut builder, &mut ctx)?;
916        assert_eq!(builder.completed_block_count(), 1);
917
918        array
919            .slice(1..2)?
920            .append_to_builder(&mut builder, &mut ctx)?;
921        array
922            .slice(0..1)?
923            .append_to_builder(&mut builder, &mut ctx)?;
924        assert_eq!(builder.completed_block_count(), 1);
925
926        let array2 = {
927            let mut builder =
928                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
929            builder.append_value("This is a long string that should not be inlined");
930            builder.finish_into_varbinview()
931        };
932
933        array2.append_to_builder(&mut builder, &mut ctx)?;
934        assert_eq!(builder.completed_block_count(), 2);
935
936        array
937            .slice(0..1)?
938            .append_to_builder(&mut builder, &mut ctx)?;
939        array2
940            .slice(0..1)?
941            .append_to_builder(&mut builder, &mut ctx)?;
942        assert_eq!(builder.completed_block_count(), 2);
943        Ok(())
944    }
945
946    #[test]
947    fn test_append_scalar() {
948        use crate::scalar::Scalar;
949
950        // Test with Utf8 builder.
951        let mut utf8_builder =
952            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
953
954        // Test appending a valid utf8 value.
955        let utf8_scalar1 = Scalar::utf8("hello", Nullability::Nullable);
956        utf8_builder.append_scalar(&utf8_scalar1).unwrap();
957
958        // Test appending another value.
959        let utf8_scalar2 = Scalar::utf8("world", Nullability::Nullable);
960        utf8_builder.append_scalar(&utf8_scalar2).unwrap();
961
962        // Test appending null value.
963        let null_scalar = Scalar::null(DType::Utf8(Nullability::Nullable));
964        utf8_builder.append_scalar(&null_scalar).unwrap();
965
966        let array = utf8_builder.finish();
967        let expected =
968            <VarBinViewArray as FromIterator<_>>::from_iter([Some("hello"), Some("world"), None]);
969        assert_arrays_eq!(&array, &expected);
970
971        // Test with Binary builder.
972        let mut binary_builder =
973            VarBinViewBuilder::with_capacity(DType::Binary(Nullability::Nullable), 10);
974
975        let binary_scalar = Scalar::binary(vec![1u8, 2, 3], Nullability::Nullable);
976        binary_builder.append_scalar(&binary_scalar).unwrap();
977
978        let binary_null = Scalar::null(DType::Binary(Nullability::Nullable));
979        binary_builder.append_scalar(&binary_null).unwrap();
980
981        let binary_array = binary_builder.finish();
982        let expected =
983            <VarBinViewArray as FromIterator<_>>::from_iter([Some(vec![1u8, 2, 3]), None]);
984        assert_arrays_eq!(&binary_array, &expected);
985
986        // Test wrong dtype error.
987        let mut builder =
988            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::NonNullable), 10);
989        let wrong_scalar = Scalar::from(42i32);
990        assert!(builder.append_scalar(&wrong_scalar).is_err());
991    }
992
993    #[test]
994    fn test_buffer_growth_strategies() {
995        use super::BufferGrowthStrategy;
996
997        // Test Fixed strategy
998        let mut strategy = BufferGrowthStrategy::fixed(1024);
999
1000        // Should always return the fixed size
1001        assert_eq!(strategy.next_size(), 1024);
1002        assert_eq!(strategy.next_size(), 1024);
1003        assert_eq!(strategy.next_size(), 1024);
1004
1005        // Test Exponential strategy
1006        let mut strategy = BufferGrowthStrategy::exponential(1024, 8192);
1007
1008        // Should double each time until hitting max_size
1009        assert_eq!(strategy.next_size(), 1024); // First: 1024
1010        assert_eq!(strategy.next_size(), 2048); // Second: 2048
1011        assert_eq!(strategy.next_size(), 4096); // Third: 4096
1012        assert_eq!(strategy.next_size(), 8192); // Fourth: 8192 (max)
1013        assert_eq!(strategy.next_size(), 8192); // Fifth: 8192 (capped)
1014    }
1015
1016    #[test]
1017    fn test_large_value_allocation() {
1018        use super::BufferGrowthStrategy;
1019        use super::VarBinViewBuilder;
1020
1021        let mut builder = VarBinViewBuilder::new(
1022            DType::Binary(Nullability::Nullable),
1023            10,
1024            Default::default(),
1025            BufferGrowthStrategy::exponential(1024, 4096),
1026            0.0,
1027        );
1028
1029        // Create a value larger than max_size
1030        let large_value = vec![0u8; 8192];
1031
1032        // Should successfully append the large value
1033        builder.append_value(&large_value);
1034
1035        let array = builder.finish_into_varbinview();
1036        assert_eq!(array.len(), 1);
1037
1038        // Verify the value was stored correctly
1039        let retrieved = array
1040            .scalar_at(0)
1041            .unwrap()
1042            .as_binary()
1043            .value()
1044            .cloned()
1045            .unwrap();
1046        assert_eq!(retrieved.len(), 8192);
1047        assert_eq!(retrieved.as_slice(), &large_value);
1048    }
1049}