1use std::any::Any;
5use std::ops::Range;
6use std::sync::Arc;
7
8use itertools::Itertools;
9use vortex_buffer::Buffer;
10use vortex_buffer::BufferMut;
11use vortex_buffer::ByteBuffer;
12use vortex_buffer::ByteBufferMut;
13use vortex_error::VortexExpect;
14use vortex_error::VortexResult;
15use vortex_error::vortex_bail;
16use vortex_error::vortex_ensure;
17use vortex_mask::Mask;
18use vortex_utils::aliases::hash_map::Entry;
19use vortex_utils::aliases::hash_map::HashMap;
20
21use crate::ArrayRef;
22use crate::IntoArray;
23use crate::LEGACY_SESSION;
24use crate::VortexSessionExecute;
25use crate::arrays::VarBinViewArray;
26use crate::arrays::varbinview::build_views::BinaryView;
27use crate::arrays::varbinview::compact::BufferUtilization;
28use crate::builders::ArrayBuilder;
29use crate::builders::LazyBitBufferBuilder;
30use crate::canonical::Canonical;
31#[expect(deprecated)]
32use crate::canonical::ToCanonical as _;
33use crate::dtype::DType;
34use crate::scalar::Scalar;
35
/// Builder that incrementally assembles a [`VarBinViewArray`] (Utf8 or Binary).
///
/// Small values are inlined directly into their [`BinaryView`]; larger values
/// are appended to an in-progress byte buffer that is sealed into `completed`
/// once it fills up.
pub struct VarBinViewBuilder {
    // Element type of the array under construction; asserted Utf8/Binary in `new`.
    dtype: DType,
    // One BinaryView per appended element (inlined bytes or buffer/offset reference).
    views_builder: BufferMut<BinaryView>,
    // Lazily materialized validity bitmap; one bit per appended element.
    nulls: LazyBitBufferBuilder,
    // Sealed data buffers that out-of-line views reference by index.
    completed: CompletedBuffers,
    // Buffer currently receiving out-of-line value bytes; flushed into `completed`.
    in_progress: ByteBufferMut,
    // Controls the size of each newly allocated in-progress buffer.
    growth_strategy: BufferGrowthStrategy,
    // Minimum utilization required to keep an appended array's buffer as-is;
    // 0.0 disables compaction (see `BuffersWithOffsets::from_array`).
    compaction_threshold: f64,
}
46
47impl VarBinViewBuilder {
48 pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
49 Self::new(dtype, capacity, Default::default(), Default::default(), 0.0)
50 }
51
52 pub fn with_buffer_deduplication(dtype: DType, capacity: usize) -> Self {
53 Self::new(
54 dtype,
55 capacity,
56 CompletedBuffers::Deduplicated(Default::default()),
57 Default::default(),
58 0.0,
59 )
60 }
61
62 pub fn with_compaction(dtype: DType, capacity: usize, compaction_threshold: f64) -> Self {
63 Self::new(
64 dtype,
65 capacity,
66 Default::default(),
67 Default::default(),
68 compaction_threshold,
69 )
70 }
71
72 pub fn new(
73 dtype: DType,
74 capacity: usize,
75 completed: CompletedBuffers,
76 growth_strategy: BufferGrowthStrategy,
77 compaction_threshold: f64,
78 ) -> Self {
79 assert!(
80 matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
81 "VarBinViewBuilder DType must be Utf8 or Binary."
82 );
83 Self {
84 views_builder: BufferMut::<BinaryView>::with_capacity(capacity),
85 nulls: LazyBitBufferBuilder::new(capacity),
86 completed,
87 in_progress: ByteBufferMut::empty(),
88 dtype,
89 growth_strategy,
90 compaction_threshold,
91 }
92 }
93
94 fn append_value_view(&mut self, value: &[u8]) {
95 let length =
96 u32::try_from(value.len()).vortex_expect("cannot have a single string >2^32 in length");
97 if length <= 12 {
98 self.views_builder.push(BinaryView::make_view(value, 0, 0));
99 return;
100 }
101
102 let (buffer_idx, offset) = self.append_value_to_buffer(value);
103 let view = BinaryView::make_view(value, buffer_idx, offset);
104 self.views_builder.push(view);
105 }
106
107 pub fn append_value<S: AsRef<[u8]>>(&mut self, value: S) {
109 self.append_value_view(value.as_ref());
110 self.nulls.append_non_null();
111 }
112
113 pub fn append_n_values<S: AsRef<[u8]>>(&mut self, value: S, n: usize) {
115 if n == 0 {
116 return;
117 }
118 let bytes = value.as_ref();
119 let view = if bytes.len() <= BinaryView::MAX_INLINED_SIZE {
120 BinaryView::make_view(bytes, 0, 0)
121 } else {
122 let (buffer_idx, offset) = self.append_value_to_buffer(bytes);
123 BinaryView::make_view(bytes, buffer_idx, offset)
124 };
125 self.views_builder.push_n(view, n);
126 self.nulls.append_n_non_nulls(n);
127 }
128
129 fn flush_in_progress(&mut self) {
130 if self.in_progress.is_empty() {
131 return;
132 }
133 let block = std::mem::take(&mut self.in_progress).freeze();
134
135 assert!(block.len() < u32::MAX as usize, "Block too large");
136
137 let initial_len = self.completed.len();
138 self.completed.push(block);
139 assert_eq!(
140 self.completed.len(),
141 initial_len + 1,
142 "Invalid state, just completed block already exists"
143 );
144 }
145
146 fn append_value_to_buffer(&mut self, value: &[u8]) -> (u32, u32) {
148 assert!(
149 value.len() > BinaryView::MAX_INLINED_SIZE,
150 "must inline small strings"
151 );
152 let required_cap = self.in_progress.len() + value.len();
153 if self.in_progress.capacity() < required_cap {
154 self.flush_in_progress();
155 let next_buffer_size = self.growth_strategy.next_size() as usize;
156 let to_reserve = next_buffer_size.max(value.len());
157 self.in_progress.reserve(to_reserve);
158 }
159
160 let buffer_idx = self.completed.len();
161 let offset = u32::try_from(self.in_progress.len()).vortex_expect("too many buffers");
162 self.in_progress.extend_from_slice(value);
163
164 (buffer_idx, offset)
165 }
166
167 pub fn completed_block_count(&self) -> u32 {
168 self.completed.len()
169 }
170
171 pub fn in_progress(&self) -> bool {
174 !self.in_progress.is_empty()
175 }
176
177 pub fn push_buffer_and_adjusted_views(
194 &mut self,
195 buffers: &[ByteBuffer],
196 views: &Buffer<BinaryView>,
197 validity_mask: Mask,
198 ) {
199 self.flush_in_progress();
200
201 let expected_completed_len = self.completed.len() as usize + buffers.len();
202 self.completed.extend_from_slice_unchecked(buffers);
203 assert_eq!(
204 self.completed.len() as usize,
205 expected_completed_len,
206 "Some buffers already exist",
207 );
208 self.views_builder.extend_trusted(views.iter().copied());
209 self.push_only_validity_mask(validity_mask);
210
211 debug_assert_eq!(self.nulls.len(), self.views_builder.len())
212 }
213
214 pub fn finish_into_varbinview(&mut self) -> VarBinViewArray {
216 self.flush_in_progress();
217 let buffers = std::mem::take(&mut self.completed);
218
219 assert_eq!(
220 self.views_builder.len(),
221 self.nulls.len(),
222 "View and validity length must match"
223 );
224
225 let validity = self.nulls.finish_with_nullability(self.dtype.nullability());
226
227 unsafe {
229 VarBinViewArray::new_unchecked(
230 std::mem::take(&mut self.views_builder).freeze(),
231 buffers.finish(),
232 self.dtype.clone(),
233 validity,
234 )
235 }
236 }
237
238 fn push_only_validity_mask(&mut self, validity_mask: Mask) {
240 self.nulls.append_validity_mask(validity_mask);
241 }
242}
243
impl ArrayBuilder for VarBinViewBuilder {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }

    fn dtype(&self) -> &DType {
        &self.dtype
    }

    // Length is tracked through the validity builder: one bit per element.
    fn len(&self) -> usize {
        self.nulls.len()
    }

    // "Zero" for varbinview is the empty string/bytes: a non-null empty view.
    fn append_zeros(&mut self, n: usize) {
        self.views_builder.push_n(BinaryView::empty_view(), n);
        self.nulls.append_n_non_nulls(n);
    }

    // Null slots still get a placeholder (empty) view so views/validity stay
    // in lockstep.
    unsafe fn append_nulls_unchecked(&mut self, n: usize) {
        self.views_builder.push_n(BinaryView::empty_view(), n);
        self.nulls.append_n_nulls(n);
    }

    /// Appends one scalar, which must match the builder's dtype exactly.
    fn append_scalar(&mut self, scalar: &Scalar) -> VortexResult<()> {
        vortex_ensure!(
            scalar.dtype() == self.dtype(),
            "VarBinViewBuilder expected scalar with dtype {}, got {}",
            self.dtype(),
            scalar.dtype()
        );

        match self.dtype() {
            DType::Utf8(_) => match scalar.as_utf8().value() {
                Some(value) => self.append_value(value),
                None => self.append_null(),
            },
            DType::Binary(_) => match scalar.as_binary().value() {
                Some(value) => self.append_value(value),
                None => self.append_null(),
            },
            // Unreachable in practice: `new` asserts Utf8/Binary, and the
            // ensure above already rejected mismatched scalars.
            _ => vortex_bail!(
                "VarBinViewBuilder can only handle Utf8 or Binary scalars, got {:?}",
                scalar.dtype()
            ),
        }

        Ok(())
    }

    /// Appends all elements of `array`, reusing its data buffers where the
    /// compaction policy allows and copying values out of dropped buffers.
    unsafe fn extend_from_array_unchecked(&mut self, array: &ArrayRef) {
        #[expect(deprecated)]
        let array = array.to_varbinview();
        self.flush_in_progress();

        self.push_only_validity_mask(
            array
                .as_ref()
                .validity()
                .vortex_expect("validity_mask")
                .execute_mask(
                    array.as_ref().len(),
                    &mut LEGACY_SESSION.create_execution_ctx(),
                )
                .vortex_expect("Failed to compute validity mask"),
        );

        // Ingest the source buffers (possibly compacting under-utilized ones)
        // and obtain the translation to apply to the source views.
        let view_adjustment =
            self.completed
                .extend_from_compaction(BuffersWithOffsets::from_array(
                    &array,
                    self.compaction_threshold,
                ));

        match view_adjustment {
            // All buffers kept: every view translates arithmetically.
            ViewAdjustment::Precomputed(adjustment) => self.views_builder.extend_trusted(
                array
                    .views()
                    .iter()
                    .map(|view| adjustment.adjust_view(view)),
            ),
            // Some buffers dropped: views into them need their bytes copied.
            // NOTE(review): the validity mask is computed a second time here;
            // reusing the mask pushed above would avoid duplicate work — TODO
            // confirm `Mask` is cheap enough to clone.
            ViewAdjustment::Rewriting(adjustment) => {
                match array
                    .as_ref()
                    .validity()
                    .vortex_expect("validity_mask")
                    .execute_mask(
                        array.as_ref().len(),
                        &mut LEGACY_SESSION.create_execution_ctx(),
                    )
                    .vortex_expect("Failed to compute validity mask")
                {
                    Mask::AllTrue(_) => {
                        for (idx, &view) in array.views().iter().enumerate() {
                            let new_view = self.push_view(view, &adjustment, &array, idx);
                            self.views_builder.push(new_view);
                        }
                    }
                    // Everything null: placeholder views only, no byte copies.
                    Mask::AllFalse(_) => {
                        self.views_builder
                            .push_n(BinaryView::empty_view(), array.len());
                    }
                    // Mixed: only copy bytes for valid elements.
                    Mask::Values(v) => {
                        for (idx, (&view, is_valid)) in
                            array.views().iter().zip(v.bit_buffer().iter()).enumerate()
                        {
                            let new_view = if !is_valid {
                                BinaryView::empty_view()
                            } else {
                                self.push_view(view, &adjustment, &array, idx)
                            };
                            self.views_builder.push(new_view);
                        }
                    }
                }
            }
        }
    }

    fn reserve_exact(&mut self, additional: usize) {
        self.views_builder.reserve(additional);
        self.nulls.reserve_exact(additional);
    }

    // Replaces (not appends to) the current validity with `validity`.
    unsafe fn set_validity_unchecked(&mut self, validity: Mask) {
        self.nulls = LazyBitBufferBuilder::new(validity.len());
        self.nulls.append_validity_mask(validity);
    }

    fn finish(&mut self) -> ArrayRef {
        self.finish_into_varbinview().into_array()
    }

    fn finish_into_canonical(&mut self) -> Canonical {
        Canonical::VarBinView(self.finish_into_varbinview())
    }
}
384
385impl VarBinViewBuilder {
386 fn push_view(
387 &mut self,
388 view: BinaryView,
389 adjustment: &RewritingViewAdjustment,
390 array: &VarBinViewArray,
391 idx: usize,
392 ) -> BinaryView {
393 if view.is_inlined() {
394 view
395 } else if let Some(adjusted) = adjustment.adjust_view(&view) {
396 adjusted
397 } else {
398 let bytes = array.bytes_at(idx);
399 let (new_buf_idx, new_offset) = self.append_value_to_buffer(&bytes);
400 BinaryView::make_view(bytes.as_slice(), new_buf_idx, new_offset)
401 }
402 }
403}
404
/// Storage for sealed data buffers, optionally deduplicating by identity.
pub enum CompletedBuffers {
    /// Plain append-only list of buffers.
    Default(Vec<ByteBuffer>),
    /// Deduplicates buffers by identity (pointer + length) on insert.
    Deduplicated(DeduplicatedBuffers),
}

