// vortex_array/builders/varbinview.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::Any;
5use std::ops::Range;
6use std::sync::Arc;
7
8use itertools::Itertools;
9use vortex_buffer::Buffer;
10use vortex_buffer::BufferMut;
11use vortex_buffer::ByteBuffer;
12use vortex_buffer::ByteBufferMut;
13use vortex_error::VortexExpect;
14use vortex_error::VortexResult;
15use vortex_error::vortex_bail;
16use vortex_error::vortex_ensure;
17use vortex_mask::Mask;
18use vortex_utils::aliases::hash_map::Entry;
19use vortex_utils::aliases::hash_map::HashMap;
20
21use crate::ArrayRef;
22use crate::IntoArray;
23use crate::arrays::VarBinViewArray;
24use crate::arrays::varbinview::build_views::BinaryView;
25use crate::arrays::varbinview::compact::BufferUtilization;
26use crate::builders::ArrayBuilder;
27use crate::builders::LazyBitBufferBuilder;
28use crate::canonical::Canonical;
29use crate::canonical::ToCanonical;
30use crate::dtype::DType;
31use crate::scalar::Scalar;
32
/// The builder for building a [`VarBinViewArray`].
pub struct VarBinViewBuilder {
    /// Element type of the array being built; asserted to be `Utf8` or `Binary` in `new`.
    dtype: DType,
    /// One `BinaryView` per appended element (inlined or buffer-referencing).
    views_builder: BufferMut<BinaryView>,
    /// Validity bitmap, appended in lockstep with the views.
    nulls: LazyBitBufferBuilder,
    /// Sealed data buffers; non-inlined views reference these by index.
    completed: CompletedBuffers,
    /// Buffer currently accepting non-inlined values; sealed into `completed` on flush.
    in_progress: ByteBufferMut,
    /// Controls the size of newly allocated `in_progress` buffers.
    growth_strategy: BufferGrowthStrategy,
    /// Minimum buffer utilization below which buffers from extended-in arrays
    /// are sliced or rewritten; 0.0 disables compaction.
    compaction_threshold: f64,
}
43
44impl VarBinViewBuilder {
45    pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
46        Self::new(dtype, capacity, Default::default(), Default::default(), 0.0)
47    }
48
49    pub fn with_buffer_deduplication(dtype: DType, capacity: usize) -> Self {
50        Self::new(
51            dtype,
52            capacity,
53            CompletedBuffers::Deduplicated(Default::default()),
54            Default::default(),
55            0.0,
56        )
57    }
58
59    pub fn with_compaction(dtype: DType, capacity: usize, compaction_threshold: f64) -> Self {
60        Self::new(
61            dtype,
62            capacity,
63            Default::default(),
64            Default::default(),
65            compaction_threshold,
66        )
67    }
68
69    pub fn new(
70        dtype: DType,
71        capacity: usize,
72        completed: CompletedBuffers,
73        growth_strategy: BufferGrowthStrategy,
74        compaction_threshold: f64,
75    ) -> Self {
76        assert!(
77            matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
78            "VarBinViewBuilder DType must be Utf8 or Binary."
79        );
80        Self {
81            views_builder: BufferMut::<BinaryView>::with_capacity(capacity),
82            nulls: LazyBitBufferBuilder::new(capacity),
83            completed,
84            in_progress: ByteBufferMut::empty(),
85            dtype,
86            growth_strategy,
87            compaction_threshold,
88        }
89    }
90
91    fn append_value_view(&mut self, value: &[u8]) {
92        let length =
93            u32::try_from(value.len()).vortex_expect("cannot have a single string >2^32 in length");
94        if length <= 12 {
95            self.views_builder.push(BinaryView::make_view(value, 0, 0));
96            return;
97        }
98
99        let (buffer_idx, offset) = self.append_value_to_buffer(value);
100        let view = BinaryView::make_view(value, buffer_idx, offset);
101        self.views_builder.push(view);
102    }
103
104    /// Appends a value to the builder.
105    pub fn append_value<S: AsRef<[u8]>>(&mut self, value: S) {
106        self.append_value_view(value.as_ref());
107        self.nulls.append_non_null();
108    }
109
110    /// Appends `n` copies of `value` as non-null entries.
111    pub fn append_n_values<S: AsRef<[u8]>>(&mut self, value: S, n: usize) {
112        if n == 0 {
113            return;
114        }
115        let bytes = value.as_ref();
116        let view = if bytes.len() <= BinaryView::MAX_INLINED_SIZE {
117            BinaryView::make_view(bytes, 0, 0)
118        } else {
119            let (buffer_idx, offset) = self.append_value_to_buffer(bytes);
120            BinaryView::make_view(bytes, buffer_idx, offset)
121        };
122        self.views_builder.push_n(view, n);
123        self.nulls.append_n_non_nulls(n);
124    }
125
126    fn flush_in_progress(&mut self) {
127        if self.in_progress.is_empty() {
128            return;
129        }
130        let block = std::mem::take(&mut self.in_progress).freeze();
131
132        assert!(block.len() < u32::MAX as usize, "Block too large");
133
134        let initial_len = self.completed.len();
135        self.completed.push(block);
136        assert_eq!(
137            self.completed.len(),
138            initial_len + 1,
139            "Invalid state, just completed block already exists"
140        );
141    }
142
143    /// append a non inlined value to self.in_progress.
144    fn append_value_to_buffer(&mut self, value: &[u8]) -> (u32, u32) {
145        assert!(
146            value.len() > BinaryView::MAX_INLINED_SIZE,
147            "must inline small strings"
148        );
149        let required_cap = self.in_progress.len() + value.len();
150        if self.in_progress.capacity() < required_cap {
151            self.flush_in_progress();
152            let next_buffer_size = self.growth_strategy.next_size() as usize;
153            let to_reserve = next_buffer_size.max(value.len());
154            self.in_progress.reserve(to_reserve);
155        }
156
157        let buffer_idx = self.completed.len();
158        let offset = u32::try_from(self.in_progress.len()).vortex_expect("too many buffers");
159        self.in_progress.extend_from_slice(value);
160
161        (buffer_idx, offset)
162    }
163
164    pub fn completed_block_count(&self) -> u32 {
165        self.completed.len()
166    }
167
168    /// Returns true if a non-empty in-progress buffer is staged (and would
169    /// become a completed buffer on the next flush), false otherwise.
170    pub fn in_progress(&self) -> bool {
171        !self.in_progress.is_empty()
172    }
173
174    /// Pushes buffers and pre-adjusted views into the builder.
175    ///
176    /// The provided `buffers` contain sections of data from a `VarBinViewArray`, and the
177    /// `views` are `BinaryView`s that have already been adjusted to reference the correct buffer
178    /// indices and offsets for this builder. All views must point to valid sections within the
179    /// provided buffers, and the validity length must match the view length.
180    ///
181    /// # Warning
182    ///
183    /// This method does not check utilization of the given buffers. Callers must provide
184    /// buffers that are fully utilized by the given adjusted views.
185    ///
186    /// # Panics
187    ///
188    /// Panics if this builder deduplicates buffers and any of the given buffers already
189    /// exist in this builder.
190    pub fn push_buffer_and_adjusted_views(
191        &mut self,
192        buffers: &[ByteBuffer],
193        views: &Buffer<BinaryView>,
194        validity_mask: Mask,
195    ) {
196        self.flush_in_progress();
197
198        let expected_completed_len = self.completed.len() as usize + buffers.len();
199        self.completed.extend_from_slice_unchecked(buffers);
200        assert_eq!(
201            self.completed.len() as usize,
202            expected_completed_len,
203            "Some buffers already exist",
204        );
205        self.views_builder.extend_trusted(views.iter().copied());
206        self.push_only_validity_mask(validity_mask);
207
208        debug_assert_eq!(self.nulls.len(), self.views_builder.len())
209    }
210
211    /// Finishes the builder directly into a [`VarBinViewArray`].
212    pub fn finish_into_varbinview(&mut self) -> VarBinViewArray {
213        self.flush_in_progress();
214        let buffers = std::mem::take(&mut self.completed);
215
216        assert_eq!(
217            self.views_builder.len(),
218            self.nulls.len(),
219            "View and validity length must match"
220        );
221
222        let validity = self.nulls.finish_with_nullability(self.dtype.nullability());
223
224        // SAFETY: the builder methods check safety at each step.
225        unsafe {
226            VarBinViewArray::new_unchecked(
227                std::mem::take(&mut self.views_builder).freeze(),
228                buffers.finish(),
229                self.dtype.clone(),
230                validity,
231            )
232        }
233    }
234
235    // Pushes a validity mask into the builder not affecting the views or buffers
236    fn push_only_validity_mask(&mut self, validity_mask: Mask) {
237        self.nulls.append_validity_mask(validity_mask);
238    }
239}
240
impl ArrayBuilder for VarBinViewBuilder {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }

    fn dtype(&self) -> &DType {
        &self.dtype
    }

    // Views and validity are appended in lockstep, so the validity length is
    // the builder length.
    fn len(&self) -> usize {
        self.nulls.len()
    }

    // Appends `n` empty (zero-length) non-null values.
    fn append_zeros(&mut self, n: usize) {
        self.views_builder.push_n(BinaryView::empty_view(), n);
        self.nulls.append_n_non_nulls(n);
    }

    // Appends `n` nulls, each backed by an empty view.
    unsafe fn append_nulls_unchecked(&mut self, n: usize) {
        self.views_builder.push_n(BinaryView::empty_view(), n);
        self.nulls.append_n_nulls(n);
    }

    /// Appends a Utf8 or Binary scalar; nulls become null entries.
    ///
    /// # Errors
    ///
    /// Fails if the scalar's dtype does not match the builder's dtype.
    fn append_scalar(&mut self, scalar: &Scalar) -> VortexResult<()> {
        vortex_ensure!(
            scalar.dtype() == self.dtype(),
            "VarBinViewBuilder expected scalar with dtype {}, got {}",
            self.dtype(),
            scalar.dtype()
        );

        match self.dtype() {
            DType::Utf8(_) => match scalar.as_utf8().value() {
                Some(value) => self.append_value(value),
                None => self.append_null(),
            },
            DType::Binary(_) => match scalar.as_binary().value() {
                Some(value) => self.append_value(value),
                None => self.append_null(),
            },
            // Unreachable in practice: the constructor asserts the builder
            // dtype is Utf8 or Binary.
            _ => vortex_bail!(
                "VarBinViewBuilder can only handle Utf8 or Binary scalars, got {:?}",
                scalar.dtype()
            ),
        }

        Ok(())
    }

    unsafe fn extend_from_array_unchecked(&mut self, array: &ArrayRef) {
        let array = array.to_varbinview();
        // Seal the in-progress buffer so incoming buffer indices line up with
        // `self.completed.len()`.
        self.flush_in_progress();

        self.push_only_validity_mask(array.validity_mask().vortex_expect("validity_mask"));

        // Decide per source buffer whether to keep, slice, or rewrite it, and
        // append the kept buffers to `completed`.
        let view_adjustment =
            self.completed
                .extend_from_compaction(BuffersWithOffsets::from_array(
                    &array,
                    self.compaction_threshold,
                ));

        match view_adjustment {
            // Every view can be fixed up from precomputed indices/offsets.
            ViewAdjustment::Precomputed(adjustment) => self.views_builder.extend_trusted(
                array
                    .views()
                    .iter()
                    .map(|view| adjustment.adjust_view(view)),
            ),
            // Some source buffers were compacted away; views into them need
            // their bytes copied into this builder's own buffers. Null
            // entries are replaced with empty views instead.
            ViewAdjustment::Rewriting(adjustment) => {
                match array.validity_mask().vortex_expect("validity_mask") {
                    Mask::AllTrue(_) => {
                        for (idx, &view) in array.views().iter().enumerate() {
                            let new_view = self.push_view(view, &adjustment, &array, idx);
                            self.views_builder.push(new_view);
                        }
                    }
                    Mask::AllFalse(_) => {
                        self.views_builder
                            .push_n(BinaryView::empty_view(), array.len());
                    }
                    Mask::Values(v) => {
                        for (idx, (&view, is_valid)) in
                            array.views().iter().zip(v.bit_buffer().iter()).enumerate()
                        {
                            let new_view = if !is_valid {
                                BinaryView::empty_view()
                            } else {
                                self.push_view(view, &adjustment, &array, idx)
                            };
                            self.views_builder.push(new_view);
                        }
                    }
                }
            }
        }
    }

    fn reserve_exact(&mut self, additional: usize) {
        self.views_builder.reserve(additional);
        self.nulls.reserve_exact(additional);
    }

    // Replaces the validity entirely; assumes `validity.len()` matches the
    // number of views already appended — TODO confirm (caller contract).
    unsafe fn set_validity_unchecked(&mut self, validity: Mask) {
        self.nulls = LazyBitBufferBuilder::new(validity.len());
        self.nulls.append_validity_mask(validity);
    }

    fn finish(&mut self) -> ArrayRef {
        self.finish_into_varbinview().into_array()
    }

    fn finish_into_canonical(&mut self) -> Canonical {
        Canonical::VarBinView(self.finish_into_varbinview())
    }
}
361
362impl VarBinViewBuilder {
363    fn push_view(
364        &mut self,
365        view: BinaryView,
366        adjustment: &RewritingViewAdjustment,
367        array: &VarBinViewArray,
368        idx: usize,
369    ) -> BinaryView {
370        if view.is_inlined() {
371            view
372        } else if let Some(adjusted) = adjustment.adjust_view(&view) {
373            adjusted
374        } else {
375            let bytes = array.bytes_at(idx);
376            let (new_buf_idx, new_offset) = self.append_value_to_buffer(&bytes);
377            BinaryView::make_view(bytes.as_slice(), new_buf_idx, new_offset)
378        }
379    }
380}
381
/// Storage for the data buffers a builder has sealed so far.
pub enum CompletedBuffers {
    /// Plain append-only list of buffers.
    Default(Vec<ByteBuffer>),
    /// Buffers deduplicated by identity (pointer + length), so re-appending
    /// the same buffer reuses its existing index.
    Deduplicated(DeduplicatedBuffers),
}
386
387impl Default for CompletedBuffers {
388    fn default() -> Self {
389        Self::Default(Vec::new())
390    }
391}
392
// Self::push enforces len < u32::max
#[allow(clippy::cast_possible_truncation)]
impl CompletedBuffers {
    /// Number of completed buffers.
    fn len(&self) -> u32 {
        match self {
            Self::Default(buffers) => buffers.len() as u32,
            Self::Deduplicated(buffers) => buffers.len(),
        }
    }

