1use std::any::Any;
5use std::ops::Range;
6use std::sync::Arc;
7
8use itertools::Itertools;
9use vortex_buffer::Buffer;
10use vortex_buffer::BufferMut;
11use vortex_buffer::ByteBuffer;
12use vortex_buffer::ByteBufferMut;
13use vortex_dtype::DType;
14use vortex_error::VortexExpect;
15use vortex_error::VortexResult;
16use vortex_error::vortex_bail;
17use vortex_error::vortex_ensure;
18use vortex_mask::Mask;
19use vortex_utils::aliases::hash_map::Entry;
20use vortex_utils::aliases::hash_map::HashMap;
21
22use crate::Array;
23use crate::ArrayRef;
24use crate::IntoArray;
25use crate::arrays::BinaryView;
26use crate::arrays::VarBinViewArray;
27use crate::arrays::compact::BufferUtilization;
28use crate::builders::ArrayBuilder;
29use crate::builders::LazyBitBufferBuilder;
30use crate::canonical::Canonical;
31use crate::canonical::ToCanonical;
32use crate::scalar::Scalar;
33
/// Builder for [`VarBinViewArray`] (the "binary view" layout): values of 12
/// bytes or fewer are inlined directly into their view, while longer values
/// live in shared data buffers addressed by (buffer index, byte offset).
pub struct VarBinViewBuilder {
    // Utf8 or Binary; enforced in `new`.
    dtype: DType,
    // One 16-byte view per appended element.
    views_builder: BufferMut<BinaryView>,
    // Validity bitmap, built lazily alongside the views.
    nulls: LazyBitBufferBuilder,
    // Sealed data buffers; out-of-line views index into these.
    completed: CompletedBuffers,
    // Buffer currently receiving out-of-line values; flushed into `completed`
    // when it fills up or buffer indices must be stabilized.
    in_progress: ByteBufferMut,
    // Decides the allocation size of each new `in_progress` buffer.
    growth_strategy: BufferGrowthStrategy,
    // Minimum utilization required to keep a source buffer as-is when
    // extending from another array; 0.0 disables compaction.
    compaction_threshold: f64,
}
44
45impl VarBinViewBuilder {
46 pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
47 Self::new(dtype, capacity, Default::default(), Default::default(), 0.0)
48 }
49
50 pub fn with_buffer_deduplication(dtype: DType, capacity: usize) -> Self {
51 Self::new(
52 dtype,
53 capacity,
54 CompletedBuffers::Deduplicated(Default::default()),
55 Default::default(),
56 0.0,
57 )
58 }
59
60 pub fn with_compaction(dtype: DType, capacity: usize, compaction_threshold: f64) -> Self {
61 Self::new(
62 dtype,
63 capacity,
64 Default::default(),
65 Default::default(),
66 compaction_threshold,
67 )
68 }
69
70 pub fn new(
71 dtype: DType,
72 capacity: usize,
73 completed: CompletedBuffers,
74 growth_strategy: BufferGrowthStrategy,
75 compaction_threshold: f64,
76 ) -> Self {
77 assert!(
78 matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
79 "VarBinViewBuilder DType must be Utf8 or Binary."
80 );
81 Self {
82 views_builder: BufferMut::<BinaryView>::with_capacity(capacity),
83 nulls: LazyBitBufferBuilder::new(capacity),
84 completed,
85 in_progress: ByteBufferMut::empty(),
86 dtype,
87 growth_strategy,
88 compaction_threshold,
89 }
90 }
91
92 fn append_value_view(&mut self, value: &[u8]) {
93 let length =
94 u32::try_from(value.len()).vortex_expect("cannot have a single string >2^32 in length");
95 if length <= 12 {
96 self.views_builder.push(BinaryView::make_view(value, 0, 0));
97 return;
98 }
99
100 let (buffer_idx, offset) = self.append_value_to_buffer(value);
101 let view = BinaryView::make_view(value, buffer_idx, offset);
102 self.views_builder.push(view);
103 }
104
105 pub fn append_value<S: AsRef<[u8]>>(&mut self, value: S) {
107 self.append_value_view(value.as_ref());
108 self.nulls.append_non_null();
109 }
110
111 fn flush_in_progress(&mut self) {
112 if self.in_progress.is_empty() {
113 return;
114 }
115 let block = std::mem::take(&mut self.in_progress).freeze();
116
117 assert!(block.len() < u32::MAX as usize, "Block too large");
118
119 let initial_len = self.completed.len();
120 self.completed.push(block);
121 assert_eq!(
122 self.completed.len(),
123 initial_len + 1,
124 "Invalid state, just completed block already exists"
125 );
126 }
127
128 fn append_value_to_buffer(&mut self, value: &[u8]) -> (u32, u32) {
130 assert!(value.len() > 12, "must inline small strings");
131 let required_cap = self.in_progress.len() + value.len();
132 if self.in_progress.capacity() < required_cap {
133 self.flush_in_progress();
134 let next_buffer_size = self.growth_strategy.next_size() as usize;
135 let to_reserve = next_buffer_size.max(value.len());
136 self.in_progress.reserve(to_reserve);
137 }
138
139 let buffer_idx = self.completed.len();
140 let offset = u32::try_from(self.in_progress.len()).vortex_expect("too many buffers");
141 self.in_progress.extend_from_slice(value);
142
143 (buffer_idx, offset)
144 }
145
146 pub fn completed_block_count(&self) -> u32 {
147 self.completed.len()
148 }
149
150 pub fn push_buffer_and_adjusted_views(
167 &mut self,
168 buffers: &[ByteBuffer],
169 views: &Buffer<BinaryView>,
170 validity_mask: Mask,
171 ) {
172 self.flush_in_progress();
173
174 let expected_completed_len = self.completed.len() as usize + buffers.len();
175 self.completed.extend_from_slice_unchecked(buffers);
176 assert_eq!(
177 self.completed.len() as usize,
178 expected_completed_len,
179 "Some buffers already exist",
180 );
181 self.views_builder.extend_trusted(views.iter().copied());
182 self.push_only_validity_mask(validity_mask);
183
184 debug_assert_eq!(self.nulls.len(), self.views_builder.len())
185 }
186
187 pub fn finish_into_varbinview(&mut self) -> VarBinViewArray {
189 self.flush_in_progress();
190 let buffers = std::mem::take(&mut self.completed);
191
192 assert_eq!(
193 self.views_builder.len(),
194 self.nulls.len(),
195 "View and validity length must match"
196 );
197
198 let validity = self.nulls.finish_with_nullability(self.dtype.nullability());
199
200 unsafe {
202 VarBinViewArray::new_unchecked(
203 std::mem::take(&mut self.views_builder).freeze(),
204 buffers.finish(),
205 self.dtype.clone(),
206 validity,
207 )
208 }
209 }
210
211 fn push_only_validity_mask(&mut self, validity_mask: Mask) {
213 self.nulls.append_validity_mask(validity_mask);
214 }
215}
216
impl ArrayBuilder for VarBinViewBuilder {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }

    fn dtype(&self) -> &DType {
        &self.dtype
    }

    // The validity builder is appended to on every code path, so its length
    // is the builder's element count.
    fn len(&self) -> usize {
        self.nulls.len()
    }

    // `n` valid, zero-length elements: empty views with non-null validity.
    fn append_zeros(&mut self, n: usize) {
        self.views_builder.push_n(BinaryView::empty_view(), n);
        self.nulls.append_n_non_nulls(n);
    }

    // `n` nulls; placeholder empty views keep views and validity the same
    // length.
    unsafe fn append_nulls_unchecked(&mut self, n: usize) {
        self.views_builder.push_n(BinaryView::empty_view(), n);
        self.nulls.append_n_nulls(n);
    }

    /// Append a scalar value, requiring its dtype to equal the builder's
    /// (including nullability).
    fn append_scalar(&mut self, scalar: &Scalar) -> VortexResult<()> {
        vortex_ensure!(
            scalar.dtype() == self.dtype(),
            "VarBinViewBuilder expected scalar with dtype {}, got {}",
            self.dtype(),
            scalar.dtype()
        );

        match self.dtype() {
            DType::Utf8(_) => match scalar.as_utf8().value() {
                Some(value) => self.append_value(value),
                None => self.append_null(),
            },
            DType::Binary(_) => match scalar.as_binary().value() {
                Some(value) => self.append_value(value),
                None => self.append_null(),
            },
            // Unreachable in practice: the builder's dtype is restricted to
            // Utf8/Binary in `new` and equality was ensured above.
            _ => vortex_bail!(
                "VarBinViewBuilder can only handle Utf8 or Binary scalars, got {:?}",
                scalar.dtype()
            ),
        }

        Ok(())
    }

    /// Append every element of `array`, carrying over or compacting its data
    /// buffers according to the builder's `compaction_threshold`.
    unsafe fn extend_from_array_unchecked(&mut self, array: &dyn Array) {
        let array = array.to_varbinview();
        // Seal our partial buffer so incoming buffers get stable indices.
        self.flush_in_progress();

        self.push_only_validity_mask(
            array
                .validity_mask()
                .vortex_expect("validity_mask in extend_from_array_unchecked"),
        );

        // Absorb the source buffers; the returned adjustment describes how to
        // remap the source views into our buffer space.
        let view_adjustment =
            self.completed
                .extend_from_compaction(BuffersWithOffsets::from_array(
                    &array,
                    self.compaction_threshold,
                ));

        match view_adjustment {
            // All buffers survived: every view can be remapped arithmetically.
            ViewAdjustment::Precomputed(adjustment) => self.views_builder.extend_trusted(
                array
                    .views()
                    .iter()
                    .map(|view| adjustment.adjust_view(view)),
            ),
            // Some buffers were dropped: views into them must have their
            // bytes copied into our in-progress buffer. Null elements keep an
            // empty placeholder view instead.
            // NOTE(review): `validity_mask()` is recomputed here after being
            // fetched above — presumably cheap/cached, confirm.
            ViewAdjustment::Rewriting(adjustment) => match array
                .validity_mask()
                .vortex_expect("validity_mask in extend_from_array_unchecked")
            {
                Mask::AllTrue(_) => {
                    for (idx, &view) in array.views().iter().enumerate() {
                        let new_view = self.push_view(view, &adjustment, &array, idx);
                        self.views_builder.push(new_view);
                    }
                }
                Mask::AllFalse(_) => {
                    self.views_builder
                        .push_n(BinaryView::empty_view(), array.len());
                }
                Mask::Values(v) => {
                    for (idx, (&view, is_valid)) in
                        array.views().iter().zip(v.bit_buffer().iter()).enumerate()
                    {
                        let new_view = if !is_valid {
                            BinaryView::empty_view()
                        } else {
                            self.push_view(view, &adjustment, &array, idx)
                        };
                        self.views_builder.push(new_view);
                    }
                }
            },
        }
    }

    fn reserve_exact(&mut self, additional: usize) {
        self.views_builder.reserve(additional);
        self.nulls.reserve_exact(additional);
    }

    // Discards any previously accumulated validity and replaces it wholesale.
    unsafe fn set_validity_unchecked(&mut self, validity: Mask) {
        self.nulls = LazyBitBufferBuilder::new(validity.len());
        self.nulls.append_validity_mask(validity);
    }

    fn finish(&mut self) -> ArrayRef {
        self.finish_into_varbinview().into_array()
    }

    fn finish_into_canonical(&mut self) -> Canonical {
        Canonical::VarBinView(self.finish_into_varbinview())
    }
}
342
343impl VarBinViewBuilder {
344 #[inline]
345 fn push_view(
346 &mut self,
347 view: BinaryView,
348 adjustment: &RewritingViewAdjustment,
349 array: &VarBinViewArray,
350 idx: usize,
351 ) -> BinaryView {
352 if view.is_inlined() {
353 view
354 } else if let Some(adjusted) = adjustment.adjust_view(&view) {
355 adjusted
356 } else {
357 let bytes = array.bytes_at(idx);
358 let (new_buf_idx, new_offset) = self.append_value_to_buffer(&bytes);
359 BinaryView::make_view(bytes.as_slice(), new_buf_idx, new_offset)
360 }
361 }
362}
363
/// Storage for sealed data buffers, with or without identity-based
/// deduplication of repeatedly pushed buffers.
pub enum CompletedBuffers {
    /// Plain accumulation: every pushed buffer gets a fresh index.
    Default(Vec<ByteBuffer>),
    /// Buffers are deduplicated by backing-memory identity.
    Deduplicated(DeduplicatedBuffers),
}
368
369impl Default for CompletedBuffers {
370 fn default() -> Self {
371 Self::Default(Vec::new())
372 }
373}
374
375#[allow(clippy::cast_possible_truncation)]
377impl CompletedBuffers {
378 fn len(&self) -> u32 {
379 match self {
380 Self::Default(buffers) => buffers.len() as u32,
381 Self::Deduplicated(buffers) => buffers.len(),
382 }
383 }
384
385 fn push(&mut self, block: ByteBuffer) -> u32 {
386 match self {
387 Self::Default(buffers) => {
388 assert!(buffers.len() < u32::MAX as usize, "Too many blocks");
389 buffers.push(block);
390 self.len()
391 }
392 Self::Deduplicated(buffers) => buffers.push(block),
393 }
394 }
395
396 fn extend_from_slice_unchecked(&mut self, buffers: &[ByteBuffer]) {
398 for buffer in buffers {
399 self.push(buffer.clone());
400 }
401 }
402
403 fn extend_from_compaction(&mut self, buffers: BuffersWithOffsets) -> ViewAdjustment {
404 match (self, buffers) {
405 (
406 Self::Default(completed_buffers),
407 BuffersWithOffsets::AllKept { buffers, offsets },
408 ) => {
409 let buffer_offset = completed_buffers.len() as u32;
410 completed_buffers.extend_from_slice(&buffers);
411 ViewAdjustment::shift(buffer_offset, offsets)
412 }
413 (
414 Self::Default(completed_buffers),
415 BuffersWithOffsets::SomeCompacted { buffers, offsets },
416 ) => {
417 let lookup = buffers
418 .iter()
419 .map(|maybe_buffer| {
420 maybe_buffer.as_ref().map(|buffer| {
421 completed_buffers.push(buffer.clone());
422 completed_buffers.len() as u32 - 1
423 })
424 })
425 .collect();
426 ViewAdjustment::rewriting(lookup, offsets)
427 }
428
429 (
430 Self::Deduplicated(completed_buffers),
431 BuffersWithOffsets::AllKept { buffers, offsets },
432 ) => {
433 let buffer_lookup = completed_buffers.extend_from_iter(buffers.iter().cloned());
434 ViewAdjustment::lookup(buffer_lookup, offsets)
435 }
436 (
437 Self::Deduplicated(completed_buffers),
438 BuffersWithOffsets::SomeCompacted { buffers, offsets },
439 ) => {
440 let buffer_lookup = completed_buffers.extend_from_option_slice(&buffers);
441 ViewAdjustment::rewriting(buffer_lookup, offsets)
442 }
443 }
444 }
445
446 fn finish(self) -> Arc<[ByteBuffer]> {
447 match self {
448 Self::Default(buffers) => Arc::from(buffers),
449 Self::Deduplicated(buffers) => buffers.finish(),
450 }
451 }
452}
453
/// Buffer store that assigns each distinct buffer a stable index, reusing the
/// index when the same buffer (same backing memory) is pushed again.
#[derive(Default)]
pub struct DeduplicatedBuffers {
    // Distinct buffers in insertion order; positions are the public indices.
    buffers: Vec<ByteBuffer>,
    // Maps a buffer's memory identity to its position in `buffers`.
    buffer_to_idx: HashMap<BufferId, u32>,
}
459
460impl DeduplicatedBuffers {
461 #[allow(clippy::cast_possible_truncation)]
463 fn len(&self) -> u32 {
464 self.buffers.len() as u32
465 }
466
467 pub(crate) fn push(&mut self, block: ByteBuffer) -> u32 {
469 assert!(self.buffers.len() < u32::MAX as usize, "Too many blocks");
470
471 let initial_len = self.len();
472 let id = BufferId::from(&block);
473 match self.buffer_to_idx.entry(id) {
474 Entry::Occupied(idx) => *idx.get(),
475 Entry::Vacant(entry) => {
476 let idx = initial_len;
477 entry.insert(idx);
478 self.buffers.push(block);
479 idx
480 }
481 }
482 }
483
484 pub(crate) fn extend_from_option_slice(
485 &mut self,
486 buffers: &[Option<ByteBuffer>],
487 ) -> Vec<Option<u32>> {
488 buffers
489 .iter()
490 .map(|buffer| buffer.as_ref().map(|buf| self.push(buf.clone())))
491 .collect()
492 }
493
494 pub(crate) fn extend_from_iter(
495 &mut self,
496 buffers: impl Iterator<Item = ByteBuffer>,
497 ) -> Vec<u32> {
498 buffers.map(|buffer| self.push(buffer)).collect()
499 }
500
501 pub(crate) fn finish(self) -> Arc<[ByteBuffer]> {
502 Arc::from(self.buffers)
503 }
504}
505
/// Identity key for a buffer: the address and length of its backing slice.
/// Two buffers compare equal only when they share the same memory, so this
/// deduplicates shared/sliced buffers — not merely equal-content ones.
#[derive(PartialEq, Eq, Hash)]
struct BufferId {
    ptr: usize,
    len: usize,
}
512
513impl BufferId {
514 fn from(buffer: &ByteBuffer) -> Self {
515 let slice = buffer.as_slice();
516 Self {
517 ptr: slice.as_ptr() as usize,
518 len: slice.len(),
519 }
520 }
521}
522
/// Controls how big each newly allocated data buffer should be.
#[derive(Debug, Clone)]
pub enum BufferGrowthStrategy {
    /// Every buffer gets the same fixed size.
    Fixed { size: u32 },
    /// Buffer sizes double on each allocation until `max_size` is reached.
    Exponential { current_size: u32, max_size: u32 },
}

