1use std::any::Any;
5use std::ops::Range;
6use std::sync::Arc;
7
8use itertools::Itertools;
9use vortex_buffer::Buffer;
10use vortex_buffer::BufferMut;
11use vortex_buffer::ByteBuffer;
12use vortex_buffer::ByteBufferMut;
13use vortex_error::VortexExpect;
14use vortex_error::VortexResult;
15use vortex_error::vortex_bail;
16use vortex_error::vortex_ensure;
17use vortex_mask::Mask;
18use vortex_utils::aliases::hash_map::Entry;
19use vortex_utils::aliases::hash_map::HashMap;
20
21use crate::ArrayRef;
22use crate::IntoArray;
23use crate::arrays::VarBinViewArray;
24use crate::arrays::varbinview::build_views::BinaryView;
25use crate::arrays::varbinview::compact::BufferUtilization;
26use crate::builders::ArrayBuilder;
27use crate::builders::LazyBitBufferBuilder;
28use crate::canonical::Canonical;
29use crate::canonical::ToCanonical;
30use crate::dtype::DType;
31use crate::scalar::Scalar;
32
/// Builder for variable-length binary/string view arrays ([`VarBinViewArray`]).
///
/// Short values are inlined directly into their 16-byte [`BinaryView`]; long
/// values are copied into data buffers that views reference by index.
pub struct VarBinViewBuilder {
    // Element type; always `DType::Utf8` or `DType::Binary` (asserted in `new`).
    dtype: DType,
    // One view per element, in append order.
    views_builder: BufferMut<BinaryView>,
    // Lazily materialized validity bits, one per element.
    nulls: LazyBitBufferBuilder,
    // Data buffers sealed so far; out-of-line views reference these by index.
    completed: CompletedBuffers,
    // Buffer currently receiving out-of-line value bytes; moved into
    // `completed` when it fills up or the builder is finished.
    in_progress: ByteBufferMut,
    // Controls the allocation size of successive `in_progress` buffers.
    growth_strategy: BufferGrowthStrategy,
    // Minimum utilization below which appended arrays' buffers are sliced or
    // rewritten; 0.0 disables compaction entirely.
    compaction_threshold: f64,
}
43
44impl VarBinViewBuilder {
45 pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
46 Self::new(dtype, capacity, Default::default(), Default::default(), 0.0)
47 }
48
49 pub fn with_buffer_deduplication(dtype: DType, capacity: usize) -> Self {
50 Self::new(
51 dtype,
52 capacity,
53 CompletedBuffers::Deduplicated(Default::default()),
54 Default::default(),
55 0.0,
56 )
57 }
58
59 pub fn with_compaction(dtype: DType, capacity: usize, compaction_threshold: f64) -> Self {
60 Self::new(
61 dtype,
62 capacity,
63 Default::default(),
64 Default::default(),
65 compaction_threshold,
66 )
67 }
68
69 pub fn new(
70 dtype: DType,
71 capacity: usize,
72 completed: CompletedBuffers,
73 growth_strategy: BufferGrowthStrategy,
74 compaction_threshold: f64,
75 ) -> Self {
76 assert!(
77 matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
78 "VarBinViewBuilder DType must be Utf8 or Binary."
79 );
80 Self {
81 views_builder: BufferMut::<BinaryView>::with_capacity(capacity),
82 nulls: LazyBitBufferBuilder::new(capacity),
83 completed,
84 in_progress: ByteBufferMut::empty(),
85 dtype,
86 growth_strategy,
87 compaction_threshold,
88 }
89 }
90
91 fn append_value_view(&mut self, value: &[u8]) {
92 let length =
93 u32::try_from(value.len()).vortex_expect("cannot have a single string >2^32 in length");
94 if length <= 12 {
95 self.views_builder.push(BinaryView::make_view(value, 0, 0));
96 return;
97 }
98
99 let (buffer_idx, offset) = self.append_value_to_buffer(value);
100 let view = BinaryView::make_view(value, buffer_idx, offset);
101 self.views_builder.push(view);
102 }
103
104 pub fn append_value<S: AsRef<[u8]>>(&mut self, value: S) {
106 self.append_value_view(value.as_ref());
107 self.nulls.append_non_null();
108 }
109
110 pub fn append_n_values<S: AsRef<[u8]>>(&mut self, value: S, n: usize) {
112 if n == 0 {
113 return;
114 }
115 let bytes = value.as_ref();
116 let view = if bytes.len() <= BinaryView::MAX_INLINED_SIZE {
117 BinaryView::make_view(bytes, 0, 0)
118 } else {
119 let (buffer_idx, offset) = self.append_value_to_buffer(bytes);
120 BinaryView::make_view(bytes, buffer_idx, offset)
121 };
122 self.views_builder.push_n(view, n);
123 self.nulls.append_n_non_nulls(n);
124 }
125
126 fn flush_in_progress(&mut self) {
127 if self.in_progress.is_empty() {
128 return;
129 }
130 let block = std::mem::take(&mut self.in_progress).freeze();
131
132 assert!(block.len() < u32::MAX as usize, "Block too large");
133
134 let initial_len = self.completed.len();
135 self.completed.push(block);
136 assert_eq!(
137 self.completed.len(),
138 initial_len + 1,
139 "Invalid state, just completed block already exists"
140 );
141 }
142
143 fn append_value_to_buffer(&mut self, value: &[u8]) -> (u32, u32) {
145 assert!(
146 value.len() > BinaryView::MAX_INLINED_SIZE,
147 "must inline small strings"
148 );
149 let required_cap = self.in_progress.len() + value.len();
150 if self.in_progress.capacity() < required_cap {
151 self.flush_in_progress();
152 let next_buffer_size = self.growth_strategy.next_size() as usize;
153 let to_reserve = next_buffer_size.max(value.len());
154 self.in_progress.reserve(to_reserve);
155 }
156
157 let buffer_idx = self.completed.len();
158 let offset = u32::try_from(self.in_progress.len()).vortex_expect("too many buffers");
159 self.in_progress.extend_from_slice(value);
160
161 (buffer_idx, offset)
162 }
163
164 pub fn completed_block_count(&self) -> u32 {
165 self.completed.len()
166 }
167
168 pub fn push_buffer_and_adjusted_views(
185 &mut self,
186 buffers: &[ByteBuffer],
187 views: &Buffer<BinaryView>,
188 validity_mask: Mask,
189 ) {
190 self.flush_in_progress();
191
192 let expected_completed_len = self.completed.len() as usize + buffers.len();
193 self.completed.extend_from_slice_unchecked(buffers);
194 assert_eq!(
195 self.completed.len() as usize,
196 expected_completed_len,
197 "Some buffers already exist",
198 );
199 self.views_builder.extend_trusted(views.iter().copied());
200 self.push_only_validity_mask(validity_mask);
201
202 debug_assert_eq!(self.nulls.len(), self.views_builder.len())
203 }
204
205 pub fn finish_into_varbinview(&mut self) -> VarBinViewArray {
207 self.flush_in_progress();
208 let buffers = std::mem::take(&mut self.completed);
209
210 assert_eq!(
211 self.views_builder.len(),
212 self.nulls.len(),
213 "View and validity length must match"
214 );
215
216 let validity = self.nulls.finish_with_nullability(self.dtype.nullability());
217
218 unsafe {
220 VarBinViewArray::new_unchecked(
221 std::mem::take(&mut self.views_builder).freeze(),
222 buffers.finish(),
223 self.dtype.clone(),
224 validity,
225 )
226 }
227 }
228
229 fn push_only_validity_mask(&mut self, validity_mask: Mask) {
231 self.nulls.append_validity_mask(validity_mask);
232 }
233}
234
impl ArrayBuilder for VarBinViewBuilder {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }

    fn dtype(&self) -> &DType {
        &self.dtype
    }

    // Validity holds one bit per element, so its length doubles as the
    // builder's element count.
    fn len(&self) -> usize {
        self.nulls.len()
    }

    // "Zero" for string/binary is the empty value, which always fits inline
    // in an empty view.
    fn append_zeros(&mut self, n: usize) {
        self.views_builder.push_n(BinaryView::empty_view(), n);
        self.nulls.append_n_non_nulls(n);
    }

    // Null slots still need placeholder views so views and validity stay the
    // same length.
    unsafe fn append_nulls_unchecked(&mut self, n: usize) {
        self.views_builder.push_n(BinaryView::empty_view(), n);
        self.nulls.append_n_nulls(n);
    }

    // Appends one Utf8/Binary scalar, treating a missing value as null.
    // Returns an error if the scalar's dtype does not match the builder's.
    fn append_scalar(&mut self, scalar: &Scalar) -> VortexResult<()> {
        vortex_ensure!(
            scalar.dtype() == self.dtype(),
            "VarBinViewBuilder expected scalar with dtype {}, got {}",
            self.dtype(),
            scalar.dtype()
        );

        match self.dtype() {
            DType::Utf8(_) => match scalar.as_utf8().value() {
                Some(value) => self.append_value(value),
                None => self.append_null(),
            },
            DType::Binary(_) => match scalar.as_binary().value() {
                Some(value) => self.append_value(value),
                None => self.append_null(),
            },
            // Unreachable in practice: `new` asserts the dtype is Utf8/Binary.
            _ => vortex_bail!(
                "VarBinViewBuilder can only handle Utf8 or Binary scalars, got {:?}",
                scalar.dtype()
            ),
        }

        Ok(())
    }

    // Appends every element of `array`, remapping its views into this
    // builder's buffer numbering and optionally compacting its buffers.
    unsafe fn extend_from_array_unchecked(&mut self, array: &ArrayRef) {
        let array = array.to_varbinview();
        self.flush_in_progress();

        // Validity is appended up front; each branch below pushes exactly one
        // view per element to keep views and validity in sync.
        self.push_only_validity_mask(
            array
                .validity_mask()
                .vortex_expect("validity_mask in extend_from_array_unchecked"),
        );

        let view_adjustment =
            self.completed
                .extend_from_compaction(BuffersWithOffsets::from_array(
                    &array,
                    self.compaction_threshold,
                ));

        match view_adjustment {
            // Every source buffer survived: views can be remapped in bulk.
            ViewAdjustment::Precomputed(adjustment) => self.views_builder.extend_trusted(
                array
                    .views()
                    .iter()
                    .map(|view| adjustment.adjust_view(view)),
            ),
            // Some buffers were dropped: values referencing them must be
            // copied into this builder's own buffers, element by element.
            ViewAdjustment::Rewriting(adjustment) => match array
                .validity_mask()
                .vortex_expect("validity_mask in extend_from_array_unchecked")
            {
                Mask::AllTrue(_) => {
                    for (idx, &view) in array.views().iter().enumerate() {
                        let new_view = self.push_view(view, &adjustment, &array, idx);
                        self.views_builder.push(new_view);
                    }
                }
                // All elements null: placeholders only, no data to copy.
                Mask::AllFalse(_) => {
                    self.views_builder
                        .push_n(BinaryView::empty_view(), array.len());
                }
                Mask::Values(v) => {
                    for (idx, (&view, is_valid)) in
                        array.views().iter().zip(v.bit_buffer().iter()).enumerate()
                    {
                        // Null elements get placeholder views; their (possibly
                        // dropped-buffer) source views are never dereferenced.
                        let new_view = if !is_valid {
                            BinaryView::empty_view()
                        } else {
                            self.push_view(view, &adjustment, &array, idx)
                        };
                        self.views_builder.push(new_view);
                    }
                }
            },
        }
    }

    fn reserve_exact(&mut self, additional: usize) {
        self.views_builder.reserve(additional);
        self.nulls.reserve_exact(additional);
    }

    // Replaces the entire validity with `validity`; the caller is responsible
    // for it matching the number of views already appended.
    unsafe fn set_validity_unchecked(&mut self, validity: Mask) {
        self.nulls = LazyBitBufferBuilder::new(validity.len());
        self.nulls.append_validity_mask(validity);
    }

    fn finish(&mut self) -> ArrayRef {
        self.finish_into_varbinview().into_array()
    }

    fn finish_into_canonical(&mut self) -> Canonical {
        Canonical::VarBinView(self.finish_into_varbinview())
    }
}
360
361impl VarBinViewBuilder {
362 fn push_view(
363 &mut self,
364 view: BinaryView,
365 adjustment: &RewritingViewAdjustment,
366 array: &VarBinViewArray,
367 idx: usize,
368 ) -> BinaryView {
369 if view.is_inlined() {
370 view
371 } else if let Some(adjusted) = adjustment.adjust_view(&view) {
372 adjusted
373 } else {
374 let bytes = array.bytes_at(idx);
375 let (new_buf_idx, new_offset) = self.append_value_to_buffer(&bytes);
376 BinaryView::make_view(bytes.as_slice(), new_buf_idx, new_offset)
377 }
378 }
379}
380
/// Storage for sealed data buffers, optionally deduplicating repeated blocks.
pub enum CompletedBuffers {
    /// Plain append-only storage.
    Default(Vec<ByteBuffer>),
    /// Storage that detects re-pushes of the same allocation and reuses the
    /// existing buffer index instead of storing the block again.
    Deduplicated(DeduplicatedBuffers),
}
385
386impl Default for CompletedBuffers {
387 fn default() -> Self {
388 Self::Default(Vec::new())
389 }
390}
391
392#[allow(clippy::cast_possible_truncation)]
394impl CompletedBuffers {
395 fn len(&self) -> u32 {
396 match self {
397 Self::Default(buffers) => buffers.len() as u32,
398 Self::Deduplicated(buffers) => buffers.len(),
399 }
400 }
401
402 fn push(&mut self, block: ByteBuffer) -> u32 {
403 match self {
404 Self::Default(buffers) => {
405 assert!(buffers.len() < u32::MAX as usize, "Too many blocks");
406 buffers.push(block);
407 self.len()
408 }
409 Self::Deduplicated(buffers) => buffers.push(block),
410 }
411 }
412
413 fn extend_from_slice_unchecked(&mut self, buffers: &[ByteBuffer]) {
415 for buffer in buffers {
416 self.push(buffer.clone());
417 }
418 }
419
420 fn extend_from_compaction(&mut self, buffers: BuffersWithOffsets) -> ViewAdjustment {
421 match (self, buffers) {
422 (
423 Self::Default(completed_buffers),
424 BuffersWithOffsets::AllKept { buffers, offsets },
425 ) => {
426 let buffer_offset = completed_buffers.len() as u32;
427 completed_buffers.extend_from_slice(&buffers);
428 ViewAdjustment::shift(buffer_offset, offsets)
429 }
430 (
431 Self::Default(completed_buffers),
432 BuffersWithOffsets::SomeCompacted { buffers, offsets },
433 ) => {
434 let lookup = buffers
435 .iter()
436 .map(|maybe_buffer| {
437 maybe_buffer.as_ref().map(|buffer| {
438 completed_buffers.push(buffer.clone());
439 completed_buffers.len() as u32 - 1
440 })
441 })
442 .collect();
443 ViewAdjustment::rewriting(lookup, offsets)
444 }
445
446 (
447 Self::Deduplicated(completed_buffers),
448 BuffersWithOffsets::AllKept { buffers, offsets },
449 ) => {
450 let buffer_lookup = completed_buffers.extend_from_iter(buffers.iter().cloned());
451 ViewAdjustment::lookup(buffer_lookup, offsets)
452 }
453 (
454 Self::Deduplicated(completed_buffers),
455 BuffersWithOffsets::SomeCompacted { buffers, offsets },
456 ) => {
457 let buffer_lookup = completed_buffers.extend_from_option_slice(&buffers);
458 ViewAdjustment::rewriting(buffer_lookup, offsets)
459 }
460 }
461 }
462
463 fn finish(self) -> Arc<[ByteBuffer]> {
464 match self {
465 Self::Default(buffers) => Arc::from(buffers),
466 Self::Deduplicated(buffers) => buffers.finish(),
467 }
468 }
469}
470
/// Buffer storage that reuses the index of a previously pushed buffer when a
/// buffer with the same allocation is pushed again.
#[derive(Default)]
pub struct DeduplicatedBuffers {
    buffers: Vec<ByteBuffer>,
    // Maps a buffer's allocation identity to its index in `buffers`.
    buffer_to_idx: HashMap<BufferId, u32>,
}
476
477impl DeduplicatedBuffers {
478 #[allow(clippy::cast_possible_truncation)]
480 fn len(&self) -> u32 {
481 self.buffers.len() as u32
482 }
483
484 pub(crate) fn push(&mut self, block: ByteBuffer) -> u32 {
486 assert!(self.buffers.len() < u32::MAX as usize, "Too many blocks");
487
488 let initial_len = self.len();
489 let id = BufferId::from(&block);
490 match self.buffer_to_idx.entry(id) {
491 Entry::Occupied(idx) => *idx.get(),
492 Entry::Vacant(entry) => {
493 let idx = initial_len;
494 entry.insert(idx);
495 self.buffers.push(block);
496 idx
497 }
498 }
499 }
500
501 pub(crate) fn extend_from_option_slice(
502 &mut self,
503 buffers: &[Option<ByteBuffer>],
504 ) -> Vec<Option<u32>> {
505 buffers
506 .iter()
507 .map(|buffer| buffer.as_ref().map(|buf| self.push(buf.clone())))
508 .collect()
509 }
510
511 pub(crate) fn extend_from_iter(
512 &mut self,
513 buffers: impl Iterator<Item = ByteBuffer>,
514 ) -> Vec<u32> {
515 buffers.map(|buffer| self.push(buffer)).collect()
516 }
517
518 pub(crate) fn finish(self) -> Arc<[ByteBuffer]> {
519 Arc::from(self.buffers)
520 }
521}
522
/// Identity of a buffer, keyed on its allocation (address + length) rather
/// than its contents: two byte-equal buffers at different addresses are
/// treated as distinct, so deduplication only merges literally shared buffers.
#[derive(PartialEq, Eq, Hash)]
struct BufferId {
    ptr: usize,
    len: usize,
}
529
530impl BufferId {
531 fn from(buffer: &ByteBuffer) -> Self {
532 let slice = buffer.as_slice();
533 Self {
534 ptr: slice.as_ptr() as usize,
535 len: slice.len(),
536 }
537 }
538}
539
/// Policy for sizing successive in-progress data buffers.
#[derive(Debug, Clone)]
pub enum BufferGrowthStrategy {
    /// Every buffer is allocated with the same `size` in bytes.
    Fixed { size: u32 },
    /// Buffer sizes double on each allocation, from `current_size` (mutated
    /// as the strategy advances) up to at most `max_size`.
    Exponential { current_size: u32, max_size: u32 },
}
547
548impl Default for BufferGrowthStrategy {
549 fn default() -> Self {
550 Self::Exponential {
551 current_size: 4 * 1024, max_size: 2 * 1024 * 1024, }
554 }
555}
556
557impl BufferGrowthStrategy {
558 pub fn fixed(size: u32) -> Self {
559 Self::Fixed { size }
560 }
561
562 pub fn exponential(initial_size: u32, max_size: u32) -> Self {
563 Self::Exponential {
564 current_size: initial_size,
565 max_size,
566 }
567 }
568
569 pub fn next_size(&mut self) -> u32 {
571 match self {
572 Self::Fixed { size } => *size,
573 Self::Exponential {
574 current_size,
575 max_size,
576 } => {
577 let result = *current_size;
578 if *current_size < *max_size {
579 *current_size = current_size.saturating_mul(2).min(*max_size);
580 }
581 result
582 }
583 }
584 }
585}
586
/// Buffers taken from an appended array, after the per-buffer compaction
/// decision has been made.
enum BuffersWithOffsets {
    /// Every source buffer is carried over (possibly sliced). `offsets`, when
    /// present, holds the byte shift slicing introduced for each buffer.
    AllKept {
        buffers: Arc<[ByteBuffer]>,
        offsets: Option<Vec<u32>>,
    },
    /// Some buffers were dropped (`None` slots); values referencing them must
    /// be rewritten into new buffers.
    SomeCompacted {
        buffers: Vec<Option<ByteBuffer>>,
        offsets: Option<Vec<u32>>,
    },
}
597
impl BuffersWithOffsets {
    /// Decides, per buffer of `array`, whether to keep, slice, or drop it.
    ///
    /// A `compaction_threshold` of 0.0 disables compaction and keeps every
    /// buffer untouched.
    pub fn from_array(array: &VarBinViewArray, compaction_threshold: f64) -> Self {
        if compaction_threshold == 0.0 {
            return Self::AllKept {
                buffers: Arc::from(
                    array
                        .buffers()
                        .to_vec()
                        .into_iter()
                        .map(|b| b.unwrap_host())
                        .collect_vec(),
                ),
                offsets: None,
            };
        }

        let buffer_utilizations = array
            .buffer_utilizations()
            .vortex_expect("buffer_utilizations in BuffersWithOffsets::from_array");
        // First pass: discover which outcome kinds occur at all, so the
        // cheapest representation can be chosen below.
        let mut has_rewrite = false;
        let mut has_nonzero_offset = false;
        for utilization in buffer_utilizations.iter() {
            match compaction_strategy(utilization, compaction_threshold) {
                CompactionStrategy::KeepFull => continue,
                CompactionStrategy::Slice { .. } => has_nonzero_offset = true,
                CompactionStrategy::Rewrite => has_rewrite = true,
            }
        }

        // Second pass (lazy): produce `(Option<buffer>, byte offset)` per
        // source buffer; `None` marks a buffer scheduled for rewriting.
        let buffers_with_offsets_iter =
            buffer_utilizations
                .iter()
                .zip(array.buffers().iter())
                .map(|(utilization, buffer)| {
                    match compaction_strategy(utilization, compaction_threshold) {
                        CompactionStrategy::KeepFull => (Some(buffer.as_host().clone()), 0),
                        CompactionStrategy::Slice { start, end } => (
                            Some(buffer.as_host().slice(start as usize..end as usize)),
                            start,
                        ),
                        CompactionStrategy::Rewrite => (None, 0),
                    }
                });

        // Pick the representation matching what the first pass observed,
        // omitting the `Option` wrappers and/or offsets when unneeded.
        match (has_rewrite, has_nonzero_offset) {
            (false, false) => {
                let buffers: Vec<_> = buffers_with_offsets_iter
                    .map(|(b, _)| b.vortex_expect("already checked for rewrite"))
                    .collect();
                Self::AllKept {
                    buffers: Arc::from(buffers),
                    offsets: None,
                }
            }
            (true, false) => {
                let buffers: Vec<_> = buffers_with_offsets_iter.map(|(b, _)| b).collect();
                Self::SomeCompacted {
                    buffers,
                    offsets: None,
                }
            }
            (false, true) => {
                let (buffers, offsets): (Vec<_>, _) = buffers_with_offsets_iter
                    .map(|(buffer, offset)| {
                        (buffer.vortex_expect("already checked for rewrite"), offset)
                    })
                    .collect();
                Self::AllKept {
                    buffers: Arc::from(buffers),
                    offsets: Some(offsets),
                }
            }
            (true, true) => {
                let (buffers, offsets) = buffers_with_offsets_iter.collect();
                Self::SomeCompacted {
                    buffers,
                    offsets: Some(offsets),
                }
            }
        }
    }
}
684
/// Per-buffer decision made when appending an array's buffers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CompactionStrategy {
    /// Utilization is high enough: keep the whole buffer.
    KeepFull,
    /// Only the `start..end` byte range is used densely enough: keep a slice.
    Slice {
        start: u32,
        end: u32,
    },
    /// Utilization is too low: drop the buffer and rewrite live values.
    Rewrite,
}
696
697fn compaction_strategy(
698 buffer_utilization: &BufferUtilization,
699 threshold: f64,
700) -> CompactionStrategy {
701 match buffer_utilization.overall_utilization() {
702 0.0 => CompactionStrategy::Rewrite,
704 utilised if utilised >= threshold => CompactionStrategy::KeepFull,
705 _ if buffer_utilization.range_utilization() >= threshold => {
706 let Range { start, end } = buffer_utilization.range();
707 CompactionStrategy::Slice { start, end }
708 }
709 _ => CompactionStrategy::Rewrite,
710 }
711}
712
/// How views of an appended array must be remapped into builder numbering.
enum ViewAdjustment {
    /// Every referenced buffer survives; views can be remapped in one pass.
    Precomputed(PrecomputedViewAdjustment),
    /// Some buffers were dropped; affected views need their data rewritten.
    Rewriting(RewritingViewAdjustment),
}
717
718impl ViewAdjustment {
719 fn shift(buffer_offset: u32, offsets: Option<Vec<u32>>) -> Self {
720 Self::Precomputed(PrecomputedViewAdjustment::Shift {
721 buffer_offset,
722 offsets,
723 })
724 }
725
726 fn lookup(buffer_lookup: Vec<u32>, offsets: Option<Vec<u32>>) -> Self {
727 Self::Precomputed(PrecomputedViewAdjustment::Lookup {
728 buffer_lookup,
729 offsets,
730 })
731 }
732
733 fn rewriting(buffer_lookup: Vec<Option<u32>>, offsets: Option<Vec<u32>>) -> Self {
734 Self::Rewriting(RewritingViewAdjustment {
735 buffer_lookup,
736 offsets,
737 })
738 }
739}
740
/// View remapping where every source buffer index has a known new index.
enum PrecomputedViewAdjustment {
    /// All buffer indices move up by a constant `buffer_offset`.
    Shift {
        buffer_offset: u32,
        offsets: Option<Vec<u32>>,
    },
    /// Each buffer index is translated through `buffer_lookup`.
    Lookup {
        buffer_lookup: Vec<u32>,
        offsets: Option<Vec<u32>>,
    },
}
752
impl PrecomputedViewAdjustment {
    /// Remaps `view` into the builder's buffer numbering.
    ///
    /// A view whose offset falls before the start of its buffer's kept slice
    /// is replaced with an empty view — presumably such views can only belong
    /// to null/unreferenced elements; TODO confirm against callers.
    fn adjust_view(&self, view: &BinaryView) -> BinaryView {
        // Inlined views carry their data with them and need no remapping.
        if view.is_inlined() {
            return *view;
        }
        let view_ref = view.as_view();
        match self {
            Self::Shift {
                buffer_offset,
                offsets,
            } => {
                let b_idx = view_ref.buffer_index;
                // Byte shift slicing introduced for this buffer (0 if unsliced).
                let offset_shift = offsets
                    .as_ref()
                    .map(|o| o[b_idx as usize])
                    .unwrap_or_default();

                // View points before the kept slice: emit an empty view.
                if view_ref.offset < offset_shift {
                    return BinaryView::empty_view();
                }

                view_ref
                    .with_buffer_and_offset(b_idx + buffer_offset, view_ref.offset - offset_shift)
            }
            Self::Lookup {
                buffer_lookup,
                offsets,
            } => {
                let b_idx = view_ref.buffer_index;
                let buffer = buffer_lookup[b_idx as usize];
                let offset_shift = offsets
                    .as_ref()
                    .map(|o| o[b_idx as usize])
                    .unwrap_or_default();

                // View points before the kept slice: emit an empty view.
                if view_ref.offset < offset_shift {
                    return BinaryView::empty_view();
                }

                view_ref.with_buffer_and_offset(buffer, view_ref.offset - offset_shift)
            }
        }
        .into()
    }
}
802
/// View remapping where some buffers were dropped and their values must be
/// rewritten by the caller.
struct RewritingViewAdjustment {
    // New index per old buffer; `None` means the buffer was compacted away.
    buffer_lookup: Vec<Option<u32>>,
    // Optional per-buffer byte shift introduced by slicing.
    offsets: Option<Vec<u32>>,
}
807
impl RewritingViewAdjustment {
    /// Remaps `view`, returning `None` when the referenced buffer was dropped
    /// and the value must instead be copied into a new buffer by the caller.
    fn adjust_view(&self, view: &BinaryView) -> Option<BinaryView> {
        // Inlined views carry their data with them and need no remapping.
        if view.is_inlined() {
            return Some(*view);
        }

        let view_ref = view.as_view();
        self.buffer_lookup[view_ref.buffer_index as usize].map(|buffer| {
            let offset_shift = self
                .offsets
                .as_ref()
                .map(|o| o[view_ref.buffer_index as usize])
                .unwrap_or_default();
            // NOTE(review): unlike PrecomputedViewAdjustment::adjust_view,
            // there is no `offset < offset_shift` guard here, so this
            // subtraction would underflow for a view pointing before a kept
            // slice — presumably callers only pass views of valid elements,
            // whose data lies within the sliced range. Confirm.
            view_ref
                .with_buffer_and_offset(buffer, view_ref.offset - offset_shift)
                .into()
        })
    }
}
829
#[cfg(test)]
mod tests {
    use vortex_error::VortexResult;

