1use std::any::Any;
5use std::cmp::max;
6use std::sync::Arc;
7
8use vortex_buffer::{Buffer, BufferMut, ByteBuffer, ByteBufferMut};
9use vortex_dtype::{DType, Nullability};
10use vortex_error::{VortexExpect, VortexResult};
11use vortex_mask::Mask;
12use vortex_utils::aliases::hash_map::{Entry, HashMap};
13
14use crate::arrays::{BinaryView, VarBinViewArray};
15use crate::builders::ArrayBuilder;
16use crate::builders::lazy_validity_builder::LazyNullBufferBuilder;
17use crate::{Array, ArrayRef, IntoArray, ToCanonical};
18
19pub struct VarBinViewBuilder {
20 views_builder: BufferMut<BinaryView>,
21 pub null_buffer_builder: LazyNullBufferBuilder,
22 completed: CompletedBuffers,
23 in_progress: ByteBufferMut,
24 nullability: Nullability,
25 dtype: DType,
26}
27
28impl VarBinViewBuilder {
29 const BLOCK_SIZE: u32 = 8 * 8 * 1024;
31
32 pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
33 Self::new(dtype, capacity, Default::default())
34 }
35
36 pub fn with_buffer_deduplication(dtype: DType, capacity: usize) -> Self {
37 Self::new(
38 dtype,
39 capacity,
40 CompletedBuffers::Deduplicated(Default::default()),
41 )
42 }
43
44 fn new(dtype: DType, capacity: usize, completed: CompletedBuffers) -> Self {
45 assert!(
46 matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
47 "VarBinViewBuilder DType must be Utf8 or Binary."
48 );
49 Self {
50 views_builder: BufferMut::<BinaryView>::with_capacity(capacity),
51 null_buffer_builder: LazyNullBufferBuilder::new(capacity),
52 completed,
53 in_progress: ByteBufferMut::empty(),
54 nullability: dtype.nullability(),
55 dtype,
56 }
57 }
58
59 fn append_value_view(&mut self, value: &[u8]) {
60 let length =
61 u32::try_from(value.len()).vortex_expect("cannot have a single string >2^32 in length");
62 if length <= 12 {
63 self.views_builder.push(BinaryView::make_view(value, 0, 0));
64 return;
65 }
66
67 let required_cap = self.in_progress.len() + value.len();
68 if self.in_progress.capacity() < required_cap {
69 self.flush_in_progress();
70 let to_reserve = max(value.len(), VarBinViewBuilder::BLOCK_SIZE as usize);
71 self.in_progress.reserve(to_reserve);
72 };
73
74 let offset = u32::try_from(self.in_progress.len()).vortex_expect("too many buffers");
75 self.in_progress.extend_from_slice(value);
76 let view = BinaryView::make_view(
77 value,
78 self.completed.len(),
80 offset,
81 );
82 self.views_builder.push(view);
83 }
84
85 #[inline]
86 pub fn append_value<S: AsRef<[u8]>>(&mut self, value: S) {
87 self.append_value_view(value.as_ref());
88 self.null_buffer_builder.append_non_null();
89 }
90
91 #[inline]
92 pub fn append_option<S: AsRef<[u8]>>(&mut self, value: Option<S>) {
93 match value {
94 Some(value) => self.append_value(value),
95 None => self.append_null(),
96 }
97 }
98
99 #[inline]
100 fn flush_in_progress(&mut self) {
101 if self.in_progress.is_empty() {
102 return;
103 }
104 let block = std::mem::take(&mut self.in_progress).freeze();
105
106 assert!(block.len() < u32::MAX as usize, "Block too large");
107
108 let initial_len = self.completed.len();
109 self.completed.push(block);
110 assert_eq!(
111 self.completed.len(),
112 initial_len + 1,
113 "Invalid state, just completed block already exists"
114 );
115 }
116
117 pub fn completed_block_count(&self) -> u32 {
118 self.completed.len()
119 }
120
121 pub fn push_buffer_and_adjusted_views(
130 &mut self,
131 buffer: &[ByteBuffer],
132 views: &Buffer<BinaryView>,
133 validity_mask: Mask,
134 ) {
135 self.flush_in_progress();
136
137 let expected_completed_len = self.completed.len() as usize + buffer.len();
138 self.completed.extend_from_slice(buffer);
139 assert_eq!(
140 self.completed.len() as usize,
141 expected_completed_len,
142 "Some buffers already exist",
143 );
144 self.views_builder.extend_trusted(views.iter().copied());
145 self.push_only_validity_mask(validity_mask);
146
147 debug_assert_eq!(self.null_buffer_builder.len(), self.views_builder.len())
148 }
149
150 pub fn finish_into_varbinview(&mut self) -> VarBinViewArray {
151 self.flush_in_progress();
152 let buffers = std::mem::take(&mut self.completed);
153
154 assert_eq!(
155 self.views_builder.len(),
156 self.null_buffer_builder.len(),
157 "View and validity length must match"
158 );
159
160 let validity = self
161 .null_buffer_builder
162 .finish_with_nullability(self.nullability);
163
164 unsafe {
166 VarBinViewArray::new_unchecked(
167 std::mem::take(&mut self.views_builder).freeze(),
168 buffers.finish(),
169 std::mem::replace(&mut self.dtype, DType::Null),
170 validity,
171 )
172 }
173 }
174}
175
176impl VarBinViewBuilder {
177 fn push_only_validity_mask(&mut self, validity_mask: Mask) {
179 self.null_buffer_builder.append_validity_mask(validity_mask);
180 }
181}
182
183impl ArrayBuilder for VarBinViewBuilder {
184 fn as_any(&self) -> &dyn Any {
185 self
186 }
187
188 fn as_any_mut(&mut self) -> &mut dyn Any {
189 self
190 }
191
192 #[inline]
193 fn dtype(&self) -> &DType {
194 &self.dtype
195 }
196
197 #[inline]
198 fn len(&self) -> usize {
199 self.null_buffer_builder.len()
200 }
201
202 #[inline]
203 fn append_zeros(&mut self, n: usize) {
204 self.views_builder.push_n(BinaryView::empty_view(), n);
205 self.null_buffer_builder.append_n_non_nulls(n);
206 }
207
208 #[inline]
209 fn append_nulls(&mut self, n: usize) {
210 self.views_builder.push_n(BinaryView::empty_view(), n);
211 self.null_buffer_builder.append_n_nulls(n);
212 }
213
214 #[inline]
215 fn extend_from_array(&mut self, array: &dyn Array) -> VortexResult<()> {
216 let array = array.to_varbinview()?;
217 self.flush_in_progress();
218
219 let new_indices = self.completed.extend_from_slice(array.buffers());
220
221 match new_indices {
222 NewIndices::ConstantOffset(offset) => {
223 self.views_builder
224 .extend_trusted(array.views().iter().map(|view| view.offset_view(offset)));
225 }
226 NewIndices::LookupArray(lookup) => {
227 self.views_builder
228 .extend_trusted(array.views().iter().map(|view| {
229 if view.is_inlined() {
230 *view
231 } else {
232 let new_buffer_idx = lookup[view.as_view().buffer_index() as usize];
233 view.with_buffer_idx(new_buffer_idx)
234 }
235 }));
236 }
237 }
238
239 self.push_only_validity_mask(array.validity_mask());
240
241 Ok(())
242 }
243
244 fn ensure_capacity(&mut self, capacity: usize) {
245 if capacity > self.views_builder.capacity() {
246 self.views_builder
247 .reserve(capacity - self.views_builder.len());
248 self.null_buffer_builder.ensure_capacity(capacity);
249 }
250 }
251
252 fn set_validity(&mut self, validity: Mask) {
253 self.null_buffer_builder = LazyNullBufferBuilder::new(validity.len());
254 self.null_buffer_builder.append_validity_mask(validity);
255 }
256
257 fn finish(&mut self) -> ArrayRef {
258 self.finish_into_varbinview().into_array()
259 }
260}
261
262enum CompletedBuffers {
263 Default(Vec<ByteBuffer>),
264 Deduplicated(DeduplicatedBuffers),
265}
266
267impl Default for CompletedBuffers {
268 fn default() -> Self {
269 Self::Default(Vec::new())
270 }
271}
272
273#[allow(clippy::cast_possible_truncation)]
275impl CompletedBuffers {
276 fn len(&self) -> u32 {
277 match self {
278 Self::Default(buffers) => buffers.len() as u32,
279 Self::Deduplicated(buffers) => buffers.len(),
280 }
281 }
282
283 fn push(&mut self, block: ByteBuffer) -> u32 {
284 match self {
285 Self::Default(buffers) => {
286 assert!(buffers.len() < u32::MAX as usize, "Too many blocks");
287 buffers.push(block);
288 self.len()
289 }
290 Self::Deduplicated(buffers) => buffers.push(block),
291 }
292 }
293
294 fn extend_from_slice(&mut self, new_buffers: &[ByteBuffer]) -> NewIndices {
295 match self {
296 Self::Default(buffers) => {
297 let offset = buffers.len() as u32;
298 buffers.extend_from_slice(new_buffers);
299 NewIndices::ConstantOffset(offset)
300 }
301 Self::Deduplicated(buffers) => {
302 NewIndices::LookupArray(buffers.extend_from_slice(new_buffers))
303 }
304 }
305 }
306
307 fn finish(self) -> Arc<[ByteBuffer]> {
308 match self {
309 Self::Default(buffers) => Arc::from(buffers),
310 Self::Deduplicated(buffers) => buffers.finish(),
311 }
312 }
313}
314
/// Describes how buffer indices of an appended array map to indices in the builder.
enum NewIndices {
    /// Every source buffer index is shifted by this constant.
    ConstantOffset(u32),
    /// Entry `i` is the builder-side index of source buffer `i`.
    LookupArray(Vec<u32>),
}
321
322#[derive(Default)]
323struct DeduplicatedBuffers {
324 buffers: Vec<ByteBuffer>,
325 buffer_to_idx: HashMap<BufferId, u32>,
326}
327
328impl DeduplicatedBuffers {
329 #[allow(clippy::cast_possible_truncation)]
331 fn len(&self) -> u32 {
332 self.buffers.len() as u32
333 }
334
335 fn push(&mut self, block: ByteBuffer) -> u32 {
337 assert!(self.buffers.len() < u32::MAX as usize, "Too many blocks");
338
339 let initial_len = self.len();
340 let id = BufferId::from(&block);
341 match self.buffer_to_idx.entry(id) {
342 Entry::Occupied(idx) => *idx.get(),
343 Entry::Vacant(entry) => {
344 let idx = initial_len;
345 entry.insert(idx);
346 self.buffers.push(block);
347 idx
348 }
349 }
350 }
351
352 fn extend_from_slice(&mut self, buffers: &[ByteBuffer]) -> Vec<u32> {
353 buffers
354 .iter()
355 .map(|buffer| self.push(buffer.clone()))
356 .collect()
357 }
358
359 fn finish(self) -> Arc<[ByteBuffer]> {
360 Arc::from(self.buffers)
361 }
362}
363
/// Identity of a buffer, used as the deduplication key.
///
/// Two ids compare equal when both the start address and the length match.
#[derive(PartialEq, Eq, Hash)]
struct BufferId {
    /// Address of the first byte of the buffer's slice.
    ptr: usize,
    /// Length of the buffer's slice in bytes.
    len: usize,
}
370
371impl BufferId {
372 fn from(buffer: &ByteBuffer) -> Self {
373 let slice = buffer.as_slice();
374 Self {
375 ptr: slice.as_ptr() as usize,
376 len: slice.len(),
377 }
378 }
379}
380
#[cfg(test)]
mod tests {
    use std::str::from_utf8;