impl Default for BufferGrowthStrategy {
    /// Exponential growth starting at 4 KiB and capped at 2 MiB.
    fn default() -> Self {
        Self::Exponential {
            current_size: 4 * 1024,
            max_size: 2 * 1024 * 1024,
        }
    }
}

impl BufferGrowthStrategy {
    /// A strategy that always allocates `size`-byte buffers.
    pub fn fixed(size: u32) -> Self {
        Self::Fixed { size }
    }

    /// A strategy that starts at `initial_size` and doubles up to `max_size`.
    pub fn exponential(initial_size: u32, max_size: u32) -> Self {
        Self::Exponential {
            current_size: initial_size,
            max_size,
        }
    }

    /// Return the size to use for the next buffer, advancing internal state.
    pub fn next_size(&mut self) -> u32 {
        match self {
            Self::Fixed { size } => *size,
            Self::Exponential {
                current_size,
                max_size,
            } => {
                let size_now = *current_size;
                // Double for next time, clamped to the configured maximum.
                if size_now < *max_size {
                    *current_size = size_now.saturating_mul(2).min(*max_size);
                }
                size_now
            }
        }
    }
}
569
/// The buffers of a source array after a compaction decision, together with
/// the per-buffer byte counts sliced off each buffer's front (if any).
enum BuffersWithOffsets {
    /// Every source buffer survives (possibly sliced at the front).
    AllKept {
        buffers: Arc<[ByteBuffer]>,
        // Per-buffer amount to subtract from view offsets; `None` = all zero.
        offsets: Option<Vec<u32>>,
    },
    /// Some buffers were dropped entirely (`None` entries); views into them
    /// must be rewritten by copying their bytes.
    SomeCompacted {
        buffers: Vec<Option<ByteBuffer>>,
        offsets: Option<Vec<u32>>,
    },
}
580
impl BuffersWithOffsets {
    /// Decide, buffer by buffer, which of `array`'s data buffers to keep,
    /// slice, or drop, based on how much of each buffer its views use.
    pub fn from_array(array: &VarBinViewArray, compaction_threshold: f64) -> Self {
        // A zero threshold disables compaction entirely: keep every buffer.
        if compaction_threshold == 0.0 {
            return Self::AllKept {
                buffers: Arc::from(
                    array
                        .buffers()
                        .to_vec()
                        .into_iter()
                        .map(|b| b.unwrap_host())
                        .collect_vec(),
                ),
                offsets: None,
            };
        }

        let buffer_utilizations = array
            .buffer_utilizations()
            .vortex_expect("buffer_utilizations in BuffersWithOffsets::from_array");
        // First pass: determine which output representation is required.
        let mut has_rewrite = false;
        let mut has_nonzero_offset = false;
        for utilization in buffer_utilizations.iter() {
            match compaction_strategy(utilization, compaction_threshold) {
                CompactionStrategy::KeepFull => continue,
                CompactionStrategy::Slice { .. } => has_nonzero_offset = true,
                CompactionStrategy::Rewrite => has_rewrite = true,
            }
        }

        // Second pass (lazy): materialize each buffer as whole, sliced, or
        // dropped, paired with the byte offset removed from its front.
        // Note the strategy is recomputed per buffer here rather than cached.
        let buffers_with_offsets_iter =
            buffer_utilizations
                .iter()
                .zip(array.buffers().iter())
                .map(|(utilization, buffer)| {
                    match compaction_strategy(utilization, compaction_threshold) {
                        CompactionStrategy::KeepFull => (Some(buffer.as_host().clone()), 0),
                        CompactionStrategy::Slice { start, end } => (
                            Some(buffer.as_host().slice(start as usize..end as usize)),
                            start,
                        ),
                        CompactionStrategy::Rewrite => (None, 0),
                    }
                });

        // Pick the cheapest representation that can express the result,
        // omitting the offsets vector whenever all offsets are zero.
        match (has_rewrite, has_nonzero_offset) {
            (false, false) => {
                let buffers: Vec<_> = buffers_with_offsets_iter
                    .map(|(b, _)| b.vortex_expect("already checked for rewrite"))
                    .collect();
                Self::AllKept {
                    buffers: Arc::from(buffers),
                    offsets: None,
                }
            }
            (true, false) => {
                let buffers: Vec<_> = buffers_with_offsets_iter.map(|(b, _)| b).collect();
                Self::SomeCompacted {
                    buffers,
                    offsets: None,
                }
            }
            (false, true) => {
                let (buffers, offsets): (Vec<_>, _) = buffers_with_offsets_iter
                    .map(|(buffer, offset)| {
                        (buffer.vortex_expect("already checked for rewrite"), offset)
                    })
                    .collect();
                Self::AllKept {
                    buffers: Arc::from(buffers),
                    offsets: Some(offsets),
                }
            }
            (true, true) => {
                let (buffers, offsets) = buffers_with_offsets_iter.collect();
                Self::SomeCompacted {
                    buffers,
                    offsets: Some(offsets),
                }
            }
        }
    }
}
667
/// Per-buffer decision when absorbing another array's data buffers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CompactionStrategy {
    /// Keep the whole buffer unchanged.
    KeepFull,
    /// Keep only the `start..end` byte range of the buffer.
    Slice {
        start: u32,
        end: u32,
    },
    /// Drop the buffer and re-copy the referenced values individually.
    Rewrite,
}
679
680fn compaction_strategy(
681 buffer_utilization: &BufferUtilization,
682 threshold: f64,
683) -> CompactionStrategy {
684 match buffer_utilization.overall_utilization() {
685 0.0 => CompactionStrategy::Rewrite,
687 utilised if utilised >= threshold => CompactionStrategy::KeepFull,
688 _ if buffer_utilization.range_utilization() >= threshold => {
689 let Range { start, end } = buffer_utilization.range();
690 CompactionStrategy::Slice { start, end }
691 }
692 _ => CompactionStrategy::Rewrite,
693 }
694}
695
/// How to translate views from a source array into the destination buffer
/// space after its buffers were absorbed.
enum ViewAdjustment {
    /// Every view can be remapped arithmetically (shift or table lookup).
    Precomputed(PrecomputedViewAdjustment),
    /// Some views reference dropped buffers and need their bytes re-copied.
    Rewriting(RewritingViewAdjustment),
}
700
701impl ViewAdjustment {
702 fn shift(buffer_offset: u32, offsets: Option<Vec<u32>>) -> Self {
703 Self::Precomputed(PrecomputedViewAdjustment::Shift {
704 buffer_offset,
705 offsets,
706 })
707 }
708
709 fn lookup(buffer_lookup: Vec<u32>, offsets: Option<Vec<u32>>) -> Self {
710 Self::Precomputed(PrecomputedViewAdjustment::Lookup {
711 buffer_lookup,
712 offsets,
713 })
714 }
715
716 fn rewriting(buffer_lookup: Vec<Option<u32>>, offsets: Option<Vec<u32>>) -> Self {
717 Self::Rewriting(RewritingViewAdjustment {
718 buffer_lookup,
719 offsets,
720 })
721 }
722}
723
/// View remapping that never fails: either a constant buffer-index shift or
/// a full per-buffer lookup table, optionally with per-buffer offset shifts.
enum PrecomputedViewAdjustment {
    /// All source buffers were appended contiguously: add `buffer_offset` to
    /// each view's buffer index.
    Shift {
        buffer_offset: u32,
        offsets: Option<Vec<u32>>,
    },
    /// Source buffer `i` landed at destination index `buffer_lookup[i]`.
    Lookup {
        buffer_lookup: Vec<u32>,
        offsets: Option<Vec<u32>>,
    },
}
735
impl PrecomputedViewAdjustment {
    /// Remap a single view into the destination buffer space.
    #[inline]
    fn adjust_view(&self, view: &BinaryView) -> BinaryView {
        // Inlined views carry their own bytes and need no remapping.
        if view.is_inlined() {
            return *view;
        }
        let view_ref = view.as_view();
        match self {
            Self::Shift {
                buffer_offset,
                offsets,
            } => {
                let b_idx = view_ref.buffer_index;
                // Bytes sliced off the front of a buffer shift every view's
                // offset back by the same amount; `None` means no slicing.
                let offset_shift = offsets
                    .as_ref()
                    .map(|o| o[b_idx as usize])
                    .unwrap_or_default();

                // A view pointing before the kept range references data that
                // was judged unused; map it to an empty view instead of
                // underflowing the subtraction below. NOTE(review): such
                // views are presumably never read — confirm.
                if view_ref.offset < offset_shift {
                    return BinaryView::empty_view();
                }

                view_ref
                    .with_buffer_and_offset(b_idx + buffer_offset, view_ref.offset - offset_shift)
            }
            Self::Lookup {
                buffer_lookup,
                offsets,
            } => {
                let b_idx = view_ref.buffer_index;
                let buffer = buffer_lookup[b_idx as usize];
                let offset_shift = offsets
                    .as_ref()
                    .map(|o| o[b_idx as usize])
                    .unwrap_or_default();

                // Same out-of-range guard as the Shift arm above.
                if view_ref.offset < offset_shift {
                    return BinaryView::empty_view();
                }

                view_ref.with_buffer_and_offset(buffer, view_ref.offset - offset_shift)
            }
        }
        // The match yields a view ref; pack it back into a BinaryView.
        .into()
    }
}
786
/// View remapping where some source buffers were dropped: `buffer_lookup[i]`
/// is `None` when buffer `i` was compacted away, and such views' bytes must
/// be re-appended by the caller.
struct RewritingViewAdjustment {
    buffer_lookup: Vec<Option<u32>>,
    offsets: Option<Vec<u32>>,
}
791
impl RewritingViewAdjustment {
    /// Remap `view` into the destination buffer space, or return `None` if
    /// the view's source buffer was dropped and its bytes must be copied.
    fn adjust_view(&self, view: &BinaryView) -> Option<BinaryView> {
        // Inlined views need no remapping.
        if view.is_inlined() {
            return Some(*view);
        }

        let view_ref = view.as_view();
        self.buffer_lookup[view_ref.buffer_index as usize].map(|buffer| {
            let offset_shift = self
                .offsets
                .as_ref()
                .map(|o| o[view_ref.buffer_index as usize])
                .unwrap_or_default();
            // NOTE(review): unlike PrecomputedViewAdjustment::adjust_view,
            // there is no `offset < offset_shift` guard before this
            // subtraction; presumably kept/sliced buffers only carry views
            // at or after the slice start — confirm.
            view_ref
                .with_buffer_and_offset(buffer, view_ref.offset - offset_shift)
                .into()
        })
    }
}
813
#[cfg(test)]
mod tests {
    use vortex_dtype::DType;
    use vortex_dtype::Nullability;
    use vortex_error::VortexResult;