    // NOTE(review): the return value differs by variant — `Default` returns
    // the new length (index + 1) while `Deduplicated` returns the block's
    // index. All callers in this file ignore the return value; confirm the
    // intended contract before relying on it.
    fn push(&mut self, block: ByteBuffer) -> u32 {
        match self {
            Self::Default(buffers) => {
                assert!(buffers.len() < u32::MAX as usize, "Too many blocks");
                buffers.push(block);
                self.len()
            }
            Self::Deduplicated(buffers) => buffers.push(block),
        }
    }

    /// Does not compact buffers, bypasses utilization checks.
    fn extend_from_slice_unchecked(&mut self, buffers: &[ByteBuffer]) {
        for buffer in buffers {
            self.push(buffer.clone());
        }
    }

    /// Appends the kept buffers from `buffers` and returns the adjustment
    /// needed to rewrite source views against the new buffer indices.
    fn extend_from_compaction(&mut self, buffers: BuffersWithOffsets) -> ViewAdjustment {
        match (self, buffers) {
            // Plain storage, all buffers kept: every source index simply
            // shifts by the number of buffers already present.
            (
                Self::Default(completed_buffers),
                BuffersWithOffsets::AllKept { buffers, offsets },
            ) => {
                let buffer_offset = completed_buffers.len() as u32;
                completed_buffers.extend_from_slice(&buffers);
                ViewAdjustment::shift(buffer_offset, offsets)
            }
            // Plain storage, some buffers dropped: build a lookup with `None`
            // for dropped buffers (their values will be rewritten).
            (
                Self::Default(completed_buffers),
                BuffersWithOffsets::SomeCompacted { buffers, offsets },
            ) => {
                let lookup = buffers
                    .iter()
                    .map(|maybe_buffer| {
                        maybe_buffer.as_ref().map(|buffer| {
                            completed_buffers.push(buffer.clone());
                            completed_buffers.len() as u32 - 1
                        })
                    })
                    .collect();
                ViewAdjustment::rewriting(lookup, offsets)
            }

            // Deduplicated storage, all buffers kept: each source buffer maps
            // to a (possibly pre-existing) index.
            (
                Self::Deduplicated(completed_buffers),
                BuffersWithOffsets::AllKept { buffers, offsets },
            ) => {
                let buffer_lookup = completed_buffers.extend_from_iter(buffers.iter().cloned());
                ViewAdjustment::lookup(buffer_lookup, offsets)
            }
            // Deduplicated storage with dropped buffers.
            (
                Self::Deduplicated(completed_buffers),
                BuffersWithOffsets::SomeCompacted { buffers, offsets },
            ) => {
                let buffer_lookup = completed_buffers.extend_from_option_slice(&buffers);
                ViewAdjustment::rewriting(buffer_lookup, offsets)
            }
        }
    }

