1use std::{
18 ops::Range,
19 sync::{Arc, RwLock},
20};
21
22use arrow_array::{
23 Array, ArrayRef, OffsetSizeTrait, UInt64Array,
24 cast::AsArray,
25 new_empty_array, new_null_array,
26 types::{ArrowDictionaryKeyType, UInt8Type, UInt16Type, UInt32Type, UInt64Type},
27};
28use arrow_buffer::{ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer};
29use arrow_data::{ArrayData, ArrayDataBuilder};
30use arrow_schema::DataType;
31use lance_arrow::DataTypeExt;
32
33use lance_core::{Error, Result};
34
35use crate::{
36 buffer::LanceBuffer,
37 statistics::{ComputeStat, Stat},
38};
39
40#[derive(Debug, Clone)]
46pub struct AllNullDataBlock {
47 pub num_values: u64,
49}
50
51impl AllNullDataBlock {
52 fn into_arrow(self, data_type: DataType, _validate: bool) -> Result<ArrayData> {
53 Ok(ArrayData::new_null(&data_type, self.num_values as usize))
54 }
55
56 fn into_buffers(self) -> Vec<LanceBuffer> {
57 vec![]
58 }
59}
60
61use std::collections::HashMap;
62
63#[derive(Debug, Clone)]
66pub struct BlockInfo(pub Arc<RwLock<HashMap<Stat, Arc<dyn Array>>>>);
67
68impl Default for BlockInfo {
69 fn default() -> Self {
70 Self::new()
71 }
72}
73
74impl BlockInfo {
75 pub fn new() -> Self {
76 Self(Arc::new(RwLock::new(HashMap::new())))
77 }
78}
79
80impl PartialEq for BlockInfo {
81 fn eq(&self, other: &Self) -> bool {
82 let self_info = self.0.read().unwrap();
83 let other_info = other.0.read().unwrap();
84 *self_info == *other_info
85 }
86}
87
88#[derive(Debug, Clone)]
94pub struct NullableDataBlock {
95 pub data: Box<DataBlock>,
97 pub nulls: LanceBuffer,
99
100 pub block_info: BlockInfo,
101}
102
103impl NullableDataBlock {
104 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
105 let nulls = self.nulls.into_buffer();
106 let data = self.data.into_arrow(data_type, validate)?.into_builder();
107 let data = data.null_bit_buffer(Some(nulls));
108 if validate {
109 Ok(data.build()?)
110 } else {
111 Ok(unsafe { data.build_unchecked() })
112 }
113 }
114
115 fn into_buffers(self) -> Vec<LanceBuffer> {
116 let mut buffers = vec![self.nulls];
117 buffers.extend(self.data.into_buffers());
118 buffers
119 }
120
121 pub fn data_size(&self) -> u64 {
122 self.data.data_size() + self.nulls.len() as u64
123 }
124}
125
126#[derive(Debug, PartialEq, Clone)]
128pub struct ConstantDataBlock {
129 pub data: LanceBuffer,
131 pub num_values: u64,
133}
134
135impl ConstantDataBlock {
136 fn into_buffers(self) -> Vec<LanceBuffer> {
137 vec![self.data]
138 }
139
140 fn into_arrow(self, _data_type: DataType, _validate: bool) -> Result<ArrayData> {
141 todo!()
144 }
145
146 pub fn try_clone(&self) -> Result<Self> {
147 Ok(Self {
148 data: self.data.clone(),
149 num_values: self.num_values,
150 })
151 }
152
153 pub fn data_size(&self) -> u64 {
154 self.data.len() as u64
155 }
156}
157
158#[derive(Debug, PartialEq, Clone)]
160pub struct FixedWidthDataBlock {
161 pub data: LanceBuffer,
163 pub bits_per_value: u64,
165 pub num_values: u64,
167
168 pub block_info: BlockInfo,
169}
170
171impl FixedWidthDataBlock {
172 fn do_into_arrow(
173 self,
174 data_type: DataType,
175 num_values: u64,
176 validate: bool,
177 ) -> Result<ArrayData> {
178 let data_buffer = if matches!(data_type, DataType::Boolean) && self.bits_per_value == 8 {
181 let mut builder = BooleanBufferBuilder::new(num_values as usize);
182 for &byte in self.data.as_ref().iter().take(num_values as usize) {
183 builder.append(byte != 0);
184 }
185 builder.finish().into_inner()
186 } else {
187 self.data.into_buffer()
188 };
189 let builder = ArrayDataBuilder::new(data_type)
190 .add_buffer(data_buffer)
191 .len(num_values as usize)
192 .null_count(0);
193 if validate {
194 Ok(builder.build()?)
195 } else {
196 Ok(unsafe { builder.build_unchecked() })
197 }
198 }
199
200 pub fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
201 let root_num_values = self.num_values;
202 self.do_into_arrow(data_type, root_num_values, validate)
203 }
204
205 pub fn into_buffers(self) -> Vec<LanceBuffer> {
206 vec![self.data]
207 }
208
209 pub fn try_clone(&self) -> Result<Self> {
210 Ok(Self {
211 data: self.data.clone(),
212 bits_per_value: self.bits_per_value,
213 num_values: self.num_values,
214 block_info: self.block_info.clone(),
215 })
216 }
217
218 pub fn data_size(&self) -> u64 {
219 self.data.len() as u64
220 }
221}
222
223#[derive(Debug)]
224pub struct VariableWidthDataBlockBuilder<T: OffsetSizeTrait> {
225 offsets: Vec<T>,
226 bytes: Vec<u8>,
227}
228
229impl<T: OffsetSizeTrait> VariableWidthDataBlockBuilder<T> {
230 fn new(estimated_size_bytes: u64) -> Self {
231 Self {
232 offsets: vec![T::from_usize(0).unwrap()],
233 bytes: Vec::with_capacity(estimated_size_bytes as usize),
234 }
235 }
236}
237
238impl<T: OffsetSizeTrait + bytemuck::Pod> DataBlockBuilderImpl for VariableWidthDataBlockBuilder<T> {
239 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
240 let block = data_block.as_variable_width_ref().unwrap();
241 assert!(block.bits_per_offset == T::get_byte_width() as u8 * 8);
242 let offsets = block.offsets.borrow_to_typed_view::<T>();
243
244 let start_offset = offsets[selection.start as usize];
245 let end_offset = offsets[selection.end as usize];
246 let mut previous_len = self.bytes.len();
247
248 self.bytes
249 .extend_from_slice(&block.data[start_offset.as_usize()..end_offset.as_usize()]);
250
251 self.offsets.extend(
252 offsets[selection.start as usize..selection.end as usize]
253 .iter()
254 .zip(&offsets[selection.start as usize + 1..=selection.end as usize])
255 .map(|(¤t, &next)| {
256 let this_value_len = next - current;
257 previous_len += this_value_len.as_usize();
258 T::from_usize(previous_len).unwrap()
259 }),
260 );
261 }
262
263 fn finish(self: Box<Self>) -> DataBlock {
264 let num_values = (self.offsets.len() - 1) as u64;
265 DataBlock::VariableWidth(VariableWidthBlock {
266 data: LanceBuffer::from(self.bytes),
267 offsets: LanceBuffer::reinterpret_vec(self.offsets),
268 bits_per_offset: T::get_byte_width() as u8 * 8,
269 num_values,
270 block_info: BlockInfo::new(),
271 })
272 }
273}
274
275#[derive(Debug)]
276struct BitmapDataBlockBuilder {
277 values: BooleanBufferBuilder,
278}
279
280impl BitmapDataBlockBuilder {
281 fn new(estimated_size_bytes: u64) -> Self {
282 Self {
283 values: BooleanBufferBuilder::new(estimated_size_bytes as usize * 8),
284 }
285 }
286}
287
288impl DataBlockBuilderImpl for BitmapDataBlockBuilder {
289 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
290 let bitmap_blk = data_block.as_fixed_width_ref().unwrap();
291 self.values.append_packed_range(
292 selection.start as usize..selection.end as usize,
293 &bitmap_blk.data,
294 );
295 }
296
297 fn finish(mut self: Box<Self>) -> DataBlock {
298 let bool_buf = self.values.finish();
299 let num_values = bool_buf.len() as u64;
300 let bits_buf = bool_buf.into_inner();
301 DataBlock::FixedWidth(FixedWidthDataBlock {
302 data: LanceBuffer::from(bits_buf),
303 bits_per_value: 1,
304 num_values,
305 block_info: BlockInfo::new(),
306 })
307 }
308}
309
/// Accumulates fixed-width values whose width is a whole number of bytes.
#[derive(Debug)]
struct FixedWidthDataBlockBuilder {
    /// Width of each value, in bits.
    bits_per_value: u64,
    /// Width of each value, in bytes (`bits_per_value / 8`).
    bytes_per_value: u64,
    /// The value bytes appended so far.
    values: Vec<u8>,
}