    use crate::IntoArray;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::assert_arrays_eq;
    use crate::builders::ArrayBuilder;
    use crate::builders::VarBinViewBuilder;
    use crate::builders::varbinview::VarBinViewArray;
    use crate::dtype::DType;
    use crate::dtype::Nullability;

    #[test]
    fn test_utf8_builder() {
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        // Interleave values, nulls, and empty ("zero") entries.
        builder.append_value("Hello");
        builder.append_null();
        builder.append_value("World");
        builder.append_nulls(2);
        builder.append_zeros(2);
        builder.append_value("test");

        let actual = builder.finish();
        let expected = <VarBinViewArray as FromIterator<_>>::from_iter([
            Some("Hello"),
            None,
            Some("World"),
            None,
            None,
            Some(""),
            Some(""),
            Some("test"),
        ]);
        assert_arrays_eq!(actual, expected);
    }

    #[test]
    fn test_utf8_builder_with_extend() {
        // Build a small source array to extend from.
        let source = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_null();
            builder.append_value("Hello2");
            builder.finish()
        };

        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
        builder.append_value("Hello1");
        builder.extend_from_array(&source);
        builder.append_nulls(2);
        builder.append_value("Hello3");

        let actual = builder.finish_into_canonical();
        let expected = <VarBinViewArray as FromIterator<_>>::from_iter([
            Some("Hello1"),
            None,
            Some("Hello2"),
            None,
            None,
            Some("Hello3"),
        ]);
        assert_arrays_eq!(actual.into_array(), expected.into_array());
    }

    #[test]
    fn test_buffer_deduplication() -> VortexResult<()> {
        let array = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_value("This is a long string that should not be inlined");
            builder.append_value("short string");
            builder.finish_into_varbinview()
        };
        assert_eq!(array.buffers().len(), 1);

        let mut builder =
            VarBinViewBuilder::with_buffer_deduplication(DType::Utf8(Nullability::Nullable), 10);
        let mut ctx = LEGACY_SESSION.create_execution_ctx();

        array.append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 1);

        // Re-appending slices of the same array reuses the shared buffer.
        array
            .slice(1..2)?
            .append_to_builder(&mut builder, &mut ctx)?;
        array
            .slice(0..1)?
            .append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 1);

        // An equal-content but distinct allocation is NOT deduplicated.
        let array2 = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_value("This is a long string that should not be inlined");
            builder.finish_into_varbinview()
        };
        array2.append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 2);

        // Further slices of either source still dedupe against their buffers.
        array
            .slice(0..1)?
            .append_to_builder(&mut builder, &mut ctx)?;
        array2
            .slice(0..1)?
            .append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 2);
        Ok(())
    }

    #[test]
    fn test_append_scalar() {
        use crate::scalar::Scalar;

        // Utf8 scalars, including a null.
        let mut utf8_builder =
            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
        utf8_builder
            .append_scalar(&Scalar::utf8("hello", Nullability::Nullable))
            .unwrap();
        utf8_builder
            .append_scalar(&Scalar::utf8("world", Nullability::Nullable))
            .unwrap();
        utf8_builder
            .append_scalar(&Scalar::null(DType::Utf8(Nullability::Nullable)))
            .unwrap();

        let array = utf8_builder.finish();
        let expected =
            <VarBinViewArray as FromIterator<_>>::from_iter([Some("hello"), Some("world"), None]);
        assert_arrays_eq!(&array, &expected);

        // Binary scalars, including a null.
        let mut binary_builder =
            VarBinViewBuilder::with_capacity(DType::Binary(Nullability::Nullable), 10);
        binary_builder
            .append_scalar(&Scalar::binary(vec![1u8, 2, 3], Nullability::Nullable))
            .unwrap();
        binary_builder
            .append_scalar(&Scalar::null(DType::Binary(Nullability::Nullable)))
            .unwrap();

        let binary_array = binary_builder.finish();
        let expected =
            <VarBinViewArray as FromIterator<_>>::from_iter([Some(vec![1u8, 2, 3]), None]);
        assert_arrays_eq!(&binary_array, &expected);

        // A mismatched dtype is rejected.
        let mut builder =
            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::NonNullable), 10);
        let wrong_scalar = Scalar::from(42i32);
        assert!(builder.append_scalar(&wrong_scalar).is_err());
    }

    #[test]
    fn test_buffer_growth_strategies() {
        use super::BufferGrowthStrategy;

        // A fixed strategy always yields the same size.
        let mut strategy = BufferGrowthStrategy::fixed(1024);
        assert_eq!(strategy.next_size(), 1024);
        assert_eq!(strategy.next_size(), 1024);
        assert_eq!(strategy.next_size(), 1024);

        // An exponential strategy doubles until it saturates at the cap.
        let mut strategy = BufferGrowthStrategy::exponential(1024, 8192);
        assert_eq!(strategy.next_size(), 1024);
        assert_eq!(strategy.next_size(), 2048);
        assert_eq!(strategy.next_size(), 4096);
        assert_eq!(strategy.next_size(), 8192);
        assert_eq!(strategy.next_size(), 8192);
    }

    #[test]
    fn test_large_value_allocation() {
        use super::BufferGrowthStrategy;
        use super::VarBinViewBuilder;

        let mut builder = VarBinViewBuilder::new(
            DType::Binary(Nullability::Nullable),
            10,
            Default::default(),
            BufferGrowthStrategy::exponential(1024, 4096),
            0.0,
        );

        // A value larger than the strategy's maximum buffer size must still
        // be stored whole.
        let large_value = vec![0u8; 8192];
        builder.append_value(&large_value);

        let array = builder.finish_into_varbinview();
        assert_eq!(array.len(), 1);

        let retrieved = array
            .scalar_at(0)
            .unwrap()
            .as_binary()
            .value()
            .cloned()
            .unwrap();
        assert_eq!(retrieved.len(), 8192);
        assert_eq!(retrieved.as_slice(), &large_value);
    }
}