1use std::any::Any;
5use std::cmp::max;
6use std::sync::Arc;
7
8use vortex_buffer::{Buffer, BufferMut, ByteBuffer, ByteBufferMut};
9use vortex_dtype::{DType, Nullability};
10use vortex_error::{VortexExpect, VortexResult};
11use vortex_mask::Mask;
12use vortex_utils::aliases::hash_map::{Entry, HashMap};
13
14use crate::arrays::{BinaryView, VarBinViewArray};
15use crate::builders::ArrayBuilder;
16use crate::builders::lazy_validity_builder::LazyNullBufferBuilder;
17use crate::{Array, ArrayRef, IntoArray, ToCanonical};
18
/// Incrementally builds a [`VarBinViewArray`] of `Utf8` or `Binary` values.
///
/// Every appended value gets one [`BinaryView`]. Bytes of values too long to be inlined into
/// their view are copied into an in-progress block. That block is moved into the completed
/// buffers when it lacks room, before buffers from another array are appended, and on finish.
pub struct VarBinViewBuilder {
    /// One view per appended value, in append order.
    views_builder: BufferMut<BinaryView>,
    /// Validity of each appended value; its length is reported as the builder's length.
    pub null_buffer_builder: LazyNullBufferBuilder,
    /// Finished data blocks, which views reference by buffer index.
    completed: CompletedBuffers,
    /// Block currently receiving the bytes of non-inlined values.
    in_progress: ByteBufferMut,
    /// Nullability copied from `dtype` at construction; applied when finishing validity.
    nullability: Nullability,
    /// Logical type of the values; checked to be `Utf8` or `Binary` at construction.
    dtype: DType,
}
27
28impl VarBinViewBuilder {
29 const BLOCK_SIZE: u32 = 8 * 8 * 1024;
31
32 pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
33 Self::new(dtype, capacity, Default::default())
34 }
35
36 pub fn with_buffer_deduplication(dtype: DType, capacity: usize) -> Self {
37 Self::new(
38 dtype,
39 capacity,
40 CompletedBuffers::Deduplicated(Default::default()),
41 )
42 }
43
44 fn new(dtype: DType, capacity: usize, completed: CompletedBuffers) -> Self {
45 assert!(
46 matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
47 "VarBinViewBuilder DType must be Utf8 or Binary."
48 );
49 Self {
50 views_builder: BufferMut::<BinaryView>::with_capacity(capacity),
51 null_buffer_builder: LazyNullBufferBuilder::new(capacity),
52 completed,
53 in_progress: ByteBufferMut::empty(),
54 nullability: dtype.nullability(),
55 dtype,
56 }
57 }
58
59 fn append_value_view(&mut self, value: &[u8]) {
60 let length =
61 u32::try_from(value.len()).vortex_expect("cannot have a single string >2^32 in length");
62 if length <= 12 {
63 self.views_builder.push(BinaryView::make_view(value, 0, 0));
64 return;
65 }
66
67 let required_cap = self.in_progress.len() + value.len();
68 if self.in_progress.capacity() < required_cap {
69 self.flush_in_progress();
70 let to_reserve = max(value.len(), VarBinViewBuilder::BLOCK_SIZE as usize);
71 self.in_progress.reserve(to_reserve);
72 };
73
74 let offset = u32::try_from(self.in_progress.len()).vortex_expect("too many buffers");
75 self.in_progress.extend_from_slice(value);
76 let view = BinaryView::make_view(
77 value,
78 self.completed.len(),
80 offset,
81 );
82 self.views_builder.push(view);
83 }
84
85 #[inline]
86 pub fn append_value<S: AsRef<[u8]>>(&mut self, value: S) {
87 self.append_value_view(value.as_ref());
88 self.null_buffer_builder.append_non_null();
89 }
90
91 #[inline]
92 pub fn append_option<S: AsRef<[u8]>>(&mut self, value: Option<S>) {
93 match value {
94 Some(value) => self.append_value(value),
95 None => self.append_null(),
96 }
97 }
98
99 #[inline]
100 fn flush_in_progress(&mut self) {
101 if self.in_progress.is_empty() {
102 return;
103 }
104 let block = std::mem::take(&mut self.in_progress).freeze();
105
106 assert!(block.len() < u32::MAX as usize, "Block too large");
107
108 let initial_len = self.completed.len();
109 self.completed.push(block);
110 assert_eq!(
111 self.completed.len(),
112 initial_len + 1,
113 "Invalid state, just completed block already exists"
114 );
115 }
116
117 pub fn completed_block_count(&self) -> usize {
118 self.completed.len() as usize
119 }
120
121 pub fn push_buffer_and_adjusted_views(
130 &mut self,
131 buffer: &[ByteBuffer],
132 views: &Buffer<BinaryView>,
133 validity_mask: Mask,
134 ) {
135 self.flush_in_progress();
136
137 let expected_completed_len = self.completed.len() as usize + buffer.len();
138 self.completed.extend_from_slice(buffer);
139 assert_eq!(
140 self.completed.len() as usize,
141 expected_completed_len,
142 "Some buffers already exist",
143 );
144 self.views_builder.extend_trusted(views.iter().copied());
145 self.push_only_validity_mask(validity_mask);
146
147 debug_assert_eq!(self.null_buffer_builder.len(), self.views_builder.len())
148 }
149
150 pub fn finish_into_varbinview(&mut self) -> VarBinViewArray {
151 self.flush_in_progress();
152 let buffers = std::mem::take(&mut self.completed);
153
154 assert_eq!(
155 self.views_builder.len(),
156 self.null_buffer_builder.len(),
157 "View and validity length must match"
158 );
159
160 let validity = self
161 .null_buffer_builder
162 .finish_with_nullability(self.nullability);
163
164 VarBinViewArray::try_new(
165 std::mem::take(&mut self.views_builder).freeze(),
166 buffers.finish(),
167 std::mem::replace(&mut self.dtype, DType::Null),
168 validity,
169 )
170 .vortex_expect("VarBinViewArray components should be valid.")
171 }
172}
173
174impl VarBinViewBuilder {
175 fn push_only_validity_mask(&mut self, validity_mask: Mask) {
177 self.null_buffer_builder.append_validity_mask(validity_mask);
178 }
179}
180
181impl ArrayBuilder for VarBinViewBuilder {
182 fn as_any(&self) -> &dyn Any {
183 self
184 }
185
186 fn as_any_mut(&mut self) -> &mut dyn Any {
187 self
188 }
189
190 #[inline]
191 fn dtype(&self) -> &DType {
192 &self.dtype
193 }
194
195 #[inline]
196 fn len(&self) -> usize {
197 self.null_buffer_builder.len()
198 }
199
200 #[inline]
201 fn append_zeros(&mut self, n: usize) {
202 self.views_builder.push_n(BinaryView::empty_view(), n);
203 self.null_buffer_builder.append_n_non_nulls(n);
204 }
205
206 #[inline]
207 fn append_nulls(&mut self, n: usize) {
208 self.views_builder.push_n(BinaryView::empty_view(), n);
209 self.null_buffer_builder.append_n_nulls(n);
210 }
211
212 #[inline]
213 fn extend_from_array(&mut self, array: &dyn Array) -> VortexResult<()> {
214 let array = array.to_varbinview()?;
215 self.flush_in_progress();
216
217 let new_indices = self.completed.extend_from_slice(array.buffers());
218
219 match new_indices {
220 NewIndices::ConstantOffset(offset) => {
221 self.views_builder
222 .extend_trusted(array.views().iter().map(|view| view.offset_view(offset)));
223 }
224 NewIndices::LookupArray(lookup) => {
225 self.views_builder
226 .extend_trusted(array.views().iter().map(|view| {
227 if view.is_inlined() {
228 *view
229 } else {
230 let new_buffer_idx = lookup[view.as_view().buffer_index() as usize];
231 view.with_buffer_idx(new_buffer_idx)
232 }
233 }));
234 }
235 }
236
237 self.push_only_validity_mask(array.validity_mask()?);
238
239 Ok(())
240 }
241
242 fn ensure_capacity(&mut self, capacity: usize) {
243 if capacity > self.views_builder.capacity() {
244 self.views_builder
245 .reserve(capacity - self.views_builder.len());
246 self.null_buffer_builder.ensure_capacity(capacity);
247 }
248 }
249
250 fn set_validity(&mut self, validity: Mask) {
251 self.null_buffer_builder = LazyNullBufferBuilder::new(validity.len());
252 self.null_buffer_builder.append_validity_mask(validity);
253 }
254
255 fn finish(&mut self) -> ArrayRef {
256 self.finish_into_varbinview().into_array()
257 }
258}
259
/// Storage for the builder's completed data buffers.
enum CompletedBuffers {
    /// Every pushed buffer is stored, even if it references memory already present.
    Default(Vec<ByteBuffer>),
    /// Buffers referencing the same memory (same address and length) are stored only once.
    Deduplicated(DeduplicatedBuffers),
}
264
265impl Default for CompletedBuffers {
266 fn default() -> Self {
267 Self::Default(Vec::new())
268 }
269}
270
271#[allow(clippy::cast_possible_truncation)]
273impl CompletedBuffers {
274 fn len(&self) -> u32 {
275 match self {
276 Self::Default(buffers) => buffers.len() as u32,
277 Self::Deduplicated(buffers) => buffers.len(),
278 }
279 }
280
281 fn push(&mut self, block: ByteBuffer) -> u32 {
282 match self {
283 Self::Default(buffers) => {
284 assert!(buffers.len() < u32::MAX as usize, "Too many blocks");
285 buffers.push(block);
286 self.len()
287 }
288 Self::Deduplicated(buffers) => buffers.push(block),
289 }
290 }
291
292 fn extend_from_slice(&mut self, new_buffers: &[ByteBuffer]) -> NewIndices {
293 match self {
294 Self::Default(buffers) => {
295 let offset = buffers.len() as u32;
296 buffers.extend_from_slice(new_buffers);
297 NewIndices::ConstantOffset(offset)
298 }
299 Self::Deduplicated(buffers) => {
300 NewIndices::LookupArray(buffers.extend_from_slice(new_buffers))
301 }
302 }
303 }
304
305 fn finish(self) -> Arc<[ByteBuffer]> {
306 match self {
307 Self::Default(buffers) => Arc::from(buffers),
308 Self::Deduplicated(buffers) => buffers.finish(),
309 }
310 }
311}
312
/// How the buffer indices in an appended array's views map to indices in the builder.
enum NewIndices {
    /// All source buffers were appended after the existing ones, starting at this index.
    /// Source buffer `i` therefore now has index `offset + i`.
    ConstantOffset(u32),
    /// Source buffer `i` now has index `lookup[i]`. Used with deduplication, where a source
    /// buffer may map to an already-present buffer.
    LookupArray(Vec<u32>),
}
319
/// Buffer storage that keeps at most one entry per [`BufferId`] (address and length).
/// Re-adding the same memory returns the existing index.
#[derive(Default)]
struct DeduplicatedBuffers {
    /// Distinct buffers in first-insertion order; a buffer's position is its index.
    buffers: Vec<ByteBuffer>,
    /// Maps each stored buffer's identity to its index in `buffers`.
    buffer_to_idx: HashMap<BufferId, u32>,
}
325
326impl DeduplicatedBuffers {
327 #[allow(clippy::cast_possible_truncation)]
329 fn len(&self) -> u32 {
330 self.buffers.len() as u32
331 }
332
333 fn push(&mut self, block: ByteBuffer) -> u32 {
335 assert!(self.buffers.len() < u32::MAX as usize, "Too many blocks");
336
337 let initial_len = self.len();
338 let id = BufferId::from(&block);
339 match self.buffer_to_idx.entry(id) {
340 Entry::Occupied(idx) => *idx.get(),
341 Entry::Vacant(entry) => {
342 let idx = initial_len;
343 entry.insert(idx);
344 self.buffers.push(block);
345 idx
346 }
347 }
348 }
349
350 fn extend_from_slice(&mut self, buffers: &[ByteBuffer]) -> Vec<u32> {
351 buffers
352 .iter()
353 .map(|buffer| self.push(buffer.clone()))
354 .collect()
355 }
356
357 fn finish(self) -> Arc<[ByteBuffer]> {
358 Arc::from(self.buffers)
359 }
360}
361
/// Identity of a buffer's bytes: their start address and length.
///
/// Equality is by identity, not content. Two buffers holding equal bytes at different
/// addresses get different ids.
#[derive(PartialEq, Eq, Hash)]
struct BufferId {
    /// Start address of the buffer's bytes.
    ptr: usize,
    /// Length of the buffer in bytes.
    len: usize,
}
368
369impl BufferId {
370 fn from(buffer: &ByteBuffer) -> Self {
371 let slice = buffer.as_slice();
372 Self {
373 ptr: slice.as_ptr() as usize,
374 len: slice.len(),
375 }
376 }
377}
378
/// Tests for `VarBinViewBuilder`: appending values, extending from arrays, and buffer
/// deduplication.
#[cfg(test)]
mod tests {
    use std::str::from_utf8;

