vortex_array/builders/varbinview.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::Any;
5use std::cmp::max;
6use std::sync::Arc;
7
8use vortex_buffer::{Buffer, BufferMut, ByteBuffer, ByteBufferMut};
9use vortex_dtype::{DType, Nullability};
10use vortex_error::{VortexExpect, VortexResult};
11use vortex_mask::Mask;
12use vortex_utils::aliases::hash_map::{Entry, HashMap};
13
14use crate::arrays::{BinaryView, VarBinViewArray};
15use crate::builders::ArrayBuilder;
16use crate::builders::lazy_validity_builder::LazyNullBufferBuilder;
17use crate::{Array, ArrayRef, IntoArray, ToCanonical};
18
19pub struct VarBinViewBuilder {
20    views_builder: BufferMut<BinaryView>,
21    pub null_buffer_builder: LazyNullBufferBuilder,
22    completed: CompletedBuffers,
23    in_progress: ByteBufferMut,
24    nullability: Nullability,
25    dtype: DType,
26}
27
28impl VarBinViewBuilder {
29    // TODO(joe): add a block growth strategy, from arrow
30    const BLOCK_SIZE: u32 = 8 * 8 * 1024;
31
32    pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
33        Self::new(dtype, capacity, Default::default())
34    }
35
36    pub fn with_buffer_deduplication(dtype: DType, capacity: usize) -> Self {
37        Self::new(
38            dtype,
39            capacity,
40            CompletedBuffers::Deduplicated(Default::default()),
41        )
42    }
43
44    fn new(dtype: DType, capacity: usize, completed: CompletedBuffers) -> Self {
45        assert!(
46            matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
47            "VarBinViewBuilder DType must be Utf8 or Binary."
48        );
49        Self {
50            views_builder: BufferMut::<BinaryView>::with_capacity(capacity),
51            null_buffer_builder: LazyNullBufferBuilder::new(capacity),
52            completed,
53            in_progress: ByteBufferMut::empty(),
54            nullability: dtype.nullability(),
55            dtype,
56        }
57    }
58
59    fn append_value_view(&mut self, value: &[u8]) {
60        let length =
61            u32::try_from(value.len()).vortex_expect("cannot have a single string >2^32 in length");
62        if length <= 12 {
63            self.views_builder.push(BinaryView::make_view(value, 0, 0));
64            return;
65        }
66
67        let required_cap = self.in_progress.len() + value.len();
68        if self.in_progress.capacity() < required_cap {
69            self.flush_in_progress();
70            let to_reserve = max(value.len(), VarBinViewBuilder::BLOCK_SIZE as usize);
71            self.in_progress.reserve(to_reserve);
72        };
73
74        let offset = u32::try_from(self.in_progress.len()).vortex_expect("too many buffers");
75        self.in_progress.extend_from_slice(value);
76        let view = BinaryView::make_view(
77            value,
78            // buffer offset
79            self.completed.len(),
80            offset,
81        );
82        self.views_builder.push(view);
83    }
84
85    #[inline]
86    pub fn append_value<S: AsRef<[u8]>>(&mut self, value: S) {
87        self.append_value_view(value.as_ref());
88        self.null_buffer_builder.append_non_null();
89    }
90
91    #[inline]
92    pub fn append_option<S: AsRef<[u8]>>(&mut self, value: Option<S>) {
93        match value {
94            Some(value) => self.append_value(value),
95            None => self.append_null(),
96        }
97    }
98
99    #[inline]
100    fn flush_in_progress(&mut self) {
101        if self.in_progress.is_empty() {
102            return;
103        }
104        let block = std::mem::take(&mut self.in_progress).freeze();
105
106        assert!(block.len() < u32::MAX as usize, "Block too large");
107
108        let initial_len = self.completed.len();
109        self.completed.push(block);
110        assert_eq!(
111            self.completed.len(),
112            initial_len + 1,
113            "Invalid state, just completed block already exists"
114        );
115    }
116
117    pub fn completed_block_count(&self) -> u32 {
118        self.completed.len()
119    }
120
121    // Pushes an array of values into the buffer, where the buffers are sections of a
122    // VarBinView and the views are the BinaryView's of the VarBinView *already with their*
123    // buffers adjusted.
124    // The views must all point to sections of the buffers and the validity length must match
125    // the view length.
126    /// ## Panics
127    /// Panics if this builder deduplicates buffers and if any of the given buffers already
128    /// exists on this builder
129    pub fn push_buffer_and_adjusted_views(
130        &mut self,
131        buffer: &[ByteBuffer],
132        views: &Buffer<BinaryView>,
133        validity_mask: Mask,
134    ) {
135        self.flush_in_progress();
136
137        let expected_completed_len = self.completed.len() as usize + buffer.len();
138        self.completed.extend_from_slice(buffer);
139        assert_eq!(
140            self.completed.len() as usize,
141            expected_completed_len,
142            "Some buffers already exist",
143        );
144        self.views_builder.extend_trusted(views.iter().copied());
145        self.push_only_validity_mask(validity_mask);
146
147        debug_assert_eq!(self.null_buffer_builder.len(), self.views_builder.len())
148    }
149
150    pub fn finish_into_varbinview(&mut self) -> VarBinViewArray {
151        self.flush_in_progress();
152        let buffers = std::mem::take(&mut self.completed);
153
154        assert_eq!(
155            self.views_builder.len(),
156            self.null_buffer_builder.len(),
157            "View and validity length must match"
158        );
159
160        let validity = self
161            .null_buffer_builder
162            .finish_with_nullability(self.nullability);
163
164        // SAFETY: the builder methods check safety at each step.
165        unsafe {
166            VarBinViewArray::new_unchecked(
167                std::mem::take(&mut self.views_builder).freeze(),
168                buffers.finish(),
169                std::mem::replace(&mut self.dtype, DType::Null),
170                validity,
171            )
172        }
173    }
174}
175
176impl VarBinViewBuilder {
177    // Pushes a validity mask into the builder not affecting the views or buffers
178    fn push_only_validity_mask(&mut self, validity_mask: Mask) {
179        self.null_buffer_builder.append_validity_mask(validity_mask);
180    }
181}
182
183impl ArrayBuilder for VarBinViewBuilder {
184    fn as_any(&self) -> &dyn Any {
185        self
186    }
187
188    fn as_any_mut(&mut self) -> &mut dyn Any {
189        self
190    }
191
192    #[inline]
193    fn dtype(&self) -> &DType {
194        &self.dtype
195    }
196
197    #[inline]
198    fn len(&self) -> usize {
199        self.null_buffer_builder.len()
200    }
201
202    #[inline]
203    fn append_zeros(&mut self, n: usize) {
204        self.views_builder.push_n(BinaryView::empty_view(), n);
205        self.null_buffer_builder.append_n_non_nulls(n);
206    }
207
208    #[inline]
209    fn append_nulls(&mut self, n: usize) {
210        self.views_builder.push_n(BinaryView::empty_view(), n);
211        self.null_buffer_builder.append_n_nulls(n);
212    }
213
214    #[inline]
215    fn extend_from_array(&mut self, array: &dyn Array) -> VortexResult<()> {
216        let array = array.to_varbinview()?;
217        self.flush_in_progress();
218
219        let new_indices = self.completed.extend_from_slice(array.buffers());
220
221        match new_indices {
222            NewIndices::ConstantOffset(offset) => {
223                self.views_builder
224                    .extend_trusted(array.views().iter().map(|view| view.offset_view(offset)));
225            }
226            NewIndices::LookupArray(lookup) => {
227                self.views_builder
228                    .extend_trusted(array.views().iter().map(|view| {
229                        if view.is_inlined() {
230                            *view
231                        } else {
232                            let new_buffer_idx = lookup[view.as_view().buffer_index() as usize];
233                            view.with_buffer_idx(new_buffer_idx)
234                        }
235                    }));
236            }
237        }
238
239        self.push_only_validity_mask(array.validity_mask());
240
241        Ok(())
242    }
243
244    fn ensure_capacity(&mut self, capacity: usize) {
245        if capacity > self.views_builder.capacity() {
246            self.views_builder
247                .reserve(capacity - self.views_builder.len());
248            self.null_buffer_builder.ensure_capacity(capacity);
249        }
250    }
251
252    fn set_validity(&mut self, validity: Mask) {
253        self.null_buffer_builder = LazyNullBufferBuilder::new(validity.len());
254        self.null_buffer_builder.append_validity_mask(validity);
255    }
256
257    fn finish(&mut self) -> ArrayRef {
258        self.finish_into_varbinview().into_array()
259    }
260}
261
262enum CompletedBuffers {
263    Default(Vec<ByteBuffer>),
264    Deduplicated(DeduplicatedBuffers),
265}
266
267impl Default for CompletedBuffers {
268    fn default() -> Self {
269        Self::Default(Vec::new())
270    }
271}
272
273// Self::push enforces len < u32::max
274#[allow(clippy::cast_possible_truncation)]
275impl CompletedBuffers {
276    fn len(&self) -> u32 {
277        match self {
278            Self::Default(buffers) => buffers.len() as u32,
279            Self::Deduplicated(buffers) => buffers.len(),
280        }
281    }
282
283    fn push(&mut self, block: ByteBuffer) -> u32 {
284        match self {
285            Self::Default(buffers) => {
286                assert!(buffers.len() < u32::MAX as usize, "Too many blocks");
287                buffers.push(block);
288                self.len()
289            }
290            Self::Deduplicated(buffers) => buffers.push(block),
291        }
292    }
293
294    fn extend_from_slice(&mut self, new_buffers: &[ByteBuffer]) -> NewIndices {
295        match self {
296            Self::Default(buffers) => {
297                let offset = buffers.len() as u32;
298                buffers.extend_from_slice(new_buffers);
299                NewIndices::ConstantOffset(offset)
300            }
301            Self::Deduplicated(buffers) => {
302                NewIndices::LookupArray(buffers.extend_from_slice(new_buffers))
303            }
304        }
305    }
306
307    fn finish(self) -> Arc<[ByteBuffer]> {
308        match self {
309            Self::Default(buffers) => Arc::from(buffers),
310            Self::Deduplicated(buffers) => buffers.finish(),
311        }
312    }
313}
314
/// How the buffer indices of freshly appended buffers map onto the builder's buffer list.
enum NewIndices {
    /// Every old buffer index is shifted by this constant.
    ConstantOffset(u32),
    /// Entry `i` holds the new index of the appended buffer at position `i`.
    LookupArray(Vec<u32>),
}
321
322#[derive(Default)]
323struct DeduplicatedBuffers {
324    buffers: Vec<ByteBuffer>,
325    buffer_to_idx: HashMap<BufferId, u32>,
326}
327
328impl DeduplicatedBuffers {
329    // Self::push enforces len < u32::max
330    #[allow(clippy::cast_possible_truncation)]
331    fn len(&self) -> u32 {
332        self.buffers.len() as u32
333    }
334
335    /// Push a new block if not seen before. Returns the idx of the block.
336    fn push(&mut self, block: ByteBuffer) -> u32 {
337        assert!(self.buffers.len() < u32::MAX as usize, "Too many blocks");
338
339        let initial_len = self.len();
340        let id = BufferId::from(&block);
341        match self.buffer_to_idx.entry(id) {
342            Entry::Occupied(idx) => *idx.get(),
343            Entry::Vacant(entry) => {
344                let idx = initial_len;
345                entry.insert(idx);
346                self.buffers.push(block);
347                idx
348            }
349        }
350    }
351
352    fn extend_from_slice(&mut self, buffers: &[ByteBuffer]) -> Vec<u32> {
353        buffers
354            .iter()
355            .map(|buffer| self.push(buffer.clone()))
356            .collect()
357    }
358
359    fn finish(self) -> Arc<[ByteBuffer]> {
360        Arc::from(self.buffers)
361    }
362}
363
364#[derive(PartialEq, Eq, Hash)]
365struct BufferId {
366    // *const u8 stored as usize for `Send`
367    ptr: usize,
368    len: usize,
369}
370
371impl BufferId {
372    fn from(buffer: &ByteBuffer) -> Self {
373        let slice = buffer.as_slice();
374        Self {
375            ptr: slice.as_ptr() as usize,
376            len: slice.len(),
377        }
378    }
379}
380
#[cfg(test)]
mod tests {
    use std::str::from_utf8;