impl Default for CompletedBuffers {
    // The non-deduplicating variant is the default.
    fn default() -> Self {
        Self::Default(Vec::new())
    }
}
415
416#[expect(clippy::cast_possible_truncation)]
418impl CompletedBuffers {
419 fn len(&self) -> u32 {
420 match self {
421 Self::Default(buffers) => buffers.len() as u32,
422 Self::Deduplicated(buffers) => buffers.len(),
423 }
424 }
425
426 fn push(&mut self, block: ByteBuffer) -> u32 {
427 match self {
428 Self::Default(buffers) => {
429 assert!(buffers.len() < u32::MAX as usize, "Too many blocks");
430 buffers.push(block);
431 self.len()
432 }
433 Self::Deduplicated(buffers) => buffers.push(block),
434 }
435 }
436
437 fn extend_from_slice_unchecked(&mut self, buffers: &[ByteBuffer]) {
439 for buffer in buffers {
440 self.push(buffer.clone());
441 }
442 }
443
444 fn extend_from_compaction(&mut self, buffers: BuffersWithOffsets) -> ViewAdjustment {
445 match (self, buffers) {
446 (
447 Self::Default(completed_buffers),
448 BuffersWithOffsets::AllKept { buffers, offsets },
449 ) => {
450 let buffer_offset = completed_buffers.len() as u32;
451 completed_buffers.extend_from_slice(&buffers);
452 ViewAdjustment::shift(buffer_offset, offsets)
453 }
454 (
455 Self::Default(completed_buffers),
456 BuffersWithOffsets::SomeCompacted { buffers, offsets },
457 ) => {
458 let lookup = buffers
459 .iter()
460 .map(|maybe_buffer| {
461 maybe_buffer.as_ref().map(|buffer| {
462 completed_buffers.push(buffer.clone());
463 completed_buffers.len() as u32 - 1
464 })
465 })
466 .collect();
467 ViewAdjustment::rewriting(lookup, offsets)
468 }
469
470 (
471 Self::Deduplicated(completed_buffers),
472 BuffersWithOffsets::AllKept { buffers, offsets },
473 ) => {
474 let buffer_lookup = completed_buffers.extend_from_iter(buffers.iter().cloned());
475 ViewAdjustment::lookup(buffer_lookup, offsets)
476 }
477 (
478 Self::Deduplicated(completed_buffers),
479 BuffersWithOffsets::SomeCompacted { buffers, offsets },
480 ) => {
481 let buffer_lookup = completed_buffers.extend_from_option_slice(&buffers);
482 ViewAdjustment::rewriting(buffer_lookup, offsets)
483 }
484 }
485 }
486
487 fn finish(self) -> Arc<[ByteBuffer]> {
488 match self {
489 Self::Default(buffers) => Arc::from(buffers),
490 Self::Deduplicated(buffers) => buffers.finish(),
491 }
492 }
493}
494
/// Buffer store that assigns each distinct buffer (by identity) one index.
#[derive(Default)]
pub struct DeduplicatedBuffers {
    // Buffers in insertion order; indices here are what views reference.
    buffers: Vec<ByteBuffer>,
    // Maps a buffer's identity (pointer + length) to its index in `buffers`.
    buffer_to_idx: HashMap<BufferId, u32>,
}
500
501impl DeduplicatedBuffers {
502 #[expect(clippy::cast_possible_truncation)]
504 fn len(&self) -> u32 {
505 self.buffers.len() as u32
506 }
507
508 pub(crate) fn push(&mut self, block: ByteBuffer) -> u32 {
510 assert!(self.buffers.len() < u32::MAX as usize, "Too many blocks");
511
512 let initial_len = self.len();
513 let id = BufferId::from(&block);
514 match self.buffer_to_idx.entry(id) {
515 Entry::Occupied(idx) => *idx.get(),
516 Entry::Vacant(entry) => {
517 let idx = initial_len;
518 entry.insert(idx);
519 self.buffers.push(block);
520 idx
521 }
522 }
523 }
524
525 pub(crate) fn extend_from_option_slice(
526 &mut self,
527 buffers: &[Option<ByteBuffer>],
528 ) -> Vec<Option<u32>> {
529 buffers
530 .iter()
531 .map(|buffer| buffer.as_ref().map(|buf| self.push(buf.clone())))
532 .collect()
533 }
534
535 pub(crate) fn extend_from_iter(
536 &mut self,
537 buffers: impl Iterator<Item = ByteBuffer>,
538 ) -> Vec<u32> {
539 buffers.map(|buffer| self.push(buffer)).collect()
540 }
541
542 pub(crate) fn finish(self) -> Arc<[ByteBuffer]> {
543 Arc::from(self.buffers)
544 }
545}
546
/// Identity key for a buffer: the address and length of its backing bytes.
///
/// Two buffers compare equal only when they alias the same memory region —
/// equal contents at different addresses are NOT deduplicated.
#[derive(PartialEq, Eq, Hash)]
struct BufferId {
    ptr: usize,
    len: usize,
}

