1use std::any::Any;
5use std::ops::Range;
6use std::sync::Arc;
7
8use itertools::Itertools;
9use vortex_buffer::Buffer;
10use vortex_buffer::BufferMut;
11use vortex_buffer::ByteBuffer;
12use vortex_buffer::ByteBufferMut;
13use vortex_error::VortexExpect;
14use vortex_error::VortexResult;
15use vortex_error::vortex_bail;
16use vortex_error::vortex_ensure;
17use vortex_mask::Mask;
18use vortex_utils::aliases::hash_map::Entry;
19use vortex_utils::aliases::hash_map::HashMap;
20
21use crate::ArrayRef;
22use crate::IntoArray;
23use crate::arrays::VarBinViewArray;
24use crate::arrays::varbinview::build_views::BinaryView;
25use crate::arrays::varbinview::compact::BufferUtilization;
26use crate::builders::ArrayBuilder;
27use crate::builders::LazyBitBufferBuilder;
28use crate::canonical::Canonical;
29use crate::canonical::ToCanonical;
30use crate::dtype::DType;
31use crate::scalar::Scalar;
32
/// Builder for [`VarBinViewArray`]s: per-element [`BinaryView`]s that either
/// inline short values or reference ranges of shared data buffers.
pub struct VarBinViewBuilder {
    // Utf8 or Binary; asserted in `new`.
    dtype: DType,
    // One `BinaryView` per appended element (including null placeholders).
    views_builder: BufferMut<BinaryView>,
    // Lazily materialized validity bitmap; kept in lockstep with the views.
    nulls: LazyBitBufferBuilder,
    // Frozen data buffers that non-inlined views reference by index.
    completed: CompletedBuffers,
    // Mutable buffer currently receiving non-inlined value bytes; becomes
    // the next completed buffer once full.
    in_progress: ByteBufferMut,
    // Sizing policy for each new `in_progress` allocation.
    growth_strategy: BufferGrowthStrategy,
    // Minimum utilization for keeping an appended array's buffer as-is;
    // 0.0 disables compaction entirely.
    compaction_threshold: f64,
}
43
44impl VarBinViewBuilder {
45 pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
46 Self::new(dtype, capacity, Default::default(), Default::default(), 0.0)
47 }
48
49 pub fn with_buffer_deduplication(dtype: DType, capacity: usize) -> Self {
50 Self::new(
51 dtype,
52 capacity,
53 CompletedBuffers::Deduplicated(Default::default()),
54 Default::default(),
55 0.0,
56 )
57 }
58
59 pub fn with_compaction(dtype: DType, capacity: usize, compaction_threshold: f64) -> Self {
60 Self::new(
61 dtype,
62 capacity,
63 Default::default(),
64 Default::default(),
65 compaction_threshold,
66 )
67 }
68
69 pub fn new(
70 dtype: DType,
71 capacity: usize,
72 completed: CompletedBuffers,
73 growth_strategy: BufferGrowthStrategy,
74 compaction_threshold: f64,
75 ) -> Self {
76 assert!(
77 matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
78 "VarBinViewBuilder DType must be Utf8 or Binary."
79 );
80 Self {
81 views_builder: BufferMut::<BinaryView>::with_capacity(capacity),
82 nulls: LazyBitBufferBuilder::new(capacity),
83 completed,
84 in_progress: ByteBufferMut::empty(),
85 dtype,
86 growth_strategy,
87 compaction_threshold,
88 }
89 }
90
91 fn append_value_view(&mut self, value: &[u8]) {
92 let length =
93 u32::try_from(value.len()).vortex_expect("cannot have a single string >2^32 in length");
94 if length <= 12 {
95 self.views_builder.push(BinaryView::make_view(value, 0, 0));
96 return;
97 }
98
99 let (buffer_idx, offset) = self.append_value_to_buffer(value);
100 let view = BinaryView::make_view(value, buffer_idx, offset);
101 self.views_builder.push(view);
102 }
103
104 pub fn append_value<S: AsRef<[u8]>>(&mut self, value: S) {
106 self.append_value_view(value.as_ref());
107 self.nulls.append_non_null();
108 }
109
110 pub fn append_n_values<S: AsRef<[u8]>>(&mut self, value: S, n: usize) {
112 if n == 0 {
113 return;
114 }
115 let bytes = value.as_ref();
116 let view = if bytes.len() <= BinaryView::MAX_INLINED_SIZE {
117 BinaryView::make_view(bytes, 0, 0)
118 } else {
119 let (buffer_idx, offset) = self.append_value_to_buffer(bytes);
120 BinaryView::make_view(bytes, buffer_idx, offset)
121 };
122 self.views_builder.push_n(view, n);
123 self.nulls.append_n_non_nulls(n);
124 }
125
126 fn flush_in_progress(&mut self) {
127 if self.in_progress.is_empty() {
128 return;
129 }
130 let block = std::mem::take(&mut self.in_progress).freeze();
131
132 assert!(block.len() < u32::MAX as usize, "Block too large");
133
134 let initial_len = self.completed.len();
135 self.completed.push(block);
136 assert_eq!(
137 self.completed.len(),
138 initial_len + 1,
139 "Invalid state, just completed block already exists"
140 );
141 }
142
143 fn append_value_to_buffer(&mut self, value: &[u8]) -> (u32, u32) {
145 assert!(
146 value.len() > BinaryView::MAX_INLINED_SIZE,
147 "must inline small strings"
148 );
149 let required_cap = self.in_progress.len() + value.len();
150 if self.in_progress.capacity() < required_cap {
151 self.flush_in_progress();
152 let next_buffer_size = self.growth_strategy.next_size() as usize;
153 let to_reserve = next_buffer_size.max(value.len());
154 self.in_progress.reserve(to_reserve);
155 }
156
157 let buffer_idx = self.completed.len();
158 let offset = u32::try_from(self.in_progress.len()).vortex_expect("too many buffers");
159 self.in_progress.extend_from_slice(value);
160
161 (buffer_idx, offset)
162 }
163
164 pub fn completed_block_count(&self) -> u32 {
165 self.completed.len()
166 }
167
168 pub fn in_progress(&self) -> bool {
171 !self.in_progress.is_empty()
172 }
173
174 pub fn push_buffer_and_adjusted_views(
191 &mut self,
192 buffers: &[ByteBuffer],
193 views: &Buffer<BinaryView>,
194 validity_mask: Mask,
195 ) {
196 self.flush_in_progress();
197
198 let expected_completed_len = self.completed.len() as usize + buffers.len();
199 self.completed.extend_from_slice_unchecked(buffers);
200 assert_eq!(
201 self.completed.len() as usize,
202 expected_completed_len,
203 "Some buffers already exist",
204 );
205 self.views_builder.extend_trusted(views.iter().copied());
206 self.push_only_validity_mask(validity_mask);
207
208 debug_assert_eq!(self.nulls.len(), self.views_builder.len())
209 }
210
211 pub fn finish_into_varbinview(&mut self) -> VarBinViewArray {
213 self.flush_in_progress();
214 let buffers = std::mem::take(&mut self.completed);
215
216 assert_eq!(
217 self.views_builder.len(),
218 self.nulls.len(),
219 "View and validity length must match"
220 );
221
222 let validity = self.nulls.finish_with_nullability(self.dtype.nullability());
223
224 unsafe {
226 VarBinViewArray::new_unchecked(
227 std::mem::take(&mut self.views_builder).freeze(),
228 buffers.finish(),
229 self.dtype.clone(),
230 validity,
231 )
232 }
233 }
234
235 fn push_only_validity_mask(&mut self, validity_mask: Mask) {
237 self.nulls.append_validity_mask(validity_mask);
238 }
239}
240
impl ArrayBuilder for VarBinViewBuilder {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }

    fn dtype(&self) -> &DType {
        &self.dtype
    }

    fn len(&self) -> usize {
        // Every append path updates `nulls`, so its length is the element count.
        self.nulls.len()
    }

    fn append_zeros(&mut self, n: usize) {
        // The "zero" value for strings/binary is the empty value.
        self.views_builder.push_n(BinaryView::empty_view(), n);
        self.nulls.append_n_non_nulls(n);
    }

    unsafe fn append_nulls_unchecked(&mut self, n: usize) {
        // Null slots still need placeholder views so views and validity stay
        // in lockstep.
        self.views_builder.push_n(BinaryView::empty_view(), n);
        self.nulls.append_n_nulls(n);
    }

    fn append_scalar(&mut self, scalar: &Scalar) -> VortexResult<()> {
        vortex_ensure!(
            scalar.dtype() == self.dtype(),
            "VarBinViewBuilder expected scalar with dtype {}, got {}",
            self.dtype(),
            scalar.dtype()
        );

        match self.dtype() {
            DType::Utf8(_) => match scalar.as_utf8().value() {
                Some(value) => self.append_value(value),
                None => self.append_null(),
            },
            DType::Binary(_) => match scalar.as_binary().value() {
                Some(value) => self.append_value(value),
                None => self.append_null(),
            },
            // Unreachable in practice: `new` asserts the dtype is Utf8/Binary.
            _ => vortex_bail!(
                "VarBinViewBuilder can only handle Utf8 or Binary scalars, got {:?}",
                scalar.dtype()
            ),
        }

        Ok(())
    }

    unsafe fn extend_from_array_unchecked(&mut self, array: &ArrayRef) {
        let array = array.to_varbinview();
        self.flush_in_progress();

        self.push_only_validity_mask(array.validity_mask().vortex_expect("validity_mask"));

        // Decide per source buffer whether to keep/slice/drop it, and obtain
        // the recipe for translating the source views into our numbering.
        let view_adjustment =
            self.completed
                .extend_from_compaction(BuffersWithOffsets::from_array(
                    &array,
                    self.compaction_threshold,
                ));

        match view_adjustment {
            // Fast path: every view can be translated arithmetically.
            ViewAdjustment::Precomputed(adjustment) => self.views_builder.extend_trusted(
                array
                    .views()
                    .iter()
                    .map(|view| adjustment.adjust_view(view)),
            ),
            // Slow path: views into dropped buffers must have their bytes
            // copied into this builder's own in-progress buffer.
            ViewAdjustment::Rewriting(adjustment) => {
                match array.validity_mask().vortex_expect("validity_mask") {
                    Mask::AllTrue(_) => {
                        for (idx, &view) in array.views().iter().enumerate() {
                            let new_view = self.push_view(view, &adjustment, &array, idx);
                            self.views_builder.push(new_view);
                        }
                    }
                    Mask::AllFalse(_) => {
                        // All-null: placeholder views only; no data is copied.
                        self.views_builder
                            .push_n(BinaryView::empty_view(), array.len());
                    }
                    Mask::Values(v) => {
                        // Only rewrite views for valid slots; null slots get
                        // empty placeholder views.
                        for (idx, (&view, is_valid)) in
                            array.views().iter().zip(v.bit_buffer().iter()).enumerate()
                        {
                            let new_view = if !is_valid {
                                BinaryView::empty_view()
                            } else {
                                self.push_view(view, &adjustment, &array, idx)
                            };
                            self.views_builder.push(new_view);
                        }
                    }
                }
            }
        }
    }

    fn reserve_exact(&mut self, additional: usize) {
        self.views_builder.reserve(additional);
        self.nulls.reserve_exact(additional);
    }

    unsafe fn set_validity_unchecked(&mut self, validity: Mask) {
        // Replaces (not appends to) the validity; the caller guarantees the
        // mask length matches the current view count.
        self.nulls = LazyBitBufferBuilder::new(validity.len());
        self.nulls.append_validity_mask(validity);
    }

    fn finish(&mut self) -> ArrayRef {
        self.finish_into_varbinview().into_array()
    }

    fn finish_into_canonical(&mut self) -> Canonical {
        Canonical::VarBinView(self.finish_into_varbinview())
    }
}
361
362impl VarBinViewBuilder {
363 fn push_view(
364 &mut self,
365 view: BinaryView,
366 adjustment: &RewritingViewAdjustment,
367 array: &VarBinViewArray,
368 idx: usize,
369 ) -> BinaryView {
370 if view.is_inlined() {
371 view
372 } else if let Some(adjusted) = adjustment.adjust_view(&view) {
373 adjusted
374 } else {
375 let bytes = array.bytes_at(idx);
376 let (new_buf_idx, new_offset) = self.append_value_to_buffer(&bytes);
377 BinaryView::make_view(bytes.as_slice(), new_buf_idx, new_offset)
378 }
379 }
380}
381
/// Storage for frozen data buffers, with optional deduplication.
pub enum CompletedBuffers {
    /// Plain append-only list of buffers.
    Default(Vec<ByteBuffer>),
    /// Deduplicates buffers by memory identity, so repeated appends of the
    /// same underlying buffer are stored once.
    Deduplicated(DeduplicatedBuffers),
}
386
387impl Default for CompletedBuffers {
388 fn default() -> Self {
389 Self::Default(Vec::new())
390 }
391}
392
393#[allow(clippy::cast_possible_truncation)]
395impl CompletedBuffers {
396 fn len(&self) -> u32 {
397 match self {
398 Self::Default(buffers) => buffers.len() as u32,
399 Self::Deduplicated(buffers) => buffers.len(),
400 }
401 }
402
403 fn push(&mut self, block: ByteBuffer) -> u32 {
404 match self {
405 Self::Default(buffers) => {
406 assert!(buffers.len() < u32::MAX as usize, "Too many blocks");
407 buffers.push(block);
408 self.len()
409 }
410 Self::Deduplicated(buffers) => buffers.push(block),
411 }
412 }
413
414 fn extend_from_slice_unchecked(&mut self, buffers: &[ByteBuffer]) {
416 for buffer in buffers {
417 self.push(buffer.clone());
418 }
419 }
420
421 fn extend_from_compaction(&mut self, buffers: BuffersWithOffsets) -> ViewAdjustment {
422 match (self, buffers) {
423 (
424 Self::Default(completed_buffers),
425 BuffersWithOffsets::AllKept { buffers, offsets },
426 ) => {
427 let buffer_offset = completed_buffers.len() as u32;
428 completed_buffers.extend_from_slice(&buffers);
429 ViewAdjustment::shift(buffer_offset, offsets)
430 }
431 (
432 Self::Default(completed_buffers),
433 BuffersWithOffsets::SomeCompacted { buffers, offsets },
434 ) => {
435 let lookup = buffers
436 .iter()
437 .map(|maybe_buffer| {
438 maybe_buffer.as_ref().map(|buffer| {
439 completed_buffers.push(buffer.clone());
440 completed_buffers.len() as u32 - 1
441 })
442 })
443 .collect();
444 ViewAdjustment::rewriting(lookup, offsets)
445 }
446
447 (
448 Self::Deduplicated(completed_buffers),
449 BuffersWithOffsets::AllKept { buffers, offsets },
450 ) => {
451 let buffer_lookup = completed_buffers.extend_from_iter(buffers.iter().cloned());
452 ViewAdjustment::lookup(buffer_lookup, offsets)
453 }
454 (
455 Self::Deduplicated(completed_buffers),
456 BuffersWithOffsets::SomeCompacted { buffers, offsets },
457 ) => {
458 let buffer_lookup = completed_buffers.extend_from_option_slice(&buffers);
459 ViewAdjustment::rewriting(buffer_lookup, offsets)
460 }
461 }
462 }
463
464 fn finish(self) -> Arc<[ByteBuffer]> {
465 match self {
466 Self::Default(buffers) => Arc::from(buffers),
467 Self::Deduplicated(buffers) => buffers.finish(),
468 }
469 }
470}
471
/// Buffer list that assigns each distinct buffer (by memory identity) a
/// single index, returning the existing index on repeated pushes.
#[derive(Default)]
pub struct DeduplicatedBuffers {
    // Distinct buffers in insertion order.
    buffers: Vec<ByteBuffer>,
    // Maps a buffer's identity key to its index in `buffers`.
    buffer_to_idx: HashMap<BufferId, u32>,
}
477
impl DeduplicatedBuffers {
    /// Number of distinct buffers stored.
    #[allow(clippy::cast_possible_truncation)]
    fn len(&self) -> u32 {
        self.buffers.len() as u32
    }

