// vortex_array/builders/varbinview.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::Any;
5use std::sync::Arc;
6
7use vortex_buffer::{Buffer, BufferMut, ByteBuffer, ByteBufferMut};
8use vortex_dtype::DType;
9use vortex_error::{VortexExpect, VortexResult, vortex_bail, vortex_ensure};
10use vortex_mask::Mask;
11use vortex_scalar::{BinaryScalar, Scalar, Utf8Scalar};
12use vortex_utils::aliases::hash_map::{Entry, HashMap};
13
14use crate::arrays::{BinaryView, VarBinViewArray};
15use crate::builders::{ArrayBuilder, LazyNullBufferBuilder};
16use crate::canonical::{Canonical, ToCanonical};
17use crate::{Array, ArrayRef, IntoArray};
18
19/// The builder for building a [`VarBinViewArray`].
20pub struct VarBinViewBuilder {
21    dtype: DType,
22    views_builder: BufferMut<BinaryView>,
23    nulls: LazyNullBufferBuilder,
24    completed: CompletedBuffers,
25    in_progress: ByteBufferMut,
26    growth_strategy: BufferGrowthStrategy,
27}
28
29impl VarBinViewBuilder {
30    pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
31        Self::new(dtype, capacity, Default::default(), Default::default())
32    }
33
34    pub fn with_buffer_deduplication(dtype: DType, capacity: usize) -> Self {
35        Self::new(
36            dtype,
37            capacity,
38            CompletedBuffers::Deduplicated(Default::default()),
39            Default::default(),
40        )
41    }
42
43    pub fn new(
44        dtype: DType,
45        capacity: usize,
46        completed: CompletedBuffers,
47        growth_strategy: BufferGrowthStrategy,
48    ) -> Self {
49        assert!(
50            matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
51            "VarBinViewBuilder DType must be Utf8 or Binary."
52        );
53        Self {
54            views_builder: BufferMut::<BinaryView>::with_capacity(capacity),
55            nulls: LazyNullBufferBuilder::new(capacity),
56            completed,
57            in_progress: ByteBufferMut::empty(),
58            dtype,
59            growth_strategy,
60        }
61    }
62
63    fn append_value_view(&mut self, value: &[u8]) {
64        let length =
65            u32::try_from(value.len()).vortex_expect("cannot have a single string >2^32 in length");
66        if length <= 12 {
67            self.views_builder.push(BinaryView::make_view(value, 0, 0));
68            return;
69        }
70
71        let required_cap = self.in_progress.len() + value.len();
72        if self.in_progress.capacity() < required_cap {
73            self.flush_in_progress();
74            let next_buffer_size = self.growth_strategy.next_size() as usize;
75            let to_reserve = next_buffer_size.max(value.len());
76            self.in_progress.reserve(to_reserve);
77        };
78
79        let offset = u32::try_from(self.in_progress.len()).vortex_expect("too many buffers");
80        self.in_progress.extend_from_slice(value);
81        let view = BinaryView::make_view(
82            value,
83            // buffer offset
84            self.completed.len(),
85            offset,
86        );
87        self.views_builder.push(view);
88    }
89
90    /// Appends a value to the builder.
91    pub fn append_value<S: AsRef<[u8]>>(&mut self, value: S) {
92        self.append_value_view(value.as_ref());
93        self.nulls.append_non_null();
94    }
95
96    fn flush_in_progress(&mut self) {
97        if self.in_progress.is_empty() {
98            return;
99        }
100        let block = std::mem::take(&mut self.in_progress).freeze();
101
102        assert!(block.len() < u32::MAX as usize, "Block too large");
103
104        let initial_len = self.completed.len();
105        self.completed.push(block);
106        assert_eq!(
107            self.completed.len(),
108            initial_len + 1,
109            "Invalid state, just completed block already exists"
110        );
111    }
112
113    pub fn completed_block_count(&self) -> u32 {
114        self.completed.len()
115    }
116
117    // Pushes an array of values into the buffer, where the buffers are sections of a
118    // VarBinView and the views are the BinaryView's of the VarBinView *already with their*
119    // buffers adjusted.
120    // The views must all point to sections of the buffers and the validity length must match
121    // the view length.
122    /// ## Panics
123    /// Panics if this builder deduplicates buffers and if any of the given buffers already
124    /// exists on this builder
125    pub fn push_buffer_and_adjusted_views(
126        &mut self,
127        buffer: &[ByteBuffer],
128        views: &Buffer<BinaryView>,
129        validity_mask: Mask,
130    ) {
131        self.flush_in_progress();
132
133        let expected_completed_len = self.completed.len() as usize + buffer.len();
134        self.completed.extend_from_slice(buffer);
135        assert_eq!(
136            self.completed.len() as usize,
137            expected_completed_len,
138            "Some buffers already exist",
139        );
140        self.views_builder.extend_trusted(views.iter().copied());
141        self.push_only_validity_mask(validity_mask);
142
143        debug_assert_eq!(self.nulls.len(), self.views_builder.len())
144    }
145
146    /// Finishes the builder directly into a [`VarBinViewArray`].
147    pub fn finish_into_varbinview(&mut self) -> VarBinViewArray {
148        self.flush_in_progress();
149        let buffers = std::mem::take(&mut self.completed);
150
151        assert_eq!(
152            self.views_builder.len(),
153            self.nulls.len(),
154            "View and validity length must match"
155        );
156
157        let validity = self.nulls.finish_with_nullability(self.dtype.nullability());
158
159        // SAFETY: the builder methods check safety at each step.
160        unsafe {
161            VarBinViewArray::new_unchecked(
162                std::mem::take(&mut self.views_builder).freeze(),
163                buffers.finish(),
164                std::mem::replace(&mut self.dtype, DType::Null),
165                validity,
166            )
167        }
168    }
169}
170
171impl VarBinViewBuilder {
172    // Pushes a validity mask into the builder not affecting the views or buffers
173    fn push_only_validity_mask(&mut self, validity_mask: Mask) {
174        self.nulls.append_validity_mask(validity_mask);
175    }
176}
177
178impl ArrayBuilder for VarBinViewBuilder {
179    fn as_any(&self) -> &dyn Any {
180        self
181    }
182
183    fn as_any_mut(&mut self) -> &mut dyn Any {
184        self
185    }
186
187    fn dtype(&self) -> &DType {
188        &self.dtype
189    }
190
191    fn len(&self) -> usize {
192        self.nulls.len()
193    }
194
195    fn append_zeros(&mut self, n: usize) {
196        self.views_builder.push_n(BinaryView::empty_view(), n);
197        self.nulls.append_n_non_nulls(n);
198    }
199
200    unsafe fn append_nulls_unchecked(&mut self, n: usize) {
201        self.views_builder.push_n(BinaryView::empty_view(), n);
202        self.nulls.append_n_nulls(n);
203    }
204
205    fn append_scalar(&mut self, scalar: &Scalar) -> VortexResult<()> {
206        vortex_ensure!(
207            scalar.dtype() == self.dtype(),
208            "VarBinViewBuilder expected scalar with dtype {:?}, got {:?}",
209            self.dtype(),
210            scalar.dtype()
211        );
212
213        match self.dtype() {
214            DType::Utf8(_) => {
215                let utf8_scalar = Utf8Scalar::try_from(scalar)?;
216                match utf8_scalar.value() {
217                    Some(value) => self.append_value(value),
218                    None => self.append_null(),
219                }
220            }
221            DType::Binary(_) => {
222                let binary_scalar = BinaryScalar::try_from(scalar)?;
223                match binary_scalar.value() {
224                    Some(value) => self.append_value(value),
225                    None => self.append_null(),
226                }
227            }
228            _ => vortex_bail!(
229                "VarBinViewBuilder can only handle Utf8 or Binary scalars, got {:?}",
230                scalar.dtype()
231            ),
232        }
233
234        Ok(())
235    }
236
237    unsafe fn extend_from_array_unchecked(&mut self, array: &dyn Array) {
238        let array = array.to_varbinview();
239        self.flush_in_progress();
240
241        let new_indices = self.completed.extend_from_slice(array.buffers());
242
243        match new_indices {
244            NewIndices::ConstantOffset(offset) => {
245                self.views_builder
246                    .extend_trusted(array.views().iter().map(|view| view.offset_view(offset)));
247            }
248            NewIndices::LookupArray(lookup) => {
249                self.views_builder
250                    .extend_trusted(array.views().iter().map(|view| {
251                        if view.is_inlined() {
252                            *view
253                        } else {
254                            let new_buffer_idx = lookup[view.as_view().buffer_index() as usize];
255                            view.with_buffer_idx(new_buffer_idx)
256                        }
257                    }));
258            }
259        }
260
261        self.push_only_validity_mask(array.validity_mask());
262    }
263
264    fn ensure_capacity(&mut self, capacity: usize) {
265        if capacity > self.views_builder.capacity() {
266            self.views_builder
267                .reserve(capacity - self.views_builder.len());
268            self.nulls.ensure_capacity(capacity);
269        }
270    }
271
272    fn set_validity(&mut self, validity: Mask) {
273        self.nulls = LazyNullBufferBuilder::new(validity.len());
274        self.nulls.append_validity_mask(validity);
275    }
276
277    fn finish(&mut self) -> ArrayRef {
278        self.finish_into_varbinview().into_array()
279    }
280
281    fn finish_into_canonical(&mut self) -> Canonical {
282        Canonical::VarBinView(self.finish_into_varbinview())
283    }
284}
285
286pub enum CompletedBuffers {
287    Default(Vec<ByteBuffer>),
288    Deduplicated(DeduplicatedBuffers),
289}
290
291impl Default for CompletedBuffers {
292    fn default() -> Self {
293        Self::Default(Vec::new())
294    }
295}
296
297// Self::push enforces len < u32::max
298#[allow(clippy::cast_possible_truncation)]
299impl CompletedBuffers {
300    fn len(&self) -> u32 {
301        match self {
302            Self::Default(buffers) => buffers.len() as u32,
303            Self::Deduplicated(buffers) => buffers.len(),
304        }
305    }
306
307    fn push(&mut self, block: ByteBuffer) -> u32 {
308        match self {
309            Self::Default(buffers) => {
310                assert!(buffers.len() < u32::MAX as usize, "Too many blocks");
311                buffers.push(block);
312                self.len()
313            }
314            Self::Deduplicated(buffers) => buffers.push(block),
315        }
316    }
317
318    fn extend_from_slice(&mut self, new_buffers: &[ByteBuffer]) -> NewIndices {
319        match self {
320            Self::Default(buffers) => {
321                let offset = buffers.len() as u32;
322                buffers.extend_from_slice(new_buffers);
323                NewIndices::ConstantOffset(offset)
324            }
325            Self::Deduplicated(buffers) => {
326                NewIndices::LookupArray(buffers.extend_from_slice(new_buffers))
327            }
328        }
329    }
330
331    fn finish(self) -> Arc<[ByteBuffer]> {
332        match self {
333            Self::Default(buffers) => Arc::from(buffers),
334            Self::Deduplicated(buffers) => buffers.finish(),
335        }
336    }
337}
338
/// How buffer indices of a source array map to buffer indices in the builder after the
/// source's buffers have been added.
enum NewIndices {
    /// Add this constant to a source index to get the new index.
    ConstantOffset(u32),
    /// Look the source index up in this table to get the new index.
    LookupArray(Vec<u32>),
}
345
346#[derive(Default)]
347pub struct DeduplicatedBuffers {
348    buffers: Vec<ByteBuffer>,
349    buffer_to_idx: HashMap<BufferId, u32>,
350}
351
352impl DeduplicatedBuffers {
353    // Self::push enforces len < u32::max
354    #[allow(clippy::cast_possible_truncation)]
355    fn len(&self) -> u32 {
356        self.buffers.len() as u32
357    }
358
359    /// Push a new block if not seen before. Returns the idx of the block.
360    fn push(&mut self, block: ByteBuffer) -> u32 {
361        assert!(self.buffers.len() < u32::MAX as usize, "Too many blocks");
362
363        let initial_len = self.len();
364        let id = BufferId::from(&block);
365        match self.buffer_to_idx.entry(id) {
366            Entry::Occupied(idx) => *idx.get(),
367            Entry::Vacant(entry) => {
368                let idx = initial_len;
369                entry.insert(idx);
370                self.buffers.push(block);
371                idx
372            }
373        }
374    }
375
376    fn extend_from_slice(&mut self, buffers: &[ByteBuffer]) -> Vec<u32> {
377        buffers
378            .iter()
379            .map(|buffer| self.push(buffer.clone()))
380            .collect()
381    }
382
383    fn finish(self) -> Arc<[ByteBuffer]> {
384        Arc::from(self.buffers)
385    }
386}
387
/// Identity of a buffer: the address and length of the bytes it exposes.
#[derive(PartialEq, Eq, Hash)]
struct BufferId {
    /// Start address of the bytes; a `*const u8` stored as `usize` for `Send`.
    ptr: usize,
    /// Number of bytes.
    len: usize,
}
394
395impl BufferId {
396    fn from(buffer: &ByteBuffer) -> Self {
397        let slice = buffer.as_slice();
398        Self {
399            ptr: slice.as_ptr() as usize,
400            len: slice.len(),
401        }
402    }
403}
404
/// Policy that decides how many bytes to reserve each time a new data buffer is started.
#[derive(Debug, Clone)]
pub enum BufferGrowthStrategy {
    /// Use a fixed buffer size for all allocations.
    Fixed { size: u32 },
    /// Hand out `current_size`, doubling it after every request until it reaches `max_size`.
    Exponential { current_size: u32, max_size: u32 },
}

