1use std::any::Any;
5use std::ops::Range;
6use std::sync::Arc;
7
8use itertools::Itertools;
9use vortex_buffer::Buffer;
10use vortex_buffer::BufferMut;
11use vortex_buffer::ByteBuffer;
12use vortex_buffer::ByteBufferMut;
13use vortex_error::VortexExpect;
14use vortex_error::VortexResult;
15use vortex_error::vortex_bail;
16use vortex_error::vortex_ensure;
17use vortex_mask::Mask;
18use vortex_utils::aliases::hash_map::Entry;
19use vortex_utils::aliases::hash_map::HashMap;
20
21use crate::ArrayRef;
22use crate::IntoArray;
23use crate::LEGACY_SESSION;
24use crate::VortexSessionExecute;
25use crate::arrays::VarBinViewArray;
26use crate::arrays::varbinview::build_views::BinaryView;
27use crate::arrays::varbinview::compact::BufferUtilization;
28use crate::builders::ArrayBuilder;
29use crate::builders::LazyBitBufferBuilder;
30use crate::canonical::Canonical;
31use crate::canonical::ToCanonical;
32use crate::dtype::DType;
33use crate::scalar::Scalar;
34
/// Incremental builder for [`VarBinViewArray`] (Arrow-style "German string"
/// layout): fixed-width views plus shared out-of-line data buffers.
pub struct VarBinViewBuilder {
    /// Element type; must be `Utf8` or `Binary` (asserted in [`Self::new`]).
    dtype: DType,
    /// One view per element; short values are inlined directly in the view.
    views_builder: BufferMut<BinaryView>,
    /// Validity bits, kept in lock-step with `views_builder`.
    nulls: LazyBitBufferBuilder,
    /// Frozen data buffers that views may reference by index.
    completed: CompletedBuffers,
    /// Mutable buffer currently accepting out-of-line bytes; frozen into
    /// `completed` when full (see `flush_in_progress`).
    in_progress: ByteBufferMut,
    /// Controls the size of each newly allocated `in_progress` buffer.
    growth_strategy: BufferGrowthStrategy,
    /// Utilization threshold below which appended arrays' buffers are
    /// compacted; `0.0` disables compaction.
    compaction_threshold: f64,
}
45
46impl VarBinViewBuilder {
47 pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
48 Self::new(dtype, capacity, Default::default(), Default::default(), 0.0)
49 }
50
51 pub fn with_buffer_deduplication(dtype: DType, capacity: usize) -> Self {
52 Self::new(
53 dtype,
54 capacity,
55 CompletedBuffers::Deduplicated(Default::default()),
56 Default::default(),
57 0.0,
58 )
59 }
60
61 pub fn with_compaction(dtype: DType, capacity: usize, compaction_threshold: f64) -> Self {
62 Self::new(
63 dtype,
64 capacity,
65 Default::default(),
66 Default::default(),
67 compaction_threshold,
68 )
69 }
70
71 pub fn new(
72 dtype: DType,
73 capacity: usize,
74 completed: CompletedBuffers,
75 growth_strategy: BufferGrowthStrategy,
76 compaction_threshold: f64,
77 ) -> Self {
78 assert!(
79 matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
80 "VarBinViewBuilder DType must be Utf8 or Binary."
81 );
82 Self {
83 views_builder: BufferMut::<BinaryView>::with_capacity(capacity),
84 nulls: LazyBitBufferBuilder::new(capacity),
85 completed,
86 in_progress: ByteBufferMut::empty(),
87 dtype,
88 growth_strategy,
89 compaction_threshold,
90 }
91 }
92
93 fn append_value_view(&mut self, value: &[u8]) {
94 let length =
95 u32::try_from(value.len()).vortex_expect("cannot have a single string >2^32 in length");
96 if length <= 12 {
97 self.views_builder.push(BinaryView::make_view(value, 0, 0));
98 return;
99 }
100
101 let (buffer_idx, offset) = self.append_value_to_buffer(value);
102 let view = BinaryView::make_view(value, buffer_idx, offset);
103 self.views_builder.push(view);
104 }
105
106 pub fn append_value<S: AsRef<[u8]>>(&mut self, value: S) {
108 self.append_value_view(value.as_ref());
109 self.nulls.append_non_null();
110 }
111
112 pub fn append_n_values<S: AsRef<[u8]>>(&mut self, value: S, n: usize) {
114 if n == 0 {
115 return;
116 }
117 let bytes = value.as_ref();
118 let view = if bytes.len() <= BinaryView::MAX_INLINED_SIZE {
119 BinaryView::make_view(bytes, 0, 0)
120 } else {
121 let (buffer_idx, offset) = self.append_value_to_buffer(bytes);
122 BinaryView::make_view(bytes, buffer_idx, offset)
123 };
124 self.views_builder.push_n(view, n);
125 self.nulls.append_n_non_nulls(n);
126 }
127
128 fn flush_in_progress(&mut self) {
129 if self.in_progress.is_empty() {
130 return;
131 }
132 let block = std::mem::take(&mut self.in_progress).freeze();
133
134 assert!(block.len() < u32::MAX as usize, "Block too large");
135
136 let initial_len = self.completed.len();
137 self.completed.push(block);
138 assert_eq!(
139 self.completed.len(),
140 initial_len + 1,
141 "Invalid state, just completed block already exists"
142 );
143 }
144
145 fn append_value_to_buffer(&mut self, value: &[u8]) -> (u32, u32) {
147 assert!(
148 value.len() > BinaryView::MAX_INLINED_SIZE,
149 "must inline small strings"
150 );
151 let required_cap = self.in_progress.len() + value.len();
152 if self.in_progress.capacity() < required_cap {
153 self.flush_in_progress();
154 let next_buffer_size = self.growth_strategy.next_size() as usize;
155 let to_reserve = next_buffer_size.max(value.len());
156 self.in_progress.reserve(to_reserve);
157 }
158
159 let buffer_idx = self.completed.len();
160 let offset = u32::try_from(self.in_progress.len()).vortex_expect("too many buffers");
161 self.in_progress.extend_from_slice(value);
162
163 (buffer_idx, offset)
164 }
165
166 pub fn completed_block_count(&self) -> u32 {
167 self.completed.len()
168 }
169
170 pub fn in_progress(&self) -> bool {
173 !self.in_progress.is_empty()
174 }
175
176 pub fn push_buffer_and_adjusted_views(
193 &mut self,
194 buffers: &[ByteBuffer],
195 views: &Buffer<BinaryView>,
196 validity_mask: Mask,
197 ) {
198 self.flush_in_progress();
199
200 let expected_completed_len = self.completed.len() as usize + buffers.len();
201 self.completed.extend_from_slice_unchecked(buffers);
202 assert_eq!(
203 self.completed.len() as usize,
204 expected_completed_len,
205 "Some buffers already exist",
206 );
207 self.views_builder.extend_trusted(views.iter().copied());
208 self.push_only_validity_mask(validity_mask);
209
210 debug_assert_eq!(self.nulls.len(), self.views_builder.len())
211 }
212
213 pub fn finish_into_varbinview(&mut self) -> VarBinViewArray {
215 self.flush_in_progress();
216 let buffers = std::mem::take(&mut self.completed);
217
218 assert_eq!(
219 self.views_builder.len(),
220 self.nulls.len(),
221 "View and validity length must match"
222 );
223
224 let validity = self.nulls.finish_with_nullability(self.dtype.nullability());
225
226 unsafe {
228 VarBinViewArray::new_unchecked(
229 std::mem::take(&mut self.views_builder).freeze(),
230 buffers.finish(),
231 self.dtype.clone(),
232 validity,
233 )
234 }
235 }
236
237 fn push_only_validity_mask(&mut self, validity_mask: Mask) {
239 self.nulls.append_validity_mask(validity_mask);
240 }
241}
242
impl ArrayBuilder for VarBinViewBuilder {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }

    fn dtype(&self) -> &DType {
        &self.dtype
    }

    // The validity builder is kept in lock-step with the views, so its length
    // is the element count.
    fn len(&self) -> usize {
        self.nulls.len()
    }

    // "Zero" for a string/binary array is the empty value: an inlined empty
    // view with a non-null validity bit.
    fn append_zeros(&mut self, n: usize) {
        self.views_builder.push_n(BinaryView::empty_view(), n);
        self.nulls.append_n_non_nulls(n);
    }

    // Nulls still need placeholder views so views and validity stay the same
    // length.
    unsafe fn append_nulls_unchecked(&mut self, n: usize) {
        self.views_builder.push_n(BinaryView::empty_view(), n);
        self.nulls.append_n_nulls(n);
    }

    /// Append a single scalar, which must match this builder's dtype exactly
    /// (the equality check below includes nullability).
    fn append_scalar(&mut self, scalar: &Scalar) -> VortexResult<()> {
        vortex_ensure!(
            scalar.dtype() == self.dtype(),
            "VarBinViewBuilder expected scalar with dtype {}, got {}",
            self.dtype(),
            scalar.dtype()
        );

        match self.dtype() {
            DType::Utf8(_) => match scalar.as_utf8().value() {
                Some(value) => self.append_value(value),
                None => self.append_null(),
            },
            DType::Binary(_) => match scalar.as_binary().value() {
                Some(value) => self.append_value(value),
                None => self.append_null(),
            },
            // Unreachable in practice: `new` asserts the dtype is Utf8/Binary.
            _ => vortex_bail!(
                "VarBinViewBuilder can only handle Utf8 or Binary scalars, got {:?}",
                scalar.dtype()
            ),
        }

        Ok(())
    }

    /// Append an entire array, remapping (and optionally compacting) its data
    /// buffers into this builder's buffer space.
    unsafe fn extend_from_array_unchecked(&mut self, array: &ArrayRef) {
        let array = array.to_varbinview();
        self.flush_in_progress();

        self.push_only_validity_mask(
            array
                .as_ref()
                .validity()
                .vortex_expect("validity_mask")
                .to_mask(
                    array.as_ref().len(),
                    &mut LEGACY_SESSION.create_execution_ctx(),
                )
                .vortex_expect("Failed to compute validity mask"),
        );

        let view_adjustment =
            self.completed
                .extend_from_compaction(BuffersWithOffsets::from_array(
                    &array,
                    self.compaction_threshold,
                ));

        match view_adjustment {
            // All source buffers survive: every view is remapped arithmetically.
            ViewAdjustment::Precomputed(adjustment) => self.views_builder.extend_trusted(
                array
                    .views()
                    .iter()
                    .map(|view| adjustment.adjust_view(view)),
            ),
            // Some buffers were dropped by compaction: views into them must be
            // rebuilt from their raw bytes (see `push_view`).
            // NOTE(review): the validity mask is computed a second time here;
            // it was already computed above for `push_only_validity_mask`.
            ViewAdjustment::Rewriting(adjustment) => {
                match array
                    .as_ref()
                    .validity()
                    .vortex_expect("validity_mask")
                    .to_mask(
                        array.as_ref().len(),
                        &mut LEGACY_SESSION.create_execution_ctx(),
                    )
                    .vortex_expect("Failed to compute validity mask")
                {
                    Mask::AllTrue(_) => {
                        for (idx, &view) in array.views().iter().enumerate() {
                            let new_view = self.push_view(view, &adjustment, &array, idx);
                            self.views_builder.push(new_view);
                        }
                    }
                    // All-null: placeholder views only, no byte copies.
                    Mask::AllFalse(_) => {
                        self.views_builder
                            .push_n(BinaryView::empty_view(), array.len());
                    }
                    Mask::Values(v) => {
                        for (idx, (&view, is_valid)) in
                            array.views().iter().zip(v.bit_buffer().iter()).enumerate()
                        {
                            let new_view = if !is_valid {
                                // Null slots get a placeholder, skipping copies.
                                BinaryView::empty_view()
                            } else {
                                self.push_view(view, &adjustment, &array, idx)
                            };
                            self.views_builder.push(new_view);
                        }
                    }
                }
            }
        }
    }

    fn reserve_exact(&mut self, additional: usize) {
        self.views_builder.reserve(additional);
        self.nulls.reserve_exact(additional);
    }

    // Replaces (rather than appends to) any validity accumulated so far.
    unsafe fn set_validity_unchecked(&mut self, validity: Mask) {
        self.nulls = LazyBitBufferBuilder::new(validity.len());
        self.nulls.append_validity_mask(validity);
    }

    fn finish(&mut self) -> ArrayRef {
        self.finish_into_varbinview().into_array()
    }

    fn finish_into_canonical(&mut self) -> Canonical {
        Canonical::VarBinView(self.finish_into_varbinview())
    }
}
382
383impl VarBinViewBuilder {
384 fn push_view(
385 &mut self,
386 view: BinaryView,
387 adjustment: &RewritingViewAdjustment,
388 array: &VarBinViewArray,
389 idx: usize,
390 ) -> BinaryView {
391 if view.is_inlined() {
392 view
393 } else if let Some(adjusted) = adjustment.adjust_view(&view) {
394 adjusted
395 } else {
396 let bytes = array.bytes_at(idx);
397 let (new_buf_idx, new_offset) = self.append_value_to_buffer(&bytes);
398 BinaryView::make_view(bytes.as_slice(), new_buf_idx, new_offset)
399 }
400 }
401}
402
/// Storage for the immutable data buffers the builder has finished writing.
pub enum CompletedBuffers {
    /// Plain append-only list: every pushed buffer is stored.
    Default(Vec<ByteBuffer>),
    /// Deduplicating store: buffers with the same allocation identity are
    /// stored only once (see [`DeduplicatedBuffers`]).
    Deduplicated(DeduplicatedBuffers),
}

