1use std::any::Any;
5use std::ops::Range;
6use std::sync::Arc;
7
8use itertools::Itertools;
9use vortex_buffer::Buffer;
10use vortex_buffer::BufferMut;
11use vortex_buffer::ByteBuffer;
12use vortex_buffer::ByteBufferMut;
13use vortex_error::VortexExpect;
14use vortex_error::VortexResult;
15use vortex_error::vortex_bail;
16use vortex_error::vortex_ensure;
17use vortex_mask::Mask;
18use vortex_utils::aliases::hash_map::Entry;
19use vortex_utils::aliases::hash_map::HashMap;
20
21use crate::ArrayRef;
22use crate::IntoArray;
23use crate::arrays::BinaryView;
24use crate::arrays::VarBinViewArray;
25use crate::arrays::compact::BufferUtilization;
26use crate::builders::ArrayBuilder;
27use crate::builders::LazyBitBufferBuilder;
28use crate::canonical::Canonical;
29use crate::canonical::ToCanonical;
30use crate::dtype::DType;
31use crate::scalar::Scalar;
32
/// Builder for a `VarBinViewArray`: a list of 16-byte views plus shared data
/// buffers holding the out-of-line bytes of longer values.
pub struct VarBinViewBuilder {
    // Utf8 or Binary; asserted in `new`.
    dtype: DType,
    // One view per element; values of <= 12 bytes are inlined in the view.
    views_builder: BufferMut<BinaryView>,
    // Lazily materialized validity bitmap.
    nulls: LazyBitBufferBuilder,
    // Sealed data buffers; optionally deduplicated by backing allocation.
    completed: CompletedBuffers,
    // Data buffer currently being filled; sealed into `completed` on flush.
    in_progress: ByteBufferMut,
    // Controls the capacity of each new in-progress buffer.
    growth_strategy: BufferGrowthStrategy,
    // When extending from an array, its buffers with utilization below this
    // threshold are sliced or rewritten; 0.0 disables compaction.
    compaction_threshold: f64,
}
43
44impl VarBinViewBuilder {
45 pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
46 Self::new(dtype, capacity, Default::default(), Default::default(), 0.0)
47 }
48
49 pub fn with_buffer_deduplication(dtype: DType, capacity: usize) -> Self {
50 Self::new(
51 dtype,
52 capacity,
53 CompletedBuffers::Deduplicated(Default::default()),
54 Default::default(),
55 0.0,
56 )
57 }
58
59 pub fn with_compaction(dtype: DType, capacity: usize, compaction_threshold: f64) -> Self {
60 Self::new(
61 dtype,
62 capacity,
63 Default::default(),
64 Default::default(),
65 compaction_threshold,
66 )
67 }
68
69 pub fn new(
70 dtype: DType,
71 capacity: usize,
72 completed: CompletedBuffers,
73 growth_strategy: BufferGrowthStrategy,
74 compaction_threshold: f64,
75 ) -> Self {
76 assert!(
77 matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
78 "VarBinViewBuilder DType must be Utf8 or Binary."
79 );
80 Self {
81 views_builder: BufferMut::<BinaryView>::with_capacity(capacity),
82 nulls: LazyBitBufferBuilder::new(capacity),
83 completed,
84 in_progress: ByteBufferMut::empty(),
85 dtype,
86 growth_strategy,
87 compaction_threshold,
88 }
89 }
90
91 fn append_value_view(&mut self, value: &[u8]) {
92 let length =
93 u32::try_from(value.len()).vortex_expect("cannot have a single string >2^32 in length");
94 if length <= 12 {
95 self.views_builder.push(BinaryView::make_view(value, 0, 0));
96 return;
97 }
98
99 let (buffer_idx, offset) = self.append_value_to_buffer(value);
100 let view = BinaryView::make_view(value, buffer_idx, offset);
101 self.views_builder.push(view);
102 }
103
104 pub fn append_value<S: AsRef<[u8]>>(&mut self, value: S) {
106 self.append_value_view(value.as_ref());
107 self.nulls.append_non_null();
108 }
109
110 fn flush_in_progress(&mut self) {
111 if self.in_progress.is_empty() {
112 return;
113 }
114 let block = std::mem::take(&mut self.in_progress).freeze();
115
116 assert!(block.len() < u32::MAX as usize, "Block too large");
117
118 let initial_len = self.completed.len();
119 self.completed.push(block);
120 assert_eq!(
121 self.completed.len(),
122 initial_len + 1,
123 "Invalid state, just completed block already exists"
124 );
125 }
126
127 fn append_value_to_buffer(&mut self, value: &[u8]) -> (u32, u32) {
129 assert!(value.len() > 12, "must inline small strings");
130 let required_cap = self.in_progress.len() + value.len();
131 if self.in_progress.capacity() < required_cap {
132 self.flush_in_progress();
133 let next_buffer_size = self.growth_strategy.next_size() as usize;
134 let to_reserve = next_buffer_size.max(value.len());
135 self.in_progress.reserve(to_reserve);
136 }
137
138 let buffer_idx = self.completed.len();
139 let offset = u32::try_from(self.in_progress.len()).vortex_expect("too many buffers");
140 self.in_progress.extend_from_slice(value);
141
142 (buffer_idx, offset)
143 }
144
145 pub fn completed_block_count(&self) -> u32 {
146 self.completed.len()
147 }
148
149 pub fn push_buffer_and_adjusted_views(
166 &mut self,
167 buffers: &[ByteBuffer],
168 views: &Buffer<BinaryView>,
169 validity_mask: Mask,
170 ) {
171 self.flush_in_progress();
172
173 let expected_completed_len = self.completed.len() as usize + buffers.len();
174 self.completed.extend_from_slice_unchecked(buffers);
175 assert_eq!(
176 self.completed.len() as usize,
177 expected_completed_len,
178 "Some buffers already exist",
179 );
180 self.views_builder.extend_trusted(views.iter().copied());
181 self.push_only_validity_mask(validity_mask);
182
183 debug_assert_eq!(self.nulls.len(), self.views_builder.len())
184 }
185
186 pub fn finish_into_varbinview(&mut self) -> VarBinViewArray {
188 self.flush_in_progress();
189 let buffers = std::mem::take(&mut self.completed);
190
191 assert_eq!(
192 self.views_builder.len(),
193 self.nulls.len(),
194 "View and validity length must match"
195 );
196
197 let validity = self.nulls.finish_with_nullability(self.dtype.nullability());
198
199 unsafe {
201 VarBinViewArray::new_unchecked(
202 std::mem::take(&mut self.views_builder).freeze(),
203 buffers.finish(),
204 self.dtype.clone(),
205 validity,
206 )
207 }
208 }
209
210 fn push_only_validity_mask(&mut self, validity_mask: Mask) {
212 self.nulls.append_validity_mask(validity_mask);
213 }
214}
215
impl ArrayBuilder for VarBinViewBuilder {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }

    fn dtype(&self) -> &DType {
        &self.dtype
    }

    // Element count is tracked by the validity builder.
    fn len(&self) -> usize {
        self.nulls.len()
    }

    // Appends `n` empty (zero-length) non-null values.
    fn append_zeros(&mut self, n: usize) {
        self.views_builder.push_n(BinaryView::empty_view(), n);
        self.nulls.append_n_non_nulls(n);
    }

    // Appends `n` nulls, using empty views as placeholders.
    unsafe fn append_nulls_unchecked(&mut self, n: usize) {
        self.views_builder.push_n(BinaryView::empty_view(), n);
        self.nulls.append_n_nulls(n);
    }

    // Appends a Utf8/Binary scalar; any dtype mismatch is an error.
    fn append_scalar(&mut self, scalar: &Scalar) -> VortexResult<()> {
        vortex_ensure!(
            scalar.dtype() == self.dtype(),
            "VarBinViewBuilder expected scalar with dtype {}, got {}",
            self.dtype(),
            scalar.dtype()
        );

        match self.dtype() {
            DType::Utf8(_) => match scalar.as_utf8().value() {
                Some(value) => self.append_value(value),
                None => self.append_null(),
            },
            DType::Binary(_) => match scalar.as_binary().value() {
                Some(value) => self.append_value(value),
                None => self.append_null(),
            },
            // Unreachable in practice: `new` asserts the builder dtype is
            // Utf8 or Binary.
            _ => vortex_bail!(
                "VarBinViewBuilder can only handle Utf8 or Binary scalars, got {:?}",
                scalar.dtype()
            ),
        }

        Ok(())
    }

    // Appends every element of `array`, remapping its views into this
    // builder's buffer space. Source buffers may be kept, sliced, or — below
    // the compaction threshold — dropped, forcing the affected values to be
    // re-copied into the in-progress buffer.
    unsafe fn extend_from_array_unchecked(&mut self, array: &ArrayRef) {
        let array = array.to_varbinview();
        self.flush_in_progress();

        self.push_only_validity_mask(
            array
                .validity_mask()
                .vortex_expect("validity_mask in extend_from_array_unchecked"),
        );

        let view_adjustment =
            self.completed
                .extend_from_compaction(BuffersWithOffsets::from_array(
                    &array,
                    self.compaction_threshold,
                ));

        match view_adjustment {
            // Every view can be remapped arithmetically (no byte copies).
            ViewAdjustment::Precomputed(adjustment) => self.views_builder.extend_trusted(
                array
                    .views()
                    .iter()
                    .map(|view| adjustment.adjust_view(view)),
            ),
            // Some source buffers were dropped: valid views into them must
            // have their bytes copied via `push_view`.
            ViewAdjustment::Rewriting(adjustment) => match array
                .validity_mask()
                .vortex_expect("validity_mask in extend_from_array_unchecked")
            {
                Mask::AllTrue(_) => {
                    for (idx, &view) in array.views().iter().enumerate() {
                        let new_view = self.push_view(view, &adjustment, &array, idx);
                        self.views_builder.push(new_view);
                    }
                }
                Mask::AllFalse(_) => {
                    // All-null: no bytes to copy, just placeholder views.
                    self.views_builder
                        .push_n(BinaryView::empty_view(), array.len());
                }
                Mask::Values(v) => {
                    for (idx, (&view, is_valid)) in
                        array.views().iter().zip(v.bit_buffer().iter()).enumerate()
                    {
                        let new_view = if !is_valid {
                            BinaryView::empty_view()
                        } else {
                            self.push_view(view, &adjustment, &array, idx)
                        };
                        self.views_builder.push(new_view);
                    }
                }
            },
        }
    }

    fn reserve_exact(&mut self, additional: usize) {
        self.views_builder.reserve(additional);
        self.nulls.reserve_exact(additional);
    }

    // Replaces the entire validity bitmap, discarding any previous bits.
    unsafe fn set_validity_unchecked(&mut self, validity: Mask) {
        self.nulls = LazyBitBufferBuilder::new(validity.len());
        self.nulls.append_validity_mask(validity);
    }

    fn finish(&mut self) -> ArrayRef {
        self.finish_into_varbinview().into_array()
    }

    fn finish_into_canonical(&mut self) -> Canonical {
        Canonical::VarBinView(self.finish_into_varbinview())
    }
}
341
342impl VarBinViewBuilder {
343 #[inline]
344 fn push_view(
345 &mut self,
346 view: BinaryView,
347 adjustment: &RewritingViewAdjustment,
348 array: &VarBinViewArray,
349 idx: usize,
350 ) -> BinaryView {
351 if view.is_inlined() {
352 view
353 } else if let Some(adjusted) = adjustment.adjust_view(&view) {
354 adjusted
355 } else {
356 let bytes = array.bytes_at(idx);
357 let (new_buf_idx, new_offset) = self.append_value_to_buffer(&bytes);
358 BinaryView::make_view(bytes.as_slice(), new_buf_idx, new_offset)
359 }
360 }
361}
362
/// Storage for sealed data buffers.
pub enum CompletedBuffers {
    /// Plain append-only list of buffers.
    Default(Vec<ByteBuffer>),
    /// Reuses entries for buffers that share the same backing allocation.
    Deduplicated(DeduplicatedBuffers),
}
367
368impl Default for CompletedBuffers {
369 fn default() -> Self {
370 Self::Default(Vec::new())
371 }
372}
373
374#[allow(clippy::cast_possible_truncation)]
376impl CompletedBuffers {
377 fn len(&self) -> u32 {
378 match self {
379 Self::Default(buffers) => buffers.len() as u32,
380 Self::Deduplicated(buffers) => buffers.len(),
381 }
382 }
383
384 fn push(&mut self, block: ByteBuffer) -> u32 {
385 match self {
386 Self::Default(buffers) => {
387 assert!(buffers.len() < u32::MAX as usize, "Too many blocks");
388 buffers.push(block);
389 self.len()
390 }
391 Self::Deduplicated(buffers) => buffers.push(block),
392 }
393 }
394
395 fn extend_from_slice_unchecked(&mut self, buffers: &[ByteBuffer]) {
397 for buffer in buffers {
398 self.push(buffer.clone());
399 }
400 }
401
402 fn extend_from_compaction(&mut self, buffers: BuffersWithOffsets) -> ViewAdjustment {
403 match (self, buffers) {
404 (
405 Self::Default(completed_buffers),
406 BuffersWithOffsets::AllKept { buffers, offsets },
407 ) => {
408 let buffer_offset = completed_buffers.len() as u32;
409 completed_buffers.extend_from_slice(&buffers);
410 ViewAdjustment::shift(buffer_offset, offsets)
411 }
412 (
413 Self::Default(completed_buffers),
414 BuffersWithOffsets::SomeCompacted { buffers, offsets },
415 ) => {
416 let lookup = buffers
417 .iter()
418 .map(|maybe_buffer| {
419 maybe_buffer.as_ref().map(|buffer| {
420 completed_buffers.push(buffer.clone());
421 completed_buffers.len() as u32 - 1
422 })
423 })
424 .collect();
425 ViewAdjustment::rewriting(lookup, offsets)
426 }
427
428 (
429 Self::Deduplicated(completed_buffers),
430 BuffersWithOffsets::AllKept { buffers, offsets },
431 ) => {
432 let buffer_lookup = completed_buffers.extend_from_iter(buffers.iter().cloned());
433 ViewAdjustment::lookup(buffer_lookup, offsets)
434 }
435 (
436 Self::Deduplicated(completed_buffers),
437 BuffersWithOffsets::SomeCompacted { buffers, offsets },
438 ) => {
439 let buffer_lookup = completed_buffers.extend_from_option_slice(&buffers);
440 ViewAdjustment::rewriting(buffer_lookup, offsets)
441 }
442 }
443 }
444
445 fn finish(self) -> Arc<[ByteBuffer]> {
446 match self {
447 Self::Default(buffers) => Arc::from(buffers),
448 Self::Deduplicated(buffers) => buffers.finish(),
449 }
450 }
451}
452
/// A buffer list that reuses entries with identical backing memory.
#[derive(Default)]
pub struct DeduplicatedBuffers {
    // Distinct buffers, in insertion order.
    buffers: Vec<ByteBuffer>,
    // Maps a buffer's identity (pointer + length) to its index in `buffers`.
    buffer_to_idx: HashMap<BufferId, u32>,
}
458
459impl DeduplicatedBuffers {
460 #[allow(clippy::cast_possible_truncation)]
462 fn len(&self) -> u32 {
463 self.buffers.len() as u32
464 }
465
466 pub(crate) fn push(&mut self, block: ByteBuffer) -> u32 {
468 assert!(self.buffers.len() < u32::MAX as usize, "Too many blocks");
469
470 let initial_len = self.len();
471 let id = BufferId::from(&block);
472 match self.buffer_to_idx.entry(id) {
473 Entry::Occupied(idx) => *idx.get(),
474 Entry::Vacant(entry) => {
475 let idx = initial_len;
476 entry.insert(idx);
477 self.buffers.push(block);
478 idx
479 }
480 }
481 }
482
483 pub(crate) fn extend_from_option_slice(
484 &mut self,
485 buffers: &[Option<ByteBuffer>],
486 ) -> Vec<Option<u32>> {
487 buffers
488 .iter()
489 .map(|buffer| buffer.as_ref().map(|buf| self.push(buf.clone())))
490 .collect()
491 }
492
493 pub(crate) fn extend_from_iter(
494 &mut self,
495 buffers: impl Iterator<Item = ByteBuffer>,
496 ) -> Vec<u32> {
497 buffers.map(|buffer| self.push(buffer)).collect()
498 }
499
500 pub(crate) fn finish(self) -> Arc<[ByteBuffer]> {
501 Arc::from(self.buffers)
502 }
503}
504
/// Identity key for a `ByteBuffer`: the address and length of its backing
/// slice. Two buffers compare equal only when they alias the same memory —
/// equal *contents* in distinct allocations are not deduplicated.
#[derive(PartialEq, Eq, Hash)]
struct BufferId {
    ptr: usize,
    len: usize,
}
511
512impl BufferId {
513 fn from(buffer: &ByteBuffer) -> Self {
514 let slice = buffer.as_slice();
515 Self {
516 ptr: slice.as_ptr() as usize,
517 len: slice.len(),
518 }
519 }
520}
521
/// Sizing policy for newly allocated in-progress data buffers.
#[derive(Debug, Clone)]
pub enum BufferGrowthStrategy {
    /// Every buffer gets the same capacity.
    Fixed { size: u32 },
    /// Capacity doubles after each allocation, saturating at `max_size`.
    Exponential { current_size: u32, max_size: u32 },
}
529
530impl Default for BufferGrowthStrategy {
531 fn default() -> Self {
532 Self::Exponential {
533 current_size: 4 * 1024, max_size: 2 * 1024 * 1024, }
536 }
537}
538
539impl BufferGrowthStrategy {
540 pub fn fixed(size: u32) -> Self {
541 Self::Fixed { size }
542 }
543
544 pub fn exponential(initial_size: u32, max_size: u32) -> Self {
545 Self::Exponential {
546 current_size: initial_size,
547 max_size,
548 }
549 }
550
551 pub fn next_size(&mut self) -> u32 {
553 match self {
554 Self::Fixed { size } => *size,
555 Self::Exponential {
556 current_size,
557 max_size,
558 } => {
559 let result = *current_size;
560 if *current_size < *max_size {
561 *current_size = current_size.saturating_mul(2).min(*max_size);
562 }
563 result
564 }
565 }
566 }
567}
568
/// A source array's data buffers after the compaction decision, plus the
/// per-buffer byte offsets that views must subtract when a buffer was sliced.
enum BuffersWithOffsets {
    /// Every buffer survives (whole or sliced). `offsets[i]` is the slice
    /// start subtracted from views into buffer `i`; `None` means all zeros.
    AllKept {
        buffers: Arc<[ByteBuffer]>,
        offsets: Option<Vec<u32>>,
    },
    /// Some buffers were dropped (`None` slots); views into those must be
    /// rewritten by copying their bytes.
    SomeCompacted {
        buffers: Vec<Option<ByteBuffer>>,
        offsets: Option<Vec<u32>>,
    },
}
579
impl BuffersWithOffsets {
    /// Classify and (if needed) compact `array`'s data buffers.
    ///
    /// A `compaction_threshold` of 0.0 disables compaction entirely and
    /// keeps every buffer untouched.
    pub fn from_array(array: &VarBinViewArray, compaction_threshold: f64) -> Self {
        if compaction_threshold == 0.0 {
            return Self::AllKept {
                buffers: Arc::from(
                    array
                        .buffers()
                        .to_vec()
                        .into_iter()
                        .map(|b| b.unwrap_host())
                        .collect_vec(),
                ),
                offsets: None,
            };
        }

        // First pass: classify each buffer so we know which of the four
        // output shapes to build.
        let buffer_utilizations = array
            .buffer_utilizations()
            .vortex_expect("buffer_utilizations in BuffersWithOffsets::from_array");
        let mut has_rewrite = false;
        let mut has_nonzero_offset = false;
        for utilization in buffer_utilizations.iter() {
            match compaction_strategy(utilization, compaction_threshold) {
                CompactionStrategy::KeepFull => continue,
                CompactionStrategy::Slice { .. } => has_nonzero_offset = true,
                CompactionStrategy::Rewrite => has_rewrite = true,
            }
        }

        // Second pass (lazy): produce each surviving buffer together with
        // the slice offset views into it must subtract. The strategy is
        // recomputed per buffer; it must be deterministic.
        let buffers_with_offsets_iter =
            buffer_utilizations
                .iter()
                .zip(array.buffers().iter())
                .map(|(utilization, buffer)| {
                    match compaction_strategy(utilization, compaction_threshold) {
                        CompactionStrategy::KeepFull => (Some(buffer.as_host().clone()), 0),
                        CompactionStrategy::Slice { start, end } => (
                            Some(buffer.as_host().slice(start as usize..end as usize)),
                            start,
                        ),
                        CompactionStrategy::Rewrite => (None, 0),
                    }
                });

        // Pick the cheapest representation for the observed combination.
        match (has_rewrite, has_nonzero_offset) {
            (false, false) => {
                // Everything kept whole: no offsets, no rewrites.
                let buffers: Vec<_> = buffers_with_offsets_iter
                    .map(|(b, _)| b.vortex_expect("already checked for rewrite"))
                    .collect();
                Self::AllKept {
                    buffers: Arc::from(buffers),
                    offsets: None,
                }
            }
            (true, false) => {
                // Some buffers dropped, none sliced.
                let buffers: Vec<_> = buffers_with_offsets_iter.map(|(b, _)| b).collect();
                Self::SomeCompacted {
                    buffers,
                    offsets: None,
                }
            }
            (false, true) => {
                // Some buffers sliced, none dropped.
                let (buffers, offsets): (Vec<_>, _) = buffers_with_offsets_iter
                    .map(|(buffer, offset)| {
                        (buffer.vortex_expect("already checked for rewrite"), offset)
                    })
                    .collect();
                Self::AllKept {
                    buffers: Arc::from(buffers),
                    offsets: Some(offsets),
                }
            }
            (true, true) => {
                // Both dropped and sliced buffers present.
                let (buffers, offsets) = buffers_with_offsets_iter.collect();
                Self::SomeCompacted {
                    buffers,
                    offsets: Some(offsets),
                }
            }
        }
    }
}
666
/// Per-buffer decision made against the compaction threshold.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CompactionStrategy {
    /// Keep the buffer unchanged.
    KeepFull,
    /// Keep only the `start..end` byte range of the buffer.
    Slice {
        start: u32,
        end: u32,
    },
    /// Drop the buffer; referenced values are copied elsewhere.
    Rewrite,
}
678
679fn compaction_strategy(
680 buffer_utilization: &BufferUtilization,
681 threshold: f64,
682) -> CompactionStrategy {
683 match buffer_utilization.overall_utilization() {
684 0.0 => CompactionStrategy::Rewrite,
686 utilised if utilised >= threshold => CompactionStrategy::KeepFull,
687 _ if buffer_utilization.range_utilization() >= threshold => {
688 let Range { start, end } = buffer_utilization.range();
689 CompactionStrategy::Slice { start, end }
690 }
691 _ => CompactionStrategy::Rewrite,
692 }
693}
694
/// How views from an appended array are translated into the builder's
/// buffer space.
enum ViewAdjustment {
    /// Every view can be remapped arithmetically — no byte copies needed.
    Precomputed(PrecomputedViewAdjustment),
    /// Some views reference dropped buffers and must be rewritten.
    Rewriting(RewritingViewAdjustment),
}
699
700impl ViewAdjustment {
701 fn shift(buffer_offset: u32, offsets: Option<Vec<u32>>) -> Self {
702 Self::Precomputed(PrecomputedViewAdjustment::Shift {
703 buffer_offset,
704 offsets,
705 })
706 }
707
708 fn lookup(buffer_lookup: Vec<u32>, offsets: Option<Vec<u32>>) -> Self {
709 Self::Precomputed(PrecomputedViewAdjustment::Lookup {
710 buffer_lookup,
711 offsets,
712 })
713 }
714
715 fn rewriting(buffer_lookup: Vec<Option<u32>>, offsets: Option<Vec<u32>>) -> Self {
716 Self::Rewriting(RewritingViewAdjustment {
717 buffer_lookup,
718 offsets,
719 })
720 }
721}
722
/// View remapping that never requires copying value bytes.
enum PrecomputedViewAdjustment {
    /// All source buffers were appended contiguously: add `buffer_offset` to
    /// each view's buffer index, subtracting the per-buffer slice offset.
    Shift {
        buffer_offset: u32,
        offsets: Option<Vec<u32>>,
    },
    /// Source buffers were deduplicated: map each old buffer index through
    /// `buffer_lookup`.
    Lookup {
        buffer_lookup: Vec<u32>,
        offsets: Option<Vec<u32>>,
    },
}
734
impl PrecomputedViewAdjustment {
    // Remap a view's buffer index (and offset, for sliced buffers) into the
    // builder's buffer space. Inlined views carry no buffer reference and
    // pass through unchanged.
    #[inline]
    fn adjust_view(&self, view: &BinaryView) -> BinaryView {
        if view.is_inlined() {
            return *view;
        }
        let view_ref = view.as_view();
        match self {
            Self::Shift {
                buffer_offset,
                offsets,
            } => {
                let b_idx = view_ref.buffer_index;
                let offset_shift = offsets
                    .as_ref()
                    .map(|o| o[b_idx as usize])
                    .unwrap_or_default();

                // A view pointing before the kept slice cannot be remapped.
                // NOTE(review): presumably only views in null slots can hit
                // this, making an empty view safe — confirm with callers.
                if view_ref.offset < offset_shift {
                    return BinaryView::empty_view();
                }

                view_ref
                    .with_buffer_and_offset(b_idx + buffer_offset, view_ref.offset - offset_shift)
            }
            Self::Lookup {
                buffer_lookup,
                offsets,
            } => {
                let b_idx = view_ref.buffer_index;
                let buffer = buffer_lookup[b_idx as usize];
                let offset_shift = offsets
                    .as_ref()
                    .map(|o| o[b_idx as usize])
                    .unwrap_or_default();

                // Same guard as the Shift arm: views before the kept slice
                // become empty views.
                if view_ref.offset < offset_shift {
                    return BinaryView::empty_view();
                }

                view_ref.with_buffer_and_offset(buffer, view_ref.offset - offset_shift)
            }
        }
        // The early returns above skip this conversion; only the remapped
        // view refs flow through it.
        .into()
    }
}
785
/// View remapping where some source buffers were dropped by compaction.
/// `buffer_lookup[i] == None` means views into buffer `i` must be rewritten
/// by copying their bytes.
struct RewritingViewAdjustment {
    buffer_lookup: Vec<Option<u32>>,
    offsets: Option<Vec<u32>>,
}
790
impl RewritingViewAdjustment {
    // Remap a view, or return `None` when its source buffer was dropped and
    // the value's bytes must be copied by the caller.
    fn adjust_view(&self, view: &BinaryView) -> Option<BinaryView> {
        if view.is_inlined() {
            return Some(*view);
        }

        let view_ref = view.as_view();
        self.buffer_lookup[view_ref.buffer_index as usize].map(|buffer| {
            let offset_shift = self
                .offsets
                .as_ref()
                .map(|o| o[view_ref.buffer_index as usize])
                .unwrap_or_default();
            // NOTE(review): unlike PrecomputedViewAdjustment::adjust_view,
            // there is no `offset < offset_shift` guard here, so this
            // subtraction assumes every view passed in lies within the kept
            // slice — confirm callers only pass valid (non-null) views.
            view_ref
                .with_buffer_and_offset(buffer, view_ref.offset - offset_shift)
                .into()
        })
    }
}
812
#[cfg(test)]
mod tests {
    use vortex_error::VortexResult;