impl Default for BufferGrowthStrategy {
    /// Exponential growth from 4KB up to 2MB.
    fn default() -> Self {
        const INITIAL_SIZE: u32 = 4 * 1024;
        const MAX_SIZE: u32 = 2 * 1024 * 1024;
        Self::exponential(INITIAL_SIZE, MAX_SIZE)
    }
}

impl BufferGrowthStrategy {
    /// Strategy that always answers with `size`.
    pub fn fixed(size: u32) -> Self {
        Self::Fixed { size }
    }

    /// Strategy that answers with `initial_size` first and then doubles up to `max_size`.
    pub fn exponential(initial_size: u32, max_size: u32) -> Self {
        Self::Exponential {
            current_size: initial_size,
            max_size,
        }
    }

    /// Returns the next buffer size to allocate and updates internal state.
    ///
    /// For the exponential strategy the stored size only changes while it is below
    /// `max_size`; the doubling saturates instead of overflowing.
    pub fn next_size(&mut self) -> u32 {
        match self {
            Self::Fixed { size } => *size,
            Self::Exponential {
                current_size,
                max_size,
            } => {
                let following = if *current_size < *max_size {
                    current_size.saturating_mul(2).min(*max_size)
                } else {
                    *current_size
                };
                // Store the size for the next request and hand out the one stored before.
                std::mem::replace(current_size, following)
            }
        }
    }
}
451
#[cfg(test)]
mod tests {
    use std::str::from_utf8;

