vortex_array/builders/
varbinview.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4// TODO(connor): Bring this file more in line with the rest of the builders.
5
6use std::any::Any;
7use std::cmp::max;
8use std::sync::Arc;
9
10use vortex_buffer::{Buffer, BufferMut, ByteBuffer, ByteBufferMut};
11use vortex_dtype::DType;
12use vortex_error::VortexExpect;
13use vortex_mask::Mask;
14use vortex_utils::aliases::hash_map::{Entry, HashMap};
15
16use crate::arrays::{BinaryView, VarBinViewArray};
17use crate::builders::{ArrayBuilder, LazyNullBufferBuilder};
18use crate::canonical::{Canonical, ToCanonical};
19use crate::{Array, ArrayRef, IntoArray};
20
/// The builder for building a [`VarBinViewArray`].
///
/// Values of at most 12 bytes are stored inline in their view. Longer values are copied into
/// an in-progress data block, which is moved into `completed` when it runs out of room (or
/// before buffers from another array are appended).
pub struct VarBinViewBuilder {
    /// Logical type of the array being built; `new` asserts it is `Utf8` or `Binary`.
    dtype: DType,
    /// One view per appended element (values, nulls and zeros alike).
    views_builder: BufferMut<BinaryView>,
    /// Validity, one entry per appended element.
    nulls: LazyNullBufferBuilder,
    /// Finished data buffers; non-inlined views reference them by index.
    completed: CompletedBuffers,
    /// Data block currently receiving the bytes of non-inlined values.
    in_progress: ByteBufferMut,
}
29
30impl VarBinViewBuilder {
31    // TODO(joe): add a block growth strategy, from arrow
32    const BLOCK_SIZE: u32 = 8 * 8 * 1024;
33
34    pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
35        Self::new(dtype, capacity, Default::default())
36    }
37
38    pub fn with_buffer_deduplication(dtype: DType, capacity: usize) -> Self {
39        Self::new(
40            dtype,
41            capacity,
42            CompletedBuffers::Deduplicated(Default::default()),
43        )
44    }
45
46    fn new(dtype: DType, capacity: usize, completed: CompletedBuffers) -> Self {
47        assert!(
48            matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
49            "VarBinViewBuilder DType must be Utf8 or Binary."
50        );
51        Self {
52            views_builder: BufferMut::<BinaryView>::with_capacity(capacity),
53            nulls: LazyNullBufferBuilder::new(capacity),
54            completed,
55            in_progress: ByteBufferMut::empty(),
56            dtype,
57        }
58    }
59
60    fn append_value_view(&mut self, value: &[u8]) {
61        let length =
62            u32::try_from(value.len()).vortex_expect("cannot have a single string >2^32 in length");
63        if length <= 12 {
64            self.views_builder.push(BinaryView::make_view(value, 0, 0));
65            return;
66        }
67
68        let required_cap = self.in_progress.len() + value.len();
69        if self.in_progress.capacity() < required_cap {
70            self.flush_in_progress();
71            let to_reserve = max(value.len(), VarBinViewBuilder::BLOCK_SIZE as usize);
72            self.in_progress.reserve(to_reserve);
73        };
74
75        let offset = u32::try_from(self.in_progress.len()).vortex_expect("too many buffers");
76        self.in_progress.extend_from_slice(value);
77        let view = BinaryView::make_view(
78            value,
79            // buffer offset
80            self.completed.len(),
81            offset,
82        );
83        self.views_builder.push(view);
84    }
85
86    /// Appends a value to the builder.
87    pub fn append_value<S: AsRef<[u8]>>(&mut self, value: S) {
88        self.append_value_view(value.as_ref());
89        self.nulls.append_non_null();
90    }
91
92    /// Appends an optional value to the builder.
93    ///
94    /// If the value is `Some`, it appends the varbin view value. If the value is `None`, it appends
95    /// a null.
96    ///
97    /// # Panics
98    ///
99    /// This method will panic if the input is `None` and the builder is non-nullable.
100    pub fn append_option<S: AsRef<[u8]>>(&mut self, value: Option<S>) {
101        match value {
102            Some(value) => self.append_value(value),
103            None => self.append_null(),
104        }
105    }
106
107    fn flush_in_progress(&mut self) {
108        if self.in_progress.is_empty() {
109            return;
110        }
111        let block = std::mem::take(&mut self.in_progress).freeze();
112
113        assert!(block.len() < u32::MAX as usize, "Block too large");
114
115        let initial_len = self.completed.len();
116        self.completed.push(block);
117        assert_eq!(
118            self.completed.len(),
119            initial_len + 1,
120            "Invalid state, just completed block already exists"
121        );
122    }
123
124    pub fn completed_block_count(&self) -> u32 {
125        self.completed.len()
126    }
127
128    // Pushes an array of values into the buffer, where the buffers are sections of a
129    // VarBinView and the views are the BinaryView's of the VarBinView *already with their*
130    // buffers adjusted.
131    // The views must all point to sections of the buffers and the validity length must match
132    // the view length.
133    /// ## Panics
134    /// Panics if this builder deduplicates buffers and if any of the given buffers already
135    /// exists on this builder
136    pub fn push_buffer_and_adjusted_views(
137        &mut self,
138        buffer: &[ByteBuffer],
139        views: &Buffer<BinaryView>,
140        validity_mask: Mask,
141    ) {
142        self.flush_in_progress();
143
144        let expected_completed_len = self.completed.len() as usize + buffer.len();
145        self.completed.extend_from_slice(buffer);
146        assert_eq!(
147            self.completed.len() as usize,
148            expected_completed_len,
149            "Some buffers already exist",
150        );
151        self.views_builder.extend_trusted(views.iter().copied());
152        self.push_only_validity_mask(validity_mask);
153
154        debug_assert_eq!(self.nulls.len(), self.views_builder.len())
155    }
156
157    /// Finishes the builder directly into a [`VarBinViewArray`].
158    pub fn finish_into_varbinview(&mut self) -> VarBinViewArray {
159        self.flush_in_progress();
160        let buffers = std::mem::take(&mut self.completed);
161
162        assert_eq!(
163            self.views_builder.len(),
164            self.nulls.len(),
165            "View and validity length must match"
166        );
167
168        let validity = self.nulls.finish_with_nullability(self.dtype.nullability());
169
170        // SAFETY: the builder methods check safety at each step.
171        unsafe {
172            VarBinViewArray::new_unchecked(
173                std::mem::take(&mut self.views_builder).freeze(),
174                buffers.finish(),
175                std::mem::replace(&mut self.dtype, DType::Null),
176                validity,
177            )
178        }
179    }
180}
181
182impl VarBinViewBuilder {
183    // Pushes a validity mask into the builder not affecting the views or buffers
184    fn push_only_validity_mask(&mut self, validity_mask: Mask) {
185        self.nulls.append_validity_mask(validity_mask);
186    }
187}
188
189impl ArrayBuilder for VarBinViewBuilder {
190    fn as_any(&self) -> &dyn Any {
191        self
192    }
193
194    fn as_any_mut(&mut self) -> &mut dyn Any {
195        self
196    }
197
198    fn dtype(&self) -> &DType {
199        &self.dtype
200    }
201
202    fn len(&self) -> usize {
203        self.nulls.len()
204    }
205
206    fn append_zeros(&mut self, n: usize) {
207        self.views_builder.push_n(BinaryView::empty_view(), n);
208        self.nulls.append_n_non_nulls(n);
209    }
210
211    unsafe fn append_nulls_unchecked(&mut self, n: usize) {
212        self.views_builder.push_n(BinaryView::empty_view(), n);
213        self.nulls.append_n_nulls(n);
214    }
215
216    unsafe fn extend_from_array_unchecked(&mut self, array: &dyn Array) {
217        let array = array.to_varbinview();
218        self.flush_in_progress();
219
220        let new_indices = self.completed.extend_from_slice(array.buffers());
221
222        match new_indices {
223            NewIndices::ConstantOffset(offset) => {
224                self.views_builder
225                    .extend_trusted(array.views().iter().map(|view| view.offset_view(offset)));
226            }
227            NewIndices::LookupArray(lookup) => {
228                self.views_builder
229                    .extend_trusted(array.views().iter().map(|view| {
230                        if view.is_inlined() {
231                            *view
232                        } else {
233                            let new_buffer_idx = lookup[view.as_view().buffer_index() as usize];
234                            view.with_buffer_idx(new_buffer_idx)
235                        }
236                    }));
237            }
238        }
239
240        self.push_only_validity_mask(array.validity_mask());
241    }
242
243    fn ensure_capacity(&mut self, capacity: usize) {
244        if capacity > self.views_builder.capacity() {
245            self.views_builder
246                .reserve(capacity - self.views_builder.len());
247            self.nulls.ensure_capacity(capacity);
248        }
249    }
250
251    fn set_validity(&mut self, validity: Mask) {
252        self.nulls = LazyNullBufferBuilder::new(validity.len());
253        self.nulls.append_validity_mask(validity);
254    }
255
256    fn finish(&mut self) -> ArrayRef {
257        self.finish_into_varbinview().into_array()
258    }
259
260    fn finish_into_canonical(&mut self) -> Canonical {
261        Canonical::VarBinView(self.finish_into_varbinview())
262    }
263}
264
/// The finished data buffers that non-inlined views index into.
enum CompletedBuffers {
    /// Buffers are appended unconditionally; a buffer's index is its position in the `Vec`.
    Default(Vec<ByteBuffer>),
    /// Buffers with the same address and length are stored only once.
    Deduplicated(DeduplicatedBuffers),
}

