1use std::any::Any;
5use std::ops::Range;
6use std::sync::Arc;
7
8use vortex_buffer::{Buffer, BufferMut, ByteBuffer, ByteBufferMut};
9use vortex_dtype::DType;
10use vortex_error::{VortexExpect, VortexResult, vortex_bail, vortex_ensure};
11use vortex_mask::Mask;
12use vortex_scalar::{BinaryScalar, Scalar, Utf8Scalar};
13use vortex_utils::aliases::hash_map::{Entry, HashMap};
14
15use crate::arrays::VarBinViewArray;
16use crate::arrays::binary_view::BinaryView;
17use crate::arrays::compact::BufferUtilization;
18use crate::builders::{ArrayBuilder, LazyNullBufferBuilder};
19use crate::canonical::{Canonical, ToCanonical};
20use crate::{Array, ArrayRef, IntoArray};
21
/// Builder for [`VarBinViewArray`] (Utf8 or Binary arrays in the "string view" layout).
pub struct VarBinViewBuilder {
    // Utf8 or Binary; enforced by an assertion in `new`.
    dtype: DType,
    // One view per element; values of <= 12 bytes are inlined, longer ones reference a buffer.
    views_builder: BufferMut<BinaryView>,
    // Lazily materialized validity bitmap; kept in lock-step with `views_builder`.
    nulls: LazyNullBufferBuilder,
    // Frozen data buffers that non-inlined views reference by index.
    completed: CompletedBuffers,
    // Mutable buffer currently receiving long values; flushed into `completed` when full.
    in_progress: ByteBufferMut,
    // Controls how large each freshly allocated `in_progress` buffer is.
    growth_strategy: BufferGrowthStrategy,
    // Utilization threshold below which appended arrays' buffers are compacted;
    // 0.0 disables compaction entirely.
    compaction_threshold: f64,
}
32
33impl VarBinViewBuilder {
34 pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
35 Self::new(dtype, capacity, Default::default(), Default::default(), 0.0)
36 }
37
38 pub fn with_buffer_deduplication(dtype: DType, capacity: usize) -> Self {
39 Self::new(
40 dtype,
41 capacity,
42 CompletedBuffers::Deduplicated(Default::default()),
43 Default::default(),
44 0.0,
45 )
46 }
47
48 pub fn with_compaction(dtype: DType, capacity: usize, compaction_threshold: f64) -> Self {
49 Self::new(
50 dtype,
51 capacity,
52 Default::default(),
53 Default::default(),
54 compaction_threshold,
55 )
56 }
57
58 pub fn new(
59 dtype: DType,
60 capacity: usize,
61 completed: CompletedBuffers,
62 growth_strategy: BufferGrowthStrategy,
63 compaction_threshold: f64,
64 ) -> Self {
65 assert!(
66 matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
67 "VarBinViewBuilder DType must be Utf8 or Binary."
68 );
69 Self {
70 views_builder: BufferMut::<BinaryView>::with_capacity(capacity),
71 nulls: LazyNullBufferBuilder::new(capacity),
72 completed,
73 in_progress: ByteBufferMut::empty(),
74 dtype,
75 growth_strategy,
76 compaction_threshold,
77 }
78 }
79
80 fn append_value_view(&mut self, value: &[u8]) {
81 let length =
82 u32::try_from(value.len()).vortex_expect("cannot have a single string >2^32 in length");
83 if length <= 12 {
84 self.views_builder.push(BinaryView::make_view(value, 0, 0));
85 return;
86 }
87
88 let (buffer_idx, offset) = self.append_value_to_buffer(value);
89 let view = BinaryView::make_view(value, buffer_idx, offset);
90 self.views_builder.push(view);
91 }
92
93 pub fn append_value<S: AsRef<[u8]>>(&mut self, value: S) {
95 self.append_value_view(value.as_ref());
96 self.nulls.append_non_null();
97 }
98
99 fn flush_in_progress(&mut self) {
100 if self.in_progress.is_empty() {
101 return;
102 }
103 let block = std::mem::take(&mut self.in_progress).freeze();
104
105 assert!(block.len() < u32::MAX as usize, "Block too large");
106
107 let initial_len = self.completed.len();
108 self.completed.push(block);
109 assert_eq!(
110 self.completed.len(),
111 initial_len + 1,
112 "Invalid state, just completed block already exists"
113 );
114 }
115
116 fn append_value_to_buffer(&mut self, value: &[u8]) -> (u32, u32) {
118 assert!(value.len() > 12, "must inline small strings");
119 let required_cap = self.in_progress.len() + value.len();
120 if self.in_progress.capacity() < required_cap {
121 self.flush_in_progress();
122 let next_buffer_size = self.growth_strategy.next_size() as usize;
123 let to_reserve = next_buffer_size.max(value.len());
124 self.in_progress.reserve(to_reserve);
125 }
126
127 let buffer_idx = self.completed.len();
128 let offset = u32::try_from(self.in_progress.len()).vortex_expect("too many buffers");
129 self.in_progress.extend_from_slice(value);
130
131 (buffer_idx, offset)
132 }
133
134 pub fn completed_block_count(&self) -> u32 {
135 self.completed.len()
136 }
137
138 pub fn push_buffer_and_adjusted_views(
155 &mut self,
156 buffer: &[ByteBuffer],
157 views: &Buffer<BinaryView>,
158 validity_mask: Mask,
159 ) {
160 self.flush_in_progress();
161
162 let expected_completed_len = self.completed.len() as usize + buffer.len();
163 self.completed.extend_from_slice_unchecked(buffer);
164 assert_eq!(
165 self.completed.len() as usize,
166 expected_completed_len,
167 "Some buffers already exist",
168 );
169 self.views_builder.extend_trusted(views.iter().copied());
170 self.push_only_validity_mask(validity_mask);
171
172 debug_assert_eq!(self.nulls.len(), self.views_builder.len())
173 }
174
175 pub fn finish_into_varbinview(&mut self) -> VarBinViewArray {
177 self.flush_in_progress();
178 let buffers = std::mem::take(&mut self.completed);
179
180 assert_eq!(
181 self.views_builder.len(),
182 self.nulls.len(),
183 "View and validity length must match"
184 );
185
186 let validity = self.nulls.finish_with_nullability(self.dtype.nullability());
187
188 unsafe {
190 VarBinViewArray::new_unchecked(
191 std::mem::take(&mut self.views_builder).freeze(),
192 buffers.finish(),
193 self.dtype.clone(),
194 validity,
195 )
196 }
197 }
198
199 fn push_only_validity_mask(&mut self, validity_mask: Mask) {
201 self.nulls.append_validity_mask(validity_mask);
202 }
203}
204
impl ArrayBuilder for VarBinViewBuilder {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }

    fn dtype(&self) -> &DType {
        &self.dtype
    }

    fn len(&self) -> usize {
        // Validity and views are kept in lock-step; either length works.
        self.nulls.len()
    }

    /// Appends `n` "zero" elements, i.e. empty (but non-null) strings/byte-strings.
    fn append_zeros(&mut self, n: usize) {
        self.views_builder.push_n(BinaryView::empty_view(), n);
        self.nulls.append_n_non_nulls(n);
    }

    /// Appends `n` nulls. Null slots still get a placeholder (empty) view.
    unsafe fn append_nulls_unchecked(&mut self, n: usize) {
        self.views_builder.push_n(BinaryView::empty_view(), n);
        self.nulls.append_n_nulls(n);
    }

    /// Appends a Utf8 or Binary scalar (value or null).
    ///
    /// # Errors
    /// Returns an error if the scalar's dtype does not match the builder's dtype.
    fn append_scalar(&mut self, scalar: &Scalar) -> VortexResult<()> {
        vortex_ensure!(
            scalar.dtype() == self.dtype(),
            "VarBinViewBuilder expected scalar with dtype {:?}, got {:?}",
            self.dtype(),
            scalar.dtype()
        );

        match self.dtype() {
            DType::Utf8(_) => {
                let utf8_scalar = Utf8Scalar::try_from(scalar)?;
                match utf8_scalar.value() {
                    Some(value) => self.append_value(value),
                    None => self.append_null(),
                }
            }
            DType::Binary(_) => {
                let binary_scalar = BinaryScalar::try_from(scalar)?;
                match binary_scalar.value() {
                    Some(value) => self.append_value(value),
                    None => self.append_null(),
                }
            }
            // Unreachable in practice: the constructor asserts Utf8/Binary.
            _ => vortex_bail!(
                "VarBinViewBuilder can only handle Utf8 or Binary scalars, got {:?}",
                scalar.dtype()
            ),
        }

        Ok(())
    }

    /// Extends from another array, compacting its buffers per `compaction_threshold`.
    unsafe fn extend_from_array_unchecked(&mut self, array: &dyn Array) {
        let array = array.to_varbinview();
        self.flush_in_progress();

        self.push_only_validity_mask(array.validity_mask());

        // Decide per-source-buffer whether to keep, slice, or rewrite, and learn how
        // each incoming view's buffer index / offset must be remapped.
        let view_adjustment =
            self.completed
                .extend_from_compaction(BuffersWithOffsets::from_array(
                    &array,
                    self.compaction_threshold,
                ));

        match view_adjustment {
            // All source buffers were carried over: views can be adjusted wholesale.
            ViewAdjustment::Precomputed(adjustment) => self.views_builder.extend_trusted(
                array
                    .views()
                    .iter()
                    .map(|view| adjustment.adjust_view(view)),
            ),
            // Some buffers were dropped; their values must be copied into our own buffers.
            ViewAdjustment::Rewriting(adjustment) => {
                for (idx, &view) in array.views().iter().enumerate() {
                    let new_view = if !array.is_valid(idx) {
                        // Null slot: placeholder view, data (if any) is dropped.
                        BinaryView::empty_view()
                    } else if view.is_inlined() {
                        // Inlined views carry their own bytes; nothing to remap.
                        view
                    } else if let Some(adjusted) = adjustment.adjust_view(&view) {
                        adjusted
                    } else {
                        // Source buffer was compacted away: re-append the value's bytes.
                        let bytes = array.bytes_at(idx);
                        let (new_buf_idx, new_offset) = self.append_value_to_buffer(&bytes);
                        BinaryView::make_view(bytes.as_slice(), new_buf_idx, new_offset)
                    };
                    self.views_builder.push(new_view)
                }
            }
        }
    }

    fn reserve_exact(&mut self, additional: usize) {
        self.views_builder.reserve(additional);
        self.nulls.reserve_exact(additional);
    }

    /// Replaces the entire validity with `validity`; caller must ensure the length
    /// matches the number of views.
    unsafe fn set_validity_unchecked(&mut self, validity: Mask) {
        self.nulls = LazyNullBufferBuilder::new(validity.len());
        self.nulls.append_validity_mask(validity);
    }

    fn finish(&mut self) -> ArrayRef {
        self.finish_into_varbinview().into_array()
    }

    fn finish_into_canonical(&mut self) -> Canonical {
        Canonical::VarBinView(self.finish_into_varbinview())
    }
}
321
/// Storage for frozen data buffers, optionally deduplicating repeated pushes.
pub enum CompletedBuffers {
    /// Plain list of buffers in push order.
    Default(Vec<ByteBuffer>),
    /// Buffers deduplicated by identity (data pointer + length).
    Deduplicated(DeduplicatedBuffers),
}
326
327impl Default for CompletedBuffers {
328 fn default() -> Self {
329 Self::Default(Vec::new())
330 }
331}
332
333#[allow(clippy::cast_possible_truncation)]
335impl CompletedBuffers {
336 fn len(&self) -> u32 {
337 match self {
338 Self::Default(buffers) => buffers.len() as u32,
339 Self::Deduplicated(buffers) => buffers.len(),
340 }
341 }
342
343 fn push(&mut self, block: ByteBuffer) -> u32 {
344 match self {
345 Self::Default(buffers) => {
346 assert!(buffers.len() < u32::MAX as usize, "Too many blocks");
347 buffers.push(block);
348 self.len()
349 }
350 Self::Deduplicated(buffers) => buffers.push(block),
351 }
352 }
353
354 fn extend_from_slice_unchecked(&mut self, buffers: &[ByteBuffer]) {
356 for buffer in buffers {
357 self.push(buffer.clone());
358 }
359 }
360
361 fn extend_from_compaction(&mut self, buffers: BuffersWithOffsets) -> ViewAdjustment {
362 match (self, buffers) {
363 (
364 Self::Default(completed_buffers),
365 BuffersWithOffsets::AllKept { buffers, offsets },
366 ) => {
367 let buffer_offset = completed_buffers.len() as u32;
368 completed_buffers.extend_from_slice(&buffers);
369 ViewAdjustment::shift(buffer_offset, offsets)
370 }
371 (
372 Self::Default(completed_buffers),
373 BuffersWithOffsets::SomeCompacted { buffers, offsets },
374 ) => {
375 let lookup = buffers
376 .iter()
377 .map(|maybe_buffer| {
378 maybe_buffer.as_ref().map(|buffer| {
379 completed_buffers.push(buffer.clone());
380 completed_buffers.len() as u32 - 1
381 })
382 })
383 .collect();
384 ViewAdjustment::rewriting(lookup, offsets)
385 }
386
387 (
388 Self::Deduplicated(completed_buffers),
389 BuffersWithOffsets::AllKept { buffers, offsets },
390 ) => {
391 let buffer_lookup = completed_buffers.extend_from_slice(&buffers);
392 ViewAdjustment::lookup(buffer_lookup, offsets)
393 }
394 (
395 Self::Deduplicated(completed_buffers),
396 BuffersWithOffsets::SomeCompacted { buffers, offsets },
397 ) => {
398 let buffer_lookup = completed_buffers.extend_from_option_slice(&buffers);
399 ViewAdjustment::rewriting(buffer_lookup, offsets)
400 }
401 }
402 }
403
404 fn finish(self) -> Arc<[ByteBuffer]> {
405 match self {
406 Self::Default(buffers) => Arc::from(buffers),
407 Self::Deduplicated(buffers) => buffers.finish(),
408 }
409 }
410}
411
/// Buffer list that deduplicates pushes of the *same underlying buffer*.
///
/// Identity is the buffer's data pointer and length (see [`BufferId`]), so only
/// buffers sharing an allocation are merged — equal contents in distinct
/// allocations are not deduplicated.
#[derive(Default)]
pub struct DeduplicatedBuffers {
    // Unique buffers in first-push order.
    buffers: Vec<ByteBuffer>,
    // Maps a buffer's identity to its index in `buffers`.
    buffer_to_idx: HashMap<BufferId, u32>,
}
417
418impl DeduplicatedBuffers {
419 #[allow(clippy::cast_possible_truncation)]
421 fn len(&self) -> u32 {
422 self.buffers.len() as u32
423 }
424
425 fn push(&mut self, block: ByteBuffer) -> u32 {
427 assert!(self.buffers.len() < u32::MAX as usize, "Too many blocks");
428
429 let initial_len = self.len();
430 let id = BufferId::from(&block);
431 match self.buffer_to_idx.entry(id) {
432 Entry::Occupied(idx) => *idx.get(),
433 Entry::Vacant(entry) => {
434 let idx = initial_len;
435 entry.insert(idx);
436 self.buffers.push(block);
437 idx
438 }
439 }
440 }
441
442 fn extend_from_option_slice(&mut self, buffers: &[Option<ByteBuffer>]) -> Vec<Option<u32>> {
443 buffers
444 .iter()
445 .map(|buffer| buffer.as_ref().map(|buf| self.push(buf.clone())))
446 .collect()
447 }
448
449 fn extend_from_slice(&mut self, buffers: &[ByteBuffer]) -> Vec<u32> {
450 buffers
451 .iter()
452 .map(|buffer| self.push(buffer.clone()))
453 .collect()
454 }
455
456 fn finish(self) -> Arc<[ByteBuffer]> {
457 Arc::from(self.buffers)
458 }
459}
460
/// Identity key for a [`ByteBuffer`]: the address and length of its data.
///
/// Two buffers compare equal only when they view the same allocation region,
/// not merely when their contents match.
#[derive(PartialEq, Eq, Hash)]
struct BufferId {
    // Address of the first byte of the buffer's data.
    ptr: usize,
    // Length of the buffer in bytes.
    len: usize,
}
467
468impl BufferId {
469 fn from(buffer: &ByteBuffer) -> Self {
470 let slice = buffer.as_slice();
471 Self {
472 ptr: slice.as_ptr() as usize,
473 len: slice.len(),
474 }
475 }
476}
477
/// Strategy controlling the size of each newly allocated data buffer.
#[derive(Debug, Clone)]
pub enum BufferGrowthStrategy {
    /// Every buffer has the same fixed size.
    Fixed { size: u32 },
    /// Buffer sizes double on each allocation until `max_size` is reached.
    Exponential { current_size: u32, max_size: u32 },
}

