1use std::{
18 ops::Range,
19 sync::{Arc, RwLock},
20};
21
22use arrow_array::{
23 cast::AsArray,
24 new_empty_array, new_null_array,
25 types::{ArrowDictionaryKeyType, UInt16Type, UInt32Type, UInt64Type, UInt8Type},
26 Array, ArrayRef, OffsetSizeTrait, UInt64Array,
27};
28use arrow_buffer::{ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer};
29use arrow_data::{ArrayData, ArrayDataBuilder};
30use arrow_schema::DataType;
31use lance_arrow::DataTypeExt;
32use snafu::location;
33
34use lance_core::{Error, Result};
35
36use crate::{
37 buffer::LanceBuffer,
38 statistics::{ComputeStat, Stat},
39};
40
/// A data block in which every value is null.
///
/// The block carries no buffers; only the number of values is recorded.
#[derive(Debug, Clone)]
pub struct AllNullDataBlock {
    /// Number of (null) values represented by this block.
    pub num_values: u64,
}
51
52impl AllNullDataBlock {
53 fn into_arrow(self, data_type: DataType, _validate: bool) -> Result<ArrayData> {
54 Ok(ArrayData::new_null(&data_type, self.num_values as usize))
55 }
56
57 fn into_buffers(self) -> Vec<LanceBuffer> {
58 vec![]
59 }
60}
61
62use std::collections::HashMap;
63
/// A shared, lock-protected map from a [`Stat`] key to an Arrow array value.
///
/// Cloning a `BlockInfo` clones the `Arc`, so clones share the same underlying map.
#[derive(Debug, Clone)]
pub struct BlockInfo(pub Arc<RwLock<HashMap<Stat, Arc<dyn Array>>>>);
68
69impl Default for BlockInfo {
70 fn default() -> Self {
71 Self::new()
72 }
73}
74
75impl BlockInfo {
76 pub fn new() -> Self {
77 Self(Arc::new(RwLock::new(HashMap::new())))
78 }
79}
80
81impl PartialEq for BlockInfo {
82 fn eq(&self, other: &Self) -> bool {
83 let self_info = self.0.read().unwrap();
84 let other_info = other.0.read().unwrap();
85 *self_info == *other_info
86 }
87}
88
/// A data block paired with a buffer describing which of its values are null.
#[derive(Debug, Clone)]
pub struct NullableDataBlock {
    /// The wrapped block holding the values.
    pub data: Box<DataBlock>,
    /// Buffer handed to Arrow as the null bit buffer when converting to an array.
    pub nulls: LanceBuffer,

    /// Statistics attached to this block.
    pub block_info: BlockInfo,
}
103
104impl NullableDataBlock {
105 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
106 let nulls = self.nulls.into_buffer();
107 let data = self.data.into_arrow(data_type, validate)?.into_builder();
108 let data = data.null_bit_buffer(Some(nulls));
109 if validate {
110 Ok(data.build()?)
111 } else {
112 Ok(unsafe { data.build_unchecked() })
113 }
114 }
115
116 fn into_buffers(self) -> Vec<LanceBuffer> {
117 let mut buffers = vec![self.nulls];
118 buffers.extend(self.data.into_buffers());
119 buffers
120 }
121
122 pub fn data_size(&self) -> u64 {
123 self.data.data_size() + self.nulls.len() as u64
124 }
125}
126
/// A data block for constant data.
// NOTE(review): presumably `data` stores the single repeated value once — confirm with callers.
#[derive(Debug, PartialEq, Clone)]
pub struct ConstantDataBlock {
    /// The buffer backing this block.
    pub data: LanceBuffer,
    /// Number of values represented by this block.
    pub num_values: u64,
}
135
136impl ConstantDataBlock {
137 fn into_buffers(self) -> Vec<LanceBuffer> {
138 vec![self.data]
139 }
140
141 fn into_arrow(self, _data_type: DataType, _validate: bool) -> Result<ArrayData> {
142 todo!()
145 }
146
147 pub fn try_clone(&self) -> Result<Self> {
148 Ok(Self {
149 data: self.data.clone(),
150 num_values: self.num_values,
151 })
152 }
153
154 pub fn data_size(&self) -> u64 {
155 self.data.len() as u64
156 }
157}
158
/// A data block in which every value occupies the same number of bits.
#[derive(Debug, PartialEq, Clone)]
pub struct FixedWidthDataBlock {
    /// The buffer holding the values.
    pub data: LanceBuffer,
    /// Width of a single value, in bits.
    pub bits_per_value: u64,
    /// Number of values in the block.
    pub num_values: u64,

