1use std::{
18 ops::Range,
19 sync::{Arc, RwLock},
20};
21
22use arrow_array::{
23 cast::AsArray,
24 new_empty_array, new_null_array,
25 types::{ArrowDictionaryKeyType, UInt16Type, UInt32Type, UInt64Type, UInt8Type},
26 Array, ArrayRef, OffsetSizeTrait, UInt64Array,
27};
28use arrow_buffer::{
29 ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer, ScalarBuffer,
30};
31use arrow_data::{ArrayData, ArrayDataBuilder};
32use arrow_schema::DataType;
33use lance_arrow::DataTypeExt;
34use snafu::location;
35
36use lance_core::{Error, Result};
37
38use crate::{
39 buffer::LanceBuffer,
40 statistics::{ComputeStat, Stat},
41};
42
/// A data block in which every value is null.
///
/// The block owns no buffers (see `into_buffers`); it only records how many
/// null values it stands for.
#[derive(Debug, Clone)]
pub struct AllNullDataBlock {
    /// The number of (null) values represented by this block
    pub num_values: u64,
}
53
54impl AllNullDataBlock {
55 fn into_arrow(self, data_type: DataType, _validate: bool) -> Result<ArrayData> {
56 Ok(ArrayData::new_null(&data_type, self.num_values as usize))
57 }
58
59 fn into_buffers(self) -> Vec<LanceBuffer> {
60 vec![]
61 }
62}
63
64use std::collections::HashMap;
65
/// A lock-protected map from a [`Stat`] to an Arrow array holding that
/// statistic's value.
///
/// The map lives behind an `Arc`, so cloning a `BlockInfo` yields a handle to
/// the *same* underlying map rather than a copy of it.
#[derive(Debug, Clone)]
pub struct BlockInfo(pub Arc<RwLock<HashMap<Stat, Arc<dyn Array>>>>);
70
71impl Default for BlockInfo {
72 fn default() -> Self {
73 Self::new()
74 }
75}
76
77impl BlockInfo {
78 pub fn new() -> Self {
79 Self(Arc::new(RwLock::new(HashMap::new())))
80 }
81}
82
83impl PartialEq for BlockInfo {
84 fn eq(&self, other: &Self) -> bool {
85 let self_info = self.0.read().unwrap();
86 let other_info = other.0.read().unwrap();
87 *self_info == *other_info
88 }
89}
90
/// Wraps another data block and adds a validity buffer to it.
#[derive(Debug, Clone)]
pub struct NullableDataBlock {
    /// The wrapped block holding the values
    pub data: Box<DataBlock>,
    /// The buffer that becomes the Arrow null (validity) bitmap in `into_arrow`
    pub nulls: LanceBuffer,

    /// Statistics attached to this block
    pub block_info: BlockInfo,
}
105
106impl NullableDataBlock {
107 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
108 let nulls = self.nulls.into_buffer();
109 let data = self.data.into_arrow(data_type, validate)?.into_builder();
110 let data = data.null_bit_buffer(Some(nulls));
111 if validate {
112 Ok(data.build()?)
113 } else {
114 Ok(unsafe { data.build_unchecked() })
115 }
116 }
117
118 fn into_buffers(self) -> Vec<LanceBuffer> {
119 let mut buffers = vec![self.nulls];
120 buffers.extend(self.data.into_buffers());
121 buffers
122 }
123
124 pub fn data_size(&self) -> u64 {
125 self.data.data_size() + self.nulls.len() as u64
126 }
127}
128
/// A data block made of a single buffer plus a value count.
#[derive(Debug, PartialEq, Clone)]
pub struct ConstantDataBlock {
    /// The data buffer (presumably the single repeated value; confirm against
    /// the producers of this block)
    pub data: LanceBuffer,
    /// The number of values represented by this block
    pub num_values: u64,
}
137
138impl ConstantDataBlock {
139 fn into_buffers(self) -> Vec<LanceBuffer> {
140 vec![self.data]
141 }
142
143 fn into_arrow(self, _data_type: DataType, _validate: bool) -> Result<ArrayData> {
144 todo!()
147 }
148
149 pub fn try_clone(&self) -> Result<Self> {
150 Ok(Self {
151 data: self.data.clone(),
152 num_values: self.num_values,
153 })
154 }
155
156 pub fn data_size(&self) -> u64 {
157 self.data.len() as u64
158 }
159}
160
/// A data block where every value occupies the same number of bits.
#[derive(Debug, PartialEq, Clone)]
pub struct FixedWidthDataBlock {
    /// The buffer holding the values
    pub data: LanceBuffer,
    /// The number of bits in each value
    pub bits_per_value: u64,
    /// The number of values represented by this block
    pub num_values: u64,

