1use std::{
18 ops::Range,
19 sync::{Arc, RwLock},
20};
21
22use arrow::array::{ArrayData, ArrayDataBuilder, AsArray};
23use arrow_array::{new_empty_array, new_null_array, Array, ArrayRef, OffsetSizeTrait, UInt64Array};
24use arrow_buffer::{
25 ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer, ScalarBuffer,
26};
27use arrow_schema::DataType;
28use lance_arrow::DataTypeExt;
29use snafu::location;
30
31use lance_core::{Error, Result};
32
33use crate::{
34 buffer::LanceBuffer,
35 statistics::{ComputeStat, Stat},
36};
37
38#[derive(Debug)]
44pub struct AllNullDataBlock {
45 pub num_values: u64,
47}
48
49impl AllNullDataBlock {
50 fn into_arrow(self, data_type: DataType, _validate: bool) -> Result<ArrayData> {
51 Ok(ArrayData::new_null(&data_type, self.num_values as usize))
52 }
53
54 fn into_buffers(self) -> Vec<LanceBuffer> {
55 vec![]
56 }
57
58 fn borrow_and_clone(&mut self) -> Self {
59 Self {
60 num_values: self.num_values,
61 }
62 }
63
64 fn try_clone(&self) -> Result<Self> {
65 Ok(Self {
66 num_values: self.num_values,
67 })
68 }
69}
70
71use std::collections::HashMap;
72
73#[derive(Debug, Clone)]
76pub struct BlockInfo(pub Arc<RwLock<HashMap<Stat, Arc<dyn Array>>>>);
77
78impl Default for BlockInfo {
79 fn default() -> Self {
80 Self::new()
81 }
82}
83
84impl BlockInfo {
85 pub fn new() -> Self {
86 Self(Arc::new(RwLock::new(HashMap::new())))
87 }
88}
89
90impl PartialEq for BlockInfo {
91 fn eq(&self, other: &Self) -> bool {
92 let self_info = self.0.read().unwrap();
93 let other_info = other.0.read().unwrap();
94 *self_info == *other_info
95 }
96}
97
98#[derive(Debug)]
104pub struct NullableDataBlock {
105 pub data: Box<DataBlock>,
107 pub nulls: LanceBuffer,
109
110 pub block_info: BlockInfo,
111}
112
113impl NullableDataBlock {
114 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
115 let nulls = self.nulls.into_buffer();
116 let data = self.data.into_arrow(data_type, validate)?.into_builder();
117 let data = data.null_bit_buffer(Some(nulls));
118 if validate {
119 Ok(data.build()?)
120 } else {
121 Ok(unsafe { data.build_unchecked() })
122 }
123 }
124
125 fn into_buffers(self) -> Vec<LanceBuffer> {
126 let mut buffers = vec![self.nulls];
127 buffers.extend(self.data.into_buffers());
128 buffers
129 }
130
131 fn borrow_and_clone(&mut self) -> Self {
132 Self {
133 data: Box::new(self.data.borrow_and_clone()),
134 nulls: self.nulls.borrow_and_clone(),
135 block_info: self.block_info.clone(),
136 }
137 }
138
139 fn try_clone(&self) -> Result<Self> {
140 Ok(Self {
141 data: Box::new(self.data.try_clone()?),
142 nulls: self.nulls.try_clone()?,
143 block_info: self.block_info.clone(),
144 })
145 }
146
147 pub fn data_size(&self) -> u64 {
148 self.data.data_size() + self.nulls.len() as u64
149 }
150}
151
152#[derive(Debug, PartialEq)]
154pub struct ConstantDataBlock {
155 pub data: LanceBuffer,
157 pub num_values: u64,
159}
160
161impl ConstantDataBlock {
162 fn into_buffers(self) -> Vec<LanceBuffer> {
163 vec![self.data]
164 }
165
166 fn into_arrow(self, _data_type: DataType, _validate: bool) -> Result<ArrayData> {
167 todo!()
170 }
171
172 pub fn borrow_and_clone(&mut self) -> Self {
173 Self {
174 data: self.data.borrow_and_clone(),
175 num_values: self.num_values,
176 }
177 }
178
179 pub fn try_clone(&self) -> Result<Self> {
180 Ok(Self {
181 data: self.data.try_clone()?,
182 num_values: self.num_values,
183 })
184 }
185
186 pub fn data_size(&self) -> u64 {
187 self.data.len() as u64
188 }
189}
190
191#[derive(Debug, PartialEq)]
193pub struct FixedWidthDataBlock {
194 pub data: LanceBuffer,
196 pub bits_per_value: u64,
198 pub num_values: u64,
200
201 pub block_info: BlockInfo,
202}
203
204impl FixedWidthDataBlock {
205 fn do_into_arrow(
206 self,
207 data_type: DataType,
208 num_values: u64,
209 validate: bool,
210 ) -> Result<ArrayData> {
211 let data_buffer = self.data.into_buffer();
212 let builder = ArrayDataBuilder::new(data_type)
213 .add_buffer(data_buffer)
214 .len(num_values as usize)
215 .null_count(0);
216 if validate {
217 Ok(builder.build()?)
218 } else {
219 Ok(unsafe { builder.build_unchecked() })
220 }
221 }
222
223 pub fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
224 let root_num_values = self.num_values;
225 self.do_into_arrow(data_type, root_num_values, validate)
226 }
227
228 pub fn into_buffers(self) -> Vec<LanceBuffer> {
229 vec![self.data]
230 }
231
232 pub fn borrow_and_clone(&mut self) -> Self {
233 Self {
234 data: self.data.borrow_and_clone(),
235 bits_per_value: self.bits_per_value,
236 num_values: self.num_values,
237 block_info: self.block_info.clone(),
238 }
239 }
240
241 pub fn try_clone(&self) -> Result<Self> {
242 Ok(Self {
243 data: self.data.try_clone()?,
244 bits_per_value: self.bits_per_value,
245 num_values: self.num_values,
246 block_info: self.block_info.clone(),
247 })
248 }
249
250 pub fn data_size(&self) -> u64 {
251 self.data.len() as u64
252 }
253}
254
255#[derive(Debug)]
256pub struct VariableWidthDataBlockBuilder<T: OffsetSizeTrait> {
257 offsets: Vec<T>,
258 bytes: Vec<u8>,
259}
260
261impl<T: OffsetSizeTrait> VariableWidthDataBlockBuilder<T> {
262 fn new(estimated_size_bytes: u64) -> Self {
263 Self {
264 offsets: vec![T::from_usize(0).unwrap()],
265 bytes: Vec::with_capacity(estimated_size_bytes as usize),
266 }
267 }
268}
269
270impl<T: OffsetSizeTrait> DataBlockBuilderImpl for VariableWidthDataBlockBuilder<T> {
271 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
272 let block = data_block.as_variable_width_ref().unwrap();
273 assert!(block.bits_per_offset == T::get_byte_width() as u8 * 8);
274
275 let offsets: ScalarBuffer<T> = block.offsets.try_clone().unwrap().borrow_to_typed_slice();
276
277 let start_offset = offsets[selection.start as usize];
278 let end_offset = offsets[selection.end as usize];
279 let mut previous_len = self.bytes.len();
280
281 self.bytes
282 .extend_from_slice(&block.data[start_offset.as_usize()..end_offset.as_usize()]);
283
284 self.offsets.extend(
285 offsets[selection.start as usize..selection.end as usize]
286 .iter()
287 .zip(&offsets[selection.start as usize + 1..=selection.end as usize])
288 .map(|(¤t, &next)| {
289 let this_value_len = next - current;
290 previous_len += this_value_len.as_usize();
291 T::from_usize(previous_len).unwrap()
292 }),
293 );
294 }
295
296 fn finish(self: Box<Self>) -> DataBlock {
297 let num_values = (self.offsets.len() - 1) as u64;
298 DataBlock::VariableWidth(VariableWidthBlock {
299 data: LanceBuffer::Owned(self.bytes),
300 offsets: LanceBuffer::reinterpret_vec(self.offsets),
301 bits_per_offset: T::get_byte_width() as u8 * 8,
302 num_values,
303 block_info: BlockInfo::new(),
304 })
305 }
306}
307
308#[derive(Debug)]
309struct BitmapDataBlockBuilder {
310 values: BooleanBufferBuilder,
311}
312
313impl BitmapDataBlockBuilder {
314 fn new(estimated_size_bytes: u64) -> Self {
315 Self {
316 values: BooleanBufferBuilder::new(estimated_size_bytes as usize * 8),
317 }
318 }
319}
320
321impl DataBlockBuilderImpl for BitmapDataBlockBuilder {
322 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
323 let bitmap_blk = data_block.as_fixed_width_ref().unwrap();
324 self.values.append_packed_range(
325 selection.start as usize..selection.end as usize,
326 &bitmap_blk.data,
327 );
328 }
329
330 fn finish(mut self: Box<Self>) -> DataBlock {
331 let bool_buf = self.values.finish();
332 let num_values = bool_buf.len() as u64;
333 let bits_buf = bool_buf.into_inner();
334 DataBlock::FixedWidth(FixedWidthDataBlock {
335 data: LanceBuffer::from(bits_buf),
336 bits_per_value: 1,
337 num_values,
338 block_info: BlockInfo::new(),
339 })
340 }
341}
342
/// Accumulates byte-aligned fixed-width values into a byte vector.
#[derive(Debug)]
struct FixedWidthDataBlockBuilder {
    bits_per_value: u64,
    // Always `bits_per_value / 8`.
    bytes_per_value: u64,
    values: Vec<u8>,
}