    /// Consumes the collection into the immutable buffer list used by the
    /// finished array.
    fn finish(self) -> Arc<[ByteBuffer]> {
        match self {
            Self::Default(buffers) => Arc::from(buffers),
            Self::Deduplicated(buffers) => buffers.finish(),
        }
    }
}
471
/// Buffer list that assigns each distinct buffer (by identity) a single index.
#[derive(Default)]
pub struct DeduplicatedBuffers {
    // Completed buffers in insertion order; indexed by the values in `buffer_to_idx`.
    buffers: Vec<ByteBuffer>,
    // Maps buffer identity (pointer + length) to its index in `buffers`.
    buffer_to_idx: HashMap<BufferId, u32>,
}
477
478impl DeduplicatedBuffers {
479    // Self::push enforces len < u32::max
480    #[allow(clippy::cast_possible_truncation)]
481    fn len(&self) -> u32 {
482        self.buffers.len() as u32
483    }
484
485    /// Push a new block if not seen before. Returns the idx of the block.
486    pub(crate) fn push(&mut self, block: ByteBuffer) -> u32 {
487        assert!(self.buffers.len() < u32::MAX as usize, "Too many blocks");
488
489        let initial_len = self.len();
490        let id = BufferId::from(&block);
491        match self.buffer_to_idx.entry(id) {
492            Entry::Occupied(idx) => *idx.get(),
493            Entry::Vacant(entry) => {
494                let idx = initial_len;
495                entry.insert(idx);
496                self.buffers.push(block);
497                idx
498            }
499        }
500    }
501
502    pub(crate) fn extend_from_option_slice(
503        &mut self,
504        buffers: &[Option<ByteBuffer>],
505    ) -> Vec<Option<u32>> {
506        buffers
507            .iter()
508            .map(|buffer| buffer.as_ref().map(|buf| self.push(buf.clone())))
509            .collect()
510    }
511
512    pub(crate) fn extend_from_iter(
513        &mut self,
514        buffers: impl Iterator<Item = ByteBuffer>,
515    ) -> Vec<u32> {
516        buffers.map(|buffer| self.push(buffer)).collect()
517    }
518
519    pub(crate) fn finish(self) -> Arc<[ByteBuffer]> {
520        Arc::from(self.buffers)
521    }
522}
523
/// Identity key for a buffer: the address and length of its backing slice.
/// Two buffers compare equal only when they alias the same memory region.
#[derive(PartialEq, Eq, Hash)]
struct BufferId {
    // *const u8 stored as usize for `Send`
    ptr: usize,
    len: usize,
}
530
531impl BufferId {
532    fn from(buffer: &ByteBuffer) -> Self {
533        let slice = buffer.as_slice();
534        Self {
535            ptr: slice.as_ptr() as usize,
536            len: slice.len(),
537        }
538    }
539}
540
/// Controls how large the builder's freshly allocated data buffers are.
#[derive(Debug, Clone)]
pub enum BufferGrowthStrategy {
    /// Use a fixed buffer size for all allocations.
    Fixed { size: u32 },
    /// Use exponential growth starting from initial_size, doubling until max_size.
    Exponential { current_size: u32, max_size: u32 },
}