impl BufferId {
    fn from(buffer: &ByteBuffer) -> Self {
        let slice = buffer.as_slice();
        Self {
            ptr: slice.as_ptr() as usize,
            len: slice.len(),
        }
    }
}
563
/// Strategy controlling the size of each newly allocated data buffer.
#[derive(Debug, Clone)]
pub enum BufferGrowthStrategy {
    /// Every buffer is allocated with the same size.
    Fixed { size: u32 },
    /// Buffer sizes double on each allocation, capped at `max_size`.
    Exponential { current_size: u32, max_size: u32 },
}

impl Default for BufferGrowthStrategy {
    /// Exponential growth starting at 4 KiB, capped at 2 MiB.
    fn default() -> Self {
        Self::Exponential {
            current_size: 4 * 1024,
            max_size: 2 * 1024 * 1024,
        }
    }
}

impl BufferGrowthStrategy {
    /// Strategy that always allocates `size`-byte buffers.
    pub fn fixed(size: u32) -> Self {
        Self::Fixed { size }
    }

    /// Strategy that starts at `initial_size` and doubles up to `max_size`.
    pub fn exponential(initial_size: u32, max_size: u32) -> Self {
        Self::Exponential {
            current_size: initial_size,
            max_size,
        }
    }

    /// Returns the size to use for the next buffer, advancing the strategy.
    pub fn next_size(&mut self) -> u32 {
        match self {
            Self::Fixed { size } => *size,
            Self::Exponential {
                current_size,
                max_size,
            } => {
                let size = *current_size;
                // Grow only while strictly below the cap; the doubled size
                // saturates and is clamped to `max_size`.
                if size < *max_size {
                    *current_size = size.saturating_mul(2).min(*max_size);
                }
                size
            }
        }
    }
}
610
/// An appended array's data buffers after the compaction decision, plus the
/// per-buffer byte shifts introduced by slicing (if any).
enum BuffersWithOffsets {
    /// Every source buffer survives (possibly sliced); index order preserved.
    AllKept {
        buffers: Arc<[ByteBuffer]>,
        // Per-buffer shift to subtract from view offsets; `None` means no
        // buffer was sliced (all shifts are zero).
        offsets: Option<Vec<u32>>,
    },
    /// At least one buffer was dropped (`None` slot); views into it must have
    /// their value bytes rewritten.
    SomeCompacted {
        buffers: Vec<Option<ByteBuffer>>,
        offsets: Option<Vec<u32>>,
    },
}
621
impl BuffersWithOffsets {
    /// Classifies `array`'s data buffers for appending into a builder with the
    /// given `compaction_threshold` (0.0 disables compaction entirely).
    pub fn from_array(array: &VarBinViewArray, compaction_threshold: f64) -> Self {
        // Fast path: compaction disabled — keep every buffer untouched.
        if compaction_threshold == 0.0 {
            return Self::AllKept {
                buffers: Arc::from(
                    array
                        .data_buffers()
                        .to_vec()
                        .into_iter()
                        .map(|b| b.unwrap_host())
                        .collect_vec(),
                ),
                offsets: None,
            };
        }

        let buffer_utilizations = array
            .buffer_utilizations()
            .vortex_expect("buffer_utilizations in BuffersWithOffsets::from_array");
        // First pass: decide which representation the result needs.
        let mut has_rewrite = false;
        let mut has_nonzero_offset = false;
        for utilization in buffer_utilizations.iter() {
            match compaction_strategy(utilization, compaction_threshold) {
                CompactionStrategy::KeepFull => continue,
                CompactionStrategy::Slice { .. } => has_nonzero_offset = true,
                CompactionStrategy::Rewrite => has_rewrite = true,
            }
        }

        // Second pass (lazy): per buffer, the (possibly sliced) buffer to
        // keep (`None` => rewrite) and the byte shift its views must subtract.
        let buffers_with_offsets_iter = buffer_utilizations
            .iter()
            .zip(array.data_buffers().iter())
            .map(|(utilization, buffer)| {
                match compaction_strategy(utilization, compaction_threshold) {
                    CompactionStrategy::KeepFull => (Some(buffer.as_host().clone()), 0),
                    CompactionStrategy::Slice { start, end } => (
                        Some(buffer.as_host().slice(start as usize..end as usize)),
                        start,
                    ),
                    CompactionStrategy::Rewrite => (None, 0),
                }
            });

        // Pick the cheapest representation that can express the outcome.
        match (has_rewrite, has_nonzero_offset) {
            (false, false) => {
                let buffers: Vec<_> = buffers_with_offsets_iter
                    .map(|(b, _)| b.vortex_expect("already checked for rewrite"))
                    .collect();
                Self::AllKept {
                    buffers: Arc::from(buffers),
                    offsets: None,
                }
            }
            (true, false) => {
                let buffers: Vec<_> = buffers_with_offsets_iter.map(|(b, _)| b).collect();
                Self::SomeCompacted {
                    buffers,
                    offsets: None,
                }
            }
            (false, true) => {
                let (buffers, offsets): (Vec<_>, _) = buffers_with_offsets_iter
                    .map(|(buffer, offset)| {
                        (buffer.vortex_expect("already checked for rewrite"), offset)
                    })
                    .collect();
                Self::AllKept {
                    buffers: Arc::from(buffers),
                    offsets: Some(offsets),
                }
            }
            (true, true) => {
                let (buffers, offsets) = buffers_with_offsets_iter.collect();
                Self::SomeCompacted {
                    buffers,
                    offsets: Some(offsets),
                }
            }
        }
    }
}
707
/// Per-buffer decision made when appending another array's data buffer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CompactionStrategy {
    /// Keep the whole buffer untouched.
    KeepFull,
    /// Keep only the utilized byte range `start..end` of the buffer.
    Slice {
        start: u32,
        end: u32,
    },
    /// Drop the buffer and copy referenced values into the builder.
    Rewrite,
}
719
720fn compaction_strategy(
721 buffer_utilization: &BufferUtilization,
722 threshold: f64,
723) -> CompactionStrategy {
724 match buffer_utilization.overall_utilization() {
725 0.0 => CompactionStrategy::Rewrite,
727 utilised if utilised >= threshold => CompactionStrategy::KeepFull,
728 _ if buffer_utilization.range_utilization() >= threshold => {
729 let Range { start, end } = buffer_utilization.range();
730 CompactionStrategy::Slice { start, end }
731 }
732 _ => CompactionStrategy::Rewrite,
733 }
734}
735
/// How to translate an appended array's views into the builder's numbering.
enum ViewAdjustment {
    /// Every view can be translated arithmetically.
    Precomputed(PrecomputedViewAdjustment),
    /// Some views reference dropped buffers and need their bytes rewritten.
    Rewriting(RewritingViewAdjustment),
}
740
741impl ViewAdjustment {
742 fn shift(buffer_offset: u32, offsets: Option<Vec<u32>>) -> Self {
743 Self::Precomputed(PrecomputedViewAdjustment::Shift {
744 buffer_offset,
745 offsets,
746 })
747 }
748
749 fn lookup(buffer_lookup: Vec<u32>, offsets: Option<Vec<u32>>) -> Self {
750 Self::Precomputed(PrecomputedViewAdjustment::Lookup {
751 buffer_lookup,
752 offsets,
753 })
754 }
755
756 fn rewriting(buffer_lookup: Vec<Option<u32>>, offsets: Option<Vec<u32>>) -> Self {
757 Self::Rewriting(RewritingViewAdjustment {
758 buffer_lookup,
759 offsets,
760 })
761 }
762}
763
/// View translations that can be applied without inspecting value bytes.
enum PrecomputedViewAdjustment {
    /// All source buffers were appended in order: add `buffer_offset` to each
    /// buffer index (subtracting per-buffer `offsets` when buffers were sliced).
    Shift {
        buffer_offset: u32,
        offsets: Option<Vec<u32>>,
    },
    /// Source buffer `i` now lives at index `buffer_lookup[i]` (deduplicated).
    Lookup {
        buffer_lookup: Vec<u32>,
        offsets: Option<Vec<u32>>,
    },
}
775
impl PrecomputedViewAdjustment {
    /// Translates `view` from the source array's buffer numbering into the
    /// builder's numbering; inlined views pass through untouched.
    fn adjust_view(&self, view: &BinaryView) -> BinaryView {
        if view.is_inlined() {
            return *view;
        }
        let view_ref = view.as_view();
        match self {
            Self::Shift {
                buffer_offset,
                offsets,
            } => {
                let b_idx = view_ref.buffer_index;
                // Per-buffer byte shift introduced by slicing during
                // compaction; zero when no offsets were recorded.
                let offset_shift = offsets
                    .as_ref()
                    .map(|o| o[b_idx as usize])
                    .unwrap_or_default();

                // NOTE(review): a view pointing before the kept slice maps to
                // the empty view — presumably such views are never actually
                // referenced (sliced-away bytes are unused); TODO confirm.
                if view_ref.offset < offset_shift {
                    return BinaryView::empty_view();
                }

                view_ref
                    .with_buffer_and_offset(b_idx + buffer_offset, view_ref.offset - offset_shift)
            }
            Self::Lookup {
                buffer_lookup,
                offsets,
            } => {
                let b_idx = view_ref.buffer_index;
                let buffer = buffer_lookup[b_idx as usize];
                let offset_shift = offsets
                    .as_ref()
                    .map(|o| o[b_idx as usize])
                    .unwrap_or_default();

                if view_ref.offset < offset_shift {
                    return BinaryView::empty_view();
                }

                view_ref.with_buffer_and_offset(buffer, view_ref.offset - offset_shift)
            }
        }
        .into()
    }
}
825
/// View translation where some source buffers were dropped: `None` entries in
/// `buffer_lookup` mean the view's value bytes must be copied into the builder.
struct RewritingViewAdjustment {
    buffer_lookup: Vec<Option<u32>>,
    offsets: Option<Vec<u32>>,
}
830
831impl RewritingViewAdjustment {
832 fn adjust_view(&self, view: &BinaryView) -> Option<BinaryView> {
835 if view.is_inlined() {
836 return Some(*view);
837 }
838
839 let view_ref = view.as_view();
840 self.buffer_lookup[view_ref.buffer_index as usize].map(|buffer| {
841 let offset_shift = self
842 .offsets
843 .as_ref()
844 .map(|o| o[view_ref.buffer_index as usize])
845 .unwrap_or_default();
846 view_ref
847 .with_buffer_and_offset(buffer, view_ref.offset - offset_shift)
848 .into()
849 })
850 }
851}
852
#[cfg(test)]
mod tests {
    use vortex_error::VortexResult;