    use crate::IntoArray;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::arrays::VarBinViewArray;
    use crate::assert_arrays_eq;
    use crate::builders::ArrayBuilder;
    use crate::builders::VarBinViewBuilder;

    // Exercises the basic append paths: values, single/bulk nulls, and
    // zero-length ("zeros") elements.
    #[test]
    fn test_utf8_builder() {
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_value("Hello");
        builder.append_null();
        builder.append_value("World");

        builder.append_nulls(2);

        builder.append_zeros(2);
        builder.append_value("test");

        let actual = builder.finish();
        let expected = <VarBinViewArray as FromIterator<_>>::from_iter([
            Some("Hello"),
            None,
            Some("World"),
            None,
            None,
            Some(""),
            Some(""),
            Some("test"),
        ]);
        assert_arrays_eq!(actual, expected);
    }

    // Extending from an existing array must interleave correctly with
    // regular appends before and after.
    #[test]
    fn test_utf8_builder_with_extend() {
        let array = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_null();
            builder.append_value("Hello2");
            builder.finish()
        };
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_value("Hello1");
        builder.extend_from_array(&array);
        builder.append_nulls(2);
        builder.append_value("Hello3");

        let actual = builder.finish_into_canonical();
        let expected = <VarBinViewArray as FromIterator<_>>::from_iter([
            Some("Hello1"),
            None,
            Some("Hello2"),
            None,
            None,
            Some("Hello3"),
        ]);
        assert_arrays_eq!(actual.into_array(), expected.into_array());
    }

    // Re-appending (slices of) the same source array must not duplicate its
    // data buffer, while a distinct array contributes a new buffer.
    #[test]
    fn test_buffer_deduplication() -> VortexResult<()> {
        let array = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_value("This is a long string that should not be inlined");
            builder.append_value("short string");
            builder.finish_into_varbinview()
        };

        // Only the long value goes out-of-line, so exactly one data buffer.
        assert_eq!(array.buffers().len(), 1);
        let mut builder =
            VarBinViewBuilder::with_buffer_deduplication(DType::Utf8(Nullability::Nullable), 10);

        let mut ctx = LEGACY_SESSION.create_execution_ctx();

        array.append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 1);

        // Slices share the source buffer's memory: still one block.
        array
            .slice(1..2)?
            .append_to_builder(&mut builder, &mut ctx)?;
        array
            .slice(0..1)?
            .append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 1);

        // Same bytes but a different allocation: dedup is by identity, so
        // this adds a second block.
        let array2 = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_value("This is a long string that should not be inlined");
            builder.finish_into_varbinview()
        };

        array2.append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 2);

        array
            .slice(0..1)?
            .append_to_builder(&mut builder, &mut ctx)?;
        array2
            .slice(0..1)?
            .append_to_builder(&mut builder, &mut ctx)?;
        assert_eq!(builder.completed_block_count(), 2);
        Ok(())
    }

    // append_scalar: matching Utf8/Binary scalars and nulls succeed; a
    // mismatched dtype is rejected.
    #[test]
    fn test_append_scalar() {
        use crate::scalar::Scalar;

        let mut utf8_builder =
            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        let utf8_scalar1 = Scalar::utf8("hello", Nullability::Nullable);
        utf8_builder.append_scalar(&utf8_scalar1).unwrap();

        let utf8_scalar2 = Scalar::utf8("world", Nullability::Nullable);
        utf8_builder.append_scalar(&utf8_scalar2).unwrap();

        let null_scalar = Scalar::null(DType::Utf8(Nullability::Nullable));
        utf8_builder.append_scalar(&null_scalar).unwrap();

        let array = utf8_builder.finish();
        let expected =
            <VarBinViewArray as FromIterator<_>>::from_iter([Some("hello"), Some("world"), None]);
        assert_arrays_eq!(&array, &expected);

        let mut binary_builder =
            VarBinViewBuilder::with_capacity(DType::Binary(Nullability::Nullable), 10);

        let binary_scalar = Scalar::binary(vec![1u8, 2, 3], Nullability::Nullable);
        binary_builder.append_scalar(&binary_scalar).unwrap();

        let binary_null = Scalar::null(DType::Binary(Nullability::Nullable));
        binary_builder.append_scalar(&binary_null).unwrap();

        let binary_array = binary_builder.finish();
        let expected =
            <VarBinViewArray as FromIterator<_>>::from_iter([Some(vec![1u8, 2, 3]), None]);
        assert_arrays_eq!(&binary_array, &expected);

        // Wrong dtype (i32 scalar into a Utf8 builder) must error.
        let mut builder =
            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::NonNullable), 10);
        let wrong_scalar = Scalar::from(42i32);
        assert!(builder.append_scalar(&wrong_scalar).is_err());
    }

    // Fixed strategy is constant; exponential doubles until saturating at
    // max_size.
    #[test]
    fn test_buffer_growth_strategies() {
        use super::BufferGrowthStrategy;

        let mut strategy = BufferGrowthStrategy::fixed(1024);

        assert_eq!(strategy.next_size(), 1024);
        assert_eq!(strategy.next_size(), 1024);
        assert_eq!(strategy.next_size(), 1024);

        let mut strategy = BufferGrowthStrategy::exponential(1024, 8192);

        assert_eq!(strategy.next_size(), 1024);
        assert_eq!(strategy.next_size(), 2048);
        assert_eq!(strategy.next_size(), 4096);
        assert_eq!(strategy.next_size(), 8192);
        assert_eq!(strategy.next_size(), 8192);
    }

    // A value larger than the strategy's maximum buffer size must still be
    // stored intact (the buffer is sized up to fit the value).
    #[test]
    fn test_large_value_allocation() {
        use super::BufferGrowthStrategy;
        use super::VarBinViewBuilder;

        let mut builder = VarBinViewBuilder::new(
            DType::Binary(Nullability::Nullable),
            10,
            Default::default(),
            BufferGrowthStrategy::exponential(1024, 4096),
            0.0,
        );

        // Twice the strategy's max buffer size.
        let large_value = vec![0u8; 8192];

        builder.append_value(&large_value);

        let array = builder.finish_into_varbinview();
        assert_eq!(array.len(), 1);

        let retrieved = array
            .scalar_at(0)
            .unwrap()
            .as_binary()
            .value()
            .cloned()
            .unwrap();
        assert_eq!(retrieved.len(), 8192);
        assert_eq!(retrieved.as_slice(), &large_value);
    }
}