impl Default for CompletedBuffers {
    // The non-deduplicating variant is the default.
    fn default() -> Self {
        Self::Default(Vec::new())
    }
}
413
414#[expect(clippy::cast_possible_truncation)]
416impl CompletedBuffers {
417 fn len(&self) -> u32 {
418 match self {
419 Self::Default(buffers) => buffers.len() as u32,
420 Self::Deduplicated(buffers) => buffers.len(),
421 }
422 }
423
424 fn push(&mut self, block: ByteBuffer) -> u32 {
425 match self {
426 Self::Default(buffers) => {
427 assert!(buffers.len() < u32::MAX as usize, "Too many blocks");
428 buffers.push(block);
429 self.len()
430 }
431 Self::Deduplicated(buffers) => buffers.push(block),
432 }
433 }
434
435 fn extend_from_slice_unchecked(&mut self, buffers: &[ByteBuffer]) {
437 for buffer in buffers {
438 self.push(buffer.clone());
439 }
440 }
441
442 fn extend_from_compaction(&mut self, buffers: BuffersWithOffsets) -> ViewAdjustment {
443 match (self, buffers) {
444 (
445 Self::Default(completed_buffers),
446 BuffersWithOffsets::AllKept { buffers, offsets },
447 ) => {
448 let buffer_offset = completed_buffers.len() as u32;
449 completed_buffers.extend_from_slice(&buffers);
450 ViewAdjustment::shift(buffer_offset, offsets)
451 }
452 (
453 Self::Default(completed_buffers),
454 BuffersWithOffsets::SomeCompacted { buffers, offsets },
455 ) => {
456 let lookup = buffers
457 .iter()
458 .map(|maybe_buffer| {
459 maybe_buffer.as_ref().map(|buffer| {
460 completed_buffers.push(buffer.clone());
461 completed_buffers.len() as u32 - 1
462 })
463 })
464 .collect();
465 ViewAdjustment::rewriting(lookup, offsets)
466 }
467
468 (
469 Self::Deduplicated(completed_buffers),
470 BuffersWithOffsets::AllKept { buffers, offsets },
471 ) => {
472 let buffer_lookup = completed_buffers.extend_from_iter(buffers.iter().cloned());
473 ViewAdjustment::lookup(buffer_lookup, offsets)
474 }
475 (
476 Self::Deduplicated(completed_buffers),
477 BuffersWithOffsets::SomeCompacted { buffers, offsets },
478 ) => {
479 let buffer_lookup = completed_buffers.extend_from_option_slice(&buffers);
480 ViewAdjustment::rewriting(buffer_lookup, offsets)
481 }
482 }
483 }
484
485 fn finish(self) -> Arc<[ByteBuffer]> {
486 match self {
487 Self::Default(buffers) => Arc::from(buffers),
488 Self::Deduplicated(buffers) => buffers.finish(),
489 }
490 }
491}
492
/// Buffer store that deduplicates by allocation identity (pointer + length),
/// so repeated appends of buffers sharing one backing allocation are stored
/// once. Content equality is NOT checked — see [`BufferId`].
#[derive(Default)]
pub struct DeduplicatedBuffers {
    // Distinct buffers in insertion order; views reference these indices.
    buffers: Vec<ByteBuffer>,
    // Maps a buffer's identity to its index in `buffers`.
    buffer_to_idx: HashMap<BufferId, u32>,
}
498
499impl DeduplicatedBuffers {
500 #[expect(clippy::cast_possible_truncation)]
502 fn len(&self) -> u32 {
503 self.buffers.len() as u32
504 }
505
506 pub(crate) fn push(&mut self, block: ByteBuffer) -> u32 {
508 assert!(self.buffers.len() < u32::MAX as usize, "Too many blocks");
509
510 let initial_len = self.len();
511 let id = BufferId::from(&block);
512 match self.buffer_to_idx.entry(id) {
513 Entry::Occupied(idx) => *idx.get(),
514 Entry::Vacant(entry) => {
515 let idx = initial_len;
516 entry.insert(idx);
517 self.buffers.push(block);
518 idx
519 }
520 }
521 }
522
523 pub(crate) fn extend_from_option_slice(
524 &mut self,
525 buffers: &[Option<ByteBuffer>],
526 ) -> Vec<Option<u32>> {
527 buffers
528 .iter()
529 .map(|buffer| buffer.as_ref().map(|buf| self.push(buf.clone())))
530 .collect()
531 }
532
533 pub(crate) fn extend_from_iter(
534 &mut self,
535 buffers: impl Iterator<Item = ByteBuffer>,
536 ) -> Vec<u32> {
537 buffers.map(|buffer| self.push(buffer)).collect()
538 }
539
540 pub(crate) fn finish(self) -> Arc<[ByteBuffer]> {
541 Arc::from(self.buffers)
542 }
543}
544
/// Identity key for a buffer: the address and length of its backing slice.
///
/// This is identity (not content) equality: equal bytes at different addresses
/// do not deduplicate. Two *live* buffers with equal `ptr` and `len` must view
/// the same byte range, and entries are only inserted for buffers that
/// `DeduplicatedBuffers` keeps alive in `buffers`, so stale-pointer collisions
/// cannot occur for the store's lifetime.
#[derive(PartialEq, Eq, Hash)]
struct BufferId {
    ptr: usize,
    len: usize,
}

