vortex_array/builders/
varbinview.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::Any;
5use std::cmp::max;
6
7use vortex_buffer::{Buffer, BufferMut, ByteBuffer, ByteBufferMut};
8use vortex_dtype::{DType, Nullability};
9use vortex_error::{VortexExpect, VortexResult};
10use vortex_mask::Mask;
11
12use crate::arrays::{BinaryView, VarBinViewArray};
13use crate::builders::ArrayBuilder;
14use crate::builders::lazy_validity_builder::LazyNullBufferBuilder;
15use crate::{Array, ArrayRef, IntoArray, ToCanonical};
16
/// Incremental builder for [`VarBinViewArray`]s of `Utf8` or `Binary` values.
///
/// Each appended element contributes one [`BinaryView`] plus one validity entry. Bytes of
/// values longer than 12 bytes are accumulated in `in_progress` and moved into `completed`
/// whenever the in-progress block is flushed.
pub struct VarBinViewBuilder {
    /// One view per appended element (including nulls, which get an empty view).
    views_builder: BufferMut<BinaryView>,
    /// Validity of the appended elements; its length is what `len()` reports.
    pub null_buffer_builder: LazyNullBufferBuilder,
    /// Frozen data blocks, in the order views refer to them by index.
    completed: Vec<ByteBuffer>,
    /// Data block currently being filled; it is frozen and pushed onto `completed` on flush.
    in_progress: ByteBufferMut,
    /// Nullability captured from `dtype` at construction time, used when finishing validity.
    nullability: Nullability,
    /// Logical type of the array being built (asserted to be `Utf8` or `Binary` on creation).
    dtype: DType,
}
25
26impl VarBinViewBuilder {
27    // TODO(joe): add a block growth strategy, from arrow
28    const BLOCK_SIZE: u32 = 8 * 8 * 1024;
29
30    pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
31        assert!(
32            matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
33            "VarBinViewBuilder DType must be Utf8 or Binary."
34        );
35        Self {
36            views_builder: BufferMut::<BinaryView>::with_capacity(capacity),
37            null_buffer_builder: LazyNullBufferBuilder::new(capacity),
38            completed: vec![],
39            in_progress: ByteBufferMut::empty(),
40            nullability: dtype.nullability(),
41            dtype,
42        }
43    }
44
45    fn append_value_view(&mut self, value: &[u8]) {
46        let length =
47            u32::try_from(value.len()).vortex_expect("cannot have a single string >2^32 in length");
48        if length <= 12 {
49            self.views_builder.push(BinaryView::make_view(value, 0, 0));
50            return;
51        }
52
53        let required_cap = self.in_progress.len() + value.len();
54        if self.in_progress.capacity() < required_cap {
55            self.flush_in_progress();
56            let to_reserve = max(value.len(), VarBinViewBuilder::BLOCK_SIZE as usize);
57            self.in_progress.reserve(to_reserve);
58        };
59
60        let offset = u32::try_from(self.in_progress.len()).vortex_expect("too many buffers");
61        self.in_progress.extend_from_slice(value);
62        let view = BinaryView::make_view(
63            value,
64            // buffer offset
65            u32::try_from(self.completed.len()).vortex_expect("too many buffers"),
66            offset,
67        );
68        self.views_builder.push(view);
69    }
70
71    #[inline]
72    pub fn append_value<S: AsRef<[u8]>>(&mut self, value: S) {
73        self.append_value_view(value.as_ref());
74        self.null_buffer_builder.append_non_null();
75    }
76
77    #[inline]
78    pub fn append_option<S: AsRef<[u8]>>(&mut self, value: Option<S>) {
79        match value {
80            Some(value) => self.append_value(value),
81            None => self.append_null(),
82        }
83    }
84
85    #[inline]
86    fn flush_in_progress(&mut self) {
87        if !self.in_progress.is_empty() {
88            let block = std::mem::take(&mut self.in_progress).freeze();
89            self.push_completed(block)
90        }
91    }
92
93    fn push_completed(&mut self, block: ByteBuffer) {
94        assert!(block.len() < u32::MAX as usize, "Block too large");
95        assert!(self.completed.len() < u32::MAX as usize, "Too many blocks");
96        self.completed.push(block);
97    }
98
99    pub fn completed_block_count(&self) -> usize {
100        self.completed.len()
101    }
102
103    // Pushes an array of values into the buffer, where the buffers are sections of a
104    // VarBinView and the views are the BinaryView's of the VarBinView *already with their*
105    // buffers adjusted.
106    // The views must all point to sections of the buffers and the validity length must match
107    // the view length.
108    pub fn push_buffer_and_adjusted_views(
109        &mut self,
110        buffer: &[ByteBuffer],
111        views: &Buffer<BinaryView>,
112        validity_mask: Mask,
113    ) {
114        self.flush_in_progress();
115
116        self.completed.extend(buffer.iter().cloned());
117        self.views_builder.extend_trusted(views.iter().copied());
118        self.push_only_validity_mask(validity_mask);
119
120        debug_assert_eq!(self.null_buffer_builder.len(), self.views_builder.len())
121    }
122
123    pub fn finish_into_varbinview(&mut self) -> VarBinViewArray {
124        self.flush_in_progress();
125        let buffers = std::mem::take(&mut self.completed);
126
127        assert_eq!(
128            self.views_builder.len(),
129            self.null_buffer_builder.len(),
130            "View and validity length must match"
131        );
132
133        let validity = self
134            .null_buffer_builder
135            .finish_with_nullability(self.nullability);
136
137        VarBinViewArray::try_new(
138            std::mem::take(&mut self.views_builder).freeze(),
139            buffers,
140            std::mem::replace(&mut self.dtype, DType::Null),
141            validity,
142        )
143        .vortex_expect("VarBinViewArray components should be valid.")
144    }
145}
146
147impl VarBinViewBuilder {
148    // Pushes a validity mask into the builder not affecting the views or buffers
149    fn push_only_validity_mask(&mut self, validity_mask: Mask) {
150        self.null_buffer_builder.append_validity_mask(validity_mask);
151    }
152}
153
154impl ArrayBuilder for VarBinViewBuilder {
155    fn as_any(&self) -> &dyn Any {
156        self
157    }
158
159    fn as_any_mut(&mut self) -> &mut dyn Any {
160        self
161    }
162
163    #[inline]
164    fn dtype(&self) -> &DType {
165        &self.dtype
166    }
167
168    #[inline]
169    fn len(&self) -> usize {
170        self.null_buffer_builder.len()
171    }
172
173    #[inline]
174    fn append_zeros(&mut self, n: usize) {
175        self.views_builder.push_n(BinaryView::empty_view(), n);
176        self.null_buffer_builder.append_n_non_nulls(n);
177    }
178
179    #[inline]
180    fn append_nulls(&mut self, n: usize) {
181        self.views_builder.push_n(BinaryView::empty_view(), n);
182        self.null_buffer_builder.append_n_nulls(n);
183    }
184
185    #[inline]
186    fn extend_from_array(&mut self, array: &dyn Array) -> VortexResult<()> {
187        let array = array.to_varbinview()?;
188        self.flush_in_progress();
189
190        let buffers_offset = u32::try_from(self.completed.len())?;
191        self.completed.extend_from_slice(array.buffers());
192
193        self.views_builder.extend_trusted(
194            array
195                .views()
196                .iter()
197                .map(|view| view.offset_view(buffers_offset)),
198        );
199
200        self.push_only_validity_mask(array.validity_mask()?);
201
202        Ok(())
203    }
204
205    fn ensure_capacity(&mut self, capacity: usize) {
206        if capacity > self.views_builder.capacity() {
207            self.views_builder
208                .reserve(capacity - self.views_builder.len());
209            self.null_buffer_builder.ensure_capacity(capacity);
210        }
211    }
212
213    fn set_validity(&mut self, validity: Mask) {
214        self.null_buffer_builder = LazyNullBufferBuilder::new(validity.len());
215        self.null_buffer_builder.append_validity_mask(validity);
216    }
217
218    fn finish(&mut self) -> ArrayRef {
219        self.finish_into_varbinview().into_array()
220    }
221}
222
#[cfg(test)]
mod tests {
    use std::str::from_utf8;