    /// Statistics attached to this block
    pub block_info: BlockInfo,
}
173
174impl FixedWidthDataBlock {
175 fn do_into_arrow(
176 self,
177 data_type: DataType,
178 num_values: u64,
179 validate: bool,
180 ) -> Result<ArrayData> {
181 let data_buffer = self.data.into_buffer();
182 let builder = ArrayDataBuilder::new(data_type)
183 .add_buffer(data_buffer)
184 .len(num_values as usize)
185 .null_count(0);
186 if validate {
187 Ok(builder.build()?)
188 } else {
189 Ok(unsafe { builder.build_unchecked() })
190 }
191 }
192
193 pub fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
194 let root_num_values = self.num_values;
195 self.do_into_arrow(data_type, root_num_values, validate)
196 }
197
198 pub fn into_buffers(self) -> Vec<LanceBuffer> {
199 vec![self.data]
200 }
201
202 pub fn try_clone(&self) -> Result<Self> {
203 Ok(Self {
204 data: self.data.clone(),
205 bits_per_value: self.bits_per_value,
206 num_values: self.num_values,
207 block_info: self.block_info.clone(),
208 })
209 }
210
211 pub fn data_size(&self) -> u64 {
212 self.data.len() as u64
213 }
214}
215
/// Accumulates selected ranges of variable-width blocks into a new
/// variable-width block whose offsets have type `T`.
#[derive(Debug)]
pub struct VariableWidthDataBlockBuilder<T: OffsetSizeTrait> {
    /// Offsets into `bytes`; starts out as a single zero
    offsets: Vec<T>,
    /// The concatenated value bytes collected so far
    bytes: Vec<u8>,
}
221
222impl<T: OffsetSizeTrait> VariableWidthDataBlockBuilder<T> {
223 fn new(estimated_size_bytes: u64) -> Self {
224 Self {
225 offsets: vec![T::from_usize(0).unwrap()],
226 bytes: Vec::with_capacity(estimated_size_bytes as usize),
227 }
228 }
229}
230
231impl<T: OffsetSizeTrait> DataBlockBuilderImpl for VariableWidthDataBlockBuilder<T> {
232 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
233 let block = data_block.as_variable_width_ref().unwrap();
234 assert!(block.bits_per_offset == T::get_byte_width() as u8 * 8);
235
236 let offsets: ScalarBuffer<T> = block.offsets.clone().borrow_to_typed_slice();
237
238 let start_offset = offsets[selection.start as usize];
239 let end_offset = offsets[selection.end as usize];
240 let mut previous_len = self.bytes.len();
241
242 self.bytes
243 .extend_from_slice(&block.data[start_offset.as_usize()..end_offset.as_usize()]);
244
245 self.offsets.extend(
246 offsets[selection.start as usize..selection.end as usize]
247 .iter()
248 .zip(&offsets[selection.start as usize + 1..=selection.end as usize])
249 .map(|(¤t, &next)| {
250 let this_value_len = next - current;
251 previous_len += this_value_len.as_usize();
252 T::from_usize(previous_len).unwrap()
253 }),
254 );
255 }
256
257 fn finish(self: Box<Self>) -> DataBlock {
258 let num_values = (self.offsets.len() - 1) as u64;
259 DataBlock::VariableWidth(VariableWidthBlock {
260 data: LanceBuffer::from(self.bytes),
261 offsets: LanceBuffer::reinterpret_vec(self.offsets),
262 bits_per_offset: T::get_byte_width() as u8 * 8,
263 num_values,
264 block_info: BlockInfo::new(),
265 })
266 }
267}
268
/// Accumulates selected ranges of 1-bit-per-value fixed-width blocks.
#[derive(Debug)]
struct BitmapDataBlockBuilder {
    /// The bits collected so far
    values: BooleanBufferBuilder,
}
273
274impl BitmapDataBlockBuilder {
275 fn new(estimated_size_bytes: u64) -> Self {
276 Self {
277 values: BooleanBufferBuilder::new(estimated_size_bytes as usize * 8),
278 }
279 }
280}
281
282impl DataBlockBuilderImpl for BitmapDataBlockBuilder {
283 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
284 let bitmap_blk = data_block.as_fixed_width_ref().unwrap();
285 self.values.append_packed_range(
286 selection.start as usize..selection.end as usize,
287 &bitmap_blk.data,
288 );
289 }
290
291 fn finish(mut self: Box<Self>) -> DataBlock {
292 let bool_buf = self.values.finish();
293 let num_values = bool_buf.len() as u64;
294 let bits_buf = bool_buf.into_inner();
295 DataBlock::FixedWidth(FixedWidthDataBlock {
296 data: LanceBuffer::from(bits_buf),
297 bits_per_value: 1,
298 num_values,
299 block_info: BlockInfo::new(),
300 })
301 }
302}
303
/// Accumulates selected ranges of byte-aligned fixed-width blocks.
#[derive(Debug)]
struct FixedWidthDataBlockBuilder {
    /// Width of each value in bits (a multiple of 8, enforced in `new`)
    bits_per_value: u64,
    /// Width of each value in bytes (`bits_per_value / 8`)
    bytes_per_value: u64,
    /// The value bytes collected so far
    values: Vec<u8>,
}
310
311impl FixedWidthDataBlockBuilder {
312 fn new(bits_per_value: u64, estimated_size_bytes: u64) -> Self {
313 assert!(bits_per_value % 8 == 0);
314 Self {
315 bits_per_value,
316 bytes_per_value: bits_per_value / 8,
317 values: Vec::with_capacity(estimated_size_bytes as usize),
318 }
319 }
320}
321
322impl DataBlockBuilderImpl for FixedWidthDataBlockBuilder {
323 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
324 let block = data_block.as_fixed_width_ref().unwrap();
325 assert_eq!(self.bits_per_value, block.bits_per_value);
326 let start = selection.start as usize * self.bytes_per_value as usize;
327 let end = selection.end as usize * self.bytes_per_value as usize;
328 self.values.extend_from_slice(&block.data[start..end]);
329 }
330
331 fn finish(self: Box<Self>) -> DataBlock {
332 let num_values = (self.values.len() / self.bytes_per_value as usize) as u64;
333 DataBlock::FixedWidth(FixedWidthDataBlock {
334 data: LanceBuffer::from(self.values),
335 bits_per_value: self.bits_per_value,
336 num_values,
337 block_info: BlockInfo::new(),
338 })
339 }
340}
341
/// Accumulates selected ranges of struct blocks, one child builder per field.
#[derive(Debug)]
struct StructDataBlockBuilder {
    /// One builder per struct child, in child order
    children: Vec<Box<dyn DataBlockBuilderImpl>>,
}
346
347impl StructDataBlockBuilder {
348 fn new(bits_per_values: Vec<u32>, estimated_size_bytes: u64) -> Self {
351 let mut children = vec![];
352
353 debug_assert!(bits_per_values.iter().all(|bpv| bpv % 8 == 0));
354
355 let bytes_per_row: u32 = bits_per_values.iter().sum::<u32>() / 8;
356 let bytes_per_row = bytes_per_row as u64;
357
358 for bits_per_value in bits_per_values.iter() {
359 let this_estimated_size_bytes =
360 estimated_size_bytes / bytes_per_row * (*bits_per_value as u64) / 8;
361 let child =
362 FixedWidthDataBlockBuilder::new(*bits_per_value as u64, this_estimated_size_bytes);
363 children.push(Box::new(child) as Box<dyn DataBlockBuilderImpl>);
364 }
365 Self { children }
366 }
367}
368
369impl DataBlockBuilderImpl for StructDataBlockBuilder {
370 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
371 let data_block = data_block.as_struct_ref().unwrap();
372 for i in 0..self.children.len() {
373 self.children[i].append(&data_block.children[i], selection.clone());
374 }
375 }
376
377 fn finish(self: Box<Self>) -> DataBlock {
378 let mut children_data_block = Vec::new();
379 for child in self.children {
380 let child_data_block = child.finish();
381 children_data_block.push(child_data_block);
382 }
383 DataBlock::Struct(StructDataBlock {
384 children: children_data_block,
385 block_info: BlockInfo::new(),
386 validity: None,
387 })
388 }
389}
390
/// Builder for all-null blocks; it only counts how many values were appended.
#[derive(Debug, Default)]
struct AllNullDataBlockBuilder {
    /// Total number of values appended so far
    num_values: u64,
}
395
396impl DataBlockBuilderImpl for AllNullDataBlockBuilder {
397 fn append(&mut self, _data_block: &DataBlock, selection: Range<u64>) {
398 self.num_values += selection.end - selection.start;
399 }
400
401 fn finish(self: Box<Self>) -> DataBlock {
402 DataBlock::AllNull(AllNullDataBlock {
403 num_values: self.num_values,
404 })
405 }
406}
407
/// A data block of fixed-size lists: `dimension` child items per list value.
#[derive(Debug, Clone)]
pub struct FixedSizeListBlock {
    /// The block holding the list items
    pub child: Box<DataBlock>,
    /// The number of child items in each list value
    pub dimension: u64,
}
416
417impl FixedSizeListBlock {
418 pub fn num_values(&self) -> u64 {
419 self.child.num_values() / self.dimension
420 }
421
422 pub fn try_into_flat(self) -> Option<FixedWidthDataBlock> {
426 match *self.child {
427 DataBlock::Nullable(_) => None,
429 DataBlock::FixedSizeList(inner) => {
430 let mut flat = inner.try_into_flat()?;
431 flat.bits_per_value *= self.dimension;
432 flat.num_values /= self.dimension;
433 Some(flat)
434 }
435 DataBlock::FixedWidth(mut inner) => {
436 inner.bits_per_value *= self.dimension;
437 inner.num_values /= self.dimension;
438 Some(inner)
439 }
440 _ => panic!(
441 "Expected FixedSizeList or FixedWidth data block but found {:?}",
442 self
443 ),
444 }
445 }
446
447 pub fn flatten_as_fixed(&mut self) -> FixedWidthDataBlock {
448 match self.child.as_mut() {
449 DataBlock::FixedSizeList(fsl) => fsl.flatten_as_fixed(),
450 DataBlock::FixedWidth(fw) => fw.clone(),
451 _ => panic!("Expected FixedSizeList or FixedWidth data block"),
452 }
453 }
454
455 pub fn from_flat(data: FixedWidthDataBlock, data_type: &DataType) -> DataBlock {
457 match data_type {
458 DataType::FixedSizeList(child_field, dimension) => {
459 let mut data = data;
460 data.bits_per_value /= *dimension as u64;
461 data.num_values *= *dimension as u64;
462 let child_data = Self::from_flat(data, child_field.data_type());
463 DataBlock::FixedSizeList(Self {
464 child: Box::new(child_data),
465 dimension: *dimension as u64,
466 })
467 }
468 _ => DataBlock::FixedWidth(data),
470 }
471 }
472
473 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
474 let num_values = self.num_values();
475 let builder = match &data_type {
476 DataType::FixedSizeList(child_field, _) => {
477 let child_data = self
478 .child
479 .into_arrow(child_field.data_type().clone(), validate)?;
480 ArrayDataBuilder::new(data_type)
481 .add_child_data(child_data)
482 .len(num_values as usize)
483 .null_count(0)
484 }
485 _ => panic!("Expected FixedSizeList data type and got {:?}", data_type),
486 };
487 if validate {
488 Ok(builder.build()?)
489 } else {
490 Ok(unsafe { builder.build_unchecked() })
491 }
492 }
493
494 fn into_buffers(self) -> Vec<LanceBuffer> {
495 self.child.into_buffers()
496 }
497
498 fn data_size(&self) -> u64 {
499 self.child.data_size()
500 }
501}
502
/// Accumulates selected ranges of fixed-size-list blocks by forwarding the
/// matching item ranges to a builder for the child block.
#[derive(Debug)]
struct FixedSizeListBlockBuilder {
    /// Builder for the list items
    inner: Box<dyn DataBlockBuilderImpl>,
    /// The number of items in each list value
    dimension: u64,
}
508
509impl FixedSizeListBlockBuilder {
510 fn new(inner: Box<dyn DataBlockBuilderImpl>, dimension: u64) -> Self {
511 Self { inner, dimension }
512 }
513}
514
515impl DataBlockBuilderImpl for FixedSizeListBlockBuilder {
516 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
517 let selection = selection.start * self.dimension..selection.end * self.dimension;
518 let fsl = data_block.as_fixed_size_list_ref().unwrap();
519 self.inner.append(fsl.child.as_ref(), selection);
520 }
521
522 fn finish(self: Box<Self>) -> DataBlock {
523 let inner_block = self.inner.finish();
524 DataBlock::FixedSizeList(FixedSizeListBlock {
525 child: Box::new(inner_block),
526 dimension: self.dimension,
527 })
528 }
529}
530
/// Accumulates selected ranges of nullable blocks: validity bits go into
/// `validity`, values go to the `inner` builder.
#[derive(Debug)]
struct NullableDataBlockBuilder {
    /// Builder for the wrapped (values) block
    inner: Box<dyn DataBlockBuilderImpl>,
    /// The validity bits collected so far
    validity: BooleanBufferBuilder,
}
536
537impl NullableDataBlockBuilder {
538 fn new(inner: Box<dyn DataBlockBuilderImpl>, estimated_size_bytes: usize) -> Self {
539 Self {
540 inner,
541 validity: BooleanBufferBuilder::new(estimated_size_bytes * 8),
542 }
543 }
544}
545
546impl DataBlockBuilderImpl for NullableDataBlockBuilder {
547 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
548 let nullable = data_block.as_nullable_ref().unwrap();
549 let bool_buf = BooleanBuffer::new(
550 nullable.nulls.clone().into_buffer(),
551 selection.start as usize,
552 (selection.end - selection.start) as usize,
553 );
554 self.validity.append_buffer(&bool_buf);
555 self.inner.append(nullable.data.as_ref(), selection);
556 }
557
558 fn finish(mut self: Box<Self>) -> DataBlock {
559 let inner_block = self.inner.finish();
560 DataBlock::Nullable(NullableDataBlock {
561 data: Box::new(inner_block),
562 nulls: LanceBuffer::from(self.validity.finish().into_inner()),
563 block_info: BlockInfo::new(),
564 })
565 }
566}
567
/// A data block that is just a list of buffers and a value count.
///
/// Opaque blocks cannot be converted to Arrow: `DataBlock::into_arrow`
/// returns an error for them.
#[derive(Debug, Clone)]
pub struct OpaqueBlock {
    /// The buffers making up this block
    pub buffers: Vec<LanceBuffer>,
    /// The number of values represented by this block
    pub num_values: u64,
    /// Statistics attached to this block
    pub block_info: BlockInfo,
}
577
578impl OpaqueBlock {
579 pub fn data_size(&self) -> u64 {
580 self.buffers.iter().map(|b| b.len() as u64).sum()
581 }
582}
583
/// A data block of variable-width values: one bytes buffer plus one offsets
/// buffer.
#[derive(Debug, Clone)]
pub struct VariableWidthBlock {
    /// The concatenated bytes of all values
    pub data: LanceBuffer,
    /// Offsets into `data`; there are `num_values + 1` of them (see
    /// `offsets_as_block`)
    pub offsets: LanceBuffer,
    /// The number of bits in each offset
    pub bits_per_offset: u8,
    /// The number of values represented by this block
    pub num_values: u64,

