1use std::any::Any;
5use std::ops::Range;
6use std::sync::Arc;
7
8use vortex_buffer::Buffer;
9use vortex_buffer::BufferMut;
10use vortex_buffer::ByteBuffer;
11use vortex_buffer::ByteBufferMut;
12use vortex_dtype::DType;
13use vortex_error::VortexExpect;
14use vortex_error::VortexResult;
15use vortex_error::vortex_bail;
16use vortex_error::vortex_ensure;
17use vortex_mask::Mask;
18use vortex_scalar::BinaryScalar;
19use vortex_scalar::Scalar;
20use vortex_scalar::Utf8Scalar;
21use vortex_utils::aliases::hash_map::Entry;
22use vortex_utils::aliases::hash_map::HashMap;
23use vortex_vector::binaryview::BinaryView;
24
25use crate::Array;
26use crate::ArrayRef;
27use crate::IntoArray;
28use crate::arrays::VarBinViewArray;
29use crate::arrays::compact::BufferUtilization;
30use crate::builders::ArrayBuilder;
31use crate::builders::LazyBitBufferBuilder;
32use crate::canonical::Canonical;
33use crate::canonical::ToCanonical;
34
/// Builder for [`VarBinViewArray`]s (the "string view" layout).
///
/// Values of 12 bytes or fewer are inlined directly into their [`BinaryView`];
/// longer values are appended to an in-progress byte buffer and referenced by
/// a (buffer index, byte offset) pair stored in the view.
pub struct VarBinViewBuilder {
    /// Element type being built; must be `Utf8` or `Binary` (asserted in `new`).
    dtype: DType,
    /// One 16-byte `BinaryView` per appended element.
    views_builder: BufferMut<BinaryView>,
    /// Lazily materialized validity bitmap; one bit per appended element.
    nulls: LazyBitBufferBuilder,
    /// Byte buffers already sealed; out-of-line views reference these by index.
    completed: CompletedBuffers,
    /// Buffer currently receiving out-of-line (>12 byte) values.
    in_progress: ByteBufferMut,
    /// Policy for sizing each successive `in_progress` buffer.
    growth_strategy: BufferGrowthStrategy,
    /// Utilization threshold below which buffers of appended arrays are
    /// compacted; 0.0 disables compaction entirely.
    compaction_threshold: f64,
}
45
impl VarBinViewBuilder {
    /// Create a builder with no buffer deduplication and no compaction.
    pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
        Self::new(dtype, capacity, Default::default(), Default::default(), 0.0)
    }

    /// Create a builder that deduplicates identical completed buffers
    /// (matched by backing pointer + length) when arrays are appended.
    pub fn with_buffer_deduplication(dtype: DType, capacity: usize) -> Self {
        Self::new(
            dtype,
            capacity,
            CompletedBuffers::Deduplicated(Default::default()),
            Default::default(),
            0.0,
        )
    }

    /// Create a builder that compacts under-utilized buffers of appended
    /// arrays when utilization falls below `compaction_threshold`.
    pub fn with_compaction(dtype: DType, capacity: usize, compaction_threshold: f64) -> Self {
        Self::new(
            dtype,
            capacity,
            Default::default(),
            Default::default(),
            compaction_threshold,
        )
    }

    /// Create a fully-configured builder.
    ///
    /// # Panics
    ///
    /// Panics if `dtype` is not `Utf8` or `Binary`.
    pub fn new(
        dtype: DType,
        capacity: usize,
        completed: CompletedBuffers,
        growth_strategy: BufferGrowthStrategy,
        compaction_threshold: f64,
    ) -> Self {
        assert!(
            matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
            "VarBinViewBuilder DType must be Utf8 or Binary."
        );
        Self {
            views_builder: BufferMut::<BinaryView>::with_capacity(capacity),
            nulls: LazyBitBufferBuilder::new(capacity),
            completed,
            in_progress: ByteBufferMut::empty(),
            dtype,
            growth_strategy,
            compaction_threshold,
        }
    }

    /// Push a view for `value` without touching validity: short values are
    /// inlined into the view, long values are spilled to a byte buffer.
    fn append_value_view(&mut self, value: &[u8]) {
        let length =
            u32::try_from(value.len()).vortex_expect("cannot have a single string >2^32 in length");
        if length <= 12 {
            // Fits entirely within the 16-byte view; no buffer reference needed.
            self.views_builder.push(BinaryView::make_view(value, 0, 0));
            return;
        }

        let (buffer_idx, offset) = self.append_value_to_buffer(value);
        let view = BinaryView::make_view(value, buffer_idx, offset);
        self.views_builder.push(view);
    }

    /// Append a single non-null value.
    pub fn append_value<S: AsRef<[u8]>>(&mut self, value: S) {
        self.append_value_view(value.as_ref());
        self.nulls.append_non_null();
    }

    /// Seal the in-progress buffer (if non-empty) into `completed`.
    fn flush_in_progress(&mut self) {
        if self.in_progress.is_empty() {
            return;
        }
        let block = std::mem::take(&mut self.in_progress).freeze();

        // View offsets are u32, so each block must stay u32-addressable.
        assert!(block.len() < u32::MAX as usize, "Block too large");

        let initial_len = self.completed.len();
        self.completed.push(block);
        // A freshly frozen block must be new even under deduplication;
        // otherwise views recorded against this block would be wrong.
        assert_eq!(
            self.completed.len(),
            initial_len + 1,
            "Invalid state, just completed block already exists"
        );
    }

    /// Copy `value` into the in-progress buffer, flushing/growing as needed,
    /// and return the (buffer index, byte offset) the view should reference.
    fn append_value_to_buffer(&mut self, value: &[u8]) -> (u32, u32) {
        assert!(value.len() > 12, "must inline small strings");
        let required_cap = self.in_progress.len() + value.len();
        if self.in_progress.capacity() < required_cap {
            // Value doesn't fit: seal the current buffer and start a new one
            // sized by the growth strategy (or by the value, if larger).
            self.flush_in_progress();
            let next_buffer_size = self.growth_strategy.next_size() as usize;
            let to_reserve = next_buffer_size.max(value.len());
            self.in_progress.reserve(to_reserve);
        }

        // The in-progress buffer will become block index `completed.len()`
        // once it is flushed, so views can reference it by that index now.
        let buffer_idx = self.completed.len();
        let offset = u32::try_from(self.in_progress.len()).vortex_expect("too many buffers");
        self.in_progress.extend_from_slice(value);

        (buffer_idx, offset)
    }

    /// Number of sealed (completed) buffers so far.
    pub fn completed_block_count(&self) -> u32 {
        self.completed.len()
    }

    /// Append pre-built buffers together with views that the caller has
    /// ALREADY adjusted to reference those buffers relative to the current
    /// completed-buffer count. No view adjustment happens here.
    pub fn push_buffer_and_adjusted_views(
        &mut self,
        buffer: &[ByteBuffer],
        views: &Buffer<BinaryView>,
        validity_mask: Mask,
    ) {
        self.flush_in_progress();

        let expected_completed_len = self.completed.len() as usize + buffer.len();
        self.completed.extend_from_slice_unchecked(buffer);
        // Under deduplication a duplicate buffer would silently collapse and
        // invalidate the caller's precomputed view indices — catch that here.
        assert_eq!(
            self.completed.len() as usize,
            expected_completed_len,
            "Some buffers already exist",
        );
        self.views_builder.extend_trusted(views.iter().copied());
        self.push_only_validity_mask(validity_mask);

        debug_assert_eq!(self.nulls.len(), self.views_builder.len())
    }

    /// Consume the builder's accumulated state into a [`VarBinViewArray`].
    pub fn finish_into_varbinview(&mut self) -> VarBinViewArray {
        self.flush_in_progress();
        let buffers = std::mem::take(&mut self.completed);

        assert_eq!(
            self.views_builder.len(),
            self.nulls.len(),
            "View and validity length must match"
        );

        let validity = self.nulls.finish_with_nullability(self.dtype.nullability());

        // SAFETY: views, buffers, and validity were built consistently above,
        // and the length invariant was just asserted.
        unsafe {
            VarBinViewArray::new_unchecked(
                std::mem::take(&mut self.views_builder).freeze(),
                buffers.finish(),
                self.dtype.clone(),
                validity,
            )
        }
    }

    /// Extend only the validity bitmap, leaving views untouched.
    fn push_only_validity_mask(&mut self, validity_mask: Mask) {
        self.nulls.append_validity_mask(validity_mask);
    }
}
217
impl ArrayBuilder for VarBinViewBuilder {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }

    fn dtype(&self) -> &DType {
        &self.dtype
    }

    /// Number of elements appended so far (the validity builder tracks every
    /// append, so its length is the element count).
    fn len(&self) -> usize {
        self.nulls.len()
    }

    /// Append `n` empty (zero-length) but non-null values.
    fn append_zeros(&mut self, n: usize) {
        self.views_builder.push_n(BinaryView::empty_view(), n);
        self.nulls.append_n_non_nulls(n);
    }

    /// Append `n` nulls without checking the dtype's nullability.
    unsafe fn append_nulls_unchecked(&mut self, n: usize) {
        self.views_builder.push_n(BinaryView::empty_view(), n);
        self.nulls.append_n_nulls(n);
    }

    /// Append a single scalar, which must match this builder's dtype exactly.
    fn append_scalar(&mut self, scalar: &Scalar) -> VortexResult<()> {
        vortex_ensure!(
            scalar.dtype() == self.dtype(),
            "VarBinViewBuilder expected scalar with dtype {:?}, got {:?}",
            self.dtype(),
            scalar.dtype()
        );

        match self.dtype() {
            DType::Utf8(_) => {
                let utf8_scalar = Utf8Scalar::try_from(scalar)?;
                match utf8_scalar.value() {
                    Some(value) => self.append_value(value),
                    None => self.append_null(),
                }
            }
            DType::Binary(_) => {
                let binary_scalar = BinaryScalar::try_from(scalar)?;
                match binary_scalar.value() {
                    Some(value) => self.append_value(value),
                    None => self.append_null(),
                }
            }
            // Unreachable in practice: the constructor rejects other dtypes,
            // but keep a defensive error for safety.
            _ => vortex_bail!(
                "VarBinViewBuilder can only handle Utf8 or Binary scalars, got {:?}",
                scalar.dtype()
            ),
        }

        Ok(())
    }

    /// Append an entire array: canonicalize to VarBinView, merge its buffers
    /// (optionally compacting under-utilized ones), and remap its views onto
    /// this builder's buffer set.
    unsafe fn extend_from_array_unchecked(&mut self, array: &dyn Array) {
        let array = array.to_varbinview();
        self.flush_in_progress();

        self.push_only_validity_mask(array.validity_mask());

        let view_adjustment =
            self.completed
                .extend_from_compaction(BuffersWithOffsets::from_array(
                    &array,
                    self.compaction_threshold,
                ));

        match view_adjustment {
            // Every incoming buffer survived: remap views in bulk.
            ViewAdjustment::Precomputed(adjustment) => self.views_builder.extend_trusted(
                array
                    .views()
                    .iter()
                    .map(|view| adjustment.adjust_view(view)),
            ),
            // Some buffers were compacted away: views into them must be
            // rebuilt by copying their bytes into this builder's buffer.
            ViewAdjustment::Rewriting(adjustment) => {
                for (idx, &view) in array.views().iter().enumerate() {
                    let new_view = if !array.is_valid(idx) {
                        // Null positions never need their bytes preserved.
                        BinaryView::empty_view()
                    } else if view.is_inlined() {
                        // Inlined views carry their own bytes; keep verbatim.
                        view
                    } else if let Some(adjusted) = adjustment.adjust_view(&view) {
                        adjusted
                    } else {
                        // Source buffer was dropped: copy the value's bytes.
                        let bytes = array.bytes_at(idx);
                        let (new_buf_idx, new_offset) = self.append_value_to_buffer(&bytes);
                        BinaryView::make_view(bytes.as_slice(), new_buf_idx, new_offset)
                    };
                    self.views_builder.push(new_view)
                }
            }
        }
    }

    fn reserve_exact(&mut self, additional: usize) {
        self.views_builder.reserve(additional);
        self.nulls.reserve_exact(additional);
    }

    /// Replace the validity bitmap wholesale; the caller must ensure
    /// `validity` matches the number of views already appended.
    unsafe fn set_validity_unchecked(&mut self, validity: Mask) {
        self.nulls = LazyBitBufferBuilder::new(validity.len());
        self.nulls.append_validity_mask(validity);
    }

    fn finish(&mut self) -> ArrayRef {
        self.finish_into_varbinview().into_array()
    }

    fn finish_into_canonical(&mut self) -> Canonical {
        Canonical::VarBinView(self.finish_into_varbinview())
    }
}
334
/// Storage for sealed byte buffers, optionally collapsing repeats.
pub enum CompletedBuffers {
    /// Plain append-only list of buffers.
    Default(Vec<ByteBuffer>),
    /// Buffers deduplicated by identity (backing pointer + length).
    Deduplicated(DeduplicatedBuffers),
}
339
340impl Default for CompletedBuffers {
341 fn default() -> Self {
342 Self::Default(Vec::new())
343 }
344}
345
346#[allow(clippy::cast_possible_truncation)]
348impl CompletedBuffers {
349 fn len(&self) -> u32 {
350 match self {
351 Self::Default(buffers) => buffers.len() as u32,
352 Self::Deduplicated(buffers) => buffers.len(),
353 }
354 }
355
356 fn push(&mut self, block: ByteBuffer) -> u32 {
357 match self {
358 Self::Default(buffers) => {
359 assert!(buffers.len() < u32::MAX as usize, "Too many blocks");
360 buffers.push(block);
361 self.len()
362 }
363 Self::Deduplicated(buffers) => buffers.push(block),
364 }
365 }
366
367 fn extend_from_slice_unchecked(&mut self, buffers: &[ByteBuffer]) {
369 for buffer in buffers {
370 self.push(buffer.clone());
371 }
372 }
373
374 fn extend_from_compaction(&mut self, buffers: BuffersWithOffsets) -> ViewAdjustment {
375 match (self, buffers) {
376 (
377 Self::Default(completed_buffers),
378 BuffersWithOffsets::AllKept { buffers, offsets },
379 ) => {
380 let buffer_offset = completed_buffers.len() as u32;
381 completed_buffers.extend_from_slice(&buffers);
382 ViewAdjustment::shift(buffer_offset, offsets)
383 }
384 (
385 Self::Default(completed_buffers),
386 BuffersWithOffsets::SomeCompacted { buffers, offsets },
387 ) => {
388 let lookup = buffers
389 .iter()
390 .map(|maybe_buffer| {
391 maybe_buffer.as_ref().map(|buffer| {
392 completed_buffers.push(buffer.clone());
393 completed_buffers.len() as u32 - 1
394 })
395 })
396 .collect();
397 ViewAdjustment::rewriting(lookup, offsets)
398 }
399
400 (
401 Self::Deduplicated(completed_buffers),
402 BuffersWithOffsets::AllKept { buffers, offsets },
403 ) => {
404 let buffer_lookup = completed_buffers.extend_from_slice(&buffers);
405 ViewAdjustment::lookup(buffer_lookup, offsets)
406 }
407 (
408 Self::Deduplicated(completed_buffers),
409 BuffersWithOffsets::SomeCompacted { buffers, offsets },
410 ) => {
411 let buffer_lookup = completed_buffers.extend_from_option_slice(&buffers);
412 ViewAdjustment::rewriting(buffer_lookup, offsets)
413 }
414 }
415 }
416
417 fn finish(self) -> Arc<[ByteBuffer]> {
418 match self {
419 Self::Default(buffers) => Arc::from(buffers),
420 Self::Deduplicated(buffers) => buffers.finish(),
421 }
422 }
423}
424
/// Append-only buffer store that collapses identical buffers.
///
/// "Identical" means the same backing memory region (pointer + length), not
/// equal contents — see [`BufferId`].
#[derive(Default)]
pub struct DeduplicatedBuffers {
    /// Distinct buffers, in insertion order.
    buffers: Vec<ByteBuffer>,
    /// Maps a buffer's identity to its index in `buffers`.
    buffer_to_idx: HashMap<BufferId, u32>,
}
430
431impl DeduplicatedBuffers {
432 #[allow(clippy::cast_possible_truncation)]
434 fn len(&self) -> u32 {
435 self.buffers.len() as u32
436 }
437
438 pub(crate) fn push(&mut self, block: ByteBuffer) -> u32 {
440 assert!(self.buffers.len() < u32::MAX as usize, "Too many blocks");
441
442 let initial_len = self.len();
443 let id = BufferId::from(&block);
444 match self.buffer_to_idx.entry(id) {
445 Entry::Occupied(idx) => *idx.get(),
446 Entry::Vacant(entry) => {
447 let idx = initial_len;
448 entry.insert(idx);
449 self.buffers.push(block);
450 idx
451 }
452 }
453 }
454
455 pub(crate) fn extend_from_option_slice(
456 &mut self,
457 buffers: &[Option<ByteBuffer>],
458 ) -> Vec<Option<u32>> {
459 buffers
460 .iter()
461 .map(|buffer| buffer.as_ref().map(|buf| self.push(buf.clone())))
462 .collect()
463 }
464
465 pub(crate) fn extend_from_slice(&mut self, buffers: &[ByteBuffer]) -> Vec<u32> {
466 buffers
467 .iter()
468 .map(|buffer| self.push(buffer.clone()))
469 .collect()
470 }
471
472 pub(crate) fn finish(self) -> Arc<[ByteBuffer]> {
473 Arc::from(self.buffers)
474 }
475}
476
/// Identity key for a buffer: the address and length of its backing bytes.
///
/// Two `ByteBuffer`s compare equal here only when they alias the exact same
/// memory region — this is identity-based, not content-based, deduplication.
#[derive(PartialEq, Eq, Hash)]
struct BufferId {
    ptr: usize,
    len: usize,
}
483
484impl BufferId {
485 fn from(buffer: &ByteBuffer) -> Self {
486 let slice = buffer.as_slice();
487 Self {
488 ptr: slice.as_ptr() as usize,
489 len: slice.len(),
490 }
491 }
492}
493
/// Policy deciding how large each new out-of-line byte buffer should be.
#[derive(Debug, Clone)]
pub enum BufferGrowthStrategy {
    /// Every buffer has the same size.
    Fixed { size: u32 },
    /// Buffer sizes double on each allocation until `max_size` is reached.
    Exponential { current_size: u32, max_size: u32 },
}

