vortex_array/builders/
varbinview.rs1use std::any::Any;
2use std::cmp::max;
3
4use vortex_buffer::{BufferMut, ByteBuffer, ByteBufferMut};
5use vortex_dtype::{DType, Nullability};
6use vortex_error::{VortexExpect, VortexResult};
7use vortex_mask::Mask;
8
9use crate::arrays::{BinaryView, VarBinViewArray};
10use crate::builders::ArrayBuilder;
11use crate::builders::lazy_validity_builder::LazyNullBufferBuilder;
12use crate::{Array, ArrayRef, ToCanonical};
13
14pub struct VarBinViewBuilder {
15 views_builder: BufferMut<BinaryView>,
16 pub null_buffer_builder: LazyNullBufferBuilder,
17 completed: Vec<ByteBuffer>,
18 in_progress: ByteBufferMut,
19 nullability: Nullability,
20 dtype: DType,
21}
22
23impl VarBinViewBuilder {
24 const BLOCK_SIZE: u32 = 8 * 8 * 1024;
26
27 pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
28 assert!(
29 matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
30 "VarBinViewBuilder DType must be Utf8 or Binary."
31 );
32 Self {
33 views_builder: BufferMut::<BinaryView>::with_capacity(capacity),
34 null_buffer_builder: LazyNullBufferBuilder::new(capacity),
35 completed: vec![],
36 in_progress: ByteBufferMut::empty(),
37 nullability: dtype.nullability(),
38 dtype,
39 }
40 }
41
42 fn append_value_view(&mut self, value: &[u8]) {
43 let length =
44 u32::try_from(value.len()).vortex_expect("cannot have a single string >2^32 in length");
45 if length <= 12 {
46 self.views_builder.push(BinaryView::make_view(value, 0, 0));
47 return;
48 }
49
50 let required_cap = self.in_progress.len() + value.len();
51 if self.in_progress.capacity() < required_cap {
52 self.flush_in_progress();
53 let to_reserve = max(value.len(), VarBinViewBuilder::BLOCK_SIZE as usize);
54 self.in_progress.reserve(to_reserve);
55 };
56
57 let offset = u32::try_from(self.in_progress.len()).vortex_expect("too many buffers");
58 self.in_progress.extend_from_slice(value);
59 let view = BinaryView::make_view(
60 value,
61 u32::try_from(self.completed.len()).vortex_expect("too many buffers"),
63 offset,
64 );
65 self.views_builder.push(view);
66 }
67
68 #[inline]
69 pub fn append_value<S: AsRef<[u8]>>(&mut self, value: S) {
70 self.append_value_view(value.as_ref());
71 self.null_buffer_builder.append_non_null();
72 }
73
74 #[inline]
75 pub fn append_option<S: AsRef<[u8]>>(&mut self, value: Option<S>) {
76 match value {
77 Some(value) => self.append_value(value),
78 None => self.append_null(),
79 }
80 }
81
82 #[inline]
83 fn flush_in_progress(&mut self) {
84 if !self.in_progress.is_empty() {
85 let block = std::mem::take(&mut self.in_progress).freeze();
86 self.push_completed(block)
87 }
88 }
89
90 fn push_completed(&mut self, block: ByteBuffer) {
91 assert!(block.len() < u32::MAX as usize, "Block too large");
92 assert!(self.completed.len() < u32::MAX as usize, "Too many blocks");
93 self.completed.push(block);
94 }
95
96 pub fn completed_block_count(&self) -> usize {
97 self.completed.len()
98 }
99
100 pub fn push_buffer_and_adjusted_views(
106 &mut self,
107 buffer: impl IntoIterator<Item = ByteBuffer>,
108 views: impl IntoIterator<Item = BinaryView>,
109 validity_mask: Mask,
110 ) {
111 self.flush_in_progress();
112
113 self.completed.extend(buffer);
114 self.views_builder.extend(views);
115 self.push_only_validity_mask(validity_mask);
116
117 debug_assert_eq!(self.null_buffer_builder.len(), self.views_builder.len())
118 }
119
120 fn push_only_validity_mask(&mut self, validity_mask: Mask) {
122 self.null_buffer_builder.append_validity_mask(validity_mask);
123 }
124}
125
126impl ArrayBuilder for VarBinViewBuilder {
127 fn as_any(&self) -> &dyn Any {
128 self
129 }
130
131 fn as_any_mut(&mut self) -> &mut dyn Any {
132 self
133 }
134
135 #[inline]
136 fn dtype(&self) -> &DType {
137 &self.dtype
138 }
139
140 #[inline]
141 fn len(&self) -> usize {
142 self.null_buffer_builder.len()
143 }
144
145 #[inline]
146 fn append_zeros(&mut self, n: usize) {
147 self.views_builder.push_n(BinaryView::empty_view(), n);
148 self.null_buffer_builder.append_n_non_nulls(n);
149 }
150
151 #[inline]
152 fn append_nulls(&mut self, n: usize) {
153 self.views_builder.push_n(BinaryView::empty_view(), n);
154 self.null_buffer_builder.append_n_nulls(n);
155 }
156
157 #[inline]
158 fn extend_from_array(&mut self, array: &dyn Array) -> VortexResult<()> {
159 let array = array.to_varbinview()?;
160 self.flush_in_progress();
161
162 let buffers_offset = u32::try_from(self.completed.len())?;
163 self.completed.extend_from_slice(array.buffers());
164
165 self.views_builder.extend(
166 array
167 .views()
168 .iter()
169 .map(|view| view.offset_view(buffers_offset)),
170 );
171
172 self.push_only_validity_mask(array.validity_mask()?);
173
174 Ok(())
175 }
176
177 fn ensure_capacity(&mut self, capacity: usize) {
178 if capacity > self.views_builder.capacity() {
179 self.views_builder
180 .reserve(capacity - self.views_builder.len());
181 self.null_buffer_builder.ensure_capacity(capacity);
182 }
183 }
184
185 fn set_validity(&mut self, validity: Mask) {
186 self.null_buffer_builder = LazyNullBufferBuilder::new(validity.len());
187 self.null_buffer_builder.append_validity_mask(validity);
188 }
189
190 fn finish(&mut self) -> ArrayRef {
191 self.flush_in_progress();
192 let buffers = std::mem::take(&mut self.completed);
193
194 assert_eq!(
195 self.views_builder.len(),
196 self.null_buffer_builder.len(),
197 "View and validity length must match"
198 );
199
200 let validity = self
201 .null_buffer_builder
202 .finish_with_nullability(self.nullability);
203
204 VarBinViewArray::try_new(
205 std::mem::take(&mut self.views_builder).freeze(),
206 buffers,
207 std::mem::replace(&mut self.dtype, DType::Null),
208 validity,
209 )
210 .vortex_expect("VarBinViewArray components should be valid.")
211 .into_array()
212 }
213}
214
#[cfg(test)]
mod tests {
    use std::str::from_utf8;

