1use std::any::Any;
5use std::sync::Arc;
6
7use vortex_buffer::{Buffer, BufferMut, ByteBuffer, ByteBufferMut};
8use vortex_dtype::DType;
9use vortex_error::{VortexExpect, VortexResult, vortex_bail, vortex_ensure};
10use vortex_mask::Mask;
11use vortex_scalar::{BinaryScalar, Scalar, Utf8Scalar};
12use vortex_utils::aliases::hash_map::{Entry, HashMap};
13
14use crate::arrays::{BinaryView, VarBinViewArray};
15use crate::builders::{ArrayBuilder, LazyNullBufferBuilder};
16use crate::canonical::{Canonical, ToCanonical};
17use crate::{Array, ArrayRef, IntoArray};
18
19pub struct VarBinViewBuilder {
21 dtype: DType,
22 views_builder: BufferMut<BinaryView>,
23 nulls: LazyNullBufferBuilder,
24 completed: CompletedBuffers,
25 in_progress: ByteBufferMut,
26 growth_strategy: BufferGrowthStrategy,
27}
28
29impl VarBinViewBuilder {
30 pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
31 Self::new(dtype, capacity, Default::default(), Default::default())
32 }
33
34 pub fn with_buffer_deduplication(dtype: DType, capacity: usize) -> Self {
35 Self::new(
36 dtype,
37 capacity,
38 CompletedBuffers::Deduplicated(Default::default()),
39 Default::default(),
40 )
41 }
42
43 pub fn new(
44 dtype: DType,
45 capacity: usize,
46 completed: CompletedBuffers,
47 growth_strategy: BufferGrowthStrategy,
48 ) -> Self {
49 assert!(
50 matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
51 "VarBinViewBuilder DType must be Utf8 or Binary."
52 );
53 Self {
54 views_builder: BufferMut::<BinaryView>::with_capacity(capacity),
55 nulls: LazyNullBufferBuilder::new(capacity),
56 completed,
57 in_progress: ByteBufferMut::empty(),
58 dtype,
59 growth_strategy,
60 }
61 }
62
63 fn append_value_view(&mut self, value: &[u8]) {
64 let length =
65 u32::try_from(value.len()).vortex_expect("cannot have a single string >2^32 in length");
66 if length <= 12 {
67 self.views_builder.push(BinaryView::make_view(value, 0, 0));
68 return;
69 }
70
71 let required_cap = self.in_progress.len() + value.len();
72 if self.in_progress.capacity() < required_cap {
73 self.flush_in_progress();
74 let next_buffer_size = self.growth_strategy.next_size() as usize;
75 let to_reserve = next_buffer_size.max(value.len());
76 self.in_progress.reserve(to_reserve);
77 };
78
79 let offset = u32::try_from(self.in_progress.len()).vortex_expect("too many buffers");
80 self.in_progress.extend_from_slice(value);
81 let view = BinaryView::make_view(
82 value,
83 self.completed.len(),
85 offset,
86 );
87 self.views_builder.push(view);
88 }
89
90 pub fn append_value<S: AsRef<[u8]>>(&mut self, value: S) {
92 self.append_value_view(value.as_ref());
93 self.nulls.append_non_null();
94 }
95
96 fn flush_in_progress(&mut self) {
97 if self.in_progress.is_empty() {
98 return;
99 }
100 let block = std::mem::take(&mut self.in_progress).freeze();
101
102 assert!(block.len() < u32::MAX as usize, "Block too large");
103
104 let initial_len = self.completed.len();
105 self.completed.push(block);
106 assert_eq!(
107 self.completed.len(),
108 initial_len + 1,
109 "Invalid state, just completed block already exists"
110 );
111 }
112
113 pub fn completed_block_count(&self) -> u32 {
114 self.completed.len()
115 }
116
117 pub fn push_buffer_and_adjusted_views(
126 &mut self,
127 buffer: &[ByteBuffer],
128 views: &Buffer<BinaryView>,
129 validity_mask: Mask,
130 ) {
131 self.flush_in_progress();
132
133 let expected_completed_len = self.completed.len() as usize + buffer.len();
134 self.completed.extend_from_slice(buffer);
135 assert_eq!(
136 self.completed.len() as usize,
137 expected_completed_len,
138 "Some buffers already exist",
139 );
140 self.views_builder.extend_trusted(views.iter().copied());
141 self.push_only_validity_mask(validity_mask);
142
143 debug_assert_eq!(self.nulls.len(), self.views_builder.len())
144 }
145
146 pub fn finish_into_varbinview(&mut self) -> VarBinViewArray {
148 self.flush_in_progress();
149 let buffers = std::mem::take(&mut self.completed);
150
151 assert_eq!(
152 self.views_builder.len(),
153 self.nulls.len(),
154 "View and validity length must match"
155 );
156
157 let validity = self.nulls.finish_with_nullability(self.dtype.nullability());
158
159 unsafe {
161 VarBinViewArray::new_unchecked(
162 std::mem::take(&mut self.views_builder).freeze(),
163 buffers.finish(),
164 std::mem::replace(&mut self.dtype, DType::Null),
165 validity,
166 )
167 }
168 }
169}
170
171impl VarBinViewBuilder {
172 fn push_only_validity_mask(&mut self, validity_mask: Mask) {
174 self.nulls.append_validity_mask(validity_mask);
175 }
176}
177
178impl ArrayBuilder for VarBinViewBuilder {
179 fn as_any(&self) -> &dyn Any {
180 self
181 }
182
183 fn as_any_mut(&mut self) -> &mut dyn Any {
184 self
185 }
186
187 fn dtype(&self) -> &DType {
188 &self.dtype
189 }
190
191 fn len(&self) -> usize {
192 self.nulls.len()
193 }
194
195 fn append_zeros(&mut self, n: usize) {
196 self.views_builder.push_n(BinaryView::empty_view(), n);
197 self.nulls.append_n_non_nulls(n);
198 }
199
200 unsafe fn append_nulls_unchecked(&mut self, n: usize) {
201 self.views_builder.push_n(BinaryView::empty_view(), n);
202 self.nulls.append_n_nulls(n);
203 }
204
205 fn append_scalar(&mut self, scalar: &Scalar) -> VortexResult<()> {
206 vortex_ensure!(
207 scalar.dtype() == self.dtype(),
208 "VarBinViewBuilder expected scalar with dtype {:?}, got {:?}",
209 self.dtype(),
210 scalar.dtype()
211 );
212
213 match self.dtype() {
214 DType::Utf8(_) => {
215 let utf8_scalar = Utf8Scalar::try_from(scalar)?;
216 match utf8_scalar.value() {
217 Some(value) => self.append_value(value),
218 None => self.append_null(),
219 }
220 }
221 DType::Binary(_) => {
222 let binary_scalar = BinaryScalar::try_from(scalar)?;
223 match binary_scalar.value() {
224 Some(value) => self.append_value(value),
225 None => self.append_null(),
226 }
227 }
228 _ => vortex_bail!(
229 "VarBinViewBuilder can only handle Utf8 or Binary scalars, got {:?}",
230 scalar.dtype()
231 ),
232 }
233
234 Ok(())
235 }
236
237 unsafe fn extend_from_array_unchecked(&mut self, array: &dyn Array) {
238 let array = array.to_varbinview();
239 self.flush_in_progress();
240
241 let new_indices = self.completed.extend_from_slice(array.buffers());
242
243 match new_indices {
244 NewIndices::ConstantOffset(offset) => {
245 self.views_builder
246 .extend_trusted(array.views().iter().map(|view| view.offset_view(offset)));
247 }
248 NewIndices::LookupArray(lookup) => {
249 self.views_builder
250 .extend_trusted(array.views().iter().map(|view| {
251 if view.is_inlined() {
252 *view
253 } else {
254 let new_buffer_idx = lookup[view.as_view().buffer_index() as usize];
255 view.with_buffer_idx(new_buffer_idx)
256 }
257 }));
258 }
259 }
260
261 self.push_only_validity_mask(array.validity_mask());
262 }
263
264 fn ensure_capacity(&mut self, capacity: usize) {
265 if capacity > self.views_builder.capacity() {
266 self.views_builder
267 .reserve(capacity - self.views_builder.len());
268 self.nulls.ensure_capacity(capacity);
269 }
270 }
271
272 fn set_validity(&mut self, validity: Mask) {
273 self.nulls = LazyNullBufferBuilder::new(validity.len());
274 self.nulls.append_validity_mask(validity);
275 }
276
277 fn finish(&mut self) -> ArrayRef {
278 self.finish_into_varbinview().into_array()
279 }
280
281 fn finish_into_canonical(&mut self) -> Canonical {
282 Canonical::VarBinView(self.finish_into_varbinview())
283 }
284}
285
286pub enum CompletedBuffers {
287 Default(Vec<ByteBuffer>),
288 Deduplicated(DeduplicatedBuffers),
289}
290
291impl Default for CompletedBuffers {
292 fn default() -> Self {
293 Self::Default(Vec::new())
294 }
295}
296
297#[allow(clippy::cast_possible_truncation)]
299impl CompletedBuffers {
300 fn len(&self) -> u32 {
301 match self {
302 Self::Default(buffers) => buffers.len() as u32,
303 Self::Deduplicated(buffers) => buffers.len(),
304 }
305 }
306
307 fn push(&mut self, block: ByteBuffer) -> u32 {
308 match self {
309 Self::Default(buffers) => {
310 assert!(buffers.len() < u32::MAX as usize, "Too many blocks");
311 buffers.push(block);
312 self.len()
313 }
314 Self::Deduplicated(buffers) => buffers.push(block),
315 }
316 }
317
318 fn extend_from_slice(&mut self, new_buffers: &[ByteBuffer]) -> NewIndices {
319 match self {
320 Self::Default(buffers) => {
321 let offset = buffers.len() as u32;
322 buffers.extend_from_slice(new_buffers);
323 NewIndices::ConstantOffset(offset)
324 }
325 Self::Deduplicated(buffers) => {
326 NewIndices::LookupArray(buffers.extend_from_slice(new_buffers))
327 }
328 }
329 }
330
331 fn finish(self) -> Arc<[ByteBuffer]> {
332 match self {
333 Self::Default(buffers) => Arc::from(buffers),
334 Self::Deduplicated(buffers) => buffers.finish(),
335 }
336 }
337}
338
/// Describes where a batch of buffers ended up after being added to a [`CompletedBuffers`].
enum NewIndices {
    /// The batch was appended contiguously; add this offset to an old buffer index.
    ConstantOffset(u32),
    /// Entry `i` holds the new index of the `i`-th buffer of the batch.
    LookupArray(Vec<u32>),
}
345
346#[derive(Default)]
347pub struct DeduplicatedBuffers {
348 buffers: Vec<ByteBuffer>,
349 buffer_to_idx: HashMap<BufferId, u32>,
350}
351
352impl DeduplicatedBuffers {
353 #[allow(clippy::cast_possible_truncation)]
355 fn len(&self) -> u32 {
356 self.buffers.len() as u32
357 }
358
359 fn push(&mut self, block: ByteBuffer) -> u32 {
361 assert!(self.buffers.len() < u32::MAX as usize, "Too many blocks");
362
363 let initial_len = self.len();
364 let id = BufferId::from(&block);
365 match self.buffer_to_idx.entry(id) {
366 Entry::Occupied(idx) => *idx.get(),
367 Entry::Vacant(entry) => {
368 let idx = initial_len;
369 entry.insert(idx);
370 self.buffers.push(block);
371 idx
372 }
373 }
374 }
375
376 fn extend_from_slice(&mut self, buffers: &[ByteBuffer]) -> Vec<u32> {
377 buffers
378 .iter()
379 .map(|buffer| self.push(buffer.clone()))
380 .collect()
381 }
382
383 fn finish(self) -> Arc<[ByteBuffer]> {
384 Arc::from(self.buffers)
385 }
386}
387
/// Identity of a buffer: the address and length of the byte range it exposes.
///
/// Two buffers compare equal when they expose the same memory range; contents are never
/// compared.
#[derive(PartialEq, Eq, Hash)]
struct BufferId {
    /// Address of the first byte, stored as an integer.
    ptr: usize,
    /// Length of the byte range.
    len: usize,
}
394
395impl BufferId {
396 fn from(buffer: &ByteBuffer) -> Self {
397 let slice = buffer.as_slice();
398 Self {
399 ptr: slice.as_ptr() as usize,
400 len: slice.len(),
401 }
402 }
403}
404
/// Policy deciding how large each newly started data buffer should be.
#[derive(Debug, Clone)]
pub enum BufferGrowthStrategy {
    /// Every buffer gets the same size.
    Fixed { size: u32 },
    /// Buffer sizes double from `current_size` until they reach `max_size`.
    Exponential { current_size: u32, max_size: u32 },
}

