1use std::any::Any;
5use std::ops::Range;
6use std::sync::Arc;
7
8use itertools::Itertools;
9use vortex_buffer::Buffer;
10use vortex_buffer::BufferMut;
11use vortex_buffer::ByteBuffer;
12use vortex_buffer::ByteBufferMut;
13use vortex_error::VortexExpect;
14use vortex_error::VortexResult;
15use vortex_error::vortex_bail;
16use vortex_error::vortex_ensure;
17use vortex_mask::Mask;
18use vortex_utils::aliases::hash_map::Entry;
19use vortex_utils::aliases::hash_map::HashMap;
20
21use crate::ArrayRef;
22use crate::IntoArray;
23use crate::arrays::VarBinViewArray;
24use crate::arrays::varbinview::build_views::BinaryView;
25use crate::arrays::varbinview::compact::BufferUtilization;
26use crate::builders::ArrayBuilder;
27use crate::builders::LazyBitBufferBuilder;
28use crate::canonical::Canonical;
29use crate::canonical::ToCanonical;
30use crate::dtype::DType;
31use crate::scalar::Scalar;
32
/// Builder for [`VarBinViewArray`]s: variable-length UTF-8 or binary values stored as
/// views that either inline short values or reference completed byte buffers.
pub struct VarBinViewBuilder {
    /// Element type; always `Utf8` or `Binary` (asserted in `new`).
    dtype: DType,
    /// One `BinaryView` per element.
    views_builder: BufferMut<BinaryView>,
    /// Per-element validity bits; also serves as the element count.
    nulls: LazyBitBufferBuilder,
    /// Frozen byte buffers referenced by non-inlined views.
    completed: CompletedBuffers,
    /// Buffer currently receiving long values; flushed into `completed` when full.
    in_progress: ByteBufferMut,
    /// Sizing policy for each new `in_progress` allocation.
    growth_strategy: BufferGrowthStrategy,
    /// Utilization ratio below which buffers of appended arrays are compacted;
    /// `0.0` disables compaction entirely.
    compaction_threshold: f64,
}
43
44impl VarBinViewBuilder {
45 pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
46 Self::new(dtype, capacity, Default::default(), Default::default(), 0.0)
47 }
48
49 pub fn with_buffer_deduplication(dtype: DType, capacity: usize) -> Self {
50 Self::new(
51 dtype,
52 capacity,
53 CompletedBuffers::Deduplicated(Default::default()),
54 Default::default(),
55 0.0,
56 )
57 }
58
59 pub fn with_compaction(dtype: DType, capacity: usize, compaction_threshold: f64) -> Self {
60 Self::new(
61 dtype,
62 capacity,
63 Default::default(),
64 Default::default(),
65 compaction_threshold,
66 )
67 }
68
69 pub fn new(
70 dtype: DType,
71 capacity: usize,
72 completed: CompletedBuffers,
73 growth_strategy: BufferGrowthStrategy,
74 compaction_threshold: f64,
75 ) -> Self {
76 assert!(
77 matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
78 "VarBinViewBuilder DType must be Utf8 or Binary."
79 );
80 Self {
81 views_builder: BufferMut::<BinaryView>::with_capacity(capacity),
82 nulls: LazyBitBufferBuilder::new(capacity),
83 completed,
84 in_progress: ByteBufferMut::empty(),
85 dtype,
86 growth_strategy,
87 compaction_threshold,
88 }
89 }
90
91 fn append_value_view(&mut self, value: &[u8]) {
92 let length =
93 u32::try_from(value.len()).vortex_expect("cannot have a single string >2^32 in length");
94 if length <= 12 {
95 self.views_builder.push(BinaryView::make_view(value, 0, 0));
96 return;
97 }
98
99 let (buffer_idx, offset) = self.append_value_to_buffer(value);
100 let view = BinaryView::make_view(value, buffer_idx, offset);
101 self.views_builder.push(view);
102 }
103
104 pub fn append_value<S: AsRef<[u8]>>(&mut self, value: S) {
106 self.append_value_view(value.as_ref());
107 self.nulls.append_non_null();
108 }
109
110 pub fn append_n_values<S: AsRef<[u8]>>(&mut self, value: S, n: usize) {
112 if n == 0 {
113 return;
114 }
115 let bytes = value.as_ref();
116 let view = if bytes.len() <= BinaryView::MAX_INLINED_SIZE {
117 BinaryView::make_view(bytes, 0, 0)
118 } else {
119 let (buffer_idx, offset) = self.append_value_to_buffer(bytes);
120 BinaryView::make_view(bytes, buffer_idx, offset)
121 };
122 self.views_builder.push_n(view, n);
123 self.nulls.append_n_non_nulls(n);
124 }
125
126 fn flush_in_progress(&mut self) {
127 if self.in_progress.is_empty() {
128 return;
129 }
130 let block = std::mem::take(&mut self.in_progress).freeze();
131
132 assert!(block.len() < u32::MAX as usize, "Block too large");
133
134 let initial_len = self.completed.len();
135 self.completed.push(block);
136 assert_eq!(
137 self.completed.len(),
138 initial_len + 1,
139 "Invalid state, just completed block already exists"
140 );
141 }
142
143 fn append_value_to_buffer(&mut self, value: &[u8]) -> (u32, u32) {
145 assert!(
146 value.len() > BinaryView::MAX_INLINED_SIZE,
147 "must inline small strings"
148 );
149 let required_cap = self.in_progress.len() + value.len();
150 if self.in_progress.capacity() < required_cap {
151 self.flush_in_progress();
152 let next_buffer_size = self.growth_strategy.next_size() as usize;
153 let to_reserve = next_buffer_size.max(value.len());
154 self.in_progress.reserve(to_reserve);
155 }
156
157 let buffer_idx = self.completed.len();
158 let offset = u32::try_from(self.in_progress.len()).vortex_expect("too many buffers");
159 self.in_progress.extend_from_slice(value);
160
161 (buffer_idx, offset)
162 }
163
164 pub fn completed_block_count(&self) -> u32 {
165 self.completed.len()
166 }
167
168 pub fn push_buffer_and_adjusted_views(
185 &mut self,
186 buffers: &[ByteBuffer],
187 views: &Buffer<BinaryView>,
188 validity_mask: Mask,
189 ) {
190 self.flush_in_progress();
191
192 let expected_completed_len = self.completed.len() as usize + buffers.len();
193 self.completed.extend_from_slice_unchecked(buffers);
194 assert_eq!(
195 self.completed.len() as usize,
196 expected_completed_len,
197 "Some buffers already exist",
198 );
199 self.views_builder.extend_trusted(views.iter().copied());
200 self.push_only_validity_mask(validity_mask);
201
202 debug_assert_eq!(self.nulls.len(), self.views_builder.len())
203 }
204
205 pub fn finish_into_varbinview(&mut self) -> VarBinViewArray {
207 self.flush_in_progress();
208 let buffers = std::mem::take(&mut self.completed);
209
210 assert_eq!(
211 self.views_builder.len(),
212 self.nulls.len(),
213 "View and validity length must match"
214 );
215
216 let validity = self.nulls.finish_with_nullability(self.dtype.nullability());
217
218 unsafe {
220 VarBinViewArray::new_unchecked(
221 std::mem::take(&mut self.views_builder).freeze(),
222 buffers.finish(),
223 self.dtype.clone(),
224 validity,
225 )
226 }
227 }
228
229 fn push_only_validity_mask(&mut self, validity_mask: Mask) {
231 self.nulls.append_validity_mask(validity_mask);
232 }
233}
234
impl ArrayBuilder for VarBinViewBuilder {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }

    fn dtype(&self) -> &DType {
        &self.dtype
    }

    fn len(&self) -> usize {
        // Validity is appended once per element, so it doubles as the length counter.
        self.nulls.len()
    }

    fn append_zeros(&mut self, n: usize) {
        // "Zero" for strings/binary is the empty value, appended as valid.
        self.views_builder.push_n(BinaryView::empty_view(), n);
        self.nulls.append_n_non_nulls(n);
    }

    unsafe fn append_nulls_unchecked(&mut self, n: usize) {
        // Null slots still get a placeholder view to keep views and validity aligned.
        self.views_builder.push_n(BinaryView::empty_view(), n);
        self.nulls.append_n_nulls(n);
    }

    fn append_scalar(&mut self, scalar: &Scalar) -> VortexResult<()> {
        vortex_ensure!(
            scalar.dtype() == self.dtype(),
            "VarBinViewBuilder expected scalar with dtype {}, got {}",
            self.dtype(),
            scalar.dtype()
        );

        match self.dtype() {
            DType::Utf8(_) => match scalar.as_utf8().value() {
                Some(value) => self.append_value(value),
                None => self.append_null(),
            },
            DType::Binary(_) => match scalar.as_binary().value() {
                Some(value) => self.append_value(value),
                None => self.append_null(),
            },
            // Unreachable in practice: the constructor asserts Utf8/Binary.
            _ => vortex_bail!(
                "VarBinViewBuilder can only handle Utf8 or Binary scalars, got {:?}",
                scalar.dtype()
            ),
        }

        Ok(())
    }

    unsafe fn extend_from_array_unchecked(&mut self, array: &ArrayRef) {
        let array = array.to_varbinview();
        // Seal in-progress bytes so incoming buffer indices start on a clean boundary.
        self.flush_in_progress();

        self.push_only_validity_mask(
            array
                .validity_mask()
                .vortex_expect("validity_mask in extend_from_array_unchecked"),
        );

        // Merge the source buffers (possibly compacted/deduplicated) and learn how to
        // remap the source array's views into this builder's buffer space.
        let view_adjustment =
            self.completed
                .extend_from_compaction(BuffersWithOffsets::from_array(
                    &array,
                    self.compaction_threshold,
                ));

        match view_adjustment {
            // Every source buffer survived: views remap with pure arithmetic.
            ViewAdjustment::Precomputed(adjustment) => self.views_builder.extend_trusted(
                array
                    .views()
                    .iter()
                    .map(|view| adjustment.adjust_view(view)),
            ),
            // Some buffers were dropped: affected views need their bytes re-appended,
            // which only matters for valid elements.
            ViewAdjustment::Rewriting(adjustment) => match array
                .validity_mask()
                .vortex_expect("validity_mask in extend_from_array_unchecked")
            {
                Mask::AllTrue(_) => {
                    for (idx, &view) in array.views().iter().enumerate() {
                        let new_view = self.push_view(view, &adjustment, &array, idx);
                        self.views_builder.push(new_view);
                    }
                }
                Mask::AllFalse(_) => {
                    // Everything is null: placeholders only, nothing to copy.
                    self.views_builder
                        .push_n(BinaryView::empty_view(), array.len());
                }
                Mask::Values(v) => {
                    for (idx, (&view, is_valid)) in
                        array.views().iter().zip(v.bit_buffer().iter()).enumerate()
                    {
                        let new_view = if !is_valid {
                            BinaryView::empty_view()
                        } else {
                            self.push_view(view, &adjustment, &array, idx)
                        };
                        self.views_builder.push(new_view);
                    }
                }
            },
        }
    }

    fn reserve_exact(&mut self, additional: usize) {
        self.views_builder.reserve(additional);
        self.nulls.reserve_exact(additional);
    }

    unsafe fn set_validity_unchecked(&mut self, validity: Mask) {
        // Replaces (not appends to) validity; caller guarantees the length matches.
        self.nulls = LazyBitBufferBuilder::new(validity.len());
        self.nulls.append_validity_mask(validity);
    }

    fn finish(&mut self) -> ArrayRef {
        self.finish_into_varbinview().into_array()
    }

    fn finish_into_canonical(&mut self) -> Canonical {
        Canonical::VarBinView(self.finish_into_varbinview())
    }
}
360
361impl VarBinViewBuilder {
362 #[inline]
363 fn push_view(
364 &mut self,
365 view: BinaryView,
366 adjustment: &RewritingViewAdjustment,
367 array: &VarBinViewArray,
368 idx: usize,
369 ) -> BinaryView {
370 if view.is_inlined() {
371 view
372 } else if let Some(adjusted) = adjustment.adjust_view(&view) {
373 adjusted
374 } else {
375 let bytes = array.bytes_at(idx);
376 let (new_buf_idx, new_offset) = self.append_value_to_buffer(&bytes);
377 BinaryView::make_view(bytes.as_slice(), new_buf_idx, new_offset)
378 }
379 }
380}
381
/// Storage for finished byte buffers referenced by out-of-line views.
pub enum CompletedBuffers {
    /// Plain append-only list; identical buffers may be stored more than once.
    Default(Vec<ByteBuffer>),
    /// Reuses an existing entry when the same buffer (by identity) is pushed again.
    Deduplicated(DeduplicatedBuffers),
}
386
387impl Default for CompletedBuffers {
388 fn default() -> Self {
389 Self::Default(Vec::new())
390 }
391}
392
393#[allow(clippy::cast_possible_truncation)]
395impl CompletedBuffers {
396 fn len(&self) -> u32 {
397 match self {
398 Self::Default(buffers) => buffers.len() as u32,
399 Self::Deduplicated(buffers) => buffers.len(),
400 }
401 }
402
403 fn push(&mut self, block: ByteBuffer) -> u32 {
404 match self {
405 Self::Default(buffers) => {
406 assert!(buffers.len() < u32::MAX as usize, "Too many blocks");
407 buffers.push(block);
408 self.len()
409 }
410 Self::Deduplicated(buffers) => buffers.push(block),
411 }
412 }
413
414 fn extend_from_slice_unchecked(&mut self, buffers: &[ByteBuffer]) {
416 for buffer in buffers {
417 self.push(buffer.clone());
418 }
419 }
420
421 fn extend_from_compaction(&mut self, buffers: BuffersWithOffsets) -> ViewAdjustment {
422 match (self, buffers) {
423 (
424 Self::Default(completed_buffers),
425 BuffersWithOffsets::AllKept { buffers, offsets },
426 ) => {
427 let buffer_offset = completed_buffers.len() as u32;
428 completed_buffers.extend_from_slice(&buffers);
429 ViewAdjustment::shift(buffer_offset, offsets)
430 }
431 (
432 Self::Default(completed_buffers),
433 BuffersWithOffsets::SomeCompacted { buffers, offsets },
434 ) => {
435 let lookup = buffers
436 .iter()
437 .map(|maybe_buffer| {
438 maybe_buffer.as_ref().map(|buffer| {
439 completed_buffers.push(buffer.clone());
440 completed_buffers.len() as u32 - 1
441 })
442 })
443 .collect();
444 ViewAdjustment::rewriting(lookup, offsets)
445 }
446
447 (
448 Self::Deduplicated(completed_buffers),
449 BuffersWithOffsets::AllKept { buffers, offsets },
450 ) => {
451 let buffer_lookup = completed_buffers.extend_from_iter(buffers.iter().cloned());
452 ViewAdjustment::lookup(buffer_lookup, offsets)
453 }
454 (
455 Self::Deduplicated(completed_buffers),
456 BuffersWithOffsets::SomeCompacted { buffers, offsets },
457 ) => {
458 let buffer_lookup = completed_buffers.extend_from_option_slice(&buffers);
459 ViewAdjustment::rewriting(buffer_lookup, offsets)
460 }
461 }
462 }
463
464 fn finish(self) -> Arc<[ByteBuffer]> {
465 match self {
466 Self::Default(buffers) => Arc::from(buffers),
467 Self::Deduplicated(buffers) => buffers.finish(),
468 }
469 }
470}
471
#[derive(Default)]
/// Append-only buffer list that reuses an existing entry when the same buffer
/// (identified by address + length, not by byte content) is pushed again.
pub struct DeduplicatedBuffers {
    // Distinct buffers, in insertion order.
    buffers: Vec<ByteBuffer>,
    // Buffer identity -> index into `buffers`.
    buffer_to_idx: HashMap<BufferId, u32>,
}
477
478impl DeduplicatedBuffers {
479 #[allow(clippy::cast_possible_truncation)]
481 fn len(&self) -> u32 {
482 self.buffers.len() as u32
483 }
484
485 pub(crate) fn push(&mut self, block: ByteBuffer) -> u32 {
487 assert!(self.buffers.len() < u32::MAX as usize, "Too many blocks");
488
489 let initial_len = self.len();
490 let id = BufferId::from(&block);
491 match self.buffer_to_idx.entry(id) {
492 Entry::Occupied(idx) => *idx.get(),
493 Entry::Vacant(entry) => {
494 let idx = initial_len;
495 entry.insert(idx);
496 self.buffers.push(block);
497 idx
498 }
499 }
500 }
501
502 pub(crate) fn extend_from_option_slice(
503 &mut self,
504 buffers: &[Option<ByteBuffer>],
505 ) -> Vec<Option<u32>> {
506 buffers
507 .iter()
508 .map(|buffer| buffer.as_ref().map(|buf| self.push(buf.clone())))
509 .collect()
510 }
511
512 pub(crate) fn extend_from_iter(
513 &mut self,
514 buffers: impl Iterator<Item = ByteBuffer>,
515 ) -> Vec<u32> {
516 buffers.map(|buffer| self.push(buffer)).collect()
517 }
518
519 pub(crate) fn finish(self) -> Arc<[ByteBuffer]> {
520 Arc::from(self.buffers)
521 }
522}
523
/// Identity key for a buffer: its starting address and length.
///
/// Two buffers compare equal only when they alias the same memory region, so
/// deduplication keyed on this is by identity, not by byte content.
#[derive(PartialEq, Eq, Hash)]
struct BufferId {
    // Starting address of the buffer's bytes.
    ptr: usize,
    // Length in bytes.
    len: usize,
}
530
531impl BufferId {
532 fn from(buffer: &ByteBuffer) -> Self {
533 let slice = buffer.as_slice();
534 Self {
535 ptr: slice.as_ptr() as usize,
536 len: slice.len(),
537 }
538 }
539}
540
/// Policy for sizing each newly allocated in-progress byte buffer.
#[derive(Debug, Clone)]
pub enum BufferGrowthStrategy {
    /// Every buffer gets the same size.
    Fixed { size: u32 },
    /// Doubles `current_size` after each allocation, capped at `max_size`.
    Exponential { current_size: u32, max_size: u32 },
}
548
549impl Default for BufferGrowthStrategy {
550 fn default() -> Self {
551 Self::Exponential {
552 current_size: 4 * 1024, max_size: 2 * 1024 * 1024, }
555 }
556}
557
558impl BufferGrowthStrategy {
559 pub fn fixed(size: u32) -> Self {
560 Self::Fixed { size }
561 }
562
563 pub fn exponential(initial_size: u32, max_size: u32) -> Self {
564 Self::Exponential {
565 current_size: initial_size,
566 max_size,
567 }
568 }
569
570 pub fn next_size(&mut self) -> u32 {
572 match self {
573 Self::Fixed { size } => *size,
574 Self::Exponential {
575 current_size,
576 max_size,
577 } => {
578 let result = *current_size;
579 if *current_size < *max_size {
580 *current_size = current_size.saturating_mul(2).min(*max_size);
581 }
582 result
583 }
584 }
585 }
586}
587
/// Buffers taken from a source array, after per-buffer compaction decisions.
enum BuffersWithOffsets {
    /// Every buffer was kept, whole or sliced. `offsets[i]` (when present) is the
    /// byte shift to subtract from views into source buffer `i`; `None` means no
    /// buffer was sliced.
    AllKept {
        buffers: Arc<[ByteBuffer]>,
        offsets: Option<Vec<u32>>,
    },
    /// Some buffers were dropped for rewriting (`None` entries); views into those
    /// must have their bytes re-appended by the consumer.
    SomeCompacted {
        buffers: Vec<Option<ByteBuffer>>,
        offsets: Option<Vec<u32>>,
    },
}
598
impl BuffersWithOffsets {
    /// Classifies `array`'s buffers for transfer into a builder.
    ///
    /// With a zero threshold, compaction is disabled and every buffer is kept whole.
    /// Otherwise each buffer is kept, sliced to its used range, or marked for rewrite
    /// based on its utilization statistics.
    pub fn from_array(array: &VarBinViewArray, compaction_threshold: f64) -> Self {
        // Fast path: compaction disabled — take every buffer as-is, no offsets.
        if compaction_threshold == 0.0 {
            return Self::AllKept {
                buffers: Arc::from(
                    array
                        .buffers()
                        .to_vec()
                        .into_iter()
                        .map(|b| b.unwrap_host())
                        .collect_vec(),
                ),
                offsets: None,
            };
        }

        let buffer_utilizations = array
            .buffer_utilizations()
            .vortex_expect("buffer_utilizations in BuffersWithOffsets::from_array");
        // First pass: determine which representations are needed at all.
        let mut has_rewrite = false;
        let mut has_nonzero_offset = false;
        for utilization in buffer_utilizations.iter() {
            match compaction_strategy(utilization, compaction_threshold) {
                CompactionStrategy::KeepFull => continue,
                CompactionStrategy::Slice { .. } => has_nonzero_offset = true,
                CompactionStrategy::Rewrite => has_rewrite = true,
            }
        }

        // Second pass (lazy): per buffer, the kept buffer (`None` => rewrite) and the
        // byte offset that views into it must be shifted by.
        let buffers_with_offsets_iter =
            buffer_utilizations
                .iter()
                .zip(array.buffers().iter())
                .map(|(utilization, buffer)| {
                    match compaction_strategy(utilization, compaction_threshold) {
                        CompactionStrategy::KeepFull => (Some(buffer.as_host().clone()), 0),
                        CompactionStrategy::Slice { start, end } => (
                            Some(buffer.as_host().slice(start as usize..end as usize)),
                            start,
                        ),
                        CompactionStrategy::Rewrite => (None, 0),
                    }
                });

        // Materialize the cheapest representation that covers what was found.
        match (has_rewrite, has_nonzero_offset) {
            (false, false) => {
                // All kept whole: no offsets needed.
                let buffers: Vec<_> = buffers_with_offsets_iter
                    .map(|(b, _)| b.vortex_expect("already checked for rewrite"))
                    .collect();
                Self::AllKept {
                    buffers: Arc::from(buffers),
                    offsets: None,
                }
            }
            (true, false) => {
                // Some rewrites, but no slicing: offsets are all zero.
                let buffers: Vec<_> = buffers_with_offsets_iter.map(|(b, _)| b).collect();
                Self::SomeCompacted {
                    buffers,
                    offsets: None,
                }
            }
            (false, true) => {
                // Slices but no rewrites: all buffers kept, offsets required.
                let (buffers, offsets): (Vec<_>, _) = buffers_with_offsets_iter
                    .map(|(buffer, offset)| {
                        (buffer.vortex_expect("already checked for rewrite"), offset)
                    })
                    .collect();
                Self::AllKept {
                    buffers: Arc::from(buffers),
                    offsets: Some(offsets),
                }
            }
            (true, true) => {
                // Both rewrites and slices present.
                let (buffers, offsets) = buffers_with_offsets_iter.collect();
                Self::SomeCompacted {
                    buffers,
                    offsets: Some(offsets),
                }
            }
        }
    }
}
685
/// Per-buffer decision made when absorbing another array's buffers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CompactionStrategy {
    /// Buffer is well utilized: keep it whole.
    KeepFull,
    /// Only `start..end` is used densely enough: keep that slice.
    Slice {
        start: u32,
        end: u32,
    },
    /// Too sparse: drop the buffer and re-append referenced bytes individually.
    Rewrite,
}
697
698fn compaction_strategy(
699 buffer_utilization: &BufferUtilization,
700 threshold: f64,
701) -> CompactionStrategy {
702 match buffer_utilization.overall_utilization() {
703 0.0 => CompactionStrategy::Rewrite,
705 utilised if utilised >= threshold => CompactionStrategy::KeepFull,
706 _ if buffer_utilization.range_utilization() >= threshold => {
707 let Range { start, end } = buffer_utilization.range();
708 CompactionStrategy::Slice { start, end }
709 }
710 _ => CompactionStrategy::Rewrite,
711 }
712}
713
/// How to remap a source array's views into this builder's buffer space.
enum ViewAdjustment {
    /// Every source buffer was kept; views remap with index/offset arithmetic alone.
    Precomputed(PrecomputedViewAdjustment),
    /// Some source buffers were dropped; views into them must be rewritten.
    Rewriting(RewritingViewAdjustment),
}
718
719impl ViewAdjustment {
720 fn shift(buffer_offset: u32, offsets: Option<Vec<u32>>) -> Self {
721 Self::Precomputed(PrecomputedViewAdjustment::Shift {
722 buffer_offset,
723 offsets,
724 })
725 }
726
727 fn lookup(buffer_lookup: Vec<u32>, offsets: Option<Vec<u32>>) -> Self {
728 Self::Precomputed(PrecomputedViewAdjustment::Lookup {
729 buffer_lookup,
730 offsets,
731 })
732 }
733
734 fn rewriting(buffer_lookup: Vec<Option<u32>>, offsets: Option<Vec<u32>>) -> Self {
735 Self::Rewriting(RewritingViewAdjustment {
736 buffer_lookup,
737 offsets,
738 })
739 }
740}
741
/// View remapping that never needs to copy value bytes.
enum PrecomputedViewAdjustment {
    /// Add `buffer_offset` to every view's buffer index. `offsets[i]` (when present)
    /// is subtracted from the byte offset of views into source buffer `i` (set when
    /// that buffer was sliced during compaction).
    Shift {
        buffer_offset: u32,
        offsets: Option<Vec<u32>>,
    },
    /// Map each source buffer index through `buffer_lookup` (used when buffers may
    /// have been deduplicated), with the same optional per-buffer offset shifts.
    Lookup {
        buffer_lookup: Vec<u32>,
        offsets: Option<Vec<u32>>,
    },
}
753
impl PrecomputedViewAdjustment {
    /// Remaps `view`'s buffer index (and byte offset, if its source buffer was
    /// sliced) into the destination buffer space. Inlined views pass through as-is.
    #[inline]
    fn adjust_view(&self, view: &BinaryView) -> BinaryView {
        if view.is_inlined() {
            return *view;
        }
        let view_ref = view.as_view();
        match self {
            Self::Shift {
                buffer_offset,
                offsets,
            } => {
                let b_idx = view_ref.buffer_index;
                let offset_shift = offsets
                    .as_ref()
                    .map(|o| o[b_idx as usize])
                    .unwrap_or_default();

                // A view pointing before the kept slice cannot be remapped; map it to
                // the empty view instead of underflowing the subtraction below.
                // NOTE(review): presumably such views only occur in null slots —
                // confirm against callers.
                if view_ref.offset < offset_shift {
                    return BinaryView::empty_view();
                }

                view_ref
                    .with_buffer_and_offset(b_idx + buffer_offset, view_ref.offset - offset_shift)
            }
            Self::Lookup {
                buffer_lookup,
                offsets,
            } => {
                let b_idx = view_ref.buffer_index;
                let buffer = buffer_lookup[b_idx as usize];
                let offset_shift = offsets
                    .as_ref()
                    .map(|o| o[b_idx as usize])
                    .unwrap_or_default();

                // Same out-of-slice guard as the `Shift` arm.
                if view_ref.offset < offset_shift {
                    return BinaryView::empty_view();
                }

                view_ref.with_buffer_and_offset(buffer, view_ref.offset - offset_shift)
            }
        }
        .into()
    }
}
804
/// View remapping where some source buffers were dropped: `buffer_lookup[i]` is
/// `None` when views into source buffer `i` must have their bytes re-appended.
struct RewritingViewAdjustment {
    buffer_lookup: Vec<Option<u32>>,
    // Optional per-source-buffer byte shift for buffers kept as slices.
    offsets: Option<Vec<u32>>,
}
809
impl RewritingViewAdjustment {
    /// Remaps `view` into the destination buffer space, or returns `None` when the
    /// view's source buffer was dropped for rewriting — the caller must re-append
    /// the bytes itself.
    fn adjust_view(&self, view: &BinaryView) -> Option<BinaryView> {
        if view.is_inlined() {
            return Some(*view);
        }

        let view_ref = view.as_view();
        self.buffer_lookup[view_ref.buffer_index as usize].map(|buffer| {
            let offset_shift = self
                .offsets
                .as_ref()
                .map(|o| o[view_ref.buffer_index as usize])
                .unwrap_or_default();
            // NOTE(review): unlike `PrecomputedViewAdjustment::adjust_view` there is
            // no `offset < offset_shift` guard here, so the subtraction assumes only
            // valid (non-null-slot) views are passed in — confirm against `push_view`.
            view_ref
                .with_buffer_and_offset(buffer, view_ref.offset - offset_shift)
                .into()
        })
    }
}
831
#[cfg(test)]
mod tests {
    use vortex_error::VortexResult;