impl Default for BufferGrowthStrategy {
    /// Exponential growth: start at 4KB, double up to a 2MB cap.
    fn default() -> Self {
        BufferGrowthStrategy::Exponential {
            current_size: 4 * 1024,    // 4KB starting size
            max_size: 2 * 1024 * 1024, // 2MB max size
        }
    }
}

impl BufferGrowthStrategy {
    /// A strategy that always allocates `size` bytes.
    pub fn fixed(size: u32) -> Self {
        Self::Fixed { size }
    }

    /// A strategy that doubles from `initial_size` up to `max_size`.
    pub fn exponential(initial_size: u32, max_size: u32) -> Self {
        Self::Exponential {
            current_size: initial_size,
            max_size,
        }
    }

    /// Returns the next buffer size to allocate and updates internal state.
    pub fn next_size(&mut self) -> u32 {
        match self {
            Self::Fixed { size } => *size,
            Self::Exponential {
                current_size,
                max_size,
            } => {
                let allocation = *current_size;
                // Double for next time, never exceeding the configured cap
                // (overflow-safe via saturating_mul).
                *current_size = current_size.saturating_mul(2).min(*max_size);
                allocation
            }
        }
    }
}
587
/// Buffers extracted from a source array, paired with the per-buffer offset
/// shifts that view adjustment must apply.
enum BuffersWithOffsets {
    /// Every source buffer is carried over (possibly sliced). When present,
    /// `offsets[i]` is subtracted from view offsets into buffer `i`.
    AllKept {
        buffers: Arc<[ByteBuffer]>,
        offsets: Option<Vec<u32>>,
    },
    /// Some buffers are dropped (`None`); views into them must be rewritten
    /// into the builder's own buffers.
    SomeCompacted {
        buffers: Vec<Option<ByteBuffer>>,
        offsets: Option<Vec<u32>>,
    },
}
598
impl BuffersWithOffsets {
    /// Inspects `array`'s buffer utilizations and decides, per buffer, whether
    /// to keep it whole, keep only a slice of it, or drop it for rewriting.
    ///
    /// A `compaction_threshold` of 0.0 disables compaction and keeps every
    /// buffer untouched.
    pub fn from_array(array: &VarBinViewArray, compaction_threshold: f64) -> Self {
        if compaction_threshold == 0.0 {
            return Self::AllKept {
                buffers: Arc::from(
                    array
                        .data_buffers()
                        .to_vec()
                        .into_iter()
                        .map(|b| b.unwrap_host())
                        .collect_vec(),
                ),
                offsets: None,
            };
        }

        let buffer_utilizations = array
            .buffer_utilizations()
            .vortex_expect("buffer_utilizations in BuffersWithOffsets::from_array");
        // First pass: determine which result shape we need (any rewrites? any
        // sliced buffers with non-zero start offsets?).
        let mut has_rewrite = false;
        let mut has_nonzero_offset = false;
        for utilization in buffer_utilizations.iter() {
            match compaction_strategy(utilization, compaction_threshold) {
                CompactionStrategy::KeepFull => continue,
                CompactionStrategy::Slice { .. } => has_nonzero_offset = true,
                CompactionStrategy::Rewrite => has_rewrite = true,
            }
        }

        // Second pass (lazy): produce (buffer, offset-shift) pairs; `None`
        // marks a buffer that will be rewritten rather than carried over.
        let buffers_with_offsets_iter = buffer_utilizations
            .iter()
            .zip(array.data_buffers().iter())
            .map(|(utilization, buffer)| {
                match compaction_strategy(utilization, compaction_threshold) {
                    CompactionStrategy::KeepFull => (Some(buffer.as_host().clone()), 0),
                    CompactionStrategy::Slice { start, end } => (
                        Some(buffer.as_host().slice(start as usize..end as usize)),
                        start,
                    ),
                    CompactionStrategy::Rewrite => (None, 0),
                }
            });

        match (has_rewrite, has_nonzero_offset) {
            // keep all buffers
            (false, false) => {
                let buffers: Vec<_> = buffers_with_offsets_iter
                    .map(|(b, _)| b.vortex_expect("already checked for rewrite"))
                    .collect();
                Self::AllKept {
                    buffers: Arc::from(buffers),
                    offsets: None,
                }
            }
            // rewrite, all zero offsets
            (true, false) => {
                let buffers: Vec<_> = buffers_with_offsets_iter.map(|(b, _)| b).collect();
                Self::SomeCompacted {
                    buffers,
                    offsets: None,
                }
            }
            // keep all buffers, but some have offsets
            (false, true) => {
                let (buffers, offsets): (Vec<_>, _) = buffers_with_offsets_iter
                    .map(|(buffer, offset)| {
                        (buffer.vortex_expect("already checked for rewrite"), offset)
                    })
                    .collect();
                Self::AllKept {
                    buffers: Arc::from(buffers),
                    offsets: Some(offsets),
                }
            }
            // rewrite and some have offsets
            (true, true) => {
                let (buffers, offsets) = buffers_with_offsets_iter.collect();
                Self::SomeCompacted {
                    buffers,
                    offsets: Some(offsets),
                }
            }
        }
    }
}
684
/// Per-buffer decision made when extending from an existing array.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CompactionStrategy {
    /// Keep the whole buffer as-is.
    KeepFull,
    /// Slice the buffer to [start, end) range
    Slice {
        start: u32,
        end: u32,
    },
    /// Rewrite data into new compacted buffer
    Rewrite,
}
696
697fn compaction_strategy(
698    buffer_utilization: &BufferUtilization,
699    threshold: f64,
700) -> CompactionStrategy {
701    match buffer_utilization.overall_utilization() {
702        // rewrite empty or not used buffers TODO(os): maybe keep them
703        0.0 => CompactionStrategy::Rewrite,
704        utilised if utilised >= threshold => CompactionStrategy::KeepFull,
705        _ if buffer_utilization.range_utilization() >= threshold => {
706            let Range { start, end } = buffer_utilization.range();
707            CompactionStrategy::Slice { start, end }
708        }
709        _ => CompactionStrategy::Rewrite,
710    }
711}
712
/// How views copied from a source array must be remapped onto this builder's
/// buffers.
enum ViewAdjustment {
    /// Every view can be adjusted from precomputed indices/offsets alone.
    Precomputed(PrecomputedViewAdjustment),
    /// Some views reference compacted-away buffers and need their bytes
    /// rewritten into the builder.
    Rewriting(RewritingViewAdjustment),
}
717
718impl ViewAdjustment {
719    fn shift(buffer_offset: u32, offsets: Option<Vec<u32>>) -> Self {
720        Self::Precomputed(PrecomputedViewAdjustment::Shift {
721            buffer_offset,
722            offsets,
723        })
724    }
725
726    fn lookup(buffer_lookup: Vec<u32>, offsets: Option<Vec<u32>>) -> Self {
727        Self::Precomputed(PrecomputedViewAdjustment::Lookup {
728            buffer_lookup,
729            offsets,
730        })
731    }
732
733    fn rewriting(buffer_lookup: Vec<Option<u32>>, offsets: Option<Vec<u32>>) -> Self {
734        Self::Rewriting(RewritingViewAdjustment {
735            buffer_lookup,
736            offsets,
737        })
738    }
739}
740
// Care when adding new variants or fields in this enum, it will mess with inlining if it gets too big
/// View fix-ups that need no access to the source data.
enum PrecomputedViewAdjustment {
    /// Add `buffer_offset` to every view's buffer index (buffers were appended
    /// in order); optionally subtract a per-source-buffer offset shift.
    Shift {
        buffer_offset: u32,
        offsets: Option<Vec<u32>>,
    },
    /// Map each source buffer index through `buffer_lookup` (used when buffers
    /// are deduplicated); optionally subtract a per-source-buffer offset shift.
    Lookup {
        buffer_lookup: Vec<u32>,
        offsets: Option<Vec<u32>>,
    },
}
752
impl PrecomputedViewAdjustment {
    /// Remaps `view` onto the builder's buffer indices/offsets. Inlined views
    /// carry no buffer reference and pass through untouched.
    fn adjust_view(&self, view: &BinaryView) -> BinaryView {
        if view.is_inlined() {
            return *view;
        }
        let view_ref = view.as_view();
        match self {
            Self::Shift {
                buffer_offset,
                offsets,
            } => {
                let b_idx = view_ref.buffer_index;
                // No offsets vector means no buffer was sliced: shift is 0.
                let offset_shift = offsets
                    .as_ref()
                    .map(|o| o[b_idx as usize])
                    .unwrap_or_default();

                // If offset < offset_shift, this view was invalid and wasn't counted in buffer_utilizations.
                // Return an empty view to match how invalid views are handled in the Rewriting path.
                if view_ref.offset < offset_shift {
                    return BinaryView::empty_view();
                }

                view_ref
                    .with_buffer_and_offset(b_idx + buffer_offset, view_ref.offset - offset_shift)
            }
            Self::Lookup {
                buffer_lookup,
                offsets,
            } => {
                let b_idx = view_ref.buffer_index;
                let buffer = buffer_lookup[b_idx as usize];
                // No offsets vector means no buffer was sliced: shift is 0.
                let offset_shift = offsets
                    .as_ref()
                    .map(|o| o[b_idx as usize])
                    .unwrap_or_default();

                // If offset < offset_shift, this view was invalid and wasn't counted in buffer_utilizations.
                // Return an empty view to match how invalid views are handled in the Rewriting path.
                if view_ref.offset < offset_shift {
                    return BinaryView::empty_view();
                }

                view_ref.with_buffer_and_offset(buffer, view_ref.offset - offset_shift)
            }
        }
        .into()
    }
}
802
/// View adjustment for when some source buffers were compacted away:
/// `buffer_lookup[i]` is `None` when source buffer `i` was dropped, meaning
/// values in it must be rewritten into the builder's own buffers.
struct RewritingViewAdjustment {
    buffer_lookup: Vec<Option<u32>>,
    /// Per-source-buffer offset shift to subtract, present when buffers were sliced.
    offsets: Option<Vec<u32>>,
}
807
impl RewritingViewAdjustment {
    /// Can return None if this view can't be adjusted, because there is no precomputed lookup
    /// for the current buffer.
    fn adjust_view(&self, view: &BinaryView) -> Option<BinaryView> {
        if view.is_inlined() {
            return Some(*view);
        }

        let view_ref = view.as_view();
        // NOTE(review): unlike `PrecomputedViewAdjustment::adjust_view`, there
        // is no `offset < offset_shift` guard here. Callers appear to replace
        // invalid (null) views with empty views before calling this — confirm,
        // otherwise the subtraction below could underflow.
        self.buffer_lookup[view_ref.buffer_index as usize].map(|buffer| {
            let offset_shift = self
                .offsets
                .as_ref()
                .map(|o| o[view_ref.buffer_index as usize])
                .unwrap_or_default();
            view_ref
                .with_buffer_and_offset(buffer, view_ref.offset - offset_shift)
                .into()
        })
    }
}
829
830#[cfg(test)]
831mod tests {
832    use vortex_error::VortexResult;
833
834    use crate::IntoArray;
835    use crate::LEGACY_SESSION;
836    use crate::VortexSessionExecute;
837    use crate::assert_arrays_eq;
838    use crate::builders::ArrayBuilder;
839    use crate::builders::VarBinViewBuilder;
840    use crate::builders::varbinview::VarBinViewArray;
841    use crate::dtype::DType;
842    use crate::dtype::Nullability;
843
844    #[test]
845    fn test_utf8_builder() {
846        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
847
848        builder.append_value("Hello");
849        builder.append_null();
850        builder.append_value("World");
851
852        builder.append_nulls(2);
853
854        builder.append_zeros(2);
855        builder.append_value("test");
856
857        let actual = builder.finish();
858        let expected = <VarBinViewArray as FromIterator<_>>::from_iter([
859            Some("Hello"),
860            None,
861            Some("World"),
862            None,
863            None,
864            Some(""),
865            Some(""),
866            Some("test"),
867        ]);
868        assert_arrays_eq!(actual, expected);
869    }
870
871    #[test]
872    fn test_utf8_builder_with_extend() {
873        let array = {
874            let mut builder =
875                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
876            builder.append_null();
877            builder.append_value("Hello2");
878            builder.finish()
879        };
880        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
881
882        builder.append_value("Hello1");
883        builder.extend_from_array(&array);
884        builder.append_nulls(2);
885        builder.append_value("Hello3");
886
887        let actual = builder.finish_into_canonical();
888        let expected = <VarBinViewArray as FromIterator<_>>::from_iter([
889            Some("Hello1"),
890            None,
891            Some("Hello2"),
892            None,
893            None,
894            Some("Hello3"),
895        ]);
896        assert_arrays_eq!(actual.into_array(), expected.into_array());
897    }
898
899    #[test]
900    fn test_buffer_deduplication() -> VortexResult<()> {
901        let array = {
902            let mut builder =
903                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
904            builder.append_value("This is a long string that should not be inlined");
905            builder.append_value("short string");
906            builder.finish_into_varbinview()
907        };
908
909        assert_eq!(array.data_buffers().len(), 1);
910        let mut builder =
911            VarBinViewBuilder::with_buffer_deduplication(DType::Utf8(Nullability::Nullable), 10);
912
913        let mut ctx = LEGACY_SESSION.create_execution_ctx();
914
915        array.append_to_builder(&mut builder, &mut ctx)?;
916        assert_eq!(builder.completed_block_count(), 1);
917
918        array
919            .slice(1..2)?
920            .append_to_builder(&mut builder, &mut ctx)?;
921        array
922            .slice(0..1)?
923            .append_to_builder(&mut builder, &mut ctx)?;
924        assert_eq!(builder.completed_block_count(), 1);
925
926        let array2 = {
927            let mut builder =
928                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
929            builder.append_value("This is a long string that should not be inlined");
930            builder.finish_into_varbinview()
931        };
932
933        array2.append_to_builder(&mut builder, &mut ctx)?;
934        assert_eq!(builder.completed_block_count(), 2);
935
936        array
937            .slice(0..1)?
938            .append_to_builder(&mut builder, &mut ctx)?;
939        array2
940            .slice(0..1)?
941            .append_to_builder(&mut builder, &mut ctx)?;
942        assert_eq!(builder.completed_block_count(), 2);
943        Ok(())
944    }
945
946    #[test]
947    fn test_append_scalar() {
948        use crate::scalar::Scalar;
949
950        // Test with Utf8 builder.
951        let mut utf8_builder =
952            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
953
954        // Test appending a valid utf8 value.
955        let utf8_scalar1 = Scalar::utf8("hello", Nullability::Nullable);
956        utf8_builder.append_scalar(&utf8_scalar1).unwrap();
957
958        // Test appending another value.
959        let utf8_scalar2 = Scalar::utf8("world", Nullability::Nullable);
960        utf8_builder.append_scalar(&utf8_scalar2).unwrap();
961
962        // Test appending null value.
963        let null_scalar = Scalar::null(DType::Utf8(Nullability::Nullable));
964        utf8_builder.append_scalar(&null_scalar).unwrap();
965
966        let array = utf8_builder.finish();
967        let expected =
968            <VarBinViewArray as FromIterator<_>>::from_iter([Some("hello"), Some("world"), None]);
969        assert_arrays_eq!(&array, &expected);
970
971        // Test with Binary builder.
972        let mut binary_builder =
973            VarBinViewBuilder::with_capacity(DType::Binary(Nullability::Nullable), 10);
974
975        let binary_scalar = Scalar::binary(vec![1u8, 2, 3], Nullability::Nullable);
976        binary_builder.append_scalar(&binary_scalar).unwrap();
977
978        let binary_null = Scalar::null(DType::Binary(Nullability::Nullable));
979        binary_builder.append_scalar(&binary_null).unwrap();
980
981        let binary_array = binary_builder.finish();
982        let expected =
983            <VarBinViewArray as FromIterator<_>>::from_iter([Some(vec![1u8, 2, 3]), None]);
984        assert_arrays_eq!(&binary_array, &expected);
985
986        // Test wrong dtype error.
987        let mut builder =
988            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::NonNullable), 10);
989        let wrong_scalar = Scalar::from(42i32);
990        assert!(builder.append_scalar(&wrong_scalar).is_err());
991    }
992
993    #[test]
994    fn test_buffer_growth_strategies() {
995        use super::BufferGrowthStrategy;
996
997        // Test Fixed strategy
998        let mut strategy = BufferGrowthStrategy::fixed(1024);
999
1000        // Should always return the fixed size
1001        assert_eq!(strategy.next_size(), 1024);
1002        assert_eq!(strategy.next_size(), 1024);
1003        assert_eq!(strategy.next_size(), 1024);
1004
1005        // Test Exponential strategy
1006        let mut strategy = BufferGrowthStrategy::exponential(1024, 8192);
1007
1008        // Should double each time until hitting max_size
1009        assert_eq!(strategy.next_size(), 1024); // First: 1024
1010        assert_eq!(strategy.next_size(), 2048); // Second: 2048
1011        assert_eq!(strategy.next_size(), 4096); // Third: 4096
1012        assert_eq!(strategy.next_size(), 8192); // Fourth: 8192 (max)
1013        assert_eq!(strategy.next_size(), 8192); // Fifth: 8192 (capped)
1014    }
1015
1016    #[test]
1017    fn test_large_value_allocation() {
1018        use super::BufferGrowthStrategy;
1019        use super::VarBinViewBuilder;
1020
1021        let mut builder = VarBinViewBuilder::new(
1022            DType::Binary(Nullability::Nullable),
1023            10,
1024            Default::default(),
1025            BufferGrowthStrategy::exponential(1024, 4096),
1026            0.0,
1027        );
1028
1029        // Create a value larger than max_size
1030        let large_value = vec![0u8; 8192];
1031
1032        // Should successfully append the large value
1033        builder.append_value(&large_value);
1034
1035        let array = builder.finish_into_varbinview();
1036        assert_eq!(array.len(), 1);
1037
1038        // Verify the value was stored correctly
1039        let retrieved = array
1040            .scalar_at(0)
1041            .unwrap()
1042            .as_binary()
1043            .value()
1044            .cloned()
1045            .unwrap();
1046        assert_eq!(retrieved.len(), 8192);
1047        assert_eq!(retrieved.as_slice(), &large_value);
1048    }
1049}