impl Default for BufferGrowthStrategy {
    /// Default policy: start at 4 KiB and double up to 2 MiB.
    fn default() -> Self {
        Self::Exponential {
            current_size: 4 * 1024,
            max_size: 2 * 1024 * 1024,
        }
    }
}

impl BufferGrowthStrategy {
    /// A strategy that always returns `size`.
    pub fn fixed(size: u32) -> Self {
        Self::Fixed { size }
    }

    /// A strategy that starts at `initial_size` and doubles up to `max_size`.
    pub fn exponential(initial_size: u32, max_size: u32) -> Self {
        Self::Exponential {
            current_size: initial_size,
            max_size,
        }
    }

    /// Return the size for the next buffer, advancing the strategy's state.
    pub fn next_size(&mut self) -> u32 {
        match *self {
            Self::Fixed { size } => size,
            Self::Exponential {
                ref mut current_size,
                max_size,
            } => {
                let result = *current_size;
                // Only grow while strictly below the cap; doubling saturates
                // and is clamped to `max_size`.
                if result < max_size {
                    *current_size = result.saturating_mul(2).min(max_size);
                }
                result
            }
        }
    }
}
540
/// Buffers taken from an appended array, after compaction analysis.
enum BuffersWithOffsets {
    /// Every buffer is carried over (possibly sliced). When present,
    /// `offsets[i]` is the number of leading bytes trimmed from buffer `i`.
    AllKept {
        buffers: Arc<[ByteBuffer]>,
        offsets: Option<Vec<u32>>,
    },
    /// Some buffers were dropped (`None`) because their utilization was too
    /// low; views into those must be rewritten by copying the raw bytes.
    SomeCompacted {
        buffers: Vec<Option<ByteBuffer>>,
        offsets: Option<Vec<u32>>,
    },
}
551
impl BuffersWithOffsets {
    /// Analyze `array`'s buffers, deciding per buffer whether to keep it
    /// whole, keep only the utilized slice, or drop it for rewriting.
    ///
    /// A `compaction_threshold` of 0.0 disables analysis and keeps every
    /// buffer untouched.
    pub fn from_array(array: &VarBinViewArray, compaction_threshold: f64) -> Self {
        if compaction_threshold == 0.0 {
            return Self::AllKept {
                buffers: array.buffers().clone(),
                offsets: None,
            };
        }

        // First pass: determine which result shapes we will need so the
        // cheapest representation can be materialized below.
        let buffer_utilizations = array.buffer_utilizations();
        let mut has_rewrite = false;
        let mut has_nonzero_offset = false;
        for utilization in buffer_utilizations.iter() {
            match compaction_strategy(utilization, compaction_threshold) {
                CompactionStrategy::KeepFull => continue,
                CompactionStrategy::Slice { .. } => has_nonzero_offset = true,
                CompactionStrategy::Rewrite => has_rewrite = true,
            }
        }

        // Second pass (lazy): pair each kept/sliced/dropped buffer with the
        // number of leading bytes its views' offsets must be shifted by.
        let buffers_with_offsets_iter =
            buffer_utilizations
                .iter()
                .zip(array.buffers().iter())
                .map(|(utilization, buffer)| {
                    match compaction_strategy(utilization, compaction_threshold) {
                        CompactionStrategy::KeepFull => (Some(buffer.clone()), 0),
                        CompactionStrategy::Slice { start, end } => {
                            (Some(buffer.slice(start as usize..end as usize)), start)
                        }
                        CompactionStrategy::Rewrite => (None, 0),
                    }
                });

        match (has_rewrite, has_nonzero_offset) {
            // No buffer dropped, none sliced: offsets are unnecessary.
            (false, false) => {
                let buffers: Vec<_> = buffers_with_offsets_iter
                    .map(|(b, _)| b.vortex_expect("already checked for rewrite"))
                    .collect();
                Self::AllKept {
                    buffers: Arc::from(buffers),
                    offsets: None,
                }
            }
            // Some dropped, none sliced: keep the Option buffers, no offsets.
            (true, false) => {
                let buffers: Vec<_> = buffers_with_offsets_iter.map(|(b, _)| b).collect();
                Self::SomeCompacted {
                    buffers,
                    offsets: None,
                }
            }
            // None dropped, some sliced: all buffers kept plus offset table.
            (false, true) => {
                let (buffers, offsets): (Vec<_>, _) = buffers_with_offsets_iter
                    .map(|(buffer, offset)| {
                        (buffer.vortex_expect("already checked for rewrite"), offset)
                    })
                    .collect();
                Self::AllKept {
                    buffers: Arc::from(buffers),
                    offsets: Some(offsets),
                }
            }
            // Some dropped AND some sliced: Option buffers plus offset table.
            (true, true) => {
                let (buffers, offsets) = buffers_with_offsets_iter.collect();
                Self::SomeCompacted {
                    buffers,
                    offsets: Some(offsets),
                }
            }
        }
    }
}
628
/// Per-buffer outcome of utilization analysis.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CompactionStrategy {
    /// Keep the whole buffer.
    KeepFull,
    /// Keep only the byte range `[start, end)` that views actually reference.
    Slice {
        start: u32,
        end: u32,
    },
    /// Drop the buffer; referenced bytes are copied into the builder instead.
    Rewrite,
}
640
641fn compaction_strategy(
642 buffer_utilization: &BufferUtilization,
643 threshold: f64,
644) -> CompactionStrategy {
645 match buffer_utilization.overall_utilization() {
646 0.0 => CompactionStrategy::Rewrite,
648 utilised if utilised >= threshold => CompactionStrategy::KeepFull,
649 _ if buffer_utilization.range_utilization() >= threshold => {
650 let Range { start, end } = buffer_utilization.range();
651 CompactionStrategy::Slice { start, end }
652 }
653 _ => CompactionStrategy::Rewrite,
654 }
655}
656
/// How to remap an appended array's views onto the builder's buffer set.
enum ViewAdjustment {
    /// Every source buffer survived; views can be adjusted infallibly.
    Precomputed(PrecomputedViewAdjustment),
    /// Some source buffers were dropped; adjustment may fail per view.
    Rewriting(RewritingViewAdjustment),
}
661
662impl ViewAdjustment {
663 fn shift(buffer_offset: u32, offsets: Option<Vec<u32>>) -> Self {
664 Self::Precomputed(PrecomputedViewAdjustment::Shift {
665 buffer_offset,
666 offsets,
667 })
668 }
669
670 fn lookup(buffer_lookup: Vec<u32>, offsets: Option<Vec<u32>>) -> Self {
671 Self::Precomputed(PrecomputedViewAdjustment::Lookup {
672 buffer_lookup,
673 offsets,
674 })
675 }
676
677 fn rewriting(buffer_lookup: Vec<Option<u32>>, offsets: Option<Vec<u32>>) -> Self {
678 Self::Rewriting(RewritingViewAdjustment {
679 buffer_lookup,
680 offsets,
681 })
682 }
683}
684
/// Infallible view adjustments, precomputed per appended buffer set.
enum PrecomputedViewAdjustment {
    /// Source buffer `i` becomes buffer `i + buffer_offset`. When present,
    /// `offsets[i]` is the number of leading bytes sliced off buffer `i`.
    Shift {
        buffer_offset: u32,
        offsets: Option<Vec<u32>>,
    },
    /// Source buffer `i` becomes `buffer_lookup[i]` (deduplicated storage).
    Lookup {
        buffer_lookup: Vec<u32>,
        offsets: Option<Vec<u32>>,
    },
}
696
impl PrecomputedViewAdjustment {
    /// Remap `view`'s buffer index (and offset, when its source buffer was
    /// sliced) onto the builder's buffer set.
    ///
    /// NOTE(review): views whose offset falls before the kept slice of their
    /// buffer are replaced with empty views — presumably such views can only
    /// belong to null positions, since the slice range is derived from view
    /// utilization; confirm against `BufferUtilization::range`.
    #[inline]
    fn adjust_view(&self, view: &BinaryView) -> BinaryView {
        if view.is_inlined() {
            // Inlined views carry their own bytes and reference no buffer.
            return *view;
        }
        let view_ref = view.as_view();
        match self {
            // Buffers appended in order: indices shift by a constant.
            Self::Shift {
                buffer_offset,
                offsets,
            } => {
                let b_idx = view_ref.buffer_index;
                // Bytes trimmed from the front of this view's source buffer.
                let offset_shift = offsets
                    .as_ref()
                    .map(|o| o[b_idx as usize])
                    .unwrap_or_default();

                if view_ref.offset < offset_shift {
                    // View points before the kept slice; see NOTE above.
                    return BinaryView::empty_view();
                }

                view_ref
                    .with_buffer_and_offset(b_idx + buffer_offset, view_ref.offset - offset_shift)
            }
            // Deduplicated storage: indices go through a lookup table.
            Self::Lookup {
                buffer_lookup,
                offsets,
            } => {
                let b_idx = view_ref.buffer_index;
                let buffer = buffer_lookup[b_idx as usize];
                let offset_shift = offsets
                    .as_ref()
                    .map(|o| o[b_idx as usize])
                    .unwrap_or_default();

                if view_ref.offset < offset_shift {
                    // View points before the kept slice; see NOTE above.
                    return BinaryView::empty_view();
                }

                view_ref.with_buffer_and_offset(buffer, view_ref.offset - offset_shift)
            }
        }
        .into()
    }
}
747
/// Fallible view adjustment: `buffer_lookup[i]` is `None` when source buffer
/// `i` was compacted away, meaning views into it must be rebuilt from bytes.
struct RewritingViewAdjustment {
    buffer_lookup: Vec<Option<u32>>,
    offsets: Option<Vec<u32>>,
}
752
impl RewritingViewAdjustment {
    /// Remap `view` onto the builder's buffer set, or return `None` when the
    /// view's source buffer was compacted away (the caller then copies the
    /// value's bytes and builds a fresh view).
    ///
    /// Unlike the precomputed path there is no `offset < offset_shift` guard
    /// here: the caller (`extend_from_array_unchecked`) filters out invalid
    /// positions before calling, so the subtraction is assumed not to
    /// underflow for surviving views — TODO confirm for sliced buffers.
    fn adjust_view(&self, view: &BinaryView) -> Option<BinaryView> {
        if view.is_inlined() {
            // Inlined views carry their own bytes and reference no buffer.
            return Some(*view);
        }

        let view_ref = view.as_view();
        self.buffer_lookup[view_ref.buffer_index as usize].map(|buffer| {
            // Bytes trimmed from the front of this view's source buffer.
            let offset_shift = self
                .offsets
                .as_ref()
                .map(|o| o[view_ref.buffer_index as usize])
                .unwrap_or_default();
            view_ref
                .with_buffer_and_offset(buffer, view_ref.offset - offset_shift)
                .into()
        })
    }
}
774
#[cfg(test)]
mod tests {
    use vortex_dtype::DType;
    use vortex_dtype::Nullability;