    /// Pushes a buffer and returns its index; if a buffer with the same
    /// memory identity was pushed before, returns the existing index instead
    /// of storing a duplicate.
    pub(crate) fn push(&mut self, block: ByteBuffer) -> u32 {
        assert!(self.buffers.len() < u32::MAX as usize, "Too many blocks");

        let initial_len = self.len();
        let id = BufferId::from(&block);
        match self.buffer_to_idx.entry(id) {
            Entry::Occupied(idx) => *idx.get(),
            Entry::Vacant(entry) => {
                // New buffer: its index is the pre-push length.
                let idx = initial_len;
                entry.insert(idx);
                self.buffers.push(block);
                idx
            }
        }
    }

    /// Pushes every `Some` buffer, returning each slot's assigned index
    /// (`None` slots stay `None`).
    pub(crate) fn extend_from_option_slice(
        &mut self,
        buffers: &[Option<ByteBuffer>],
    ) -> Vec<Option<u32>> {
        buffers
            .iter()
            .map(|buffer| buffer.as_ref().map(|buf| self.push(buf.clone())))
            .collect()
    }

    /// Pushes every buffer from the iterator, returning the assigned indices.
    pub(crate) fn extend_from_iter(
        &mut self,
        buffers: impl Iterator<Item = ByteBuffer>,
    ) -> Vec<u32> {
        buffers.map(|buffer| self.push(buffer)).collect()
    }