    use itertools::Itertools;
    use vortex_dtype::{DType, Nullability};

    use crate::ToCanonical;
    use crate::accessor::ArrayAccessor;
    use crate::arrays::VarBinViewVTable;
    use crate::builders::{ArrayBuilder, VarBinViewBuilder};

    /// Shorthand for an expected non-null string element.
    fn utf8(text: &str) -> Option<String> {
        Some(text.to_string())
    }

    /// Mixes single appends, optional appends, bulk nulls and bulk "zero" (empty) values,
    /// then checks the decoded contents element by element.
    #[test]
    fn test_utf8_builder() {
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_option(Some("Hello"));
        builder.append_option::<&str>(None);
        builder.append_value("World");

        builder.append_nulls(2);

        builder.append_zeros(2);
        builder.append_value("test");

        let finished = builder.finish();

        let decoded = finished
            .as_::<VarBinViewVTable>()
            .with_iterator(|values| {
                values
                    .map(|value| value.map(|bytes| from_utf8(bytes).unwrap().to_string()))
                    .collect_vec()
            })
            .unwrap();

        let expected = vec![
            utf8("Hello"),
            None,
            utf8("World"),
            None,
            None,
            utf8(""),
            utf8(""),
            utf8("test"),
        ];
        assert_eq!(decoded.len(), 8);
        assert_eq!(decoded, expected);
    }

    /// Extends a builder with a previously finished array in between direct appends and
    /// checks that ordering, nulls and values all survive.
    #[test]
    fn test_utf8_builder_with_extend() {
        let mut source_builder =
            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
        source_builder.append_null();
        source_builder.append_value("Hello2");
        let source = source_builder.finish();

        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_option(Some("Hello1"));
        builder.extend_from_array(&source).unwrap();
        builder.append_nulls(2);
        builder.append_value("Hello3");

        let finished = builder.finish().to_varbinview().unwrap();

        let decoded = finished
            .with_iterator(|values| {
                values
                    .map(|value| value.map(|bytes| from_utf8(bytes).unwrap().to_string()))
                    .collect_vec()
            })
            .unwrap();

        let expected = vec![
            utf8("Hello1"),
            None,
            utf8("Hello2"),
            None,
            None,
            utf8("Hello3"),
        ];
        assert_eq!(decoded.len(), 6);
        assert_eq!(decoded, expected);
    }
}