impl BufferId {
    // Capture the buffer's slice address and length as its identity.
    fn from(buffer: &ByteBuffer) -> Self {
        let slice = buffer.as_slice();
        Self {
            ptr: slice.as_ptr() as usize,
            len: slice.len(),
        }
    }
}
561
/// Controls how large each newly allocated data buffer should be.
#[derive(Debug, Clone)]
pub enum BufferGrowthStrategy {
    /// Every buffer is allocated at the same fixed size (in bytes).
    Fixed { size: u32 },
    /// Buffer sizes double on each allocation until `max_size` is reached.
    Exponential { current_size: u32, max_size: u32 },
}

impl Default for BufferGrowthStrategy {
    /// Start at 4 KiB and double up to a 2 MiB cap.
    fn default() -> Self {
        Self::Exponential {
            current_size: 4 * 1024,
            max_size: 2 * 1024 * 1024,
        }
    }
}

impl BufferGrowthStrategy {
    /// Strategy that always allocates `size`-byte buffers.
    pub fn fixed(size: u32) -> Self {
        Self::Fixed { size }
    }

    /// Strategy that starts at `initial_size` and doubles up to `max_size`.
    pub fn exponential(initial_size: u32, max_size: u32) -> Self {
        Self::Exponential {
            current_size: initial_size,
            max_size,
        }
    }