impl Default for BufferGrowthStrategy {
    /// Defaults to exponential growth from 4 KiB up to 2 MiB.
    fn default() -> Self {
        Self::Exponential {
            current_size: 4 * 1024,
            max_size: 2 * 1024 * 1024,
        }
    }
}

impl BufferGrowthStrategy {
    /// Strategy that always allocates buffers of exactly `size` bytes.
    pub fn fixed(size: u32) -> Self {
        Self::Fixed { size }
    }

    /// Strategy that starts at `initial_size` and doubles until `max_size`.
    pub fn exponential(initial_size: u32, max_size: u32) -> Self {
        Self::Exponential {
            current_size: initial_size,
            max_size,
        }
    }

    /// Returns the size to use for the next buffer, advancing internal state.
    pub fn next_size(&mut self) -> u32 {
        match self {
            Self::Fixed { size } => *size,
            Self::Exponential {
                current_size,
                max_size,
            } => {
                let size = *current_size;
                // Double for next time, clamped to the maximum; once at (or above)
                // the cap the size stays put.
                if size < *max_size {
                    *current_size = size.saturating_mul(2).min(*max_size);
                }
                size
            }
        }
    }
}
524
/// A source array's data buffers after the compaction decision, plus the per-buffer
/// offset shifts (from slicing) that incoming views must subtract.
enum BuffersWithOffsets {
    /// Every buffer is carried over (possibly sliced); `offsets[i]` is buffer i's
    /// slice start, or `None` if no buffer was sliced.
    AllKept {
        buffers: Arc<[ByteBuffer]>,
        offsets: Option<Vec<u32>>,
    },
    /// Some buffers were dropped for rewriting (`None` slots); kept buffers may be sliced.
    SomeCompacted {
        buffers: Vec<Option<ByteBuffer>>,
        offsets: Option<Vec<u32>>,
    },
}
535
536impl BuffersWithOffsets {
537 pub fn from_array(array: &VarBinViewArray, compaction_threshold: f64) -> Self {
538 if compaction_threshold == 0.0 {
539 return Self::AllKept {
540 buffers: array.buffers().clone(),
541 offsets: None,
542 };
543 }
544
545 let buffer_utilizations = array.buffer_utilizations();
546 let mut has_rewrite = false;
547 let mut has_nonzero_offset = false;
548 for utilization in buffer_utilizations.iter() {
549 match compaction_strategy(utilization, compaction_threshold) {
550 CompactionStrategy::KeepFull => continue,
551 CompactionStrategy::Slice { .. } => has_nonzero_offset = true,
552 CompactionStrategy::Rewrite => has_rewrite = true,
553 }
554 }
555
556 let buffers_with_offsets_iter =
557 buffer_utilizations
558 .iter()
559 .zip(array.buffers().iter())
560 .map(|(utilization, buffer)| {
561 match compaction_strategy(utilization, compaction_threshold) {
562 CompactionStrategy::KeepFull => (Some(buffer.clone()), 0),
563 CompactionStrategy::Slice { start, end } => {
564 (Some(buffer.slice(start as usize..end as usize)), start)
565 }
566 CompactionStrategy::Rewrite => (None, 0),
567 }
568 });
569
570 match (has_rewrite, has_nonzero_offset) {
571 (false, false) => {
573 let buffers: Vec<_> = buffers_with_offsets_iter
574 .map(|(b, _)| b.vortex_expect("already checked for rewrite"))
575 .collect();
576 Self::AllKept {
577 buffers: Arc::from(buffers),
578 offsets: None,
579 }
580 }
581 (true, false) => {
583 let buffers: Vec<_> = buffers_with_offsets_iter.map(|(b, _)| b).collect();
584 Self::SomeCompacted {
585 buffers,
586 offsets: None,
587 }
588 }
589 (false, true) => {
591 let (buffers, offsets): (Vec<_>, _) = buffers_with_offsets_iter
592 .map(|(buffer, offset)| {
593 (buffer.vortex_expect("already checked for rewrite"), offset)
594 })
595 .collect();
596 Self::AllKept {
597 buffers: Arc::from(buffers),
598 offsets: Some(offsets),
599 }
600 }
601 (true, true) => {
603 let (buffers, offsets) = buffers_with_offsets_iter.collect();
604 Self::SomeCompacted {
605 buffers,
606 offsets: Some(offsets),
607 }
608 }
609 }
610 }
611}
612
/// How a single source buffer is carried into the builder.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CompactionStrategy {
    /// Keep the entire buffer unchanged.
    KeepFull,
    /// Keep only the `start..end` byte range of the buffer.
    Slice {
        start: u32,
        end: u32,
    },
    /// Drop the buffer; referencing values are rewritten into the builder's own buffers.
    Rewrite,
}
624
625fn compaction_strategy(
626 buffer_utilization: &BufferUtilization,
627 threshold: f64,
628) -> CompactionStrategy {
629 match buffer_utilization.overall_utilization() {
630 0.0 => CompactionStrategy::Rewrite,
632 utilised if utilised >= threshold => CompactionStrategy::KeepFull,
633 _ if buffer_utilization.range_utilization() >= threshold => {
634 let Range { start, end } = buffer_utilization.range();
635 CompactionStrategy::Slice { start, end }
636 }
637 _ => CompactionStrategy::Rewrite,
638 }
639}
640
/// How incoming views must be rewritten to reference the builder's buffers.
enum ViewAdjustment {
    /// Every view maps cleanly to a stored buffer; adjustment is a pure function.
    Precomputed(PrecomputedViewAdjustment),
    /// Some source buffers were dropped; views into them need their bytes re-appended.
    Rewriting(RewritingViewAdjustment),
}
645
646impl ViewAdjustment {
647 fn shift(buffer_offset: u32, offsets: Option<Vec<u32>>) -> Self {
648 Self::Precomputed(PrecomputedViewAdjustment::Shift {
649 buffer_offset,
650 offsets,
651 })
652 }
653
654 fn lookup(buffer_lookup: Vec<u32>, offsets: Option<Vec<u32>>) -> Self {
655 Self::Precomputed(PrecomputedViewAdjustment::Lookup {
656 buffer_lookup,
657 offsets,
658 })
659 }
660
661 fn rewriting(buffer_lookup: Vec<Option<u32>>, offsets: Option<Vec<u32>>) -> Self {
662 Self::Rewriting(RewritingViewAdjustment {
663 buffer_lookup,
664 offsets,
665 })
666 }
667}
668
/// A total mapping from source buffer indices/offsets to the builder's.
enum PrecomputedViewAdjustment {
    /// Buffer indices shift by a constant; `offsets[i]` is subtracted from views
    /// into buffer `i` (present when buffers were sliced).
    Shift {
        buffer_offset: u32,
        offsets: Option<Vec<u32>>,
    },
    /// Buffer indices remap via `buffer_lookup` (used with deduplicated storage).
    Lookup {
        buffer_lookup: Vec<u32>,
        offsets: Option<Vec<u32>>,
    },
}
680
681impl PrecomputedViewAdjustment {
682 #[inline]
683 fn adjust_view(&self, view: &BinaryView) -> BinaryView {
684 if view.is_inlined() {
685 return *view;
686 }
687 let view_ref = view.as_view();
688 match self {
689 Self::Shift {
690 buffer_offset,
691 offsets,
692 } => {
693 let b_idx = view_ref.buffer_index();
694 let offset_shift = offsets
695 .as_ref()
696 .map(|o| o[b_idx as usize])
697 .unwrap_or_default();
698 view_ref
699 .with_buffer_and_offset(b_idx + buffer_offset, view_ref.offset() - offset_shift)
700 }
701 Self::Lookup {
702 buffer_lookup,
703 offsets,
704 } => {
705 let b_idx = view_ref.buffer_index();
706 let buffer = buffer_lookup[b_idx as usize];
707 let offset_shift = offsets
708 .as_ref()
709 .map(|o| o[b_idx as usize])
710 .unwrap_or_default();
711 view_ref.with_buffer_and_offset(buffer, view_ref.offset() - offset_shift)
712 }
713 }
714 .into()
715 }
716}
717
/// Partial mapping for when some source buffers were dropped: `None` lookup slots
/// mean the value's bytes must be re-appended to the builder instead.
struct RewritingViewAdjustment {
    // Per-source-buffer target index; `None` if the buffer was compacted away.
    buffer_lookup: Vec<Option<u32>>,
    // Per-source-buffer slice-start shift to subtract from view offsets, if any.
    offsets: Option<Vec<u32>>,
}
722
723impl RewritingViewAdjustment {
724 fn adjust_view(&self, view: &BinaryView) -> Option<BinaryView> {
727 if view.is_inlined() {
728 return Some(*view);
729 }
730
731 let view_ref = view.as_view();
732 self.buffer_lookup[view_ref.buffer_index() as usize].map(|buffer| {
733 let offset_shift = self
734 .offsets
735 .as_ref()
736 .map(|o| o[view_ref.buffer_index() as usize])
737 .unwrap_or_default();
738 view_ref
739 .with_buffer_and_offset(buffer, view_ref.offset() - offset_shift)
740 .into()
741 })
742 }
743}
744
#[cfg(test)]
mod tests {
    use std::str::from_utf8;