    use itertools::Itertools;
    use vortex_dtype::{DType, Nullability};

    use crate::ToCanonical;
    use crate::accessor::ArrayAccessor;
    use crate::arrays::VarBinViewVTable;
    use crate::builders::{ArrayBuilder, VarBinViewBuilder};

    /// Mixes `append_option`, `append_value`, `append_nulls` and `append_zeros`. Checks the
    /// values read back, including that zeros come back as empty strings.
    #[test]
    fn test_utf8_builder() {
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_option(Some("Hello"));
        builder.append_option::<&str>(None);
        builder.append_value("World");

        builder.append_nulls(2);

        builder.append_zeros(2);
        builder.append_value("test");

        let arr = builder.finish();

        let arr = arr
            .as_::<VarBinViewVTable>()
            .with_iterator(|iter| {
                iter.map(|x| x.map(|x| from_utf8(x).unwrap().to_string()))
                    .collect_vec()
            })
            .unwrap();
        assert_eq!(arr.len(), 8);
        assert_eq!(
            arr,
            vec![
                Some("Hello".to_string()),
                None,
                Some("World".to_string()),
                None,
                None,
                Some("".to_string()),
                Some("".to_string()),
                Some("test".to_string()),
            ]
        );
    }

    /// Checks that `extend_from_array` splices another array's values and nulls between
    /// directly appended values, in order.
    #[test]
    fn test_utf8_builder_with_extend() {
        let array = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_null();
            builder.append_value("Hello2");
            builder.finish()
        };
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_option(Some("Hello1"));
        builder.extend_from_array(&array).unwrap();
        builder.append_nulls(2);
        builder.append_value("Hello3");