    use crate::IntoArray;
    use crate::arrays::VarBinViewArray;
    use crate::assert_arrays_eq;
    use crate::builders::ArrayBuilder;
    use crate::builders::VarBinViewBuilder;

    /// Basic appends: values, nulls, and zero-length ("zeros") entries.
    #[test]
    fn test_utf8_builder() {
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_value("Hello");
        builder.append_null();
        builder.append_value("World");

        builder.append_nulls(2);

        // Zeros become empty — but valid — strings.
        builder.append_zeros(2);
        builder.append_value("test");

        let actual = builder.finish();
        let expected = <VarBinViewArray as FromIterator<_>>::from_iter([
            Some("Hello"),
            None,
            Some("World"),
            None,
            None,
            Some(""),
            Some(""),
            Some("test"),
        ]);
        assert_arrays_eq!(actual, expected);
    }

    /// Extending from another array preserves element order and validity.
    #[test]
    fn test_utf8_builder_with_extend() {
        let array = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_null();
            builder.append_value("Hello2");
            builder.finish()
        };
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_value("Hello1");
        builder.extend_from_array(&array);
        builder.append_nulls(2);
        builder.append_value("Hello3");

        let actual = builder.finish_into_canonical();
        let expected = <VarBinViewArray as FromIterator<_>>::from_iter([
            Some("Hello1"),
            None,
            Some("Hello2"),
            None,
            None,
            Some("Hello3"),
        ]);
        assert_arrays_eq!(actual.into_array(), expected.into_array());
    }

    /// Appending slices of the same source array must not duplicate the
    /// shared backing buffer when deduplication is enabled.
    #[test]
    fn test_buffer_deduplication() {
        let array = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_value("This is a long string that should not be inlined");
            builder.append_value("short string");
            builder.finish_into_varbinview()
        };

        assert_eq!(array.buffers().len(), 1);
        let mut builder =
            VarBinViewBuilder::with_buffer_deduplication(DType::Utf8(Nullability::Nullable), 10);

        array.append_to_builder(&mut builder);
        assert_eq!(builder.completed_block_count(), 1);

        // Slices share the original buffer, so the count must not grow.
        array.slice(1..2).append_to_builder(&mut builder);
        array.slice(0..1).append_to_builder(&mut builder);
        assert_eq!(builder.completed_block_count(), 1);

        // A second, independently-built array has distinct backing memory.
        let array2 = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_value("This is a long string that should not be inlined");
            builder.finish_into_varbinview()
        };

        array2.append_to_builder(&mut builder);
        assert_eq!(builder.completed_block_count(), 2);

        // Re-appending slices from either source still deduplicates.
        array.slice(0..1).append_to_builder(&mut builder);
        array2.slice(0..1).append_to_builder(&mut builder);
        assert_eq!(builder.completed_block_count(), 2);
    }

    /// `append_scalar` accepts matching Utf8/Binary scalars (incl. nulls)
    /// and rejects scalars of a different dtype.
    #[test]
    fn test_append_scalar() {
        use vortex_scalar::Scalar;

        let mut utf8_builder =
            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        let utf8_scalar1 = Scalar::utf8("hello", Nullability::Nullable);
        utf8_builder.append_scalar(&utf8_scalar1).unwrap();

        let utf8_scalar2 = Scalar::utf8("world", Nullability::Nullable);
        utf8_builder.append_scalar(&utf8_scalar2).unwrap();

        let null_scalar = Scalar::null(DType::Utf8(Nullability::Nullable));
        utf8_builder.append_scalar(&null_scalar).unwrap();

        let array = utf8_builder.finish();
        let expected =
            <VarBinViewArray as FromIterator<_>>::from_iter([Some("hello"), Some("world"), None]);
        assert_arrays_eq!(&array, &expected);

        let mut binary_builder =
            VarBinViewBuilder::with_capacity(DType::Binary(Nullability::Nullable), 10);

        let binary_scalar = Scalar::binary(vec![1u8, 2, 3], Nullability::Nullable);
        binary_builder.append_scalar(&binary_scalar).unwrap();

        let binary_null = Scalar::null(DType::Binary(Nullability::Nullable));
        binary_builder.append_scalar(&binary_null).unwrap();

        let binary_array = binary_builder.finish();
        let expected =
            <VarBinViewArray as FromIterator<_>>::from_iter([Some(vec![1u8, 2, 3]), None]);
        assert_arrays_eq!(&binary_array, &expected);

        // A non-string/binary scalar must be rejected with an error.
        let mut builder =
            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::NonNullable), 10);
        let wrong_scalar = Scalar::from(42i32);
        assert!(builder.append_scalar(&wrong_scalar).is_err());
    }

    /// Fixed stays constant; exponential doubles up to its cap.
    #[test]
    fn test_buffer_growth_strategies() {
        use super::BufferGrowthStrategy;

        let mut strategy = BufferGrowthStrategy::fixed(1024);

        assert_eq!(strategy.next_size(), 1024);
        assert_eq!(strategy.next_size(), 1024);
        assert_eq!(strategy.next_size(), 1024);

        let mut strategy = BufferGrowthStrategy::exponential(1024, 8192);

        assert_eq!(strategy.next_size(), 1024);
        assert_eq!(strategy.next_size(), 2048);
        assert_eq!(strategy.next_size(), 4096);
        assert_eq!(strategy.next_size(), 8192);
        assert_eq!(strategy.next_size(), 8192);
    }

    /// A value larger than the strategy's max buffer size must still be
    /// stored intact (the buffer is sized to the value instead).
    #[test]
    fn test_large_value_allocation() {
        use super::BufferGrowthStrategy;
        use super::VarBinViewBuilder;

        let mut builder = VarBinViewBuilder::new(
            DType::Binary(Nullability::Nullable),
            10,
            Default::default(),
            BufferGrowthStrategy::exponential(1024, 4096),
            0.0,
        );

        // 8 KiB value exceeds the 4 KiB max buffer size.
        let large_value = vec![0u8; 8192];

        builder.append_value(&large_value);

        let array = builder.finish_into_varbinview();
        assert_eq!(array.len(), 1);

        let retrieved = array.scalar_at(0).as_binary().value().unwrap();
        assert_eq!(retrieved.len(), 8192);
        assert_eq!(retrieved.as_slice(), &large_value);
    }
}