    use itertools::Itertools;
    use vortex_dtype::{DType, Nullability};

    use crate::accessor::ArrayAccessor;
    use crate::arrays::VarBinViewVTable;
    use crate::builders::{ArrayBuilder, VarBinViewBuilder};

    // Basic append/null/zero behavior for a Utf8 builder.
    #[test]
    fn test_utf8_builder() {
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_value("Hello");
        builder.append_null();
        builder.append_value("World");

        builder.append_nulls(2);

        // "Zeros" in a Utf8 array are empty, non-null strings.
        builder.append_zeros(2);
        builder.append_value("test");

        let arr = builder.finish();

        let arr = arr
            .as_::<VarBinViewVTable>()
            .with_iterator(|iter| {
                iter.map(|x| x.map(|x| from_utf8(x).unwrap().to_string()))
                    .collect_vec()
            })
            .unwrap();
        assert_eq!(arr.len(), 8);
        assert_eq!(
            arr,
            vec![
                Some("Hello".to_string()),
                None,
                Some("World".to_string()),
                None,
                None,
                Some("".to_string()),
                Some("".to_string()),
                Some("test".to_string()),
            ]
        );
    }

    // Extending from an existing array interleaves correctly with direct appends.
    #[test]
    fn test_utf8_builder_with_extend() {
        let array = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_null();
            builder.append_value("Hello2");
            builder.finish()
        };
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_value("Hello1");
        builder.extend_from_array(&array);
        builder.append_nulls(2);
        builder.append_value("Hello3");

