1use std::any::Any;
7use std::cmp::max;
8use std::sync::Arc;
9
10use vortex_buffer::{Buffer, BufferMut, ByteBuffer, ByteBufferMut};
11use vortex_dtype::DType;
12use vortex_error::VortexExpect;
13use vortex_mask::Mask;
14use vortex_utils::aliases::hash_map::{Entry, HashMap};
15
16use crate::arrays::{BinaryView, VarBinViewArray};
17use crate::builders::{ArrayBuilder, LazyNullBufferBuilder};
18use crate::canonical::{Canonical, ToCanonical};
19use crate::{Array, ArrayRef, IntoArray};
20
21pub struct VarBinViewBuilder {
23 dtype: DType,
24 views_builder: BufferMut<BinaryView>,
25 nulls: LazyNullBufferBuilder,
26 completed: CompletedBuffers,
27 in_progress: ByteBufferMut,
28}
29
30impl VarBinViewBuilder {
31 const BLOCK_SIZE: u32 = 8 * 8 * 1024;
33
34 pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
35 Self::new(dtype, capacity, Default::default())
36 }
37
38 pub fn with_buffer_deduplication(dtype: DType, capacity: usize) -> Self {
39 Self::new(
40 dtype,
41 capacity,
42 CompletedBuffers::Deduplicated(Default::default()),
43 )
44 }
45
46 fn new(dtype: DType, capacity: usize, completed: CompletedBuffers) -> Self {
47 assert!(
48 matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
49 "VarBinViewBuilder DType must be Utf8 or Binary."
50 );
51 Self {
52 views_builder: BufferMut::<BinaryView>::with_capacity(capacity),
53 nulls: LazyNullBufferBuilder::new(capacity),
54 completed,
55 in_progress: ByteBufferMut::empty(),
56 dtype,
57 }
58 }
59
60 fn append_value_view(&mut self, value: &[u8]) {
61 let length =
62 u32::try_from(value.len()).vortex_expect("cannot have a single string >2^32 in length");
63 if length <= 12 {
64 self.views_builder.push(BinaryView::make_view(value, 0, 0));
65 return;
66 }
67
68 let required_cap = self.in_progress.len() + value.len();
69 if self.in_progress.capacity() < required_cap {
70 self.flush_in_progress();
71 let to_reserve = max(value.len(), VarBinViewBuilder::BLOCK_SIZE as usize);
72 self.in_progress.reserve(to_reserve);
73 };
74
75 let offset = u32::try_from(self.in_progress.len()).vortex_expect("too many buffers");
76 self.in_progress.extend_from_slice(value);
77 let view = BinaryView::make_view(
78 value,
79 self.completed.len(),
81 offset,
82 );
83 self.views_builder.push(view);
84 }
85
86 pub fn append_value<S: AsRef<[u8]>>(&mut self, value: S) {
88 self.append_value_view(value.as_ref());
89 self.nulls.append_non_null();
90 }
91
92 pub fn append_option<S: AsRef<[u8]>>(&mut self, value: Option<S>) {
101 match value {
102 Some(value) => self.append_value(value),
103 None => self.append_null(),
104 }
105 }
106
107 fn flush_in_progress(&mut self) {
108 if self.in_progress.is_empty() {
109 return;
110 }
111 let block = std::mem::take(&mut self.in_progress).freeze();
112
113 assert!(block.len() < u32::MAX as usize, "Block too large");
114
115 let initial_len = self.completed.len();
116 self.completed.push(block);
117 assert_eq!(
118 self.completed.len(),
119 initial_len + 1,
120 "Invalid state, just completed block already exists"
121 );
122 }
123
124 pub fn completed_block_count(&self) -> u32 {
125 self.completed.len()
126 }
127
128 pub fn push_buffer_and_adjusted_views(
137 &mut self,
138 buffer: &[ByteBuffer],
139 views: &Buffer<BinaryView>,
140 validity_mask: Mask,
141 ) {
142 self.flush_in_progress();
143
144 let expected_completed_len = self.completed.len() as usize + buffer.len();
145 self.completed.extend_from_slice(buffer);
146 assert_eq!(
147 self.completed.len() as usize,
148 expected_completed_len,
149 "Some buffers already exist",
150 );
151 self.views_builder.extend_trusted(views.iter().copied());
152 self.push_only_validity_mask(validity_mask);
153
154 debug_assert_eq!(self.nulls.len(), self.views_builder.len())
155 }
156
157 pub fn finish_into_varbinview(&mut self) -> VarBinViewArray {
159 self.flush_in_progress();
160 let buffers = std::mem::take(&mut self.completed);
161
162 assert_eq!(
163 self.views_builder.len(),
164 self.nulls.len(),
165 "View and validity length must match"
166 );
167
168 let validity = self.nulls.finish_with_nullability(self.dtype.nullability());
169
170 unsafe {
172 VarBinViewArray::new_unchecked(
173 std::mem::take(&mut self.views_builder).freeze(),
174 buffers.finish(),
175 std::mem::replace(&mut self.dtype, DType::Null),
176 validity,
177 )
178 }
179 }
180}
181
182impl VarBinViewBuilder {
183 fn push_only_validity_mask(&mut self, validity_mask: Mask) {
185 self.nulls.append_validity_mask(validity_mask);
186 }
187}
188
189impl ArrayBuilder for VarBinViewBuilder {
190 fn as_any(&self) -> &dyn Any {
191 self
192 }
193
194 fn as_any_mut(&mut self) -> &mut dyn Any {
195 self
196 }
197
198 fn dtype(&self) -> &DType {
199 &self.dtype
200 }
201
202 fn len(&self) -> usize {
203 self.nulls.len()
204 }
205
206 fn append_zeros(&mut self, n: usize) {
207 self.views_builder.push_n(BinaryView::empty_view(), n);
208 self.nulls.append_n_non_nulls(n);
209 }
210
211 unsafe fn append_nulls_unchecked(&mut self, n: usize) {
212 self.views_builder.push_n(BinaryView::empty_view(), n);
213 self.nulls.append_n_nulls(n);
214 }
215
216 unsafe fn extend_from_array_unchecked(&mut self, array: &dyn Array) {
217 let array = array.to_varbinview();
218 self.flush_in_progress();
219
220 let new_indices = self.completed.extend_from_slice(array.buffers());
221
222 match new_indices {
223 NewIndices::ConstantOffset(offset) => {
224 self.views_builder
225 .extend_trusted(array.views().iter().map(|view| view.offset_view(offset)));
226 }
227 NewIndices::LookupArray(lookup) => {
228 self.views_builder
229 .extend_trusted(array.views().iter().map(|view| {
230 if view.is_inlined() {
231 *view
232 } else {
233 let new_buffer_idx = lookup[view.as_view().buffer_index() as usize];
234 view.with_buffer_idx(new_buffer_idx)
235 }
236 }));
237 }
238 }
239
240 self.push_only_validity_mask(array.validity_mask());
241 }
242
243 fn ensure_capacity(&mut self, capacity: usize) {
244 if capacity > self.views_builder.capacity() {
245 self.views_builder
246 .reserve(capacity - self.views_builder.len());
247 self.nulls.ensure_capacity(capacity);
248 }
249 }
250
251 fn set_validity(&mut self, validity: Mask) {
252 self.nulls = LazyNullBufferBuilder::new(validity.len());
253 self.nulls.append_validity_mask(validity);
254 }
255
256 fn finish(&mut self) -> ArrayRef {
257 self.finish_into_varbinview().into_array()
258 }
259
260 fn finish_into_canonical(&mut self) -> Canonical {
261 Canonical::VarBinView(self.finish_into_varbinview())
262 }
263}
264
265enum CompletedBuffers {
266 Default(Vec<ByteBuffer>),
267 Deduplicated(DeduplicatedBuffers),
268}
269
270impl Default for CompletedBuffers {
271 fn default() -> Self {
272 Self::Default(Vec::new())
273 }
274}
275
276#[allow(clippy::cast_possible_truncation)]
278impl CompletedBuffers {
279 fn len(&self) -> u32 {
280 match self {
281 Self::Default(buffers) => buffers.len() as u32,
282 Self::Deduplicated(buffers) => buffers.len(),
283 }
284 }
285
286 fn push(&mut self, block: ByteBuffer) -> u32 {
287 match self {
288 Self::Default(buffers) => {
289 assert!(buffers.len() < u32::MAX as usize, "Too many blocks");
290 buffers.push(block);
291 self.len()
292 }
293 Self::Deduplicated(buffers) => buffers.push(block),
294 }
295 }
296
297 fn extend_from_slice(&mut self, new_buffers: &[ByteBuffer]) -> NewIndices {
298 match self {
299 Self::Default(buffers) => {
300 let offset = buffers.len() as u32;
301 buffers.extend_from_slice(new_buffers);
302 NewIndices::ConstantOffset(offset)
303 }
304 Self::Deduplicated(buffers) => {
305 NewIndices::LookupArray(buffers.extend_from_slice(new_buffers))
306 }
307 }
308 }
309
310 fn finish(self) -> Arc<[ByteBuffer]> {
311 match self {
312 Self::Default(buffers) => Arc::from(buffers),
313 Self::Deduplicated(buffers) => buffers.finish(),
314 }
315 }
316}
317
/// Tells a caller of [`CompletedBuffers::extend_from_slice`] where the buffers it passed in
/// now live, so that views referring to them can be rewritten.
enum NewIndices {
    /// The buffers were appended contiguously; the payload is the index of the first one.
    ConstantOffset(u32),
    /// Entry `i` is the index now holding the `i`-th buffer that was passed in.
    LookupArray(Vec<u32>),
}
324
325#[derive(Default)]
326struct DeduplicatedBuffers {
327 buffers: Vec<ByteBuffer>,
328 buffer_to_idx: HashMap<BufferId, u32>,
329}
330
331impl DeduplicatedBuffers {
332 #[allow(clippy::cast_possible_truncation)]
334 fn len(&self) -> u32 {
335 self.buffers.len() as u32
336 }
337
338 fn push(&mut self, block: ByteBuffer) -> u32 {
340 assert!(self.buffers.len() < u32::MAX as usize, "Too many blocks");
341
342 let initial_len = self.len();
343 let id = BufferId::from(&block);
344 match self.buffer_to_idx.entry(id) {
345 Entry::Occupied(idx) => *idx.get(),
346 Entry::Vacant(entry) => {
347 let idx = initial_len;
348 entry.insert(idx);
349 self.buffers.push(block);
350 idx
351 }
352 }
353 }
354
355 fn extend_from_slice(&mut self, buffers: &[ByteBuffer]) -> Vec<u32> {
356 buffers
357 .iter()
358 .map(|buffer| self.push(buffer.clone()))
359 .collect()
360 }
361
362 fn finish(self) -> Arc<[ByteBuffer]> {
363 Arc::from(self.buffers)
364 }
365}
366
/// Identity of a buffer, used as the deduplication key: two buffers are treated as the same
/// when their byte slices start at the same address and have the same length.
#[derive(PartialEq, Eq, Hash)]
struct BufferId {
    /// Address of the first byte of the buffer's slice.
    ptr: usize,
    /// Length of the buffer's slice in bytes.
    len: usize,
}
373
374impl BufferId {
375 fn from(buffer: &ByteBuffer) -> Self {
376 let slice = buffer.as_slice();
377 Self {
378 ptr: slice.as_ptr() as usize,
379 len: slice.len(),
380 }
381 }
382}
383
#[cfg(test)]
mod tests {
    use std::str::from_utf8;