impl Default for BufferGrowthStrategy {
    /// Exponential growth starting at 4 KiB and capped at 2 MiB.
    fn default() -> Self {
        const INITIAL_BYTES: u32 = 4 * 1024;
        const MAX_BYTES: u32 = 2 * 1024 * 1024;
        Self::exponential(INITIAL_BYTES, MAX_BYTES)
    }
}

impl BufferGrowthStrategy {
    /// Strategy that always yields `size`.
    pub fn fixed(size: u32) -> Self {
        Self::Fixed { size }
    }

    /// Strategy that yields `initial_size` first and then doubles up to `max_size`.
    pub fn exponential(initial_size: u32, max_size: u32) -> Self {
        Self::Exponential {
            current_size: initial_size,
            max_size,
        }
    }

    /// Returns the size to use for the next buffer and advances the strategy.
    ///
    /// For the exponential strategy the size returned is the current one; the stored size is
    /// then doubled (saturating) and clamped to `max_size`, but only while it is still below
    /// `max_size`.
    pub fn next_size(&mut self) -> u32 {
        let (current, cap) = match self {
            Self::Fixed { size } => return *size,
            Self::Exponential {
                current_size,
                max_size,
            } => (current_size, *max_size),
        };

        let handed_out = *current;
        if handed_out < cap {
            *current = handed_out.saturating_mul(2).min(cap);
        }
        handed_out
    }
}
451
#[cfg(test)]
mod tests {
    use std::str::from_utf8;

