1use std::any::Any;
5use std::ops::Range;
6use std::sync::Arc;
7
8use itertools::Itertools;
9use vortex_buffer::Alignment;
10use vortex_buffer::Buffer;
11use vortex_buffer::BufferMut;
12use vortex_buffer::ByteBuffer;
13use vortex_buffer::ByteBufferMut;
14use vortex_error::VortexExpect;
15use vortex_error::VortexResult;
16use vortex_error::vortex_bail;
17use vortex_error::vortex_ensure;
18use vortex_mask::Mask;
19use vortex_utils::aliases::hash_map::Entry;
20use vortex_utils::aliases::hash_map::HashMap;
21
22use crate::ArrayRef;
23use crate::ExecutionCtx;
24use crate::IntoArray;
25use crate::LEGACY_SESSION;
26use crate::VortexSessionExecute;
27use crate::arrays::VarBinViewArray;
28use crate::arrays::varbinview::VarBinViewArrayExt;
29use crate::arrays::varbinview::build_views::BinaryView;
30use crate::arrays::varbinview::compact::BufferUtilization;
31use crate::builders::ArrayBuilder;
32use crate::builders::LazyBitBufferBuilder;
33use crate::canonical::Canonical;
34#[expect(deprecated)]
35use crate::canonical::ToCanonical as _;
36use crate::dtype::DType;
37use crate::scalar::Scalar;
38
39pub struct VarBinViewBuilder {
41 dtype: DType,
42 views_builder: BufferMut<BinaryView>,
43 nulls: LazyBitBufferBuilder,
44 completed: CompletedBuffers,
45 in_progress: Option<ByteBufferMut>,
46 growth_strategy: BufferGrowthStrategy,
47 compaction_threshold: f64,
48}
49
50impl VarBinViewBuilder {
51 pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
52 Self::new(dtype, capacity, Default::default(), Default::default(), 0.0)
53 }
54
55 pub fn with_buffer_deduplication(dtype: DType, capacity: usize) -> Self {
56 Self::new(
57 dtype,
58 capacity,
59 CompletedBuffers::Deduplicated(Default::default()),
60 Default::default(),
61 0.0,
62 )
63 }
64
65 pub fn with_compaction(dtype: DType, capacity: usize, compaction_threshold: f64) -> Self {
66 Self::new(
67 dtype,
68 capacity,
69 Default::default(),
70 Default::default(),
71 compaction_threshold,
72 )
73 }
74
75 pub fn new(
76 dtype: DType,
77 capacity: usize,
78 completed: CompletedBuffers,
79 growth_strategy: BufferGrowthStrategy,
80 compaction_threshold: f64,
81 ) -> Self {
82 assert!(
83 matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
84 "VarBinViewBuilder DType must be Utf8 or Binary."
85 );
86 Self {
87 views_builder: BufferMut::with_capacity_preferred_aligned(
88 capacity,
89 Alignment::of::<BinaryView>(),
90 None,
91 ),
92 nulls: LazyBitBufferBuilder::new(capacity),
93 completed,
94 in_progress: None,
95 dtype,
96 growth_strategy,
97 compaction_threshold,
98 }
99 }
100
101 fn append_value_view(&mut self, value: &[u8]) {
102 let length =
103 u32::try_from(value.len()).vortex_expect("cannot have a single string >2^32 in length");
104 if length <= 12 {
105 self.views_builder.push(BinaryView::make_view(value, 0, 0));
106 return;
107 }
108
109 let (buffer_idx, offset) = self.append_value_to_buffer(value);
110 let view = BinaryView::make_view(value, buffer_idx, offset);
111 self.views_builder.push(view);
112 }
113
114 pub fn append_value<S: AsRef<[u8]>>(&mut self, value: S) {
116 self.append_value_view(value.as_ref());
117 self.nulls.append_non_null();
118 }
119
120 pub fn append_n_values<S: AsRef<[u8]>>(&mut self, value: S, n: usize) {
122 if n == 0 {
123 return;
124 }
125 let bytes = value.as_ref();
126 let view = if bytes.len() <= BinaryView::MAX_INLINED_SIZE {
127 BinaryView::make_view(bytes, 0, 0)
128 } else {
129 let (buffer_idx, offset) = self.append_value_to_buffer(bytes);
130 BinaryView::make_view(bytes, buffer_idx, offset)
131 };
132 self.views_builder.push_n(view, n);
133 self.nulls.append_n_non_nulls(n);
134 }
135
136 fn flush_in_progress(&mut self) {
137 let Some(block) = self.in_progress.take() else {
138 return;
139 };
140
141 assert!(block.len() < u32::MAX as usize, "Block too large");
142
143 let initial_len = self.completed.len();
144 self.completed.push(block.freeze());
145 assert_eq!(
146 self.completed.len(),
147 initial_len + 1,
148 "Invalid state, just completed block already exists"
149 );
150 }
151
152 fn init_in_progress(&mut self, min_len: usize) {
153 let next_buffer_size = self.growth_strategy.next_size() as usize;
154 let to_reserve = next_buffer_size.max(min_len);
155 self.in_progress = Some(ByteBufferMut::with_capacity_preferred_aligned(
156 to_reserve,
157 Alignment::of::<u8>(),
158 None,
159 ));
160 }
161
162 fn append_value_to_buffer(&mut self, value: &[u8]) -> (u32, u32) {
164 assert!(
165 value.len() > BinaryView::MAX_INLINED_SIZE,
166 "must inline small strings"
167 );
168
169 if let Some(in_progress) = &mut self.in_progress {
170 let required_cap = in_progress.len() + value.len();
171 if in_progress.capacity() < required_cap {
172 self.flush_in_progress();
173 self.init_in_progress(value.len());
174 }
175 } else {
176 self.init_in_progress(value.len())
177 };
178
179 let in_progress = self
180 .in_progress
181 .as_mut()
182 .vortex_expect("in_progress just set");
183
184 let buffer_idx = self.completed.len();
185 let offset = u32::try_from(in_progress.len()).vortex_expect("too many buffers");
186 in_progress.extend_from_slice(value);
187
188 (buffer_idx, offset)
189 }
190
191 pub fn completed_block_count(&self) -> u32 {
192 self.completed.len()
193 }
194
195 pub fn in_progress(&self) -> bool {
198 self.in_progress.is_some()
199 }
200
201 pub fn push_buffer_and_adjusted_views(
218 &mut self,
219 buffers: &[ByteBuffer],
220 views: &Buffer<BinaryView>,
221 validity_mask: Mask,
222 ) {
223 self.flush_in_progress();
224
225 let expected_completed_len = self.completed.len() as usize + buffers.len();
226 self.completed.extend_from_slice_unchecked(buffers);
227 assert_eq!(
228 self.completed.len() as usize,
229 expected_completed_len,
230 "Some buffers already exist",
231 );
232 self.views_builder.extend_trusted(views.iter().copied());
233 self.push_only_validity_mask(&validity_mask);
234
235 debug_assert_eq!(self.nulls.len(), self.views_builder.len())
236 }
237
238 pub fn finish_into_varbinview(&mut self) -> VarBinViewArray {
240 self.flush_in_progress();
241 let buffers = std::mem::take(&mut self.completed);
242
243 assert_eq!(
244 self.views_builder.len(),
245 self.nulls.len(),
246 "View and validity length must match"
247 );
248
249 let validity = self.nulls.finish_with_nullability(self.dtype.nullability());
250
251 unsafe {
253 VarBinViewArray::new_unchecked(
254 std::mem::take(&mut self.views_builder).freeze(),
255 buffers.finish(),
256 self.dtype.clone(),
257 validity,
258 )
259 }
260 }
261
262 fn push_only_validity_mask(&mut self, validity_mask: &Mask) {
264 self.nulls.append_validity_mask(validity_mask);
265 }
266
267 pub(crate) fn append_varbinview_array(
268 &mut self,
269 array: &VarBinViewArray,
270 ctx: &mut ExecutionCtx,
271 ) -> VortexResult<()> {
272 self.flush_in_progress();
273
274 let mask = array.varbinview_validity().execute_mask(array.len(), ctx)?;
275
276 self.push_only_validity_mask(&mask);
277
278 let view_adjustment =
279 self.completed
280 .extend_from_compaction(BuffersWithOffsets::from_array(
281 array,
282 self.compaction_threshold,
283 ));
284
285 match view_adjustment {
286 ViewAdjustment::Precomputed(adjustment) => self.views_builder.extend_trusted(
287 array
288 .views()
289 .iter()
290 .map(|view| adjustment.adjust_view(view)),
291 ),
292 ViewAdjustment::Rewriting(adjustment) => match mask {
293 Mask::AllTrue(_) => {
294 for (idx, &view) in array.views().iter().enumerate() {
295 let new_view = self.push_view(view, &adjustment, array, idx);
296 self.views_builder.push(new_view);
297 }
298 }
299 Mask::AllFalse(_) => {
300 self.views_builder
301 .push_n(BinaryView::empty_view(), array.len());
302 }
303 Mask::Values(v) => {
304 for (idx, (&view, is_valid)) in
305 array.views().iter().zip(v.bit_buffer().iter()).enumerate()
306 {
307 let new_view = if !is_valid {
308 BinaryView::empty_view()
309 } else {
310 self.push_view(view, &adjustment, array, idx)
311 };
312 self.views_builder.push(new_view);
313 }
314 }
315 },
316 }
317
318 Ok(())
319 }
320}
321
322impl ArrayBuilder for VarBinViewBuilder {
323 fn as_any(&self) -> &dyn Any {
324 self
325 }
326
327 fn as_any_mut(&mut self) -> &mut dyn Any {
328 self
329 }
330
331 fn dtype(&self) -> &DType {
332 &self.dtype
333 }
334
335 fn len(&self) -> usize {
336 self.nulls.len()
337 }
338
339 fn append_zeros(&mut self, n: usize) {
340 self.views_builder.push_n(BinaryView::empty_view(), n);
341 self.nulls.append_n_non_nulls(n);
342 }
343
344 unsafe fn append_nulls_unchecked(&mut self, n: usize) {
345 self.views_builder.push_n(BinaryView::empty_view(), n);
346 self.nulls.append_n_nulls(n);
347 }
348
349 fn append_scalar(&mut self, scalar: &Scalar) -> VortexResult<()> {
350 vortex_ensure!(
351 scalar.dtype() == self.dtype(),
352 "VarBinViewBuilder expected scalar with dtype {}, got {}",
353 self.dtype(),
354 scalar.dtype()
355 );
356
357 match self.dtype() {
358 DType::Utf8(_) => match scalar.as_utf8().value() {
359 Some(value) => self.append_value(value),
360 None => self.append_null(),
361 },
362 DType::Binary(_) => match scalar.as_binary().value() {
363 Some(value) => self.append_value(value),
364 None => self.append_null(),
365 },
366 _ => vortex_bail!(
367 "VarBinViewBuilder can only handle Utf8 or Binary scalars, got {:?}",
368 scalar.dtype()
369 ),
370 }
371
372 Ok(())
373 }
374
375 unsafe fn extend_from_array_unchecked(&mut self, array: &ArrayRef) {
376 #[expect(deprecated)]
377 let array = array.to_varbinview();
378 self.append_varbinview_array(&array, &mut LEGACY_SESSION.create_execution_ctx())
379 .vortex_expect("Failed to append varbinview array");
380 }
381
382 fn reserve_exact(&mut self, additional: usize) {
383 self.views_builder.reserve(additional);
384 self.nulls.reserve_exact(additional);
385 }
386
387 unsafe fn set_validity_unchecked(&mut self, validity: Mask) {
388 self.nulls = LazyBitBufferBuilder::from_validity_mask(validity);
389 }
390
391 fn finish(&mut self) -> ArrayRef {
392 self.finish_into_varbinview().into_array()
393 }
394
395 fn finish_into_canonical(&mut self) -> Canonical {
396 Canonical::VarBinView(self.finish_into_varbinview())
397 }
398}
399
400impl VarBinViewBuilder {
401 #[inline]
402 fn push_view(
403 &mut self,
404 view: BinaryView,
405 adjustment: &RewritingViewAdjustment,
406 array: &VarBinViewArray,
407 idx: usize,
408 ) -> BinaryView {
409 if view.is_inlined() {
410 view
411 } else if let Some(adjusted) = adjustment.adjust_view(&view) {
412 adjusted
413 } else {
414 let bytes = array.bytes_at(idx);
415 let (new_buf_idx, new_offset) = self.append_value_to_buffer(&bytes);
416 BinaryView::make_view(bytes.as_slice(), new_buf_idx, new_offset)
417 }
418 }
419}
420
421pub enum CompletedBuffers {
422 Default(Vec<ByteBuffer>),
423 Deduplicated(DeduplicatedBuffers),
424}
425
426impl Default for CompletedBuffers {
427 fn default() -> Self {
428 Self::Default(Vec::new())
429 }
430}
431
432#[expect(clippy::cast_possible_truncation)]
434impl CompletedBuffers {
435 fn len(&self) -> u32 {
436 match self {
437 Self::Default(buffers) => buffers.len() as u32,
438 Self::Deduplicated(buffers) => buffers.len(),
439 }
440 }
441
442 fn push(&mut self, block: ByteBuffer) -> u32 {
443 match self {
444 Self::Default(buffers) => {
445 assert!(buffers.len() < u32::MAX as usize, "Too many blocks");
446 buffers.push(block);
447 self.len()
448 }
449 Self::Deduplicated(buffers) => buffers.push(block),
450 }
451 }
452
453 fn extend_from_slice_unchecked(&mut self, buffers: &[ByteBuffer]) {
455 for buffer in buffers {
456 self.push(buffer.clone());
457 }
458 }
459
460 fn extend_from_compaction(&mut self, buffers: BuffersWithOffsets) -> ViewAdjustment {
461 match (self, buffers) {
462 (
463 Self::Default(completed_buffers),
464 BuffersWithOffsets::AllKept { buffers, offsets },
465 ) => {
466 let buffer_offset = completed_buffers.len() as u32;
467 completed_buffers.extend_from_slice(&buffers);
468 ViewAdjustment::shift(buffer_offset, offsets)
469 }
470 (
471 Self::Default(completed_buffers),
472 BuffersWithOffsets::SomeCompacted { buffers, offsets },
473 ) => {
474 let lookup = buffers
475 .iter()
476 .map(|maybe_buffer| {
477 maybe_buffer.as_ref().map(|buffer| {
478 completed_buffers.push(buffer.clone());
479 completed_buffers.len() as u32 - 1
480 })
481 })
482 .collect();
483 ViewAdjustment::rewriting(lookup, offsets)
484 }
485
486 (
487 Self::Deduplicated(completed_buffers),
488 BuffersWithOffsets::AllKept { buffers, offsets },
489 ) => {
490 let buffer_lookup = completed_buffers.extend_from_iter(buffers.iter().cloned());
491 ViewAdjustment::lookup(buffer_lookup, offsets)
492 }
493 (
494 Self::Deduplicated(completed_buffers),
495 BuffersWithOffsets::SomeCompacted { buffers, offsets },
496 ) => {
497 let buffer_lookup = completed_buffers.extend_from_option_slice(&buffers);
498 ViewAdjustment::rewriting(buffer_lookup, offsets)
499 }
500 }
501 }
502
503 fn finish(self) -> Arc<[ByteBuffer]> {
504 match self {
505 Self::Default(buffers) => Arc::from(buffers),
506 Self::Deduplicated(buffers) => buffers.finish(),
507 }
508 }
509}
510
511#[derive(Default)]
512pub struct DeduplicatedBuffers {
513 buffers: Vec<ByteBuffer>,
514 buffer_to_idx: HashMap<BufferId, u32>,
515}
516
517impl DeduplicatedBuffers {
518 #[expect(clippy::cast_possible_truncation)]
520 fn len(&self) -> u32 {
521 self.buffers.len() as u32
522 }
523
524 pub(crate) fn push(&mut self, block: ByteBuffer) -> u32 {
526 assert!(self.buffers.len() < u32::MAX as usize, "Too many blocks");
527
528 let initial_len = self.len();
529 let id = BufferId::from(&block);
530 match self.buffer_to_idx.entry(id) {
531 Entry::Occupied(idx) => *idx.get(),
532 Entry::Vacant(entry) => {
533 let idx = initial_len;
534 entry.insert(idx);
535 self.buffers.push(block);
536 idx
537 }
538 }
539 }
540
541 pub(crate) fn extend_from_option_slice(
542 &mut self,
543 buffers: &[Option<ByteBuffer>],
544 ) -> Vec<Option<u32>> {
545 buffers
546 .iter()
547 .map(|buffer| buffer.as_ref().map(|buf| self.push(buf.clone())))
548 .collect()
549 }
550
551 pub(crate) fn extend_from_iter(
552 &mut self,
553 buffers: impl Iterator<Item = ByteBuffer>,
554 ) -> Vec<u32> {
555 buffers.map(|buffer| self.push(buffer)).collect()
556 }
557
558 pub(crate) fn finish(self) -> Arc<[ByteBuffer]> {
559 Arc::from(self.buffers)
560 }
561}
562
563#[derive(PartialEq, Eq, Hash)]
564struct BufferId {
565 ptr: usize,
567 len: usize,
568}
569
570impl BufferId {
571 fn from(buffer: &ByteBuffer) -> Self {
572 let slice = buffer.as_slice();
573 Self {
574 ptr: slice.as_ptr() as usize,
575 len: slice.len(),
576 }
577 }
578}
579
/// Decides the capacity (in bytes) of each newly started in-progress data buffer.
#[derive(Debug, Clone)]
pub enum BufferGrowthStrategy {
    /// Every buffer gets the same `size`.
    Fixed { size: u32 },
    /// Sizes start at `current_size` and double after each allocation until they reach
    /// `max_size`. A size already at or above `max_size` is left unchanged.
    Exponential { current_size: u32, max_size: u32 },
}