    use crate::IntoArray;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::assert_arrays_eq;
    use crate::builders::ArrayBuilder;
    use crate::builders::VarBinViewBuilder;
    use crate::builders::varbinview::VarBinViewArray;
    use crate::dtype::DType;
    use crate::dtype::Nullability;

    // Covers values, nulls, and zero-length elements in a single build.
    #[test]
    fn test_utf8_builder() {
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_value("Hello");
        builder.append_null();
        builder.append_value("World");

        builder.append_nulls(2);

        builder.append_zeros(2);
        builder.append_value("test");

        let actual = builder.finish();
        let expected = <VarBinViewArray as FromIterator<_>>::from_iter([
            Some("Hello"),
            None,
            Some("World"),
            None,
            None,
            Some(""),
            Some(""),
            Some("test"),
        ]);
        assert_arrays_eq!(actual, expected);
    }

    // Extending from an existing array must preserve order and validity.
    #[test]
    fn test_utf8_builder_with_extend() {
        let array = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_null();
            builder.append_value("Hello2");
            builder.finish()
        };
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_value("Hello1");
        builder.extend_from_array(&array);
        builder.append_nulls(2);
        builder.append_value("Hello3");

        let actual = builder.finish_into_canonical();
        let expected = <VarBinViewArray as FromIterator<_>>::from_iter([
            Some("Hello1"),
            None,
            Some("Hello2"),
            None,
            None,
            Some("Hello3"),
        ]);
        assert_arrays_eq!(actual.into_array(), expected.into_array());
    }

    // Re-appending slices of the same array must not duplicate its buffer;
    // a content-equal buffer at a different address gets its own block.
    #[test]
    fn test_buffer_deduplication() -> VortexResult<()> {
        let array = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_value("This is a long string that should not be inlined");
            builder.append_value("short string");
            builder.finish_into_varbinview()
        };

        assert_eq!(array.data_buffers().len(), 1);
        let mut builder =
            VarBinViewBuilder::with_buffer_deduplication(DType::Utf8(Nullability::Nullable), 10);

        let mut ctx = LEGACY_SESSION.create_execution_ctx();

        array.append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 1);

        array
            .slice(1..2)?
            .append_to_builder(&mut builder, &mut ctx)?;
        array
            .slice(0..1)?
            .append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 1);

        // Same contents, different allocation: dedup is by identity, so this
        // occupies a second block.
        let array2 = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_value("This is a long string that should not be inlined");
            builder.finish_into_varbinview()
        };

        array2.append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 2);

        array
            .slice(0..1)?
            .append_to_builder(&mut builder, &mut ctx)?;
        array2
            .slice(0..1)?
            .append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 2);
        Ok(())
    }

    // Scalar appends for Utf8, Binary, nulls, and dtype-mismatch rejection.
    #[test]
    fn test_append_scalar() {
        use crate::scalar::Scalar;

        let mut utf8_builder =
            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        let utf8_scalar1 = Scalar::utf8("hello", Nullability::Nullable);
        utf8_builder.append_scalar(&utf8_scalar1).unwrap();

        let utf8_scalar2 = Scalar::utf8("world", Nullability::Nullable);
        utf8_builder.append_scalar(&utf8_scalar2).unwrap();

        let null_scalar = Scalar::null(DType::Utf8(Nullability::Nullable));
        utf8_builder.append_scalar(&null_scalar).unwrap();

        let array = utf8_builder.finish();
        let expected =
            <VarBinViewArray as FromIterator<_>>::from_iter([Some("hello"), Some("world"), None]);
        assert_arrays_eq!(&array, &expected);

        let mut binary_builder =
            VarBinViewBuilder::with_capacity(DType::Binary(Nullability::Nullable), 10);

        let binary_scalar = Scalar::binary(vec![1u8, 2, 3], Nullability::Nullable);
        binary_builder.append_scalar(&binary_scalar).unwrap();

        let binary_null = Scalar::null(DType::Binary(Nullability::Nullable));
        binary_builder.append_scalar(&binary_null).unwrap();

        let binary_array = binary_builder.finish();
        let expected =
            <VarBinViewArray as FromIterator<_>>::from_iter([Some(vec![1u8, 2, 3]), None]);
        assert_arrays_eq!(&binary_array, &expected);

        // A non-string scalar must be rejected by the dtype check.
        let mut builder =
            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::NonNullable), 10);
        let wrong_scalar = Scalar::from(42i32);
        assert!(builder.append_scalar(&wrong_scalar).is_err());
    }

    #[test]
    fn test_buffer_growth_strategies() {
        use super::BufferGrowthStrategy;

        // Fixed strategy never changes size.
        let mut strategy = BufferGrowthStrategy::fixed(1024);

        assert_eq!(strategy.next_size(), 1024);
        assert_eq!(strategy.next_size(), 1024);
        assert_eq!(strategy.next_size(), 1024);

        // Exponential doubles until it reaches (and then stays at) the cap.
        let mut strategy = BufferGrowthStrategy::exponential(1024, 8192);

        assert_eq!(strategy.next_size(), 1024);
        assert_eq!(strategy.next_size(), 2048);
        assert_eq!(strategy.next_size(), 4096);
        assert_eq!(strategy.next_size(), 8192);
        assert_eq!(strategy.next_size(), 8192);
    }

    // A value larger than the strategy's max buffer size must still fit.
    #[test]
    fn test_large_value_allocation() {
        use super::BufferGrowthStrategy;
        use super::VarBinViewBuilder;

        let mut builder = VarBinViewBuilder::new(
            DType::Binary(Nullability::Nullable),
            10,
            Default::default(),
            BufferGrowthStrategy::exponential(1024, 4096),
            0.0,
        );

        let large_value = vec![0u8; 8192];

        builder.append_value(&large_value);

        let array = builder.finish_into_varbinview();
        assert_eq!(array.len(), 1);

        let retrieved = array
            .execute_scalar(0, &mut LEGACY_SESSION.create_execution_ctx())
            .unwrap()
            .as_binary()
            .value()
            .cloned()
            .unwrap();
        assert_eq!(retrieved.len(), 8192);
        assert_eq!(retrieved.as_slice(), &large_value);
    }
}