    use itertools::Itertools;
    use vortex_dtype::{DType, Nullability};

    use crate::accessor::ArrayAccessor;
    use crate::arrays::VarBinViewVTable;
    use crate::builders::{ArrayBuilder, VarBinViewBuilder};

    /// The nullable UTF-8 dtype used by most tests.
    fn nullable_utf8() -> DType {
        DType::Utf8(Nullability::Nullable)
    }

    /// Converts borrowed expectations into the owned form produced by the array iterator.
    fn owned(expected: &[Option<&str>]) -> Vec<Option<String>> {
        expected
            .iter()
            .map(|item| item.map(str::to_string))
            .collect()
    }

    /// Values, nulls and zero-length values appended one by one come back in order.
    #[test]
    fn test_utf8_builder() {
        let mut builder = VarBinViewBuilder::with_capacity(nullable_utf8(), 10);

        builder.append_value("Hello");
        builder.append_null();
        builder.append_value("World");

        builder.append_nulls(2);

        builder.append_zeros(2);
        builder.append_value("test");

        let finished = builder.finish();

        let values = finished
            .as_::<VarBinViewVTable>()
            .with_iterator(|iter| {
                iter.map(|item| item.map(|bytes| from_utf8(bytes).unwrap().to_string()))
                    .collect_vec()
            })
            .unwrap();
        assert_eq!(values.len(), 8);
        assert_eq!(
            values,
            owned(&[
                Some("Hello"),
                None,
                Some("World"),
                None,
                None,
                Some(""),
                Some(""),
                Some("test"),
            ])
        );
    }