    use itertools::Itertools;
    use vortex_dtype::{DType, Nullability};

    use crate::ToCanonical;
    use crate::accessor::ArrayAccessor;
    use crate::array::ArrayExt;
    use crate::arrays::VarBinViewArray;
    use crate::builders::{ArrayBuilder, VarBinViewBuilder};

    /// Decodes every element of `array` as UTF-8, keeping nulls as `None`.
    fn decode_utf8(array: &VarBinViewArray) -> Vec<Option<String>> {
        array
            .with_iterator(|values| {
                values
                    .map(|value| value.map(|bytes| from_utf8(bytes).unwrap().to_string()))
                    .collect_vec()
            })
            .unwrap()
    }

    /// Converts optional string literals into the owned form returned by `decode_utf8`.
    fn owned(expected: &[Option<&str>]) -> Vec<Option<String>> {
        expected
            .iter()
            .copied()
            .map(|value| value.map(String::from))
            .collect_vec()
    }

    /// Mixes every append flavour and checks the resulting values and nulls.
    #[test]
    fn test_utf8_builder() {
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_option(Some("Hello"));
        builder.append_option::<&str>(None);
        builder.append_value("World");

        builder.append_nulls(2);

        builder.append_zeros(2);
        builder.append_value("test");

        let finished = builder.finish();
        let decoded = decode_utf8(finished.as_::<VarBinViewArray>());

        assert_eq!(decoded.len(), 8);
        assert_eq!(
            decoded,
            owned(&[
                Some("Hello"),
                None,
                Some("World"),
                None,
                None,
                Some(""),
                Some(""),
                Some("test"),
            ])
        );
    }

    /// Extending from a finished array must interleave correctly with direct appends.
    #[test]
    fn test_utf8_builder_with_extend() {
        let source = {
            let mut inner =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            inner.append_null();
            inner.append_value("Hello2");
            inner.finish()
        };
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_option(Some("Hello1"));
        builder.extend_from_array(&source).unwrap();
        builder.append_nulls(2);
        builder.append_value("Hello3");

        let finished = builder.finish().to_varbinview().unwrap();
        let decoded = decode_utf8(&finished);

        assert_eq!(decoded.len(), 6);
        assert_eq!(
            decoded,
            owned(&[
                Some("Hello1"),
                None,
                Some("Hello2"),
                None,
                None,
                Some("Hello3"),
            ])
        );
    }
}