impl Default for BufferGrowthStrategy {
    /// Exponential growth from 4 KiB up to 2 MiB.
    fn default() -> Self {
        Self::exponential(4 * 1024, 2 * 1024 * 1024)
    }
}

impl BufferGrowthStrategy {
    /// A strategy that always returns `size`.
    pub fn fixed(size: u32) -> Self {
        Self::Fixed { size }
    }

    /// A doubling strategy starting at `initial_size` and capped at `max_size`.
    pub fn exponential(initial_size: u32, max_size: u32) -> Self {
        Self::Exponential {
            current_size: initial_size,
            max_size,
        }
    }

    /// Returns the size for the next buffer and advances the strategy.
    pub fn next_size(&mut self) -> u32 {
        let Self::Exponential {
            current_size,
            max_size,
        } = self
        else {
            let Self::Fixed { size } = self else {
                unreachable!("BufferGrowthStrategy has exactly two variants")
            };
            return *size;
        };

        let size = *current_size;
        if size < *max_size {
            // Saturate so sizes near u32::MAX cannot overflow before clamping.
            *current_size = size.saturating_mul(2).min(*max_size);
        }
        size
    }
}
626
627enum BuffersWithOffsets {
628 AllKept {
629 buffers: Arc<[ByteBuffer]>,
630 offsets: Option<Vec<u32>>,
631 },
632 SomeCompacted {
633 buffers: Vec<Option<ByteBuffer>>,
634 offsets: Option<Vec<u32>>,
635 },
636}
637
638impl BuffersWithOffsets {
639 pub fn from_array(array: &VarBinViewArray, compaction_threshold: f64) -> Self {
640 if compaction_threshold == 0.0 {
641 return Self::AllKept {
642 buffers: Arc::from(
643 array
644 .data_buffers()
645 .iter()
646 .cloned()
647 .map(|b| b.unwrap_host())
648 .collect_vec(),
649 ),
650 offsets: None,
651 };
652 }
653
654 let buffer_utilizations = array
655 .buffer_utilizations()
656 .vortex_expect("buffer_utilizations in BuffersWithOffsets::from_array");
657 let mut has_rewrite = false;
658 let mut has_nonzero_offset = false;
659 for utilization in buffer_utilizations.iter() {
660 match compaction_strategy(utilization, compaction_threshold) {
661 CompactionStrategy::KeepFull => continue,
662 CompactionStrategy::Slice { .. } => has_nonzero_offset = true,
663 CompactionStrategy::Rewrite => has_rewrite = true,
664 }
665 }
666
667 let buffers_with_offsets_iter = buffer_utilizations
668 .iter()
669 .zip(array.data_buffers().iter())
670 .map(|(utilization, buffer)| {
671 match compaction_strategy(utilization, compaction_threshold) {
672 CompactionStrategy::KeepFull => (Some(buffer.as_host().clone()), 0),
673 CompactionStrategy::Slice { start, end } => (
674 Some(buffer.as_host().slice(start as usize..end as usize)),
675 start,
676 ),
677 CompactionStrategy::Rewrite => (None, 0),
678 }
679 });
680
681 match (has_rewrite, has_nonzero_offset) {
682 (false, false) => {
684 let buffers: Vec<_> = buffers_with_offsets_iter
685 .map(|(b, _)| b.vortex_expect("already checked for rewrite"))
686 .collect();
687 Self::AllKept {
688 buffers: Arc::from(buffers),
689 offsets: None,
690 }
691 }
692 (true, false) => {
694 let buffers: Vec<_> = buffers_with_offsets_iter.map(|(b, _)| b).collect();
695 Self::SomeCompacted {
696 buffers,
697 offsets: None,
698 }
699 }
700 (false, true) => {
702 let (buffers, offsets): (Vec<_>, _) = buffers_with_offsets_iter
703 .map(|(buffer, offset)| {
704 (buffer.vortex_expect("already checked for rewrite"), offset)
705 })
706 .collect();
707 Self::AllKept {
708 buffers: Arc::from(buffers),
709 offsets: Some(offsets),
710 }
711 }
712 (true, true) => {
714 let (buffers, offsets) = buffers_with_offsets_iter.collect();
715 Self::SomeCompacted {
716 buffers,
717 offsets: Some(offsets),
718 }
719 }
720 }
721 }
722}
723
724#[derive(Debug, Clone, Copy, PartialEq, Eq)]
725enum CompactionStrategy {
726 KeepFull,
727 Slice {
729 start: u32,
730 end: u32,
731 },
732 Rewrite,
734}
735
736fn compaction_strategy(
737 buffer_utilization: &BufferUtilization,
738 threshold: f64,
739) -> CompactionStrategy {
740 match buffer_utilization.overall_utilization() {
741 0.0 => CompactionStrategy::Rewrite,
743 utilised if utilised >= threshold => CompactionStrategy::KeepFull,
744 _ if buffer_utilization.range_utilization() >= threshold => {
745 let Range { start, end } = buffer_utilization.range();
746 CompactionStrategy::Slice { start, end }
747 }
748 _ => CompactionStrategy::Rewrite,
749 }
750}
751
752enum ViewAdjustment {
753 Precomputed(PrecomputedViewAdjustment),
754 Rewriting(RewritingViewAdjustment),
755}
756
757impl ViewAdjustment {
758 fn shift(buffer_offset: u32, offsets: Option<Vec<u32>>) -> Self {
759 Self::Precomputed(PrecomputedViewAdjustment::Shift {
760 buffer_offset,
761 offsets,
762 })
763 }
764
765 fn lookup(buffer_lookup: Vec<u32>, offsets: Option<Vec<u32>>) -> Self {
766 Self::Precomputed(PrecomputedViewAdjustment::Lookup {
767 buffer_lookup,
768 offsets,
769 })
770 }
771
772 fn rewriting(buffer_lookup: Vec<Option<u32>>, offsets: Option<Vec<u32>>) -> Self {
773 Self::Rewriting(RewritingViewAdjustment {
774 buffer_lookup,
775 offsets,
776 })
777 }
778}
779
780enum PrecomputedViewAdjustment {
782 Shift {
783 buffer_offset: u32,
784 offsets: Option<Vec<u32>>,
785 },
786 Lookup {
787 buffer_lookup: Vec<u32>,
788 offsets: Option<Vec<u32>>,
789 },
790}
791
792impl PrecomputedViewAdjustment {
793 #[inline]
794 fn adjust_view(&self, view: &BinaryView) -> BinaryView {
795 if view.is_inlined() {
796 return *view;
797 }
798 let view_ref = view.as_view();
799 match self {
800 Self::Shift {
801 buffer_offset,
802 offsets,
803 } => {
804 let b_idx = view_ref.buffer_index;
805 let offset_shift = offsets
806 .as_ref()
807 .map(|o| o[b_idx as usize])
808 .unwrap_or_default();
809
810 if view_ref.offset < offset_shift {
813 return BinaryView::empty_view();
814 }
815
816 view_ref
817 .with_buffer_and_offset(b_idx + buffer_offset, view_ref.offset - offset_shift)
818 }
819 Self::Lookup {
820 buffer_lookup,
821 offsets,
822 } => {
823 let b_idx = view_ref.buffer_index;
824 let buffer = buffer_lookup[b_idx as usize];
825 let offset_shift = offsets
826 .as_ref()
827 .map(|o| o[b_idx as usize])
828 .unwrap_or_default();
829
830 if view_ref.offset < offset_shift {
833 return BinaryView::empty_view();
834 }
835
836 view_ref.with_buffer_and_offset(buffer, view_ref.offset - offset_shift)
837 }
838 }
839 .into()
840 }
841}
842
843struct RewritingViewAdjustment {
844 buffer_lookup: Vec<Option<u32>>,
845 offsets: Option<Vec<u32>>,
846}
847
848impl RewritingViewAdjustment {
849 #[inline]
852 fn adjust_view(&self, view: &BinaryView) -> Option<BinaryView> {
853 if view.is_inlined() {
854 return Some(*view);
855 }
856
857 let view_ref = view.as_view();
858 self.buffer_lookup[view_ref.buffer_index as usize].map(|buffer| {
859 let offset_shift = self
860 .offsets
861 .as_ref()
862 .map(|o| o[view_ref.buffer_index as usize])
863 .unwrap_or_default();
864 view_ref
865 .with_buffer_and_offset(buffer, view_ref.offset - offset_shift)
866 .into()
867 })
868 }
869}
870
#[cfg(test)]
mod tests {
    use vortex_error::VortexResult;

