vortex_array/builders/varbinview.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::Any;
5use std::cmp::max;
6use std::sync::Arc;
7
8use vortex_buffer::{Buffer, BufferMut, ByteBuffer, ByteBufferMut};
9use vortex_dtype::{DType, Nullability};
10use vortex_error::{VortexExpect, VortexResult};
11use vortex_mask::Mask;
12use vortex_utils::aliases::hash_map::{Entry, HashMap};
13
14use crate::arrays::{BinaryView, VarBinViewArray};
15use crate::builders::ArrayBuilder;
16use crate::builders::lazy_validity_builder::LazyNullBufferBuilder;
17use crate::{Array, ArrayRef, IntoArray, ToCanonical};
18
19pub struct VarBinViewBuilder {
20    views_builder: BufferMut<BinaryView>,
21    pub null_buffer_builder: LazyNullBufferBuilder,
22    completed: CompletedBuffers,
23    in_progress: ByteBufferMut,
24    nullability: Nullability,
25    dtype: DType,
26}
27
28impl VarBinViewBuilder {
29    // TODO(joe): add a block growth strategy, from arrow
30    const BLOCK_SIZE: u32 = 8 * 8 * 1024;
31
32    pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
33        Self::new(dtype, capacity, Default::default())
34    }
35
36    pub fn with_buffer_deduplication(dtype: DType, capacity: usize) -> Self {
37        Self::new(
38            dtype,
39            capacity,
40            CompletedBuffers::Deduplicated(Default::default()),
41        )
42    }
43
44    fn new(dtype: DType, capacity: usize, completed: CompletedBuffers) -> Self {
45        assert!(
46            matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
47            "VarBinViewBuilder DType must be Utf8 or Binary."
48        );
49        Self {
50            views_builder: BufferMut::<BinaryView>::with_capacity(capacity),
51            null_buffer_builder: LazyNullBufferBuilder::new(capacity),
52            completed,
53            in_progress: ByteBufferMut::empty(),
54            nullability: dtype.nullability(),
55            dtype,
56        }
57    }
58
59    fn append_value_view(&mut self, value: &[u8]) {
60        let length =
61            u32::try_from(value.len()).vortex_expect("cannot have a single string >2^32 in length");
62        if length <= 12 {
63            self.views_builder.push(BinaryView::make_view(value, 0, 0));
64            return;
65        }
66
67        let required_cap = self.in_progress.len() + value.len();
68        if self.in_progress.capacity() < required_cap {
69            self.flush_in_progress();
70            let to_reserve = max(value.len(), VarBinViewBuilder::BLOCK_SIZE as usize);
71            self.in_progress.reserve(to_reserve);
72        };
73
74        let offset = u32::try_from(self.in_progress.len()).vortex_expect("too many buffers");
75        self.in_progress.extend_from_slice(value);
76        let view = BinaryView::make_view(
77            value,
78            // buffer offset
79            self.completed.len(),
80            offset,
81        );
82        self.views_builder.push(view);
83    }
84
85    #[inline]
86    pub fn append_value<S: AsRef<[u8]>>(&mut self, value: S) {
87        self.append_value_view(value.as_ref());
88        self.null_buffer_builder.append_non_null();
89    }
90
91    #[inline]
92    pub fn append_option<S: AsRef<[u8]>>(&mut self, value: Option<S>) {
93        match value {
94            Some(value) => self.append_value(value),
95            None => self.append_null(),
96        }
97    }
98
99    #[inline]
100    fn flush_in_progress(&mut self) {
101        if self.in_progress.is_empty() {
102            return;
103        }
104        let block = std::mem::take(&mut self.in_progress).freeze();
105
106        assert!(block.len() < u32::MAX as usize, "Block too large");
107
108        let initial_len = self.completed.len();
109        self.completed.push(block);
110        assert_eq!(
111            self.completed.len(),
112            initial_len + 1,
113            "Invalid state, just completed block already exists"
114        );
115    }
116
117    pub fn completed_block_count(&self) -> usize {
118        self.completed.len() as usize
119    }
120
121    // Pushes an array of values into the buffer, where the buffers are sections of a
122    // VarBinView and the views are the BinaryView's of the VarBinView *already with their*
123    // buffers adjusted.
124    // The views must all point to sections of the buffers and the validity length must match
125    // the view length.
126    /// ## Panics
127    /// Panics if this builder deduplicates buffers and if any of the given buffers already
128    /// exists on this builder
129    pub fn push_buffer_and_adjusted_views(
130        &mut self,
131        buffer: &[ByteBuffer],
132        views: &Buffer<BinaryView>,
133        validity_mask: Mask,
134    ) {
135        self.flush_in_progress();
136
137        let expected_completed_len = self.completed.len() as usize + buffer.len();
138        self.completed.extend_from_slice(buffer);
139        assert_eq!(
140            self.completed.len() as usize,
141            expected_completed_len,
142            "Some buffers already exist",
143        );
144        self.views_builder.extend_trusted(views.iter().copied());
145        self.push_only_validity_mask(validity_mask);
146
147        debug_assert_eq!(self.null_buffer_builder.len(), self.views_builder.len())
148    }
149
150    pub fn finish_into_varbinview(&mut self) -> VarBinViewArray {
151        self.flush_in_progress();
152        let buffers = std::mem::take(&mut self.completed);
153
154        assert_eq!(
155            self.views_builder.len(),
156            self.null_buffer_builder.len(),
157            "View and validity length must match"
158        );
159
160        let validity = self
161            .null_buffer_builder
162            .finish_with_nullability(self.nullability);
163
164        VarBinViewArray::try_new(
165            std::mem::take(&mut self.views_builder).freeze(),
166            buffers.finish(),
167            std::mem::replace(&mut self.dtype, DType::Null),
168            validity,
169        )
170        .vortex_expect("VarBinViewArray components should be valid.")
171    }
172}
173
174impl VarBinViewBuilder {
175    // Pushes a validity mask into the builder not affecting the views or buffers
176    fn push_only_validity_mask(&mut self, validity_mask: Mask) {
177        self.null_buffer_builder.append_validity_mask(validity_mask);
178    }
179}
180
181impl ArrayBuilder for VarBinViewBuilder {
182    fn as_any(&self) -> &dyn Any {
183        self
184    }
185
186    fn as_any_mut(&mut self) -> &mut dyn Any {
187        self
188    }
189
190    #[inline]
191    fn dtype(&self) -> &DType {
192        &self.dtype
193    }
194
195    #[inline]
196    fn len(&self) -> usize {
197        self.null_buffer_builder.len()
198    }
199
200    #[inline]
201    fn append_zeros(&mut self, n: usize) {
202        self.views_builder.push_n(BinaryView::empty_view(), n);
203        self.null_buffer_builder.append_n_non_nulls(n);
204    }
205
206    #[inline]
207    fn append_nulls(&mut self, n: usize) {
208        self.views_builder.push_n(BinaryView::empty_view(), n);
209        self.null_buffer_builder.append_n_nulls(n);
210    }
211
212    #[inline]
213    fn extend_from_array(&mut self, array: &dyn Array) -> VortexResult<()> {
214        let array = array.to_varbinview()?;
215        self.flush_in_progress();
216
217        let new_indices = self.completed.extend_from_slice(array.buffers());
218
219        match new_indices {
220            NewIndices::ConstantOffset(offset) => {
221                self.views_builder
222                    .extend_trusted(array.views().iter().map(|view| view.offset_view(offset)));
223            }
224            NewIndices::LookupArray(lookup) => {
225                self.views_builder
226                    .extend_trusted(array.views().iter().map(|view| {
227                        if view.is_inlined() {
228                            *view
229                        } else {
230                            let new_buffer_idx = lookup[view.as_view().buffer_index() as usize];
231                            view.with_buffer_idx(new_buffer_idx)
232                        }
233                    }));
234            }
235        }
236
237        self.push_only_validity_mask(array.validity_mask()?);
238
239        Ok(())
240    }
241
242    fn ensure_capacity(&mut self, capacity: usize) {
243        if capacity > self.views_builder.capacity() {
244            self.views_builder
245                .reserve(capacity - self.views_builder.len());
246            self.null_buffer_builder.ensure_capacity(capacity);
247        }
248    }
249
250    fn set_validity(&mut self, validity: Mask) {
251        self.null_buffer_builder = LazyNullBufferBuilder::new(validity.len());
252        self.null_buffer_builder.append_validity_mask(validity);
253    }
254
255    fn finish(&mut self) -> ArrayRef {
256        self.finish_into_varbinview().into_array()
257    }
258}
259
260enum CompletedBuffers {
261    Default(Vec<ByteBuffer>),
262    Deduplicated(DeduplicatedBuffers),
263}
264
265impl Default for CompletedBuffers {
266    fn default() -> Self {
267        Self::Default(Vec::new())
268    }
269}
270
271// Self::push enforces len < u32::max
272#[allow(clippy::cast_possible_truncation)]
273impl CompletedBuffers {
274    fn len(&self) -> u32 {
275        match self {
276            Self::Default(buffers) => buffers.len() as u32,
277            Self::Deduplicated(buffers) => buffers.len(),
278        }
279    }
280
281    fn push(&mut self, block: ByteBuffer) -> u32 {
282        match self {
283            Self::Default(buffers) => {
284                assert!(buffers.len() < u32::MAX as usize, "Too many blocks");
285                buffers.push(block);
286                self.len()
287            }
288            Self::Deduplicated(buffers) => buffers.push(block),
289        }
290    }
291
292    fn extend_from_slice(&mut self, new_buffers: &[ByteBuffer]) -> NewIndices {
293        match self {
294            Self::Default(buffers) => {
295                let offset = buffers.len() as u32;
296                buffers.extend_from_slice(new_buffers);
297                NewIndices::ConstantOffset(offset)
298            }
299            Self::Deduplicated(buffers) => {
300                NewIndices::LookupArray(buffers.extend_from_slice(new_buffers))
301            }
302        }
303    }
304
305    fn finish(self) -> Arc<[ByteBuffer]> {
306        match self {
307            Self::Default(buffers) => Arc::from(buffers),
308            Self::Deduplicated(buffers) => buffers.finish(),
309        }
310    }
311}
312
313enum NewIndices {
314    // add a constant offset to get the new idx
315    ConstantOffset(u32),
316    // lookup from the given array to get the new idx
317    LookupArray(Vec<u32>),
318}
319
320#[derive(Default)]
321struct DeduplicatedBuffers {
322    buffers: Vec<ByteBuffer>,
323    buffer_to_idx: HashMap<BufferId, u32>,
324}
325
326impl DeduplicatedBuffers {
327    // Self::push enforces len < u32::max
328    #[allow(clippy::cast_possible_truncation)]
329    fn len(&self) -> u32 {
330        self.buffers.len() as u32
331    }
332
333    /// Push a new block if not seen before. Returns the idx of the block.
334    fn push(&mut self, block: ByteBuffer) -> u32 {
335        assert!(self.buffers.len() < u32::MAX as usize, "Too many blocks");
336
337        let initial_len = self.len();
338        let id = BufferId::from(&block);
339        match self.buffer_to_idx.entry(id) {
340            Entry::Occupied(idx) => *idx.get(),
341            Entry::Vacant(entry) => {
342                let idx = initial_len;
343                entry.insert(idx);
344                self.buffers.push(block);
345                idx
346            }
347        }
348    }
349
350    fn extend_from_slice(&mut self, buffers: &[ByteBuffer]) -> Vec<u32> {
351        buffers
352            .iter()
353            .map(|buffer| self.push(buffer.clone()))
354            .collect()
355    }
356
357    fn finish(self) -> Arc<[ByteBuffer]> {
358        Arc::from(self.buffers)
359    }
360}
361
362#[derive(PartialEq, Eq, Hash)]
363struct BufferId {
364    // *const u8 stored as usize for `Send`
365    ptr: usize,
366    len: usize,
367}
368
369impl BufferId {
370    fn from(buffer: &ByteBuffer) -> Self {
371        let slice = buffer.as_slice();
372        Self {
373            ptr: slice.as_ptr() as usize,
374            len: slice.len(),
375        }
376    }
377}
378
#[cfg(test)]
mod tests {
    use std::str::from_utf8;

