vortex_array/builders/
varbinview.rs1use std::any::Any;
5use std::cmp::max;
6
7use vortex_buffer::{Buffer, BufferMut, ByteBuffer, ByteBufferMut};
8use vortex_dtype::{DType, Nullability};
9use vortex_error::{VortexExpect, VortexResult};
10use vortex_mask::Mask;
11
12use crate::arrays::{BinaryView, VarBinViewArray};
13use crate::builders::ArrayBuilder;
14use crate::builders::lazy_validity_builder::LazyNullBufferBuilder;
15use crate::{Array, ArrayRef, IntoArray, ToCanonical};
16
17pub struct VarBinViewBuilder {
18 views_builder: BufferMut<BinaryView>,
19 pub null_buffer_builder: LazyNullBufferBuilder,
20 completed: Vec<ByteBuffer>,
21 in_progress: ByteBufferMut,
22 nullability: Nullability,
23 dtype: DType,
24}
25
26impl VarBinViewBuilder {
27 const BLOCK_SIZE: u32 = 8 * 8 * 1024;
29
30 pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
31 assert!(
32 matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
33 "VarBinViewBuilder DType must be Utf8 or Binary."
34 );
35 Self {
36 views_builder: BufferMut::<BinaryView>::with_capacity(capacity),
37 null_buffer_builder: LazyNullBufferBuilder::new(capacity),
38 completed: vec![],
39 in_progress: ByteBufferMut::empty(),
40 nullability: dtype.nullability(),
41 dtype,
42 }
43 }
44
45 fn append_value_view(&mut self, value: &[u8]) {
46 let length =
47 u32::try_from(value.len()).vortex_expect("cannot have a single string >2^32 in length");
48 if length <= 12 {
49 self.views_builder.push(BinaryView::make_view(value, 0, 0));
50 return;
51 }
52
53 let required_cap = self.in_progress.len() + value.len();
54 if self.in_progress.capacity() < required_cap {
55 self.flush_in_progress();
56 let to_reserve = max(value.len(), VarBinViewBuilder::BLOCK_SIZE as usize);
57 self.in_progress.reserve(to_reserve);
58 };
59
60 let offset = u32::try_from(self.in_progress.len()).vortex_expect("too many buffers");
61 self.in_progress.extend_from_slice(value);
62 let view = BinaryView::make_view(
63 value,
64 u32::try_from(self.completed.len()).vortex_expect("too many buffers"),
66 offset,
67 );
68 self.views_builder.push(view);
69 }
70
71 #[inline]
72 pub fn append_value<S: AsRef<[u8]>>(&mut self, value: S) {
73 self.append_value_view(value.as_ref());
74 self.null_buffer_builder.append_non_null();
75 }
76
77 #[inline]
78 pub fn append_option<S: AsRef<[u8]>>(&mut self, value: Option<S>) {
79 match value {
80 Some(value) => self.append_value(value),
81 None => self.append_null(),
82 }
83 }
84
85 #[inline]
86 fn flush_in_progress(&mut self) {
87 if !self.in_progress.is_empty() {
88 let block = std::mem::take(&mut self.in_progress).freeze();
89 self.push_completed(block)
90 }
91 }
92
93 fn push_completed(&mut self, block: ByteBuffer) {
94 assert!(block.len() < u32::MAX as usize, "Block too large");
95 assert!(self.completed.len() < u32::MAX as usize, "Too many blocks");
96 self.completed.push(block);
97 }
98
99 pub fn completed_block_count(&self) -> usize {
100 self.completed.len()
101 }
102
103 pub fn push_buffer_and_adjusted_views(
109 &mut self,
110 buffer: &[ByteBuffer],
111 views: &Buffer<BinaryView>,
112 validity_mask: Mask,
113 ) {
114 self.flush_in_progress();
115
116 self.completed.extend(buffer.iter().cloned());
117 self.views_builder.extend_trusted(views.iter().copied());
118 self.push_only_validity_mask(validity_mask);
119
120 debug_assert_eq!(self.null_buffer_builder.len(), self.views_builder.len())
121 }
122
123 pub fn finish_into_varbinview(&mut self) -> VarBinViewArray {
124 self.flush_in_progress();
125 let buffers = std::mem::take(&mut self.completed);
126
127 assert_eq!(
128 self.views_builder.len(),
129 self.null_buffer_builder.len(),
130 "View and validity length must match"
131 );
132
133 let validity = self
134 .null_buffer_builder
135 .finish_with_nullability(self.nullability);
136
137 VarBinViewArray::try_new(
138 std::mem::take(&mut self.views_builder).freeze(),
139 buffers,
140 std::mem::replace(&mut self.dtype, DType::Null),
141 validity,
142 )
143 .vortex_expect("VarBinViewArray components should be valid.")
144 }
145}
146
147impl VarBinViewBuilder {
148 fn push_only_validity_mask(&mut self, validity_mask: Mask) {
150 self.null_buffer_builder.append_validity_mask(validity_mask);
151 }
152}
153
154impl ArrayBuilder for VarBinViewBuilder {
155 fn as_any(&self) -> &dyn Any {
156 self
157 }
158
159 fn as_any_mut(&mut self) -> &mut dyn Any {
160 self
161 }
162
163 #[inline]
164 fn dtype(&self) -> &DType {
165 &self.dtype
166 }
167
168 #[inline]
169 fn len(&self) -> usize {
170 self.null_buffer_builder.len()
171 }
172
173 #[inline]
174 fn append_zeros(&mut self, n: usize) {
175 self.views_builder.push_n(BinaryView::empty_view(), n);
176 self.null_buffer_builder.append_n_non_nulls(n);
177 }
178
179 #[inline]
180 fn append_nulls(&mut self, n: usize) {
181 self.views_builder.push_n(BinaryView::empty_view(), n);
182 self.null_buffer_builder.append_n_nulls(n);
183 }
184
185 #[inline]
186 fn extend_from_array(&mut self, array: &dyn Array) -> VortexResult<()> {
187 let array = array.to_varbinview()?;
188 self.flush_in_progress();
189
190 let buffers_offset = u32::try_from(self.completed.len())?;
191 self.completed.extend_from_slice(array.buffers());
192
193 self.views_builder.extend_trusted(
194 array
195 .views()
196 .iter()
197 .map(|view| view.offset_view(buffers_offset)),
198 );
199
200 self.push_only_validity_mask(array.validity_mask()?);
201
202 Ok(())
203 }
204
205 fn ensure_capacity(&mut self, capacity: usize) {
206 if capacity > self.views_builder.capacity() {
207 self.views_builder
208 .reserve(capacity - self.views_builder.len());
209 self.null_buffer_builder.ensure_capacity(capacity);
210 }
211 }
212
213 fn set_validity(&mut self, validity: Mask) {
214 self.null_buffer_builder = LazyNullBufferBuilder::new(validity.len());
215 self.null_buffer_builder.append_validity_mask(validity);
216 }
217
218 fn finish(&mut self) -> ArrayRef {
219 self.finish_into_varbinview().into_array()
220 }
221}
222
#[cfg(test)]
mod tests {
    use std::str::from_utf8;