impl FixedWidthDataBlockBuilder {
    /// Creates an empty builder, reserving `estimated_size_bytes` bytes.
    ///
    /// # Panics
    ///
    /// Panics if `bits_per_value` is not a multiple of 8.
    fn new(bits_per_value: u64, estimated_size_bytes: u64) -> Self {
        assert!(bits_per_value.is_multiple_of(8));
        let bytes_per_value = bits_per_value / 8;
        let values = Vec::with_capacity(estimated_size_bytes as usize);
        Self {
            bits_per_value,
            bytes_per_value,
            values,
        }
    }
}
327
328impl DataBlockBuilderImpl for FixedWidthDataBlockBuilder {
329 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
330 let block = data_block.as_fixed_width_ref().unwrap();
331 assert_eq!(self.bits_per_value, block.bits_per_value);
332 let start = selection.start as usize * self.bytes_per_value as usize;
333 let end = selection.end as usize * self.bytes_per_value as usize;
334 self.values.extend_from_slice(&block.data[start..end]);
335 }
336
337 fn finish(self: Box<Self>) -> DataBlock {
338 let num_values = (self.values.len() / self.bytes_per_value as usize) as u64;
339 DataBlock::FixedWidth(FixedWidthDataBlock {
340 data: LanceBuffer::from(self.values),
341 bits_per_value: self.bits_per_value,
342 num_values,
343 block_info: BlockInfo::new(),
344 })
345 }
346}
347
348#[derive(Debug)]
349struct StructDataBlockBuilder {
350 children: Vec<Box<dyn DataBlockBuilderImpl>>,
351}
352
353impl StructDataBlockBuilder {
354 fn new(children: Vec<Box<dyn DataBlockBuilderImpl>>) -> Self {
355 Self { children }
356 }
357}
358
359impl DataBlockBuilderImpl for StructDataBlockBuilder {
360 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
361 let data_block = data_block.as_struct_ref().unwrap();
362 for i in 0..self.children.len() {
363 self.children[i].append(&data_block.children[i], selection.clone());
364 }
365 }
366
367 fn finish(self: Box<Self>) -> DataBlock {
368 let mut children_data_block = Vec::new();
369 for child in self.children {
370 let child_data_block = child.finish();
371 children_data_block.push(child_data_block);
372 }
373 DataBlock::Struct(StructDataBlock {
374 children: children_data_block,
375 block_info: BlockInfo::new(),
376 validity: None,
377 })
378 }
379}
380
381#[derive(Debug, Default)]
382struct AllNullDataBlockBuilder {
383 num_values: u64,
384}
385
386impl DataBlockBuilderImpl for AllNullDataBlockBuilder {
387 fn append(&mut self, _data_block: &DataBlock, selection: Range<u64>) {
388 self.num_values += selection.end - selection.start;
389 }
390
391 fn finish(self: Box<Self>) -> DataBlock {
392 DataBlock::AllNull(AllNullDataBlock {
393 num_values: self.num_values,
394 })
395 }
396}
397
398#[derive(Debug, Clone)]
400pub struct FixedSizeListBlock {
401 pub child: Box<DataBlock>,
403 pub dimension: u64,
405}
406
407impl FixedSizeListBlock {
408 pub fn num_values(&self) -> u64 {
409 self.child.num_values() / self.dimension
410 }
411
412 pub fn try_into_flat(self) -> Option<FixedWidthDataBlock> {
416 match *self.child {
417 DataBlock::Nullable(_) => None,
419 DataBlock::FixedSizeList(inner) => {
420 let mut flat = inner.try_into_flat()?;
421 flat.bits_per_value *= self.dimension;
422 flat.num_values /= self.dimension;
423 Some(flat)
424 }
425 DataBlock::FixedWidth(mut inner) => {
426 inner.bits_per_value *= self.dimension;
427 inner.num_values /= self.dimension;
428 Some(inner)
429 }
430 _ => panic!(
431 "Expected FixedSizeList or FixedWidth data block but found {:?}",
432 self
433 ),
434 }
435 }
436
437 pub fn flatten_as_fixed(&mut self) -> FixedWidthDataBlock {
438 match self.child.as_mut() {
439 DataBlock::FixedSizeList(fsl) => fsl.flatten_as_fixed(),
440 DataBlock::FixedWidth(fw) => fw.clone(),
441 _ => panic!("Expected FixedSizeList or FixedWidth data block"),
442 }
443 }
444
445 pub fn from_flat(data: FixedWidthDataBlock, data_type: &DataType) -> DataBlock {
447 match data_type {
448 DataType::FixedSizeList(child_field, dimension) => {
449 let mut data = data;
450 data.bits_per_value /= *dimension as u64;
451 data.num_values *= *dimension as u64;
452 let child_data = Self::from_flat(data, child_field.data_type());
453 DataBlock::FixedSizeList(Self {
454 child: Box::new(child_data),
455 dimension: *dimension as u64,
456 })
457 }
458 _ => DataBlock::FixedWidth(data),
460 }
461 }
462
463 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
464 let num_values = self.num_values();
465 let builder = match &data_type {
466 DataType::FixedSizeList(child_field, _) => {
467 let child_data = self
468 .child
469 .into_arrow(child_field.data_type().clone(), validate)?;
470 ArrayDataBuilder::new(data_type)
471 .add_child_data(child_data)
472 .len(num_values as usize)
473 .null_count(0)
474 }
475 _ => panic!("Expected FixedSizeList data type and got {:?}", data_type),
476 };
477 if validate {
478 Ok(builder.build()?)
479 } else {
480 Ok(unsafe { builder.build_unchecked() })
481 }
482 }
483
484 fn into_buffers(self) -> Vec<LanceBuffer> {
485 self.child.into_buffers()
486 }
487
488 fn data_size(&self) -> u64 {
489 self.child.data_size()
490 }
491}
492
493#[derive(Debug)]
494struct FixedSizeListBlockBuilder {
495 inner: Box<dyn DataBlockBuilderImpl>,
496 dimension: u64,
497}
498
499impl FixedSizeListBlockBuilder {
500 fn new(inner: Box<dyn DataBlockBuilderImpl>, dimension: u64) -> Self {
501 Self { inner, dimension }
502 }
503}
504
505impl DataBlockBuilderImpl for FixedSizeListBlockBuilder {
506 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
507 let selection = selection.start * self.dimension..selection.end * self.dimension;
508 let fsl = data_block.as_fixed_size_list_ref().unwrap();
509 self.inner.append(fsl.child.as_ref(), selection);
510 }
511
512 fn finish(self: Box<Self>) -> DataBlock {
513 let inner_block = self.inner.finish();
514 DataBlock::FixedSizeList(FixedSizeListBlock {
515 child: Box::new(inner_block),
516 dimension: self.dimension,
517 })
518 }
519}
520
521#[derive(Debug)]
522struct NullableDataBlockBuilder {
523 inner: Box<dyn DataBlockBuilderImpl>,
524 validity: BooleanBufferBuilder,
525}
526
527impl NullableDataBlockBuilder {
528 fn new(inner: Box<dyn DataBlockBuilderImpl>, estimated_size_bytes: usize) -> Self {
529 Self {
530 inner,
531 validity: BooleanBufferBuilder::new(estimated_size_bytes * 8),
532 }
533 }
534}
535
536impl DataBlockBuilderImpl for NullableDataBlockBuilder {
537 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
538 let nullable = data_block.as_nullable_ref().unwrap();
539 let bool_buf = BooleanBuffer::new(
540 nullable.nulls.clone().into_buffer(),
541 selection.start as usize,
542 (selection.end - selection.start) as usize,
543 );
544 self.validity.append_buffer(&bool_buf);
545 self.inner.append(nullable.data.as_ref(), selection);
546 }
547
548 fn finish(mut self: Box<Self>) -> DataBlock {
549 let inner_block = self.inner.finish();
550 DataBlock::Nullable(NullableDataBlock {
551 data: Box::new(inner_block),
552 nulls: LanceBuffer::from(self.validity.finish().into_inner()),
553 block_info: BlockInfo::new(),
554 })
555 }
556}
557
558#[derive(Debug, Clone)]
562pub struct OpaqueBlock {
563 pub buffers: Vec<LanceBuffer>,
564 pub num_values: u64,
565 pub block_info: BlockInfo,
566}
567
568impl OpaqueBlock {
569 pub fn data_size(&self) -> u64 {
570 self.buffers.iter().map(|b| b.len() as u64).sum()
571 }
572}
573
574#[derive(Debug, Clone)]
576pub struct VariableWidthBlock {
577 pub data: LanceBuffer,
579 pub offsets: LanceBuffer,
583 pub bits_per_offset: u8,
585 pub num_values: u64,
587
588 pub block_info: BlockInfo,
589}
590
591impl VariableWidthBlock {
592 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
593 let data_buffer = self.data.into_buffer();
594 let offsets_buffer = self.offsets.into_buffer();
595 let builder = ArrayDataBuilder::new(data_type)
596 .add_buffer(offsets_buffer)
597 .add_buffer(data_buffer)
598 .len(self.num_values as usize)
599 .null_count(0);
600 if validate {
601 Ok(builder.build()?)
602 } else {
603 Ok(unsafe { builder.build_unchecked() })
604 }
605 }
606
607 fn into_buffers(self) -> Vec<LanceBuffer> {
608 vec![self.offsets, self.data]
609 }
610
611 pub fn offsets_as_block(&mut self) -> DataBlock {
612 let offsets = self.offsets.clone();
613 DataBlock::FixedWidth(FixedWidthDataBlock {
614 data: offsets,
615 bits_per_value: self.bits_per_offset as u64,
616 num_values: self.num_values + 1,
617 block_info: BlockInfo::new(),
618 })
619 }
620
621 pub fn data_size(&self) -> u64 {
622 (self.data.len() + self.offsets.len()) as u64
623 }
624}
625
626#[derive(Debug, Clone)]
628pub struct StructDataBlock {
629 pub children: Vec<DataBlock>,
631 pub block_info: BlockInfo,
632 pub validity: Option<NullBuffer>,
634}
635
636impl StructDataBlock {
637 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
638 if let DataType::Struct(fields) = &data_type {
639 let mut builder = ArrayDataBuilder::new(DataType::Struct(fields.clone()));
640 let mut num_rows = 0;
641 for (field, child) in fields.iter().zip(self.children) {
642 let child_data = child.into_arrow(field.data_type().clone(), validate)?;
643 num_rows = child_data.len();
644 builder = builder.add_child_data(child_data);
645 }
646
647 let builder = if let Some(validity) = self.validity {
649 let null_count = validity.null_count();
650 builder
651 .null_bit_buffer(Some(validity.into_inner().into_inner()))
652 .null_count(null_count)
653 } else {
654 builder.null_count(0)
655 };
656
657 let builder = builder.len(num_rows);
658 if validate {
659 Ok(builder.build()?)
660 } else {
661 Ok(unsafe { builder.build_unchecked() })
662 }
663 } else {
664 Err(Error::internal(format!(
665 "Expected Struct, got {:?}",
666 data_type
667 )))
668 }
669 }
670
671 fn remove_outer_validity(self) -> Self {
672 Self {
673 children: self
674 .children
675 .into_iter()
676 .map(|c| c.remove_outer_validity())
677 .collect(),
678 block_info: self.block_info,
679 validity: None, }
681 }
682
683 fn into_buffers(self) -> Vec<LanceBuffer> {
684 self.children
685 .into_iter()
686 .flat_map(|c| c.into_buffers())
687 .collect()
688 }
689
690 pub fn has_variable_width_child(&self) -> bool {
691 self.children
692 .iter()
693 .any(|child| !matches!(child, DataBlock::FixedWidth(_)))
694 }
695
696 pub fn data_size(&self) -> u64 {
697 self.children
698 .iter()
699 .map(|data_block| data_block.data_size())
700 .sum()
701 }
702}
703
704#[derive(Debug, Clone)]
706pub struct DictionaryDataBlock {
707 pub indices: FixedWidthDataBlock,
709 pub dictionary: Box<DataBlock>,
711}
712
713impl DictionaryDataBlock {
714 fn decode_helper<K: ArrowDictionaryKeyType>(self) -> Result<DataBlock> {
715 if self.indices.num_values == 0 {
718 return Ok(DataBlock::AllNull(AllNullDataBlock { num_values: 0 }));
719 }
720
721 let estimated_size_bytes = self.dictionary.data_size()
723 * (self.indices.num_values + self.dictionary.num_values() - 1)
724 / self.dictionary.num_values();
725 let mut data_builder = DataBlockBuilder::with_capacity_estimate(estimated_size_bytes);
726
727 let indices = self.indices.data.borrow_to_typed_slice::<K::Native>();
728 let indices = indices.as_ref();
729
730 indices
731 .iter()
732 .map(|idx| idx.to_usize().unwrap() as u64)
733 .for_each(|idx| {
734 data_builder.append(&self.dictionary, idx..idx + 1);
735 });
736
737 Ok(data_builder.finish())
738 }
739
740 pub fn decode(self) -> Result<DataBlock> {
741 match self.indices.bits_per_value {
742 8 => self.decode_helper::<UInt8Type>(),
743 16 => self.decode_helper::<UInt16Type>(),
744 32 => self.decode_helper::<UInt32Type>(),
745 64 => self.decode_helper::<UInt64Type>(),
746 _ => Err(lance_core::Error::internal(format!(
747 "Unsupported dictionary index bit width: {} bits",
748 self.indices.bits_per_value
749 ))),
750 }
751 }
752
753 fn into_arrow_dict(
754 self,
755 key_type: Box<DataType>,
756 value_type: Box<DataType>,
757 validate: bool,
758 ) -> Result<ArrayData> {
759 let indices = self.indices.into_arrow((*key_type).clone(), validate)?;
760 let dictionary = self
761 .dictionary
762 .into_arrow((*value_type).clone(), validate)?;
763
764 let builder = indices
765 .into_builder()
766 .add_child_data(dictionary)
767 .data_type(DataType::Dictionary(key_type, value_type));
768
769 if validate {
770 Ok(builder.build()?)
771 } else {
772 Ok(unsafe { builder.build_unchecked() })
773 }
774 }
775
776 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
777 if let DataType::Dictionary(key_type, value_type) = data_type {
778 self.into_arrow_dict(key_type, value_type, validate)
779 } else {
780 self.decode()?.into_arrow(data_type, validate)
781 }
782 }
783
784 fn into_buffers(self) -> Vec<LanceBuffer> {
785 let mut buffers = self.indices.into_buffers();
786 buffers.extend(self.dictionary.into_buffers());
787 buffers
788 }
789
790 pub fn into_parts(self) -> (DataBlock, DataBlock) {
791 (DataBlock::FixedWidth(self.indices), *self.dictionary)
792 }
793
794 pub fn from_parts(indices: FixedWidthDataBlock, dictionary: DataBlock) -> Self {
795 Self {
796 indices,
797 dictionary: Box::new(dictionary),
798 }
799 }
800}
801
802#[derive(Debug, Clone)]
817pub enum DataBlock {
818 Empty(),
819 Constant(ConstantDataBlock),
820 AllNull(AllNullDataBlock),
821 Nullable(NullableDataBlock),
822 FixedWidth(FixedWidthDataBlock),
823 FixedSizeList(FixedSizeListBlock),
824 VariableWidth(VariableWidthBlock),
825 Opaque(OpaqueBlock),
826 Struct(StructDataBlock),
827 Dictionary(DictionaryDataBlock),
828}
829
830impl DataBlock {
831 pub fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
833 match self {
834 Self::Empty() => Ok(new_empty_array(&data_type).to_data()),
835 Self::Constant(inner) => inner.into_arrow(data_type, validate),
836 Self::AllNull(inner) => inner.into_arrow(data_type, validate),
837 Self::Nullable(inner) => inner.into_arrow(data_type, validate),
838 Self::FixedWidth(inner) => inner.into_arrow(data_type, validate),
839 Self::FixedSizeList(inner) => inner.into_arrow(data_type, validate),
840 Self::VariableWidth(inner) => inner.into_arrow(data_type, validate),
841 Self::Struct(inner) => inner.into_arrow(data_type, validate),
842 Self::Dictionary(inner) => inner.into_arrow(data_type, validate),
843 Self::Opaque(_) => Err(Error::internal(
844 "Cannot convert OpaqueBlock to Arrow".to_string(),
845 )),
846 }
847 }
848
849 pub fn into_buffers(self) -> Vec<LanceBuffer> {
853 match self {
854 Self::Empty() => Vec::default(),
855 Self::Constant(inner) => inner.into_buffers(),
856 Self::AllNull(inner) => inner.into_buffers(),
857 Self::Nullable(inner) => inner.into_buffers(),
858 Self::FixedWidth(inner) => inner.into_buffers(),
859 Self::FixedSizeList(inner) => inner.into_buffers(),
860 Self::VariableWidth(inner) => inner.into_buffers(),
861 Self::Struct(inner) => inner.into_buffers(),
862 Self::Dictionary(inner) => inner.into_buffers(),
863 Self::Opaque(inner) => inner.buffers,
864 }
865 }
866
867 pub fn try_clone(&self) -> Result<Self> {
876 match self {
877 Self::Empty() => Ok(Self::Empty()),
878 Self::Constant(inner) => Ok(Self::Constant(inner.clone())),
879 Self::AllNull(inner) => Ok(Self::AllNull(inner.clone())),
880 Self::Nullable(inner) => Ok(Self::Nullable(inner.clone())),
881 Self::FixedWidth(inner) => Ok(Self::FixedWidth(inner.clone())),
882 Self::FixedSizeList(inner) => Ok(Self::FixedSizeList(inner.clone())),
883 Self::VariableWidth(inner) => Ok(Self::VariableWidth(inner.clone())),
884 Self::Struct(inner) => Ok(Self::Struct(inner.clone())),
885 Self::Dictionary(inner) => Ok(Self::Dictionary(inner.clone())),
886 Self::Opaque(inner) => Ok(Self::Opaque(inner.clone())),
887 }
888 }
889
890 pub fn name(&self) -> &'static str {
891 match self {
892 Self::Constant(_) => "Constant",
893 Self::Empty() => "Empty",
894 Self::AllNull(_) => "AllNull",
895 Self::Nullable(_) => "Nullable",
896 Self::FixedWidth(_) => "FixedWidth",
897 Self::FixedSizeList(_) => "FixedSizeList",
898 Self::VariableWidth(_) => "VariableWidth",
899 Self::Struct(_) => "Struct",
900 Self::Dictionary(_) => "Dictionary",
901 Self::Opaque(_) => "Opaque",
902 }
903 }
904
905 pub fn is_variable(&self) -> bool {
906 match self {
907 Self::Constant(_) => false,
908 Self::Empty() => false,
909 Self::AllNull(_) => false,
910 Self::Nullable(nullable) => nullable.data.is_variable(),
911 Self::FixedWidth(_) => false,
912 Self::FixedSizeList(fsl) => fsl.child.is_variable(),
913 Self::VariableWidth(_) => true,
914 Self::Struct(strct) => strct.children.iter().any(|c| c.is_variable()),
915 Self::Dictionary(_) => {
916 todo!("is_variable for DictionaryDataBlock is not implemented yet")
917 }
918 Self::Opaque(_) => panic!("Does not make sense to ask if an Opaque block is variable"),
919 }
920 }
921
922 pub fn is_nullable(&self) -> bool {
923 match self {
924 Self::AllNull(_) => true,
925 Self::Nullable(_) => true,
926 Self::FixedSizeList(fsl) => fsl.child.is_nullable(),
927 Self::Struct(strct) => strct.children.iter().any(|c| c.is_nullable()),
928 Self::Dictionary(_) => {
929 todo!("is_nullable for DictionaryDataBlock is not implemented yet")
930 }
931 Self::Opaque(_) => panic!("Does not make sense to ask if an Opaque block is nullable"),
932 _ => false,
933 }
934 }
935
936 pub fn num_values(&self) -> u64 {
941 match self {
942 Self::Empty() => 0,
943 Self::Constant(inner) => inner.num_values,
944 Self::AllNull(inner) => inner.num_values,
945 Self::Nullable(inner) => inner.data.num_values(),
946 Self::FixedWidth(inner) => inner.num_values,
947 Self::FixedSizeList(inner) => inner.num_values(),
948 Self::VariableWidth(inner) => inner.num_values,
949 Self::Struct(inner) => inner.children[0].num_values(),
950 Self::Dictionary(inner) => inner.indices.num_values,
951 Self::Opaque(inner) => inner.num_values,
952 }
953 }
954
955 pub fn items_per_row(&self) -> u64 {
959 match self {
960 Self::Empty() => todo!(), Self::Constant(_) => todo!(), Self::AllNull(_) => todo!(), Self::Nullable(nullable) => nullable.data.items_per_row(),
964 Self::FixedWidth(_) => 1,
965 Self::FixedSizeList(fsl) => fsl.dimension * fsl.child.items_per_row(),
966 Self::VariableWidth(_) => 1,
967 Self::Struct(_) => todo!(), Self::Dictionary(_) => 1,
969 Self::Opaque(_) => 1,
970 }
971 }
972
973 pub fn data_size(&self) -> u64 {
975 match self {
976 Self::Empty() => 0,
977 Self::Constant(inner) => inner.data_size(),
978 Self::AllNull(_) => 0,
979 Self::Nullable(inner) => inner.data_size(),
980 Self::FixedWidth(inner) => inner.data_size(),
981 Self::FixedSizeList(inner) => inner.data_size(),
982 Self::VariableWidth(inner) => inner.data_size(),
983 Self::Struct(inner) => inner.children.iter().map(|child| child.data_size()).sum(),
984 Self::Dictionary(inner) => inner.indices.data_size() + inner.dictionary.data_size(),
985 Self::Opaque(inner) => inner.data_size(),
986 }
987 }
988
989 pub fn remove_outer_validity(self) -> Self {
997 match self {
998 Self::AllNull(_) => panic!("Cannot remove validity on all-null data"),
999 Self::Nullable(inner) => *inner.data,
1000 Self::Struct(inner) => Self::Struct(inner.remove_outer_validity()),
1001 other => other,
1002 }
1003 }
1004
1005 pub fn make_builder(&self, estimated_size_bytes: u64) -> Box<dyn DataBlockBuilderImpl> {
1006 match self {
1007 Self::FixedWidth(inner) => {
1008 if inner.bits_per_value == 1 {
1009 Box::new(BitmapDataBlockBuilder::new(estimated_size_bytes))
1010 } else {
1011 Box::new(FixedWidthDataBlockBuilder::new(
1012 inner.bits_per_value,
1013 estimated_size_bytes,
1014 ))
1015 }
1016 }
1017 Self::VariableWidth(inner) => {
1018 if inner.bits_per_offset == 32 {
1019 Box::new(VariableWidthDataBlockBuilder::<i32>::new(
1020 estimated_size_bytes,
1021 ))
1022 } else if inner.bits_per_offset == 64 {
1023 Box::new(VariableWidthDataBlockBuilder::<i64>::new(
1024 estimated_size_bytes,
1025 ))
1026 } else {
1027 todo!()
1028 }
1029 }
1030 Self::FixedSizeList(inner) => {
1031 let inner_builder = inner.child.make_builder(estimated_size_bytes);
1032 Box::new(FixedSizeListBlockBuilder::new(
1033 inner_builder,
1034 inner.dimension,
1035 ))
1036 }
1037 Self::Nullable(nullable) => {
1038 let estimated_validity_size_bytes = estimated_size_bytes / 16;
1041 let inner_builder = nullable
1042 .data
1043 .make_builder(estimated_size_bytes - estimated_validity_size_bytes);
1044 Box::new(NullableDataBlockBuilder::new(
1045 inner_builder,
1046 estimated_validity_size_bytes as usize,
1047 ))
1048 }
1049 Self::Struct(struct_data_block) => {
1050 let num_children = struct_data_block.children.len();
1051 let per_child_estimate = if num_children == 0 {
1052 0
1053 } else {
1054 estimated_size_bytes / num_children as u64
1055 };
1056 let child_builders = struct_data_block
1057 .children
1058 .iter()
1059 .map(|child| child.make_builder(per_child_estimate))
1060 .collect();
1061 Box::new(StructDataBlockBuilder::new(child_builders))
1062 }
1063 Self::AllNull(_) => Box::new(AllNullDataBlockBuilder::default()),
1064 _ => todo!("make_builder for {:?}", self),
1065 }
1066 }
1067}
1068
// Generates a consuming accessor `$method(self) -> Option<$payload>` that
// yields the payload when `self` is the `$variant` variant and `None` otherwise.
macro_rules! as_type {
    ($method:ident, $variant:tt, $payload:ident) => {
        pub fn $method(self) -> Option<$payload> {
            if let Self::$variant(value) = self {
                Some(value)
            } else {
                None
            }
        }
    };
}
1079
// Generates a borrowing accessor `$method(&self) -> Option<&$payload>` that
// yields the payload when `self` is the `$variant` variant and `None` otherwise.
macro_rules! as_type_ref {
    ($method:ident, $variant:tt, $payload:ident) => {
        pub fn $method(&self) -> Option<&$payload> {
            if let Self::$variant(value) = self {
                Some(value)
            } else {
                None
            }
        }
    };
}
1090
// Generates a mutably borrowing accessor `$method(&mut self) -> Option<&mut $payload>`
// that yields the payload when `self` is the `$variant` variant and `None` otherwise.
macro_rules! as_type_ref_mut {
    ($method:ident, $variant:tt, $payload:ident) => {
        pub fn $method(&mut self) -> Option<&mut $payload> {
            if let Self::$variant(value) = self {
                Some(value)
            } else {
                None
            }
        }
    };
}
1101
1102impl DataBlock {
1104 as_type!(as_all_null, AllNull, AllNullDataBlock);
1105 as_type!(as_nullable, Nullable, NullableDataBlock);
1106 as_type!(as_fixed_width, FixedWidth, FixedWidthDataBlock);
1107 as_type!(as_fixed_size_list, FixedSizeList, FixedSizeListBlock);
1108 as_type!(as_variable_width, VariableWidth, VariableWidthBlock);
1109 as_type!(as_struct, Struct, StructDataBlock);
1110 as_type!(as_dictionary, Dictionary, DictionaryDataBlock);
1111 as_type_ref!(as_all_null_ref, AllNull, AllNullDataBlock);
1112 as_type_ref!(as_nullable_ref, Nullable, NullableDataBlock);
1113 as_type_ref!(as_fixed_width_ref, FixedWidth, FixedWidthDataBlock);
1114 as_type_ref!(as_fixed_size_list_ref, FixedSizeList, FixedSizeListBlock);
1115 as_type_ref!(as_variable_width_ref, VariableWidth, VariableWidthBlock);
1116 as_type_ref!(as_struct_ref, Struct, StructDataBlock);
1117 as_type_ref!(as_dictionary_ref, Dictionary, DictionaryDataBlock);
1118 as_type_ref_mut!(as_all_null_ref_mut, AllNull, AllNullDataBlock);
1119 as_type_ref_mut!(as_nullable_ref_mut, Nullable, NullableDataBlock);
1120 as_type_ref_mut!(as_fixed_width_ref_mut, FixedWidth, FixedWidthDataBlock);
1121 as_type_ref_mut!(
1122 as_fixed_size_list_ref_mut,
1123 FixedSizeList,
1124 FixedSizeListBlock
1125 );
1126 as_type_ref_mut!(as_variable_width_ref_mut, VariableWidth, VariableWidthBlock);
1127 as_type_ref_mut!(as_struct_ref_mut, Struct, StructDataBlock);
1128 as_type_ref_mut!(as_dictionary_ref_mut, Dictionary, DictionaryDataBlock);
1129}
1130
1131fn get_byte_range<T: ArrowNativeType>(offsets: &mut LanceBuffer) -> Range<usize> {
1134 let offsets = offsets.borrow_to_typed_slice::<T>();
1135 if offsets.as_ref().is_empty() {
1136 0..0
1137 } else {
1138 offsets.as_ref().first().unwrap().as_usize()..offsets.as_ref().last().unwrap().as_usize()
1139 }
1140}
1141
1142fn stitch_offsets<T: ArrowNativeType + std::ops::Add<Output = T> + std::ops::Sub<Output = T>>(
1148 offsets: Vec<LanceBuffer>,
1149) -> (LanceBuffer, Vec<Range<usize>>) {
1150 if offsets.is_empty() {
1151 return (LanceBuffer::empty(), Vec::default());
1152 }
1153 let len = offsets.iter().map(|b| b.len()).sum::<usize>();
1154 let mut dest = Vec::with_capacity(len);
1158 let mut byte_ranges = Vec::with_capacity(offsets.len());
1159
1160 dest.push(T::from_usize(0).unwrap());
1162
1163 for mut o in offsets.into_iter() {
1164 if !o.is_empty() {
1165 let last_offset = *dest.last().unwrap();
1166 let o = o.borrow_to_typed_slice::<T>();
1167 let start = *o.as_ref().first().unwrap();
1168 dest.extend(o.as_ref()[1..].iter().map(|&x| x + last_offset - start));
1182 }
1183 byte_ranges.push(get_byte_range::<T>(&mut o));
1184 }
1185 (LanceBuffer::reinterpret_vec(dest), byte_ranges)
1186}
1187
1188fn arrow_binary_to_data_block(
1189 arrays: &[ArrayRef],
1190 num_values: u64,
1191 bits_per_offset: u8,
1192) -> DataBlock {
1193 let data_vec = arrays.iter().map(|arr| arr.to_data()).collect::<Vec<_>>();
1194 let bytes_per_offset = bits_per_offset as usize / 8;
1195 let offsets = data_vec
1196 .iter()
1197 .map(|d| {
1198 LanceBuffer::from(
1199 d.buffers()[0].slice_with_length(d.offset(), (d.len() + 1) * bytes_per_offset),
1200 )
1201 })
1202 .collect::<Vec<_>>();
1203 let (offsets, data_ranges) = if bits_per_offset == 32 {
1204 stitch_offsets::<i32>(offsets)
1205 } else {
1206 stitch_offsets::<i64>(offsets)
1207 };
1208 let data = data_vec
1209 .iter()
1210 .zip(data_ranges)
1211 .map(|(d, byte_range)| {
1212 LanceBuffer::from(
1213 d.buffers()[1]
1214 .slice_with_length(byte_range.start, byte_range.end - byte_range.start),
1215 )
1216 })
1217 .collect::<Vec<_>>();
1218 let data = LanceBuffer::concat_into_one(data);
1219 DataBlock::VariableWidth(VariableWidthBlock {
1220 data,
1221 offsets,
1222 bits_per_offset,
1223 num_values,
1224 block_info: BlockInfo::new(),
1225 })
1226}
1227
1228fn encode_flat_data(arrays: &[ArrayRef], num_values: u64) -> LanceBuffer {
1229 let bytes_per_value = arrays[0].data_type().byte_width();
1230 let mut buffer = Vec::with_capacity(num_values as usize * bytes_per_value);
1231 for arr in arrays {
1232 let data = arr.to_data();
1233 buffer.extend_from_slice(data.buffers()[0].as_slice());
1234 }
1235 LanceBuffer::from(buffer)
1236}
1237
1238fn do_encode_bitmap_data(bitmaps: &[BooleanBuffer], num_values: u64) -> LanceBuffer {
1239 let mut builder = BooleanBufferBuilder::new(num_values as usize);
1240
1241 for buf in bitmaps {
1242 builder.append_buffer(buf);
1243 }
1244
1245 let buffer = builder.finish().into_inner();
1246 LanceBuffer::from(buffer)
1247}
1248
1249fn encode_bitmap_data(arrays: &[ArrayRef], num_values: u64) -> LanceBuffer {
1250 let bitmaps = arrays
1251 .iter()
1252 .map(|arr| arr.as_boolean().values().clone())
1253 .collect::<Vec<_>>();
1254 do_encode_bitmap_data(&bitmaps, num_values)
1255}
1256
1257fn concat_dict_arrays(arrays: &[ArrayRef]) -> ArrayRef {
1260 let value_type = arrays[0].as_any_dictionary().values().data_type();
1261 let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
1262 match arrow_select::concat::concat(&array_refs) {
1263 Ok(array) => array,
1264 Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1265 let upscaled = array_refs
1267 .iter()
1268 .map(|arr| {
1269 match arrow_cast::cast(
1270 *arr,
1271 &DataType::Dictionary(
1272 Box::new(DataType::UInt32),
1273 Box::new(value_type.clone()),
1274 ),
1275 ) {
1276 Ok(arr) => arr,
1277 Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1278 unimplemented!("Dictionary arrays with more than 2^32 unique values")
1280 }
1281 err => err.unwrap(),
1282 }
1283 })
1284 .collect::<Vec<_>>();
1285 let array_refs = upscaled.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
1286 match arrow_select::concat::concat(&array_refs) {
1288 Ok(array) => array,
1289 Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1290 unimplemented!("Dictionary arrays with more than 2^32 unique values")
1291 }
1292 err => err.unwrap(),
1293 }
1294 }
1295 err => err.unwrap(),
1297 }
1298}
1299
1300fn max_index_val(index_type: &DataType) -> u64 {
1301 match index_type {
1302 DataType::Int8 => i8::MAX as u64,
1303 DataType::Int16 => i16::MAX as u64,
1304 DataType::Int32 => i32::MAX as u64,
1305 DataType::Int64 => i64::MAX as u64,
1306 DataType::UInt8 => u8::MAX as u64,
1307 DataType::UInt16 => u16::MAX as u64,
1308 DataType::UInt32 => u32::MAX as u64,
1309 DataType::UInt64 => u64::MAX,
1310 _ => panic!("Invalid dictionary index type"),
1311 }
1312}
1313
1314fn arrow_dictionary_to_data_block(arrays: &[ArrayRef], validity: Option<NullBuffer>) -> DataBlock {
1333 let array = concat_dict_arrays(arrays);
1334 let array_dict = array.as_any_dictionary();
1335 let mut indices = array_dict.keys();
1336 let num_values = indices.len() as u64;
1337 let mut values = array_dict.values().clone();
1338 let mut upcast = None;
1340
1341 let indices_block = if let Some(validity) = validity {
1345 let mut first_invalid_index = None;
1349 if let Some(values_validity) = values.nulls() {
1350 first_invalid_index = (!values_validity.inner()).set_indices().next();
1351 }
1352 let first_invalid_index = first_invalid_index.unwrap_or_else(|| {
1353 let null_arr = new_null_array(values.data_type(), 1);
1354 values = arrow_select::concat::concat(&[values.as_ref(), null_arr.as_ref()]).unwrap();
1355 let null_index = values.len() - 1;
1356 let max_index_val = max_index_val(indices.data_type());
1357 if null_index as u64 > max_index_val {
1358 if max_index_val >= u32::MAX as u64 {
1360 unimplemented!("Dictionary arrays with 2^32 unique value (or more) and a null")
1361 }
1362 upcast = Some(arrow_cast::cast(indices, &DataType::UInt32).unwrap());
1363 indices = upcast.as_ref().unwrap();
1364 }
1365 null_index
1366 });
1367 let null_index_arr = arrow_cast::cast(
1369 &UInt64Array::from(vec![first_invalid_index as u64]),
1370 indices.data_type(),
1371 )
1372 .unwrap();
1373
1374 let bytes_per_index = indices.data_type().byte_width();
1375 let bits_per_index = bytes_per_index as u64 * 8;
1376
1377 let null_index_arr = null_index_arr.into_data();
1378 let null_index_bytes = &null_index_arr.buffers()[0];
1379 let mut indices_bytes = indices.to_data().buffers()[0].to_vec();
1381 for invalid_idx in (!validity.inner()).set_indices() {
1382 indices_bytes[invalid_idx * bytes_per_index..(invalid_idx + 1) * bytes_per_index]
1383 .copy_from_slice(null_index_bytes.as_slice());
1384 }
1385 FixedWidthDataBlock {
1386 data: LanceBuffer::from(indices_bytes),
1387 bits_per_value: bits_per_index,
1388 num_values,
1389 block_info: BlockInfo::new(),
1390 }
1391 } else {
1392 FixedWidthDataBlock {
1393 data: LanceBuffer::from(indices.to_data().buffers()[0].clone()),
1394 bits_per_value: indices.data_type().byte_width() as u64 * 8,
1395 num_values,
1396 block_info: BlockInfo::new(),
1397 }
1398 };
1399
1400 let items = DataBlock::from(values);
1401 DataBlock::Dictionary(DictionaryDataBlock {
1402 indices: indices_block,
1403 dictionary: Box::new(items),
1404 })
1405}
1406
1407enum Nullability {
1408 None,
1409 All,
1410 Some(NullBuffer),
1411}
1412
1413impl Nullability {
1414 fn to_option(&self) -> Option<NullBuffer> {
1415 match self {
1416 Self::Some(nulls) => Some(nulls.clone()),
1417 _ => None,
1418 }
1419 }
1420}
1421
1422fn extract_nulls(arrays: &[ArrayRef], num_values: u64) -> Nullability {
1423 let mut has_nulls = false;
1424 let nulls_and_lens = arrays
1425 .iter()
1426 .map(|arr| {
1427 let nulls = arr.logical_nulls();
1428 has_nulls |= nulls.is_some();
1429 (nulls, arr.len())
1430 })
1431 .collect::<Vec<_>>();
1432 if !has_nulls {
1433 return Nullability::None;
1434 }
1435 let mut builder = BooleanBufferBuilder::new(num_values as usize);
1436 let mut num_nulls = 0;
1437 for (null, len) in nulls_and_lens {
1438 if let Some(null) = null {
1439 num_nulls += null.null_count();
1440 builder.append_buffer(&null.into_inner());
1441 } else {
1442 builder.append_n(len, true);
1443 }
1444 }
1445 if num_nulls == num_values as usize {
1446 Nullability::All
1447 } else {
1448 Nullability::Some(NullBuffer::new(builder.finish()))
1449 }
1450}
1451
1452impl DataBlock {
1453 pub fn from_arrays(arrays: &[ArrayRef], num_values: u64) -> Self {
1454 if arrays.is_empty() || num_values == 0 {
1455 return Self::AllNull(AllNullDataBlock { num_values: 0 });
1456 }
1457
1458 let data_type = arrays[0].data_type();
1459 let nulls = extract_nulls(arrays, num_values);
1460
1461 if let Nullability::All = nulls {
1462 return Self::AllNull(AllNullDataBlock { num_values });
1463 }
1464
1465 let mut encoded = match data_type {
1466 DataType::Binary | DataType::Utf8 => arrow_binary_to_data_block(arrays, num_values, 32),
1467 DataType::Utf8View => {
1469 let casted: Vec<ArrayRef> = arrays
1470 .iter()
1471 .map(|a| {
1472 arrow_cast::cast(a.as_ref(), &DataType::Utf8)
1473 .expect("Utf8View to Utf8 cast is always valid")
1474 })
1475 .collect();
1476 arrow_binary_to_data_block(&casted, num_values, 32)
1477 }
1478 DataType::BinaryView => {
1479 let casted: Vec<ArrayRef> = arrays
1480 .iter()
1481 .map(|a| {
1482 arrow_cast::cast(a.as_ref(), &DataType::Binary)
1483 .expect("BinaryView to Binary cast is always valid")
1484 })
1485 .collect();
1486 arrow_binary_to_data_block(&casted, num_values, 32)
1487 }
1488 DataType::LargeBinary | DataType::LargeUtf8 => {
1489 arrow_binary_to_data_block(arrays, num_values, 64)
1490 }
1491 DataType::Boolean => {
1492 let data = encode_bitmap_data(arrays, num_values);
1493 Self::FixedWidth(FixedWidthDataBlock {
1494 data,
1495 bits_per_value: 1,
1496 num_values,
1497 block_info: BlockInfo::new(),
1498 })
1499 }
1500 DataType::Date32
1501 | DataType::Date64
1502 | DataType::Decimal32(_, _)
1503 | DataType::Decimal64(_, _)
1504 | DataType::Decimal128(_, _)
1505 | DataType::Decimal256(_, _)
1506 | DataType::Duration(_)
1507 | DataType::FixedSizeBinary(_)
1508 | DataType::Float16
1509 | DataType::Float32
1510 | DataType::Float64
1511 | DataType::Int16
1512 | DataType::Int32
1513 | DataType::Int64
1514 | DataType::Int8
1515 | DataType::Interval(_)
1516 | DataType::Time32(_)
1517 | DataType::Time64(_)
1518 | DataType::Timestamp(_, _)
1519 | DataType::UInt16
1520 | DataType::UInt32
1521 | DataType::UInt64
1522 | DataType::UInt8 => {
1523 let data = encode_flat_data(arrays, num_values);
1524 Self::FixedWidth(FixedWidthDataBlock {
1525 data,
1526 bits_per_value: data_type.byte_width() as u64 * 8,
1527 num_values,
1528 block_info: BlockInfo::new(),
1529 })
1530 }
1531 DataType::Null => Self::AllNull(AllNullDataBlock { num_values }),
1532 DataType::Dictionary(_, _) => arrow_dictionary_to_data_block(arrays, nulls.to_option()),
1533 DataType::Struct(fields) => {
1534 let structs = arrays.iter().map(|arr| arr.as_struct()).collect::<Vec<_>>();
1535 let mut children = Vec::with_capacity(fields.len());
1536 for child_idx in 0..fields.len() {
1537 let child_vec = structs
1538 .iter()
1539 .map(|s| s.column(child_idx).clone())
1540 .collect::<Vec<_>>();
1541 children.push(Self::from_arrays(&child_vec, num_values));
1542 }
1543
1544 let validity = match &nulls {
1546 Nullability::None => None,
1547 Nullability::Some(null_buffer) => Some(null_buffer.clone()),
1548 Nullability::All => unreachable!("Should have returned AllNull earlier"),
1549 };
1550
1551 Self::Struct(StructDataBlock {
1552 children,
1553 block_info: BlockInfo::default(),
1554 validity,
1555 })
1556 }
1557 DataType::FixedSizeList(_, dim) => {
1558 let children = arrays
1559 .iter()
1560 .map(|arr| arr.as_fixed_size_list().values().clone())
1561 .collect::<Vec<_>>();
1562 let child_block = Self::from_arrays(&children, num_values * *dim as u64);
1563 Self::FixedSizeList(FixedSizeListBlock {
1564 child: Box::new(child_block),
1565 dimension: *dim as u64,
1566 })
1567 }
1568 DataType::LargeList(_)
1569 | DataType::List(_)
1570 | DataType::ListView(_)
1571 | DataType::LargeListView(_)
1572 | DataType::Map(_, _)
1573 | DataType::RunEndEncoded(_, _)
1574 | DataType::Union(_, _) => {
1575 panic!(
1576 "Field with data type {} cannot be converted to data block",
1577 data_type
1578 )
1579 }
1580 };
1581
1582 encoded.compute_stat();
1584
1585 if !matches!(data_type, DataType::Dictionary(_, _)) {
1586 match nulls {
1587 Nullability::None => encoded,
1588 Nullability::Some(nulls) => Self::Nullable(NullableDataBlock {
1589 data: Box::new(encoded),
1590 nulls: LanceBuffer::from(nulls.into_inner().into_inner()),
1591 block_info: BlockInfo::new(),
1592 }),
1593 _ => unreachable!(),
1594 }
1595 } else {
1596 encoded
1598 }
1599 }
1600
1601 pub fn from_array<T: Array + 'static>(array: T) -> Self {
1602 let num_values = array.len();
1603 Self::from_arrays(&[Arc::new(array)], num_values as u64)
1604 }
1605}
1606
1607impl From<ArrayRef> for DataBlock {
1608 fn from(array: ArrayRef) -> Self {
1609 let num_values = array.len() as u64;
1610 Self::from_arrays(&[array], num_values)
1611 }
1612}
1613
/// Object-safe interface implemented by the concrete builders that back
/// [`DataBlockBuilder`].
pub trait DataBlockBuilderImpl: std::fmt::Debug {
    /// Appends the part of `data_block` identified by `selection` to the builder.
    ///
    /// NOTE(review): `selection` is presumably a range of value positions within
    /// `data_block`; confirm against the implementations.
    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>);
    /// Consumes the boxed builder and returns the block it has built.
    fn finish(self: Box<Self>) -> DataBlock;
}
1618
1619#[derive(Debug)]
1620pub struct DataBlockBuilder {
1621 estimated_size_bytes: u64,
1622 builder: Option<Box<dyn DataBlockBuilderImpl>>,
1623}
1624
1625impl DataBlockBuilder {
1626 pub fn with_capacity_estimate(estimated_size_bytes: u64) -> Self {
1627 Self {
1628 estimated_size_bytes,
1629 builder: None,
1630 }
1631 }
1632
1633 fn get_builder(&mut self, block: &DataBlock) -> &mut dyn DataBlockBuilderImpl {
1634 if self.builder.is_none() {
1635 self.builder = Some(block.make_builder(self.estimated_size_bytes));
1636 }
1637 self.builder.as_mut().unwrap().as_mut()
1638 }
1639
1640 pub fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
1641 self.get_builder(data_block).append(data_block, selection);
1642 }
1643
1644 pub fn finish(self) -> DataBlock {
1645 let builder = self.builder.expect("DataBlockBuilder didn't see any data");
1646 builder.finish()
1647 }
1648}
1649
1650#[cfg(test)]
1651mod tests {
1652 use std::sync::Arc;
1653
1654 use arrow_array::{
1655 ArrayRef, BinaryViewArray, DictionaryArray, Int8Array, LargeBinaryArray, StringArray,
1656 StringViewArray, UInt8Array, UInt16Array, make_array, new_null_array,
1657 types::{Int8Type, Int32Type},
1658 };
1659 use arrow_buffer::{BooleanBuffer, NullBuffer};
1660
1661 use arrow_schema::{DataType, Field, Fields};
1662 use lance_datagen::{ArrayGeneratorExt, DEFAULT_SEED, RowCount, array};
1663 use rand::SeedableRng;
1664
1665 use crate::buffer::LanceBuffer;
1666
1667 use super::{AllNullDataBlock, DataBlock};
1668
1669 use arrow_array::Array;
1670
1671 #[test]
1672 fn test_sliced_to_data_block() {
1673 let ints = UInt16Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);
1674 let ints = ints.slice(2, 4);
1675 let data = DataBlock::from_array(ints);
1676
1677 let fixed_data = data.as_fixed_width().unwrap();
1678 assert_eq!(fixed_data.num_values, 4);
1679 assert_eq!(fixed_data.data.len(), 8);
1680
1681 let nullable_ints =
1682 UInt16Array::from(vec![Some(0), None, Some(2), None, Some(4), None, Some(6)]);
1683 let nullable_ints = nullable_ints.slice(1, 3);
1684 let data = DataBlock::from_array(nullable_ints);
1685
1686 let nullable = data.as_nullable().unwrap();
1687 assert_eq!(nullable.nulls, LanceBuffer::from(vec![0b00000010]));
1688 }
1689
1690 #[test]
1691 fn test_string_to_data_block() {
1692 let strings1 = StringArray::from(vec![Some("hello"), None, Some("world")]);
1694 let strings2 = StringArray::from(vec![Some("a"), Some("b")]);
1695 let strings3 = StringArray::from(vec![Option::<&'static str>::None, None]);
1696
1697 let arrays = &[strings1, strings2, strings3]
1698 .iter()
1699 .map(|arr| Arc::new(arr.clone()) as ArrayRef)
1700 .collect::<Vec<_>>();
1701
1702 let block = DataBlock::from_arrays(arrays, 7);
1703
1704 assert_eq!(block.num_values(), 7);
1705 let block = block.as_nullable().unwrap();
1706
1707 assert_eq!(block.nulls, LanceBuffer::from(vec![0b00011101]));
1708
1709 let data = block.data.as_variable_width().unwrap();
1710 assert_eq!(
1711 data.offsets,
1712 LanceBuffer::reinterpret_vec(vec![0, 5, 5, 10, 11, 12, 12, 12])
1713 );
1714
1715 assert_eq!(data.data, LanceBuffer::copy_slice(b"helloworldab"));
1716
1717 let strings1 = StringArray::from(vec![Some("a"), Some("bc")]);
1719 let strings2 = StringArray::from(vec![Some("def")]);
1720
1721 let arrays = &[strings1, strings2]
1722 .iter()
1723 .map(|arr| Arc::new(arr.clone()) as ArrayRef)
1724 .collect::<Vec<_>>();
1725
1726 let block = DataBlock::from_arrays(arrays, 3);
1727
1728 assert_eq!(block.num_values(), 3);
1729 let data = block.as_variable_width().unwrap();
1731 assert_eq!(data.offsets, LanceBuffer::reinterpret_vec(vec![0, 1, 3, 6]));
1732 assert_eq!(data.data, LanceBuffer::copy_slice(b"abcdef"));
1733 }
1734
1735 #[test]
1736 fn test_string_view_to_data_block() {
1737 let views1 = StringViewArray::from(vec![Some("hello"), None, Some("world")]);
1738 let views2 = StringViewArray::from(vec![Some("a"), Some("b")]);
1739 let views3 = StringViewArray::from(vec![Option::<&'static str>::None, None]);
1740
1741 let arrays = &[views1, views2, views3]
1742 .iter()
1743 .map(|arr| Arc::new(arr.clone()) as ArrayRef)
1744 .collect::<Vec<_>>();
1745
1746 let block = DataBlock::from_arrays(arrays, 7);
1747
1748 assert_eq!(block.num_values(), 7);
1749 let block = block.as_nullable().unwrap();
1750 assert_eq!(block.nulls, LanceBuffer::from(vec![0b00011101]));
1751 let data = block.data.as_variable_width().unwrap();
1752 assert_eq!(
1753 data.offsets,
1754 LanceBuffer::reinterpret_vec(vec![0, 5, 5, 10, 11, 12, 12, 12])
1755 );
1756 assert_eq!(data.data, LanceBuffer::copy_slice(b"helloworldab"));
1757
1758 let views1 = StringViewArray::from(vec![Some("a"), Some("bc")]);
1759 let views2 = StringViewArray::from(vec![Some("def")]);
1760
1761 let arrays = &[views1, views2]
1762 .iter()
1763 .map(|arr| Arc::new(arr.clone()) as ArrayRef)
1764 .collect::<Vec<_>>();
1765
1766 let block = DataBlock::from_arrays(arrays, 3);
1767
1768 assert_eq!(block.num_values(), 3);
1769 let data = block.as_variable_width().unwrap();
1770 assert_eq!(data.offsets, LanceBuffer::reinterpret_vec(vec![0, 1, 3, 6]));
1771 assert_eq!(data.data, LanceBuffer::copy_slice(b"abcdef"));
1772 }
1773
1774 #[test]
1775 fn test_binary_view_to_data_block() {
1776 let arr: ArrayRef = Arc::new(BinaryViewArray::from(vec![
1777 Some(b"foo".as_slice()),
1778 None,
1779 Some(b"bar".as_slice()),
1780 ]));
1781 let block = DataBlock::from_arrays(&[arr], 3);
1782 let block = block.as_nullable().unwrap();
1783 let data = block.data.as_variable_width().unwrap();
1784 assert_eq!(data.data, LanceBuffer::copy_slice(b"foobar"));
1785 }
1786
1787 #[test]
1788 fn test_string_sliced() {
1789 let check = |arr: Vec<StringArray>, expected_off: Vec<i32>, expected_data: &[u8]| {
1790 let arrs = arr
1791 .into_iter()
1792 .map(|a| Arc::new(a) as ArrayRef)
1793 .collect::<Vec<_>>();
1794 let num_rows = arrs.iter().map(|a| a.len()).sum::<usize>() as u64;
1795 let data = DataBlock::from_arrays(&arrs, num_rows);
1796
1797 assert_eq!(data.num_values(), num_rows);
1798
1799 let data = data.as_variable_width().unwrap();
1800 assert_eq!(data.offsets, LanceBuffer::reinterpret_vec(expected_off));
1801 assert_eq!(data.data, LanceBuffer::copy_slice(expected_data));
1802 };
1803
1804 let string = StringArray::from(vec![Some("hello"), Some("world")]);
1805 check(vec![string.slice(1, 1)], vec![0, 5], b"world");
1806 check(vec![string.slice(0, 1)], vec![0, 5], b"hello");
1807 check(
1808 vec![string.slice(0, 1), string.slice(1, 1)],
1809 vec![0, 5, 10],
1810 b"helloworld",
1811 );
1812
1813 let string2 = StringArray::from(vec![Some("foo"), Some("bar")]);
1814 check(
1815 vec![string.slice(0, 1), string2.slice(0, 1)],
1816 vec![0, 5, 8],
1817 b"hellofoo",
1818 );
1819 }
1820
1821 #[test]
1822 fn test_large() {
1823 let arr = LargeBinaryArray::from_vec(vec![b"hello", b"world"]);
1824 let data = DataBlock::from_array(arr);
1825
1826 assert_eq!(data.num_values(), 2);
1827 let data = data.as_variable_width().unwrap();
1828 assert_eq!(data.bits_per_offset, 64);
1829 assert_eq!(data.num_values, 2);
1830 assert_eq!(data.data, LanceBuffer::copy_slice(b"helloworld"));
1831 assert_eq!(
1832 data.offsets,
1833 LanceBuffer::reinterpret_vec(vec![0_u64, 5, 10])
1834 );
1835 }
1836
1837 #[test]
1838 fn test_dictionary_indices_normalized() {
1839 let arr1 = DictionaryArray::<Int8Type>::from_iter([Some("a"), Some("a"), Some("b")]);
1840 let arr2 = DictionaryArray::<Int8Type>::from_iter([Some("b"), Some("c")]);
1841
1842 let data = DataBlock::from_arrays(&[Arc::new(arr1), Arc::new(arr2)], 5);
1843
1844 assert_eq!(data.num_values(), 5);
1845 let data = data.as_dictionary().unwrap();
1846 let indices = data.indices;
1847 assert_eq!(indices.bits_per_value, 8);
1848 assert_eq!(indices.num_values, 5);
1849 assert_eq!(
1850 indices.data,
1851 LanceBuffer::reinterpret_vec::<i8>(vec![0, 0, 1, 2, 3])
1855 );
1856
1857 let items = data.dictionary.as_variable_width().unwrap();
1858 assert_eq!(items.bits_per_offset, 32);
1859 assert_eq!(items.num_values, 4);
1860 assert_eq!(items.data, LanceBuffer::copy_slice(b"abbc"));
1861 assert_eq!(
1862 items.offsets,
1863 LanceBuffer::reinterpret_vec(vec![0, 1, 2, 3, 4],)
1864 );
1865 }
1866
1867 #[test]
1868 fn test_dictionary_nulls() {
1869 let arr1 = DictionaryArray::<Int8Type>::from_iter([None, Some("a"), Some("b")]);
1873 let arr2 = DictionaryArray::<Int8Type>::from_iter([Some("c"), None]);
1874
1875 let data = DataBlock::from_arrays(&[Arc::new(arr1), Arc::new(arr2)], 5);
1876
1877 let check_common = |data: DataBlock| {
1878 assert_eq!(data.num_values(), 5);
1879 let dict = data.as_dictionary().unwrap();
1880
1881 let nullable_items = dict.dictionary.as_nullable().unwrap();
1882 assert_eq!(nullable_items.nulls, LanceBuffer::from(vec![0b00000111]));
1883 assert_eq!(nullable_items.data.num_values(), 4);
1884
1885 let items = nullable_items.data.as_variable_width().unwrap();
1886 assert_eq!(items.bits_per_offset, 32);
1887 assert_eq!(items.num_values, 4);
1888 assert_eq!(items.data, LanceBuffer::copy_slice(b"abc"));
1889 assert_eq!(
1890 items.offsets,
1891 LanceBuffer::reinterpret_vec(vec![0, 1, 2, 3, 3],)
1892 );
1893
1894 let indices = dict.indices;
1895 assert_eq!(indices.bits_per_value, 8);
1896 assert_eq!(indices.num_values, 5);
1897 assert_eq!(
1898 indices.data,
1899 LanceBuffer::reinterpret_vec::<i8>(vec![3, 0, 1, 2, 3])
1900 );
1901 };
1902 check_common(data);
1903
1904 let items = StringArray::from(vec![Some("a"), Some("b"), Some("c"), None]);
1906 let indices = Int8Array::from(vec![Some(3), Some(0), Some(1), Some(2), Some(3)]);
1907 let dict = DictionaryArray::new(indices, Arc::new(items));
1908
1909 let data = DataBlock::from_array(dict);
1910
1911 check_common(data);
1912 }
1913
1914 #[test]
1915 fn test_dictionary_cannot_add_null() {
1916 let items = StringArray::from(
1918 (0..256)
1919 .map(|i| Some(String::from_utf8(vec![0; i]).unwrap()))
1920 .collect::<Vec<_>>(),
1921 );
1922 let indices = UInt8Array::from(
1924 (0..=256)
1925 .map(|i| if i == 256 { None } else { Some(i as u8) })
1926 .collect::<Vec<_>>(),
1927 );
1928 let dict = DictionaryArray::new(indices, Arc::new(items));
1931 let data = DataBlock::from_array(dict);
1932
1933 assert_eq!(data.num_values(), 257);
1934
1935 let dict = data.as_dictionary().unwrap();
1936
1937 assert_eq!(dict.indices.bits_per_value, 32);
1938 assert_eq!(
1939 dict.indices.data,
1940 LanceBuffer::reinterpret_vec((0_u32..257).collect::<Vec<_>>())
1941 );
1942
1943 let nullable_items = dict.dictionary.as_nullable().unwrap();
1944 let null_buffer = NullBuffer::new(BooleanBuffer::new(
1945 nullable_items.nulls.into_buffer(),
1946 0,
1947 257,
1948 ));
1949 for i in 0..256 {
1950 assert!(!null_buffer.is_null(i));
1951 }
1952 assert!(null_buffer.is_null(256));
1953
1954 assert_eq!(
1955 nullable_items.data.as_variable_width().unwrap().data.len(),
1956 32640
1957 );
1958 }
1959
1960 #[test]
1961 fn test_all_null() {
1962 for data_type in [
1963 DataType::UInt32,
1964 DataType::FixedSizeBinary(2),
1965 DataType::List(Arc::new(Field::new("item", DataType::UInt32, true))),
1966 DataType::Struct(Fields::from(vec![Field::new("a", DataType::UInt32, true)])),
1967 ] {
1968 let block = DataBlock::AllNull(AllNullDataBlock { num_values: 10 });
1969 let arr = block.into_arrow(data_type.clone(), true).unwrap();
1970 let arr = make_array(arr);
1971 let expected = new_null_array(&data_type, 10);
1972 assert_eq!(&arr, &expected);
1973 }
1974 }
1975
1976 #[test]
1977 fn test_dictionary_cannot_concatenate() {
1978 let items = StringArray::from(
1980 (0..256)
1981 .map(|i| Some(String::from_utf8(vec![0; i]).unwrap()))
1982 .collect::<Vec<_>>(),
1983 );
1984 let other_items = StringArray::from(
1986 (0..256)
1987 .map(|i| Some(String::from_utf8(vec![1; i + 1]).unwrap()))
1988 .collect::<Vec<_>>(),
1989 );
1990 let indices = UInt8Array::from_iter_values(0..=255);
1991 let dict1 = DictionaryArray::new(indices.clone(), Arc::new(items));
1992 let dict2 = DictionaryArray::new(indices, Arc::new(other_items));
1993 let data = DataBlock::from_arrays(&[Arc::new(dict1), Arc::new(dict2)], 512);
1994 assert_eq!(data.num_values(), 512);
1995
1996 let dict = data.as_dictionary().unwrap();
1997
1998 assert_eq!(dict.indices.bits_per_value, 32);
1999 assert_eq!(
2000 dict.indices.data,
2001 LanceBuffer::reinterpret_vec::<u32>((0..512).collect::<Vec<_>>())
2002 );
2003 assert_eq!(
2005 dict.dictionary.as_variable_width().unwrap().data.len(),
2006 65536
2007 );
2008 }
2009
2010 #[test]
2011 fn test_data_size() {
2012 let mut rng = rand_xoshiro::Xoshiro256PlusPlus::seed_from_u64(DEFAULT_SEED.0);
2013 let mut genn = array::rand::<Int32Type>().with_nulls(&[false, false, false]);
2015
2016 let arr = genn.generate(RowCount::from(3), &mut rng).unwrap();
2017 let block = DataBlock::from_array(arr.clone());
2018 assert!(block.data_size() == arr.get_buffer_memory_size() as u64);
2019
2020 let arr = genn.generate(RowCount::from(400), &mut rng).unwrap();
2021 let block = DataBlock::from_array(arr.clone());
2022 assert!(block.data_size() == arr.get_buffer_memory_size() as u64);
2023
2024 let mut genn = array::rand::<Int32Type>().with_nulls(&[false, true, false]);
2026 let arr = genn.generate(RowCount::from(3), &mut rng).unwrap();
2027 let block = DataBlock::from_array(arr.clone());
2028
2029 let array_data = arr.to_data();
2030 let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
2031 let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
2033 assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
2034
2035 let arr = genn.generate(RowCount::from(400), &mut rng).unwrap();
2036 let block = DataBlock::from_array(arr.clone());
2037
2038 let array_data = arr.to_data();
2039 let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
2040 let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
2041 assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
2042
2043 let mut genn = array::rand::<Int32Type>().with_nulls(&[true, true, false]);
2044 let arr = genn.generate(RowCount::from(3), &mut rng).unwrap();
2045 let block = DataBlock::from_array(arr.clone());
2046
2047 let array_data = arr.to_data();
2048 let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
2049 let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
2050 assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
2051
2052 let arr = genn.generate(RowCount::from(400), &mut rng).unwrap();
2053 let block = DataBlock::from_array(arr.clone());
2054
2055 let array_data = arr.to_data();
2056 let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
2057 let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
2058 assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
2059
2060 let mut genn = array::rand::<Int32Type>().with_nulls(&[false, true, false]);
2061 let arr1 = genn.generate(RowCount::from(3), &mut rng).unwrap();
2062 let arr2 = genn.generate(RowCount::from(3), &mut rng).unwrap();
2063 let arr3 = genn.generate(RowCount::from(3), &mut rng).unwrap();
2064 let block = DataBlock::from_arrays(&[arr1.clone(), arr2.clone(), arr3.clone()], 9);
2065
2066 let concatenated_array = arrow_select::concat::concat(&[
2067 &*Arc::new(arr1.clone()) as &dyn Array,
2068 &*Arc::new(arr2.clone()) as &dyn Array,
2069 &*Arc::new(arr3.clone()) as &dyn Array,
2070 ])
2071 .unwrap();
2072 let total_buffer_size: usize = concatenated_array
2073 .to_data()
2074 .buffers()
2075 .iter()
2076 .map(|buffer| buffer.len())
2077 .sum();
2078
2079 let total_nulls_size_in_bytes = concatenated_array.nulls().unwrap().len().div_ceil(8);
2080 assert!(block.data_size() == (total_buffer_size + total_nulls_size_in_bytes) as u64);
2081 }
2082}