    use itertools::Itertools;
    use vortex_dtype::{DType, Nullability};

    use crate::ToCanonical;
    use crate::accessor::ArrayAccessor;
    use crate::arrays::VarBinViewVTable;
    use crate::builders::{ArrayBuilder, VarBinViewBuilder};

    /// The dtype every builder in these tests uses.
    fn nullable_utf8() -> DType {
        DType::Utf8(Nullability::Nullable)
    }

    /// Owned form of the expected values, matching what the accessor closures collect.
    fn owned(values: &[Option<&str>]) -> Vec<Option<String>> {
        values.iter().map(|value| value.map(str::to_string)).collect()
    }

    #[test]
    fn test_utf8_builder() {
        let mut builder = VarBinViewBuilder::with_capacity(nullable_utf8(), 10);
        builder.append_option(Some("Hello"));
        builder.append_option::<&str>(None);
        builder.append_value("World");
        builder.append_nulls(2);
        builder.append_zeros(2);
        builder.append_value("test");

        let finished = builder.finish();
        let decoded = finished
            .as_::<VarBinViewVTable>()
            .with_iterator(|values| {
                values
                    .map(|value| value.map(|bytes| from_utf8(bytes).unwrap().to_string()))
                    .collect_vec()
            })
            .unwrap();

        let expected = owned(&[
            Some("Hello"),
            None,
            Some("World"),
            None,
            None,
            Some(""),
            Some(""),
            Some("test"),
        ]);
        assert_eq!(decoded.len(), 8);
        assert_eq!(decoded, expected);
    }