    use itertools::Itertools;
    use vortex_dtype::{DType, Nullability};

    use crate::ToCanonical;
    use crate::accessor::ArrayAccessor;
    use crate::arrays::VarBinViewVTable;
    use crate::builders::{ArrayBuilder, VarBinViewBuilder};

    /// Mixes every append flavour and checks the decoded contents element by element.
    #[test]
    fn test_utf8_builder() {
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_option(Some("Hello"));
        builder.append_option::<&str>(None);
        builder.append_value("World");
        builder.append_nulls(2);
        builder.append_zeros(2);
        builder.append_value("test");

        let finished = builder.finish();
        let decoded = finished
            .as_::<VarBinViewVTable>()
            .with_iterator(|iter| {
                iter.map(|item| item.map(|bytes| from_utf8(bytes).unwrap().to_string()))
                    .collect_vec()
            })
            .unwrap();

        let expected = [
            Some("Hello"),
            None,
            Some("World"),
            None,
            None,
            Some(""),
            Some(""),
            Some("test"),
        ]
        .into_iter()
        .map(|item| item.map(|text| text.to_string()))
        .collect_vec();

        assert_eq!(decoded.len(), 8);
        assert_eq!(decoded, expected);
    }

    /// Extends a builder from an existing array between direct appends.
    #[test]
    fn test_utf8_builder_with_extend() {
        let array = {
            let mut source =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            source.append_null();
            source.append_value("Hello2");
            source.finish()
        };

        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
        builder.append_option(Some("Hello1"));
        builder.extend_from_array(&array).unwrap();
        builder.append_nulls(2);
        builder.append_value("Hello3");

        let finished = builder.finish().to_varbinview().unwrap();
        let decoded = finished
            .with_iterator(|iter| {
                iter.map(|item| item.map(|bytes| from_utf8(bytes).unwrap().to_string()))
                    .collect_vec()
            })
            .unwrap();

        let expected = [
            Some("Hello1"),
            None,
            Some("Hello2"),
            None,
            None,
            Some("Hello3"),
        ]
        .into_iter()
        .map(|item| item.map(|text| text.to_string()))
        .collect_vec();

        assert_eq!(decoded.len(), 6);
        assert_eq!(decoded, expected);
    }
}