    use itertools::Itertools;
    use vortex_dtype::{DType, Nullability};

    use crate::ToCanonical;
    use crate::accessor::ArrayAccessor;
    use crate::arrays::VarBinViewVTable;
    use crate::builders::{ArrayBuilder, VarBinViewBuilder};

    /// Converts optional string literals into the owned form the accessor tests produce.
    fn owned(values: &[Option<&str>]) -> Vec<Option<String>> {
        values
            .iter()
            .copied()
            .map(|value| value.map(str::to_string))
            .collect()
    }

    #[test]
    fn test_utf8_builder() {
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_option(Some("Hello"));
        builder.append_option::<&str>(None);
        builder.append_value("World");
        builder.append_nulls(2);
        builder.append_zeros(2);
        builder.append_value("test");

        let finished = builder.finish();
        let decoded = finished
            .as_::<VarBinViewVTable>()
            .with_iterator(|iter| {
                iter.map(|item| item.map(|bytes| from_utf8(bytes).unwrap().to_string()))
                    .collect_vec()
            })
            .unwrap();

        let expected = owned(&[
            Some("Hello"),
            None,
            Some("World"),
            None,
            None,
            Some(""),
            Some(""),
            Some("test"),
        ]);
        assert_eq!(decoded.len(), 8);
        assert_eq!(decoded, expected);
    }