    use itertools::Itertools;
    use vortex_dtype::{DType, Nullability};

    use crate::accessor::ArrayAccessor;
    use crate::arrays::VarBinViewVTable;
    use crate::builders::{ArrayBuilder, VarBinViewBuilder};

    /// Values, nulls and zero-values appended one by one come back in order.
    #[test]
    fn test_utf8_builder() {
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_value("Hello");
        builder.append_null();
        builder.append_value("World");

        builder.append_nulls(2);

        builder.append_zeros(2);
        builder.append_value("test");

        let finished = builder.finish();

        let decoded = finished
            .as_::<VarBinViewVTable>()
            .with_iterator(|iter| {
                iter.map(|item| item.map(|bytes| from_utf8(bytes).unwrap().to_string()))
                    .collect_vec()
            })
            .unwrap();

        let expected = vec![
            Some("Hello".to_string()),
            None,
            Some("World".to_string()),
            None,
            None,
            Some("".to_string()),
            Some("".to_string()),
            Some("test".to_string()),
        ];
        assert_eq!(decoded.len(), 8);
        assert_eq!(decoded, expected);
    }

    /// Extending from another array interleaves correctly with direct appends.
    #[test]
    fn test_utf8_builder_with_extend() {
        let array = {
            let mut inner = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            inner.append_null();
            inner.append_value("Hello2");
            inner.finish()
        };
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_value("Hello1");
        builder.extend_from_array(&array);
        builder.append_nulls(2);
        builder.append_value("Hello3");

        let finished = builder.finish_into_canonical().into_varbinview();

        let decoded = finished
            .with_iterator(|iter| {
                iter.map(|item| item.map(|bytes| from_utf8(bytes).unwrap().to_string()))
                    .collect_vec()
            })
            .unwrap();

        let expected = vec![
            Some("Hello1".to_string()),
            None,
            Some("Hello2".to_string()),
            None,
            None,
            Some("Hello3".to_string()),
        ];
        assert_eq!(decoded.len(), 6);
        assert_eq!(decoded, expected);
    }