    /// Statistics attached to this block.
    pub block_info: BlockInfo,
}
171
172impl FixedWidthDataBlock {
173 fn do_into_arrow(
174 self,
175 data_type: DataType,
176 num_values: u64,
177 validate: bool,
178 ) -> Result<ArrayData> {
179 let data_buffer = self.data.into_buffer();
180 let builder = ArrayDataBuilder::new(data_type)
181 .add_buffer(data_buffer)
182 .len(num_values as usize)
183 .null_count(0);
184 if validate {
185 Ok(builder.build()?)
186 } else {
187 Ok(unsafe { builder.build_unchecked() })
188 }
189 }
190
191 pub fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
192 let root_num_values = self.num_values;
193 self.do_into_arrow(data_type, root_num_values, validate)
194 }
195
196 pub fn into_buffers(self) -> Vec<LanceBuffer> {
197 vec![self.data]
198 }
199
200 pub fn try_clone(&self) -> Result<Self> {
201 Ok(Self {
202 data: self.data.clone(),
203 bits_per_value: self.bits_per_value,
204 num_values: self.num_values,
205 block_info: self.block_info.clone(),
206 })
207 }
208
209 pub fn data_size(&self) -> u64 {
210 self.data.len() as u64
211 }
212}
213
/// Accumulates selections of variable-width blocks into one block with offsets of type `T`.
#[derive(Debug)]
pub struct VariableWidthDataBlockBuilder<T: OffsetSizeTrait> {
    // Offsets into `bytes`; starts with a single 0 and gains one entry per appended value.
    offsets: Vec<T>,
    // Concatenated bytes of all appended values.
    bytes: Vec<u8>,
}
219
220impl<T: OffsetSizeTrait> VariableWidthDataBlockBuilder<T> {
221 fn new(estimated_size_bytes: u64) -> Self {
222 Self {
223 offsets: vec![T::from_usize(0).unwrap()],
224 bytes: Vec::with_capacity(estimated_size_bytes as usize),
225 }
226 }
227}
228
229impl<T: OffsetSizeTrait + bytemuck::Pod> DataBlockBuilderImpl for VariableWidthDataBlockBuilder<T> {
230 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
231 let block = data_block.as_variable_width_ref().unwrap();
232 assert!(block.bits_per_offset == T::get_byte_width() as u8 * 8);
233 let offsets = block.offsets.borrow_to_typed_view::<T>();
234
235 let start_offset = offsets[selection.start as usize];
236 let end_offset = offsets[selection.end as usize];
237 let mut previous_len = self.bytes.len();
238
239 self.bytes
240 .extend_from_slice(&block.data[start_offset.as_usize()..end_offset.as_usize()]);
241
242 self.offsets.extend(
243 offsets[selection.start as usize..selection.end as usize]
244 .iter()
245 .zip(&offsets[selection.start as usize + 1..=selection.end as usize])
246 .map(|(¤t, &next)| {
247 let this_value_len = next - current;
248 previous_len += this_value_len.as_usize();
249 T::from_usize(previous_len).unwrap()
250 }),
251 );
252 }
253
254 fn finish(self: Box<Self>) -> DataBlock {
255 let num_values = (self.offsets.len() - 1) as u64;
256 DataBlock::VariableWidth(VariableWidthBlock {
257 data: LanceBuffer::from(self.bytes),
258 offsets: LanceBuffer::reinterpret_vec(self.offsets),
259 bits_per_offset: T::get_byte_width() as u8 * 8,
260 num_values,
261 block_info: BlockInfo::new(),
262 })
263 }
264}
265
/// Accumulates selections of 1-bit-per-value fixed-width blocks into a single bitmap block.
#[derive(Debug)]
struct BitmapDataBlockBuilder {
    // Bits appended so far.
    values: BooleanBufferBuilder,
}
270
271impl BitmapDataBlockBuilder {
272 fn new(estimated_size_bytes: u64) -> Self {
273 Self {
274 values: BooleanBufferBuilder::new(estimated_size_bytes as usize * 8),
275 }
276 }
277}
278
279impl DataBlockBuilderImpl for BitmapDataBlockBuilder {
280 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
281 let bitmap_blk = data_block.as_fixed_width_ref().unwrap();
282 self.values.append_packed_range(
283 selection.start as usize..selection.end as usize,
284 &bitmap_blk.data,
285 );
286 }
287
288 fn finish(mut self: Box<Self>) -> DataBlock {
289 let bool_buf = self.values.finish();
290 let num_values = bool_buf.len() as u64;
291 let bits_buf = bool_buf.into_inner();
292 DataBlock::FixedWidth(FixedWidthDataBlock {
293 data: LanceBuffer::from(bits_buf),
294 bits_per_value: 1,
295 num_values,
296 block_info: BlockInfo::new(),
297 })
298 }
299}
300
/// Accumulates selections of byte-aligned fixed-width blocks into a single block.
#[derive(Debug)]
struct FixedWidthDataBlockBuilder {
    // Width of one value in bits; must be a multiple of 8.
    bits_per_value: u64,
    // `bits_per_value / 8`, cached for byte-range arithmetic.
    bytes_per_value: u64,
    // Bytes of all values appended so far.
    values: Vec<u8>,
}
307
308impl FixedWidthDataBlockBuilder {
309 fn new(bits_per_value: u64, estimated_size_bytes: u64) -> Self {
310 assert!(bits_per_value % 8 == 0);
311 Self {
312 bits_per_value,
313 bytes_per_value: bits_per_value / 8,
314 values: Vec::with_capacity(estimated_size_bytes as usize),
315 }
316 }
317}
318
319impl DataBlockBuilderImpl for FixedWidthDataBlockBuilder {
320 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
321 let block = data_block.as_fixed_width_ref().unwrap();
322 assert_eq!(self.bits_per_value, block.bits_per_value);
323 let start = selection.start as usize * self.bytes_per_value as usize;
324 let end = selection.end as usize * self.bytes_per_value as usize;
325 self.values.extend_from_slice(&block.data[start..end]);
326 }
327
328 fn finish(self: Box<Self>) -> DataBlock {
329 let num_values = (self.values.len() / self.bytes_per_value as usize) as u64;
330 DataBlock::FixedWidth(FixedWidthDataBlock {
331 data: LanceBuffer::from(self.values),
332 bits_per_value: self.bits_per_value,
333 num_values,
334 block_info: BlockInfo::new(),
335 })
336 }
337}
338
/// Accumulates selections of struct blocks by forwarding to one builder per child.
#[derive(Debug)]
struct StructDataBlockBuilder {
    // One builder per struct child, in child order.
    children: Vec<Box<dyn DataBlockBuilderImpl>>,
}
343
344impl StructDataBlockBuilder {
345 fn new(children: Vec<Box<dyn DataBlockBuilderImpl>>) -> Self {
346 Self { children }
347 }
348}
349
350impl DataBlockBuilderImpl for StructDataBlockBuilder {
351 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
352 let data_block = data_block.as_struct_ref().unwrap();
353 for i in 0..self.children.len() {
354 self.children[i].append(&data_block.children[i], selection.clone());
355 }
356 }
357
358 fn finish(self: Box<Self>) -> DataBlock {
359 let mut children_data_block = Vec::new();
360 for child in self.children {
361 let child_data_block = child.finish();
362 children_data_block.push(child_data_block);
363 }
364 DataBlock::Struct(StructDataBlock {
365 children: children_data_block,
366 block_info: BlockInfo::new(),
367 validity: None,
368 })
369 }
370}
371
/// Builder for all-null blocks; it only counts how many values were appended.
#[derive(Debug, Default)]
struct AllNullDataBlockBuilder {
    // Running total of appended values.
    num_values: u64,
}
376
377impl DataBlockBuilderImpl for AllNullDataBlockBuilder {
378 fn append(&mut self, _data_block: &DataBlock, selection: Range<u64>) {
379 self.num_values += selection.end - selection.start;
380 }
381
382 fn finish(self: Box<Self>) -> DataBlock {
383 DataBlock::AllNull(AllNullDataBlock {
384 num_values: self.num_values,
385 })
386 }
387}
388
/// A data block of fixed-size lists: each list groups `dimension` values of the child block.
#[derive(Debug, Clone)]
pub struct FixedSizeListBlock {
    /// The block holding the list items.
    pub child: Box<DataBlock>,
    /// Number of child values per list.
    pub dimension: u64,
}
397
398impl FixedSizeListBlock {
399 pub fn num_values(&self) -> u64 {
400 self.child.num_values() / self.dimension
401 }
402
403 pub fn try_into_flat(self) -> Option<FixedWidthDataBlock> {
407 match *self.child {
408 DataBlock::Nullable(_) => None,
410 DataBlock::FixedSizeList(inner) => {
411 let mut flat = inner.try_into_flat()?;
412 flat.bits_per_value *= self.dimension;
413 flat.num_values /= self.dimension;
414 Some(flat)
415 }
416 DataBlock::FixedWidth(mut inner) => {
417 inner.bits_per_value *= self.dimension;
418 inner.num_values /= self.dimension;
419 Some(inner)
420 }
421 _ => panic!(
422 "Expected FixedSizeList or FixedWidth data block but found {:?}",
423 self
424 ),
425 }
426 }
427
428 pub fn flatten_as_fixed(&mut self) -> FixedWidthDataBlock {
429 match self.child.as_mut() {
430 DataBlock::FixedSizeList(fsl) => fsl.flatten_as_fixed(),
431 DataBlock::FixedWidth(fw) => fw.clone(),
432 _ => panic!("Expected FixedSizeList or FixedWidth data block"),
433 }
434 }
435
436 pub fn from_flat(data: FixedWidthDataBlock, data_type: &DataType) -> DataBlock {
438 match data_type {
439 DataType::FixedSizeList(child_field, dimension) => {
440 let mut data = data;
441 data.bits_per_value /= *dimension as u64;
442 data.num_values *= *dimension as u64;
443 let child_data = Self::from_flat(data, child_field.data_type());
444 DataBlock::FixedSizeList(Self {
445 child: Box::new(child_data),
446 dimension: *dimension as u64,
447 })
448 }
449 _ => DataBlock::FixedWidth(data),
451 }
452 }
453
454 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
455 let num_values = self.num_values();
456 let builder = match &data_type {
457 DataType::FixedSizeList(child_field, _) => {
458 let child_data = self
459 .child
460 .into_arrow(child_field.data_type().clone(), validate)?;
461 ArrayDataBuilder::new(data_type)
462 .add_child_data(child_data)
463 .len(num_values as usize)
464 .null_count(0)
465 }
466 _ => panic!("Expected FixedSizeList data type and got {:?}", data_type),
467 };
468 if validate {
469 Ok(builder.build()?)
470 } else {
471 Ok(unsafe { builder.build_unchecked() })
472 }
473 }
474
475 fn into_buffers(self) -> Vec<LanceBuffer> {
476 self.child.into_buffers()
477 }
478
479 fn data_size(&self) -> u64 {
480 self.child.data_size()
481 }
482}
483
/// Accumulates selections of fixed-size-list blocks by forwarding scaled ranges to a builder
/// for the child block.
#[derive(Debug)]
struct FixedSizeListBlockBuilder {
    // Builder for the list items.
    inner: Box<dyn DataBlockBuilderImpl>,
    // Number of child values per list.
    dimension: u64,
}
489
490impl FixedSizeListBlockBuilder {
491 fn new(inner: Box<dyn DataBlockBuilderImpl>, dimension: u64) -> Self {
492 Self { inner, dimension }
493 }
494}
495
496impl DataBlockBuilderImpl for FixedSizeListBlockBuilder {
497 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
498 let selection = selection.start * self.dimension..selection.end * self.dimension;
499 let fsl = data_block.as_fixed_size_list_ref().unwrap();
500 self.inner.append(fsl.child.as_ref(), selection);
501 }
502
503 fn finish(self: Box<Self>) -> DataBlock {
504 let inner_block = self.inner.finish();
505 DataBlock::FixedSizeList(FixedSizeListBlock {
506 child: Box::new(inner_block),
507 dimension: self.dimension,
508 })
509 }
510}
511
/// Accumulates selections of nullable blocks: validity bits are gathered here and the values
/// are forwarded to a builder for the inner block.
#[derive(Debug)]
struct NullableDataBlockBuilder {
    // Builder for the wrapped (non-null-aware) block.
    inner: Box<dyn DataBlockBuilderImpl>,
    // Validity bits appended so far.
    validity: BooleanBufferBuilder,
}
517
518impl NullableDataBlockBuilder {
519 fn new(inner: Box<dyn DataBlockBuilderImpl>, estimated_size_bytes: usize) -> Self {
520 Self {
521 inner,
522 validity: BooleanBufferBuilder::new(estimated_size_bytes * 8),
523 }
524 }
525}
526
527impl DataBlockBuilderImpl for NullableDataBlockBuilder {
528 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
529 let nullable = data_block.as_nullable_ref().unwrap();
530 let bool_buf = BooleanBuffer::new(
531 nullable.nulls.clone().into_buffer(),
532 selection.start as usize,
533 (selection.end - selection.start) as usize,
534 );
535 self.validity.append_buffer(&bool_buf);
536 self.inner.append(nullable.data.as_ref(), selection);
537 }
538
539 fn finish(mut self: Box<Self>) -> DataBlock {
540 let inner_block = self.inner.finish();
541 DataBlock::Nullable(NullableDataBlock {
542 data: Box::new(inner_block),
543 nulls: LanceBuffer::from(self.validity.finish().into_inner()),
544 block_info: BlockInfo::new(),
545 })
546 }
547}
548
/// A data block made of raw buffers with no further structure exposed here.
///
/// Such a block cannot be converted to Arrow.
#[derive(Debug, Clone)]
pub struct OpaqueBlock {
    /// The buffers making up the block.
    pub buffers: Vec<LanceBuffer>,
    /// Number of values represented by the block.
    pub num_values: u64,
    /// Statistics attached to this block.
    pub block_info: BlockInfo,
}
558
559impl OpaqueBlock {
560 pub fn data_size(&self) -> u64 {
561 self.buffers.iter().map(|b| b.len() as u64).sum()
562 }
563}
564
/// A data block of variable-width values: one buffer with the value bytes and one with the
/// offsets delimiting each value.
#[derive(Debug, Clone)]
pub struct VariableWidthBlock {
    /// The bytes of all values.
    pub data: LanceBuffer,
    /// The offsets into `data`; viewed as a block it holds `num_values + 1` entries.
    pub offsets: LanceBuffer,
    /// Width of a single offset, in bits.
    pub bits_per_offset: u8,
    /// Number of values in the block.
    pub num_values: u64,

