1use std::{
18 ops::Range,
19 sync::{Arc, RwLock},
20};
21
22use arrow_array::{
23 cast::AsArray,
24 new_empty_array, new_null_array,
25 types::{ArrowDictionaryKeyType, UInt16Type, UInt32Type, UInt64Type, UInt8Type},
26 Array, ArrayRef, OffsetSizeTrait, UInt64Array,
27};
28use arrow_buffer::{
29 ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer, ScalarBuffer,
30};
31use arrow_data::{ArrayData, ArrayDataBuilder};
32use arrow_schema::DataType;
33use lance_arrow::DataTypeExt;
34use snafu::location;
35
36use lance_core::{Error, Result};
37
38use crate::{
39 buffer::LanceBuffer,
40 statistics::{ComputeStat, Stat},
41};
42
43#[derive(Debug, Clone)]
49pub struct AllNullDataBlock {
50 pub num_values: u64,
52}
53
54impl AllNullDataBlock {
55 fn into_arrow(self, data_type: DataType, _validate: bool) -> Result<ArrayData> {
56 Ok(ArrayData::new_null(&data_type, self.num_values as usize))
57 }
58
59 fn into_buffers(self) -> Vec<LanceBuffer> {
60 vec![]
61 }
62}
63
64use std::collections::HashMap;
65
66#[derive(Debug, Clone)]
69pub struct BlockInfo(pub Arc<RwLock<HashMap<Stat, Arc<dyn Array>>>>);
70
71impl Default for BlockInfo {
72 fn default() -> Self {
73 Self::new()
74 }
75}
76
77impl BlockInfo {
78 pub fn new() -> Self {
79 Self(Arc::new(RwLock::new(HashMap::new())))
80 }
81}
82
83impl PartialEq for BlockInfo {
84 fn eq(&self, other: &Self) -> bool {
85 let self_info = self.0.read().unwrap();
86 let other_info = other.0.read().unwrap();
87 *self_info == *other_info
88 }
89}
90
91#[derive(Debug, Clone)]
97pub struct NullableDataBlock {
98 pub data: Box<DataBlock>,
100 pub nulls: LanceBuffer,
102
103 pub block_info: BlockInfo,
104}
105
106impl NullableDataBlock {
107 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
108 let nulls = self.nulls.into_buffer();
109 let data = self.data.into_arrow(data_type, validate)?.into_builder();
110 let data = data.null_bit_buffer(Some(nulls));
111 if validate {
112 Ok(data.build()?)
113 } else {
114 Ok(unsafe { data.build_unchecked() })
115 }
116 }
117
118 fn into_buffers(self) -> Vec<LanceBuffer> {
119 let mut buffers = vec![self.nulls];
120 buffers.extend(self.data.into_buffers());
121 buffers
122 }
123
124 pub fn data_size(&self) -> u64 {
125 self.data.data_size() + self.nulls.len() as u64
126 }
127}
128
129#[derive(Debug, PartialEq, Clone)]
131pub struct ConstantDataBlock {
132 pub data: LanceBuffer,
134 pub num_values: u64,
136}
137
138impl ConstantDataBlock {
139 fn into_buffers(self) -> Vec<LanceBuffer> {
140 vec![self.data]
141 }
142
143 fn into_arrow(self, _data_type: DataType, _validate: bool) -> Result<ArrayData> {
144 todo!()
147 }
148
149 pub fn try_clone(&self) -> Result<Self> {
150 Ok(Self {
151 data: self.data.clone(),
152 num_values: self.num_values,
153 })
154 }
155
156 pub fn data_size(&self) -> u64 {
157 self.data.len() as u64
158 }
159}
160
161#[derive(Debug, PartialEq, Clone)]
163pub struct FixedWidthDataBlock {
164 pub data: LanceBuffer,
166 pub bits_per_value: u64,
168 pub num_values: u64,
170
171 pub block_info: BlockInfo,
172}
173
174impl FixedWidthDataBlock {
175 fn do_into_arrow(
176 self,
177 data_type: DataType,
178 num_values: u64,
179 validate: bool,
180 ) -> Result<ArrayData> {
181 let data_buffer = self.data.into_buffer();
182 let builder = ArrayDataBuilder::new(data_type)
183 .add_buffer(data_buffer)
184 .len(num_values as usize)
185 .null_count(0);
186 if validate {
187 Ok(builder.build()?)
188 } else {
189 Ok(unsafe { builder.build_unchecked() })
190 }
191 }
192
193 pub fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
194 let root_num_values = self.num_values;
195 self.do_into_arrow(data_type, root_num_values, validate)
196 }
197
198 pub fn into_buffers(self) -> Vec<LanceBuffer> {
199 vec![self.data]
200 }
201
202 pub fn try_clone(&self) -> Result<Self> {
203 Ok(Self {
204 data: self.data.clone(),
205 bits_per_value: self.bits_per_value,
206 num_values: self.num_values,
207 block_info: self.block_info.clone(),
208 })
209 }
210
211 pub fn data_size(&self) -> u64 {
212 self.data.len() as u64
213 }
214}
215
216#[derive(Debug)]
217pub struct VariableWidthDataBlockBuilder<T: OffsetSizeTrait> {
218 offsets: Vec<T>,
219 bytes: Vec<u8>,
220}
221
222impl<T: OffsetSizeTrait> VariableWidthDataBlockBuilder<T> {
223 fn new(estimated_size_bytes: u64) -> Self {
224 Self {
225 offsets: vec![T::from_usize(0).unwrap()],
226 bytes: Vec::with_capacity(estimated_size_bytes as usize),
227 }
228 }
229}
230
231impl<T: OffsetSizeTrait> DataBlockBuilderImpl for VariableWidthDataBlockBuilder<T> {
232 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
233 let block = data_block.as_variable_width_ref().unwrap();
234 assert!(block.bits_per_offset == T::get_byte_width() as u8 * 8);
235
236 let offsets: ScalarBuffer<T> = block.offsets.clone().borrow_to_typed_slice();
237
238 let start_offset = offsets[selection.start as usize];
239 let end_offset = offsets[selection.end as usize];
240 let mut previous_len = self.bytes.len();
241
242 self.bytes
243 .extend_from_slice(&block.data[start_offset.as_usize()..end_offset.as_usize()]);
244
245 self.offsets.extend(
246 offsets[selection.start as usize..selection.end as usize]
247 .iter()
248 .zip(&offsets[selection.start as usize + 1..=selection.end as usize])
249 .map(|(¤t, &next)| {
250 let this_value_len = next - current;
251 previous_len += this_value_len.as_usize();
252 T::from_usize(previous_len).unwrap()
253 }),
254 );
255 }
256
257 fn finish(self: Box<Self>) -> DataBlock {
258 let num_values = (self.offsets.len() - 1) as u64;
259 DataBlock::VariableWidth(VariableWidthBlock {
260 data: LanceBuffer::from(self.bytes),
261 offsets: LanceBuffer::reinterpret_vec(self.offsets),
262 bits_per_offset: T::get_byte_width() as u8 * 8,
263 num_values,
264 block_info: BlockInfo::new(),
265 })
266 }
267}
268
269#[derive(Debug)]
270struct BitmapDataBlockBuilder {
271 values: BooleanBufferBuilder,
272}
273
274impl BitmapDataBlockBuilder {
275 fn new(estimated_size_bytes: u64) -> Self {
276 Self {
277 values: BooleanBufferBuilder::new(estimated_size_bytes as usize * 8),
278 }
279 }
280}
281
282impl DataBlockBuilderImpl for BitmapDataBlockBuilder {
283 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
284 let bitmap_blk = data_block.as_fixed_width_ref().unwrap();
285 self.values.append_packed_range(
286 selection.start as usize..selection.end as usize,
287 &bitmap_blk.data,
288 );
289 }
290
291 fn finish(mut self: Box<Self>) -> DataBlock {
292 let bool_buf = self.values.finish();
293 let num_values = bool_buf.len() as u64;
294 let bits_buf = bool_buf.into_inner();
295 DataBlock::FixedWidth(FixedWidthDataBlock {
296 data: LanceBuffer::from(bits_buf),
297 bits_per_value: 1,
298 num_values,
299 block_info: BlockInfo::new(),
300 })
301 }
302}
303
304#[derive(Debug)]
305struct FixedWidthDataBlockBuilder {
306 bits_per_value: u64,
307 bytes_per_value: u64,
308 values: Vec<u8>,
309}
310
311impl FixedWidthDataBlockBuilder {
312 fn new(bits_per_value: u64, estimated_size_bytes: u64) -> Self {
313 assert!(bits_per_value % 8 == 0);
314 Self {
315 bits_per_value,
316 bytes_per_value: bits_per_value / 8,
317 values: Vec::with_capacity(estimated_size_bytes as usize),
318 }
319 }
320}
321
322impl DataBlockBuilderImpl for FixedWidthDataBlockBuilder {
323 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
324 let block = data_block.as_fixed_width_ref().unwrap();
325 assert_eq!(self.bits_per_value, block.bits_per_value);
326 let start = selection.start as usize * self.bytes_per_value as usize;
327 let end = selection.end as usize * self.bytes_per_value as usize;
328 self.values.extend_from_slice(&block.data[start..end]);
329 }
330
331 fn finish(self: Box<Self>) -> DataBlock {
332 let num_values = (self.values.len() / self.bytes_per_value as usize) as u64;
333 DataBlock::FixedWidth(FixedWidthDataBlock {
334 data: LanceBuffer::from(self.values),
335 bits_per_value: self.bits_per_value,
336 num_values,
337 block_info: BlockInfo::new(),
338 })
339 }
340}
341
342#[derive(Debug)]
343struct StructDataBlockBuilder {
344 children: Vec<Box<dyn DataBlockBuilderImpl>>,
345}
346
347impl StructDataBlockBuilder {
348 fn new(children: Vec<Box<dyn DataBlockBuilderImpl>>) -> Self {
349 Self { children }
350 }
351}
352
353impl DataBlockBuilderImpl for StructDataBlockBuilder {
354 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
355 let data_block = data_block.as_struct_ref().unwrap();
356 for i in 0..self.children.len() {
357 self.children[i].append(&data_block.children[i], selection.clone());
358 }
359 }
360
361 fn finish(self: Box<Self>) -> DataBlock {
362 let mut children_data_block = Vec::new();
363 for child in self.children {
364 let child_data_block = child.finish();
365 children_data_block.push(child_data_block);
366 }
367 DataBlock::Struct(StructDataBlock {
368 children: children_data_block,
369 block_info: BlockInfo::new(),
370 validity: None,
371 })
372 }
373}
374
375#[derive(Debug, Default)]
376struct AllNullDataBlockBuilder {
377 num_values: u64,
378}
379
380impl DataBlockBuilderImpl for AllNullDataBlockBuilder {
381 fn append(&mut self, _data_block: &DataBlock, selection: Range<u64>) {
382 self.num_values += selection.end - selection.start;
383 }
384
385 fn finish(self: Box<Self>) -> DataBlock {
386 DataBlock::AllNull(AllNullDataBlock {
387 num_values: self.num_values,
388 })
389 }
390}
391
392#[derive(Debug, Clone)]
394pub struct FixedSizeListBlock {
395 pub child: Box<DataBlock>,
397 pub dimension: u64,
399}
400
401impl FixedSizeListBlock {
402 pub fn num_values(&self) -> u64 {
403 self.child.num_values() / self.dimension
404 }
405
406 pub fn try_into_flat(self) -> Option<FixedWidthDataBlock> {
410 match *self.child {
411 DataBlock::Nullable(_) => None,
413 DataBlock::FixedSizeList(inner) => {
414 let mut flat = inner.try_into_flat()?;
415 flat.bits_per_value *= self.dimension;
416 flat.num_values /= self.dimension;
417 Some(flat)
418 }
419 DataBlock::FixedWidth(mut inner) => {
420 inner.bits_per_value *= self.dimension;
421 inner.num_values /= self.dimension;
422 Some(inner)
423 }
424 _ => panic!(
425 "Expected FixedSizeList or FixedWidth data block but found {:?}",
426 self
427 ),
428 }
429 }
430
431 pub fn flatten_as_fixed(&mut self) -> FixedWidthDataBlock {
432 match self.child.as_mut() {
433 DataBlock::FixedSizeList(fsl) => fsl.flatten_as_fixed(),
434 DataBlock::FixedWidth(fw) => fw.clone(),
435 _ => panic!("Expected FixedSizeList or FixedWidth data block"),
436 }
437 }
438
439 pub fn from_flat(data: FixedWidthDataBlock, data_type: &DataType) -> DataBlock {
441 match data_type {
442 DataType::FixedSizeList(child_field, dimension) => {
443 let mut data = data;
444 data.bits_per_value /= *dimension as u64;
445 data.num_values *= *dimension as u64;
446 let child_data = Self::from_flat(data, child_field.data_type());
447 DataBlock::FixedSizeList(Self {
448 child: Box::new(child_data),
449 dimension: *dimension as u64,
450 })
451 }
452 _ => DataBlock::FixedWidth(data),
454 }
455 }
456
457 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
458 let num_values = self.num_values();
459 let builder = match &data_type {
460 DataType::FixedSizeList(child_field, _) => {
461 let child_data = self
462 .child
463 .into_arrow(child_field.data_type().clone(), validate)?;
464 ArrayDataBuilder::new(data_type)
465 .add_child_data(child_data)
466 .len(num_values as usize)
467 .null_count(0)
468 }
469 _ => panic!("Expected FixedSizeList data type and got {:?}", data_type),
470 };
471 if validate {
472 Ok(builder.build()?)
473 } else {
474 Ok(unsafe { builder.build_unchecked() })
475 }
476 }
477
478 fn into_buffers(self) -> Vec<LanceBuffer> {
479 self.child.into_buffers()
480 }
481
482 fn data_size(&self) -> u64 {
483 self.child.data_size()
484 }
485}
486
487#[derive(Debug)]
488struct FixedSizeListBlockBuilder {
489 inner: Box<dyn DataBlockBuilderImpl>,
490 dimension: u64,
491}
492
493impl FixedSizeListBlockBuilder {
494 fn new(inner: Box<dyn DataBlockBuilderImpl>, dimension: u64) -> Self {
495 Self { inner, dimension }
496 }
497}
498
499impl DataBlockBuilderImpl for FixedSizeListBlockBuilder {
500 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
501 let selection = selection.start * self.dimension..selection.end * self.dimension;
502 let fsl = data_block.as_fixed_size_list_ref().unwrap();
503 self.inner.append(fsl.child.as_ref(), selection);
504 }
505
506 fn finish(self: Box<Self>) -> DataBlock {
507 let inner_block = self.inner.finish();
508 DataBlock::FixedSizeList(FixedSizeListBlock {
509 child: Box::new(inner_block),
510 dimension: self.dimension,
511 })
512 }
513}
514
515#[derive(Debug)]
516struct NullableDataBlockBuilder {
517 inner: Box<dyn DataBlockBuilderImpl>,
518 validity: BooleanBufferBuilder,
519}
520
521impl NullableDataBlockBuilder {
522 fn new(inner: Box<dyn DataBlockBuilderImpl>, estimated_size_bytes: usize) -> Self {
523 Self {
524 inner,
525 validity: BooleanBufferBuilder::new(estimated_size_bytes * 8),
526 }
527 }
528}
529
530impl DataBlockBuilderImpl for NullableDataBlockBuilder {
531 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
532 let nullable = data_block.as_nullable_ref().unwrap();
533 let bool_buf = BooleanBuffer::new(
534 nullable.nulls.clone().into_buffer(),
535 selection.start as usize,
536 (selection.end - selection.start) as usize,
537 );
538 self.validity.append_buffer(&bool_buf);
539 self.inner.append(nullable.data.as_ref(), selection);
540 }
541
542 fn finish(mut self: Box<Self>) -> DataBlock {
543 let inner_block = self.inner.finish();
544 DataBlock::Nullable(NullableDataBlock {
545 data: Box::new(inner_block),
546 nulls: LanceBuffer::from(self.validity.finish().into_inner()),
547 block_info: BlockInfo::new(),
548 })
549 }
550}
551
552#[derive(Debug, Clone)]
556pub struct OpaqueBlock {
557 pub buffers: Vec<LanceBuffer>,
558 pub num_values: u64,
559 pub block_info: BlockInfo,
560}
561
562impl OpaqueBlock {
563 pub fn data_size(&self) -> u64 {
564 self.buffers.iter().map(|b| b.len() as u64).sum()
565 }
566}
567
568#[derive(Debug, Clone)]
570pub struct VariableWidthBlock {
571 pub data: LanceBuffer,
573 pub offsets: LanceBuffer,
577 pub bits_per_offset: u8,
579 pub num_values: u64,
581
582 pub block_info: BlockInfo,
583}
584
585impl VariableWidthBlock {
586 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
587 let data_buffer = self.data.into_buffer();
588 let offsets_buffer = self.offsets.into_buffer();
589 let builder = ArrayDataBuilder::new(data_type)
590 .add_buffer(offsets_buffer)
591 .add_buffer(data_buffer)
592 .len(self.num_values as usize)
593 .null_count(0);
594 if validate {
595 Ok(builder.build()?)
596 } else {
597 Ok(unsafe { builder.build_unchecked() })
598 }
599 }
600
601 fn into_buffers(self) -> Vec<LanceBuffer> {
602 vec![self.offsets, self.data]
603 }
604
605 pub fn offsets_as_block(&mut self) -> DataBlock {
606 let offsets = self.offsets.clone();
607 DataBlock::FixedWidth(FixedWidthDataBlock {
608 data: offsets,
609 bits_per_value: self.bits_per_offset as u64,
610 num_values: self.num_values + 1,
611 block_info: BlockInfo::new(),
612 })
613 }
614
615 pub fn data_size(&self) -> u64 {
616 (self.data.len() + self.offsets.len()) as u64
617 }
618}
619
620#[derive(Debug, Clone)]
622pub struct StructDataBlock {
623 pub children: Vec<DataBlock>,
625 pub block_info: BlockInfo,
626 pub validity: Option<NullBuffer>,
628}
629
630impl StructDataBlock {
631 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
632 if let DataType::Struct(fields) = &data_type {
633 let mut builder = ArrayDataBuilder::new(DataType::Struct(fields.clone()));
634 let mut num_rows = 0;
635 for (field, child) in fields.iter().zip(self.children) {
636 let child_data = child.into_arrow(field.data_type().clone(), validate)?;
637 num_rows = child_data.len();
638 builder = builder.add_child_data(child_data);
639 }
640
641 let builder = if let Some(validity) = self.validity {
643 let null_count = validity.null_count();
644 builder
645 .null_bit_buffer(Some(validity.into_inner().into_inner()))
646 .null_count(null_count)
647 } else {
648 builder.null_count(0)
649 };
650
651 let builder = builder.len(num_rows);
652 if validate {
653 Ok(builder.build()?)
654 } else {
655 Ok(unsafe { builder.build_unchecked() })
656 }
657 } else {
658 Err(Error::Internal {
659 message: format!("Expected Struct, got {:?}", data_type),
660 location: location!(),
661 })
662 }
663 }
664
665 fn remove_outer_validity(self) -> Self {
666 Self {
667 children: self
668 .children
669 .into_iter()
670 .map(|c| c.remove_outer_validity())
671 .collect(),
672 block_info: self.block_info,
673 validity: None, }
675 }
676
677 fn into_buffers(self) -> Vec<LanceBuffer> {
678 self.children
679 .into_iter()
680 .flat_map(|c| c.into_buffers())
681 .collect()
682 }
683
684 pub fn has_variable_width_child(&self) -> bool {
685 self.children
686 .iter()
687 .any(|child| !matches!(child, DataBlock::FixedWidth(_)))
688 }
689
690 pub fn data_size(&self) -> u64 {
691 self.children
692 .iter()
693 .map(|data_block| data_block.data_size())
694 .sum()
695 }
696}
697
698#[derive(Debug, Clone)]
700pub struct DictionaryDataBlock {
701 pub indices: FixedWidthDataBlock,
703 pub dictionary: Box<DataBlock>,
705}
706
707impl DictionaryDataBlock {
708 fn decode_helper<K: ArrowDictionaryKeyType>(self) -> Result<DataBlock> {
709 if self.indices.num_values == 0 {
712 return Ok(DataBlock::AllNull(AllNullDataBlock { num_values: 0 }));
713 }
714
715 let estimated_size_bytes = self.dictionary.data_size()
717 * (self.indices.num_values + self.dictionary.num_values() - 1)
718 / self.dictionary.num_values();
719 let mut data_builder = DataBlockBuilder::with_capacity_estimate(estimated_size_bytes);
720
721 let indices = self.indices.data.borrow_to_typed_slice::<K::Native>();
722 let indices = indices.as_ref();
723
724 indices
725 .iter()
726 .map(|idx| idx.to_usize().unwrap() as u64)
727 .for_each(|idx| {
728 data_builder.append(&self.dictionary, idx..idx + 1);
729 });
730
731 Ok(data_builder.finish())
732 }
733
734 pub fn decode(self) -> Result<DataBlock> {
735 match self.indices.bits_per_value {
736 8 => self.decode_helper::<UInt8Type>(),
737 16 => self.decode_helper::<UInt16Type>(),
738 32 => self.decode_helper::<UInt32Type>(),
739 64 => self.decode_helper::<UInt64Type>(),
740 _ => Err(lance_core::Error::Internal {
741 message: format!(
742 "Unsupported dictionary index bit width: {} bits",
743 self.indices.bits_per_value
744 ),
745 location: location!(),
746 }),
747 }
748 }
749
750 fn into_arrow_dict(
751 self,
752 key_type: Box<DataType>,
753 value_type: Box<DataType>,
754 validate: bool,
755 ) -> Result<ArrayData> {
756 let indices = self.indices.into_arrow((*key_type).clone(), validate)?;
757 let dictionary = self
758 .dictionary
759 .into_arrow((*value_type).clone(), validate)?;
760
761 let builder = indices
762 .into_builder()
763 .add_child_data(dictionary)
764 .data_type(DataType::Dictionary(key_type, value_type));
765
766 if validate {
767 Ok(builder.build()?)
768 } else {
769 Ok(unsafe { builder.build_unchecked() })
770 }
771 }
772
773 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
774 if let DataType::Dictionary(key_type, value_type) = data_type {
775 self.into_arrow_dict(key_type, value_type, validate)
776 } else {
777 self.decode()?.into_arrow(data_type, validate)
778 }
779 }
780
781 fn into_buffers(self) -> Vec<LanceBuffer> {
782 let mut buffers = self.indices.into_buffers();
783 buffers.extend(self.dictionary.into_buffers());
784 buffers
785 }
786
787 pub fn into_parts(self) -> (DataBlock, DataBlock) {
788 (DataBlock::FixedWidth(self.indices), *self.dictionary)
789 }
790
791 pub fn from_parts(indices: FixedWidthDataBlock, dictionary: DataBlock) -> Self {
792 Self {
793 indices,
794 dictionary: Box::new(dictionary),
795 }
796 }
797}
798
/// A block of data in one of several layouts.
///
/// Each variant (other than `Empty`) wraps the struct that holds that
/// layout's buffers and metadata.
#[derive(Debug, Clone)]
pub enum DataBlock {
    /// A block with no values and no buffers.
    Empty(),
    /// See [`ConstantDataBlock`].
    Constant(ConstantDataBlock),
    /// See [`AllNullDataBlock`].
    AllNull(AllNullDataBlock),
    /// See [`NullableDataBlock`].
    Nullable(NullableDataBlock),
    /// See [`FixedWidthDataBlock`].
    FixedWidth(FixedWidthDataBlock),
    /// See [`FixedSizeListBlock`].
    FixedSizeList(FixedSizeListBlock),
    /// See [`VariableWidthBlock`].
    VariableWidth(VariableWidthBlock),
    /// See [`OpaqueBlock`]; cannot be converted to Arrow.
    Opaque(OpaqueBlock),
    /// See [`StructDataBlock`].
    Struct(StructDataBlock),
    /// See [`DictionaryDataBlock`].
    Dictionary(DictionaryDataBlock),
}
826
827impl DataBlock {
828 pub fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
830 match self {
831 Self::Empty() => Ok(new_empty_array(&data_type).to_data()),
832 Self::Constant(inner) => inner.into_arrow(data_type, validate),
833 Self::AllNull(inner) => inner.into_arrow(data_type, validate),
834 Self::Nullable(inner) => inner.into_arrow(data_type, validate),
835 Self::FixedWidth(inner) => inner.into_arrow(data_type, validate),
836 Self::FixedSizeList(inner) => inner.into_arrow(data_type, validate),
837 Self::VariableWidth(inner) => inner.into_arrow(data_type, validate),
838 Self::Struct(inner) => inner.into_arrow(data_type, validate),
839 Self::Dictionary(inner) => inner.into_arrow(data_type, validate),
840 Self::Opaque(_) => Err(Error::Internal {
841 message: "Cannot convert OpaqueBlock to Arrow".to_string(),
842 location: location!(),
843 }),
844 }
845 }
846
847 pub fn into_buffers(self) -> Vec<LanceBuffer> {
851 match self {
852 Self::Empty() => Vec::default(),
853 Self::Constant(inner) => inner.into_buffers(),
854 Self::AllNull(inner) => inner.into_buffers(),
855 Self::Nullable(inner) => inner.into_buffers(),
856 Self::FixedWidth(inner) => inner.into_buffers(),
857 Self::FixedSizeList(inner) => inner.into_buffers(),
858 Self::VariableWidth(inner) => inner.into_buffers(),
859 Self::Struct(inner) => inner.into_buffers(),
860 Self::Dictionary(inner) => inner.into_buffers(),
861 Self::Opaque(inner) => inner.buffers,
862 }
863 }
864
865 pub fn try_clone(&self) -> Result<Self> {
874 match self {
875 Self::Empty() => Ok(Self::Empty()),
876 Self::Constant(inner) => Ok(Self::Constant(inner.clone())),
877 Self::AllNull(inner) => Ok(Self::AllNull(inner.clone())),
878 Self::Nullable(inner) => Ok(Self::Nullable(inner.clone())),
879 Self::FixedWidth(inner) => Ok(Self::FixedWidth(inner.clone())),
880 Self::FixedSizeList(inner) => Ok(Self::FixedSizeList(inner.clone())),
881 Self::VariableWidth(inner) => Ok(Self::VariableWidth(inner.clone())),
882 Self::Struct(inner) => Ok(Self::Struct(inner.clone())),
883 Self::Dictionary(inner) => Ok(Self::Dictionary(inner.clone())),
884 Self::Opaque(inner) => Ok(Self::Opaque(inner.clone())),
885 }
886 }
887
888 pub fn name(&self) -> &'static str {
889 match self {
890 Self::Constant(_) => "Constant",
891 Self::Empty() => "Empty",
892 Self::AllNull(_) => "AllNull",
893 Self::Nullable(_) => "Nullable",
894 Self::FixedWidth(_) => "FixedWidth",
895 Self::FixedSizeList(_) => "FixedSizeList",
896 Self::VariableWidth(_) => "VariableWidth",
897 Self::Struct(_) => "Struct",
898 Self::Dictionary(_) => "Dictionary",
899 Self::Opaque(_) => "Opaque",
900 }
901 }
902
903 pub fn is_variable(&self) -> bool {
904 match self {
905 Self::Constant(_) => false,
906 Self::Empty() => false,
907 Self::AllNull(_) => false,
908 Self::Nullable(nullable) => nullable.data.is_variable(),
909 Self::FixedWidth(_) => false,
910 Self::FixedSizeList(fsl) => fsl.child.is_variable(),
911 Self::VariableWidth(_) => true,
912 Self::Struct(strct) => strct.children.iter().any(|c| c.is_variable()),
913 Self::Dictionary(_) => {
914 todo!("is_variable for DictionaryDataBlock is not implemented yet")
915 }
916 Self::Opaque(_) => panic!("Does not make sense to ask if an Opaque block is variable"),
917 }
918 }
919
920 pub fn is_nullable(&self) -> bool {
921 match self {
922 Self::AllNull(_) => true,
923 Self::Nullable(_) => true,
924 Self::FixedSizeList(fsl) => fsl.child.is_nullable(),
925 Self::Struct(strct) => strct.children.iter().any(|c| c.is_nullable()),
926 Self::Dictionary(_) => {
927 todo!("is_nullable for DictionaryDataBlock is not implemented yet")
928 }
929 Self::Opaque(_) => panic!("Does not make sense to ask if an Opaque block is nullable"),
930 _ => false,
931 }
932 }
933
934 pub fn num_values(&self) -> u64 {
939 match self {
940 Self::Empty() => 0,
941 Self::Constant(inner) => inner.num_values,
942 Self::AllNull(inner) => inner.num_values,
943 Self::Nullable(inner) => inner.data.num_values(),
944 Self::FixedWidth(inner) => inner.num_values,
945 Self::FixedSizeList(inner) => inner.num_values(),
946 Self::VariableWidth(inner) => inner.num_values,
947 Self::Struct(inner) => inner.children[0].num_values(),
948 Self::Dictionary(inner) => inner.indices.num_values,
949 Self::Opaque(inner) => inner.num_values,
950 }
951 }
952
953 pub fn items_per_row(&self) -> u64 {
957 match self {
958 Self::Empty() => todo!(), Self::Constant(_) => todo!(), Self::AllNull(_) => todo!(), Self::Nullable(nullable) => nullable.data.items_per_row(),
962 Self::FixedWidth(_) => 1,
963 Self::FixedSizeList(fsl) => fsl.dimension * fsl.child.items_per_row(),
964 Self::VariableWidth(_) => 1,
965 Self::Struct(_) => todo!(), Self::Dictionary(_) => 1,
967 Self::Opaque(_) => 1,
968 }
969 }
970
971 pub fn data_size(&self) -> u64 {
973 match self {
974 Self::Empty() => 0,
975 Self::Constant(inner) => inner.data_size(),
976 Self::AllNull(_) => 0,
977 Self::Nullable(inner) => inner.data_size(),
978 Self::FixedWidth(inner) => inner.data_size(),
979 Self::FixedSizeList(inner) => inner.data_size(),
980 Self::VariableWidth(inner) => inner.data_size(),
981 Self::Struct(_) => {
982 todo!("the data_size method for StructDataBlock is not implemented yet")
983 }
984 Self::Dictionary(_) => {
985 todo!("the data_size method for DictionaryDataBlock is not implemented yet")
986 }
987 Self::Opaque(inner) => inner.data_size(),
988 }
989 }
990
991 pub fn remove_outer_validity(self) -> Self {
999 match self {
1000 Self::AllNull(_) => panic!("Cannot remove validity on all-null data"),
1001 Self::Nullable(inner) => *inner.data,
1002 Self::Struct(inner) => Self::Struct(inner.remove_outer_validity()),
1003 other => other,
1004 }
1005 }
1006
1007 pub fn make_builder(&self, estimated_size_bytes: u64) -> Box<dyn DataBlockBuilderImpl> {
1008 match self {
1009 Self::FixedWidth(inner) => {
1010 if inner.bits_per_value == 1 {
1011 Box::new(BitmapDataBlockBuilder::new(estimated_size_bytes))
1012 } else {
1013 Box::new(FixedWidthDataBlockBuilder::new(
1014 inner.bits_per_value,
1015 estimated_size_bytes,
1016 ))
1017 }
1018 }
1019 Self::VariableWidth(inner) => {
1020 if inner.bits_per_offset == 32 {
1021 Box::new(VariableWidthDataBlockBuilder::<i32>::new(
1022 estimated_size_bytes,
1023 ))
1024 } else if inner.bits_per_offset == 64 {
1025 Box::new(VariableWidthDataBlockBuilder::<i64>::new(
1026 estimated_size_bytes,
1027 ))
1028 } else {
1029 todo!()
1030 }
1031 }
1032 Self::FixedSizeList(inner) => {
1033 let inner_builder = inner.child.make_builder(estimated_size_bytes);
1034 Box::new(FixedSizeListBlockBuilder::new(
1035 inner_builder,
1036 inner.dimension,
1037 ))
1038 }
1039 Self::Nullable(nullable) => {
1040 let estimated_validity_size_bytes = estimated_size_bytes / 16;
1043 let inner_builder = nullable
1044 .data
1045 .make_builder(estimated_size_bytes - estimated_validity_size_bytes);
1046 Box::new(NullableDataBlockBuilder::new(
1047 inner_builder,
1048 estimated_validity_size_bytes as usize,
1049 ))
1050 }
1051 Self::Struct(struct_data_block) => {
1052 let num_children = struct_data_block.children.len();
1053 let per_child_estimate = if num_children == 0 {
1054 0
1055 } else {
1056 estimated_size_bytes / num_children as u64
1057 };
1058 let child_builders = struct_data_block
1059 .children
1060 .iter()
1061 .map(|child| child.make_builder(per_child_estimate))
1062 .collect();
1063 Box::new(StructDataBlockBuilder::new(child_builders))
1064 }
1065 Self::AllNull(_) => Box::new(AllNullDataBlockBuilder::default()),
1066 _ => todo!("make_builder for {:?}", self),
1067 }
1068 }
1069}
1070
// Generates a consuming accessor `$fn_name(self) -> Option<$inner_type>` that
// yields the payload of variant `$inner`, or `None` for any other variant.
macro_rules! as_type {
    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
        pub fn $fn_name(self) -> Option<$inner_type> {
            if let Self::$inner(payload) = self {
                Some(payload)
            } else {
                None
            }
        }
    };
}
1081
// Generates a borrowing accessor `$fn_name(&self) -> Option<&$inner_type>`
// that yields a reference to the payload of variant `$inner`, or `None` for
// any other variant.
macro_rules! as_type_ref {
    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
        pub fn $fn_name(&self) -> Option<&$inner_type> {
            if let Self::$inner(payload) = self {
                Some(payload)
            } else {
                None
            }
        }
    };
}
1092
// Generates a mutably borrowing accessor
// `$fn_name(&mut self) -> Option<&mut $inner_type>` that yields the payload
// of variant `$inner`, or `None` for any other variant.
macro_rules! as_type_ref_mut {
    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
        pub fn $fn_name(&mut self) -> Option<&mut $inner_type> {
            if let Self::$inner(payload) = self {
                Some(payload)
            } else {
                None
            }
        }
    };
}
1103
1104impl DataBlock {
1106 as_type!(as_all_null, AllNull, AllNullDataBlock);
1107 as_type!(as_nullable, Nullable, NullableDataBlock);
1108 as_type!(as_fixed_width, FixedWidth, FixedWidthDataBlock);
1109 as_type!(as_fixed_size_list, FixedSizeList, FixedSizeListBlock);
1110 as_type!(as_variable_width, VariableWidth, VariableWidthBlock);
1111 as_type!(as_struct, Struct, StructDataBlock);
1112 as_type!(as_dictionary, Dictionary, DictionaryDataBlock);
1113 as_type_ref!(as_all_null_ref, AllNull, AllNullDataBlock);
1114 as_type_ref!(as_nullable_ref, Nullable, NullableDataBlock);
1115 as_type_ref!(as_fixed_width_ref, FixedWidth, FixedWidthDataBlock);
1116 as_type_ref!(as_fixed_size_list_ref, FixedSizeList, FixedSizeListBlock);
1117 as_type_ref!(as_variable_width_ref, VariableWidth, VariableWidthBlock);
1118 as_type_ref!(as_struct_ref, Struct, StructDataBlock);
1119 as_type_ref!(as_dictionary_ref, Dictionary, DictionaryDataBlock);
1120 as_type_ref_mut!(as_all_null_ref_mut, AllNull, AllNullDataBlock);
1121 as_type_ref_mut!(as_nullable_ref_mut, Nullable, NullableDataBlock);
1122 as_type_ref_mut!(as_fixed_width_ref_mut, FixedWidth, FixedWidthDataBlock);
1123 as_type_ref_mut!(
1124 as_fixed_size_list_ref_mut,
1125 FixedSizeList,
1126 FixedSizeListBlock
1127 );
1128 as_type_ref_mut!(as_variable_width_ref_mut, VariableWidth, VariableWidthBlock);
1129 as_type_ref_mut!(as_struct_ref_mut, Struct, StructDataBlock);
1130 as_type_ref_mut!(as_dictionary_ref_mut, Dictionary, DictionaryDataBlock);
1131}
1132
1133fn get_byte_range<T: ArrowNativeType>(offsets: &mut LanceBuffer) -> Range<usize> {
1136 let offsets = offsets.borrow_to_typed_slice::<T>();
1137 if offsets.as_ref().is_empty() {
1138 0..0
1139 } else {
1140 offsets.as_ref().first().unwrap().as_usize()..offsets.as_ref().last().unwrap().as_usize()
1141 }
1142}
1143
1144fn stitch_offsets<T: ArrowNativeType + std::ops::Add<Output = T> + std::ops::Sub<Output = T>>(
1150 offsets: Vec<LanceBuffer>,
1151) -> (LanceBuffer, Vec<Range<usize>>) {
1152 if offsets.is_empty() {
1153 return (LanceBuffer::empty(), Vec::default());
1154 }
1155 let len = offsets.iter().map(|b| b.len()).sum::<usize>();
1156 let mut dest = Vec::with_capacity(len);
1160 let mut byte_ranges = Vec::with_capacity(offsets.len());
1161
1162 dest.push(T::from_usize(0).unwrap());
1164
1165 for mut o in offsets.into_iter() {
1166 if !o.is_empty() {
1167 let last_offset = *dest.last().unwrap();
1168 let o = o.borrow_to_typed_slice::<T>();
1169 let start = *o.as_ref().first().unwrap();
1170 dest.extend(o.as_ref()[1..].iter().map(|&x| x + last_offset - start));
1184 }
1185 byte_ranges.push(get_byte_range::<T>(&mut o));
1186 }
1187 (LanceBuffer::reinterpret_vec(dest), byte_ranges)
1188}
1189
1190fn arrow_binary_to_data_block(
1191 arrays: &[ArrayRef],
1192 num_values: u64,
1193 bits_per_offset: u8,
1194) -> DataBlock {
1195 let data_vec = arrays.iter().map(|arr| arr.to_data()).collect::<Vec<_>>();
1196 let bytes_per_offset = bits_per_offset as usize / 8;
1197 let offsets = data_vec
1198 .iter()
1199 .map(|d| {
1200 LanceBuffer::from(
1201 d.buffers()[0].slice_with_length(d.offset(), (d.len() + 1) * bytes_per_offset),
1202 )
1203 })
1204 .collect::<Vec<_>>();
1205 let (offsets, data_ranges) = if bits_per_offset == 32 {
1206 stitch_offsets::<i32>(offsets)
1207 } else {
1208 stitch_offsets::<i64>(offsets)
1209 };
1210 let data = data_vec
1211 .iter()
1212 .zip(data_ranges)
1213 .map(|(d, byte_range)| {
1214 LanceBuffer::from(
1215 d.buffers()[1]
1216 .slice_with_length(byte_range.start, byte_range.end - byte_range.start),
1217 )
1218 })
1219 .collect::<Vec<_>>();
1220 let data = LanceBuffer::concat_into_one(data);
1221 DataBlock::VariableWidth(VariableWidthBlock {
1222 data,
1223 offsets,
1224 bits_per_offset,
1225 num_values,
1226 block_info: BlockInfo::new(),
1227 })
1228}
1229
1230fn encode_flat_data(arrays: &[ArrayRef], num_values: u64) -> LanceBuffer {
1231 let bytes_per_value = arrays[0].data_type().byte_width();
1232 let mut buffer = Vec::with_capacity(num_values as usize * bytes_per_value);
1233 for arr in arrays {
1234 let data = arr.to_data();
1235 buffer.extend_from_slice(data.buffers()[0].as_slice());
1236 }
1237 LanceBuffer::from(buffer)
1238}
1239
1240fn do_encode_bitmap_data(bitmaps: &[BooleanBuffer], num_values: u64) -> LanceBuffer {
1241 let mut builder = BooleanBufferBuilder::new(num_values as usize);
1242
1243 for buf in bitmaps {
1244 builder.append_buffer(buf);
1245 }
1246
1247 let buffer = builder.finish().into_inner();
1248 LanceBuffer::from(buffer)
1249}
1250
1251fn encode_bitmap_data(arrays: &[ArrayRef], num_values: u64) -> LanceBuffer {
1252 let bitmaps = arrays
1253 .iter()
1254 .map(|arr| arr.as_boolean().values().clone())
1255 .collect::<Vec<_>>();
1256 do_encode_bitmap_data(&bitmaps, num_values)
1257}
1258
1259fn concat_dict_arrays(arrays: &[ArrayRef]) -> ArrayRef {
1262 let value_type = arrays[0].as_any_dictionary().values().data_type();
1263 let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
1264 match arrow_select::concat::concat(&array_refs) {
1265 Ok(array) => array,
1266 Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1267 let upscaled = array_refs
1269 .iter()
1270 .map(|arr| {
1271 match arrow_cast::cast(
1272 *arr,
1273 &DataType::Dictionary(
1274 Box::new(DataType::UInt32),
1275 Box::new(value_type.clone()),
1276 ),
1277 ) {
1278 Ok(arr) => arr,
1279 Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1280 unimplemented!("Dictionary arrays with more than 2^32 unique values")
1282 }
1283 err => err.unwrap(),
1284 }
1285 })
1286 .collect::<Vec<_>>();
1287 let array_refs = upscaled.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
1288 match arrow_select::concat::concat(&array_refs) {
1290 Ok(array) => array,
1291 Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1292 unimplemented!("Dictionary arrays with more than 2^32 unique values")
1293 }
1294 err => err.unwrap(),
1295 }
1296 }
1297 err => err.unwrap(),
1299 }
1300}
1301
1302fn max_index_val(index_type: &DataType) -> u64 {
1303 match index_type {
1304 DataType::Int8 => i8::MAX as u64,
1305 DataType::Int16 => i16::MAX as u64,
1306 DataType::Int32 => i32::MAX as u64,
1307 DataType::Int64 => i64::MAX as u64,
1308 DataType::UInt8 => u8::MAX as u64,
1309 DataType::UInt16 => u16::MAX as u64,
1310 DataType::UInt32 => u32::MAX as u64,
1311 DataType::UInt64 => u64::MAX,
1312 _ => panic!("Invalid dictionary index type"),
1313 }
1314}
1315
/// Converts a list of dictionary arrays into a dictionary data block.
///
/// The arrays are concatenated into one dictionary array.  The keys become a
/// fixed-width block of indices and the dictionary values are converted with
/// `DataBlock::from`.
///
/// If `validity` is provided then every position it marks as invalid has its index
/// rewritten to point at a null entry in the dictionary values (one is appended if
/// the values do not already contain a null).  The resulting indices block therefore
/// carries no validity of its own.
///
/// # Panics
///
/// Panics (via `unimplemented!`) if a null entry must be appended but the index type
/// is already 32 bits or wider and cannot address it.
fn arrow_dictionary_to_data_block(arrays: &[ArrayRef], validity: Option<NullBuffer>) -> DataBlock {
    let array = concat_dict_arrays(arrays);
    let array_dict = array.as_any_dictionary();
    let mut indices = array_dict.keys();
    let num_values = indices.len() as u64;
    let mut values = array_dict.values().clone();
    // Placeholder: if the indices need to be widened this owns the widened array and
    // `indices` is re-pointed at it
    let mut upcast = None;

    let indices_block = if let Some(validity) = validity {
        // Find the first null entry in the dictionary values (if there is one)
        let mut first_invalid_index = None;
        if let Some(values_validity) = values.nulls() {
            first_invalid_index = (!values_validity.inner()).set_indices().next();
        }
        let first_invalid_index = first_invalid_index.unwrap_or_else(|| {
            // No null entry exists yet, append one to the end of the values
            let null_arr = new_null_array(values.data_type(), 1);
            values = arrow_select::concat::concat(&[values.as_ref(), null_arr.as_ref()]).unwrap();
            let null_index = values.len() - 1;
            let max_index_val = max_index_val(indices.data_type());
            if null_index as u64 > max_index_val {
                // The new entry does not fit in the current index type, widen to u32
                if max_index_val >= u32::MAX as u64 {
                    unimplemented!("Dictionary arrays with 2^32 unique value (or more) and a null")
                }
                upcast = Some(arrow_cast::cast(indices, &DataType::UInt32).unwrap());
                indices = upcast.as_ref().unwrap();
            }
            null_index
        });
        // Express the null entry's index in the (possibly widened) index type so its
        // bytes can be copied straight into the indices buffer
        let null_index_arr = arrow_cast::cast(
            &UInt64Array::from(vec![first_invalid_index as u64]),
            indices.data_type(),
        )
        .unwrap();

        let bytes_per_index = indices.data_type().byte_width();
        let bits_per_index = bytes_per_index as u64 * 8;

        let null_index_arr = null_index_arr.into_data();
        let null_index_bytes = &null_index_arr.buffers()[0];
        // Work on a copy of the index bytes so the invalid positions can be overwritten
        let mut indices_bytes = indices.to_data().buffers()[0].to_vec();
        // Point every invalid position at the null dictionary entry
        for invalid_idx in (!validity.inner()).set_indices() {
            indices_bytes[invalid_idx * bytes_per_index..(invalid_idx + 1) * bytes_per_index]
                .copy_from_slice(null_index_bytes.as_slice());
        }
        FixedWidthDataBlock {
            data: LanceBuffer::from(indices_bytes),
            bits_per_value: bits_per_index,
            num_values,
            block_info: BlockInfo::new(),
        }
    } else {
        // No validity to fold in, the index buffer is used as-is
        FixedWidthDataBlock {
            data: LanceBuffer::from(indices.to_data().buffers()[0].clone()),
            bits_per_value: indices.data_type().byte_width() as u64 * 8,
            num_values,
            block_info: BlockInfo::new(),
        }
    };

    let items = DataBlock::from(values);
    DataBlock::Dictionary(DictionaryDataBlock {
        indices: indices_block,
        dictionary: Box::new(items),
    })
}
1408
/// Summary of the validity of a collection of arrays, as computed by `extract_nulls`.
enum Nullability {
    /// None of the arrays reported a validity bitmap
    None,
    /// The number of null values equals the expected number of values
    All,
    /// At least one array reported validity; holds the combined validity of all arrays
    Some(NullBuffer),
}
1414
1415impl Nullability {
1416 fn to_option(&self) -> Option<NullBuffer> {
1417 match self {
1418 Self::Some(nulls) => Some(nulls.clone()),
1419 _ => None,
1420 }
1421 }
1422}
1423
1424fn extract_nulls(arrays: &[ArrayRef], num_values: u64) -> Nullability {
1425 let mut has_nulls = false;
1426 let nulls_and_lens = arrays
1427 .iter()
1428 .map(|arr| {
1429 let nulls = arr.logical_nulls();
1430 has_nulls |= nulls.is_some();
1431 (nulls, arr.len())
1432 })
1433 .collect::<Vec<_>>();
1434 if !has_nulls {
1435 return Nullability::None;
1436 }
1437 let mut builder = BooleanBufferBuilder::new(num_values as usize);
1438 let mut num_nulls = 0;
1439 for (null, len) in nulls_and_lens {
1440 if let Some(null) = null {
1441 num_nulls += null.null_count();
1442 builder.append_buffer(&null.into_inner());
1443 } else {
1444 builder.append_n(len, true);
1445 }
1446 }
1447 if num_nulls == num_values as usize {
1448 Nullability::All
1449 } else {
1450 Nullability::Some(NullBuffer::new(builder.finish()))
1451 }
1452}
1453
1454impl DataBlock {
1455 pub fn from_arrays(arrays: &[ArrayRef], num_values: u64) -> Self {
1456 if arrays.is_empty() || num_values == 0 {
1457 return Self::AllNull(AllNullDataBlock { num_values: 0 });
1458 }
1459
1460 let data_type = arrays[0].data_type();
1461 let nulls = extract_nulls(arrays, num_values);
1462
1463 if let Nullability::All = nulls {
1464 return Self::AllNull(AllNullDataBlock { num_values });
1465 }
1466
1467 let mut encoded = match data_type {
1468 DataType::Binary | DataType::Utf8 => arrow_binary_to_data_block(arrays, num_values, 32),
1469 DataType::BinaryView | DataType::Utf8View => {
1470 todo!()
1471 }
1472 DataType::LargeBinary | DataType::LargeUtf8 => {
1473 arrow_binary_to_data_block(arrays, num_values, 64)
1474 }
1475 DataType::Boolean => {
1476 let data = encode_bitmap_data(arrays, num_values);
1477 Self::FixedWidth(FixedWidthDataBlock {
1478 data,
1479 bits_per_value: 1,
1480 num_values,
1481 block_info: BlockInfo::new(),
1482 })
1483 }
1484 DataType::Date32
1485 | DataType::Date64
1486 | DataType::Decimal32(_, _)
1487 | DataType::Decimal64(_, _)
1488 | DataType::Decimal128(_, _)
1489 | DataType::Decimal256(_, _)
1490 | DataType::Duration(_)
1491 | DataType::FixedSizeBinary(_)
1492 | DataType::Float16
1493 | DataType::Float32
1494 | DataType::Float64
1495 | DataType::Int16
1496 | DataType::Int32
1497 | DataType::Int64
1498 | DataType::Int8
1499 | DataType::Interval(_)
1500 | DataType::Time32(_)
1501 | DataType::Time64(_)
1502 | DataType::Timestamp(_, _)
1503 | DataType::UInt16
1504 | DataType::UInt32
1505 | DataType::UInt64
1506 | DataType::UInt8 => {
1507 let data = encode_flat_data(arrays, num_values);
1508 Self::FixedWidth(FixedWidthDataBlock {
1509 data,
1510 bits_per_value: data_type.byte_width() as u64 * 8,
1511 num_values,
1512 block_info: BlockInfo::new(),
1513 })
1514 }
1515 DataType::Null => Self::AllNull(AllNullDataBlock { num_values }),
1516 DataType::Dictionary(_, _) => arrow_dictionary_to_data_block(arrays, nulls.to_option()),
1517 DataType::Struct(fields) => {
1518 let structs = arrays.iter().map(|arr| arr.as_struct()).collect::<Vec<_>>();
1519 let mut children = Vec::with_capacity(fields.len());
1520 for child_idx in 0..fields.len() {
1521 let child_vec = structs
1522 .iter()
1523 .map(|s| s.column(child_idx).clone())
1524 .collect::<Vec<_>>();
1525 children.push(Self::from_arrays(&child_vec, num_values));
1526 }
1527
1528 let validity = match &nulls {
1530 Nullability::None => None,
1531 Nullability::Some(null_buffer) => Some(null_buffer.clone()),
1532 Nullability::All => unreachable!("Should have returned AllNull earlier"),
1533 };
1534
1535 Self::Struct(StructDataBlock {
1536 children,
1537 block_info: BlockInfo::default(),
1538 validity,
1539 })
1540 }
1541 DataType::FixedSizeList(_, dim) => {
1542 let children = arrays
1543 .iter()
1544 .map(|arr| arr.as_fixed_size_list().values().clone())
1545 .collect::<Vec<_>>();
1546 let child_block = Self::from_arrays(&children, num_values * *dim as u64);
1547 Self::FixedSizeList(FixedSizeListBlock {
1548 child: Box::new(child_block),
1549 dimension: *dim as u64,
1550 })
1551 }
1552 DataType::LargeList(_)
1553 | DataType::List(_)
1554 | DataType::ListView(_)
1555 | DataType::LargeListView(_)
1556 | DataType::Map(_, _)
1557 | DataType::RunEndEncoded(_, _)
1558 | DataType::Union(_, _) => {
1559 panic!(
1560 "Field with data type {} cannot be converted to data block",
1561 data_type
1562 )
1563 }
1564 };
1565
1566 encoded.compute_stat();
1568
1569 if !matches!(data_type, DataType::Dictionary(_, _)) {
1570 match nulls {
1571 Nullability::None => encoded,
1572 Nullability::Some(nulls) => Self::Nullable(NullableDataBlock {
1573 data: Box::new(encoded),
1574 nulls: LanceBuffer::from(nulls.into_inner().into_inner()),
1575 block_info: BlockInfo::new(),
1576 }),
1577 _ => unreachable!(),
1578 }
1579 } else {
1580 encoded
1582 }
1583 }
1584
1585 pub fn from_array<T: Array + 'static>(array: T) -> Self {
1586 let num_values = array.len();
1587 Self::from_arrays(&[Arc::new(array)], num_values as u64)
1588 }
1589}
1590
1591impl From<ArrayRef> for DataBlock {
1592 fn from(array: ArrayRef) -> Self {
1593 let num_values = array.len() as u64;
1594 Self::from_arrays(&[array], num_values)
1595 }
1596}
1597
/// Type-specific implementation behind a [`DataBlockBuilder`].
pub trait DataBlockBuilderImpl: std::fmt::Debug {
    /// Appends the `selection` range of `data_block` to the builder
    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>);
    /// Consumes the builder and returns the accumulated data block
    fn finish(self: Box<Self>) -> DataBlock;
}
1602
/// Builds a data block from selections of other data blocks.
///
/// The concrete builder is created lazily from the first block that is appended.
#[derive(Debug)]
pub struct DataBlockBuilder {
    // Size estimate handed to `make_builder` when the concrete builder is created
    estimated_size_bytes: u64,
    // `None` until the first block has been appended
    builder: Option<Box<dyn DataBlockBuilderImpl>>,
}
1608
1609impl DataBlockBuilder {
1610 pub fn with_capacity_estimate(estimated_size_bytes: u64) -> Self {
1611 Self {
1612 estimated_size_bytes,
1613 builder: None,
1614 }
1615 }
1616
1617 fn get_builder(&mut self, block: &DataBlock) -> &mut dyn DataBlockBuilderImpl {
1618 if self.builder.is_none() {
1619 self.builder = Some(block.make_builder(self.estimated_size_bytes));
1620 }
1621 self.builder.as_mut().unwrap().as_mut()
1622 }
1623
1624 pub fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
1625 self.get_builder(data_block).append(data_block, selection);
1626 }
1627
1628 pub fn finish(self) -> DataBlock {
1629 let builder = self.builder.expect("DataBlockBuilder didn't see any data");
1630 builder.finish()
1631 }
1632}
1633
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::{
        make_array, new_null_array,
        types::{Int32Type, Int8Type},
        ArrayRef, DictionaryArray, Int8Array, LargeBinaryArray, StringArray, UInt16Array,
        UInt8Array,
    };
    use arrow_buffer::{BooleanBuffer, NullBuffer};

    use arrow_schema::{DataType, Field, Fields};
    use lance_datagen::{array, ArrayGeneratorExt, RowCount, DEFAULT_SEED};
    use rand::SeedableRng;

    use crate::buffer::LanceBuffer;

    use super::{AllNullDataBlock, DataBlock};

    use arrow_array::Array;

    /// Expected `data_size` of a block built from a nullable array: the bytes of all
    /// of the array's buffers plus its validity bitmap rounded up to whole bytes.
    ///
    /// Panics if the array has no validity bitmap.
    fn buffers_and_validity_size<A: Array>(arr: &A) -> u64 {
        let buffers_size: usize = arr
            .to_data()
            .buffers()
            .iter()
            .map(|buffer| buffer.len())
            .sum();
        let validity_size = arr.nulls().unwrap().len().div_ceil(8);
        (buffers_size + validity_size) as u64
    }

    #[test]
    fn test_sliced_to_data_block() {
        // Only the sliced values should make it into the block
        let ints = UInt16Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);
        let ints = ints.slice(2, 4);
        let data = DataBlock::from_array(ints);

        let fixed_data = data.as_fixed_width().unwrap();
        assert_eq!(fixed_data.num_values, 4);
        assert_eq!(fixed_data.data.len(), 8);

        // The validity bitmap should be sliced too
        let nullable_ints =
            UInt16Array::from(vec![Some(0), None, Some(2), None, Some(4), None, Some(6)]);
        let nullable_ints = nullable_ints.slice(1, 3);
        let data = DataBlock::from_array(nullable_ints);

        let nullable = data.as_nullable().unwrap();
        assert_eq!(nullable.nulls, LanceBuffer::from(vec![0b00000010]));
    }

    #[test]
    fn test_string_to_data_block() {
        // Multiple arrays, some with nulls and one that is entirely null
        let strings1 = StringArray::from(vec![Some("hello"), None, Some("world")]);
        let strings2 = StringArray::from(vec![Some("a"), Some("b")]);
        let strings3 = StringArray::from(vec![Option::<&'static str>::None, None]);

        let arrays = &[strings1, strings2, strings3]
            .iter()
            .map(|arr| Arc::new(arr.clone()) as ArrayRef)
            .collect::<Vec<_>>();

        let block = DataBlock::from_arrays(arrays, 7);

        assert_eq!(block.num_values(), 7);
        let block = block.as_nullable().unwrap();

        assert_eq!(block.nulls, LanceBuffer::from(vec![0b00011101]));

        let data = block.data.as_variable_width().unwrap();
        assert_eq!(
            data.offsets,
            LanceBuffer::reinterpret_vec(vec![0, 5, 5, 10, 11, 12, 12, 12])
        );

        assert_eq!(data.data, LanceBuffer::copy_slice(b"helloworldab"));

        // Multiple arrays without any nulls
        let strings1 = StringArray::from(vec![Some("a"), Some("bc")]);
        let strings2 = StringArray::from(vec![Some("def")]);

        let arrays = &[strings1, strings2]
            .iter()
            .map(|arr| Arc::new(arr.clone()) as ArrayRef)
            .collect::<Vec<_>>();

        let block = DataBlock::from_arrays(arrays, 3);

        assert_eq!(block.num_values(), 3);
        // No nulls so the block is not wrapped in a nullable block
        let data = block.as_variable_width().unwrap();
        assert_eq!(data.offsets, LanceBuffer::reinterpret_vec(vec![0, 1, 3, 6]));
        assert_eq!(data.data, LanceBuffer::copy_slice(b"abcdef"));
    }

    #[test]
    fn test_string_sliced() {
        let check = |arr: Vec<StringArray>, expected_off: Vec<i32>, expected_data: &[u8]| {
            let arrs = arr
                .into_iter()
                .map(|a| Arc::new(a) as ArrayRef)
                .collect::<Vec<_>>();
            let num_rows = arrs.iter().map(|a| a.len()).sum::<usize>() as u64;
            let data = DataBlock::from_arrays(&arrs, num_rows);

            assert_eq!(data.num_values(), num_rows);

            let data = data.as_variable_width().unwrap();
            assert_eq!(data.offsets, LanceBuffer::reinterpret_vec(expected_off));
            assert_eq!(data.data, LanceBuffer::copy_slice(expected_data));
        };

        let string = StringArray::from(vec![Some("hello"), Some("world")]);
        check(vec![string.slice(1, 1)], vec![0, 5], b"world");
        check(vec![string.slice(0, 1)], vec![0, 5], b"hello");
        check(
            vec![string.slice(0, 1), string.slice(1, 1)],
            vec![0, 5, 10],
            b"helloworld",
        );

        let string2 = StringArray::from(vec![Some("foo"), Some("bar")]);
        check(
            vec![string.slice(0, 1), string2.slice(0, 1)],
            vec![0, 5, 8],
            b"hellofoo",
        );
    }

    #[test]
    fn test_large() {
        let arr = LargeBinaryArray::from_vec(vec![b"hello", b"world"]);
        let data = DataBlock::from_array(arr);

        assert_eq!(data.num_values(), 2);
        let data = data.as_variable_width().unwrap();
        assert_eq!(data.bits_per_offset, 64);
        assert_eq!(data.num_values, 2);
        assert_eq!(data.data, LanceBuffer::copy_slice(b"helloworld"));
        assert_eq!(
            data.offsets,
            LanceBuffer::reinterpret_vec(vec![0_u64, 5, 10])
        );
    }

    #[test]
    fn test_dictionary_indices_normalized() {
        let arr1 = DictionaryArray::<Int8Type>::from_iter([Some("a"), Some("a"), Some("b")]);
        let arr2 = DictionaryArray::<Int8Type>::from_iter([Some("b"), Some("c")]);

        let data = DataBlock::from_arrays(&[Arc::new(arr1), Arc::new(arr2)], 5);

        assert_eq!(data.num_values(), 5);
        let data = data.as_dictionary().unwrap();
        let indices = data.indices;
        assert_eq!(indices.bits_per_value, 8);
        assert_eq!(indices.num_values, 5);
        // The expected values below keep both copies of "b" (the dictionaries are
        // concatenated, not merged) so the indices are 0, 0, 1, 2, 3
        assert_eq!(
            indices.data,
            LanceBuffer::reinterpret_vec::<i8>(vec![0, 0, 1, 2, 3])
        );

        let items = data.dictionary.as_variable_width().unwrap();
        assert_eq!(items.bits_per_offset, 32);
        assert_eq!(items.num_values, 4);
        assert_eq!(items.data, LanceBuffer::copy_slice(b"abbc"));
        assert_eq!(
            items.offsets,
            LanceBuffer::reinterpret_vec(vec![0, 1, 2, 3, 4],)
        );
    }

    #[test]
    fn test_dictionary_nulls() {
        // Case 1: the nulls are in the indices and must be moved into the items
        let arr1 = DictionaryArray::<Int8Type>::from_iter([None, Some("a"), Some("b")]);
        let arr2 = DictionaryArray::<Int8Type>::from_iter([Some("c"), None]);

        let data = DataBlock::from_arrays(&[Arc::new(arr1), Arc::new(arr2)], 5);

        let check_common = |data: DataBlock| {
            assert_eq!(data.num_values(), 5);
            let dict = data.as_dictionary().unwrap();

            let nullable_items = dict.dictionary.as_nullable().unwrap();
            assert_eq!(nullable_items.nulls, LanceBuffer::from(vec![0b00000111]));
            assert_eq!(nullable_items.data.num_values(), 4);

            let items = nullable_items.data.as_variable_width().unwrap();
            assert_eq!(items.bits_per_offset, 32);
            assert_eq!(items.num_values, 4);
            assert_eq!(items.data, LanceBuffer::copy_slice(b"abc"));
            assert_eq!(
                items.offsets,
                LanceBuffer::reinterpret_vec(vec![0, 1, 2, 3, 3],)
            );

            let indices = dict.indices;
            assert_eq!(indices.bits_per_value, 8);
            assert_eq!(indices.num_values, 5);
            assert_eq!(
                indices.data,
                LanceBuffer::reinterpret_vec::<i8>(vec![3, 0, 1, 2, 3])
            );
        };
        check_common(data);

        // Case 2: the null is already in the items and the indices are all valid
        let items = StringArray::from(vec![Some("a"), Some("b"), Some("c"), None]);
        let indices = Int8Array::from(vec![Some(3), Some(0), Some(1), Some(2), Some(3)]);
        let dict = DictionaryArray::new(indices, Arc::new(items));

        let data = DataBlock::from_array(dict);

        check_common(data);
    }

    #[test]
    fn test_dictionary_cannot_add_null() {
        // 256 unique values use up every u8 index
        let items = StringArray::from(
            (0..256)
                .map(|i| Some(String::from_utf8(vec![0; i]).unwrap()))
                .collect::<Vec<_>>(),
        );
        // 257 indices, covering every value, with a null in the last position
        let indices = UInt8Array::from(
            (0..=256)
                .map(|i| if i == 256 { None } else { Some(i as u8) })
                .collect::<Vec<_>>(),
        );
        let dict = DictionaryArray::new(indices, Arc::new(items));
        // Adding a null entry to the items does not fit in u8 indices, so the
        // indices are expected to be widened
        let data = DataBlock::from_array(dict);

        assert_eq!(data.num_values(), 257);

        let dict = data.as_dictionary().unwrap();

        assert_eq!(dict.indices.bits_per_value, 32);
        assert_eq!(
            dict.indices.data,
            LanceBuffer::reinterpret_vec((0_u32..257).collect::<Vec<_>>())
        );

        let nullable_items = dict.dictionary.as_nullable().unwrap();
        let null_buffer = NullBuffer::new(BooleanBuffer::new(
            nullable_items.nulls.into_buffer(),
            0,
            257,
        ));
        for i in 0..256 {
            assert!(!null_buffer.is_null(i));
        }
        assert!(null_buffer.is_null(256));

        // Sum of the string lengths 0..=255
        assert_eq!(
            nullable_items.data.as_variable_width().unwrap().data.len(),
            32640
        );
    }

    #[test]
    fn test_all_null() {
        for data_type in [
            DataType::UInt32,
            DataType::FixedSizeBinary(2),
            DataType::List(Arc::new(Field::new("item", DataType::UInt32, true))),
            DataType::Struct(Fields::from(vec![Field::new("a", DataType::UInt32, true)])),
        ] {
            let block = DataBlock::AllNull(AllNullDataBlock { num_values: 10 });
            let arr = block.into_arrow(data_type.clone(), true).unwrap();
            let arr = make_array(arr);
            let expected = new_null_array(&data_type, 10);
            assert_eq!(&arr, &expected);
        }
    }

    #[test]
    fn test_dictionary_cannot_concatenate() {
        // 256 unique strings
        let items = StringArray::from(
            (0..256)
                .map(|i| Some(String::from_utf8(vec![0; i]).unwrap()))
                .collect::<Vec<_>>(),
        );
        // 256 different unique strings
        let other_items = StringArray::from(
            (0..256)
                .map(|i| Some(String::from_utf8(vec![1; i + 1]).unwrap()))
                .collect::<Vec<_>>(),
        );
        let indices = UInt8Array::from_iter_values(0..=255);
        let dict1 = DictionaryArray::new(indices.clone(), Arc::new(items));
        let dict2 = DictionaryArray::new(indices, Arc::new(other_items));
        let data = DataBlock::from_arrays(&[Arc::new(dict1), Arc::new(dict2)], 512);
        assert_eq!(data.num_values(), 512);

        let dict = data.as_dictionary().unwrap();

        // 512 unique values do not fit in u8 indices
        assert_eq!(dict.indices.bits_per_value, 32);
        assert_eq!(
            dict.indices.data,
            LanceBuffer::reinterpret_vec::<u32>((0..512).collect::<Vec<_>>())
        );
        // Sum of the string lengths 0..=255 plus the sum of 1..=256
        assert_eq!(
            dict.dictionary.as_variable_width().unwrap().data.len(),
            65536
        );
    }

    #[test]
    fn test_data_size() {
        let mut rng = rand_xoshiro::Xoshiro256PlusPlus::seed_from_u64(DEFAULT_SEED.0);

        // Without nulls the block size should match the array's buffer memory size
        let mut genn = array::rand::<Int32Type>().with_nulls(&[false, false, false]);

        let arr = genn.generate(RowCount::from(3), &mut rng).unwrap();
        let block = DataBlock::from_array(arr.clone());
        assert_eq!(block.data_size(), arr.get_buffer_memory_size() as u64);

        let arr = genn.generate(RowCount::from(400), &mut rng).unwrap();
        let block = DataBlock::from_array(arr.clone());
        assert_eq!(block.data_size(), arr.get_buffer_memory_size() as u64);

        // With nulls the block size also includes the validity bitmap
        let mut genn = array::rand::<Int32Type>().with_nulls(&[false, true, false]);
        let arr = genn.generate(RowCount::from(3), &mut rng).unwrap();
        let block = DataBlock::from_array(arr.clone());
        assert_eq!(block.data_size(), buffers_and_validity_size(&arr));

        let arr = genn.generate(RowCount::from(400), &mut rng).unwrap();
        let block = DataBlock::from_array(arr.clone());
        assert_eq!(block.data_size(), buffers_and_validity_size(&arr));

        let mut genn = array::rand::<Int32Type>().with_nulls(&[true, true, false]);
        let arr = genn.generate(RowCount::from(3), &mut rng).unwrap();
        let block = DataBlock::from_array(arr.clone());
        assert_eq!(block.data_size(), buffers_and_validity_size(&arr));

        let arr = genn.generate(RowCount::from(400), &mut rng).unwrap();
        let block = DataBlock::from_array(arr.clone());
        assert_eq!(block.data_size(), buffers_and_validity_size(&arr));

        // Several arrays should have the same size as their concatenation
        let mut genn = array::rand::<Int32Type>().with_nulls(&[false, true, false]);
        let arr1 = genn.generate(RowCount::from(3), &mut rng).unwrap();
        let arr2 = genn.generate(RowCount::from(3), &mut rng).unwrap();
        let arr3 = genn.generate(RowCount::from(3), &mut rng).unwrap();
        let block = DataBlock::from_arrays(&[arr1.clone(), arr2.clone(), arr3.clone()], 9);

        let concatenated_array = arrow_select::concat::concat(&[
            &*Arc::new(arr1.clone()) as &dyn Array,
            &*Arc::new(arr2.clone()) as &dyn Array,
            &*Arc::new(arr3.clone()) as &dyn Array,
        ])
        .unwrap();
        assert_eq!(
            block.data_size(),
            buffers_and_validity_size(&concatenated_array)
        );
    }
}