vortex_array/builders/
varbinview.rs

1use std::any::Any;
2use std::cmp::max;
3
4use vortex_buffer::{BufferMut, ByteBuffer, ByteBufferMut};
5use vortex_dtype::{DType, Nullability};
6use vortex_error::{VortexExpect, VortexResult};
7use vortex_mask::Mask;
8
9use crate::arrays::{BinaryView, VarBinViewArray};
10use crate::builders::ArrayBuilder;
11use crate::builders::lazy_validity_builder::LazyNullBufferBuilder;
12use crate::{Array, ArrayRef, ToCanonical};
13
/// Builder that accumulates variable-length `Utf8`/`Binary` values as [`BinaryView`]s plus
/// the data blocks backing them, and assembles a [`VarBinViewArray`] when finished.
pub struct VarBinViewBuilder {
    /// One view per appended element, including nulls and "zero" placeholders.
    views_builder: BufferMut<BinaryView>,
    /// Validity (null / non-null) tracking, one entry per appended element.
    pub null_buffer_builder: LazyNullBufferBuilder,
    /// Frozen data blocks; views refer to a block by its index in this vector.
    completed: Vec<ByteBuffer>,
    /// Block currently being filled; it is frozen and moved into `completed` when flushed.
    in_progress: ByteBufferMut,
    /// Nullability captured from `dtype` at construction; used when finishing the validity.
    nullability: Nullability,
    /// Logical type of the array being built (`Utf8` or `Binary`).
    dtype: DType,
}
22
23impl VarBinViewBuilder {
24    // TODO(joe): add a block growth strategy, from arrow
25    const BLOCK_SIZE: u32 = 8 * 8 * 1024;
26
27    pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
28        assert!(
29            matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
30            "VarBinViewBuilder DType must be Utf8 or Binary."
31        );
32        Self {
33            views_builder: BufferMut::<BinaryView>::with_capacity(capacity),
34            null_buffer_builder: LazyNullBufferBuilder::new(capacity),
35            completed: vec![],
36            in_progress: ByteBufferMut::empty(),
37            nullability: dtype.nullability(),
38            dtype,
39        }
40    }
41
42    fn append_value_view(&mut self, value: &[u8]) {
43        let length =
44            u32::try_from(value.len()).vortex_expect("cannot have a single string >2^32 in length");
45        if length <= 12 {
46            self.views_builder.push(BinaryView::make_view(value, 0, 0));
47            return;
48        }
49
50        let required_cap = self.in_progress.len() + value.len();
51        if self.in_progress.capacity() < required_cap {
52            self.flush_in_progress();
53            let to_reserve = max(value.len(), VarBinViewBuilder::BLOCK_SIZE as usize);
54            self.in_progress.reserve(to_reserve);
55        };
56
57        let offset = u32::try_from(self.in_progress.len()).vortex_expect("too many buffers");
58        self.in_progress.extend_from_slice(value);
59        let view = BinaryView::make_view(
60            value,
61            // buffer offset
62            u32::try_from(self.completed.len()).vortex_expect("too many buffers"),
63            offset,
64        );
65        self.views_builder.push(view);
66    }
67
68    #[inline]
69    pub fn append_value<S: AsRef<[u8]>>(&mut self, value: S) {
70        self.append_value_view(value.as_ref());
71        self.null_buffer_builder.append_non_null();
72    }
73
74    #[inline]
75    pub fn append_option<S: AsRef<[u8]>>(&mut self, value: Option<S>) {
76        match value {
77            Some(value) => self.append_value(value),
78            None => self.append_null(),
79        }
80    }
81
82    #[inline]
83    fn flush_in_progress(&mut self) {
84        if !self.in_progress.is_empty() {
85            let block = std::mem::take(&mut self.in_progress).freeze();
86            self.push_completed(block)
87        }
88    }
89
90    fn push_completed(&mut self, block: ByteBuffer) {
91        assert!(block.len() < u32::MAX as usize, "Block too large");
92        assert!(self.completed.len() < u32::MAX as usize, "Too many blocks");
93        self.completed.push(block);
94    }
95
96    pub fn completed_block_count(&self) -> usize {
97        self.completed.len()
98    }
99
100    // Pushes an array of values into the buffer, where the buffers are sections of a
101    // VarBinView and the views are the BinaryView's of the VarBinView *already with their*
102    // buffers adjusted.
103    // The views must all point to sections of the buffers and the validity length must match
104    // the view length.
105    pub fn push_buffer_and_adjusted_views(
106        &mut self,
107        buffer: impl IntoIterator<Item = ByteBuffer>,
108        views: impl IntoIterator<Item = BinaryView>,
109        validity_mask: Mask,
110    ) {
111        self.flush_in_progress();
112
113        self.completed.extend(buffer);
114        self.views_builder.extend(views);
115        self.push_only_validity_mask(validity_mask);
116
117        debug_assert_eq!(self.null_buffer_builder.len(), self.views_builder.len())
118    }
119
120    // Pushes a validity mask into the builder not affecting the views or buffers
121    fn push_only_validity_mask(&mut self, validity_mask: Mask) {
122        self.null_buffer_builder.append_validity_mask(validity_mask);
123    }
124}
125
126impl ArrayBuilder for VarBinViewBuilder {
127    fn as_any(&self) -> &dyn Any {
128        self
129    }
130
131    fn as_any_mut(&mut self) -> &mut dyn Any {
132        self
133    }
134
135    #[inline]
136    fn dtype(&self) -> &DType {
137        &self.dtype
138    }
139
140    #[inline]
141    fn len(&self) -> usize {
142        self.null_buffer_builder.len()
143    }
144
145    #[inline]
146    fn append_zeros(&mut self, n: usize) {
147        self.views_builder.push_n(BinaryView::empty_view(), n);
148        self.null_buffer_builder.append_n_non_nulls(n);
149    }
150
151    #[inline]
152    fn append_nulls(&mut self, n: usize) {
153        self.views_builder.push_n(BinaryView::empty_view(), n);
154        self.null_buffer_builder.append_n_nulls(n);
155    }
156
157    #[inline]
158    fn extend_from_array(&mut self, array: &dyn Array) -> VortexResult<()> {
159        let array = array.to_varbinview()?;
160        self.flush_in_progress();
161
162        let buffers_offset = u32::try_from(self.completed.len())?;
163        self.completed.extend_from_slice(array.buffers());
164
165        self.views_builder.extend(
166            array
167                .views()
168                .iter()
169                .map(|view| view.offset_view(buffers_offset)),
170        );
171
172        self.push_only_validity_mask(array.validity_mask()?);
173
174        Ok(())
175    }
176
177    fn ensure_capacity(&mut self, capacity: usize) {
178        if capacity > self.views_builder.capacity() {
179            self.views_builder
180                .reserve(capacity - self.views_builder.len());
181            self.null_buffer_builder.ensure_capacity(capacity);
182        }
183    }
184
185    fn set_validity(&mut self, validity: Mask) {
186        self.null_buffer_builder = LazyNullBufferBuilder::new(validity.len());
187        self.null_buffer_builder.append_validity_mask(validity);
188    }
189
190    fn finish(&mut self) -> ArrayRef {
191        self.flush_in_progress();
192        let buffers = std::mem::take(&mut self.completed);
193
194        assert_eq!(
195            self.views_builder.len(),
196            self.null_buffer_builder.len(),
197            "View and validity length must match"
198        );
199
200        let validity = self
201            .null_buffer_builder
202            .finish_with_nullability(self.nullability);
203
204        VarBinViewArray::try_new(
205            std::mem::take(&mut self.views_builder).freeze(),
206            buffers,
207            std::mem::replace(&mut self.dtype, DType::Null),
208            validity,
209        )
210        .vortex_expect("VarBinViewArray components should be valid.")
211        .into_array()
212    }
213}
214
#[cfg(test)]
mod tests {
    use std::str::from_utf8;