    use itertools::Itertools;
    use vortex_dtype::{DType, Nullability};

    use crate::accessor::ArrayAccessor;
    use crate::arrays::VarBinViewVTable;
    use crate::builders::{ArrayBuilder, VarBinViewBuilder};

    /// Longer than 12 bytes, so the builder has to store its bytes in a data block.
    const LONG: &str = "This is a long string that should not be inlined";

    /// Turns borrowed expectations into the owned form the tests compare against.
    fn owned(items: &[Option<&str>]) -> Vec<Option<String>> {
        items
            .iter()
            .map(|item| item.map(str::to_string))
            .collect()
    }

    /// Mixes every append flavour and checks values, nulls and zero values come back in order.
    #[test]
    fn test_utf8_builder() {
        let nullable_utf8 = DType::Utf8(Nullability::Nullable);
        let mut builder = VarBinViewBuilder::with_capacity(nullable_utf8, 10);

        builder.append_option(Some("Hello"));
        builder.append_option::<&str>(None);
        builder.append_value("World");

        builder.append_nulls(2);

        builder.append_zeros(2);
        builder.append_value("test");

        let finished = builder.finish();

        let actual = finished
            .as_::<VarBinViewVTable>()
            .with_iterator(|iter| {
                iter.map(|bytes| bytes.map(|b| from_utf8(b).unwrap().to_string()))
                    .collect_vec()
            })
            .unwrap();
        assert_eq!(actual.len(), 8);
        assert_eq!(
            actual,
            owned(&[
                Some("Hello"),
                None,
                Some("World"),
                None,
                None,
                Some(""),
                Some(""),
                Some("test"),
            ])
        );
    }