    /// A deduplicating builder stores a buffer once no matter how often it is appended.
    #[test]
    fn test_buffer_deduplication() {
        let array = {
            let mut inner = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            inner.append_value("This is a long string that should not be inlined");
            inner.append_value("short string");
            inner.finish_into_varbinview()
        };

        assert_eq!(array.buffers().len(), 1);
        let mut builder =
            VarBinViewBuilder::with_buffer_deduplication(DType::Utf8(Nullability::Nullable), 10);

        array.append_to_builder(&mut builder);
        assert_eq!(builder.completed_block_count(), 1);

        // Slices of the same array share its buffer, so the count must not grow.
        array.slice(1..2).append_to_builder(&mut builder);
        array.slice(0..1).append_to_builder(&mut builder);
        assert_eq!(builder.completed_block_count(), 1);

        // Equal contents in a different allocation count as a different buffer.
        let array2 = {
            let mut inner = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            inner.append_value("This is a long string that should not be inlined");
            inner.finish_into_varbinview()
        };

        array2.append_to_builder(&mut builder);
        assert_eq!(builder.completed_block_count(), 2);

        array.slice(0..1).append_to_builder(&mut builder);
        array2.slice(0..1).append_to_builder(&mut builder);
        assert_eq!(builder.completed_block_count(), 2);
    }