    /// Extending from another array splices its elements between directly appended ones.
    #[test]
    fn test_utf8_builder_with_extend() {
        let source = {
            let mut inner = VarBinViewBuilder::with_capacity(nullable_utf8(), 10);
            inner.append_null();
            inner.append_value("Hello2");
            inner.finish()
        };
        let mut builder = VarBinViewBuilder::with_capacity(nullable_utf8(), 10);

        builder.append_value("Hello1");
        builder.extend_from_array(&source);
        builder.append_nulls(2);
        builder.append_value("Hello3");

        let finished = builder.finish_into_canonical().into_varbinview();

        let values = finished
            .with_iterator(|iter| {
                iter.map(|item| item.map(|bytes| from_utf8(bytes).unwrap().to_string()))
                    .collect_vec()
            })
            .unwrap();
        assert_eq!(values.len(), 6);
        assert_eq!(
            values,
            owned(&[
                Some("Hello1"),
                None,
                Some("Hello2"),
                None,
                None,
                Some("Hello3"),
            ])
        );
    }

    /// A deduplicating builder stores a buffer once, however often arrays backed by it are
    /// appended.
    #[test]
    fn test_buffer_deduplication() {
        let first = {
            let mut inner = VarBinViewBuilder::with_capacity(nullable_utf8(), 10);
            inner.append_value("This is a long string that should not be inlined");
            inner.append_value("short string");
            inner.finish_into_varbinview()
        };

        assert_eq!(first.buffers().len(), 1);
        let mut builder = VarBinViewBuilder::with_buffer_deduplication(nullable_utf8(), 10);

        first.append_to_builder(&mut builder);
        assert_eq!(builder.completed_block_count(), 1);

        // Slices of the same array share its buffer, so nothing new is stored.
        first.slice(1..2).append_to_builder(&mut builder);
        first.slice(0..1).append_to_builder(&mut builder);
        assert_eq!(builder.completed_block_count(), 1);

        // Same contents but a separately built array: this buffer is stored as a second one.
        let second = {
            let mut inner = VarBinViewBuilder::with_capacity(nullable_utf8(), 10);
            inner.append_value("This is a long string that should not be inlined");
            inner.finish_into_varbinview()
        };

        second.append_to_builder(&mut builder);
        assert_eq!(builder.completed_block_count(), 2);

        first.slice(0..1).append_to_builder(&mut builder);
        second.slice(0..1).append_to_builder(&mut builder);
        assert_eq!(builder.completed_block_count(), 2);
    }