    /// Consumes the deduplicator, yielding the distinct buffers.
    pub(crate) fn finish(self) -> Arc<[ByteBuffer]> {
        Arc::from(self.buffers)
    }
}
523
/// Identity key for a [`ByteBuffer`]: the address and length of its backing
/// slice.
// NOTE(review): this is pointer identity, not content equality — it relies on
// deduplicated buffers being kept alive in `DeduplicatedBuffers::buffers` so
// their addresses cannot be reused while the map is live.
#[derive(PartialEq, Eq, Hash)]
struct BufferId {
    // Address of the first byte of the buffer's slice.
    ptr: usize,
    len: usize,
}
530
531impl BufferId {
532 fn from(buffer: &ByteBuffer) -> Self {
533 let slice = buffer.as_slice();
534 Self {
535 ptr: slice.as_ptr() as usize,
536 len: slice.len(),
537 }
538 }
539}
540
/// Sizing policy for newly allocated in-progress data buffers.
#[derive(Debug, Clone)]
pub enum BufferGrowthStrategy {
    /// Every buffer is allocated with the same size.
    Fixed { size: u32 },
    /// Buffer sizes double from `current_size` up to `max_size`.
    Exponential { current_size: u32, max_size: u32 },
}
548
549impl Default for BufferGrowthStrategy {
550 fn default() -> Self {
551 Self::Exponential {
552 current_size: 4 * 1024, max_size: 2 * 1024 * 1024, }
555 }
556}
557
558impl BufferGrowthStrategy {
559 pub fn fixed(size: u32) -> Self {
560 Self::Fixed { size }
561 }
562
563 pub fn exponential(initial_size: u32, max_size: u32) -> Self {
564 Self::Exponential {
565 current_size: initial_size,
566 max_size,
567 }
568 }
569
570 pub fn next_size(&mut self) -> u32 {
572 match self {
573 Self::Fixed { size } => *size,
574 Self::Exponential {
575 current_size,
576 max_size,
577 } => {
578 let result = *current_size;
579 if *current_size < *max_size {
580 *current_size = current_size.saturating_mul(2).min(*max_size);
581 }
582 result
583 }
584 }
585 }
586}
587
/// Data buffers taken from an appended array after applying the compaction
/// policy.
enum BuffersWithOffsets {
    /// Every buffer is carried over (possibly sliced). `offsets[i]` is the
    /// number of leading bytes trimmed from buffer `i`; `None` when no
    /// buffer was sliced.
    AllKept {
        buffers: Arc<[ByteBuffer]>,
        offsets: Option<Vec<u32>>,
    },
    /// Some buffers were dropped (`None` slots); values referencing them
    /// must be rewritten into the builder's own buffers.
    SomeCompacted {
        buffers: Vec<Option<ByteBuffer>>,
        offsets: Option<Vec<u32>>,
    },
}
598
impl BuffersWithOffsets {
    /// Classifies an array's data buffers as kept / sliced / dropped
    /// according to their utilization and `compaction_threshold`
    /// (a threshold of 0.0 keeps everything untouched).
    pub fn from_array(array: &VarBinViewArray, compaction_threshold: f64) -> Self {
        // Compaction disabled: carry all buffers over as-is.
        if compaction_threshold == 0.0 {
            return Self::AllKept {
                buffers: Arc::from(
                    array
                        .data_buffers()
                        .to_vec()
                        .into_iter()
                        .map(|b| b.unwrap_host())
                        .collect_vec(),
                ),
                offsets: None,
            };
        }

        let buffer_utilizations = array
            .buffer_utilizations()
            .vortex_expect("buffer_utilizations in BuffersWithOffsets::from_array");
        // First pass: determine which variant / offset vector is needed.
        let mut has_rewrite = false;
        let mut has_nonzero_offset = false;
        for utilization in buffer_utilizations.iter() {
            match compaction_strategy(utilization, compaction_threshold) {
                CompactionStrategy::KeepFull => continue,
                CompactionStrategy::Slice { .. } => has_nonzero_offset = true,
                CompactionStrategy::Rewrite => has_rewrite = true,
            }
        }

        // Second pass (lazy): produce (buffer, leading-trim) pairs; `None`
        // buffers are dropped and will be rewritten by the builder.
        let buffers_with_offsets_iter = buffer_utilizations
            .iter()
            .zip(array.data_buffers().iter())
            .map(|(utilization, buffer)| {
                match compaction_strategy(utilization, compaction_threshold) {
                    CompactionStrategy::KeepFull => (Some(buffer.as_host().clone()), 0),
                    CompactionStrategy::Slice { start, end } => (
                        Some(buffer.as_host().slice(start as usize..end as usize)),
                        start,
                    ),
                    CompactionStrategy::Rewrite => (None, 0),
                }
            });

        // Materialize the cheapest representation for the observed flags.
        match (has_rewrite, has_nonzero_offset) {
            (false, false) => {
                let buffers: Vec<_> = buffers_with_offsets_iter
                    .map(|(b, _)| b.vortex_expect("already checked for rewrite"))
                    .collect();
                Self::AllKept {
                    buffers: Arc::from(buffers),
                    offsets: None,
                }
            }
            (true, false) => {
                let buffers: Vec<_> = buffers_with_offsets_iter.map(|(b, _)| b).collect();
                Self::SomeCompacted {
                    buffers,
                    offsets: None,
                }
            }
            (false, true) => {
                let (buffers, offsets): (Vec<_>, _) = buffers_with_offsets_iter
                    .map(|(buffer, offset)| {
                        (buffer.vortex_expect("already checked for rewrite"), offset)
                    })
                    .collect();
                Self::AllKept {
                    buffers: Arc::from(buffers),
                    offsets: Some(offsets),
                }
            }
            (true, true) => {
                let (buffers, offsets) = buffers_with_offsets_iter.collect();
                Self::SomeCompacted {
                    buffers,
                    offsets: Some(offsets),
                }
            }
        }
    }
}
684
/// Per-buffer decision produced by [`compaction_strategy`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CompactionStrategy {
    /// Keep the whole buffer untouched.
    KeepFull,
    /// Keep only the `start..end` byte range of the buffer.
    Slice {
        start: u32,
        end: u32,
    },
    /// Drop the buffer and rewrite the referenced values elsewhere.
    Rewrite,
}
696
697fn compaction_strategy(
698 buffer_utilization: &BufferUtilization,
699 threshold: f64,
700) -> CompactionStrategy {
701 match buffer_utilization.overall_utilization() {
702 0.0 => CompactionStrategy::Rewrite,
704 utilised if utilised >= threshold => CompactionStrategy::KeepFull,
705 _ if buffer_utilization.range_utilization() >= threshold => {
706 let Range { start, end } = buffer_utilization.range();
707 CompactionStrategy::Slice { start, end }
708 }
709 _ => CompactionStrategy::Rewrite,
710 }
711}
712
/// Recipe for translating an appended array's views into the builder's
/// buffer numbering.
enum ViewAdjustment {
    /// Every view can be adjusted without consulting the source bytes.
    Precomputed(PrecomputedViewAdjustment),
    /// Some views reference dropped buffers and need their bytes rewritten.
    Rewriting(RewritingViewAdjustment),
}
717
718impl ViewAdjustment {
719 fn shift(buffer_offset: u32, offsets: Option<Vec<u32>>) -> Self {
720 Self::Precomputed(PrecomputedViewAdjustment::Shift {
721 buffer_offset,
722 offsets,
723 })
724 }
725
726 fn lookup(buffer_lookup: Vec<u32>, offsets: Option<Vec<u32>>) -> Self {
727 Self::Precomputed(PrecomputedViewAdjustment::Lookup {
728 buffer_lookup,
729 offsets,
730 })
731 }
732
733 fn rewriting(buffer_lookup: Vec<Option<u32>>, offsets: Option<Vec<u32>>) -> Self {
734 Self::Rewriting(RewritingViewAdjustment {
735 buffer_lookup,
736 offsets,
737 })
738 }
739}
740
/// View adjustments that never require touching the source bytes.
enum PrecomputedViewAdjustment {
    /// All source buffers were appended in order: add a constant to every
    /// buffer index.
    Shift {
        buffer_offset: u32,
        // Leading bytes trimmed per source buffer, when buffers were sliced.
        offsets: Option<Vec<u32>>,
    },
    /// Source buffers were deduplicated: map indices through a lookup table.
    Lookup {
        buffer_lookup: Vec<u32>,
        offsets: Option<Vec<u32>>,
    },
}
752
impl PrecomputedViewAdjustment {
    /// Translates a view into the builder's buffer numbering.
    fn adjust_view(&self, view: &BinaryView) -> BinaryView {
        // Inlined views carry their own bytes and are copied verbatim.
        if view.is_inlined() {
            return *view;
        }
        let view_ref = view.as_view();
        match self {
            Self::Shift {
                buffer_offset,
                offsets,
            } => {
                let b_idx = view_ref.buffer_index;
                // Leading bytes trimmed from this buffer, if it was sliced.
                let offset_shift = offsets
                    .as_ref()
                    .map(|o| o[b_idx as usize])
                    .unwrap_or_default();

                // A view starting before the kept slice cannot reference live
                // data (presumably an unreferenced/null slot) — map it to an
                // empty view instead of underflowing the offset.
                if view_ref.offset < offset_shift {
                    return BinaryView::empty_view();
                }

                view_ref
                    .with_buffer_and_offset(b_idx + buffer_offset, view_ref.offset - offset_shift)
            }
            Self::Lookup {
                buffer_lookup,
                offsets,
            } => {
                let b_idx = view_ref.buffer_index;
                // Deduplicated index assigned to this source buffer.
                let buffer = buffer_lookup[b_idx as usize];
                let offset_shift = offsets
                    .as_ref()
                    .map(|o| o[b_idx as usize])
                    .unwrap_or_default();

                // Same out-of-slice guard as the Shift arm above.
                if view_ref.offset < offset_shift {
                    return BinaryView::empty_view();
                }

                view_ref.with_buffer_and_offset(buffer, view_ref.offset - offset_shift)
            }
        }
        .into()
    }
}
802
/// View adjustment where some source buffers were dropped: `buffer_lookup`
/// maps each source buffer index to its new index, or `None` if the buffer
/// was compacted away and its views must be rewritten from the source bytes.
struct RewritingViewAdjustment {
    buffer_lookup: Vec<Option<u32>>,
    // Leading bytes trimmed per source buffer, when buffers were sliced.
    offsets: Option<Vec<u32>>,
}
807
808impl RewritingViewAdjustment {
809 fn adjust_view(&self, view: &BinaryView) -> Option<BinaryView> {
812 if view.is_inlined() {
813 return Some(*view);
814 }
815
816 let view_ref = view.as_view();
817 self.buffer_lookup[view_ref.buffer_index as usize].map(|buffer| {
818 let offset_shift = self
819 .offsets
820 .as_ref()
821 .map(|o| o[view_ref.buffer_index as usize])
822 .unwrap_or_default();
823 view_ref
824 .with_buffer_and_offset(buffer, view_ref.offset - offset_shift)
825 .into()
826 })
827 }
828}
829
#[cfg(test)]
mod tests {
    use vortex_error::VortexResult;

