vortex_array/builders/
varbinview.rs1use std::any::Any;
2use std::cmp::max;
3
4use vortex_buffer::{BufferMut, ByteBuffer};
5use vortex_dtype::{DType, Nullability};
6use vortex_error::{VortexExpect, VortexResult};
7use vortex_mask::Mask;
8
9use crate::arrays::{BinaryView, VarBinViewArray};
10use crate::builders::ArrayBuilder;
11use crate::builders::lazy_validity_builder::LazyNullBufferBuilder;
12use crate::{Array, ArrayRef, ToCanonical};
13
14pub struct VarBinViewBuilder {
15 views_builder: BufferMut<BinaryView>,
16 pub null_buffer_builder: LazyNullBufferBuilder,
17 completed: Vec<ByteBuffer>,
18 in_progress: Vec<u8>,
19 nullability: Nullability,
20 dtype: DType,
21}
22
23impl VarBinViewBuilder {
24 const BLOCK_SIZE: u32 = 8 * 8 * 1024;
26
27 pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
28 assert!(
29 matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
30 "VarBinViewBuilder DType must be Utf8 or Binary."
31 );
32 Self {
33 views_builder: BufferMut::<BinaryView>::with_capacity(capacity),
34 null_buffer_builder: LazyNullBufferBuilder::new(capacity),
35 completed: vec![],
36 in_progress: vec![],
37 nullability: dtype.nullability(),
38 dtype,
39 }
40 }
41
42 fn append_value_view(&mut self, value: &[u8]) {
43 let v: &[u8] = value;
44 let length =
45 u32::try_from(v.len()).vortex_expect("cannot have a single string >2^32 in length");
46 if length <= 12 {
47 self.views_builder.push(BinaryView::new_inlined(v));
48 return;
49 }
50
51 let required_cap = self.in_progress.len() + v.len();
52 if self.in_progress.capacity() < required_cap {
53 self.flush_in_progress();
54 let to_reserve = max(v.len(), VarBinViewBuilder::BLOCK_SIZE as usize);
55 self.in_progress.reserve(to_reserve);
56 };
57 let offset = u32::try_from(self.in_progress.len()).vortex_expect("too many buffers");
58 self.in_progress.extend_from_slice(v);
59
60 let view = BinaryView::new_view(
61 length,
62 v[0..4].try_into().vortex_expect("length already checked"),
64 u32::try_from(self.completed.len()).vortex_expect("too many buffers"),
66 offset,
67 );
68 self.views_builder.push(view);
69 }
70
71 #[inline]
72 pub fn append_value<S: AsRef<[u8]>>(&mut self, value: S) {
73 self.append_value_view(value.as_ref());
74 self.null_buffer_builder.append_non_null();
75 }
76
77 #[inline]
78 pub fn append_option<S: AsRef<[u8]>>(&mut self, value: Option<S>) {
79 match value {
80 Some(value) => self.append_value(value),
81 None => self.append_null(),
82 }
83 }
84
85 #[inline]
86 fn flush_in_progress(&mut self) {
87 if !self.in_progress.is_empty() {
88 let f = ByteBuffer::from(std::mem::take(&mut self.in_progress));
89 self.push_completed(f)
90 }
91 }
92
93 fn push_completed(&mut self, block: ByteBuffer) {
94 assert!(block.len() < u32::MAX as usize, "Block too large");
95 assert!(self.completed.len() < u32::MAX as usize, "Too many blocks");
96 self.completed.push(block);
97 }
98
99 pub fn completed_block_count(&self) -> usize {
100 self.completed.len()
101 }
102
103 pub fn push_buffer_and_adjusted_views(
109 &mut self,
110 buffer: impl IntoIterator<Item = ByteBuffer>,
111 views: impl IntoIterator<Item = BinaryView>,
112 validity_mask: Mask,
113 ) {
114 self.flush_in_progress();
115
116 self.completed.extend(buffer);
117 self.views_builder.extend(views);
118 self.push_only_validity_mask(validity_mask);
119
120 debug_assert_eq!(self.null_buffer_builder.len(), self.views_builder.len())
121 }
122
123 fn push_only_validity_mask(&mut self, validity_mask: Mask) {
125 self.null_buffer_builder.append_validity_mask(validity_mask);
126 }
127}
128
129impl ArrayBuilder for VarBinViewBuilder {
130 fn as_any(&self) -> &dyn Any {
131 self
132 }
133
134 fn as_any_mut(&mut self) -> &mut dyn Any {
135 self
136 }
137
138 #[inline]
139 fn dtype(&self) -> &DType {
140 &self.dtype
141 }
142
143 #[inline]
144 fn len(&self) -> usize {
145 self.null_buffer_builder.len()
146 }
147
148 #[inline]
149 fn append_zeros(&mut self, n: usize) {
150 self.views_builder.push_n(BinaryView::empty_view(), n);
151 self.null_buffer_builder.append_n_non_nulls(n);
152 }
153
154 #[inline]
155 fn append_nulls(&mut self, n: usize) {
156 self.views_builder.push_n(BinaryView::empty_view(), n);
157 self.null_buffer_builder.append_n_nulls(n);
158 }
159
160 #[inline]
161 fn extend_from_array(&mut self, array: &dyn Array) -> VortexResult<()> {
162 let array = array.to_varbinview()?;
163 self.flush_in_progress();
164
165 let buffers_offset = u32::try_from(self.completed.len())?;
166 self.completed.extend_from_slice(array.buffers());
167
168 self.views_builder.extend(
169 array
170 .views()
171 .iter()
172 .map(|view| view.offset_view(buffers_offset)),
173 );
174
175 self.push_only_validity_mask(array.validity_mask()?);
176
177 Ok(())
178 }
179
180 fn finish(&mut self) -> ArrayRef {
181 self.flush_in_progress();
182 let buffers = std::mem::take(&mut self.completed);
183
184 assert_eq!(
185 self.views_builder.len(),
186 self.null_buffer_builder.len(),
187 "View and validity length must match"
188 );
189
190 let validity = self
191 .null_buffer_builder
192 .finish_with_nullability(self.nullability);
193
194 VarBinViewArray::try_new(
195 std::mem::take(&mut self.views_builder).freeze(),
196 buffers,
197 std::mem::replace(&mut self.dtype, DType::Null),
198 validity,
199 )
200 .vortex_expect("VarBinViewArray components should be valid.")
201 .into_array()
202 }
203}
204
#[cfg(test)]
mod tests {
    use std::str::from_utf8;