    /// `append_scalar` accepts matching UTF-8 / binary scalars (null or not) and rejects a
    /// scalar of another dtype.
    #[test]
    fn test_append_scalar() {
        use vortex_scalar::Scalar;

        use crate::array::Array;

        // UTF-8 builder: two values followed by a null.
        let mut utf8_builder = VarBinViewBuilder::with_capacity(nullable_utf8(), 10);

        let hello = Scalar::utf8("hello", Nullability::Nullable);
        utf8_builder.append_scalar(&hello).unwrap();

        let world = Scalar::utf8("world", Nullability::Nullable);
        utf8_builder.append_scalar(&world).unwrap();

        let utf8_null = Scalar::null(nullable_utf8());
        utf8_builder.append_scalar(&utf8_null).unwrap();

        let utf8_array = utf8_builder.finish();
        assert_eq!(utf8_array.len(), 3);

        // Read the elements back through scalar_at.
        let first = utf8_array.scalar_at(0).as_utf8().value();
        assert_eq!(first.as_ref().map(|s| s.as_str()), Some("hello"));

        let second = utf8_array.scalar_at(1).as_utf8().value();
        assert_eq!(second.as_ref().map(|s| s.as_str()), Some("world"));

        let third = utf8_array.scalar_at(2).as_utf8().value();
        assert!(third.is_none());

        // Binary builder: one value followed by a null.
        let mut binary_builder =
            VarBinViewBuilder::with_capacity(DType::Binary(Nullability::Nullable), 10);

        let bytes = Scalar::binary(vec![1u8, 2, 3], Nullability::Nullable);
        binary_builder.append_scalar(&bytes).unwrap();

        let binary_null = Scalar::null(DType::Binary(Nullability::Nullable));
        binary_builder.append_scalar(&binary_null).unwrap();

        let binary_array = binary_builder.finish();
        assert_eq!(binary_array.len(), 2);

        let binary_first = binary_array.scalar_at(0).as_binary().value();
        assert_eq!(
            binary_first.as_ref().map(|b| b.as_slice()),
            Some(&[1u8, 2, 3][..])
        );

        let binary_second = binary_array.scalar_at(1).as_binary().value();
        assert!(binary_second.is_none());

        // A scalar of an unrelated dtype is rejected.
        let mut strict_builder =
            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::NonNullable), 10);
        let mismatched = Scalar::from(42i32);
        assert!(strict_builder.append_scalar(&mismatched).is_err());
    }

    /// Fixed strategies repeat their size; exponential ones double up to the maximum.
    #[test]
    fn test_buffer_growth_strategies() {
        use super::BufferGrowthStrategy;

        let mut fixed = BufferGrowthStrategy::fixed(1024);
        for _ in 0..3 {
            assert_eq!(fixed.next_size(), 1024);
        }

        let mut exponential = BufferGrowthStrategy::exponential(1024, 8192);
        // Doubles on every call, then stays at the maximum.
        for expected in [1024, 2048, 4096, 8192, 8192] {
            assert_eq!(exponential.next_size(), expected);
        }
    }

    /// A value larger than the strategy's maximum buffer size can still be appended.
    #[test]
    fn test_large_value_allocation() {
        use super::{BufferGrowthStrategy, VarBinViewBuilder};

        let mut builder = VarBinViewBuilder::new(
            DType::Binary(Nullability::Nullable),
            10,
            Default::default(),
            BufferGrowthStrategy::exponential(1024, 4096),
        );

        // Twice the strategy's maximum size.
        let large_value = vec![0u8; 8192];
        builder.append_value(&large_value);

        let array = builder.finish_into_varbinview();
        assert_eq!(array.len(), 1);

        // The stored bytes must match what was appended.
        let retrieved = array.scalar_at(0).as_binary().value().unwrap();
        assert_eq!(retrieved.len(), 8192);
        assert_eq!(retrieved.as_slice(), large_value.as_slice());
    }
}
687}