/// A builder does not deduplicate buffers unless asked to.
impl Default for CompletedBuffers {
    fn default() -> Self {
        Self::Default(Vec::new())
    }
}
275
276// Self::push enforces len < u32::max
277#[allow(clippy::cast_possible_truncation)]
278impl CompletedBuffers {
279    fn len(&self) -> u32 {
280        match self {
281            Self::Default(buffers) => buffers.len() as u32,
282            Self::Deduplicated(buffers) => buffers.len(),
283        }
284    }
285
286    fn push(&mut self, block: ByteBuffer) -> u32 {
287        match self {
288            Self::Default(buffers) => {
289                assert!(buffers.len() < u32::MAX as usize, "Too many blocks");
290                buffers.push(block);
291                self.len()
292            }
293            Self::Deduplicated(buffers) => buffers.push(block),
294        }
295    }
296
297    fn extend_from_slice(&mut self, new_buffers: &[ByteBuffer]) -> NewIndices {
298        match self {
299            Self::Default(buffers) => {
300                let offset = buffers.len() as u32;
301                buffers.extend_from_slice(new_buffers);
302                NewIndices::ConstantOffset(offset)
303            }
304            Self::Deduplicated(buffers) => {
305                NewIndices::LookupArray(buffers.extend_from_slice(new_buffers))
306            }
307        }
308    }
309
310    fn finish(self) -> Arc<[ByteBuffer]> {
311        match self {
312            Self::Default(buffers) => Arc::from(buffers),
313            Self::Deduplicated(buffers) => buffers.finish(),
314        }
315    }
316}
317
/// How the buffer indices of appended buffers map to indices in the builder, as returned by
/// `CompletedBuffers::extend_from_slice`.
enum NewIndices {
    /// Add a constant offset to an old index to get the new index.
    ConstantOffset(u32),
    /// Look up the old index in the given array to get the new index.
    LookupArray(Vec<u32>),
}
324
/// Completed buffers in which each distinct buffer (by address and length) is stored once.
#[derive(Default)]
struct DeduplicatedBuffers {
    /// Distinct buffers in insertion order; a buffer's index is its position here.
    buffers: Vec<ByteBuffer>,
    /// Maps a buffer's identity to its index in `buffers`.
    buffer_to_idx: HashMap<BufferId, u32>,
}
330
331impl DeduplicatedBuffers {
332    // Self::push enforces len < u32::max
333    #[allow(clippy::cast_possible_truncation)]
334    fn len(&self) -> u32 {
335        self.buffers.len() as u32
336    }
337
338    /// Push a new block if not seen before. Returns the idx of the block.
339    fn push(&mut self, block: ByteBuffer) -> u32 {
340        assert!(self.buffers.len() < u32::MAX as usize, "Too many blocks");
341
342        let initial_len = self.len();
343        let id = BufferId::from(&block);
344        match self.buffer_to_idx.entry(id) {
345            Entry::Occupied(idx) => *idx.get(),
346            Entry::Vacant(entry) => {
347                let idx = initial_len;
348                entry.insert(idx);
349                self.buffers.push(block);
350                idx
351            }
352        }
353    }
354
355    fn extend_from_slice(&mut self, buffers: &[ByteBuffer]) -> Vec<u32> {
356        buffers
357            .iter()
358            .map(|buffer| self.push(buffer.clone()))
359            .collect()
360    }
361
362    fn finish(self) -> Arc<[ByteBuffer]> {
363        Arc::from(self.buffers)
364    }
365}
366
367#[derive(PartialEq, Eq, Hash)]
368struct BufferId {
369    // *const u8 stored as usize for `Send`
370    ptr: usize,
371    len: usize,
372}
373
374impl BufferId {
375    fn from(buffer: &ByteBuffer) -> Self {
376        let slice = buffer.as_slice();
377        Self {
378            ptr: slice.as_ptr() as usize,
379            len: slice.len(),
380        }
381    }
382}
383
#[cfg(test)]
mod tests {
    use std::str::from_utf8;