    use crate::IntoArray;
    use crate::VortexSessionExecute;
    use crate::array_session;
    use crate::assert_arrays_eq;
    use crate::builders::ArrayBuilder;
    use crate::builders::VarBinViewBuilder;
    use crate::builders::varbinview::VarBinViewArray;
    use crate::dtype::DType;
    use crate::dtype::Nullability;

    /// Values, nulls and zeros (empty strings) appended directly come back in order.
    #[test]
    fn test_utf8_builder() {
        let mut ctx = array_session().create_execution_ctx();
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_value("Hello");
        builder.append_null();
        builder.append_value("World");

        builder.append_nulls(2);

        // `append_zeros` appends valid empty values, hence `Some("")` below.
        builder.append_zeros(2);
        builder.append_value("test");

        let actual = builder.finish();
        let expected = <VarBinViewArray as FromIterator<_>>::from_iter([
            Some("Hello"),
            None,
            Some("World"),
            None,
            None,
            Some(""),
            Some(""),
            Some("test"),
        ]);
        assert_arrays_eq!(actual, expected, &mut ctx);
    }

    /// Extending from another array interleaves correctly with direct appends.
    #[test]
    fn test_utf8_builder_with_extend() {
        let mut ctx = array_session().create_execution_ctx();
        let array = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_null();
            builder.append_value("Hello2");
            builder.finish()
        };
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_value("Hello1");
        builder.extend_from_array(&array);
        builder.append_nulls(2);
        builder.append_value("Hello3");