    #[test]
    fn test_utf8_builder_with_extend() {
        let appended = {
            let mut inner =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            inner.append_null();
            inner.append_value("Hello2");
            inner.finish()
        };

        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
        builder.append_option(Some("Hello1"));
        builder.extend_from_array(&appended).unwrap();
        builder.append_nulls(2);
        builder.append_value("Hello3");

        let decoded = builder
            .finish()
            .to_varbinview()
            .unwrap()
            .with_iterator(|iter| {
                iter.map(|item| item.map(|bytes| from_utf8(bytes).unwrap().to_string()))
                    .collect_vec()
            })
            .unwrap();

        let expected = owned(&[
            Some("Hello1"),
            None,
            Some("Hello2"),
            None,
            None,
            Some("Hello3"),
        ]);
        assert_eq!(decoded.len(), 6);
        assert_eq!(decoded, expected);
    }

    #[test]
    fn test_buffer_deduplication() {
        let long_value = "This is a long string that should not be inlined";

        let source = {
            let mut inner =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            inner.append_value(long_value);
            inner.append_value("short string");
            inner.finish_into_varbinview()
        };
        assert_eq!(source.buffers().len(), 1);

        let mut dedup =
            VarBinViewBuilder::with_buffer_deduplication(DType::Utf8(Nullability::Nullable), 10);

        source.append_to_builder(&mut dedup).unwrap();
        assert_eq!(dedup.completed_block_count(), 1);

        // Slices of `source` must not add new blocks.
        source.slice(1..2).append_to_builder(&mut dedup).unwrap();
        source.slice(0..1).append_to_builder(&mut dedup).unwrap();
        assert_eq!(dedup.completed_block_count(), 1);

        // An equal string built separately is expected to add exactly one new block.
        let other = {
            let mut inner =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            inner.append_value(long_value);
            inner.finish_into_varbinview()
        };

        other.append_to_builder(&mut dedup).unwrap();
        assert_eq!(dedup.completed_block_count(), 2);

        source.slice(0..1).append_to_builder(&mut dedup).unwrap();
        other.slice(0..1).append_to_builder(&mut dedup).unwrap();
        assert_eq!(dedup.completed_block_count(), 2);
    }
}