impl FixedWidthDataBlockBuilder {
    /// Creates a builder for values of `bits_per_value` bits, reserving `estimated_size_bytes`.
    ///
    /// Panics if `bits_per_value` is not a multiple of 8.
    fn new(bits_per_value: u64, estimated_size_bytes: u64) -> Self {
        assert!(bits_per_value % 8 == 0);
        let bytes_per_value = bits_per_value / 8;
        let values = Vec::with_capacity(estimated_size_bytes as usize);
        Self {
            bits_per_value,
            bytes_per_value,
            values,
        }
    }
}
360
361impl DataBlockBuilderImpl for FixedWidthDataBlockBuilder {
362 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
363 let block = data_block.as_fixed_width_ref().unwrap();
364 assert_eq!(self.bits_per_value, block.bits_per_value);
365 let start = selection.start as usize * self.bytes_per_value as usize;
366 let end = selection.end as usize * self.bytes_per_value as usize;
367 self.values.extend_from_slice(&block.data[start..end]);
368 }
369
370 fn finish(self: Box<Self>) -> DataBlock {
371 let num_values = (self.values.len() / self.bytes_per_value as usize) as u64;
372 DataBlock::FixedWidth(FixedWidthDataBlock {
373 data: LanceBuffer::Owned(self.values),
374 bits_per_value: self.bits_per_value,
375 num_values,
376 block_info: BlockInfo::new(),
377 })
378 }
379}
380
381#[derive(Debug)]
382struct StructDataBlockBuilder {
383 children: Vec<Box<dyn DataBlockBuilderImpl>>,
384}
385
386impl StructDataBlockBuilder {
387 fn new(bits_per_values: Vec<u32>, estimated_size_bytes: u64) -> Self {
390 let mut children = vec![];
391
392 debug_assert!(bits_per_values.iter().all(|bpv| bpv % 8 == 0));
393
394 let bytes_per_row: u32 = bits_per_values.iter().sum::<u32>() / 8;
395 let bytes_per_row = bytes_per_row as u64;
396
397 for bits_per_value in bits_per_values.iter() {
398 let this_estimated_size_bytes =
399 estimated_size_bytes / bytes_per_row * (*bits_per_value as u64) / 8;
400 let child =
401 FixedWidthDataBlockBuilder::new(*bits_per_value as u64, this_estimated_size_bytes);
402 children.push(Box::new(child) as Box<dyn DataBlockBuilderImpl>);
403 }
404 Self { children }
405 }
406}
407
408impl DataBlockBuilderImpl for StructDataBlockBuilder {
409 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
410 let data_block = data_block.as_struct_ref().unwrap();
411 for i in 0..self.children.len() {
412 self.children[i].append(&data_block.children[i], selection.clone());
413 }
414 }
415
416 fn finish(self: Box<Self>) -> DataBlock {
417 let mut children_data_block = Vec::new();
418 for child in self.children {
419 let child_data_block = child.finish();
420 children_data_block.push(child_data_block);
421 }
422 DataBlock::Struct(StructDataBlock {
423 children: children_data_block,
424 block_info: BlockInfo::new(),
425 validity: None,
426 })
427 }
428}
429#[derive(Debug)]
431pub struct FixedSizeListBlock {
432 pub child: Box<DataBlock>,
434 pub dimension: u64,
436}
437
438impl FixedSizeListBlock {
439 fn borrow_and_clone(&mut self) -> Self {
440 Self {
441 child: Box::new(self.child.borrow_and_clone()),
442 dimension: self.dimension,
443 }
444 }
445
446 fn try_clone(&self) -> Result<Self> {
447 Ok(Self {
448 child: Box::new(self.child.try_clone()?),
449 dimension: self.dimension,
450 })
451 }
452
453 pub fn num_values(&self) -> u64 {
454 self.child.num_values() / self.dimension
455 }
456
457 pub fn try_into_flat(self) -> Option<FixedWidthDataBlock> {
461 match *self.child {
462 DataBlock::Nullable(_) => None,
464 DataBlock::FixedSizeList(inner) => {
465 let mut flat = inner.try_into_flat()?;
466 flat.bits_per_value *= self.dimension;
467 flat.num_values /= self.dimension;
468 Some(flat)
469 }
470 DataBlock::FixedWidth(mut inner) => {
471 inner.bits_per_value *= self.dimension;
472 inner.num_values /= self.dimension;
473 Some(inner)
474 }
475 _ => panic!(
476 "Expected FixedSizeList or FixedWidth data block but found {:?}",
477 self
478 ),
479 }
480 }
481
482 pub fn flatten_as_fixed(&mut self) -> FixedWidthDataBlock {
483 match self.child.as_mut() {
484 DataBlock::FixedSizeList(fsl) => fsl.flatten_as_fixed(),
485 DataBlock::FixedWidth(fw) => fw.borrow_and_clone(),
486 _ => panic!("Expected FixedSizeList or FixedWidth data block"),
487 }
488 }
489
490 pub fn from_flat(data: FixedWidthDataBlock, data_type: &DataType) -> DataBlock {
492 match data_type {
493 DataType::FixedSizeList(child_field, dimension) => {
494 let mut data = data;
495 data.bits_per_value /= *dimension as u64;
496 data.num_values *= *dimension as u64;
497 let child_data = Self::from_flat(data, child_field.data_type());
498 DataBlock::FixedSizeList(Self {
499 child: Box::new(child_data),
500 dimension: *dimension as u64,
501 })
502 }
503 _ => DataBlock::FixedWidth(data),
505 }
506 }
507
508 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
509 let num_values = self.num_values();
510 let builder = match &data_type {
511 DataType::FixedSizeList(child_field, _) => {
512 let child_data = self
513 .child
514 .into_arrow(child_field.data_type().clone(), validate)?;
515 ArrayDataBuilder::new(data_type)
516 .add_child_data(child_data)
517 .len(num_values as usize)
518 .null_count(0)
519 }
520 _ => panic!("Expected FixedSizeList data type and got {:?}", data_type),
521 };
522 if validate {
523 Ok(builder.build()?)
524 } else {
525 Ok(unsafe { builder.build_unchecked() })
526 }
527 }
528
529 fn into_buffers(self) -> Vec<LanceBuffer> {
530 self.child.into_buffers()
531 }
532
533 fn data_size(&self) -> u64 {
534 self.child.data_size()
535 }
536}
537
538#[derive(Debug)]
539struct FixedSizeListBlockBuilder {
540 inner: Box<dyn DataBlockBuilderImpl>,
541 dimension: u64,
542}
543
544impl FixedSizeListBlockBuilder {
545 fn new(inner: Box<dyn DataBlockBuilderImpl>, dimension: u64) -> Self {
546 Self { inner, dimension }
547 }
548}
549
550impl DataBlockBuilderImpl for FixedSizeListBlockBuilder {
551 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
552 let selection = selection.start * self.dimension..selection.end * self.dimension;
553 let fsl = data_block.as_fixed_size_list_ref().unwrap();
554 self.inner.append(fsl.child.as_ref(), selection);
555 }
556
557 fn finish(self: Box<Self>) -> DataBlock {
558 let inner_block = self.inner.finish();
559 DataBlock::FixedSizeList(FixedSizeListBlock {
560 child: Box::new(inner_block),
561 dimension: self.dimension,
562 })
563 }
564}
565
566#[derive(Debug)]
567struct NullableDataBlockBuilder {
568 inner: Box<dyn DataBlockBuilderImpl>,
569 validity: BooleanBufferBuilder,
570}
571
572impl NullableDataBlockBuilder {
573 fn new(inner: Box<dyn DataBlockBuilderImpl>, estimated_size_bytes: usize) -> Self {
574 Self {
575 inner,
576 validity: BooleanBufferBuilder::new(estimated_size_bytes * 8),
577 }
578 }
579}
580
581impl DataBlockBuilderImpl for NullableDataBlockBuilder {
582 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
583 let nullable = data_block.as_nullable_ref().unwrap();
584 let bool_buf = BooleanBuffer::new(
585 nullable.nulls.try_clone().unwrap().into_buffer(),
586 selection.start as usize,
587 (selection.end - selection.start) as usize,
588 );
589 self.validity.append_buffer(&bool_buf);
590 self.inner.append(nullable.data.as_ref(), selection);
591 }
592
593 fn finish(mut self: Box<Self>) -> DataBlock {
594 let inner_block = self.inner.finish();
595 DataBlock::Nullable(NullableDataBlock {
596 data: Box::new(inner_block),
597 nulls: LanceBuffer::Borrowed(self.validity.finish().into_inner()),
598 block_info: BlockInfo::new(),
599 })
600 }
601}
602
603#[derive(Debug)]
607pub struct OpaqueBlock {
608 pub buffers: Vec<LanceBuffer>,
609 pub num_values: u64,
610 pub block_info: BlockInfo,
611}
612
613impl OpaqueBlock {
614 fn borrow_and_clone(&mut self) -> Self {
615 Self {
616 buffers: self
617 .buffers
618 .iter_mut()
619 .map(|b| b.borrow_and_clone())
620 .collect(),
621 num_values: self.num_values,
622 block_info: self.block_info.clone(),
623 }
624 }
625
626 fn try_clone(&self) -> Result<Self> {
627 Ok(Self {
628 buffers: self
629 .buffers
630 .iter()
631 .map(|b| b.try_clone())
632 .collect::<Result<_>>()?,
633 num_values: self.num_values,
634 block_info: self.block_info.clone(),
635 })
636 }
637
638 pub fn data_size(&self) -> u64 {
639 self.buffers.iter().map(|b| b.len() as u64).sum()
640 }
641}
642
643#[derive(Debug)]
645pub struct VariableWidthBlock {
646 pub data: LanceBuffer,
648 pub offsets: LanceBuffer,
652 pub bits_per_offset: u8,
654 pub num_values: u64,
656
657 pub block_info: BlockInfo,
658}
659
660impl VariableWidthBlock {
661 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
662 let data_buffer = self.data.into_buffer();
663 let offsets_buffer = self.offsets.into_buffer();
664 let builder = ArrayDataBuilder::new(data_type)
665 .add_buffer(offsets_buffer)
666 .add_buffer(data_buffer)
667 .len(self.num_values as usize)
668 .null_count(0);
669 if validate {
670 Ok(builder.build()?)
671 } else {
672 Ok(unsafe { builder.build_unchecked() })
673 }
674 }
675
676 fn into_buffers(self) -> Vec<LanceBuffer> {
677 vec![self.offsets, self.data]
678 }
679
680 fn borrow_and_clone(&mut self) -> Self {
681 Self {
682 data: self.data.borrow_and_clone(),
683 offsets: self.offsets.borrow_and_clone(),
684 bits_per_offset: self.bits_per_offset,
685 num_values: self.num_values,
686 block_info: self.block_info.clone(),
687 }
688 }
689
690 fn try_clone(&self) -> Result<Self> {
691 Ok(Self {
692 data: self.data.try_clone()?,
693 offsets: self.offsets.try_clone()?,
694 bits_per_offset: self.bits_per_offset,
695 num_values: self.num_values,
696 block_info: self.block_info.clone(),
697 })
698 }
699
700 pub fn offsets_as_block(&mut self) -> DataBlock {
701 let offsets = self.offsets.borrow_and_clone();
702 DataBlock::FixedWidth(FixedWidthDataBlock {
703 data: offsets,
704 bits_per_value: self.bits_per_offset as u64,
705 num_values: self.num_values + 1,
706 block_info: BlockInfo::new(),
707 })
708 }
709
710 pub fn data_size(&self) -> u64 {
711 (self.data.len() + self.offsets.len()) as u64
712 }
713}
714
715#[derive(Debug)]
717pub struct StructDataBlock {
718 pub children: Vec<DataBlock>,
720 pub block_info: BlockInfo,
721 pub validity: Option<NullBuffer>,
723}
724
725impl StructDataBlock {
726 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
727 if let DataType::Struct(fields) = &data_type {
728 let mut builder = ArrayDataBuilder::new(DataType::Struct(fields.clone()));
729 let mut num_rows = 0;
730 for (field, child) in fields.iter().zip(self.children) {
731 let child_data = child.into_arrow(field.data_type().clone(), validate)?;
732 num_rows = child_data.len();
733 builder = builder.add_child_data(child_data);
734 }
735
736 let builder = if let Some(validity) = self.validity {
738 let null_count = validity.null_count();
739 builder
740 .null_bit_buffer(Some(validity.into_inner().into_inner()))
741 .null_count(null_count)
742 } else {
743 builder.null_count(0)
744 };
745
746 let builder = builder.len(num_rows);
747 if validate {
748 Ok(builder.build()?)
749 } else {
750 Ok(unsafe { builder.build_unchecked() })
751 }
752 } else {
753 Err(Error::Internal {
754 message: format!("Expected Struct, got {:?}", data_type),
755 location: location!(),
756 })
757 }
758 }
759
760 fn remove_outer_validity(self) -> Self {
761 Self {
762 children: self
763 .children
764 .into_iter()
765 .map(|c| c.remove_outer_validity())
766 .collect(),
767 block_info: self.block_info,
768 validity: None, }
770 }
771
772 fn into_buffers(self) -> Vec<LanceBuffer> {
773 self.children
774 .into_iter()
775 .flat_map(|c| c.into_buffers())
776 .collect()
777 }
778
779 fn borrow_and_clone(&mut self) -> Self {
780 Self {
781 children: self
782 .children
783 .iter_mut()
784 .map(|c| c.borrow_and_clone())
785 .collect(),
786 block_info: self.block_info.clone(),
787 validity: self.validity.clone(),
788 }
789 }
790
791 fn try_clone(&self) -> Result<Self> {
792 Ok(Self {
793 children: self
794 .children
795 .iter()
796 .map(|c| c.try_clone())
797 .collect::<Result<_>>()?,
798 block_info: self.block_info.clone(),
799 validity: self.validity.clone(),
800 })
801 }
802
803 pub fn data_size(&self) -> u64 {
804 self.children
805 .iter()
806 .map(|data_block| data_block.data_size())
807 .sum()
808 }
809}
810
811#[derive(Debug)]
813pub struct DictionaryDataBlock {
814 pub indices: FixedWidthDataBlock,
816 pub dictionary: Box<DataBlock>,
818}
819
820impl DictionaryDataBlock {
821 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
822 let (key_type, value_type) = if let DataType::Dictionary(key_type, value_type) = &data_type
823 {
824 (key_type.as_ref().clone(), value_type.as_ref().clone())
825 } else {
826 return Err(Error::Internal {
827 message: format!("Expected Dictionary, got {:?}", data_type),
828 location: location!(),
829 });
830 };
831
832 let indices = self.indices.into_arrow(key_type, validate)?;
833 let dictionary = self.dictionary.into_arrow(value_type, validate)?;
834
835 let builder = indices
836 .into_builder()
837 .add_child_data(dictionary)
838 .data_type(data_type);
839
840 if validate {
841 Ok(builder.build()?)
842 } else {
843 Ok(unsafe { builder.build_unchecked() })
844 }
845 }
846
847 fn into_buffers(self) -> Vec<LanceBuffer> {
848 let mut buffers = self.indices.into_buffers();
849 buffers.extend(self.dictionary.into_buffers());
850 buffers
851 }
852
853 fn borrow_and_clone(&mut self) -> Self {
854 Self {
855 indices: self.indices.borrow_and_clone(),
856 dictionary: Box::new(self.dictionary.borrow_and_clone()),
857 }
858 }
859
860 fn try_clone(&self) -> Result<Self> {
861 Ok(Self {
862 indices: self.indices.try_clone()?,
863 dictionary: Box::new(self.dictionary.try_clone()?),
864 })
865 }
866}
867
868#[derive(Debug)]
883pub enum DataBlock {
884 Empty(),
885 Constant(ConstantDataBlock),
886 AllNull(AllNullDataBlock),
887 Nullable(NullableDataBlock),
888 FixedWidth(FixedWidthDataBlock),
889 FixedSizeList(FixedSizeListBlock),
890 VariableWidth(VariableWidthBlock),
891 Opaque(OpaqueBlock),
892 Struct(StructDataBlock),
893 Dictionary(DictionaryDataBlock),
894}
895
896impl DataBlock {
897 pub fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
899 match self {
900 Self::Empty() => Ok(new_empty_array(&data_type).to_data()),
901 Self::Constant(inner) => inner.into_arrow(data_type, validate),
902 Self::AllNull(inner) => inner.into_arrow(data_type, validate),
903 Self::Nullable(inner) => inner.into_arrow(data_type, validate),
904 Self::FixedWidth(inner) => inner.into_arrow(data_type, validate),
905 Self::FixedSizeList(inner) => inner.into_arrow(data_type, validate),
906 Self::VariableWidth(inner) => inner.into_arrow(data_type, validate),
907 Self::Struct(inner) => inner.into_arrow(data_type, validate),
908 Self::Dictionary(inner) => inner.into_arrow(data_type, validate),
909 Self::Opaque(_) => Err(Error::Internal {
910 message: "Cannot convert OpaqueBlock to Arrow".to_string(),
911 location: location!(),
912 }),
913 }
914 }
915
916 pub fn into_buffers(self) -> Vec<LanceBuffer> {
920 match self {
921 Self::Empty() => Vec::default(),
922 Self::Constant(inner) => inner.into_buffers(),
923 Self::AllNull(inner) => inner.into_buffers(),
924 Self::Nullable(inner) => inner.into_buffers(),
925 Self::FixedWidth(inner) => inner.into_buffers(),
926 Self::FixedSizeList(inner) => inner.into_buffers(),
927 Self::VariableWidth(inner) => inner.into_buffers(),
928 Self::Struct(inner) => inner.into_buffers(),
929 Self::Dictionary(inner) => inner.into_buffers(),
930 Self::Opaque(inner) => inner.buffers,
931 }
932 }
933
934 pub fn borrow_and_clone(&mut self) -> Self {
939 match self {
940 Self::Empty() => Self::Empty(),
941 Self::Constant(inner) => Self::Constant(inner.borrow_and_clone()),
942 Self::AllNull(inner) => Self::AllNull(inner.borrow_and_clone()),
943 Self::Nullable(inner) => Self::Nullable(inner.borrow_and_clone()),
944 Self::FixedWidth(inner) => Self::FixedWidth(inner.borrow_and_clone()),
945 Self::FixedSizeList(inner) => Self::FixedSizeList(inner.borrow_and_clone()),
946 Self::VariableWidth(inner) => Self::VariableWidth(inner.borrow_and_clone()),
947 Self::Struct(inner) => Self::Struct(inner.borrow_and_clone()),
948 Self::Dictionary(inner) => Self::Dictionary(inner.borrow_and_clone()),
949 Self::Opaque(inner) => Self::Opaque(inner.borrow_and_clone()),
950 }
951 }
952
953 pub fn try_clone(&self) -> Result<Self> {
958 match self {
959 Self::Empty() => Ok(Self::Empty()),
960 Self::Constant(inner) => Ok(Self::Constant(inner.try_clone()?)),
961 Self::AllNull(inner) => Ok(Self::AllNull(inner.try_clone()?)),
962 Self::Nullable(inner) => Ok(Self::Nullable(inner.try_clone()?)),
963 Self::FixedWidth(inner) => Ok(Self::FixedWidth(inner.try_clone()?)),
964 Self::FixedSizeList(inner) => Ok(Self::FixedSizeList(inner.try_clone()?)),
965 Self::VariableWidth(inner) => Ok(Self::VariableWidth(inner.try_clone()?)),
966 Self::Struct(inner) => Ok(Self::Struct(inner.try_clone()?)),
967 Self::Dictionary(inner) => Ok(Self::Dictionary(inner.try_clone()?)),
968 Self::Opaque(inner) => Ok(Self::Opaque(inner.try_clone()?)),
969 }
970 }
971
972 pub fn name(&self) -> &'static str {
973 match self {
974 Self::Constant(_) => "Constant",
975 Self::Empty() => "Empty",
976 Self::AllNull(_) => "AllNull",
977 Self::Nullable(_) => "Nullable",
978 Self::FixedWidth(_) => "FixedWidth",
979 Self::FixedSizeList(_) => "FixedSizeList",
980 Self::VariableWidth(_) => "VariableWidth",
981 Self::Struct(_) => "Struct",
982 Self::Dictionary(_) => "Dictionary",
983 Self::Opaque(_) => "Opaque",
984 }
985 }
986
987 pub fn is_variable(&self) -> bool {
988 match self {
989 Self::Constant(_) => false,
990 Self::Empty() => false,
991 Self::AllNull(_) => false,
992 Self::Nullable(nullable) => nullable.data.is_variable(),
993 Self::FixedWidth(_) => false,
994 Self::FixedSizeList(fsl) => fsl.child.is_variable(),
995 Self::VariableWidth(_) => true,
996 Self::Struct(strct) => strct.children.iter().any(|c| c.is_variable()),
997 Self::Dictionary(_) => {
998 todo!("is_variable for DictionaryDataBlock is not implemented yet")
999 }
1000 Self::Opaque(_) => panic!("Does not make sense to ask if an Opaque block is variable"),
1001 }
1002 }
1003
1004 pub fn is_nullable(&self) -> bool {
1005 match self {
1006 Self::AllNull(_) => true,
1007 Self::Nullable(_) => true,
1008 Self::FixedSizeList(fsl) => fsl.child.is_nullable(),
1009 Self::Struct(strct) => strct.children.iter().any(|c| c.is_nullable()),
1010 Self::Dictionary(_) => {
1011 todo!("is_nullable for DictionaryDataBlock is not implemented yet")
1012 }
1013 Self::Opaque(_) => panic!("Does not make sense to ask if an Opaque block is nullable"),
1014 _ => false,
1015 }
1016 }
1017
1018 pub fn num_values(&self) -> u64 {
1023 match self {
1024 Self::Empty() => 0,
1025 Self::Constant(inner) => inner.num_values,
1026 Self::AllNull(inner) => inner.num_values,
1027 Self::Nullable(inner) => inner.data.num_values(),
1028 Self::FixedWidth(inner) => inner.num_values,
1029 Self::FixedSizeList(inner) => inner.num_values(),
1030 Self::VariableWidth(inner) => inner.num_values,
1031 Self::Struct(inner) => inner.children[0].num_values(),
1032 Self::Dictionary(inner) => inner.indices.num_values,
1033 Self::Opaque(inner) => inner.num_values,
1034 }
1035 }
1036
1037 pub fn items_per_row(&self) -> u64 {
1041 match self {
1042 Self::Empty() => todo!(), Self::Constant(_) => todo!(), Self::AllNull(_) => todo!(), Self::Nullable(nullable) => nullable.data.items_per_row(),
1046 Self::FixedWidth(_) => 1,
1047 Self::FixedSizeList(fsl) => fsl.dimension * fsl.child.items_per_row(),
1048 Self::VariableWidth(_) => 1,
1049 Self::Struct(_) => todo!(), Self::Dictionary(_) => 1,
1051 Self::Opaque(_) => 1,
1052 }
1053 }
1054
1055 pub fn data_size(&self) -> u64 {
1057 match self {
1058 Self::Empty() => 0,
1059 Self::Constant(inner) => inner.data_size(),
1060 Self::AllNull(_) => 0,
1061 Self::Nullable(inner) => inner.data_size(),
1062 Self::FixedWidth(inner) => inner.data_size(),
1063 Self::FixedSizeList(inner) => inner.data_size(),
1064 Self::VariableWidth(inner) => inner.data_size(),
1065 Self::Struct(_) => {
1066 todo!("the data_size method for StructDataBlock is not implemented yet")
1067 }
1068 Self::Dictionary(_) => {
1069 todo!("the data_size method for DictionaryDataBlock is not implemented yet")
1070 }
1071 Self::Opaque(inner) => inner.data_size(),
1072 }
1073 }
1074
1075 pub fn remove_outer_validity(self) -> Self {
1083 match self {
1084 Self::AllNull(_) => panic!("Cannot remove validity on all-null data"),
1085 Self::Nullable(inner) => *inner.data,
1086 Self::Struct(inner) => Self::Struct(inner.remove_outer_validity()),
1087 other => other,
1088 }
1089 }
1090
1091 pub fn make_builder(&self, estimated_size_bytes: u64) -> Box<dyn DataBlockBuilderImpl> {
1092 match self {
1093 Self::FixedWidth(inner) => {
1094 if inner.bits_per_value == 1 {
1095 Box::new(BitmapDataBlockBuilder::new(estimated_size_bytes))
1096 } else {
1097 Box::new(FixedWidthDataBlockBuilder::new(
1098 inner.bits_per_value,
1099 estimated_size_bytes,
1100 ))
1101 }
1102 }
1103 Self::VariableWidth(inner) => {
1104 if inner.bits_per_offset == 32 {
1105 Box::new(VariableWidthDataBlockBuilder::<i32>::new(
1106 estimated_size_bytes,
1107 ))
1108 } else if inner.bits_per_offset == 64 {
1109 Box::new(VariableWidthDataBlockBuilder::<i64>::new(
1110 estimated_size_bytes,
1111 ))
1112 } else {
1113 todo!()
1114 }
1115 }
1116 Self::FixedSizeList(inner) => {
1117 let inner_builder = inner.child.make_builder(estimated_size_bytes);
1118 Box::new(FixedSizeListBlockBuilder::new(
1119 inner_builder,
1120 inner.dimension,
1121 ))
1122 }
1123 Self::Nullable(nullable) => {
1124 let estimated_validity_size_bytes = estimated_size_bytes / 16;
1127 let inner_builder = nullable
1128 .data
1129 .make_builder(estimated_size_bytes - estimated_validity_size_bytes);
1130 Box::new(NullableDataBlockBuilder::new(
1131 inner_builder,
1132 estimated_validity_size_bytes as usize,
1133 ))
1134 }
1135 Self::Struct(struct_data_block) => {
1136 let mut bits_per_values = vec![];
1137 for child in struct_data_block.children.iter() {
1138 let child = child.as_fixed_width_ref().
1139 expect("Currently StructDataBlockBuilder is only used in packed-struct encoding, and currently in packed-struct encoding, only fixed-width fields are supported.");
1140 bits_per_values.push(child.bits_per_value as u32);
1141 }
1142 Box::new(StructDataBlockBuilder::new(
1143 bits_per_values,
1144 estimated_size_bytes,
1145 ))
1146 }
1147 _ => todo!("make_builder for {:?}", self),
1148 }
1149 }
1150}
1151
// Generates `pub fn $fn_name(self) -> Option<$inner_type>`, which consumes the enum and
// yields the payload when the value is the `$inner` variant and `None` otherwise.
macro_rules! as_type {
    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
        pub fn $fn_name(self) -> Option<$inner_type> {
            if let Self::$inner(payload) = self {
                Some(payload)
            } else {
                None
            }
        }
    };
}
1162
// Generates `pub fn $fn_name(&self) -> Option<&$inner_type>`, which borrows the payload
// when the value is the `$inner` variant and yields `None` otherwise.
macro_rules! as_type_ref {
    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
        pub fn $fn_name(&self) -> Option<&$inner_type> {
            if let Self::$inner(payload) = self {
                Some(payload)
            } else {
                None
            }
        }
    };
}
1173
// Generates `pub fn $fn_name(&mut self) -> Option<&mut $inner_type>`, which mutably borrows
// the payload when the value is the `$inner` variant and yields `None` otherwise.
macro_rules! as_type_ref_mut {
    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
        pub fn $fn_name(&mut self) -> Option<&mut $inner_type> {
            if let Self::$inner(payload) = self {
                Some(payload)
            } else {
                None
            }
        }
    };
}
1184
1185impl DataBlock {
1187 as_type!(as_all_null, AllNull, AllNullDataBlock);
1188 as_type!(as_nullable, Nullable, NullableDataBlock);
1189 as_type!(as_fixed_width, FixedWidth, FixedWidthDataBlock);
1190 as_type!(as_fixed_size_list, FixedSizeList, FixedSizeListBlock);
1191 as_type!(as_variable_width, VariableWidth, VariableWidthBlock);
1192 as_type!(as_struct, Struct, StructDataBlock);
1193 as_type!(as_dictionary, Dictionary, DictionaryDataBlock);
1194 as_type_ref!(as_all_null_ref, AllNull, AllNullDataBlock);
1195 as_type_ref!(as_nullable_ref, Nullable, NullableDataBlock);
1196 as_type_ref!(as_fixed_width_ref, FixedWidth, FixedWidthDataBlock);
1197 as_type_ref!(as_fixed_size_list_ref, FixedSizeList, FixedSizeListBlock);
1198 as_type_ref!(as_variable_width_ref, VariableWidth, VariableWidthBlock);
1199 as_type_ref!(as_struct_ref, Struct, StructDataBlock);
1200 as_type_ref!(as_dictionary_ref, Dictionary, DictionaryDataBlock);
1201 as_type_ref_mut!(as_all_null_ref_mut, AllNull, AllNullDataBlock);
1202 as_type_ref_mut!(as_nullable_ref_mut, Nullable, NullableDataBlock);
1203 as_type_ref_mut!(as_fixed_width_ref_mut, FixedWidth, FixedWidthDataBlock);
1204 as_type_ref_mut!(
1205 as_fixed_size_list_ref_mut,
1206 FixedSizeList,
1207 FixedSizeListBlock
1208 );
1209 as_type_ref_mut!(as_variable_width_ref_mut, VariableWidth, VariableWidthBlock);
1210 as_type_ref_mut!(as_struct_ref_mut, Struct, StructDataBlock);
1211 as_type_ref_mut!(as_dictionary_ref_mut, Dictionary, DictionaryDataBlock);
1212}
1213
1214fn get_byte_range<T: ArrowNativeType>(offsets: &mut LanceBuffer) -> Range<usize> {
1217 let offsets = offsets.borrow_to_typed_slice::<T>();
1218 if offsets.as_ref().is_empty() {
1219 0..0
1220 } else {
1221 offsets.as_ref().first().unwrap().as_usize()..offsets.as_ref().last().unwrap().as_usize()
1222 }
1223}
1224
1225fn stitch_offsets<T: ArrowNativeType + std::ops::Add<Output = T> + std::ops::Sub<Output = T>>(
1231 offsets: Vec<LanceBuffer>,
1232) -> (LanceBuffer, Vec<Range<usize>>) {
1233 if offsets.is_empty() {
1234 return (LanceBuffer::empty(), Vec::default());
1235 }
1236 let len = offsets.iter().map(|b| b.len()).sum::<usize>();
1237 let mut dest = Vec::with_capacity(len);
1241 let mut byte_ranges = Vec::with_capacity(offsets.len());
1242
1243 dest.push(T::from_usize(0).unwrap());
1245
1246 for mut o in offsets.into_iter() {
1247 if !o.is_empty() {
1248 let last_offset = *dest.last().unwrap();
1249 let o = o.borrow_to_typed_slice::<T>();
1250 let start = *o.as_ref().first().unwrap();
1251 dest.extend(o.as_ref()[1..].iter().map(|&x| x + last_offset - start));
1265 }
1266 byte_ranges.push(get_byte_range::<T>(&mut o));
1267 }
1268 (LanceBuffer::reinterpret_vec(dest), byte_ranges)
1269}
1270
1271fn arrow_binary_to_data_block(
1272 arrays: &[ArrayRef],
1273 num_values: u64,
1274 bits_per_offset: u8,
1275) -> DataBlock {
1276 let data_vec = arrays.iter().map(|arr| arr.to_data()).collect::<Vec<_>>();
1277 let bytes_per_offset = bits_per_offset as usize / 8;
1278 let offsets = data_vec
1279 .iter()
1280 .map(|d| {
1281 LanceBuffer::Borrowed(
1282 d.buffers()[0].slice_with_length(d.offset(), (d.len() + 1) * bytes_per_offset),
1283 )
1284 })
1285 .collect::<Vec<_>>();
1286 let (offsets, data_ranges) = if bits_per_offset == 32 {
1287 stitch_offsets::<i32>(offsets)
1288 } else {
1289 stitch_offsets::<i64>(offsets)
1290 };
1291 let data = data_vec
1292 .iter()
1293 .zip(data_ranges)
1294 .map(|(d, byte_range)| {
1295 LanceBuffer::Borrowed(
1296 d.buffers()[1]
1297 .slice_with_length(byte_range.start, byte_range.end - byte_range.start),
1298 )
1299 })
1300 .collect::<Vec<_>>();
1301 let data = LanceBuffer::concat_into_one(data);
1302 DataBlock::VariableWidth(VariableWidthBlock {
1303 data,
1304 offsets,
1305 bits_per_offset,
1306 num_values,
1307 block_info: BlockInfo::new(),
1308 })
1309}
1310
1311fn encode_flat_data(arrays: &[ArrayRef], num_values: u64) -> LanceBuffer {
1312 let bytes_per_value = arrays[0].data_type().byte_width();
1313 let mut buffer = Vec::with_capacity(num_values as usize * bytes_per_value);
1314 for arr in arrays {
1315 let data = arr.to_data();
1316 buffer.extend_from_slice(data.buffers()[0].as_slice());
1317 }
1318 LanceBuffer::Owned(buffer)
1319}
1320
1321fn do_encode_bitmap_data(bitmaps: &[BooleanBuffer], num_values: u64) -> LanceBuffer {
1322 let mut builder = BooleanBufferBuilder::new(num_values as usize);
1323
1324 for buf in bitmaps {
1325 builder.append_buffer(buf);
1326 }
1327
1328 let buffer = builder.finish().into_inner();
1329 LanceBuffer::Borrowed(buffer)
1330}
1331
1332fn encode_bitmap_data(arrays: &[ArrayRef], num_values: u64) -> LanceBuffer {
1333 let bitmaps = arrays
1334 .iter()
1335 .map(|arr| arr.as_boolean().values().clone())
1336 .collect::<Vec<_>>();
1337 do_encode_bitmap_data(&bitmaps, num_values)
1338}
1339
1340fn concat_dict_arrays(arrays: &[ArrayRef]) -> ArrayRef {
1343 let value_type = arrays[0].as_any_dictionary().values().data_type();
1344 let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
1345 match arrow_select::concat::concat(&array_refs) {
1346 Ok(array) => array,
1347 Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1348 let upscaled = array_refs
1350 .iter()
1351 .map(|arr| {
1352 match arrow_cast::cast(
1353 *arr,
1354 &DataType::Dictionary(
1355 Box::new(DataType::UInt32),
1356 Box::new(value_type.clone()),
1357 ),
1358 ) {
1359 Ok(arr) => arr,
1360 Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1361 unimplemented!("Dictionary arrays with more than 2^32 unique values")
1363 }
1364 err => err.unwrap(),
1365 }
1366 })
1367 .collect::<Vec<_>>();
1368 let array_refs = upscaled.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
1369 match arrow_select::concat::concat(&array_refs) {
1371 Ok(array) => array,
1372 Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1373 unimplemented!("Dictionary arrays with more than 2^32 unique values")
1374 }
1375 err => err.unwrap(),
1376 }
1377 }
1378 err => err.unwrap(),
1380 }
1381}
1382
1383fn max_index_val(index_type: &DataType) -> u64 {
1384 match index_type {
1385 DataType::Int8 => i8::MAX as u64,
1386 DataType::Int16 => i16::MAX as u64,
1387 DataType::Int32 => i32::MAX as u64,
1388 DataType::Int64 => i64::MAX as u64,
1389 DataType::UInt8 => u8::MAX as u64,
1390 DataType::UInt16 => u16::MAX as u64,
1391 DataType::UInt32 => u32::MAX as u64,
1392 DataType::UInt64 => u64::MAX,
1393 _ => panic!("Invalid dictionary index type"),
1394 }
1395}
1396
/// Converts dictionary arrays into a `DataBlock::Dictionary`.
///
/// The arrays are concatenated first.  When `validity` is `Some`, every row it
/// marks as invalid has its key rewritten to point at a null entry in the
/// dictionary values, so the keys themselves need no validity bitmap.  If the
/// values contain no null entry, one is appended, and the keys are cast to
/// `UInt32` when the new entry's index does not fit in the current key type.
///
/// When `validity` is `None` the key buffer is used as-is.
fn arrow_dictionary_to_data_block(arrays: &[ArrayRef], validity: Option<NullBuffer>) -> DataBlock {
    let array = concat_dict_arrays(arrays);
    let array_dict = array.as_any_dictionary();
    let mut indices = array_dict.keys();
    let num_values = indices.len() as u64;
    let mut values = array_dict.values().clone();
    // Keeps the widened keys alive when `indices` is re-pointed at them below
    let mut upcast = None;

    let indices_block = if let Some(validity) = validity {
        // Look for an existing null entry in the dictionary values
        let mut first_invalid_index = None;
        if let Some(values_validity) = values.nulls() {
            first_invalid_index = (!values_validity.inner()).set_indices().next();
        }
        let first_invalid_index = first_invalid_index.unwrap_or_else(|| {
            // No null entry exists, so append one at the end of the values
            let null_arr = new_null_array(values.data_type(), 1);
            values = arrow_select::concat::concat(&[values.as_ref(), null_arr.as_ref()]).unwrap();
            let null_index = values.len() - 1;
            let max_index_val = max_index_val(indices.data_type());
            if null_index as u64 > max_index_val {
                // The new entry cannot be addressed by the current key type
                if max_index_val >= u32::MAX as u64 {
                    unimplemented!("Dictionary arrays with 2^32 unique value (or more) and a null")
                }
                upcast = Some(arrow_cast::cast(indices, &DataType::UInt32).unwrap());
                indices = upcast.as_ref().unwrap();
            }
            null_index
        });
        // Express the null entry's index in the key type to get its raw bytes
        let null_index_arr = arrow_cast::cast(
            &UInt64Array::from(vec![first_invalid_index as u64]),
            indices.data_type(),
        )
        .unwrap();

        let bytes_per_index = indices.data_type().byte_width();
        let bits_per_index = bytes_per_index as u64 * 8;

        let null_index_arr = null_index_arr.into_data();
        let null_index_bytes = &null_index_arr.buffers()[0];
        // Copy the keys so the invalid rows can be overwritten in place
        let mut indices_bytes = indices.to_data().buffers()[0].to_vec();
        for invalid_idx in (!validity.inner()).set_indices() {
            indices_bytes[invalid_idx * bytes_per_index..(invalid_idx + 1) * bytes_per_index]
                .copy_from_slice(null_index_bytes.as_slice());
        }
        FixedWidthDataBlock {
            data: LanceBuffer::Owned(indices_bytes),
            bits_per_value: bits_per_index,
            num_values,
            block_info: BlockInfo::new(),
        }
    } else {
        // No invalid rows, the key buffer can be borrowed unchanged
        FixedWidthDataBlock {
            data: LanceBuffer::Borrowed(indices.to_data().buffers()[0].clone()),
            bits_per_value: indices.data_type().byte_width() as u64 * 8,
            num_values,
            block_info: BlockInfo::new(),
        }
    };

    let items = DataBlock::from(values);
    DataBlock::Dictionary(DictionaryDataBlock {
        indices: indices_block,
        dictionary: Box::new(items),
    })
}
1489
/// Summary of the nulls found across a set of arrays (see `extract_nulls`).
enum Nullability {
    /// None of the arrays reported a null buffer
    None,
    /// The number of nulls equals the number of values
    All,
    /// At least one array reported a null buffer; holds the combined validity
    Some(NullBuffer),
}
1495
1496impl Nullability {
1497 fn to_option(&self) -> Option<NullBuffer> {
1498 match self {
1499 Self::Some(nulls) => Some(nulls.clone()),
1500 _ => None,
1501 }
1502 }
1503}
1504
1505fn extract_nulls(arrays: &[ArrayRef], num_values: u64) -> Nullability {
1506 let mut has_nulls = false;
1507 let nulls_and_lens = arrays
1508 .iter()
1509 .map(|arr| {
1510 let nulls = arr.logical_nulls();
1511 has_nulls |= nulls.is_some();
1512 (nulls, arr.len())
1513 })
1514 .collect::<Vec<_>>();
1515 if !has_nulls {
1516 return Nullability::None;
1517 }
1518 let mut builder = BooleanBufferBuilder::new(num_values as usize);
1519 let mut num_nulls = 0;
1520 for (null, len) in nulls_and_lens {
1521 if let Some(null) = null {
1522 num_nulls += null.null_count();
1523 builder.append_buffer(&null.into_inner());
1524 } else {
1525 builder.append_n(len, true);
1526 }
1527 }
1528 if num_nulls == num_values as usize {
1529 Nullability::All
1530 } else {
1531 Nullability::Some(NullBuffer::new(builder.finish()))
1532 }
1533}
1534
impl DataBlock {
    /// Converts a sequence of arrow arrays into a single `DataBlock`.
    ///
    /// The data type is taken from the first array.  `num_values` is recorded
    /// as the number of values in the resulting block.
    ///
    /// * Empty input (no arrays or `num_values == 0`) yields an `AllNull` block
    ///   with zero values.
    /// * If every value is null, an `AllNull` block is returned.
    /// * Otherwise the data is encoded according to its type and, if any array
    ///   reported nulls, wrapped in a `Nullable` block.  Dictionary data is not
    ///   wrapped; its nulls are handled by `arrow_dictionary_to_data_block`.
    ///
    /// # Panics
    ///
    /// Panics for list, list-view, map, run-end-encoded and union types, and
    /// hits `todo!` for the view types (`BinaryView` / `Utf8View`).
    pub fn from_arrays(arrays: &[ArrayRef], num_values: u64) -> Self {
        if arrays.is_empty() || num_values == 0 {
            return Self::AllNull(AllNullDataBlock { num_values: 0 });
        }

        let data_type = arrays[0].data_type();
        let nulls = extract_nulls(arrays, num_values);

        if let Nullability::All = nulls {
            return Self::AllNull(AllNullDataBlock { num_values });
        }

        let mut encoded = match data_type {
            // Variable-width types with 32-bit offsets
            DataType::Binary | DataType::Utf8 => arrow_binary_to_data_block(arrays, num_values, 32),
            DataType::BinaryView | DataType::Utf8View => {
                todo!()
            }
            // Variable-width types with 64-bit offsets
            DataType::LargeBinary | DataType::LargeUtf8 => {
                arrow_binary_to_data_block(arrays, num_values, 64)
            }
            // Booleans are stored bit-packed, one bit per value
            DataType::Boolean => {
                let data = encode_bitmap_data(arrays, num_values);
                Self::FixedWidth(FixedWidthDataBlock {
                    data,
                    bits_per_value: 1,
                    num_values,
                    block_info: BlockInfo::new(),
                })
            }
            // Fixed-width types are stored as their concatenated value buffers
            DataType::Date32
            | DataType::Date64
            | DataType::Decimal128(_, _)
            | DataType::Decimal256(_, _)
            | DataType::Duration(_)
            | DataType::FixedSizeBinary(_)
            | DataType::Float16
            | DataType::Float32
            | DataType::Float64
            | DataType::Int16
            | DataType::Int32
            | DataType::Int64
            | DataType::Int8
            | DataType::Interval(_)
            | DataType::Time32(_)
            | DataType::Time64(_)
            | DataType::Timestamp(_, _)
            | DataType::UInt16
            | DataType::UInt32
            | DataType::UInt64
            | DataType::UInt8 => {
                let data = encode_flat_data(arrays, num_values);
                Self::FixedWidth(FixedWidthDataBlock {
                    data,
                    bits_per_value: data_type.byte_width() as u64 * 8,
                    num_values,
                    block_info: BlockInfo::new(),
                })
            }
            DataType::Null => Self::AllNull(AllNullDataBlock { num_values }),
            DataType::Dictionary(_, _) => arrow_dictionary_to_data_block(arrays, nulls.to_option()),
            DataType::Struct(fields) => {
                // Convert each child column independently, across all arrays
                let structs = arrays.iter().map(|arr| arr.as_struct()).collect::<Vec<_>>();
                let mut children = Vec::with_capacity(fields.len());
                for child_idx in 0..fields.len() {
                    let child_vec = structs
                        .iter()
                        .map(|s| s.column(child_idx).clone())
                        .collect::<Vec<_>>();
                    children.push(Self::from_arrays(&child_vec, num_values));
                }

                // The struct block also keeps its own copy of the struct-level validity
                let validity = match &nulls {
                    Nullability::None => None,
                    Nullability::Some(null_buffer) => Some(null_buffer.clone()),
                    Nullability::All => unreachable!("Should have returned AllNull earlier"),
                };

                Self::Struct(StructDataBlock {
                    children,
                    block_info: BlockInfo::default(),
                    validity,
                })
            }
            DataType::FixedSizeList(_, dim) => {
                // The child block holds `dim` items for every list value
                let children = arrays
                    .iter()
                    .map(|arr| arr.as_fixed_size_list().values().clone())
                    .collect::<Vec<_>>();
                let child_block = Self::from_arrays(&children, num_values * *dim as u64);
                Self::FixedSizeList(FixedSizeListBlock {
                    child: Box::new(child_block),
                    dimension: *dim as u64,
                })
            }
            DataType::LargeList(_)
            | DataType::List(_)
            | DataType::ListView(_)
            | DataType::LargeListView(_)
            | DataType::Map(_, _)
            | DataType::RunEndEncoded(_, _)
            | DataType::Union(_, _) => {
                panic!(
                    "Field with data type {} cannot be converted to data block",
                    data_type
                )
            }
        };

        // Statistics are computed on the encoded block before any nullable wrapping
        encoded.compute_stat();

        if !matches!(data_type, DataType::Dictionary(_, _)) {
            match nulls {
                Nullability::None => encoded,
                Nullability::Some(nulls) => Self::Nullable(NullableDataBlock {
                    data: Box::new(encoded),
                    nulls: LanceBuffer::Borrowed(nulls.into_inner().into_inner()),
                    block_info: BlockInfo::new(),
                }),
                // `Nullability::All` returned early above
                _ => unreachable!(),
            }
        } else {
            // Dictionary nulls were already folded into the indices
            encoded
        }
    }