        let actual = builder.finish_into_canonical();
        let expected = <VarBinViewArray as FromIterator<_>>::from_iter([
            Some("Hello1"),
            None,
            Some("Hello2"),
            None,
            None,
            Some("Hello3"),
        ]);
        assert_arrays_eq!(actual.into_array(), expected.into_array(), &mut ctx);
    }

    /// A deduplicating builder stores each distinct data buffer once, even when slices of
    /// the same array are appended repeatedly. The order of appends matters for the
    /// block-count assertions.
    #[test]
    fn test_buffer_deduplication() -> VortexResult<()> {
        let array = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_value("This is a long string that should not be inlined");
            builder.append_value("short string");
            builder.finish_into_varbinview()
        };

        // Only the long string needs a data buffer.
        assert_eq!(array.data_buffers().len(), 1);
        let mut builder =
            VarBinViewBuilder::with_buffer_deduplication(DType::Utf8(Nullability::Nullable), 10);

        let mut ctx = array_session().create_execution_ctx();

        array.append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 1);

        // Slices share the original data buffer, so no new blocks are added.
        array
            .slice(1..2)?
            .append_to_builder(&mut builder, &mut ctx)?;
        array
            .slice(0..1)?
            .append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 1);

        // Same contents but a different allocation: counted as a distinct buffer.
        let array2 = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_value("This is a long string that should not be inlined");
            builder.finish_into_varbinview()
        };

        array2.append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 2);

        array
            .slice(0..1)?
            .append_to_builder(&mut builder, &mut ctx)?;
        array2
            .slice(0..1)?
            .append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 2);
        Ok(())
    }

    /// Scalars of the builder's dtype (including nulls) append correctly; a scalar of a
    /// different dtype is rejected with an error.
    #[test]
    fn test_append_scalar() {
        let mut ctx = array_session().create_execution_ctx();
        use crate::scalar::Scalar;

        let mut utf8_builder =
            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        let utf8_scalar1 = Scalar::utf8("hello", Nullability::Nullable);
        utf8_builder.append_scalar(&utf8_scalar1).unwrap();

        let utf8_scalar2 = Scalar::utf8("world", Nullability::Nullable);
        utf8_builder.append_scalar(&utf8_scalar2).unwrap();

        let null_scalar = Scalar::null(DType::Utf8(Nullability::Nullable));
        utf8_builder.append_scalar(&null_scalar).unwrap();

        let array = utf8_builder.finish();
        let expected =
            <VarBinViewArray as FromIterator<_>>::from_iter([Some("hello"), Some("world"), None]);
        assert_arrays_eq!(&array, &expected, &mut ctx);

        let mut binary_builder =
            VarBinViewBuilder::with_capacity(DType::Binary(Nullability::Nullable), 10);

        let binary_scalar = Scalar::binary(vec![1u8, 2, 3], Nullability::Nullable);
        binary_builder.append_scalar(&binary_scalar).unwrap();

        let binary_null = Scalar::null(DType::Binary(Nullability::Nullable));
        binary_builder.append_scalar(&binary_null).unwrap();

        let binary_array = binary_builder.finish();
        let expected =
            <VarBinViewArray as FromIterator<_>>::from_iter([Some(vec![1u8, 2, 3]), None]);
        assert_arrays_eq!(&binary_array, &expected, &mut ctx);

        // An i32 scalar does not match a Utf8 builder.
        let mut builder =
            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::NonNullable), 10);
        let wrong_scalar = Scalar::from(42i32);
        assert!(builder.append_scalar(&wrong_scalar).is_err());
    }

    /// Fixed strategies repeat their size; exponential ones double until capped.
    #[test]
    fn test_buffer_growth_strategies() {
        use super::BufferGrowthStrategy;

        let mut strategy = BufferGrowthStrategy::fixed(1024);

        assert_eq!(strategy.next_size(), 1024);
        assert_eq!(strategy.next_size(), 1024);
        assert_eq!(strategy.next_size(), 1024);

        let mut strategy = BufferGrowthStrategy::exponential(1024, 8192);

        assert_eq!(strategy.next_size(), 1024);
        assert_eq!(strategy.next_size(), 2048);
        assert_eq!(strategy.next_size(), 4096);
        assert_eq!(strategy.next_size(), 8192);
        // Capped at the maximum from here on.
        assert_eq!(strategy.next_size(), 8192);
    }

    /// A value larger than the strategy's maximum buffer size still gets a buffer big
    /// enough to hold it, and round-trips intact.
    #[test]
    fn test_large_value_allocation() {
        use super::BufferGrowthStrategy;
        use super::VarBinViewBuilder;

        let mut builder = VarBinViewBuilder::new(
            DType::Binary(Nullability::Nullable),
            10,
            Default::default(),
            BufferGrowthStrategy::exponential(1024, 4096),
            0.0,
        );

        let large_value = vec![0u8; 8192];

        builder.append_value(&large_value);

        let array = builder.finish_into_varbinview();
        assert_eq!(array.len(), 1);

        let retrieved = array
            .execute_scalar(0, &mut array_session().create_execution_ctx())
            .unwrap()
            .as_binary()
            .value()
            .cloned()
            .unwrap();
        assert_eq!(retrieved.len(), 8192);
        assert_eq!(retrieved.as_slice(), &large_value);
    }
}