    use itertools::Itertools;
    use vortex_dtype::{DType, Nullability};

    use crate::ToCanonical;
    use crate::accessor::ArrayAccessor;
    use crate::arrays::VarBinViewVTable;
    use crate::builders::{ArrayBuilder, VarBinViewBuilder};

    /// Turns borrowed expectations into the owned form produced when decoding an array.
    fn owned(expected: &[Option<&str>]) -> Vec<Option<String>> {
        expected
            .iter()
            .map(|value| value.map(str::to_string))
            .collect()
    }

    /// Mixes values, nulls and zero values and checks the decoded result.
    #[test]
    fn test_utf8_builder() {
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_option(Some("Hello"));
        builder.append_option::<&str>(None);
        builder.append_value("World");

        builder.append_nulls(2);

        builder.append_zeros(2);
        builder.append_value("test");

        let finished = builder.finish();

        let decoded = finished
            .as_::<VarBinViewVTable>()
            .with_iterator(|values| {
                values
                    .map(|bytes| bytes.map(|b| from_utf8(b).unwrap().to_string()))
                    .collect_vec()
            })
            .unwrap();
        assert_eq!(decoded.len(), 8);
        assert_eq!(
            decoded,
            owned(&[
                Some("Hello"),
                None,
                Some("World"),
                None,
                None,
                Some(""),
                Some(""),
                Some("test"),
            ])
        );
    }