        let arr = builder.finish_into_canonical().into_varbinview();

        let arr = arr
            .with_iterator(|iter| {
                iter.map(|x| x.map(|x| from_utf8(x).unwrap().to_string()))
                    .collect_vec()
            })
            .unwrap();
        assert_eq!(arr.len(), 6);
        assert_eq!(
            arr,
            vec![
                Some("Hello1".to_string()),
                None,
                Some("Hello2".to_string()),
                None,
                None,
                Some("Hello3".to_string()),
            ]
        );
    }

    // Buffers appended repeatedly from the same array (and its slices) are stored once;
    // a distinct allocation with equal contents still counts as a new buffer.
    #[test]
    fn test_buffer_deduplication() {
        let array = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_value("This is a long string that should not be inlined");
            builder.append_value("short string");
            builder.finish_into_varbinview()
        };

        assert_eq!(array.buffers().len(), 1);
        let mut builder =
            VarBinViewBuilder::with_buffer_deduplication(DType::Utf8(Nullability::Nullable), 10);

        array.append_to_builder(&mut builder);
        assert_eq!(builder.completed_block_count(), 1);

        // Slices share the source array's buffer, so no new blocks appear.
        array.slice(1..2).append_to_builder(&mut builder);
        array.slice(0..1).append_to_builder(&mut builder);
        assert_eq!(builder.completed_block_count(), 1);

        let array2 = {
            let mut builder =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            builder.append_value("This is a long string that should not be inlined");
            builder.finish_into_varbinview()
        };

        // Same contents but a different allocation: deduplication is by identity.
        array2.append_to_builder(&mut builder);
        assert_eq!(builder.completed_block_count(), 2);

        array.slice(0..1).append_to_builder(&mut builder);
        array2.slice(0..1).append_to_builder(&mut builder);
        assert_eq!(builder.completed_block_count(), 2);
    }

    // append_scalar handles Utf8 values, Binary values, nulls, and dtype mismatches.
    #[test]
    fn test_append_scalar() {
        use vortex_scalar::Scalar;

        let mut utf8_builder =
            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        let utf8_scalar1 = Scalar::utf8("hello", Nullability::Nullable);
        utf8_builder.append_scalar(&utf8_scalar1).unwrap();

        let utf8_scalar2 = Scalar::utf8("world", Nullability::Nullable);
        utf8_builder.append_scalar(&utf8_scalar2).unwrap();

        let null_scalar = Scalar::null(DType::Utf8(Nullability::Nullable));
        utf8_builder.append_scalar(&null_scalar).unwrap();

        let array = utf8_builder.finish();
        assert_eq!(array.len(), 3);

        use crate::array::Array;
        let scalar0 = array.scalar_at(0).as_utf8().value();
        assert_eq!(scalar0.as_ref().map(|s| s.as_str()), Some("hello"));

        let scalar1 = array.scalar_at(1).as_utf8().value();
        assert_eq!(scalar1.as_ref().map(|s| s.as_str()), Some("world"));

        let scalar2 = array.scalar_at(2).as_utf8().value();
        assert_eq!(scalar2, None);

        // Binary builders accept binary scalars and nulls.
        let mut binary_builder =
            VarBinViewBuilder::with_capacity(DType::Binary(Nullability::Nullable), 10);

        let binary_scalar = Scalar::binary(vec![1u8, 2, 3], Nullability::Nullable);
        binary_builder.append_scalar(&binary_scalar).unwrap();

        let binary_null = Scalar::null(DType::Binary(Nullability::Nullable));
        binary_builder.append_scalar(&binary_null).unwrap();

        let binary_array = binary_builder.finish();
        assert_eq!(binary_array.len(), 2);

        let binary0 = binary_array.scalar_at(0).as_binary().value();
        assert_eq!(
            binary0.as_ref().map(|b| b.as_slice()),
            Some(&[1u8, 2, 3][..])
        );

        let binary1 = binary_array.scalar_at(1).as_binary().value();
        assert_eq!(binary1, None);

        // A scalar with the wrong dtype is rejected.
        let mut builder =
            VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::NonNullable), 10);
        let wrong_scalar = Scalar::from(42i32);
        assert!(builder.append_scalar(&wrong_scalar).is_err());
    }

    // Fixed strategies never change; exponential strategies double up to the cap.
    #[test]
    fn test_buffer_growth_strategies() {
        use super::BufferGrowthStrategy;

        let mut strategy = BufferGrowthStrategy::fixed(1024);

        assert_eq!(strategy.next_size(), 1024);
        assert_eq!(strategy.next_size(), 1024);
        assert_eq!(strategy.next_size(), 1024);

        let mut strategy = BufferGrowthStrategy::exponential(1024, 8192);

        assert_eq!(strategy.next_size(), 1024);
        assert_eq!(strategy.next_size(), 2048);
        assert_eq!(strategy.next_size(), 4096);
        assert_eq!(strategy.next_size(), 8192);
        // Capped: stays at the maximum.
        assert_eq!(strategy.next_size(), 8192);
    }

    // A value larger than the strategy's maximum still gets a sufficiently large buffer.
    #[test]
    fn test_large_value_allocation() {
        use super::{BufferGrowthStrategy, VarBinViewBuilder};

        let mut builder = VarBinViewBuilder::new(
            DType::Binary(Nullability::Nullable),
            10,
            Default::default(),
            BufferGrowthStrategy::exponential(1024, 4096),
            0.0,
        );

        // 8 KiB value exceeds the 4 KiB max buffer size.
        let large_value = vec![0u8; 8192];

        builder.append_value(&large_value);

        let array = builder.finish_into_varbinview();
        assert_eq!(array.len(), 1);

        let retrieved = array.scalar_at(0).as_binary().value().unwrap();
        assert_eq!(retrieved.len(), 8192);
        assert_eq!(retrieved.as_slice(), &large_value);
    }
}
981}