        let arr = builder.finish().to_varbinview().unwrap();

        let arr = arr
            .with_iterator(|iter| {
                iter.map(|x| x.map(|x| from_utf8(x).unwrap().to_string()))
                    .collect_vec()
            })
            .unwrap();
        assert_eq!(arr.len(), 6);
        assert_eq!(
            arr,
            vec![
                Some("Hello1".to_string()),
                None,
                Some("Hello2".to_string()),
                None,
                None,
                Some("Hello3".to_string()),
            ]
        );
    }

    /// Appending an array and slices of it to a deduplicating builder stores the shared data
    /// buffer once; a separately built array adds exactly one more buffer.
    #[test]
    fn test_buffer_deduplication() {
        // The long string needs a data buffer; "short string" is 12 bytes, so it is inlined
        // and needs none. The array therefore has exactly one buffer.
        let array = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_value("This is a long string that should not be inlined");
            builder.append_value("short string");
            builder.finish_into_varbinview()
        };

        assert_eq!(array.buffers().len(), 1);
        let mut builder =
            VarBinViewBuilder::with_buffer_deduplication(DType::Utf8(Nullability::Nullable), 10);

        array.append_to_builder(&mut builder).unwrap();
        assert_eq!(builder.completed_block_count(), 1);

        // This test expects slices to reference the parent's buffer, so they dedupe to it.
        array
            .slice(1, 2)
            .unwrap()
            .append_to_builder(&mut builder)
            .unwrap();
        array
            .slice(0, 1)
            .unwrap()
            .append_to_builder(&mut builder)
            .unwrap();
        assert_eq!(builder.completed_block_count(), 1);

        // Same contents, but built separately, so it lives at a different address and counts
        // as a different buffer.
        let array2 = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_value("This is a long string that should not be inlined");
            builder.finish_into_varbinview()
        };

        array2.append_to_builder(&mut builder).unwrap();
        assert_eq!(builder.completed_block_count(), 2);

        array
            .slice(0, 1)
            .unwrap()
            .append_to_builder(&mut builder)
            .unwrap();
        array2
            .slice(0, 1)
            .unwrap()
            .append_to_builder(&mut builder)
            .unwrap();
        assert_eq!(builder.completed_block_count(), 2);
    }
}