vortex_array/builders/
varbinview.rs

1use std::any::Any;
2use std::cmp::max;
3
4use vortex_buffer::{BufferMut, ByteBuffer};
5use vortex_dtype::{DType, Nullability};
6use vortex_error::{VortexExpect, VortexResult};
7use vortex_mask::Mask;
8
9use crate::arrays::{BinaryView, VarBinViewArray};
10use crate::builders::ArrayBuilder;
11use crate::builders::lazy_validity_builder::LazyNullBufferBuilder;
12use crate::{Array, ArrayRef, ToCanonical};
13
/// Incremental builder for [`VarBinViewArray`]s of `Utf8` or `Binary` values.
///
/// Every appended element contributes exactly one [`BinaryView`] and one validity entry.
/// Values of at most 12 bytes are stored inline in their view; longer values are copied into
/// the current in-progress data block and referenced by block index and byte offset.
pub struct VarBinViewBuilder {
    /// One view per appended element, including placeholder empty views for nulls.
    views_builder: BufferMut<BinaryView>,
    /// Validity of each appended element; its length is what `len()` reports.
    pub null_buffer_builder: LazyNullBufferBuilder,
    /// Finished data blocks. Views address these by their index in this vector.
    completed: Vec<ByteBuffer>,
    /// Data block currently being filled; moved into `completed` when flushed.
    in_progress: Vec<u8>,
    /// Nullability captured from `dtype` at construction, used when finishing validity.
    nullability: Nullability,
    /// Logical type of the array being built.
    dtype: DType,
}
22
23impl VarBinViewBuilder {
24    // TODO(joe): add a block growth strategy, from arrow
25    const BLOCK_SIZE: u32 = 8 * 8 * 1024;
26
27    pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
28        assert!(
29            matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
30            "VarBinViewBuilder DType must be Utf8 or Binary."
31        );
32        Self {
33            views_builder: BufferMut::<BinaryView>::with_capacity(capacity),
34            null_buffer_builder: LazyNullBufferBuilder::new(capacity),
35            completed: vec![],
36            in_progress: vec![],
37            nullability: dtype.nullability(),
38            dtype,
39        }
40    }
41
42    fn append_value_view(&mut self, value: &[u8]) {
43        let v: &[u8] = value;
44        let length =
45            u32::try_from(v.len()).vortex_expect("cannot have a single string >2^32 in length");
46        if length <= 12 {
47            self.views_builder.push(BinaryView::new_inlined(v));
48            return;
49        }
50
51        let required_cap = self.in_progress.len() + v.len();
52        if self.in_progress.capacity() < required_cap {
53            self.flush_in_progress();
54            let to_reserve = max(v.len(), VarBinViewBuilder::BLOCK_SIZE as usize);
55            self.in_progress.reserve(to_reserve);
56        };
57        let offset = u32::try_from(self.in_progress.len()).vortex_expect("too many buffers");
58        self.in_progress.extend_from_slice(v);
59
60        let view = BinaryView::new_view(
61            length,
62            // inline the first 4 bytes of the view
63            v[0..4].try_into().vortex_expect("length already checked"),
64            // buffer offset
65            u32::try_from(self.completed.len()).vortex_expect("too many buffers"),
66            offset,
67        );
68        self.views_builder.push(view);
69    }
70
71    #[inline]
72    pub fn append_value<S: AsRef<[u8]>>(&mut self, value: S) {
73        self.append_value_view(value.as_ref());
74        self.null_buffer_builder.append_non_null();
75    }
76
77    #[inline]
78    pub fn append_option<S: AsRef<[u8]>>(&mut self, value: Option<S>) {
79        match value {
80            Some(value) => self.append_value(value),
81            None => self.append_null(),
82        }
83    }
84
85    #[inline]
86    fn flush_in_progress(&mut self) {
87        if !self.in_progress.is_empty() {
88            let f = ByteBuffer::from(std::mem::take(&mut self.in_progress));
89            self.push_completed(f)
90        }
91    }
92
93    fn push_completed(&mut self, block: ByteBuffer) {
94        assert!(block.len() < u32::MAX as usize, "Block too large");
95        assert!(self.completed.len() < u32::MAX as usize, "Too many blocks");
96        self.completed.push(block);
97    }
98
99    pub fn completed_block_count(&self) -> usize {
100        self.completed.len()
101    }
102
103    // Pushes an array of values into the buffer, where the buffers are sections of a
104    // VarBinView and the views are the BinaryView's of the VarBinView *already with their*
105    // buffers adjusted.
106    // The views must all point to sections of the buffers and the validity length must match
107    // the view length.
108    pub fn push_buffer_and_adjusted_views(
109        &mut self,
110        buffer: impl IntoIterator<Item = ByteBuffer>,
111        views: impl IntoIterator<Item = BinaryView>,
112        validity_mask: Mask,
113    ) {
114        self.flush_in_progress();
115
116        self.completed.extend(buffer);
117        self.views_builder.extend(views);
118        self.push_only_validity_mask(validity_mask);
119
120        debug_assert_eq!(self.null_buffer_builder.len(), self.views_builder.len())
121    }
122
123    // Pushes a validity mask into the builder not affecting the views or buffers
124    fn push_only_validity_mask(&mut self, validity_mask: Mask) {
125        self.null_buffer_builder.append_validity_mask(validity_mask);
126    }
127}
128
129impl ArrayBuilder for VarBinViewBuilder {
130    fn as_any(&self) -> &dyn Any {
131        self
132    }
133
134    fn as_any_mut(&mut self) -> &mut dyn Any {
135        self
136    }
137
138    #[inline]
139    fn dtype(&self) -> &DType {
140        &self.dtype
141    }
142
143    #[inline]
144    fn len(&self) -> usize {
145        self.null_buffer_builder.len()
146    }
147
148    #[inline]
149    fn append_zeros(&mut self, n: usize) {
150        self.views_builder.push_n(BinaryView::empty_view(), n);
151        self.null_buffer_builder.append_n_non_nulls(n);
152    }
153
154    #[inline]
155    fn append_nulls(&mut self, n: usize) {
156        self.views_builder.push_n(BinaryView::empty_view(), n);
157        self.null_buffer_builder.append_n_nulls(n);
158    }
159
160    #[inline]
161    fn extend_from_array(&mut self, array: &dyn Array) -> VortexResult<()> {
162        let array = array.to_varbinview()?;
163        self.flush_in_progress();
164
165        let buffers_offset = u32::try_from(self.completed.len())?;
166        self.completed.extend_from_slice(array.buffers());
167
168        self.views_builder.extend(
169            array
170                .views()
171                .iter()
172                .map(|view| view.offset_view(buffers_offset)),
173        );
174
175        self.push_only_validity_mask(array.validity_mask()?);
176
177        Ok(())
178    }
179
180    fn finish(&mut self) -> ArrayRef {
181        self.flush_in_progress();
182        let buffers = std::mem::take(&mut self.completed);
183
184        assert_eq!(
185            self.views_builder.len(),
186            self.null_buffer_builder.len(),
187            "View and validity length must match"
188        );
189
190        let validity = self
191            .null_buffer_builder
192            .finish_with_nullability(self.nullability);
193
194        VarBinViewArray::try_new(
195            std::mem::take(&mut self.views_builder).freeze(),
196            buffers,
197            std::mem::replace(&mut self.dtype, DType::Null),
198            validity,
199        )
200        .vortex_expect("VarBinViewArray components should be valid.")
201        .into_array()
202    }
203}
204
205#[cfg(test)]
206mod tests {
207    use std::str::from_utf8;
208
209    use itertools::Itertools;
210    use vortex_dtype::{DType, Nullability};
211
212    use crate::ToCanonical;
213    use crate::accessor::ArrayAccessor;
214    use crate::array::ArrayExt;
215    use crate::arrays::VarBinViewArray;
216    use crate::builders::{ArrayBuilder, VarBinViewBuilder};
217
218    #[test]
219    fn test_utf8_builder() {
220        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
221
222        builder.append_option(Some("Hello"));
223        builder.append_option::<&str>(None);
224        builder.append_value("World");
225
226        builder.append_nulls(2);
227
228        builder.append_zeros(2);
229        builder.append_value("test");
230
231        let arr = builder.finish();
232
233        let arr = arr
234            .as_::<VarBinViewArray>()
235            .with_iterator(|iter| {
236                iter.map(|x| x.map(|x| from_utf8(x).unwrap().to_string()))
237                    .collect_vec()
238            })
239            .unwrap();
240        assert_eq!(arr.len(), 8);
241        assert_eq!(
242            arr,
243            vec![
244                Some("Hello".to_string()),
245                None,
246                Some("World".to_string()),
247                None,
248                None,
249                Some("".to_string()),
250                Some("".to_string()),
251                Some("test".to_string()),
252            ]
253        );
254    }
255    #[test]
256    fn test_utf8_builder_with_extend() {
257        let array = {
258            let mut builder =
259                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
260            builder.append_null();
261            builder.append_value("Hello2");
262            builder.finish()
263        };
264        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
265
266        builder.append_option(Some("Hello1"));
267        builder.extend_from_array(&array).unwrap();
268        builder.append_nulls(2);
269        builder.append_value("Hello3");
270
271        let arr = builder.finish().to_varbinview().unwrap();
272
273        let arr = arr
274            .with_iterator(|iter| {
275                iter.map(|x| x.map(|x| from_utf8(x).unwrap().to_string()))
276                    .collect_vec()
277            })
278            .unwrap();
279        assert_eq!(arr.len(), 6);
280        assert_eq!(
281            arr,
282            vec![
283                Some("Hello1".to_string()),
284                None,
285                Some("Hello2".to_string()),
286                None,
287                None,
288                Some("Hello3".to_string()),
289            ]
290        );
291    }
292}