    use crate::IntoArray;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::arrays::VarBinViewArray;
    use crate::assert_arrays_eq;
    use crate::builders::ArrayBuilder;
    use crate::builders::VarBinViewBuilder;
    use crate::dtype::DType;
    use crate::dtype::Nullability;

    // Inline values, nulls, and zero-length values all round-trip.
    #[test]
    fn test_utf8_builder() {
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_value("Hello");
        builder.append_null();
        builder.append_value("World");

        builder.append_nulls(2);

        builder.append_zeros(2);
        builder.append_value("test");

        let actual = builder.finish();
        let expected = <VarBinViewArray as FromIterator<_>>::from_iter([
            Some("Hello"),
            None,
            Some("World"),
            None,
            None,
            Some(""),
            Some(""),
            Some("test"),
        ]);
        assert_arrays_eq!(actual, expected);
    }

    // Extending from an existing array interleaves with direct appends.
    #[test]
    fn test_utf8_builder_with_extend() {
        let array = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_null();
            builder.append_value("Hello2");
            builder.finish()
        };
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_value("Hello1");
        builder.extend_from_array(&array);
        builder.append_nulls(2);
        builder.append_value("Hello3");

        let actual = builder.finish_into_canonical();
        let expected = <VarBinViewArray as FromIterator<_>>::from_iter([
            Some("Hello1"),
            None,
            Some("Hello2"),
            None,
            None,
            Some("Hello3"),
        ]);
        assert_arrays_eq!(actual.into_array(), expected.into_array());
    }

    // Buffers that share backing memory (e.g. slices of the same array) are
    // stored once when deduplication is enabled; identical *contents* in a
    // distinct allocation still get a new block.
    #[test]
    fn test_buffer_deduplication() -> VortexResult<()> {
        let array = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_value("This is a long string that should not be inlined");
            builder.append_value("short string");
            builder.finish_into_varbinview()
        };

        assert_eq!(array.buffers().len(), 1);
        let mut builder =
            VarBinViewBuilder::with_buffer_deduplication(DType::Utf8(Nullability::Nullable), 10);

        let mut ctx = LEGACY_SESSION.create_execution_ctx();

        array.append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 1);

        // Slices of `array` reuse its backing buffer: no new blocks.
        array
            .slice(1..2)?
            .append_to_builder(&mut builder, &mut ctx)?;
        array
            .slice(0..1)?
            .append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 1);

        // Same content, different allocation: counts as a new block.
        let array2 = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_value("This is a long string that should not be inlined");
            builder.finish_into_varbinview()
        };

        array2.append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 2);

        array
            .slice(0..1)?
            .append_to_builder(&mut builder, &mut ctx)?;
        array2
            .slice(0..1)?
            .append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 2);
        Ok(())
    }

    #[test]
    fn test_append_scalar() {
        use crate::scalar::Scalar;

        let mut utf8_builder =
            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        let utf8_scalar1 = Scalar::utf8("hello", Nullability::Nullable);
        utf8_builder.append_scalar(&utf8_scalar1).unwrap();

        let utf8_scalar2 = Scalar::utf8("world", Nullability::Nullable);
        utf8_builder.append_scalar(&utf8_scalar2).unwrap();

        let null_scalar = Scalar::null(DType::Utf8(Nullability::Nullable));
        utf8_builder.append_scalar(&null_scalar).unwrap();

        let array = utf8_builder.finish();
        let expected =
            <VarBinViewArray as FromIterator<_>>::from_iter([Some("hello"), Some("world"), None]);
        assert_arrays_eq!(&array, &expected);

        let mut binary_builder =
            VarBinViewBuilder::with_capacity(DType::Binary(Nullability::Nullable), 10);

        let binary_scalar = Scalar::binary(vec![1u8, 2, 3], Nullability::Nullable);
        binary_builder.append_scalar(&binary_scalar).unwrap();

        let binary_null = Scalar::null(DType::Binary(Nullability::Nullable));
        binary_builder.append_scalar(&binary_null).unwrap();

        let binary_array = binary_builder.finish();
        let expected =
            <VarBinViewArray as FromIterator<_>>::from_iter([Some(vec![1u8, 2, 3]), None]);
        assert_arrays_eq!(&binary_array, &expected);

        // A dtype mismatch is rejected, not silently coerced.
        let mut builder =
            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::NonNullable), 10);
        let wrong_scalar = Scalar::from(42i32);
        assert!(builder.append_scalar(&wrong_scalar).is_err());
    }

    #[test]
    fn test_buffer_growth_strategies() {
        use super::BufferGrowthStrategy;

        // Fixed: every allocation gets the same size.
        let mut strategy = BufferGrowthStrategy::fixed(1024);

        assert_eq!(strategy.next_size(), 1024);
        assert_eq!(strategy.next_size(), 1024);
        assert_eq!(strategy.next_size(), 1024);

        // Exponential: doubles each call, saturating at max_size.
        let mut strategy = BufferGrowthStrategy::exponential(1024, 8192);

        assert_eq!(strategy.next_size(), 1024);
        assert_eq!(strategy.next_size(), 2048);
        assert_eq!(strategy.next_size(), 4096);
        assert_eq!(strategy.next_size(), 8192);
        assert_eq!(strategy.next_size(), 8192);
    }

    // A value larger than the growth strategy's max still gets a buffer big
    // enough to hold it.
    #[test]
    fn test_large_value_allocation() {
        use super::BufferGrowthStrategy;
        use super::VarBinViewBuilder;

        let mut builder = VarBinViewBuilder::new(
            DType::Binary(Nullability::Nullable),
            10,
            Default::default(),
            BufferGrowthStrategy::exponential(1024, 4096),
            0.0,
        );

        let large_value = vec![0u8; 8192];

        builder.append_value(&large_value);

        let array = builder.finish_into_varbinview();
        assert_eq!(array.len(), 1);

        let retrieved = array
            .scalar_at(0)
            .unwrap()
            .as_binary()
            .value()
            .cloned()
            .unwrap();
        assert_eq!(retrieved.len(), 8192);
        assert_eq!(retrieved.as_slice(), &large_value);
    }
}