    use itertools::Itertools;
    use vortex_dtype::{DType, Nullability};

    use crate::accessor::ArrayAccessor;
    use crate::arrays::VarBinViewVTable;
    use crate::builders::{ArrayBuilder, VarBinViewBuilder};

    /// Mixes `append_option`, `append_value`, `append_nulls` and `append_zeros`, then checks
    /// that nulls read back as `None` and zeros as empty strings.
    #[test]
    fn test_utf8_builder() {
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_option(Some("Hello"));
        builder.append_option::<&str>(None);
        builder.append_value("World");

        builder.append_nulls(2);

        builder.append_zeros(2);
        builder.append_value("test");

        let arr = builder.finish();

        let arr = arr
            .as_::<VarBinViewVTable>()
            .with_iterator(|iter| {
                iter.map(|x| x.map(|x| from_utf8(x).unwrap().to_string()))
                    .collect_vec()
            })
            .unwrap();
        assert_eq!(arr.len(), 8);
        assert_eq!(
            arr,
            vec![
                Some("Hello".to_string()),
                None,
                Some("World".to_string()),
                None,
                None,
                Some("".to_string()),
                Some("".to_string()),
                Some("test".to_string()),
            ]
        );
    }

    /// Checks that `extend_from_array` splices another array's values and nulls in between
    /// values appended directly, preserving order.
    #[test]
    fn test_utf8_builder_with_extend() {
        let array = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_null();
            builder.append_value("Hello2");
            builder.finish()
        };
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_option(Some("Hello1"));
        builder.extend_from_array(&array);
        builder.append_nulls(2);
        builder.append_value("Hello3");