    use crate::IntoArray;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::assert_arrays_eq;
    use crate::builders::ArrayBuilder;
    use crate::builders::VarBinViewBuilder;
    use crate::builders::varbinview::VarBinViewArray;
    use crate::dtype::DType;
    use crate::dtype::Nullability;

    // Basic appends: values, a single null, bulk nulls, and "zero" (empty)
    // values all interleave correctly.
    #[test]
    fn test_utf8_builder() {
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_value("Hello");
        builder.append_null();
        builder.append_value("World");

        builder.append_nulls(2);

        builder.append_zeros(2);
        builder.append_value("test");

        let actual = builder.finish();
        let expected = <VarBinViewArray as FromIterator<_>>::from_iter([
            Some("Hello"),
            None,
            Some("World"),
            None,
            None,
            Some(""),
            Some(""),
            Some("test"),
        ]);
        assert_arrays_eq!(actual, expected);
    }

    // Extending from an existing array preserves order, values and nulls.
    #[test]
    fn test_utf8_builder_with_extend() {
        let array = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_null();
            builder.append_value("Hello2");
            builder.finish()
        };
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_value("Hello1");
        builder.extend_from_array(&array);
        builder.append_nulls(2);
        builder.append_value("Hello3");

        let actual = builder.finish_into_canonical();
        let expected = <VarBinViewArray as FromIterator<_>>::from_iter([
            Some("Hello1"),
            None,
            Some("Hello2"),
            None,
            None,
            Some("Hello3"),
        ]);
        assert_arrays_eq!(actual.into_array(), expected.into_array());
    }

    // Re-appending (slices of) the same array must not duplicate its shared
    // data buffer, while a distinct array with equal content adds a new one
    // (deduplication is by memory identity, not by content).
    #[test]
    fn test_buffer_deduplication() -> VortexResult<()> {
        let array = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_value("This is a long string that should not be inlined");
            builder.append_value("short string");
            builder.finish_into_varbinview()
        };

        assert_eq!(array.data_buffers().len(), 1);
        let mut builder =
            VarBinViewBuilder::with_buffer_deduplication(DType::Utf8(Nullability::Nullable), 10);

        let mut ctx = LEGACY_SESSION.create_execution_ctx();

        array.append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 1);

        array
            .slice(1..2)?
            .append_to_builder(&mut builder, &mut ctx)?;
        array
            .slice(0..1)?
            .append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 1);

        let array2 = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_value("This is a long string that should not be inlined");
            builder.finish_into_varbinview()
        };

        array2.append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 2);

        array
            .slice(0..1)?
            .append_to_builder(&mut builder, &mut ctx)?;
        array2
            .slice(0..1)?
            .append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 2);
        Ok(())
    }

    // append_scalar accepts matching Utf8/Binary scalars (including nulls)
    // and rejects scalars of a different dtype.
    #[test]
    fn test_append_scalar() {
        use crate::scalar::Scalar;

        let mut utf8_builder =
            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        let utf8_scalar1 = Scalar::utf8("hello", Nullability::Nullable);
        utf8_builder.append_scalar(&utf8_scalar1).unwrap();

        let utf8_scalar2 = Scalar::utf8("world", Nullability::Nullable);
        utf8_builder.append_scalar(&utf8_scalar2).unwrap();

        let null_scalar = Scalar::null(DType::Utf8(Nullability::Nullable));
        utf8_builder.append_scalar(&null_scalar).unwrap();

        let array = utf8_builder.finish();
        let expected =
            <VarBinViewArray as FromIterator<_>>::from_iter([Some("hello"), Some("world"), None]);
        assert_arrays_eq!(&array, &expected);

        let mut binary_builder =
            VarBinViewBuilder::with_capacity(DType::Binary(Nullability::Nullable), 10);

        let binary_scalar = Scalar::binary(vec![1u8, 2, 3], Nullability::Nullable);
        binary_builder.append_scalar(&binary_scalar).unwrap();

        let binary_null = Scalar::null(DType::Binary(Nullability::Nullable));
        binary_builder.append_scalar(&binary_null).unwrap();

        let binary_array = binary_builder.finish();
        let expected =
            <VarBinViewArray as FromIterator<_>>::from_iter([Some(vec![1u8, 2, 3]), None]);
        assert_arrays_eq!(&binary_array, &expected);

        let mut builder =
            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::NonNullable), 10);
        let wrong_scalar = Scalar::from(42i32);
        assert!(builder.append_scalar(&wrong_scalar).is_err());
    }

    // Fixed strategy is constant; exponential doubles until it hits its cap
    // and then stays there.
    #[test]
    fn test_buffer_growth_strategies() {
        use super::BufferGrowthStrategy;

        let mut strategy = BufferGrowthStrategy::fixed(1024);

        assert_eq!(strategy.next_size(), 1024);
        assert_eq!(strategy.next_size(), 1024);
        assert_eq!(strategy.next_size(), 1024);

        let mut strategy = BufferGrowthStrategy::exponential(1024, 8192);

        assert_eq!(strategy.next_size(), 1024);
        assert_eq!(strategy.next_size(), 2048);
        assert_eq!(strategy.next_size(), 4096);
        assert_eq!(strategy.next_size(), 8192);
        assert_eq!(strategy.next_size(), 8192);
    }

    // A value larger than the growth strategy's cap still gets a buffer big
    // enough to hold it in one piece.
    #[test]
    fn test_large_value_allocation() {
        use super::BufferGrowthStrategy;
        use super::VarBinViewBuilder;

        let mut builder = VarBinViewBuilder::new(
            DType::Binary(Nullability::Nullable),
            10,
            Default::default(),
            BufferGrowthStrategy::exponential(1024, 4096),
            0.0,
        );

        let large_value = vec![0u8; 8192];

        builder.append_value(&large_value);

        let array = builder.finish_into_varbinview();
        assert_eq!(array.len(), 1);

        let retrieved = array
            .scalar_at(0)
            .unwrap()
            .as_binary()
            .value()
            .cloned()
            .unwrap();
        assert_eq!(retrieved.len(), 8192);
        assert_eq!(retrieved.as_slice(), &large_value);
    }
}
1049}