    use itertools::Itertools;
    use vortex_dtype::{DType, Nullability};

    use crate::ToCanonical;
    use crate::accessor::ArrayAccessor;
    use crate::array::ArrayExt;
    use crate::arrays::VarBinViewArray;
    use crate::builders::{ArrayBuilder, VarBinViewBuilder};

    /// Decodes every element of `array` as UTF-8, keeping nulls as `None`.
    fn decode(array: &VarBinViewArray) -> Vec<Option<String>> {
        array
            .with_iterator(|iter| {
                iter.map(|item| item.map(|bytes| from_utf8(bytes).unwrap().to_string()))
                    .collect_vec()
            })
            .unwrap()
    }

    /// Turns borrowed expectations into the owned form produced by `decode`.
    fn owned(items: &[Option<&str>]) -> Vec<Option<String>> {
        items
            .iter()
            .map(|item| item.map(|s| s.to_string()))
            .collect()
    }

    /// Values, nulls and zero-length values appended one by one come back in order.
    #[test]
    fn test_utf8_builder() {
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_option(Some("Hello"));
        builder.append_option::<&str>(None);
        builder.append_value("World");

        builder.append_nulls(2);

        builder.append_zeros(2);
        builder.append_value("test");

        let finished = builder.finish();
        let decoded = decode(finished.as_::<VarBinViewArray>());

        assert_eq!(decoded.len(), 8);
        assert_eq!(
            decoded,
            owned(&[
                Some("Hello"),
                None,
                Some("World"),
                None,
                None,
                Some(""),
                Some(""),
                Some("test"),
            ])
        );
    }

    /// Extending from a finished array interleaves correctly with direct appends.
    #[test]
    fn test_utf8_builder_with_extend() {
        let source = {
            let mut inner =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            inner.append_null();
            inner.append_value("Hello2");
            inner.finish()
        };
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_option(Some("Hello1"));
        builder.extend_from_array(&source).unwrap();
        builder.append_nulls(2);
        builder.append_value("Hello3");

        let finished = builder.finish().to_varbinview().unwrap();
        let decoded = decode(&finished);

        assert_eq!(decoded.len(), 6);
        assert_eq!(
            decoded,
            owned(&[
                Some("Hello1"),
                None,
                Some("Hello2"),
                None,
                None,
                Some("Hello3"),
            ])
        );
    }
}