    /// Size in bytes to use for the next buffer allocation; for the
    /// exponential strategy this also advances the internal state.
    pub fn next_size(&mut self) -> u32 {
        match self {
            Self::Fixed { size } => *size,
            Self::Exponential {
                current_size,
                max_size,
            } => {
                let current = *current_size;
                // Only grow while strictly below the cap; an initial size that
                // already exceeds the cap is returned unchanged forever.
                if current < *max_size {
                    *current_size = current.saturating_mul(2).min(*max_size);
                }
                current
            }
        }
    }
}
608
/// An appended array's data buffers after the compaction decision, plus
/// per-buffer start offsets for any that were sliced.
enum BuffersWithOffsets {
    /// Every source buffer is kept (possibly sliced to its used range); views
    /// only need index and offset remapping.
    AllKept {
        buffers: Arc<[ByteBuffer]>,
        // Per-source-buffer offset to subtract from view offsets; `None`
        // means all offsets are zero (no buffer was sliced).
        offsets: Option<Vec<u32>>,
    },
    /// Some buffers were dropped for rewriting (`None` slots); views into
    /// them must be rebuilt from their raw bytes.
    SomeCompacted {
        buffers: Vec<Option<ByteBuffer>>,
        offsets: Option<Vec<u32>>,
    },
}
619
impl BuffersWithOffsets {
    /// Classify and (optionally) compact `array`'s data buffers.
    ///
    /// With a zero threshold every buffer is kept wholesale; otherwise each
    /// buffer is kept, sliced to its referenced range, or dropped for
    /// rewriting, depending on its utilization (see `compaction_strategy`).
    pub fn from_array(array: &VarBinViewArray, compaction_threshold: f64) -> Self {
        // Fast path: compaction disabled, keep every buffer untouched.
        if compaction_threshold == 0.0 {
            return Self::AllKept {
                buffers: Arc::from(
                    array
                        .data_buffers()
                        .to_vec()
                        .into_iter()
                        .map(|b| b.unwrap_host())
                        .collect_vec(),
                ),
                offsets: None,
            };
        }

        let buffer_utilizations = array
            .buffer_utilizations()
            .vortex_expect("buffer_utilizations in BuffersWithOffsets::from_array");
        // First pass: decide which of the four result shapes is needed.
        let mut has_rewrite = false;
        let mut has_nonzero_offset = false;
        for utilization in buffer_utilizations.iter() {
            match compaction_strategy(utilization, compaction_threshold) {
                CompactionStrategy::KeepFull => continue,
                CompactionStrategy::Slice { .. } => has_nonzero_offset = true,
                CompactionStrategy::Rewrite => has_rewrite = true,
            }
        }

        // Second pass (lazy): yields `(Option<buffer>, start_offset)` per
        // source buffer; `None` marks buffers to be dropped and rewritten.
        let buffers_with_offsets_iter = buffer_utilizations
            .iter()
            .zip(array.data_buffers().iter())
            .map(|(utilization, buffer)| {
                match compaction_strategy(utilization, compaction_threshold) {
                    CompactionStrategy::KeepFull => (Some(buffer.as_host().clone()), 0),
                    CompactionStrategy::Slice { start, end } => (
                        Some(buffer.as_host().slice(start as usize..end as usize)),
                        start,
                    ),
                    CompactionStrategy::Rewrite => (None, 0),
                }
            });

        // Materialize the cheapest representation that still carries all the
        // information the view-adjustment step needs.
        match (has_rewrite, has_nonzero_offset) {
            // No rewrites, no slicing: plain buffer list, no offsets.
            (false, false) => {
                let buffers: Vec<_> = buffers_with_offsets_iter
                    .map(|(b, _)| b.vortex_expect("already checked for rewrite"))
                    .collect();
                Self::AllKept {
                    buffers: Arc::from(buffers),
                    offsets: None,
                }
            }
            // Rewrites but no slicing: optional buffers, no offsets.
            (true, false) => {
                let buffers: Vec<_> = buffers_with_offsets_iter.map(|(b, _)| b).collect();
                Self::SomeCompacted {
                    buffers,
                    offsets: None,
                }
            }
            // Slicing but no rewrites: all buffers kept, with offsets.
            (false, true) => {
                let (buffers, offsets): (Vec<_>, _) = buffers_with_offsets_iter
                    .map(|(buffer, offset)| {
                        (buffer.vortex_expect("already checked for rewrite"), offset)
                    })
                    .collect();
                Self::AllKept {
                    buffers: Arc::from(buffers),
                    offsets: Some(offsets),
                }
            }
            // Both rewrites and slicing: optional buffers with offsets.
            (true, true) => {
                let (buffers, offsets) = buffers_with_offsets_iter.collect();
                Self::SomeCompacted {
                    buffers,
                    offsets: Some(offsets),
                }
            }
        }
    }
}
705
/// Per-buffer decision made when appending an array's buffers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CompactionStrategy {
    /// Keep the whole buffer untouched.
    KeepFull,
    /// Keep only the `[start, end)` byte range actually referenced by views.
    Slice {
        start: u32,
        end: u32,
    },
    /// Drop the buffer; views referencing it are rewritten from their bytes.
    Rewrite,
}
717
718fn compaction_strategy(
719 buffer_utilization: &BufferUtilization,
720 threshold: f64,
721) -> CompactionStrategy {
722 match buffer_utilization.overall_utilization() {
723 0.0 => CompactionStrategy::Rewrite,
725 utilised if utilised >= threshold => CompactionStrategy::KeepFull,
726 _ if buffer_utilization.range_utilization() >= threshold => {
727 let Range { start, end } = buffer_utilization.range();
728 CompactionStrategy::Slice { start, end }
729 }
730 _ => CompactionStrategy::Rewrite,
731 }
732}
733
/// How views from an appended array are translated into the builder's
/// buffer numbering.
enum ViewAdjustment {
    /// Every view can be remapped arithmetically — no byte copies needed.
    Precomputed(PrecomputedViewAdjustment),
    /// Some views point into dropped buffers and must be rewritten.
    Rewriting(RewritingViewAdjustment),
}
738
739impl ViewAdjustment {
740 fn shift(buffer_offset: u32, offsets: Option<Vec<u32>>) -> Self {
741 Self::Precomputed(PrecomputedViewAdjustment::Shift {
742 buffer_offset,
743 offsets,
744 })
745 }
746
747 fn lookup(buffer_lookup: Vec<u32>, offsets: Option<Vec<u32>>) -> Self {
748 Self::Precomputed(PrecomputedViewAdjustment::Lookup {
749 buffer_lookup,
750 offsets,
751 })
752 }
753
754 fn rewriting(buffer_lookup: Vec<Option<u32>>, offsets: Option<Vec<u32>>) -> Self {
755 Self::Rewriting(RewritingViewAdjustment {
756 buffer_lookup,
757 offsets,
758 })
759 }
760}
761
/// View translation that never needs access to the underlying bytes.
enum PrecomputedViewAdjustment {
    /// Add a constant to each view's buffer index (all buffers kept in order).
    Shift {
        buffer_offset: u32,
        // Per-source-buffer offset to subtract; `None` = all zero.
        offsets: Option<Vec<u32>>,
    },
    /// Map each source buffer index through a table (deduplicated storage).
    Lookup {
        buffer_lookup: Vec<u32>,
        offsets: Option<Vec<u32>>,
    },
}
773
774impl PrecomputedViewAdjustment {
775 fn adjust_view(&self, view: &BinaryView) -> BinaryView {
776 if view.is_inlined() {
777 return *view;
778 }
779 let view_ref = view.as_view();
780 match self {
781 Self::Shift {
782 buffer_offset,
783 offsets,
784 } => {
785 let b_idx = view_ref.buffer_index;
786 let offset_shift = offsets
787 .as_ref()
788 .map(|o| o[b_idx as usize])
789 .unwrap_or_default();
790
791 if view_ref.offset < offset_shift {
794 return BinaryView::empty_view();
795 }
796
797 view_ref
798 .with_buffer_and_offset(b_idx + buffer_offset, view_ref.offset - offset_shift)
799 }
800 Self::Lookup {
801 buffer_lookup,
802 offsets,
803 } => {
804 let b_idx = view_ref.buffer_index;
805 let buffer = buffer_lookup[b_idx as usize];
806 let offset_shift = offsets
807 .as_ref()
808 .map(|o| o[b_idx as usize])
809 .unwrap_or_default();
810
811 if view_ref.offset < offset_shift {
814 return BinaryView::empty_view();
815 }
816
817 view_ref.with_buffer_and_offset(buffer, view_ref.offset - offset_shift)
818 }
819 }
820 .into()
821 }
822}
823
/// View translation where dropped buffers appear as `None` in the lookup;
/// views into those buffers cannot be remapped and must be rebuilt from bytes.
struct RewritingViewAdjustment {
    // Destination index per source buffer; `None` = buffer was dropped.
    buffer_lookup: Vec<Option<u32>>,
    // Per-source-buffer offset to subtract; `None` = all zero.
    offsets: Option<Vec<u32>>,
}
828
829impl RewritingViewAdjustment {
830 fn adjust_view(&self, view: &BinaryView) -> Option<BinaryView> {
833 if view.is_inlined() {
834 return Some(*view);
835 }
836
837 let view_ref = view.as_view();
838 self.buffer_lookup[view_ref.buffer_index as usize].map(|buffer| {
839 let offset_shift = self
840 .offsets
841 .as_ref()
842 .map(|o| o[view_ref.buffer_index as usize])
843 .unwrap_or_default();
844 view_ref
845 .with_buffer_and_offset(buffer, view_ref.offset - offset_shift)
846 .into()
847 })
848 }
849}
850
#[cfg(test)]
mod tests {
    use vortex_error::VortexResult;