    /// Statistics attached to this block.
    pub block_info: BlockInfo,
}
581
582impl VariableWidthBlock {
583 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
584 let data_buffer = self.data.into_buffer();
585 let offsets_buffer = self.offsets.into_buffer();
586 let builder = ArrayDataBuilder::new(data_type)
587 .add_buffer(offsets_buffer)
588 .add_buffer(data_buffer)
589 .len(self.num_values as usize)
590 .null_count(0);
591 if validate {
592 Ok(builder.build()?)
593 } else {
594 Ok(unsafe { builder.build_unchecked() })
595 }
596 }
597
598 fn into_buffers(self) -> Vec<LanceBuffer> {
599 vec![self.offsets, self.data]
600 }
601
602 pub fn offsets_as_block(&mut self) -> DataBlock {
603 let offsets = self.offsets.clone();
604 DataBlock::FixedWidth(FixedWidthDataBlock {
605 data: offsets,
606 bits_per_value: self.bits_per_offset as u64,
607 num_values: self.num_values + 1,
608 block_info: BlockInfo::new(),
609 })
610 }
611
612 pub fn data_size(&self) -> u64 {
613 (self.data.len() + self.offsets.len()) as u64
614 }
615}
616
/// A data block for struct data, stored as one child block per field.
#[derive(Debug, Clone)]
pub struct StructDataBlock {
    /// The child blocks, matched positionally with the struct fields on conversion to Arrow.
    pub children: Vec<DataBlock>,
    /// Statistics attached to this block.
    pub block_info: BlockInfo,
    /// Optional struct-level validity; `None` is converted with a null count of zero.
    pub validity: Option<NullBuffer>,
}
626
627impl StructDataBlock {
628 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
629 if let DataType::Struct(fields) = &data_type {
630 let mut builder = ArrayDataBuilder::new(DataType::Struct(fields.clone()));
631 let mut num_rows = 0;
632 for (field, child) in fields.iter().zip(self.children) {
633 let child_data = child.into_arrow(field.data_type().clone(), validate)?;
634 num_rows = child_data.len();
635 builder = builder.add_child_data(child_data);
636 }
637
638 let builder = if let Some(validity) = self.validity {
640 let null_count = validity.null_count();
641 builder
642 .null_bit_buffer(Some(validity.into_inner().into_inner()))
643 .null_count(null_count)
644 } else {
645 builder.null_count(0)
646 };
647
648 let builder = builder.len(num_rows);
649 if validate {
650 Ok(builder.build()?)
651 } else {
652 Ok(unsafe { builder.build_unchecked() })
653 }
654 } else {
655 Err(Error::Internal {
656 message: format!("Expected Struct, got {:?}", data_type),
657 location: location!(),
658 })
659 }
660 }
661
662 fn remove_outer_validity(self) -> Self {
663 Self {
664 children: self
665 .children
666 .into_iter()
667 .map(|c| c.remove_outer_validity())
668 .collect(),
669 block_info: self.block_info,
670 validity: None, }
672 }
673
674 fn into_buffers(self) -> Vec<LanceBuffer> {
675 self.children
676 .into_iter()
677 .flat_map(|c| c.into_buffers())
678 .collect()
679 }
680
681 pub fn has_variable_width_child(&self) -> bool {
682 self.children
683 .iter()
684 .any(|child| !matches!(child, DataBlock::FixedWidth(_)))
685 }
686
687 pub fn data_size(&self) -> u64 {
688 self.children
689 .iter()
690 .map(|data_block| data_block.data_size())
691 .sum()
692 }
693}
694
/// A dictionary-encoded data block: fixed-width indices that refer to entries of a
/// dictionary block.
#[derive(Debug, Clone)]
pub struct DictionaryDataBlock {
    /// The indices; decoding supports 8, 16, 32 and 64 bits per index.
    pub indices: FixedWidthDataBlock,
    /// The dictionary the indices point into.
    pub dictionary: Box<DataBlock>,
}
703
704impl DictionaryDataBlock {
705 fn decode_helper<K: ArrowDictionaryKeyType>(self) -> Result<DataBlock> {
706 if self.indices.num_values == 0 {
709 return Ok(DataBlock::AllNull(AllNullDataBlock { num_values: 0 }));
710 }
711
712 let estimated_size_bytes = self.dictionary.data_size()
714 * (self.indices.num_values + self.dictionary.num_values() - 1)
715 / self.dictionary.num_values();
716 let mut data_builder = DataBlockBuilder::with_capacity_estimate(estimated_size_bytes);
717
718 let indices = self.indices.data.borrow_to_typed_slice::<K::Native>();
719 let indices = indices.as_ref();
720
721 indices
722 .iter()
723 .map(|idx| idx.to_usize().unwrap() as u64)
724 .for_each(|idx| {
725 data_builder.append(&self.dictionary, idx..idx + 1);
726 });
727
728 Ok(data_builder.finish())
729 }
730
731 pub fn decode(self) -> Result<DataBlock> {
732 match self.indices.bits_per_value {
733 8 => self.decode_helper::<UInt8Type>(),
734 16 => self.decode_helper::<UInt16Type>(),
735 32 => self.decode_helper::<UInt32Type>(),
736 64 => self.decode_helper::<UInt64Type>(),
737 _ => Err(lance_core::Error::Internal {
738 message: format!(
739 "Unsupported dictionary index bit width: {} bits",
740 self.indices.bits_per_value
741 ),
742 location: location!(),
743 }),
744 }
745 }
746
747 fn into_arrow_dict(
748 self,
749 key_type: Box<DataType>,
750 value_type: Box<DataType>,
751 validate: bool,
752 ) -> Result<ArrayData> {
753 let indices = self.indices.into_arrow((*key_type).clone(), validate)?;
754 let dictionary = self
755 .dictionary
756 .into_arrow((*value_type).clone(), validate)?;
757
758 let builder = indices
759 .into_builder()
760 .add_child_data(dictionary)
761 .data_type(DataType::Dictionary(key_type, value_type));
762
763 if validate {
764 Ok(builder.build()?)
765 } else {
766 Ok(unsafe { builder.build_unchecked() })
767 }
768 }
769
770 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
771 if let DataType::Dictionary(key_type, value_type) = data_type {
772 self.into_arrow_dict(key_type, value_type, validate)
773 } else {
774 self.decode()?.into_arrow(data_type, validate)
775 }
776 }
777
778 fn into_buffers(self) -> Vec<LanceBuffer> {
779 let mut buffers = self.indices.into_buffers();
780 buffers.extend(self.dictionary.into_buffers());
781 buffers
782 }
783
784 pub fn into_parts(self) -> (DataBlock, DataBlock) {
785 (DataBlock::FixedWidth(self.indices), *self.dictionary)
786 }
787
788 pub fn from_parts(indices: FixedWidthDataBlock, dictionary: DataBlock) -> Self {
789 Self {
790 indices,
791 dictionary: Box::new(dictionary),
792 }
793 }
794}
795
/// A block of data in one of several layouts.
///
/// Each variant wraps the struct describing that layout.
#[derive(Debug, Clone)]
pub enum DataBlock {
    /// A block with no values.
    Empty(),
    /// Constant data.
    Constant(ConstantDataBlock),
    /// Data in which every value is null.
    AllNull(AllNullDataBlock),
    /// Another block together with a null buffer.
    Nullable(NullableDataBlock),
    /// Values that all have the same bit width.
    FixedWidth(FixedWidthDataBlock),
    /// Fixed-size lists over a child block.
    FixedSizeList(FixedSizeListBlock),
    /// Variable-width values described by data and offsets buffers.
    VariableWidth(VariableWidthBlock),
    /// Raw buffers; cannot be converted to Arrow.
    Opaque(OpaqueBlock),
    /// Struct data with one child block per field.
    Struct(StructDataBlock),
    /// Dictionary-encoded data.
    Dictionary(DictionaryDataBlock),
}
823
824impl DataBlock {
825 pub fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
827 match self {
828 Self::Empty() => Ok(new_empty_array(&data_type).to_data()),
829 Self::Constant(inner) => inner.into_arrow(data_type, validate),
830 Self::AllNull(inner) => inner.into_arrow(data_type, validate),
831 Self::Nullable(inner) => inner.into_arrow(data_type, validate),
832 Self::FixedWidth(inner) => inner.into_arrow(data_type, validate),
833 Self::FixedSizeList(inner) => inner.into_arrow(data_type, validate),
834 Self::VariableWidth(inner) => inner.into_arrow(data_type, validate),
835 Self::Struct(inner) => inner.into_arrow(data_type, validate),
836 Self::Dictionary(inner) => inner.into_arrow(data_type, validate),
837 Self::Opaque(_) => Err(Error::Internal {
838 message: "Cannot convert OpaqueBlock to Arrow".to_string(),
839 location: location!(),
840 }),
841 }
842 }
843
844 pub fn into_buffers(self) -> Vec<LanceBuffer> {
848 match self {
849 Self::Empty() => Vec::default(),
850 Self::Constant(inner) => inner.into_buffers(),
851 Self::AllNull(inner) => inner.into_buffers(),
852 Self::Nullable(inner) => inner.into_buffers(),
853 Self::FixedWidth(inner) => inner.into_buffers(),
854 Self::FixedSizeList(inner) => inner.into_buffers(),
855 Self::VariableWidth(inner) => inner.into_buffers(),
856 Self::Struct(inner) => inner.into_buffers(),
857 Self::Dictionary(inner) => inner.into_buffers(),
858 Self::Opaque(inner) => inner.buffers,
859 }
860 }
861
862 pub fn try_clone(&self) -> Result<Self> {
871 match self {
872 Self::Empty() => Ok(Self::Empty()),
873 Self::Constant(inner) => Ok(Self::Constant(inner.clone())),
874 Self::AllNull(inner) => Ok(Self::AllNull(inner.clone())),
875 Self::Nullable(inner) => Ok(Self::Nullable(inner.clone())),
876 Self::FixedWidth(inner) => Ok(Self::FixedWidth(inner.clone())),
877 Self::FixedSizeList(inner) => Ok(Self::FixedSizeList(inner.clone())),
878 Self::VariableWidth(inner) => Ok(Self::VariableWidth(inner.clone())),
879 Self::Struct(inner) => Ok(Self::Struct(inner.clone())),
880 Self::Dictionary(inner) => Ok(Self::Dictionary(inner.clone())),
881 Self::Opaque(inner) => Ok(Self::Opaque(inner.clone())),
882 }
883 }
884
885 pub fn name(&self) -> &'static str {
886 match self {
887 Self::Constant(_) => "Constant",
888 Self::Empty() => "Empty",
889 Self::AllNull(_) => "AllNull",
890 Self::Nullable(_) => "Nullable",
891 Self::FixedWidth(_) => "FixedWidth",
892 Self::FixedSizeList(_) => "FixedSizeList",
893 Self::VariableWidth(_) => "VariableWidth",
894 Self::Struct(_) => "Struct",
895 Self::Dictionary(_) => "Dictionary",
896 Self::Opaque(_) => "Opaque",
897 }
898 }
899
900 pub fn is_variable(&self) -> bool {
901 match self {
902 Self::Constant(_) => false,
903 Self::Empty() => false,
904 Self::AllNull(_) => false,
905 Self::Nullable(nullable) => nullable.data.is_variable(),
906 Self::FixedWidth(_) => false,
907 Self::FixedSizeList(fsl) => fsl.child.is_variable(),
908 Self::VariableWidth(_) => true,
909 Self::Struct(strct) => strct.children.iter().any(|c| c.is_variable()),
910 Self::Dictionary(_) => {
911 todo!("is_variable for DictionaryDataBlock is not implemented yet")
912 }
913 Self::Opaque(_) => panic!("Does not make sense to ask if an Opaque block is variable"),
914 }
915 }
916
917 pub fn is_nullable(&self) -> bool {
918 match self {
919 Self::AllNull(_) => true,
920 Self::Nullable(_) => true,
921 Self::FixedSizeList(fsl) => fsl.child.is_nullable(),
922 Self::Struct(strct) => strct.children.iter().any(|c| c.is_nullable()),
923 Self::Dictionary(_) => {
924 todo!("is_nullable for DictionaryDataBlock is not implemented yet")
925 }
926 Self::Opaque(_) => panic!("Does not make sense to ask if an Opaque block is nullable"),
927 _ => false,
928 }
929 }
930
931 pub fn num_values(&self) -> u64 {
936 match self {
937 Self::Empty() => 0,
938 Self::Constant(inner) => inner.num_values,
939 Self::AllNull(inner) => inner.num_values,
940 Self::Nullable(inner) => inner.data.num_values(),
941 Self::FixedWidth(inner) => inner.num_values,
942 Self::FixedSizeList(inner) => inner.num_values(),
943 Self::VariableWidth(inner) => inner.num_values,
944 Self::Struct(inner) => inner.children[0].num_values(),
945 Self::Dictionary(inner) => inner.indices.num_values,
946 Self::Opaque(inner) => inner.num_values,
947 }
948 }
949
950 pub fn items_per_row(&self) -> u64 {
954 match self {
955 Self::Empty() => todo!(), Self::Constant(_) => todo!(), Self::AllNull(_) => todo!(), Self::Nullable(nullable) => nullable.data.items_per_row(),
959 Self::FixedWidth(_) => 1,
960 Self::FixedSizeList(fsl) => fsl.dimension * fsl.child.items_per_row(),
961 Self::VariableWidth(_) => 1,
962 Self::Struct(_) => todo!(), Self::Dictionary(_) => 1,
964 Self::Opaque(_) => 1,
965 }
966 }
967
968 pub fn data_size(&self) -> u64 {
970 match self {
971 Self::Empty() => 0,
972 Self::Constant(inner) => inner.data_size(),
973 Self::AllNull(_) => 0,
974 Self::Nullable(inner) => inner.data_size(),
975 Self::FixedWidth(inner) => inner.data_size(),
976 Self::FixedSizeList(inner) => inner.data_size(),
977 Self::VariableWidth(inner) => inner.data_size(),
978 Self::Struct(_) => {
979 todo!("the data_size method for StructDataBlock is not implemented yet")
980 }
981 Self::Dictionary(_) => {
982 todo!("the data_size method for DictionaryDataBlock is not implemented yet")
983 }
984 Self::Opaque(inner) => inner.data_size(),
985 }
986 }
987
988 pub fn remove_outer_validity(self) -> Self {
996 match self {
997 Self::AllNull(_) => panic!("Cannot remove validity on all-null data"),
998 Self::Nullable(inner) => *inner.data,
999 Self::Struct(inner) => Self::Struct(inner.remove_outer_validity()),
1000 other => other,
1001 }
1002 }
1003
1004 pub fn make_builder(&self, estimated_size_bytes: u64) -> Box<dyn DataBlockBuilderImpl> {
1005 match self {
1006 Self::FixedWidth(inner) => {
1007 if inner.bits_per_value == 1 {
1008 Box::new(BitmapDataBlockBuilder::new(estimated_size_bytes))
1009 } else {
1010 Box::new(FixedWidthDataBlockBuilder::new(
1011 inner.bits_per_value,
1012 estimated_size_bytes,
1013 ))
1014 }
1015 }
1016 Self::VariableWidth(inner) => {
1017 if inner.bits_per_offset == 32 {
1018 Box::new(VariableWidthDataBlockBuilder::<i32>::new(
1019 estimated_size_bytes,
1020 ))
1021 } else if inner.bits_per_offset == 64 {
1022 Box::new(VariableWidthDataBlockBuilder::<i64>::new(
1023 estimated_size_bytes,
1024 ))
1025 } else {
1026 todo!()
1027 }
1028 }
1029 Self::FixedSizeList(inner) => {
1030 let inner_builder = inner.child.make_builder(estimated_size_bytes);
1031 Box::new(FixedSizeListBlockBuilder::new(
1032 inner_builder,
1033 inner.dimension,
1034 ))
1035 }
1036 Self::Nullable(nullable) => {
1037 let estimated_validity_size_bytes = estimated_size_bytes / 16;
1040 let inner_builder = nullable
1041 .data
1042 .make_builder(estimated_size_bytes - estimated_validity_size_bytes);
1043 Box::new(NullableDataBlockBuilder::new(
1044 inner_builder,
1045 estimated_validity_size_bytes as usize,
1046 ))
1047 }
1048 Self::Struct(struct_data_block) => {
1049 let num_children = struct_data_block.children.len();
1050 let per_child_estimate = if num_children == 0 {
1051 0
1052 } else {
1053 estimated_size_bytes / num_children as u64
1054 };
1055 let child_builders = struct_data_block
1056 .children
1057 .iter()
1058 .map(|child| child.make_builder(per_child_estimate))
1059 .collect();
1060 Box::new(StructDataBlockBuilder::new(child_builders))
1061 }
1062 Self::AllNull(_) => Box::new(AllNullDataBlockBuilder::default()),
1063 _ => todo!("make_builder for {:?}", self),
1064 }
1065 }
1066}
1067
// Generates a consuming accessor `$fn_name(self)` that returns `Some(inner)` when `self` is
// the `$inner` variant (whose payload has type `$inner_type`) and `None` otherwise.
macro_rules! as_type {
    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
        pub fn $fn_name(self) -> Option<$inner_type> {
            match self {
                Self::$inner(inner) => Some(inner),
                _ => None,
            }
        }
    };
}
1078
// Generates a borrowing accessor `$fn_name(&self)` that returns a shared reference to the
// payload when `self` is the `$inner` variant and `None` otherwise.
macro_rules! as_type_ref {
    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
        pub fn $fn_name(&self) -> Option<&$inner_type> {
            match self {
                Self::$inner(inner) => Some(inner),
                _ => None,
            }
        }
    };
}
1089
// Generates a mutably borrowing accessor `$fn_name(&mut self)` that returns a mutable
// reference to the payload when `self` is the `$inner` variant and `None` otherwise.
macro_rules! as_type_ref_mut {
    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
        pub fn $fn_name(&mut self) -> Option<&mut $inner_type> {
            match self {
                Self::$inner(inner) => Some(inner),
                _ => None,
            }
        }
    };
}
1100
// Variant accessors. Each returns `Some` only when the block is the named variant.
impl DataBlock {
    // Consuming accessors: take the block by value and return the payload.
    as_type!(as_all_null, AllNull, AllNullDataBlock);
    as_type!(as_nullable, Nullable, NullableDataBlock);
    as_type!(as_fixed_width, FixedWidth, FixedWidthDataBlock);
    as_type!(as_fixed_size_list, FixedSizeList, FixedSizeListBlock);
    as_type!(as_variable_width, VariableWidth, VariableWidthBlock);
    as_type!(as_struct, Struct, StructDataBlock);
    as_type!(as_dictionary, Dictionary, DictionaryDataBlock);
    // Shared-reference accessors.
    as_type_ref!(as_all_null_ref, AllNull, AllNullDataBlock);
    as_type_ref!(as_nullable_ref, Nullable, NullableDataBlock);
    as_type_ref!(as_fixed_width_ref, FixedWidth, FixedWidthDataBlock);
    as_type_ref!(as_fixed_size_list_ref, FixedSizeList, FixedSizeListBlock);
    as_type_ref!(as_variable_width_ref, VariableWidth, VariableWidthBlock);
    as_type_ref!(as_struct_ref, Struct, StructDataBlock);
    as_type_ref!(as_dictionary_ref, Dictionary, DictionaryDataBlock);
    // Mutable-reference accessors.
    as_type_ref_mut!(as_all_null_ref_mut, AllNull, AllNullDataBlock);
    as_type_ref_mut!(as_nullable_ref_mut, Nullable, NullableDataBlock);
    as_type_ref_mut!(as_fixed_width_ref_mut, FixedWidth, FixedWidthDataBlock);
    as_type_ref_mut!(
        as_fixed_size_list_ref_mut,
        FixedSizeList,
        FixedSizeListBlock
    );
    as_type_ref_mut!(as_variable_width_ref_mut, VariableWidth, VariableWidthBlock);
    as_type_ref_mut!(as_struct_ref_mut, Struct, StructDataBlock);
    as_type_ref_mut!(as_dictionary_ref_mut, Dictionary, DictionaryDataBlock);
}
1129
1130fn get_byte_range<T: ArrowNativeType>(offsets: &mut LanceBuffer) -> Range<usize> {
1133 let offsets = offsets.borrow_to_typed_slice::<T>();
1134 if offsets.as_ref().is_empty() {
1135 0..0
1136 } else {
1137 offsets.as_ref().first().unwrap().as_usize()..offsets.as_ref().last().unwrap().as_usize()
1138 }
1139}
1140
1141fn stitch_offsets<T: ArrowNativeType + std::ops::Add<Output = T> + std::ops::Sub<Output = T>>(
1147 offsets: Vec<LanceBuffer>,
1148) -> (LanceBuffer, Vec<Range<usize>>) {
1149 if offsets.is_empty() {
1150 return (LanceBuffer::empty(), Vec::default());
1151 }
1152 let len = offsets.iter().map(|b| b.len()).sum::<usize>();
1153 let mut dest = Vec::with_capacity(len);
1157 let mut byte_ranges = Vec::with_capacity(offsets.len());
1158
1159 dest.push(T::from_usize(0).unwrap());
1161
1162 for mut o in offsets.into_iter() {
1163 if !o.is_empty() {
1164 let last_offset = *dest.last().unwrap();
1165 let o = o.borrow_to_typed_slice::<T>();
1166 let start = *o.as_ref().first().unwrap();
1167 dest.extend(o.as_ref()[1..].iter().map(|&x| x + last_offset - start));
1181 }
1182 byte_ranges.push(get_byte_range::<T>(&mut o));
1183 }
1184 (LanceBuffer::reinterpret_vec(dest), byte_ranges)
1185}
1186
1187fn arrow_binary_to_data_block(
1188 arrays: &[ArrayRef],
1189 num_values: u64,
1190 bits_per_offset: u8,
1191) -> DataBlock {
1192 let data_vec = arrays.iter().map(|arr| arr.to_data()).collect::<Vec<_>>();
1193 let bytes_per_offset = bits_per_offset as usize / 8;
1194 let offsets = data_vec
1195 .iter()
1196 .map(|d| {
1197 LanceBuffer::from(
1198 d.buffers()[0].slice_with_length(d.offset(), (d.len() + 1) * bytes_per_offset),
1199 )
1200 })
1201 .collect::<Vec<_>>();
1202 let (offsets, data_ranges) = if bits_per_offset == 32 {
1203 stitch_offsets::<i32>(offsets)
1204 } else {
1205 stitch_offsets::<i64>(offsets)
1206 };
1207 let data = data_vec
1208 .iter()
1209 .zip(data_ranges)
1210 .map(|(d, byte_range)| {
1211 LanceBuffer::from(
1212 d.buffers()[1]
1213 .slice_with_length(byte_range.start, byte_range.end - byte_range.start),
1214 )
1215 })
1216 .collect::<Vec<_>>();
1217 let data = LanceBuffer::concat_into_one(data);
1218 DataBlock::VariableWidth(VariableWidthBlock {
1219 data,
1220 offsets,
1221 bits_per_offset,
1222 num_values,
1223 block_info: BlockInfo::new(),
1224 })
1225}
1226
1227fn encode_flat_data(arrays: &[ArrayRef], num_values: u64) -> LanceBuffer {
1228 let bytes_per_value = arrays[0].data_type().byte_width();
1229 let mut buffer = Vec::with_capacity(num_values as usize * bytes_per_value);
1230 for arr in arrays {
1231 let data = arr.to_data();
1232 buffer.extend_from_slice(data.buffers()[0].as_slice());
1233 }
1234 LanceBuffer::from(buffer)
1235}
1236
1237fn do_encode_bitmap_data(bitmaps: &[BooleanBuffer], num_values: u64) -> LanceBuffer {
1238 let mut builder = BooleanBufferBuilder::new(num_values as usize);
1239
1240 for buf in bitmaps {
1241 builder.append_buffer(buf);
1242 }
1243
1244 let buffer = builder.finish().into_inner();
1245 LanceBuffer::from(buffer)
1246}
1247
1248fn encode_bitmap_data(arrays: &[ArrayRef], num_values: u64) -> LanceBuffer {
1249 let bitmaps = arrays
1250 .iter()
1251 .map(|arr| arr.as_boolean().values().clone())
1252 .collect::<Vec<_>>();
1253 do_encode_bitmap_data(&bitmaps, num_values)
1254}
1255
1256fn concat_dict_arrays(arrays: &[ArrayRef]) -> ArrayRef {
1259 let value_type = arrays[0].as_any_dictionary().values().data_type();
1260 let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
1261 match arrow_select::concat::concat(&array_refs) {
1262 Ok(array) => array,
1263 Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1264 let upscaled = array_refs
1266 .iter()
1267 .map(|arr| {
1268 match arrow_cast::cast(
1269 *arr,
1270 &DataType::Dictionary(
1271 Box::new(DataType::UInt32),
1272 Box::new(value_type.clone()),
1273 ),
1274 ) {
1275 Ok(arr) => arr,
1276 Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1277 unimplemented!("Dictionary arrays with more than 2^32 unique values")
1279 }
1280 err => err.unwrap(),
1281 }
1282 })
1283 .collect::<Vec<_>>();
1284 let array_refs = upscaled.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
1285 match arrow_select::concat::concat(&array_refs) {
1287 Ok(array) => array,
1288 Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1289 unimplemented!("Dictionary arrays with more than 2^32 unique values")
1290 }
1291 err => err.unwrap(),
1292 }
1293 }
1294 err => err.unwrap(),
1296 }
1297}
1298
1299fn max_index_val(index_type: &DataType) -> u64 {
1300 match index_type {
1301 DataType::Int8 => i8::MAX as u64,
1302 DataType::Int16 => i16::MAX as u64,
1303 DataType::Int32 => i32::MAX as u64,
1304 DataType::Int64 => i64::MAX as u64,
1305 DataType::UInt8 => u8::MAX as u64,
1306 DataType::UInt16 => u16::MAX as u64,
1307 DataType::UInt32 => u32::MAX as u64,
1308 DataType::UInt64 => u64::MAX,
1309 _ => panic!("Invalid dictionary index type"),
1310 }
1311}
1312
/// Converts dictionary arrays into a dictionary data block.
///
/// The arrays are concatenated into one dictionary array.  The keys become a
/// fixed-width block of indices and the dictionary values are converted with
/// `DataBlock::from`.
///
/// When `validity` is provided, the index at every position it marks as null
/// is overwritten so that it points at a null entry of the dictionary values.
/// If the values have no null entry, one is appended (widening the indices to
/// `UInt32` if the new entry does not fit in the current index type).
fn arrow_dictionary_to_data_block(arrays: &[ArrayRef], validity: Option<NullBuffer>) -> DataBlock {
    let array = concat_dict_arrays(arrays);
    let array_dict = array.as_any_dictionary();
    let mut indices = array_dict.keys();
    let num_values = indices.len() as u64;
    let mut values = array_dict.values().clone();
    // Placeholder that owns the widened indices if a cast is needed; `indices` is then
    // re-pointed at it
    let mut upcast = None;

    let indices_block = if let Some(validity) = validity {
        // Look for an existing null entry in the dictionary values
        let mut first_invalid_index = None;
        if let Some(values_validity) = values.nulls() {
            first_invalid_index = (!values_validity.inner()).set_indices().next();
        }
        let first_invalid_index = first_invalid_index.unwrap_or_else(|| {
            // No null entry exists yet, append one to the end of the values
            let null_arr = new_null_array(values.data_type(), 1);
            values = arrow_select::concat::concat(&[values.as_ref(), null_arr.as_ref()]).unwrap();
            let null_index = values.len() - 1;
            let max_index_val = max_index_val(indices.data_type());
            if null_index as u64 > max_index_val {
                // The new entry cannot be addressed by the current index type.  Widening to
                // UInt32 only helps if the current type is narrower than that.
                if max_index_val >= u32::MAX as u64 {
                    unimplemented!("Dictionary arrays with 2^32 unique value (or more) and a null")
                }
                upcast = Some(arrow_cast::cast(indices, &DataType::UInt32).unwrap());
                indices = upcast.as_ref().unwrap();
            }
            null_index
        });
        // Encode the null index using the (possibly widened) index type so its raw bytes can
        // be copied over the indices below
        let null_index_arr = arrow_cast::cast(
            &UInt64Array::from(vec![first_invalid_index as u64]),
            indices.data_type(),
        )
        .unwrap();

        let bytes_per_index = indices.data_type().byte_width();
        let bits_per_index = bytes_per_index as u64 * 8;

        let null_index_arr = null_index_arr.into_data();
        let null_index_bytes = &null_index_arr.buffers()[0];
        // Work on a copy of the index bytes so they can be modified
        let mut indices_bytes = indices.to_data().buffers()[0].to_vec();
        // Every position that `validity` marks as null is redirected to the null entry
        for invalid_idx in (!validity.inner()).set_indices() {
            indices_bytes[invalid_idx * bytes_per_index..(invalid_idx + 1) * bytes_per_index]
                .copy_from_slice(null_index_bytes.as_slice());
        }
        FixedWidthDataBlock {
            data: LanceBuffer::from(indices_bytes),
            bits_per_value: bits_per_index,
            num_values,
            block_info: BlockInfo::new(),
        }
    } else {
        // No validity was supplied, the indices are used as they are
        FixedWidthDataBlock {
            data: LanceBuffer::from(indices.to_data().buffers()[0].clone()),
            bits_per_value: indices.data_type().byte_width() as u64 * 8,
            num_values,
            block_info: BlockInfo::new(),
        }
    };

    let items = DataBlock::from(values);
    DataBlock::Dictionary(DictionaryDataBlock {
        indices: indices_block,
        dictionary: Box::new(items),
    })
}
1405
/// Summary of the validity of a sequence of arrays, as computed by `extract_nulls`
enum Nullability {
    /// None of the arrays reported a validity buffer
    None,
    /// The number of nulls equals the number of values
    All,
    /// The combined validity bitmap of all arrays
    Some(NullBuffer),
}
1411
1412impl Nullability {
1413 fn to_option(&self) -> Option<NullBuffer> {
1414 match self {
1415 Self::Some(nulls) => Some(nulls.clone()),
1416 _ => None,
1417 }
1418 }
1419}
1420
1421fn extract_nulls(arrays: &[ArrayRef], num_values: u64) -> Nullability {
1422 let mut has_nulls = false;
1423 let nulls_and_lens = arrays
1424 .iter()
1425 .map(|arr| {
1426 let nulls = arr.logical_nulls();
1427 has_nulls |= nulls.is_some();
1428 (nulls, arr.len())
1429 })
1430 .collect::<Vec<_>>();
1431 if !has_nulls {
1432 return Nullability::None;
1433 }
1434 let mut builder = BooleanBufferBuilder::new(num_values as usize);
1435 let mut num_nulls = 0;
1436 for (null, len) in nulls_and_lens {
1437 if let Some(null) = null {
1438 num_nulls += null.null_count();
1439 builder.append_buffer(&null.into_inner());
1440 } else {
1441 builder.append_n(len, true);
1442 }
1443 }
1444 if num_nulls == num_values as usize {
1445 Nullability::All
1446 } else {
1447 Nullability::Some(NullBuffer::new(builder.finish()))
1448 }
1449}
1450
1451impl DataBlock {
1452 pub fn from_arrays(arrays: &[ArrayRef], num_values: u64) -> Self {
1453 if arrays.is_empty() || num_values == 0 {
1454 return Self::AllNull(AllNullDataBlock { num_values: 0 });
1455 }
1456
1457 let data_type = arrays[0].data_type();
1458 let nulls = extract_nulls(arrays, num_values);
1459
1460 if let Nullability::All = nulls {
1461 return Self::AllNull(AllNullDataBlock { num_values });
1462 }
1463
1464 let mut encoded = match data_type {
1465 DataType::Binary | DataType::Utf8 => arrow_binary_to_data_block(arrays, num_values, 32),
1466 DataType::BinaryView | DataType::Utf8View => {
1467 todo!()
1468 }
1469 DataType::LargeBinary | DataType::LargeUtf8 => {
1470 arrow_binary_to_data_block(arrays, num_values, 64)
1471 }
1472 DataType::Boolean => {
1473 let data = encode_bitmap_data(arrays, num_values);
1474 Self::FixedWidth(FixedWidthDataBlock {
1475 data,
1476 bits_per_value: 1,
1477 num_values,
1478 block_info: BlockInfo::new(),
1479 })
1480 }
1481 DataType::Date32
1482 | DataType::Date64
1483 | DataType::Decimal32(_, _)
1484 | DataType::Decimal64(_, _)
1485 | DataType::Decimal128(_, _)
1486 | DataType::Decimal256(_, _)
1487 | DataType::Duration(_)
1488 | DataType::FixedSizeBinary(_)
1489 | DataType::Float16
1490 | DataType::Float32
1491 | DataType::Float64
1492 | DataType::Int16
1493 | DataType::Int32
1494 | DataType::Int64
1495 | DataType::Int8
1496 | DataType::Interval(_)
1497 | DataType::Time32(_)
1498 | DataType::Time64(_)
1499 | DataType::Timestamp(_, _)
1500 | DataType::UInt16
1501 | DataType::UInt32
1502 | DataType::UInt64
1503 | DataType::UInt8 => {
1504 let data = encode_flat_data(arrays, num_values);
1505 Self::FixedWidth(FixedWidthDataBlock {
1506 data,
1507 bits_per_value: data_type.byte_width() as u64 * 8,
1508 num_values,
1509 block_info: BlockInfo::new(),
1510 })
1511 }
1512 DataType::Null => Self::AllNull(AllNullDataBlock { num_values }),
1513 DataType::Dictionary(_, _) => arrow_dictionary_to_data_block(arrays, nulls.to_option()),
1514 DataType::Struct(fields) => {
1515 let structs = arrays.iter().map(|arr| arr.as_struct()).collect::<Vec<_>>();
1516 let mut children = Vec::with_capacity(fields.len());
1517 for child_idx in 0..fields.len() {
1518 let child_vec = structs
1519 .iter()
1520 .map(|s| s.column(child_idx).clone())
1521 .collect::<Vec<_>>();
1522 children.push(Self::from_arrays(&child_vec, num_values));
1523 }
1524
1525 let validity = match &nulls {
1527 Nullability::None => None,
1528 Nullability::Some(null_buffer) => Some(null_buffer.clone()),
1529 Nullability::All => unreachable!("Should have returned AllNull earlier"),
1530 };
1531
1532 Self::Struct(StructDataBlock {
1533 children,
1534 block_info: BlockInfo::default(),
1535 validity,
1536 })
1537 }
1538 DataType::FixedSizeList(_, dim) => {
1539 let children = arrays
1540 .iter()
1541 .map(|arr| arr.as_fixed_size_list().values().clone())
1542 .collect::<Vec<_>>();
1543 let child_block = Self::from_arrays(&children, num_values * *dim as u64);
1544 Self::FixedSizeList(FixedSizeListBlock {
1545 child: Box::new(child_block),
1546 dimension: *dim as u64,
1547 })
1548 }
1549 DataType::LargeList(_)
1550 | DataType::List(_)
1551 | DataType::ListView(_)
1552 | DataType::LargeListView(_)
1553 | DataType::Map(_, _)
1554 | DataType::RunEndEncoded(_, _)
1555 | DataType::Union(_, _) => {
1556 panic!(
1557 "Field with data type {} cannot be converted to data block",
1558 data_type
1559 )
1560 }
1561 };
1562
1563 encoded.compute_stat();
1565
1566 if !matches!(data_type, DataType::Dictionary(_, _)) {
1567 match nulls {
1568 Nullability::None => encoded,
1569 Nullability::Some(nulls) => Self::Nullable(NullableDataBlock {
1570 data: Box::new(encoded),
1571 nulls: LanceBuffer::from(nulls.into_inner().into_inner()),
1572 block_info: BlockInfo::new(),
1573 }),
1574 _ => unreachable!(),
1575 }
1576 } else {
1577 encoded
1579 }
1580 }
1581
1582 pub fn from_array<T: Array + 'static>(array: T) -> Self {
1583 let num_values = array.len();
1584 Self::from_arrays(&[Arc::new(array)], num_values as u64)
1585 }
1586}
1587
1588impl From<ArrayRef> for DataBlock {
1589 fn from(array: ArrayRef) -> Self {
1590 let num_values = array.len() as u64;
1591 Self::from_arrays(&[array], num_values)
1592 }
1593}
1594
/// Type-specific implementation backing a [`DataBlockBuilder`]
pub trait DataBlockBuilderImpl: std::fmt::Debug {
    /// Appends the `selection` range of `data_block` to the builder
    ///
    /// NOTE(review): the selection is presumably a range of values (rows)
    /// within `data_block` - confirm against the implementations
    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>);
    /// Consumes the builder and returns the accumulated data block
    fn finish(self: Box<Self>) -> DataBlock;
}
1599
/// Builds a data block out of selections taken from other data blocks
///
/// The concrete builder is not known up front; it is created from the first
/// block that is appended.
#[derive(Debug)]
pub struct DataBlockBuilder {
    // Size estimate handed to `make_builder` when the inner builder is created
    estimated_size_bytes: u64,
    // The inner builder, `None` until the first block has been seen
    builder: Option<Box<dyn DataBlockBuilderImpl>>,
}
1605
1606impl DataBlockBuilder {
1607 pub fn with_capacity_estimate(estimated_size_bytes: u64) -> Self {
1608 Self {
1609 estimated_size_bytes,
1610 builder: None,
1611 }
1612 }
1613
1614 fn get_builder(&mut self, block: &DataBlock) -> &mut dyn DataBlockBuilderImpl {
1615 if self.builder.is_none() {
1616 self.builder = Some(block.make_builder(self.estimated_size_bytes));
1617 }
1618 self.builder.as_mut().unwrap().as_mut()
1619 }
1620
1621 pub fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
1622 self.get_builder(data_block).append(data_block, selection);
1623 }
1624
1625 pub fn finish(self) -> DataBlock {
1626 let builder = self.builder.expect("DataBlockBuilder didn't see any data");
1627 builder.finish()
1628 }
1629}
1630
// Unit tests for converting arrow arrays into data blocks
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::{
        make_array, new_null_array,
        types::{Int32Type, Int8Type},
        ArrayRef, DictionaryArray, Int8Array, LargeBinaryArray, StringArray, UInt16Array,
        UInt8Array,
    };
    use arrow_buffer::{BooleanBuffer, NullBuffer};

    use arrow_schema::{DataType, Field, Fields};
    use lance_datagen::{array, ArrayGeneratorExt, RowCount, DEFAULT_SEED};
    use rand::SeedableRng;

    use crate::buffer::LanceBuffer;

    use super::{AllNullDataBlock, DataBlock};

    use arrow_array::Array;

    // Sliced arrays should only contribute the values (and validity) inside the slice
    #[test]
    fn test_sliced_to_data_block() {
        let ints = UInt16Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);
        let ints = ints.slice(2, 4);
        let data = DataBlock::from_array(ints);

        let fixed_data = data.as_fixed_width().unwrap();
        assert_eq!(fixed_data.num_values, 4);
        // 4 values of 2 bytes each
        assert_eq!(fixed_data.data.len(), 8);

        let nullable_ints =
            UInt16Array::from(vec![Some(0), None, Some(2), None, Some(4), None, Some(6)]);
        let nullable_ints = nullable_ints.slice(1, 3);
        let data = DataBlock::from_array(nullable_ints);

        let nullable = data.as_nullable().unwrap();
        // The slice is [None, Some(2), None] so only the second bit is set
        assert_eq!(nullable.nulls, LanceBuffer::from(vec![0b00000010]));
    }

    #[test]
    fn test_string_to_data_block() {
        // String arrays that contain nulls
        let strings1 = StringArray::from(vec![Some("hello"), None, Some("world")]);
        let strings2 = StringArray::from(vec![Some("a"), Some("b")]);
        let strings3 = StringArray::from(vec![Option::<&'static str>::None, None]);

        let arrays = &[strings1, strings2, strings3]
            .iter()
            .map(|arr| Arc::new(arr.clone()) as ArrayRef)
            .collect::<Vec<_>>();

        let block = DataBlock::from_arrays(arrays, 7);

        assert_eq!(block.num_values(), 7);
        let block = block.as_nullable().unwrap();

        // Validity of all three arrays, combined into one bitmap
        assert_eq!(block.nulls, LanceBuffer::from(vec![0b00011101]));

        let data = block.data.as_variable_width().unwrap();
        // Offsets are stitched together, null values have zero length
        assert_eq!(
            data.offsets,
            LanceBuffer::reinterpret_vec(vec![0, 5, 5, 10, 11, 12, 12, 12])
        );

        assert_eq!(data.data, LanceBuffer::copy_slice(b"helloworldab"));

        // String arrays that do not contain nulls
        let strings1 = StringArray::from(vec![Some("a"), Some("bc")]);
        let strings2 = StringArray::from(vec![Some("def")]);

        let arrays = &[strings1, strings2]
            .iter()
            .map(|arr| Arc::new(arr.clone()) as ArrayRef)
            .collect::<Vec<_>>();

        let block = DataBlock::from_arrays(arrays, 3);

        assert_eq!(block.num_values(), 3);
        // Without nulls there is no nullable wrapper around the variable-width block
        let data = block.as_variable_width().unwrap();
        assert_eq!(data.offsets, LanceBuffer::reinterpret_vec(vec![0, 1, 3, 6]));
        assert_eq!(data.data, LanceBuffer::copy_slice(b"abcdef"));
    }

    // Sliced string arrays should yield zero-based offsets and only the referenced bytes
    #[test]
    fn test_string_sliced() {
        let check = |arr: Vec<StringArray>, expected_off: Vec<i32>, expected_data: &[u8]| {
            let arrs = arr
                .into_iter()
                .map(|a| Arc::new(a) as ArrayRef)
                .collect::<Vec<_>>();
            let num_rows = arrs.iter().map(|a| a.len()).sum::<usize>() as u64;
            let data = DataBlock::from_arrays(&arrs, num_rows);

            assert_eq!(data.num_values(), num_rows);

            let data = data.as_variable_width().unwrap();
            assert_eq!(data.offsets, LanceBuffer::reinterpret_vec(expected_off));
            assert_eq!(data.data, LanceBuffer::copy_slice(expected_data));
        };

        let string = StringArray::from(vec![Some("hello"), Some("world")]);
        check(vec![string.slice(1, 1)], vec![0, 5], b"world");
        check(vec![string.slice(0, 1)], vec![0, 5], b"hello");
        check(
            vec![string.slice(0, 1), string.slice(1, 1)],
            vec![0, 5, 10],
            b"helloworld",
        );

        // Slices of two different arrays
        let string2 = StringArray::from(vec![Some("foo"), Some("bar")]);
        check(
            vec![string.slice(0, 1), string2.slice(0, 1)],
            vec![0, 5, 8],
            b"hellofoo",
        );
    }

    // Large binary arrays should keep 64-bit offsets
    #[test]
    fn test_large() {
        let arr = LargeBinaryArray::from_vec(vec![b"hello", b"world"]);
        let data = DataBlock::from_array(arr);

        assert_eq!(data.num_values(), 2);
        let data = data.as_variable_width().unwrap();
        assert_eq!(data.bits_per_offset, 64);
        assert_eq!(data.num_values, 2);
        assert_eq!(data.data, LanceBuffer::copy_slice(b"helloworld"));
        assert_eq!(
            data.offsets,
            LanceBuffer::reinterpret_vec(vec![0_u64, 5, 10])
        );
    }

    #[test]
    fn test_dictionary_indices_normalized() {
        let arr1 = DictionaryArray::<Int8Type>::from_iter([Some("a"), Some("a"), Some("b")]);
        let arr2 = DictionaryArray::<Int8Type>::from_iter([Some("b"), Some("c")]);

        let data = DataBlock::from_arrays(&[Arc::new(arr1), Arc::new(arr2)], 5);

        assert_eq!(data.num_values(), 5);
        let data = data.as_dictionary().unwrap();
        let indices = data.indices;
        assert_eq!(indices.bits_per_value, 8);
        assert_eq!(indices.num_values, 5);
        assert_eq!(
            indices.data,
            // The expected values show that the two dictionaries are appended rather than
            // merged ("b" shows up twice in the items below), so the indices of the second
            // array are shifted instead of being 1, 2.
            LanceBuffer::reinterpret_vec::<i8>(vec![0, 0, 1, 2, 3])
        );

        let items = data.dictionary.as_variable_width().unwrap();
        assert_eq!(items.bits_per_offset, 32);
        assert_eq!(items.num_values, 4);
        assert_eq!(items.data, LanceBuffer::copy_slice(b"abbc"));
        assert_eq!(
            items.offsets,
            LanceBuffer::reinterpret_vec(vec![0, 1, 2, 3, 4],)
        );
    }

    // Tests two ways of representing nulls in a dictionary array
    #[test]
    fn test_dictionary_nulls() {
        // Here the nulls are part of the input items given to `from_iter`
        let arr1 = DictionaryArray::<Int8Type>::from_iter([None, Some("a"), Some("b")]);
        let arr2 = DictionaryArray::<Int8Type>::from_iter([Some("c"), None]);

        let data = DataBlock::from_arrays(&[Arc::new(arr1), Arc::new(arr2)], 5);

        let check_common = |data: DataBlock| {
            assert_eq!(data.num_values(), 5);
            let dict = data.as_dictionary().unwrap();

            // The dictionary items are a, b, c followed by one null item
            let nullable_items = dict.dictionary.as_nullable().unwrap();
            assert_eq!(nullable_items.nulls, LanceBuffer::from(vec![0b00000111]));
            assert_eq!(nullable_items.data.num_values(), 4);

            let items = nullable_items.data.as_variable_width().unwrap();
            assert_eq!(items.bits_per_offset, 32);
            assert_eq!(items.num_values, 4);
            assert_eq!(items.data, LanceBuffer::copy_slice(b"abc"));
            assert_eq!(
                items.offsets,
                LanceBuffer::reinterpret_vec(vec![0, 1, 2, 3, 3],)
            );

            // Null positions point at the null item (index 3)
            let indices = dict.indices;
            assert_eq!(indices.bits_per_value, 8);
            assert_eq!(indices.num_values, 5);
            assert_eq!(
                indices.data,
                LanceBuffer::reinterpret_vec::<i8>(vec![3, 0, 1, 2, 3])
            );
        };
        check_common(data);

        // Here the null lives in the dictionary items and all the indices are valid
        let items = StringArray::from(vec![Some("a"), Some("b"), Some("c"), None]);
        let indices = Int8Array::from(vec![Some(3), Some(0), Some(1), Some(2), Some(3)]);
        let dict = DictionaryArray::new(indices, Arc::new(items));

        let data = DataBlock::from_array(dict);

        check_common(data);
    }

    // A null item cannot be added to a full u8 dictionary without widening the indices
    #[test]
    fn test_dictionary_cannot_add_null() {
        // 256 unique items (strings of length 0 to 255)
        let items = StringArray::from(
            (0..256)
                .map(|i| Some(String::from_utf8(vec![0; i]).unwrap()))
                .collect::<Vec<_>>(),
        );
        // 257 indices, the last one is null
        let indices = UInt8Array::from(
            (0..=256)
                .map(|i| if i == 256 { None } else { Some(i as u8) })
                .collect::<Vec<_>>(),
        );
        let dict = DictionaryArray::new(indices, Arc::new(items));
        let data = DataBlock::from_array(dict);

        assert_eq!(data.num_values(), 257);

        let dict = data.as_dictionary().unwrap();

        // The indices are expected to be widened to 32 bits
        assert_eq!(dict.indices.bits_per_value, 32);
        assert_eq!(
            dict.indices.data,
            LanceBuffer::reinterpret_vec((0_u32..257).collect::<Vec<_>>())
        );

        // The null item is expected at the end of the dictionary items
        let nullable_items = dict.dictionary.as_nullable().unwrap();
        let null_buffer = NullBuffer::new(BooleanBuffer::new(
            nullable_items.nulls.into_buffer(),
            0,
            257,
        ));
        for i in 0..256 {
            assert!(!null_buffer.is_null(i));
        }
        assert!(null_buffer.is_null(256));

        // 0 + 1 + ... + 255 bytes of item data
        assert_eq!(
            nullable_items.data.as_variable_width().unwrap().data.len(),
            32640
        );
    }

    // An all-null block should convert into a null array of any data type
    #[test]
    fn test_all_null() {
        for data_type in [
            DataType::UInt32,
            DataType::FixedSizeBinary(2),
            DataType::List(Arc::new(Field::new("item", DataType::UInt32, true))),
            DataType::Struct(Fields::from(vec![Field::new("a", DataType::UInt32, true)])),
        ] {
            let block = DataBlock::AllNull(AllNullDataBlock { num_values: 10 });
            let arr = block.into_arrow(data_type.clone(), true).unwrap();
            let arr = make_array(arr);
            let expected = new_null_array(&data_type, 10);
            assert_eq!(&arr, &expected);
        }
    }

    // Two full u8 dictionaries cannot be concatenated without widening the indices
    #[test]
    fn test_dictionary_cannot_concatenate() {
        // 256 unique items (strings of length 0 to 255)
        let items = StringArray::from(
            (0..256)
                .map(|i| Some(String::from_utf8(vec![0; i]).unwrap()))
                .collect::<Vec<_>>(),
        );
        // 256 more items (strings of length 1 to 256) made of a different byte
        let other_items = StringArray::from(
            (0..256)
                .map(|i| Some(String::from_utf8(vec![1; i + 1]).unwrap()))
                .collect::<Vec<_>>(),
        );
        let indices = UInt8Array::from_iter_values(0..=255);
        let dict1 = DictionaryArray::new(indices.clone(), Arc::new(items));
        let dict2 = DictionaryArray::new(indices, Arc::new(other_items));
        let data = DataBlock::from_arrays(&[Arc::new(dict1), Arc::new(dict2)], 512);
        assert_eq!(data.num_values(), 512);

        let dict = data.as_dictionary().unwrap();

        // The indices are expected to be widened to 32 bits
        assert_eq!(dict.indices.bits_per_value, 32);
        assert_eq!(
            dict.indices.data,
            LanceBuffer::reinterpret_vec::<u32>((0..512).collect::<Vec<_>>())
        );
        // (0 + 1 + ... + 255) + (1 + 2 + ... + 256) bytes of item data
        assert_eq!(
            dict.dictionary.as_variable_width().unwrap().data.len(),
            65536
        );
    }

    // Compares `data_size` against sizes computed from the source arrays
    #[test]
    fn test_data_size() {
        let mut rng = rand_xoshiro::Xoshiro256PlusPlus::seed_from_u64(DEFAULT_SEED.0);
        // Null pattern without any `true` entry; the size is compared against the array's
        // buffer memory size
        let mut genn = array::rand::<Int32Type>().with_nulls(&[false, false, false]);

        let arr = genn.generate(RowCount::from(3), &mut rng).unwrap();
        let block = DataBlock::from_array(arr.clone());
        assert!(block.data_size() == arr.get_buffer_memory_size() as u64);

        let arr = genn.generate(RowCount::from(400), &mut rng).unwrap();
        let block = DataBlock::from_array(arr.clone());
        assert!(block.data_size() == arr.get_buffer_memory_size() as u64);

        // Null pattern with one `true` entry; the expected size is the array's buffers plus
        // the validity bitmap
        let mut genn = array::rand::<Int32Type>().with_nulls(&[false, true, false]);
        let arr = genn.generate(RowCount::from(3), &mut rng).unwrap();
        let block = DataBlock::from_array(arr.clone());

        let array_data = arr.to_data();
        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
        // The length of the null buffer is a number of bits, round up to whole bytes
        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);

        let arr = genn.generate(RowCount::from(400), &mut rng).unwrap();
        let block = DataBlock::from_array(arr.clone());

        let array_data = arr.to_data();
        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);

        // Null pattern with two `true` entries
        let mut genn = array::rand::<Int32Type>().with_nulls(&[true, true, false]);
        let arr = genn.generate(RowCount::from(3), &mut rng).unwrap();
        let block = DataBlock::from_array(arr.clone());

        let array_data = arr.to_data();
        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);

        let arr = genn.generate(RowCount::from(400), &mut rng).unwrap();
        let block = DataBlock::from_array(arr.clone());

        let array_data = arr.to_data();
        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);

        // Several arrays at once; the expected size is computed from their concatenation
        let mut genn = array::rand::<Int32Type>().with_nulls(&[false, true, false]);
        let arr1 = genn.generate(RowCount::from(3), &mut rng).unwrap();
        let arr2 = genn.generate(RowCount::from(3), &mut rng).unwrap();
        let arr3 = genn.generate(RowCount::from(3), &mut rng).unwrap();
        let block = DataBlock::from_arrays(&[arr1.clone(), arr2.clone(), arr3.clone()], 9);

        let concatenated_array = arrow_select::concat::concat(&[
            &*Arc::new(arr1.clone()) as &dyn Array,
            &*Arc::new(arr2.clone()) as &dyn Array,
            &*Arc::new(arr3.clone()) as &dyn Array,
        ])
        .unwrap();
        let total_buffer_size: usize = concatenated_array
            .to_data()
            .buffers()
            .iter()
            .map(|buffer| buffer.len())
            .sum();

        let total_nulls_size_in_bytes = concatenated_array.nulls().unwrap().len().div_ceil(8);
        assert!(block.data_size() == (total_buffer_size + total_nulls_size_in_bytes) as u64);
    }
}