    /// Extending from another array interleaves correctly with direct appends.
    #[test]
    fn test_utf8_builder_with_extend() {
        let source = {
            let mut inner =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            inner.append_null();
            inner.append_value("Hello2");
            inner.finish()
        };
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_option(Some("Hello1"));
        builder.extend_from_array(&source).unwrap();
        builder.append_nulls(2);
        builder.append_value("Hello3");

        let finished = builder.finish().to_varbinview().unwrap();

        let decoded = finished
            .with_iterator(|values| {
                values
                    .map(|bytes| bytes.map(|b| from_utf8(b).unwrap().to_string()))
                    .collect_vec()
            })
            .unwrap();
        assert_eq!(decoded.len(), 6);
        assert_eq!(
            decoded,
            owned(&[
                Some("Hello1"),
                None,
                Some("Hello2"),
                None,
                None,
                Some("Hello3"),
            ])
        );
    }

    /// Appending the same backing buffer repeatedly must not add new blocks.
    #[test]
    fn test_buffer_deduplication() {
        let first = {
            let mut inner =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            inner.append_value("This is a long string that should not be inlined");
            inner.append_value("short string");
            inner.finish_into_varbinview()
        };

        assert_eq!(first.buffers().len(), 1);
        let mut dedup =
            VarBinViewBuilder::with_buffer_deduplication(DType::Utf8(Nullability::Nullable), 10);

        first.append_to_builder(&mut dedup).unwrap();
        assert_eq!(dedup.completed_block_count(), 1);

        // Slices of the same array share its buffer, so the count stays at one.
        first.slice(1..2).append_to_builder(&mut dedup).unwrap();
        first.slice(0..1).append_to_builder(&mut dedup).unwrap();
        assert_eq!(dedup.completed_block_count(), 1);

        let second = {
            let mut inner =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            inner.append_value("This is a long string that should not be inlined");
            inner.finish_into_varbinview()
        };

        // Same contents but a separately built array: counted as a new block.
        second.append_to_builder(&mut dedup).unwrap();
        assert_eq!(dedup.completed_block_count(), 2);

        first.slice(0..1).append_to_builder(&mut dedup).unwrap();
        second.slice(0..1).append_to_builder(&mut dedup).unwrap();
        assert_eq!(dedup.completed_block_count(), 2);
    }
}