    use crate::IntoArray;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::assert_arrays_eq;
    use crate::builders::ArrayBuilder;
    use crate::builders::VarBinViewBuilder;
    use crate::builders::varbinview::VarBinViewArray;
    use crate::dtype::DType;
    use crate::dtype::Nullability;

    // Mixed append of values, nulls, and zeros round-trips through the builder.
    #[test]
    fn test_utf8_builder() {
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_value("Hello");
        builder.append_null();
        builder.append_value("World");

        builder.append_nulls(2);

        builder.append_zeros(2);
        builder.append_value("test");

        let actual = builder.finish();
        let expected = <VarBinViewArray as FromIterator<_>>::from_iter([
            Some("Hello"),
            None,
            Some("World"),
            None,
            None,
            Some(""),
            Some(""),
            Some("test"),
        ]);
        assert_arrays_eq!(actual, expected);
    }

    // Extending from an existing array interleaves correctly with direct appends.
    #[test]
    fn test_utf8_builder_with_extend() {
        let array = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_null();
            builder.append_value("Hello2");
            builder.finish()
        };
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_value("Hello1");
        builder.extend_from_array(&array);
        builder.append_nulls(2);
        builder.append_value("Hello3");

        let actual = builder.finish_into_canonical();
        let expected = <VarBinViewArray as FromIterator<_>>::from_iter([
            Some("Hello1"),
            None,
            Some("Hello2"),
            None,
            None,
            Some("Hello3"),
        ]);
        assert_arrays_eq!(actual.into_array(), expected.into_array());
    }

    // Appending the same source array (or slices of it) repeatedly must not duplicate
    // its backing buffer, while a distinct array contributes a new buffer.
    #[test]
    fn test_buffer_deduplication() -> VortexResult<()> {
        let array = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_value("This is a long string that should not be inlined");
            builder.append_value("short string");
            builder.finish_into_varbinview()
        };

        assert_eq!(array.buffers().len(), 1);
        let mut builder =
            VarBinViewBuilder::with_buffer_deduplication(DType::Utf8(Nullability::Nullable), 10);

        let mut ctx = LEGACY_SESSION.create_execution_ctx();

        array.append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 1);

        array
            .slice(1..2)?
            .append_to_builder(&mut builder, &mut ctx)?;
        array
            .slice(0..1)?
            .append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 1);

        let array2 = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_value("This is a long string that should not be inlined");
            builder.finish_into_varbinview()
        };

        array2.append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 2);

        array
            .slice(0..1)?
            .append_to_builder(&mut builder, &mut ctx)?;
        array2
            .slice(0..1)?
            .append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 2);
        Ok(())
    }

    // append_scalar accepts matching Utf8/Binary scalars (including null) and rejects
    // mismatched dtypes.
    #[test]
    fn test_append_scalar() {
        use crate::scalar::Scalar;

        let mut utf8_builder =
            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        let utf8_scalar1 = Scalar::utf8("hello", Nullability::Nullable);
        utf8_builder.append_scalar(&utf8_scalar1).unwrap();

        let utf8_scalar2 = Scalar::utf8("world", Nullability::Nullable);
        utf8_builder.append_scalar(&utf8_scalar2).unwrap();

        let null_scalar = Scalar::null(DType::Utf8(Nullability::Nullable));
        utf8_builder.append_scalar(&null_scalar).unwrap();

        let array = utf8_builder.finish();
        let expected =
            <VarBinViewArray as FromIterator<_>>::from_iter([Some("hello"), Some("world"), None]);
        assert_arrays_eq!(&array, &expected);

        let mut binary_builder =
            VarBinViewBuilder::with_capacity(DType::Binary(Nullability::Nullable), 10);

        let binary_scalar = Scalar::binary(vec![1u8, 2, 3], Nullability::Nullable);
        binary_builder.append_scalar(&binary_scalar).unwrap();

        let binary_null = Scalar::null(DType::Binary(Nullability::Nullable));
        binary_builder.append_scalar(&binary_null).unwrap();

        let binary_array = binary_builder.finish();
        let expected =
            <VarBinViewArray as FromIterator<_>>::from_iter([Some(vec![1u8, 2, 3]), None]);
        assert_arrays_eq!(&binary_array, &expected);

        // Wrong dtype (an i32 scalar) must be rejected by the ensure check.
        let mut builder =
            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::NonNullable), 10);
        let wrong_scalar = Scalar::from(42i32);
        assert!(builder.append_scalar(&wrong_scalar).is_err());
    }

    // Fixed stays constant; exponential doubles until it saturates at the cap.
    #[test]
    fn test_buffer_growth_strategies() {
        use super::BufferGrowthStrategy;

        let mut strategy = BufferGrowthStrategy::fixed(1024);

        assert_eq!(strategy.next_size(), 1024);
        assert_eq!(strategy.next_size(), 1024);
        assert_eq!(strategy.next_size(), 1024);

        let mut strategy = BufferGrowthStrategy::exponential(1024, 8192);

        assert_eq!(strategy.next_size(), 1024);
        assert_eq!(strategy.next_size(), 2048);
        assert_eq!(strategy.next_size(), 4096);
        assert_eq!(strategy.next_size(), 8192);
        assert_eq!(strategy.next_size(), 8192);
    }

    // A value larger than the strategy's max buffer size still gets a buffer big
    // enough to hold it.
    #[test]
    fn test_large_value_allocation() {
        use super::BufferGrowthStrategy;
        use super::VarBinViewBuilder;

        let mut builder = VarBinViewBuilder::new(
            DType::Binary(Nullability::Nullable),
            10,
            Default::default(),
            BufferGrowthStrategy::exponential(1024, 4096),
            0.0,
        );

        // 8192 bytes exceeds the 4096-byte max buffer size.
        let large_value = vec![0u8; 8192];

        builder.append_value(&large_value);

        let array = builder.finish_into_varbinview();
        assert_eq!(array.len(), 1);

        let retrieved = array
            .scalar_at(0)
            .unwrap()
            .as_binary()
            .value()
            .cloned()
            .unwrap();
        assert_eq!(retrieved.len(), 8192);
        assert_eq!(retrieved.as_slice(), &large_value);
    }
}