    use crate::IntoArray;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::assert_arrays_eq;
    use crate::builders::ArrayBuilder;
    use crate::builders::VarBinViewBuilder;
    use crate::builders::varbinview::VarBinViewArray;
    use crate::dtype::DType;
    use crate::dtype::Nullability;

    /// Values, nulls, and zero-length values all round-trip through the builder.
    #[test]
    fn test_utf8_builder() {
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_value("Hello");
        builder.append_null();
        builder.append_value("World");

        builder.append_nulls(2);

        // `append_zeros` yields empty-but-valid strings, not nulls.
        builder.append_zeros(2);
        builder.append_value("test");

        let actual = builder.finish();
        let expected = <VarBinViewArray as FromIterator<_>>::from_iter([
            Some("Hello"),
            None,
            Some("World"),
            None,
            None,
            Some(""),
            Some(""),
            Some("test"),
        ]);
        assert_arrays_eq!(actual, expected);
    }

    /// Extending from another array interleaves correctly with direct appends.
    #[test]
    fn test_utf8_builder_with_extend() {
        let array = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_null();
            builder.append_value("Hello2");
            builder.finish()
        };
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_value("Hello1");
        builder.extend_from_array(&array);
        builder.append_nulls(2);
        builder.append_value("Hello3");

        let actual = builder.finish_into_canonical();
        let expected = <VarBinViewArray as FromIterator<_>>::from_iter([
            Some("Hello1"),
            None,
            Some("Hello2"),
            None,
            None,
            Some("Hello3"),
        ]);
        assert_arrays_eq!(actual.into_array(), expected.into_array());
    }

    /// Re-appending the same source buffers (or slices of them) must not grow
    /// the deduplicated buffer count; distinct allocations must.
    #[test]
    fn test_buffer_deduplication() -> VortexResult<()> {
        let array = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_value("This is a long string that should not be inlined");
            builder.append_value("short string");
            builder.finish_into_varbinview()
        };

        assert_eq!(array.data_buffers().len(), 1);
        let mut builder =
            VarBinViewBuilder::with_buffer_deduplication(DType::Utf8(Nullability::Nullable), 10);

        let mut ctx = LEGACY_SESSION.create_execution_ctx();

        array.append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 1);

        // Slices share the original allocation, so they deduplicate too.
        array
            .slice(1..2)?
            .append_to_builder(&mut builder, &mut ctx)?;
        array
            .slice(0..1)?
            .append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 1);

        // Same bytes in a *different* allocation: identity-based dedup stores it.
        let array2 = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_value("This is a long string that should not be inlined");
            builder.finish_into_varbinview()
        };

        array2.append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 2);

        array
            .slice(0..1)?
            .append_to_builder(&mut builder, &mut ctx)?;
        array2
            .slice(0..1)?
            .append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 2);
        Ok(())
    }

    /// `append_scalar` accepts matching Utf8/Binary scalars (including nulls)
    /// and rejects mismatched dtypes.
    #[test]
    fn test_append_scalar() {
        use crate::scalar::Scalar;

        let mut utf8_builder =
            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        let utf8_scalar1 = Scalar::utf8("hello", Nullability::Nullable);
        utf8_builder.append_scalar(&utf8_scalar1).unwrap();

        let utf8_scalar2 = Scalar::utf8("world", Nullability::Nullable);
        utf8_builder.append_scalar(&utf8_scalar2).unwrap();

        let null_scalar = Scalar::null(DType::Utf8(Nullability::Nullable));
        utf8_builder.append_scalar(&null_scalar).unwrap();

        let array = utf8_builder.finish();
        let expected =
            <VarBinViewArray as FromIterator<_>>::from_iter([Some("hello"), Some("world"), None]);
        assert_arrays_eq!(&array, &expected);

        let mut binary_builder =
            VarBinViewBuilder::with_capacity(DType::Binary(Nullability::Nullable), 10);

        let binary_scalar = Scalar::binary(vec![1u8, 2, 3], Nullability::Nullable);
        binary_builder.append_scalar(&binary_scalar).unwrap();

        let binary_null = Scalar::null(DType::Binary(Nullability::Nullable));
        binary_builder.append_scalar(&binary_null).unwrap();

        let binary_array = binary_builder.finish();
        let expected =
            <VarBinViewArray as FromIterator<_>>::from_iter([Some(vec![1u8, 2, 3]), None]);
        assert_arrays_eq!(&binary_array, &expected);

        // A non-string/binary scalar must be rejected by the dtype check.
        let mut builder =
            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::NonNullable), 10);
        let wrong_scalar = Scalar::from(42i32);
        assert!(builder.append_scalar(&wrong_scalar).is_err());
    }

    /// Fixed strategy is constant; exponential doubles until its cap.
    #[test]
    fn test_buffer_growth_strategies() {
        use super::BufferGrowthStrategy;

        let mut strategy = BufferGrowthStrategy::fixed(1024);

        assert_eq!(strategy.next_size(), 1024);
        assert_eq!(strategy.next_size(), 1024);
        assert_eq!(strategy.next_size(), 1024);

        let mut strategy = BufferGrowthStrategy::exponential(1024, 8192);

        assert_eq!(strategy.next_size(), 1024);
        assert_eq!(strategy.next_size(), 2048);
        assert_eq!(strategy.next_size(), 4096);
        assert_eq!(strategy.next_size(), 8192);
        assert_eq!(strategy.next_size(), 8192);
    }

    /// A value larger than the growth cap still gets a buffer big enough for it.
    #[test]
    fn test_large_value_allocation() {
        use super::BufferGrowthStrategy;
        use super::VarBinViewBuilder;

        let mut builder = VarBinViewBuilder::new(
            DType::Binary(Nullability::Nullable),
            10,
            Default::default(),
            BufferGrowthStrategy::exponential(1024, 4096),
            0.0,
        );

        // Twice the strategy's maximum buffer size.
        let large_value = vec![0u8; 8192];

        builder.append_value(&large_value);

        let array = builder.finish_into_varbinview();
        assert_eq!(array.len(), 1);

        let retrieved = array
            .execute_scalar(0, &mut LEGACY_SESSION.create_execution_ctx())
            .unwrap()
            .as_binary()
            .value()
            .cloned()
            .unwrap();
        assert_eq!(retrieved.len(), 8192);
        assert_eq!(retrieved.as_slice(), &large_value);
    }
}