    use itertools::Itertools;
    use vortex_dtype::{DType, Nullability};

    use crate::ToCanonical;
    use crate::accessor::ArrayAccessor;
    use crate::array::ArrayExt;
    use crate::arrays::VarBinViewArray;
    use crate::builders::{ArrayBuilder, VarBinViewBuilder};

    /// Fresh nullable UTF-8 builder with the capacity used throughout these tests.
    fn nullable_utf8_builder() -> VarBinViewBuilder {
        VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10)
    }

    /// Decodes every element of `array` into an owned string, keeping nulls as `None`.
    fn decode(array: &VarBinViewArray) -> Vec<Option<String>> {
        array
            .with_iterator(|values| {
                values
                    .map(|value| value.map(|bytes| from_utf8(bytes).unwrap().to_string()))
                    .collect_vec()
            })
            .unwrap()
    }

    /// Converts borrowed expectations into the owned form produced by `decode`.
    fn owned(values: &[Option<&str>]) -> Vec<Option<String>> {
        values.iter().map(|value| value.map(String::from)).collect()
    }

    #[test]
    fn test_utf8_builder() {
        let mut builder = nullable_utf8_builder();

        builder.append_option(Some("Hello"));
        builder.append_option::<&str>(None);
        builder.append_value("World");

        builder.append_nulls(2);

        builder.append_zeros(2);
        builder.append_value("test");

        let finished = builder.finish();
        let decoded = decode(finished.as_::<VarBinViewArray>());

        assert_eq!(decoded.len(), 8);
        assert_eq!(
            decoded,
            owned(&[
                Some("Hello"),
                None,
                Some("World"),
                None,
                None,
                Some(""),
                Some(""),
                Some("test"),
            ])
        );
    }

    #[test]
    fn test_utf8_builder_with_extend() {
        // Source array that gets spliced into the middle of the second builder.
        let mut source = nullable_utf8_builder();
        source.append_null();
        source.append_value("Hello2");
        let array = source.finish();

        let mut builder = nullable_utf8_builder();

        builder.append_option(Some("Hello1"));
        builder.extend_from_array(&array).unwrap();
        builder.append_nulls(2);
        builder.append_value("Hello3");

        let finished = builder.finish().to_varbinview().unwrap();
        let decoded = decode(&finished);

        assert_eq!(decoded.len(), 6);
        assert_eq!(
            decoded,
            owned(&[
                Some("Hello1"),
                None,
                Some("Hello2"),
                None,
                None,
                Some("Hello3"),
            ])
        );
    }
}