    /// Extending from another array splices its elements between directly appended ones.
    #[test]
    fn test_utf8_builder_with_extend() {
        let other = {
            let mut inner =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            inner.append_null();
            inner.append_value("Hello2");
            inner.finish()
        };
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_option(Some("Hello1"));
        builder.extend_from_array(&other);
        builder.append_nulls(2);
        builder.append_value("Hello3");

        let finished = builder.finish_into_canonical().into_varbinview();

        let actual = finished
            .with_iterator(|iter| {
                iter.map(|bytes| bytes.map(|b| from_utf8(b).unwrap().to_string()))
                    .collect_vec()
            })
            .unwrap();
        assert_eq!(actual.len(), 6);
        assert_eq!(
            actual,
            owned(&[
                Some("Hello1"),
                None,
                Some("Hello2"),
                None,
                None,
                Some("Hello3"),
            ])
        );
    }

    /// Appending arrays that share a buffer must not grow the deduplicating builder's
    /// block list; only a genuinely different buffer adds a block.
    #[test]
    fn test_buffer_deduplication() {
        let first = {
            let mut inner =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            inner.append_value(LONG);
            // Exactly 12 bytes, so this one needs no data block.
            inner.append_value("short string");
            inner.finish_into_varbinview()
        };

        assert_eq!(first.buffers().len(), 1);
        let mut dedup =
            VarBinViewBuilder::with_buffer_deduplication(DType::Utf8(Nullability::Nullable), 10);

        first.append_to_builder(&mut dedup);
        assert_eq!(dedup.completed_block_count(), 1);

        // Slices of the same array share its buffer.
        first.slice(1..2).append_to_builder(&mut dedup);
        first.slice(0..1).append_to_builder(&mut dedup);
        assert_eq!(dedup.completed_block_count(), 1);

        // Same contents, but a separately built buffer: counts as a new block.
        let second = {
            let mut inner =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            inner.append_value(LONG);
            inner.finish_into_varbinview()
        };

        second.append_to_builder(&mut dedup);
        assert_eq!(dedup.completed_block_count(), 2);

        first.slice(0..1).append_to_builder(&mut dedup);
        second.slice(0..1).append_to_builder(&mut dedup);
        assert_eq!(dedup.completed_block_count(), 2);
    }
}