        let arr = builder.finish_into_canonical().into_varbinview();

        let arr = arr
            .with_iterator(|iter| {
                iter.map(|x| x.map(|x| from_utf8(x).unwrap().to_string()))
                    .collect_vec()
            })
            .unwrap();
        assert_eq!(arr.len(), 6);
        assert_eq!(
            arr,
            vec![
                Some("Hello1".to_string()),
                None,
                Some("Hello2".to_string()),
                None,
                None,
                Some("Hello3".to_string()),
            ]
        );
    }

    /// Checks that a deduplicating builder stores a repeatedly appended buffer once, and that
    /// a separately built buffer with the same contents still counts as distinct.
    #[test]
    fn test_buffer_deduplication() {
        let array = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_value("This is a long string that should not be inlined");
            // Exactly 12 bytes, so it is inlined and adds no data buffer.
            builder.append_value("short string");
            builder.finish_into_varbinview()
        };

        assert_eq!(array.buffers().len(), 1);
        let mut builder =
            VarBinViewBuilder::with_buffer_deduplication(DType::Utf8(Nullability::Nullable), 10);

        array.append_to_builder(&mut builder);
        assert_eq!(builder.completed_block_count(), 1);

        // Slices of `array` are expected to reference the same buffer, so no new block is added.
        array.slice(1..2).append_to_builder(&mut builder);
        array.slice(0..1).append_to_builder(&mut builder);
        assert_eq!(builder.completed_block_count(), 1);

        // Same string contents but a different allocation: identity is address-based.
        let array2 = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_value("This is a long string that should not be inlined");
            builder.finish_into_varbinview()
        };

        array2.append_to_builder(&mut builder);
        assert_eq!(builder.completed_block_count(), 2);

        array.slice(0..1).append_to_builder(&mut builder);
        array2.slice(0..1).append_to_builder(&mut builder);
        assert_eq!(builder.completed_block_count(), 2);
    }
}