    /// Converts a single arrow array into a `DataBlock`, using the array's
    /// length as the number of values.
    pub fn from_array<T: Array + 'static>(array: T) -> Self {
        let num_values = array.len();
        Self::from_arrays(&[Arc::new(array)], num_values as u64)
    }
}
1669
1670impl From<ArrayRef> for DataBlock {
1671 fn from(array: ArrayRef) -> Self {
1672 let num_values = array.len() as u64;
1673 Self::from_arrays(&[array], num_values)
1674 }
1675}
1676
/// Type-specific implementation behind [`DataBlockBuilder`].
///
/// An implementation is obtained from `DataBlock::make_builder` and is fed data
/// through `append` before being consumed by `finish`.
pub trait DataBlockBuilderImpl: std::fmt::Debug {
    /// Appends data from `data_block`, restricted to `selection`
    /// (presumably a range of value indices within `data_block` - confirm
    /// against the implementations).
    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>);
    /// Consumes the builder and returns the accumulated block.
    fn finish(self: Box<Self>) -> DataBlock;
}
1681
/// Builds a `DataBlock` from selections of other data blocks.
///
/// The concrete builder is created lazily from the first block that is
/// appended.
#[derive(Debug)]
pub struct DataBlockBuilder {
    /// Size estimate (in bytes) forwarded to `DataBlock::make_builder`
    estimated_size_bytes: u64,
    /// The type-specific builder; `None` until the first `append`
    builder: Option<Box<dyn DataBlockBuilderImpl>>,
}
1687
1688impl DataBlockBuilder {
1689 pub fn with_capacity_estimate(estimated_size_bytes: u64) -> Self {
1690 Self {
1691 estimated_size_bytes,
1692 builder: None,
1693 }
1694 }
1695
1696 fn get_builder(&mut self, block: &DataBlock) -> &mut dyn DataBlockBuilderImpl {
1697 if self.builder.is_none() {
1698 self.builder = Some(block.make_builder(self.estimated_size_bytes));
1699 }
1700 self.builder.as_mut().unwrap().as_mut()
1701 }
1702
1703 pub fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
1704 self.get_builder(data_block).append(data_block, selection);
1705 }
1706
1707 pub fn finish(self) -> DataBlock {
1708 let builder = self.builder.expect("DataBlockBuilder didn't see any data");
1709 builder.finish()
1710 }
1711}
1712
// Unit tests for converting arrow arrays into data blocks.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::datatypes::{Int32Type, Int8Type};
    use arrow_array::{
        make_array, new_null_array, ArrayRef, DictionaryArray, Int8Array, LargeBinaryArray,
        StringArray, UInt16Array, UInt8Array,
    };
    use arrow_buffer::{BooleanBuffer, NullBuffer};

    use arrow_schema::{DataType, Field, Fields};
    use lance_datagen::{array, ArrayGeneratorExt, RowCount, DEFAULT_SEED};
    use rand::SeedableRng;

    use crate::buffer::LanceBuffer;

    use super::{AllNullDataBlock, DataBlock};

    use arrow::compute::concat;
    use arrow_array::Array;

    // Sliced arrays must only contribute the values (and validity) of the slice
    #[test]
    fn test_sliced_to_data_block() {
        let ints = UInt16Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);
        let ints = ints.slice(2, 4);
        let data = DataBlock::from_array(ints);

        // 4 values of 2 bytes each
        let fixed_data = data.as_fixed_width().unwrap();
        assert_eq!(fixed_data.num_values, 4);
        assert_eq!(fixed_data.data.len(), 8);

        let nullable_ints =
            UInt16Array::from(vec![Some(0), None, Some(2), None, Some(4), None, Some(6)]);
        let nullable_ints = nullable_ints.slice(1, 3);
        let data = DataBlock::from_array(nullable_ints);

        // The slice is [None, Some(2), None] so only the second bit is set
        let nullable = data.as_nullable().unwrap();
        assert_eq!(nullable.nulls, LanceBuffer::Owned(vec![0b00000010]));
    }

    // Multiple string arrays are combined into one variable-width block
    #[test]
    fn test_string_to_data_block() {
        // Arrays with nulls, including one that is entirely null
        let strings1 = StringArray::from(vec![Some("hello"), None, Some("world")]);
        let strings2 = StringArray::from(vec![Some("a"), Some("b")]);
        let strings3 = StringArray::from(vec![Option::<&'static str>::None, None]);

        let arrays = &[strings1, strings2, strings3]
            .iter()
            .map(|arr| Arc::new(arr.clone()) as ArrayRef)
            .collect::<Vec<_>>();

        let block = DataBlock::from_arrays(arrays, 7);

        assert_eq!(block.num_values(), 7);
        let block = block.as_nullable().unwrap();

        // Rows 0, 2, 3 and 4 are valid
        assert_eq!(block.nulls, LanceBuffer::Owned(vec![0b00011101]));

        let data = block.data.as_variable_width().unwrap();
        assert_eq!(
            data.offsets,
            LanceBuffer::reinterpret_vec(vec![0, 5, 5, 10, 11, 12, 12, 12])
        );

        assert_eq!(data.data, LanceBuffer::copy_slice(b"helloworldab"));

        // Arrays without nulls produce a bare variable-width block
        let strings1 = StringArray::from(vec![Some("a"), Some("bc")]);
        let strings2 = StringArray::from(vec![Some("def")]);

        let arrays = &[strings1, strings2]
            .iter()
            .map(|arr| Arc::new(arr.clone()) as ArrayRef)
            .collect::<Vec<_>>();

        let block = DataBlock::from_arrays(arrays, 3);

        assert_eq!(block.num_values(), 3);
        let data = block.as_variable_width().unwrap();
        assert_eq!(data.offsets, LanceBuffer::reinterpret_vec(vec![0, 1, 3, 6]));
        assert_eq!(data.data, LanceBuffer::copy_slice(b"abcdef"));
    }

    // Sliced string arrays must yield zero-based offsets and only the bytes
    // referenced by the slice
    #[test]
    fn test_string_sliced() {
        let check = |arr: Vec<StringArray>, expected_off: Vec<i32>, expected_data: &[u8]| {
            let arrs = arr
                .into_iter()
                .map(|a| Arc::new(a) as ArrayRef)
                .collect::<Vec<_>>();
            let num_rows = arrs.iter().map(|a| a.len()).sum::<usize>() as u64;
            let data = DataBlock::from_arrays(&arrs, num_rows);

            assert_eq!(data.num_values(), num_rows);

            let data = data.as_variable_width().unwrap();
            assert_eq!(data.offsets, LanceBuffer::reinterpret_vec(expected_off));
            assert_eq!(data.data, LanceBuffer::copy_slice(expected_data));
        };

        let string = StringArray::from(vec![Some("hello"), Some("world")]);
        check(vec![string.slice(1, 1)], vec![0, 5], b"world");
        check(vec![string.slice(0, 1)], vec![0, 5], b"hello");
        check(
            vec![string.slice(0, 1), string.slice(1, 1)],
            vec![0, 5, 10],
            b"helloworld",
        );

        // Slices taken from two different arrays
        let string2 = StringArray::from(vec![Some("foo"), Some("bar")]);
        check(
            vec![string.slice(0, 1), string2.slice(0, 1)],
            vec![0, 5, 8],
            b"hellofoo",
        );
    }

    // Large binary arrays keep 64-bit offsets
    #[test]
    fn test_large() {
        let arr = LargeBinaryArray::from_vec(vec![b"hello", b"world"]);
        let data = DataBlock::from_array(arr);

        assert_eq!(data.num_values(), 2);
        let data = data.as_variable_width().unwrap();
        assert_eq!(data.bits_per_offset, 64);
        assert_eq!(data.num_values, 2);
        assert_eq!(data.data, LanceBuffer::copy_slice(b"helloworld"));
        assert_eq!(
            data.offsets,
            LanceBuffer::reinterpret_vec(vec![0_u64, 5, 10])
        );
    }

    // Concatenating dictionary arrays rewrites the keys of later arrays so
    // that they point into the combined dictionary
    #[test]
    fn test_dictionary_indices_normalized() {
        let arr1 = DictionaryArray::<Int8Type>::from_iter([Some("a"), Some("a"), Some("b")]);
        let arr2 = DictionaryArray::<Int8Type>::from_iter([Some("b"), Some("c")]);

        let data = DataBlock::from_arrays(&[Arc::new(arr1), Arc::new(arr2)], 5);

        assert_eq!(data.num_values(), 5);
        let data = data.as_dictionary().unwrap();
        let indices = data.indices;
        assert_eq!(indices.bits_per_value, 8);
        assert_eq!(indices.num_values, 5);
        assert_eq!(
            indices.data,
            LanceBuffer::reinterpret_vec::<i8>(vec![0, 0, 1, 2, 3])
        );

        // The expected dictionary keeps "b" from both inputs (no de-duplication)
        let items = data.dictionary.as_variable_width().unwrap();
        assert_eq!(items.bits_per_offset, 32);
        assert_eq!(items.num_values, 4);
        assert_eq!(items.data, LanceBuffer::copy_slice(b"abbc"));
        assert_eq!(
            items.offsets,
            LanceBuffer::reinterpret_vec(vec![0, 1, 2, 3, 4],)
        );
    }

    // Null rows are represented by keys pointing at a null dictionary entry
    #[test]
    fn test_dictionary_nulls() {
        // Nulls in the keys: a null entry has to be added to the dictionary
        let arr1 = DictionaryArray::<Int8Type>::from_iter([None, Some("a"), Some("b")]);
        let arr2 = DictionaryArray::<Int8Type>::from_iter([Some("c"), None]);

        let data = DataBlock::from_arrays(&[Arc::new(arr1), Arc::new(arr2)], 5);

        let check_common = |data: DataBlock| {
            assert_eq!(data.num_values(), 5);
            let dict = data.as_dictionary().unwrap();

            // The dictionary is ["a", "b", "c", null]
            let nullable_items = dict.dictionary.as_nullable().unwrap();
            assert_eq!(nullable_items.nulls, LanceBuffer::Owned(vec![0b00000111]));
            assert_eq!(nullable_items.data.num_values(), 4);

            let items = nullable_items.data.as_variable_width().unwrap();
            assert_eq!(items.bits_per_offset, 32);
            assert_eq!(items.num_values, 4);
            assert_eq!(items.data, LanceBuffer::copy_slice(b"abc"));
            assert_eq!(
                items.offsets,
                LanceBuffer::reinterpret_vec(vec![0, 1, 2, 3, 3],)
            );

            // Null rows use index 3, the null dictionary entry
            let indices = dict.indices;
            assert_eq!(indices.bits_per_value, 8);
            assert_eq!(indices.num_values, 5);
            assert_eq!(
                indices.data,
                LanceBuffer::reinterpret_vec::<i8>(vec![3, 0, 1, 2, 3])
            );
        };
        check_common(data);

        // Nulls in the dictionary values instead of the keys: same result
        let items = StringArray::from(vec![Some("a"), Some("b"), Some("c"), None]);
        let indices = Int8Array::from(vec![Some(3), Some(0), Some(1), Some(2), Some(3)]);
        let dict = DictionaryArray::new(indices, Arc::new(items));

        let data = DataBlock::from_array(dict);

        check_common(data);
    }

    // When the key type has no room left for the added null entry the keys
    // are widened to 32 bits
    #[test]
    fn test_dictionary_cannot_add_null() {
        // 256 unique values, which uses every possible u8 key
        let items = StringArray::from(
            (0..256)
                .map(|i| Some(String::from_utf8(vec![0; i]).unwrap()))
                .collect::<Vec<_>>(),
        );
        // 257 rows, the last of which is null
        let indices = UInt8Array::from(
            (0..=256)
                .map(|i| if i == 256 { None } else { Some(i as u8) })
                .collect::<Vec<_>>(),
        );
        let dict = DictionaryArray::new(indices, Arc::new(items));
        let data = DataBlock::from_array(dict);

        assert_eq!(data.num_values(), 257);

        let dict = data.as_dictionary().unwrap();

        assert_eq!(dict.indices.bits_per_value, 32);
        assert_eq!(
            dict.indices.data,
            LanceBuffer::reinterpret_vec((0_u32..257).collect::<Vec<_>>())
        );

        // Only the appended entry (index 256) is null
        let nullable_items = dict.dictionary.as_nullable().unwrap();
        let null_buffer = NullBuffer::new(BooleanBuffer::new(
            nullable_items.nulls.into_buffer(),
            0,
            257,
        ));
        for i in 0..256 {
            assert!(!null_buffer.is_null(i));
        }
        assert!(null_buffer.is_null(256));

        // 0 + 1 + ... + 255 bytes of values
        assert_eq!(
            nullable_items.data.as_variable_width().unwrap().data.len(),
            32640
        );
    }

    // An all-null block converts to a null array of any requested type
    #[test]
    fn test_all_null() {
        for data_type in [
            DataType::UInt32,
            DataType::FixedSizeBinary(2),
            DataType::List(Arc::new(Field::new("item", DataType::UInt32, true))),
            DataType::Struct(Fields::from(vec![Field::new("a", DataType::UInt32, true)])),
        ] {
            let block = DataBlock::AllNull(AllNullDataBlock { num_values: 10 });
            let arr = block.into_arrow(data_type.clone(), true).unwrap();
            let arr = make_array(arr);
            let expected = new_null_array(&data_type, 10);
            assert_eq!(&arr, &expected);
        }
    }

    // When the combined dictionary does not fit in the key type the keys are
    // widened to 32 bits
    #[test]
    fn test_dictionary_cannot_concatenate() {
        // 256 unique values
        let items = StringArray::from(
            (0..256)
                .map(|i| Some(String::from_utf8(vec![0; i]).unwrap()))
                .collect::<Vec<_>>(),
        );
        // 256 different unique values
        let other_items = StringArray::from(
            (0..256)
                .map(|i| Some(String::from_utf8(vec![1; i + 1]).unwrap()))
                .collect::<Vec<_>>(),
        );
        let indices = UInt8Array::from_iter_values(0..=255);
        let dict1 = DictionaryArray::new(indices.clone(), Arc::new(items));
        let dict2 = DictionaryArray::new(indices, Arc::new(other_items));
        let data = DataBlock::from_arrays(&[Arc::new(dict1), Arc::new(dict2)], 512);
        assert_eq!(data.num_values(), 512);

        let dict = data.as_dictionary().unwrap();

        assert_eq!(dict.indices.bits_per_value, 32);
        assert_eq!(
            dict.indices.data,
            LanceBuffer::reinterpret_vec::<u32>((0..512).collect::<Vec<_>>())
        );
        // (0 + 1 + ... + 255) + (1 + 2 + ... + 256) bytes of values
        assert_eq!(
            dict.dictionary.as_variable_width().unwrap().data.len(),
            65536
        );
    }

    // `data_size` must match the size of the arrow buffers (plus the validity
    // bitmap when there are nulls)
    #[test]
    fn test_data_size() {
        let mut rng = rand_xoshiro::Xoshiro256PlusPlus::seed_from_u64(DEFAULT_SEED.0);
        // NOTE(review): `with_nulls` presumably takes a repeating is-null
        // pattern, so this generator would produce no nulls - confirm against
        // lance_datagen
        let mut gen = array::rand::<Int32Type>().with_nulls(&[false, false, false]);

        let arr = gen.generate(RowCount::from(3), &mut rng).unwrap();
        let block = DataBlock::from_array(arr.clone());
        assert!(block.data_size() == arr.get_buffer_memory_size() as u64);

        let arr = gen.generate(RowCount::from(400), &mut rng).unwrap();
        let block = DataBlock::from_array(arr.clone());
        assert!(block.data_size() == arr.get_buffer_memory_size() as u64);

        // Second null pattern; the validity bitmap is now part of the expected size
        let mut gen = array::rand::<Int32Type>().with_nulls(&[false, true, false]);
        let arr = gen.generate(RowCount::from(3), &mut rng).unwrap();
        let block = DataBlock::from_array(arr.clone());

        let array_data = arr.to_data();
        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
        // One validity bit per value, rounded up to whole bytes
        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);

        let arr = gen.generate(RowCount::from(400), &mut rng).unwrap();
        let block = DataBlock::from_array(arr.clone());

        let array_data = arr.to_data();
        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);

        // Third null pattern
        let mut gen = array::rand::<Int32Type>().with_nulls(&[true, true, false]);
        let arr = gen.generate(RowCount::from(3), &mut rng).unwrap();
        let block = DataBlock::from_array(arr.clone());

        let array_data = arr.to_data();
        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);

        let arr = gen.generate(RowCount::from(400), &mut rng).unwrap();
        let block = DataBlock::from_array(arr.clone());

        let array_data = arr.to_data();
        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);

        // Several arrays at once: compare against the arrow concatenation
        let mut gen = array::rand::<Int32Type>().with_nulls(&[false, true, false]);
        let arr1 = gen.generate(RowCount::from(3), &mut rng).unwrap();
        let arr2 = gen.generate(RowCount::from(3), &mut rng).unwrap();
        let arr3 = gen.generate(RowCount::from(3), &mut rng).unwrap();
        let block = DataBlock::from_arrays(&[arr1.clone(), arr2.clone(), arr3.clone()], 9);

        let concatenated_array = concat(&[
            &*Arc::new(arr1.clone()) as &dyn Array,
            &*Arc::new(arr2.clone()) as &dyn Array,
            &*Arc::new(arr3.clone()) as &dyn Array,
        ])
        .unwrap();
        let total_buffer_size: usize = concatenated_array
            .to_data()
            .buffers()
            .iter()
            .map(|buffer| buffer.len())
            .sum();

        let total_nulls_size_in_bytes = concatenated_array.nulls().unwrap().len().div_ceil(8);
        assert!(block.data_size() == (total_buffer_size + total_nulls_size_in_bytes) as u64);
    }
}