    /// `append_scalar` accepts matching UTF-8 / binary scalars and rejects other dtypes.
    #[test]
    fn test_append_scalar() {
        use vortex_scalar::Scalar;

        use crate::array::Array;

        // UTF-8 builder.
        let mut utf8_builder =
            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        let hello = Scalar::utf8("hello", Nullability::Nullable);
        utf8_builder.append_scalar(&hello).unwrap();

        let world = Scalar::utf8("world", Nullability::Nullable);
        utf8_builder.append_scalar(&world).unwrap();

        let utf8_null = Scalar::null(DType::Utf8(Nullability::Nullable));
        utf8_builder.append_scalar(&utf8_null).unwrap();

        let array = utf8_builder.finish();
        assert_eq!(array.len(), 3);

        let scalar0 = array.scalar_at(0).as_utf8().value();
        assert_eq!(scalar0.as_ref().map(|s| s.as_str()), Some("hello"));

        let scalar1 = array.scalar_at(1).as_utf8().value();
        assert_eq!(scalar1.as_ref().map(|s| s.as_str()), Some("world"));

        let scalar2 = array.scalar_at(2).as_utf8().value();
        assert_eq!(scalar2, None);

        // Binary builder.
        let mut binary_builder =
            VarBinViewBuilder::with_capacity(DType::Binary(Nullability::Nullable), 10);

        let binary_scalar = Scalar::binary(vec![1u8, 2, 3], Nullability::Nullable);
        binary_builder.append_scalar(&binary_scalar).unwrap();

        let binary_null = Scalar::null(DType::Binary(Nullability::Nullable));
        binary_builder.append_scalar(&binary_null).unwrap();

        let binary_array = binary_builder.finish();
        assert_eq!(binary_array.len(), 2);

        let binary0 = binary_array.scalar_at(0).as_binary().value();
        assert_eq!(
            binary0.as_ref().map(|b| b.as_slice()),
            Some(&[1u8, 2, 3][..])
        );

        let binary1 = binary_array.scalar_at(1).as_binary().value();
        assert_eq!(binary1, None);

        // A scalar of an unrelated dtype is rejected.
        let mut builder =
            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::NonNullable), 10);
        let wrong_scalar = Scalar::from(42i32);
        assert!(builder.append_scalar(&wrong_scalar).is_err());
    }

    /// Fixed strategies stay constant; exponential ones double up to the cap.
    #[test]
    fn test_buffer_growth_strategies() {
        use super::BufferGrowthStrategy;

        let mut fixed = BufferGrowthStrategy::fixed(1024);

        assert_eq!(fixed.next_size(), 1024);
        assert_eq!(fixed.next_size(), 1024);
        assert_eq!(fixed.next_size(), 1024);

        let mut exponential = BufferGrowthStrategy::exponential(1024, 8192);

        assert_eq!(exponential.next_size(), 1024);
        assert_eq!(exponential.next_size(), 2048);
        assert_eq!(exponential.next_size(), 4096);
        assert_eq!(exponential.next_size(), 8192);
        // Once the cap is reached the size no longer grows.
        assert_eq!(exponential.next_size(), 8192);
    }

    /// A value larger than the strategy's maximum buffer size is still stored intact.
    #[test]
    fn test_large_value_allocation() {
        use super::{BufferGrowthStrategy, VarBinViewBuilder};

        let mut builder = VarBinViewBuilder::new(
            DType::Binary(Nullability::Nullable),
            10,
            Default::default(),
            BufferGrowthStrategy::exponential(1024, 4096),
        );

        // Twice the maximum buffer size of the strategy above.
        let large_value = vec![0u8; 8192];

        builder.append_value(&large_value);

        let array = builder.finish_into_varbinview();
        assert_eq!(array.len(), 1);

        let retrieved = array.scalar_at(0).as_binary().value().unwrap();
        assert_eq!(retrieved.len(), 8192);
        assert_eq!(retrieved.as_slice(), &large_value);
    }
}