    /// Statistics attached to this block
    pub block_info: BlockInfo,
}
600
601impl VariableWidthBlock {
602 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
603 let data_buffer = self.data.into_buffer();
604 let offsets_buffer = self.offsets.into_buffer();
605 let builder = ArrayDataBuilder::new(data_type)
606 .add_buffer(offsets_buffer)
607 .add_buffer(data_buffer)
608 .len(self.num_values as usize)
609 .null_count(0);
610 if validate {
611 Ok(builder.build()?)
612 } else {
613 Ok(unsafe { builder.build_unchecked() })
614 }
615 }
616
617 fn into_buffers(self) -> Vec<LanceBuffer> {
618 vec![self.offsets, self.data]
619 }
620
621 pub fn offsets_as_block(&mut self) -> DataBlock {
622 let offsets = self.offsets.clone();
623 DataBlock::FixedWidth(FixedWidthDataBlock {
624 data: offsets,
625 bits_per_value: self.bits_per_offset as u64,
626 num_values: self.num_values + 1,
627 block_info: BlockInfo::new(),
628 })
629 }
630
631 pub fn data_size(&self) -> u64 {
632 (self.data.len() + self.offsets.len()) as u64
633 }
634}
635
/// A data block made of one child block per struct field.
#[derive(Debug, Clone)]
pub struct StructDataBlock {
    /// The child blocks, zipped with the struct fields in `into_arrow`
    pub children: Vec<DataBlock>,
    /// Statistics attached to this block
    pub block_info: BlockInfo,
    /// Validity of the struct values themselves; `None` means no null bitmap
    pub validity: Option<NullBuffer>,
}
645
646impl StructDataBlock {
647 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
648 if let DataType::Struct(fields) = &data_type {
649 let mut builder = ArrayDataBuilder::new(DataType::Struct(fields.clone()));
650 let mut num_rows = 0;
651 for (field, child) in fields.iter().zip(self.children) {
652 let child_data = child.into_arrow(field.data_type().clone(), validate)?;
653 num_rows = child_data.len();
654 builder = builder.add_child_data(child_data);
655 }
656
657 let builder = if let Some(validity) = self.validity {
659 let null_count = validity.null_count();
660 builder
661 .null_bit_buffer(Some(validity.into_inner().into_inner()))
662 .null_count(null_count)
663 } else {
664 builder.null_count(0)
665 };
666
667 let builder = builder.len(num_rows);
668 if validate {
669 Ok(builder.build()?)
670 } else {
671 Ok(unsafe { builder.build_unchecked() })
672 }
673 } else {
674 Err(Error::Internal {
675 message: format!("Expected Struct, got {:?}", data_type),
676 location: location!(),
677 })
678 }
679 }
680
681 fn remove_outer_validity(self) -> Self {
682 Self {
683 children: self
684 .children
685 .into_iter()
686 .map(|c| c.remove_outer_validity())
687 .collect(),
688 block_info: self.block_info,
689 validity: None, }
691 }
692
693 fn into_buffers(self) -> Vec<LanceBuffer> {
694 self.children
695 .into_iter()
696 .flat_map(|c| c.into_buffers())
697 .collect()
698 }
699
700 pub fn data_size(&self) -> u64 {
701 self.children
702 .iter()
703 .map(|data_block| data_block.data_size())
704 .sum()
705 }
706}
707
/// A dictionary-encoded data block: indices pointing into a dictionary block.
#[derive(Debug, Clone)]
pub struct DictionaryDataBlock {
    /// The indices; `decode` supports widths of 8, 16, 32 and 64 bits
    pub indices: FixedWidthDataBlock,
    /// The block holding the dictionary values
    pub dictionary: Box<DataBlock>,
}
716
717impl DictionaryDataBlock {
718 fn decode_helper<K: ArrowDictionaryKeyType>(self) -> Result<DataBlock> {
719 let estimated_size_bytes = self.dictionary.data_size()
721 * (self.indices.num_values + self.dictionary.num_values() - 1)
722 / self.dictionary.num_values();
723 let mut data_builder = DataBlockBuilder::with_capacity_estimate(estimated_size_bytes);
724
725 let indices = self.indices.data.borrow_to_typed_slice::<K::Native>();
726 let indices = indices.as_ref();
727
728 indices
729 .iter()
730 .map(|idx| idx.to_usize().unwrap() as u64)
731 .for_each(|idx| {
732 data_builder.append(&self.dictionary, idx..idx + 1);
733 });
734
735 Ok(data_builder.finish())
736 }
737
738 pub fn decode(self) -> Result<DataBlock> {
739 match self.indices.bits_per_value {
740 8 => self.decode_helper::<UInt8Type>(),
741 16 => self.decode_helper::<UInt16Type>(),
742 32 => self.decode_helper::<UInt32Type>(),
743 64 => self.decode_helper::<UInt64Type>(),
744 _ => Err(lance_core::Error::Internal {
745 message: format!(
746 "Unsupported dictionary index bit width: {} bits",
747 self.indices.bits_per_value
748 ),
749 location: location!(),
750 }),
751 }
752 }
753
754 fn into_arrow_dict(
755 self,
756 key_type: Box<DataType>,
757 value_type: Box<DataType>,
758 validate: bool,
759 ) -> Result<ArrayData> {
760 let indices = self.indices.into_arrow((*key_type).clone(), validate)?;
761 let dictionary = self
762 .dictionary
763 .into_arrow((*value_type).clone(), validate)?;
764
765 let builder = indices
766 .into_builder()
767 .add_child_data(dictionary)
768 .data_type(DataType::Dictionary(key_type, value_type));
769
770 if validate {
771 Ok(builder.build()?)
772 } else {
773 Ok(unsafe { builder.build_unchecked() })
774 }
775 }
776
777 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
778 if let DataType::Dictionary(key_type, value_type) = data_type {
779 self.into_arrow_dict(key_type, value_type, validate)
780 } else {
781 self.decode()?.into_arrow(data_type, validate)
782 }
783 }
784
785 fn into_buffers(self) -> Vec<LanceBuffer> {
786 let mut buffers = self.indices.into_buffers();
787 buffers.extend(self.dictionary.into_buffers());
788 buffers
789 }
790
791 pub fn into_parts(self) -> (DataBlock, DataBlock) {
792 (DataBlock::FixedWidth(self.indices), *self.dictionary)
793 }
794
795 pub fn from_parts(indices: FixedWidthDataBlock, dictionary: DataBlock) -> Self {
796 Self {
797 indices,
798 dictionary: Box::new(dictionary),
799 }
800 }
801}
802
/// The different layouts a block of data can take.
#[derive(Debug, Clone)]
pub enum DataBlock {
    /// A block with no values
    Empty(),
    /// A single data buffer plus a value count
    Constant(ConstantDataBlock),
    /// Every value is null; no buffers are stored
    AllNull(AllNullDataBlock),
    /// Another block plus a validity buffer
    Nullable(NullableDataBlock),
    /// Values that all have the same bit width
    FixedWidth(FixedWidthDataBlock),
    /// Fixed-size lists over a child block
    FixedSizeList(FixedSizeListBlock),
    /// Variable-width values described by an offsets buffer
    VariableWidth(VariableWidthBlock),
    /// A list of buffers that cannot be converted to Arrow
    Opaque(OpaqueBlock),
    /// One child block per struct field
    Struct(StructDataBlock),
    /// Indices into a dictionary block
    Dictionary(DictionaryDataBlock),
}
830
831impl DataBlock {
832 pub fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
834 match self {
835 Self::Empty() => Ok(new_empty_array(&data_type).to_data()),
836 Self::Constant(inner) => inner.into_arrow(data_type, validate),
837 Self::AllNull(inner) => inner.into_arrow(data_type, validate),
838 Self::Nullable(inner) => inner.into_arrow(data_type, validate),
839 Self::FixedWidth(inner) => inner.into_arrow(data_type, validate),
840 Self::FixedSizeList(inner) => inner.into_arrow(data_type, validate),
841 Self::VariableWidth(inner) => inner.into_arrow(data_type, validate),
842 Self::Struct(inner) => inner.into_arrow(data_type, validate),
843 Self::Dictionary(inner) => inner.into_arrow(data_type, validate),
844 Self::Opaque(_) => Err(Error::Internal {
845 message: "Cannot convert OpaqueBlock to Arrow".to_string(),
846 location: location!(),
847 }),
848 }
849 }
850
851 pub fn into_buffers(self) -> Vec<LanceBuffer> {
855 match self {
856 Self::Empty() => Vec::default(),
857 Self::Constant(inner) => inner.into_buffers(),
858 Self::AllNull(inner) => inner.into_buffers(),
859 Self::Nullable(inner) => inner.into_buffers(),
860 Self::FixedWidth(inner) => inner.into_buffers(),
861 Self::FixedSizeList(inner) => inner.into_buffers(),
862 Self::VariableWidth(inner) => inner.into_buffers(),
863 Self::Struct(inner) => inner.into_buffers(),
864 Self::Dictionary(inner) => inner.into_buffers(),
865 Self::Opaque(inner) => inner.buffers,
866 }
867 }
868
869 pub fn try_clone(&self) -> Result<Self> {
878 match self {
879 Self::Empty() => Ok(Self::Empty()),
880 Self::Constant(inner) => Ok(Self::Constant(inner.clone())),
881 Self::AllNull(inner) => Ok(Self::AllNull(inner.clone())),
882 Self::Nullable(inner) => Ok(Self::Nullable(inner.clone())),
883 Self::FixedWidth(inner) => Ok(Self::FixedWidth(inner.clone())),
884 Self::FixedSizeList(inner) => Ok(Self::FixedSizeList(inner.clone())),
885 Self::VariableWidth(inner) => Ok(Self::VariableWidth(inner.clone())),
886 Self::Struct(inner) => Ok(Self::Struct(inner.clone())),
887 Self::Dictionary(inner) => Ok(Self::Dictionary(inner.clone())),
888 Self::Opaque(inner) => Ok(Self::Opaque(inner.clone())),
889 }
890 }
891
892 pub fn name(&self) -> &'static str {
893 match self {
894 Self::Constant(_) => "Constant",
895 Self::Empty() => "Empty",
896 Self::AllNull(_) => "AllNull",
897 Self::Nullable(_) => "Nullable",
898 Self::FixedWidth(_) => "FixedWidth",
899 Self::FixedSizeList(_) => "FixedSizeList",
900 Self::VariableWidth(_) => "VariableWidth",
901 Self::Struct(_) => "Struct",
902 Self::Dictionary(_) => "Dictionary",
903 Self::Opaque(_) => "Opaque",
904 }
905 }
906
907 pub fn is_variable(&self) -> bool {
908 match self {
909 Self::Constant(_) => false,
910 Self::Empty() => false,
911 Self::AllNull(_) => false,
912 Self::Nullable(nullable) => nullable.data.is_variable(),
913 Self::FixedWidth(_) => false,
914 Self::FixedSizeList(fsl) => fsl.child.is_variable(),
915 Self::VariableWidth(_) => true,
916 Self::Struct(strct) => strct.children.iter().any(|c| c.is_variable()),
917 Self::Dictionary(_) => {
918 todo!("is_variable for DictionaryDataBlock is not implemented yet")
919 }
920 Self::Opaque(_) => panic!("Does not make sense to ask if an Opaque block is variable"),
921 }
922 }
923
924 pub fn is_nullable(&self) -> bool {
925 match self {
926 Self::AllNull(_) => true,
927 Self::Nullable(_) => true,
928 Self::FixedSizeList(fsl) => fsl.child.is_nullable(),
929 Self::Struct(strct) => strct.children.iter().any(|c| c.is_nullable()),
930 Self::Dictionary(_) => {
931 todo!("is_nullable for DictionaryDataBlock is not implemented yet")
932 }
933 Self::Opaque(_) => panic!("Does not make sense to ask if an Opaque block is nullable"),
934 _ => false,
935 }
936 }
937
938 pub fn num_values(&self) -> u64 {
943 match self {
944 Self::Empty() => 0,
945 Self::Constant(inner) => inner.num_values,
946 Self::AllNull(inner) => inner.num_values,
947 Self::Nullable(inner) => inner.data.num_values(),
948 Self::FixedWidth(inner) => inner.num_values,
949 Self::FixedSizeList(inner) => inner.num_values(),
950 Self::VariableWidth(inner) => inner.num_values,
951 Self::Struct(inner) => inner.children[0].num_values(),
952 Self::Dictionary(inner) => inner.indices.num_values,
953 Self::Opaque(inner) => inner.num_values,
954 }
955 }
956
957 pub fn items_per_row(&self) -> u64 {
961 match self {
962 Self::Empty() => todo!(), Self::Constant(_) => todo!(), Self::AllNull(_) => todo!(), Self::Nullable(nullable) => nullable.data.items_per_row(),
966 Self::FixedWidth(_) => 1,
967 Self::FixedSizeList(fsl) => fsl.dimension * fsl.child.items_per_row(),
968 Self::VariableWidth(_) => 1,
969 Self::Struct(_) => todo!(), Self::Dictionary(_) => 1,
971 Self::Opaque(_) => 1,
972 }
973 }
974
975 pub fn data_size(&self) -> u64 {
977 match self {
978 Self::Empty() => 0,
979 Self::Constant(inner) => inner.data_size(),
980 Self::AllNull(_) => 0,
981 Self::Nullable(inner) => inner.data_size(),
982 Self::FixedWidth(inner) => inner.data_size(),
983 Self::FixedSizeList(inner) => inner.data_size(),
984 Self::VariableWidth(inner) => inner.data_size(),
985 Self::Struct(_) => {
986 todo!("the data_size method for StructDataBlock is not implemented yet")
987 }
988 Self::Dictionary(_) => {
989 todo!("the data_size method for DictionaryDataBlock is not implemented yet")
990 }
991 Self::Opaque(inner) => inner.data_size(),
992 }
993 }
994
995 pub fn remove_outer_validity(self) -> Self {
1003 match self {
1004 Self::AllNull(_) => panic!("Cannot remove validity on all-null data"),
1005 Self::Nullable(inner) => *inner.data,
1006 Self::Struct(inner) => Self::Struct(inner.remove_outer_validity()),
1007 other => other,
1008 }
1009 }
1010
1011 pub fn make_builder(&self, estimated_size_bytes: u64) -> Box<dyn DataBlockBuilderImpl> {
1012 match self {
1013 Self::FixedWidth(inner) => {
1014 if inner.bits_per_value == 1 {
1015 Box::new(BitmapDataBlockBuilder::new(estimated_size_bytes))
1016 } else {
1017 Box::new(FixedWidthDataBlockBuilder::new(
1018 inner.bits_per_value,
1019 estimated_size_bytes,
1020 ))
1021 }
1022 }
1023 Self::VariableWidth(inner) => {
1024 if inner.bits_per_offset == 32 {
1025 Box::new(VariableWidthDataBlockBuilder::<i32>::new(
1026 estimated_size_bytes,
1027 ))
1028 } else if inner.bits_per_offset == 64 {
1029 Box::new(VariableWidthDataBlockBuilder::<i64>::new(
1030 estimated_size_bytes,
1031 ))
1032 } else {
1033 todo!()
1034 }
1035 }
1036 Self::FixedSizeList(inner) => {
1037 let inner_builder = inner.child.make_builder(estimated_size_bytes);
1038 Box::new(FixedSizeListBlockBuilder::new(
1039 inner_builder,
1040 inner.dimension,
1041 ))
1042 }
1043 Self::Nullable(nullable) => {
1044 let estimated_validity_size_bytes = estimated_size_bytes / 16;
1047 let inner_builder = nullable
1048 .data
1049 .make_builder(estimated_size_bytes - estimated_validity_size_bytes);
1050 Box::new(NullableDataBlockBuilder::new(
1051 inner_builder,
1052 estimated_validity_size_bytes as usize,
1053 ))
1054 }
1055 Self::Struct(struct_data_block) => {
1056 let mut bits_per_values = vec![];
1057 for child in struct_data_block.children.iter() {
1058 let child = child.as_fixed_width_ref().
1059 expect("Currently StructDataBlockBuilder is only used in packed-struct encoding, and currently in packed-struct encoding, only fixed-width fields are supported.");
1060 bits_per_values.push(child.bits_per_value as u32);
1061 }
1062 Box::new(StructDataBlockBuilder::new(
1063 bits_per_values,
1064 estimated_size_bytes,
1065 ))
1066 }
1067 Self::AllNull(_) => Box::new(AllNullDataBlockBuilder::default()),
1068 _ => todo!("make_builder for {:?}", self),
1069 }
1070 }
1071}
1072
// Generates a consuming accessor `$fn_name(self) -> Option<$inner_type>` that
// yields the payload of variant `$inner`, or `None` for any other variant.
macro_rules! as_type {
    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
        pub fn $fn_name(self) -> Option<$inner_type> {
            if let Self::$inner(payload) = self {
                Some(payload)
            } else {
                None
            }
        }
    };
}
1083
// Generates a borrowing accessor `$fn_name(&self) -> Option<&$inner_type>`
// that yields the payload of variant `$inner`, or `None` for any other
// variant.
macro_rules! as_type_ref {
    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
        pub fn $fn_name(&self) -> Option<&$inner_type> {
            if let Self::$inner(payload) = self {
                Some(payload)
            } else {
                None
            }
        }
    };
}
1094
// Generates a mutably borrowing accessor
// `$fn_name(&mut self) -> Option<&mut $inner_type>` that yields the payload of
// variant `$inner`, or `None` for any other variant.
macro_rules! as_type_ref_mut {
    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
        pub fn $fn_name(&mut self) -> Option<&mut $inner_type> {
            if let Self::$inner(payload) = self {
                Some(payload)
            } else {
                None
            }
        }
    };
}
1105
// Variant accessors for `DataBlock`, generated by the macros above.  Each
// accessor returns `None` when the block is a different variant.
impl DataBlock {
    // Consuming accessors: `fn as_x(self) -> Option<X>`
    as_type!(as_all_null, AllNull, AllNullDataBlock);
    as_type!(as_nullable, Nullable, NullableDataBlock);
    as_type!(as_fixed_width, FixedWidth, FixedWidthDataBlock);
    as_type!(as_fixed_size_list, FixedSizeList, FixedSizeListBlock);
    as_type!(as_variable_width, VariableWidth, VariableWidthBlock);
    as_type!(as_struct, Struct, StructDataBlock);
    as_type!(as_dictionary, Dictionary, DictionaryDataBlock);
    // Borrowing accessors: `fn as_x_ref(&self) -> Option<&X>`
    as_type_ref!(as_all_null_ref, AllNull, AllNullDataBlock);
    as_type_ref!(as_nullable_ref, Nullable, NullableDataBlock);
    as_type_ref!(as_fixed_width_ref, FixedWidth, FixedWidthDataBlock);
    as_type_ref!(as_fixed_size_list_ref, FixedSizeList, FixedSizeListBlock);
    as_type_ref!(as_variable_width_ref, VariableWidth, VariableWidthBlock);
    as_type_ref!(as_struct_ref, Struct, StructDataBlock);
    as_type_ref!(as_dictionary_ref, Dictionary, DictionaryDataBlock);
    // Mutably borrowing accessors: `fn as_x_ref_mut(&mut self) -> Option<&mut X>`
    as_type_ref_mut!(as_all_null_ref_mut, AllNull, AllNullDataBlock);
    as_type_ref_mut!(as_nullable_ref_mut, Nullable, NullableDataBlock);
    as_type_ref_mut!(as_fixed_width_ref_mut, FixedWidth, FixedWidthDataBlock);
    as_type_ref_mut!(
        as_fixed_size_list_ref_mut,
        FixedSizeList,
        FixedSizeListBlock
    );
    as_type_ref_mut!(as_variable_width_ref_mut, VariableWidth, VariableWidthBlock);
    as_type_ref_mut!(as_struct_ref_mut, Struct, StructDataBlock);
    as_type_ref_mut!(as_dictionary_ref_mut, Dictionary, DictionaryDataBlock);
}
1134
1135fn get_byte_range<T: ArrowNativeType>(offsets: &mut LanceBuffer) -> Range<usize> {
1138 let offsets = offsets.borrow_to_typed_slice::<T>();
1139 if offsets.as_ref().is_empty() {
1140 0..0
1141 } else {
1142 offsets.as_ref().first().unwrap().as_usize()..offsets.as_ref().last().unwrap().as_usize()
1143 }
1144}
1145
1146fn stitch_offsets<T: ArrowNativeType + std::ops::Add<Output = T> + std::ops::Sub<Output = T>>(
1152 offsets: Vec<LanceBuffer>,
1153) -> (LanceBuffer, Vec<Range<usize>>) {
1154 if offsets.is_empty() {
1155 return (LanceBuffer::empty(), Vec::default());
1156 }
1157 let len = offsets.iter().map(|b| b.len()).sum::<usize>();
1158 let mut dest = Vec::with_capacity(len);
1162 let mut byte_ranges = Vec::with_capacity(offsets.len());
1163
1164 dest.push(T::from_usize(0).unwrap());
1166
1167 for mut o in offsets.into_iter() {
1168 if !o.is_empty() {
1169 let last_offset = *dest.last().unwrap();
1170 let o = o.borrow_to_typed_slice::<T>();
1171 let start = *o.as_ref().first().unwrap();
1172 dest.extend(o.as_ref()[1..].iter().map(|&x| x + last_offset - start));
1186 }
1187 byte_ranges.push(get_byte_range::<T>(&mut o));
1188 }
1189 (LanceBuffer::reinterpret_vec(dest), byte_ranges)
1190}
1191
1192fn arrow_binary_to_data_block(
1193 arrays: &[ArrayRef],
1194 num_values: u64,
1195 bits_per_offset: u8,
1196) -> DataBlock {
1197 let data_vec = arrays.iter().map(|arr| arr.to_data()).collect::<Vec<_>>();
1198 let bytes_per_offset = bits_per_offset as usize / 8;
1199 let offsets = data_vec
1200 .iter()
1201 .map(|d| {
1202 LanceBuffer::from(
1203 d.buffers()[0].slice_with_length(d.offset(), (d.len() + 1) * bytes_per_offset),
1204 )
1205 })
1206 .collect::<Vec<_>>();
1207 let (offsets, data_ranges) = if bits_per_offset == 32 {
1208 stitch_offsets::<i32>(offsets)
1209 } else {
1210 stitch_offsets::<i64>(offsets)
1211 };
1212 let data = data_vec
1213 .iter()
1214 .zip(data_ranges)
1215 .map(|(d, byte_range)| {
1216 LanceBuffer::from(
1217 d.buffers()[1]
1218 .slice_with_length(byte_range.start, byte_range.end - byte_range.start),
1219 )
1220 })
1221 .collect::<Vec<_>>();
1222 let data = LanceBuffer::concat_into_one(data);
1223 DataBlock::VariableWidth(VariableWidthBlock {
1224 data,
1225 offsets,
1226 bits_per_offset,
1227 num_values,
1228 block_info: BlockInfo::new(),
1229 })
1230}
1231
1232fn encode_flat_data(arrays: &[ArrayRef], num_values: u64) -> LanceBuffer {
1233 let bytes_per_value = arrays[0].data_type().byte_width();
1234 let mut buffer = Vec::with_capacity(num_values as usize * bytes_per_value);
1235 for arr in arrays {
1236 let data = arr.to_data();
1237 buffer.extend_from_slice(data.buffers()[0].as_slice());
1238 }
1239 LanceBuffer::from(buffer)
1240}
1241
1242fn do_encode_bitmap_data(bitmaps: &[BooleanBuffer], num_values: u64) -> LanceBuffer {
1243 let mut builder = BooleanBufferBuilder::new(num_values as usize);
1244
1245 for buf in bitmaps {
1246 builder.append_buffer(buf);
1247 }
1248
1249 let buffer = builder.finish().into_inner();
1250 LanceBuffer::from(buffer)
1251}
1252
1253fn encode_bitmap_data(arrays: &[ArrayRef], num_values: u64) -> LanceBuffer {
1254 let bitmaps = arrays
1255 .iter()
1256 .map(|arr| arr.as_boolean().values().clone())
1257 .collect::<Vec<_>>();
1258 do_encode_bitmap_data(&bitmaps, num_values)
1259}
1260
1261fn concat_dict_arrays(arrays: &[ArrayRef]) -> ArrayRef {
1264 let value_type = arrays[0].as_any_dictionary().values().data_type();
1265 let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
1266 match arrow_select::concat::concat(&array_refs) {
1267 Ok(array) => array,
1268 Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1269 let upscaled = array_refs
1271 .iter()
1272 .map(|arr| {
1273 match arrow_cast::cast(
1274 *arr,
1275 &DataType::Dictionary(
1276 Box::new(DataType::UInt32),
1277 Box::new(value_type.clone()),
1278 ),
1279 ) {
1280 Ok(arr) => arr,
1281 Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1282 unimplemented!("Dictionary arrays with more than 2^32 unique values")
1284 }
1285 err => err.unwrap(),
1286 }
1287 })
1288 .collect::<Vec<_>>();
1289 let array_refs = upscaled.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
1290 match arrow_select::concat::concat(&array_refs) {
1292 Ok(array) => array,
1293 Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1294 unimplemented!("Dictionary arrays with more than 2^32 unique values")
1295 }
1296 err => err.unwrap(),
1297 }
1298 }
1299 err => err.unwrap(),
1301 }
1302}
1303
1304fn max_index_val(index_type: &DataType) -> u64 {
1305 match index_type {
1306 DataType::Int8 => i8::MAX as u64,
1307 DataType::Int16 => i16::MAX as u64,
1308 DataType::Int32 => i32::MAX as u64,
1309 DataType::Int64 => i64::MAX as u64,
1310 DataType::UInt8 => u8::MAX as u64,
1311 DataType::UInt16 => u16::MAX as u64,
1312 DataType::UInt32 => u32::MAX as u64,
1313 DataType::UInt64 => u64::MAX,
1314 _ => panic!("Invalid dictionary index type"),
1315 }
1316}
1317
/// Converts dictionary arrays into a [`DataBlock::Dictionary`].
///
/// The inputs are first merged with `concat_dict_arrays`.  The keys become a
/// fixed-width indices block and the dictionary values are converted with
/// `DataBlock::from`.
///
/// When `validity` is provided, nulls are moved into the dictionary items:
/// the key of every row that `validity` marks as null is overwritten so that
/// it points at a null item.  The first null already present in the values is
/// reused; if there is none, one null item is appended to the values.  If the
/// appended item's index does not fit in the key type, the keys are cast to
/// `UInt32`.
///
/// # Panics
///
/// * `unimplemented!` if a null item must be appended, its index does not fit
///   in the key type, and the key type can already hold `u32::MAX`
/// * on any cast / concat error (the results are unwrapped)
fn arrow_dictionary_to_data_block(arrays: &[ArrayRef], validity: Option<NullBuffer>) -> DataBlock {
    let array = concat_dict_arrays(arrays);
    let array_dict = array.as_any_dictionary();
    let mut indices = array_dict.keys();
    let num_values = indices.len() as u64;
    let mut values = array_dict.values().clone();
    // Owns the widened key array (if one is needed) so that `indices` can
    // borrow from it for the rest of the function.
    let mut upcast = None;

    let indices_block = if let Some(validity) = validity {
        // Look for a null item that already exists in the dictionary values
        // (the first unset bit of the values' validity bitmap).
        let mut first_invalid_index = None;
        if let Some(values_validity) = values.nulls() {
            first_invalid_index = (!values_validity.inner()).set_indices().next();
        }
        let first_invalid_index = first_invalid_index.unwrap_or_else(|| {
            // No null item exists yet: append one to the values and use its index.
            let null_arr = new_null_array(values.data_type(), 1);
            values = arrow_select::concat::concat(&[values.as_ref(), null_arr.as_ref()]).unwrap();
            let null_index = values.len() - 1;
            let max_index_val = max_index_val(indices.data_type());
            if null_index as u64 > max_index_val {
                // The new index does not fit in the current key type.  Widening
                // to UInt32 only helps if the key type is narrower than that.
                if max_index_val >= u32::MAX as u64 {
                    unimplemented!("Dictionary arrays with 2^32 unique value (or more) and a null")
                }
                upcast = Some(arrow_cast::cast(indices, &DataType::UInt32).unwrap());
                indices = upcast.as_ref().unwrap();
            }
            null_index
        });
        // Encode the null item's index using the (possibly widened) key type so
        // its raw bytes can be copied straight into the indices buffer.
        let null_index_arr = arrow_cast::cast(
            &UInt64Array::from(vec![first_invalid_index as u64]),
            indices.data_type(),
        )
        .unwrap();

        let bytes_per_index = indices.data_type().byte_width();
        let bits_per_index = bytes_per_index as u64 * 8;

        let null_index_arr = null_index_arr.into_data();
        let null_index_bytes = &null_index_arr.buffers()[0];
        let mut indices_bytes = indices.to_data().buffers()[0].to_vec();
        // Overwrite the key of every null row (unset bit in `validity`) with
        // the index of the null item.
        for invalid_idx in (!validity.inner()).set_indices() {
            indices_bytes[invalid_idx * bytes_per_index..(invalid_idx + 1) * bytes_per_index]
                .copy_from_slice(null_index_bytes.as_slice());
        }
        FixedWidthDataBlock {
            data: LanceBuffer::from(indices_bytes),
            bits_per_value: bits_per_index,
            num_values,
            block_info: BlockInfo::new(),
        }
    } else {
        // No validity was supplied: the keys are used as they are.
        FixedWidthDataBlock {
            data: LanceBuffer::from(indices.to_data().buffers()[0].clone()),
            bits_per_value: indices.data_type().byte_width() as u64 * 8,
            num_values,
            block_info: BlockInfo::new(),
        }
    };

    let items = DataBlock::from(values);
    DataBlock::Dictionary(DictionaryDataBlock {
        indices: indices_block,
        dictionary: Box::new(items),
    })
}
1410
/// Summary of the validity of a group of arrays, as computed by
/// `extract_nulls`.
enum Nullability {
    /// None of the arrays reported a null buffer
    None,
    /// The number of nulls equals the number of values
    All,
    /// A combined validity bitmap covering all arrays
    Some(NullBuffer),
}
1416
1417impl Nullability {
1418 fn to_option(&self) -> Option<NullBuffer> {
1419 match self {
1420 Self::Some(nulls) => Some(nulls.clone()),
1421 _ => None,
1422 }
1423 }
1424}
1425
1426fn extract_nulls(arrays: &[ArrayRef], num_values: u64) -> Nullability {
1427 let mut has_nulls = false;
1428 let nulls_and_lens = arrays
1429 .iter()
1430 .map(|arr| {
1431 let nulls = arr.logical_nulls();
1432 has_nulls |= nulls.is_some();
1433 (nulls, arr.len())
1434 })
1435 .collect::<Vec<_>>();
1436 if !has_nulls {
1437 return Nullability::None;
1438 }
1439 let mut builder = BooleanBufferBuilder::new(num_values as usize);
1440 let mut num_nulls = 0;
1441 for (null, len) in nulls_and_lens {
1442 if let Some(null) = null {
1443 num_nulls += null.null_count();
1444 builder.append_buffer(&null.into_inner());
1445 } else {
1446 builder.append_n(len, true);
1447 }
1448 }
1449 if num_nulls == num_values as usize {
1450 Nullability::All
1451 } else {
1452 Nullability::Some(NullBuffer::new(builder.finish()))
1453 }
1454}
1455
1456impl DataBlock {
1457 pub fn from_arrays(arrays: &[ArrayRef], num_values: u64) -> Self {
1458 if arrays.is_empty() || num_values == 0 {
1459 return Self::AllNull(AllNullDataBlock { num_values: 0 });
1460 }
1461
1462 let data_type = arrays[0].data_type();
1463 let nulls = extract_nulls(arrays, num_values);
1464
1465 if let Nullability::All = nulls {
1466 return Self::AllNull(AllNullDataBlock { num_values });
1467 }
1468
1469 let mut encoded = match data_type {
1470 DataType::Binary | DataType::Utf8 => arrow_binary_to_data_block(arrays, num_values, 32),
1471 DataType::BinaryView | DataType::Utf8View => {
1472 todo!()
1473 }
1474 DataType::LargeBinary | DataType::LargeUtf8 => {
1475 arrow_binary_to_data_block(arrays, num_values, 64)
1476 }
1477 DataType::Boolean => {
1478 let data = encode_bitmap_data(arrays, num_values);
1479 Self::FixedWidth(FixedWidthDataBlock {
1480 data,
1481 bits_per_value: 1,
1482 num_values,
1483 block_info: BlockInfo::new(),
1484 })
1485 }
1486 DataType::Date32
1487 | DataType::Date64
1488 | DataType::Decimal32(_, _)
1489 | DataType::Decimal64(_, _)
1490 | DataType::Decimal128(_, _)
1491 | DataType::Decimal256(_, _)
1492 | DataType::Duration(_)
1493 | DataType::FixedSizeBinary(_)
1494 | DataType::Float16
1495 | DataType::Float32
1496 | DataType::Float64
1497 | DataType::Int16
1498 | DataType::Int32
1499 | DataType::Int64
1500 | DataType::Int8
1501 | DataType::Interval(_)
1502 | DataType::Time32(_)
1503 | DataType::Time64(_)
1504 | DataType::Timestamp(_, _)
1505 | DataType::UInt16
1506 | DataType::UInt32
1507 | DataType::UInt64
1508 | DataType::UInt8 => {
1509 let data = encode_flat_data(arrays, num_values);
1510 Self::FixedWidth(FixedWidthDataBlock {
1511 data,
1512 bits_per_value: data_type.byte_width() as u64 * 8,
1513 num_values,
1514 block_info: BlockInfo::new(),
1515 })
1516 }
1517 DataType::Null => Self::AllNull(AllNullDataBlock { num_values }),
1518 DataType::Dictionary(_, _) => arrow_dictionary_to_data_block(arrays, nulls.to_option()),
1519 DataType::Struct(fields) => {
1520 let structs = arrays.iter().map(|arr| arr.as_struct()).collect::<Vec<_>>();
1521 let mut children = Vec::with_capacity(fields.len());
1522 for child_idx in 0..fields.len() {
1523 let child_vec = structs
1524 .iter()
1525 .map(|s| s.column(child_idx).clone())
1526 .collect::<Vec<_>>();
1527 children.push(Self::from_arrays(&child_vec, num_values));
1528 }
1529
1530 let validity = match &nulls {
1532 Nullability::None => None,
1533 Nullability::Some(null_buffer) => Some(null_buffer.clone()),
1534 Nullability::All => unreachable!("Should have returned AllNull earlier"),
1535 };
1536
1537 Self::Struct(StructDataBlock {
1538 children,
1539 block_info: BlockInfo::default(),
1540 validity,
1541 })
1542 }
1543 DataType::FixedSizeList(_, dim) => {
1544 let children = arrays
1545 .iter()
1546 .map(|arr| arr.as_fixed_size_list().values().clone())
1547 .collect::<Vec<_>>();
1548 let child_block = Self::from_arrays(&children, num_values * *dim as u64);
1549 Self::FixedSizeList(FixedSizeListBlock {
1550 child: Box::new(child_block),
1551 dimension: *dim as u64,
1552 })
1553 }
1554 DataType::LargeList(_)
1555 | DataType::List(_)
1556 | DataType::ListView(_)
1557 | DataType::LargeListView(_)
1558 | DataType::Map(_, _)
1559 | DataType::RunEndEncoded(_, _)
1560 | DataType::Union(_, _) => {
1561 panic!(
1562 "Field with data type {} cannot be converted to data block",
1563 data_type
1564 )
1565 }
1566 };
1567
1568 encoded.compute_stat();
1570
1571 if !matches!(data_type, DataType::Dictionary(_, _)) {
1572 match nulls {
1573 Nullability::None => encoded,
1574 Nullability::Some(nulls) => Self::Nullable(NullableDataBlock {
1575 data: Box::new(encoded),
1576 nulls: LanceBuffer::from(nulls.into_inner().into_inner()),
1577 block_info: BlockInfo::new(),
1578 }),
1579 _ => unreachable!(),
1580 }
1581 } else {
1582 encoded
1584 }
1585 }
1586
1587 pub fn from_array<T: Array + 'static>(array: T) -> Self {
1588 let num_values = array.len();
1589 Self::from_arrays(&[Arc::new(array)], num_values as u64)
1590 }
1591}
1592
1593impl From<ArrayRef> for DataBlock {
1594 fn from(array: ArrayRef) -> Self {
1595 let num_values = array.len() as u64;
1596 Self::from_arrays(&[array], num_values)
1597 }
1598}
1599
/// Type-specific implementation behind [`DataBlockBuilder`].
///
/// An implementation accumulates pieces of data blocks and produces a single
/// block when finished.
pub trait DataBlockBuilderImpl: std::fmt::Debug {
    /// Appends the `selection` portion of `data_block` to the builder.
    // NOTE(review): `selection` is presumably a range of value indices within
    // `data_block` -- confirm against the implementations.
    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>);
    /// Consumes the builder and returns the accumulated data block.
    fn finish(self: Box<Self>) -> DataBlock;
}
1604
/// Builds a data block from pieces of other data blocks.
///
/// The concrete builder is created lazily from the first block that is
/// appended.
#[derive(Debug)]
pub struct DataBlockBuilder {
    /// Size estimate handed to `DataBlock::make_builder` when the concrete
    /// builder is created
    estimated_size_bytes: u64,
    /// The concrete builder; `None` until the first block has been appended
    builder: Option<Box<dyn DataBlockBuilderImpl>>,
}
1610
1611impl DataBlockBuilder {
1612 pub fn with_capacity_estimate(estimated_size_bytes: u64) -> Self {
1613 Self {
1614 estimated_size_bytes,
1615 builder: None,
1616 }
1617 }
1618
1619 fn get_builder(&mut self, block: &DataBlock) -> &mut dyn DataBlockBuilderImpl {
1620 if self.builder.is_none() {
1621 self.builder = Some(block.make_builder(self.estimated_size_bytes));
1622 }
1623 self.builder.as_mut().unwrap().as_mut()
1624 }
1625
1626 pub fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
1627 self.get_builder(data_block).append(data_block, selection);
1628 }
1629
1630 pub fn finish(self) -> DataBlock {
1631 let builder = self.builder.expect("DataBlockBuilder didn't see any data");
1632 builder.finish()
1633 }
1634}
1635
1636#[cfg(test)]
1637mod tests {
1638 use std::sync::Arc;
1639
1640 use arrow_array::{
1641 make_array, new_null_array,
1642 types::{Int32Type, Int8Type},
1643 ArrayRef, DictionaryArray, Int8Array, LargeBinaryArray, StringArray, UInt16Array,
1644 UInt8Array,
1645 };
1646 use arrow_buffer::{BooleanBuffer, NullBuffer};
1647
1648 use arrow_schema::{DataType, Field, Fields};
1649 use lance_datagen::{array, ArrayGeneratorExt, RowCount, DEFAULT_SEED};
1650 use rand::SeedableRng;
1651
1652 use crate::buffer::LanceBuffer;
1653
1654 use super::{AllNullDataBlock, DataBlock};
1655
1656 use arrow_array::Array;
1657
1658 #[test]
1659 fn test_sliced_to_data_block() {
1660 let ints = UInt16Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);
1661 let ints = ints.slice(2, 4);
1662 let data = DataBlock::from_array(ints);
1663
1664 let fixed_data = data.as_fixed_width().unwrap();
1665 assert_eq!(fixed_data.num_values, 4);
1666 assert_eq!(fixed_data.data.len(), 8);
1667
1668 let nullable_ints =
1669 UInt16Array::from(vec![Some(0), None, Some(2), None, Some(4), None, Some(6)]);
1670 let nullable_ints = nullable_ints.slice(1, 3);
1671 let data = DataBlock::from_array(nullable_ints);
1672
1673 let nullable = data.as_nullable().unwrap();
1674 assert_eq!(nullable.nulls, LanceBuffer::from(vec![0b00000010]));
1675 }
1676
    /// Converting several string arrays stitches their offsets and data
    /// together, with and without nulls.
    #[test]
    fn test_string_to_data_block() {
        // Case 1: nulls are present, including an array that is entirely null
        let strings1 = StringArray::from(vec![Some("hello"), None, Some("world")]);
        let strings2 = StringArray::from(vec![Some("a"), Some("b")]);
        let strings3 = StringArray::from(vec![Option::<&'static str>::None, None]);

        let arrays = &[strings1, strings2, strings3]
            .iter()
            .map(|arr| Arc::new(arr.clone()) as ArrayRef)
            .collect::<Vec<_>>();

        let block = DataBlock::from_arrays(arrays, 7);

        assert_eq!(block.num_values(), 7);
        let block = block.as_nullable().unwrap();

        // Rows 0, 2, 3 and 4 are valid; rows 1, 5 and 6 are null
        assert_eq!(block.nulls, LanceBuffer::from(vec![0b00011101]));

        let data = block.data.as_variable_width().unwrap();
        // Null rows show up as zero-length entries in the offsets
        assert_eq!(
            data.offsets,
            LanceBuffer::reinterpret_vec(vec![0, 5, 5, 10, 11, 12, 12, 12])
        );

        assert_eq!(data.data, LanceBuffer::copy_slice(b"helloworldab"));

        // Case 2: no nulls, so the block is not wrapped in a nullable block
        let strings1 = StringArray::from(vec![Some("a"), Some("bc")]);
        let strings2 = StringArray::from(vec![Some("def")]);

        let arrays = &[strings1, strings2]
            .iter()
            .map(|arr| Arc::new(arr.clone()) as ArrayRef)
            .collect::<Vec<_>>();

        let block = DataBlock::from_arrays(arrays, 3);

        assert_eq!(block.num_values(), 3);
        let data = block.as_variable_width().unwrap();
        assert_eq!(data.offsets, LanceBuffer::reinterpret_vec(vec![0, 1, 3, 6]));
        assert_eq!(data.data, LanceBuffer::copy_slice(b"abcdef"));
    }
1721
    /// Sliced string arrays must produce zero-based offsets and only the
    /// bytes that the slice references.
    #[test]
    fn test_string_sliced() {
        // Converts the arrays and compares offsets and data with the expectation
        let check = |arr: Vec<StringArray>, expected_off: Vec<i32>, expected_data: &[u8]| {
            let arrs = arr
                .into_iter()
                .map(|a| Arc::new(a) as ArrayRef)
                .collect::<Vec<_>>();
            let num_rows = arrs.iter().map(|a| a.len()).sum::<usize>() as u64;
            let data = DataBlock::from_arrays(&arrs, num_rows);

            assert_eq!(data.num_values(), num_rows);

            let data = data.as_variable_width().unwrap();
            assert_eq!(data.offsets, LanceBuffer::reinterpret_vec(expected_off));
            assert_eq!(data.data, LanceBuffer::copy_slice(expected_data));
        };

        let string = StringArray::from(vec![Some("hello"), Some("world")]);
        // A slice that does not start at the beginning of the array
        check(vec![string.slice(1, 1)], vec![0, 5], b"world");
        check(vec![string.slice(0, 1)], vec![0, 5], b"hello");
        // Two slices of the same array
        check(
            vec![string.slice(0, 1), string.slice(1, 1)],
            vec![0, 5, 10],
            b"helloworld",
        );

        // Slices of two different arrays
        let string2 = StringArray::from(vec![Some("foo"), Some("bar")]);
        check(
            vec![string.slice(0, 1), string2.slice(0, 1)],
            vec![0, 5, 8],
            b"hellofoo",
        );
    }
1755
1756 #[test]
1757 fn test_large() {
1758 let arr = LargeBinaryArray::from_vec(vec![b"hello", b"world"]);
1759 let data = DataBlock::from_array(arr);
1760
1761 assert_eq!(data.num_values(), 2);
1762 let data = data.as_variable_width().unwrap();
1763 assert_eq!(data.bits_per_offset, 64);
1764 assert_eq!(data.num_values, 2);
1765 assert_eq!(data.data, LanceBuffer::copy_slice(b"helloworld"));
1766 assert_eq!(
1767 data.offsets,
1768 LanceBuffer::reinterpret_vec(vec![0_u64, 5, 10])
1769 );
1770 }
1771
    /// Concatenating dictionary arrays rewrites the keys so that they point
    /// into the combined dictionary.
    #[test]
    fn test_dictionary_indices_normalized() {
        let arr1 = DictionaryArray::<Int8Type>::from_iter([Some("a"), Some("a"), Some("b")]);
        let arr2 = DictionaryArray::<Int8Type>::from_iter([Some("b"), Some("c")]);

        let data = DataBlock::from_arrays(&[Arc::new(arr1), Arc::new(arr2)], 5);

        assert_eq!(data.num_values(), 5);
        let data = data.as_dictionary().unwrap();
        let indices = data.indices;
        assert_eq!(indices.bits_per_value, 8);
        assert_eq!(indices.num_values, 5);
        // The keys of the second array are shifted past the first array's items
        assert_eq!(
            indices.data,
            LanceBuffer::reinterpret_vec::<i8>(vec![0, 0, 1, 2, 3])
        );

        // The expectation below has "b" twice: the combined dictionary holds
        // the items of both inputs without de-duplication.
        let items = data.dictionary.as_variable_width().unwrap();
        assert_eq!(items.bits_per_offset, 32);
        assert_eq!(items.num_values, 4);
        assert_eq!(items.data, LanceBuffer::copy_slice(b"abbc"));
        assert_eq!(
            items.offsets,
            LanceBuffer::reinterpret_vec(vec![0, 1, 2, 3, 4],)
        );
    }
1801
    /// Null rows of a dictionary array end up as keys that point at a null
    /// dictionary item, whether the null lives in the keys or in the items.
    #[test]
    fn test_dictionary_nulls() {
        // Case 1: the nulls are in the keys
        let arr1 = DictionaryArray::<Int8Type>::from_iter([None, Some("a"), Some("b")]);
        let arr2 = DictionaryArray::<Int8Type>::from_iter([Some("c"), None]);

        let data = DataBlock::from_arrays(&[Arc::new(arr1), Arc::new(arr2)], 5);

        // Both cases must produce exactly the same block
        let check_common = |data: DataBlock| {
            assert_eq!(data.num_values(), 5);
            let dict = data.as_dictionary().unwrap();

            // Items 0..=2 are valid, item 3 is the null item
            let nullable_items = dict.dictionary.as_nullable().unwrap();
            assert_eq!(nullable_items.nulls, LanceBuffer::from(vec![0b00000111]));
            assert_eq!(nullable_items.data.num_values(), 4);

            let items = nullable_items.data.as_variable_width().unwrap();
            assert_eq!(items.bits_per_offset, 32);
            assert_eq!(items.num_values, 4);
            assert_eq!(items.data, LanceBuffer::copy_slice(b"abc"));
            assert_eq!(
                items.offsets,
                LanceBuffer::reinterpret_vec(vec![0, 1, 2, 3, 3],)
            );

            // Null rows (first and last) point at item 3
            let indices = dict.indices;
            assert_eq!(indices.bits_per_value, 8);
            assert_eq!(indices.num_values, 5);
            assert_eq!(
                indices.data,
                LanceBuffer::reinterpret_vec::<i8>(vec![3, 0, 1, 2, 3])
            );
        };
        check_common(data);

        // Case 2: the null is already a dictionary item and the keys are all valid
        let items = StringArray::from(vec![Some("a"), Some("b"), Some("c"), None]);
        let indices = Int8Array::from(vec![Some(3), Some(0), Some(1), Some(2), Some(3)]);
        let dict = DictionaryArray::new(indices, Arc::new(items));

        let data = DataBlock::from_array(dict);

        check_common(data);
    }
1848
    /// When a null item has to be added but the key type is already full,
    /// the keys are widened to 32 bits.
    #[test]
    fn test_dictionary_cannot_add_null() {
        // 256 distinct items (strings of 0..=255 zero bytes): every u8 key is used
        let items = StringArray::from(
            (0..256)
                .map(|i| Some(String::from_utf8(vec![0; i]).unwrap()))
                .collect::<Vec<_>>(),
        );
        // 257 rows: one for each item followed by a single null row
        let indices = UInt8Array::from(
            (0..=256)
                .map(|i| if i == 256 { None } else { Some(i as u8) })
                .collect::<Vec<_>>(),
        );
        let dict = DictionaryArray::new(indices, Arc::new(items));
        let data = DataBlock::from_array(dict);

        assert_eq!(data.num_values(), 257);

        let dict = data.as_dictionary().unwrap();

        // The null item gets index 256, which does not fit in a u8
        assert_eq!(dict.indices.bits_per_value, 32);
        assert_eq!(
            dict.indices.data,
            LanceBuffer::reinterpret_vec((0_u32..257).collect::<Vec<_>>())
        );

        // Only the appended item (index 256) is null
        let nullable_items = dict.dictionary.as_nullable().unwrap();
        let null_buffer = NullBuffer::new(BooleanBuffer::new(
            nullable_items.nulls.into_buffer(),
            0,
            257,
        ));
        for i in 0..256 {
            assert!(!null_buffer.is_null(i));
        }
        assert!(null_buffer.is_null(256));

        // 0 + 1 + ... + 255 = 32640 bytes of item data
        assert_eq!(
            nullable_items.data.as_variable_width().unwrap().data.len(),
            32640
        );
    }
1894
1895 #[test]
1896 fn test_all_null() {
1897 for data_type in [
1898 DataType::UInt32,
1899 DataType::FixedSizeBinary(2),
1900 DataType::List(Arc::new(Field::new("item", DataType::UInt32, true))),
1901 DataType::Struct(Fields::from(vec![Field::new("a", DataType::UInt32, true)])),
1902 ] {
1903 let block = DataBlock::AllNull(AllNullDataBlock { num_values: 10 });
1904 let arr = block.into_arrow(data_type.clone(), true).unwrap();
1905 let arr = make_array(arr);
1906 let expected = new_null_array(&data_type, 10);
1907 assert_eq!(&arr, &expected);
1908 }
1909 }
1910
    /// When the combined dictionary has more items than the key type can
    /// address, the keys are widened to 32 bits.
    #[test]
    fn test_dictionary_cannot_concatenate() {
        // 256 distinct items: strings of 0..=255 zero bytes
        let items = StringArray::from(
            (0..256)
                .map(|i| Some(String::from_utf8(vec![0; i]).unwrap()))
                .collect::<Vec<_>>(),
        );
        // 256 different items: strings of 1..=256 one bytes
        let other_items = StringArray::from(
            (0..256)
                .map(|i| Some(String::from_utf8(vec![1; i + 1]).unwrap()))
                .collect::<Vec<_>>(),
        );
        let indices = UInt8Array::from_iter_values(0..=255);
        let dict1 = DictionaryArray::new(indices.clone(), Arc::new(items));
        let dict2 = DictionaryArray::new(indices, Arc::new(other_items));
        let data = DataBlock::from_arrays(&[Arc::new(dict1), Arc::new(dict2)], 512);
        assert_eq!(data.num_values(), 512);

        let dict = data.as_dictionary().unwrap();

        // 512 items cannot be addressed with u8 keys
        assert_eq!(dict.indices.bits_per_value, 32);
        assert_eq!(
            dict.indices.data,
            LanceBuffer::reinterpret_vec::<u32>((0..512).collect::<Vec<_>>())
        );
        // (0 + ... + 255) + (1 + ... + 256) = 65536 bytes of item data
        assert_eq!(
            dict.dictionary.as_variable_width().unwrap().data.len(),
            65536
        );
    }
1944
1945 #[test]
1946 fn test_data_size() {
1947 let mut rng = rand_xoshiro::Xoshiro256PlusPlus::seed_from_u64(DEFAULT_SEED.0);
1948 let mut genn = array::rand::<Int32Type>().with_nulls(&[false, false, false]);
1950
1951 let arr = genn.generate(RowCount::from(3), &mut rng).unwrap();
1952 let block = DataBlock::from_array(arr.clone());
1953 assert!(block.data_size() == arr.get_buffer_memory_size() as u64);
1954
1955 let arr = genn.generate(RowCount::from(400), &mut rng).unwrap();
1956 let block = DataBlock::from_array(arr.clone());
1957 assert!(block.data_size() == arr.get_buffer_memory_size() as u64);
1958
1959 let mut genn = array::rand::<Int32Type>().with_nulls(&[false, true, false]);
1961 let arr = genn.generate(RowCount::from(3), &mut rng).unwrap();
1962 let block = DataBlock::from_array(arr.clone());
1963
1964 let array_data = arr.to_data();
1965 let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
1966 let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
1968 assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
1969
1970 let arr = genn.generate(RowCount::from(400), &mut rng).unwrap();
1971 let block = DataBlock::from_array(arr.clone());
1972
1973 let array_data = arr.to_data();
1974 let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
1975 let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
1976 assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
1977
1978 let mut genn = array::rand::<Int32Type>().with_nulls(&[true, true, false]);
1979 let arr = genn.generate(RowCount::from(3), &mut rng).unwrap();
1980 let block = DataBlock::from_array(arr.clone());
1981
1982 let array_data = arr.to_data();
1983 let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
1984 let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
1985 assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
1986
1987 let arr = genn.generate(RowCount::from(400), &mut rng).unwrap();
1988 let block = DataBlock::from_array(arr.clone());
1989
1990 let array_data = arr.to_data();
1991 let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
1992 let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
1993 assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
1994
1995 let mut genn = array::rand::<Int32Type>().with_nulls(&[false, true, false]);
1996 let arr1 = genn.generate(RowCount::from(3), &mut rng).unwrap();
1997 let arr2 = genn.generate(RowCount::from(3), &mut rng).unwrap();
1998 let arr3 = genn.generate(RowCount::from(3), &mut rng).unwrap();
1999 let block = DataBlock::from_arrays(&[arr1.clone(), arr2.clone(), arr3.clone()], 9);
2000
2001 let concatenated_array = arrow_select::concat::concat(&[
2002 &*Arc::new(arr1.clone()) as &dyn Array,
2003 &*Arc::new(arr2.clone()) as &dyn Array,
2004 &*Arc::new(arr3.clone()) as &dyn Array,
2005 ])
2006 .unwrap();
2007 let total_buffer_size: usize = concatenated_array
2008 .to_data()
2009 .buffers()
2010 .iter()
2011 .map(|buffer| buffer.len())
2012 .sum();
2013
2014 let total_nulls_size_in_bytes = concatenated_array.nulls().unwrap().len().div_ceil(8);
2015 assert!(block.data_size() == (total_buffer_size + total_nulls_size_in_bytes) as u64);
2016 }
2017}