    #[test]
    fn test_utf8_builder_with_extend() {
        let appended = {
            let mut inner = VarBinViewBuilder::with_capacity(nullable_utf8(), 10);
            inner.append_null();
            inner.append_value("Hello2");
            inner.finish()
        };

        let mut builder = VarBinViewBuilder::with_capacity(nullable_utf8(), 10);
        builder.append_option(Some("Hello1"));
        builder.extend_from_array(&appended).unwrap();
        builder.append_nulls(2);
        builder.append_value("Hello3");

        let decoded = builder
            .finish()
            .to_varbinview()
            .unwrap()
            .with_iterator(|values| {
                values
                    .map(|value| value.map(|bytes| from_utf8(bytes).unwrap().to_string()))
                    .collect_vec()
            })
            .unwrap();

        let expected = owned(&[
            Some("Hello1"),
            None,
            Some("Hello2"),
            None,
            None,
            Some("Hello3"),
        ]);
        assert_eq!(decoded.len(), 6);
        assert_eq!(decoded, expected);
    }

    #[test]
    fn test_buffer_deduplication() {
        // Long enough that it cannot be inlined into a view.
        const LONG: &str = "This is a long string that should not be inlined";

        let array = {
            let mut inner = VarBinViewBuilder::with_capacity(nullable_utf8(), 10);
            inner.append_value(LONG);
            inner.append_value("short string");
            inner.finish_into_varbinview()
        };
        assert_eq!(array.buffers().len(), 1);

        let mut builder = VarBinViewBuilder::with_buffer_deduplication(nullable_utf8(), 10);
        array.append_to_builder(&mut builder).unwrap();
        assert_eq!(builder.completed_block_count(), 1);

        // Slices of the same array share its single buffer.
        for (start, stop) in [(1, 2), (0, 1)] {
            array
                .slice(start, stop)
                .unwrap()
                .append_to_builder(&mut builder)
                .unwrap();
        }
        assert_eq!(builder.completed_block_count(), 1);

        let array2 = {
            let mut inner = VarBinViewBuilder::with_capacity(nullable_utf8(), 10);
            inner.append_value(LONG);
            inner.finish_into_varbinview()
        };

        // Equal bytes in a different allocation count as a new buffer.
        array2.append_to_builder(&mut builder).unwrap();
        assert_eq!(builder.completed_block_count(), 2);

        for source in [&array, &array2] {
            source
                .slice(0, 1)
                .unwrap()
                .append_to_builder(&mut builder)
                .unwrap();
        }
        assert_eq!(builder.completed_block_count(), 2);
    }
}