1use std::{
18 ops::Range,
19 sync::{Arc, RwLock},
20};
21
22use arrow_array::{
23 Array, ArrayRef, OffsetSizeTrait, UInt64Array,
24 cast::AsArray,
25 new_empty_array, new_null_array,
26 types::{ArrowDictionaryKeyType, UInt8Type, UInt16Type, UInt32Type, UInt64Type},
27};
28use arrow_buffer::{ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer};
29use arrow_data::{ArrayData, ArrayDataBuilder};
30use arrow_schema::DataType;
31use lance_arrow::DataTypeExt;
32
33use lance_core::{Error, Result};
34
35use crate::{
36 buffer::LanceBuffer,
37 statistics::{ComputeStat, Stat},
38};
39
40#[derive(Debug, Clone)]
46pub struct AllNullDataBlock {
47 pub num_values: u64,
49}
50
51impl AllNullDataBlock {
52 fn into_arrow(self, data_type: DataType, _validate: bool) -> Result<ArrayData> {
53 Ok(ArrayData::new_null(&data_type, self.num_values as usize))
54 }
55
56 fn into_buffers(self) -> Vec<LanceBuffer> {
57 vec![]
58 }
59}
60
61use std::collections::HashMap;
62
63#[derive(Debug, Clone)]
66pub struct BlockInfo(pub Arc<RwLock<HashMap<Stat, Arc<dyn Array>>>>);
67
68impl Default for BlockInfo {
69 fn default() -> Self {
70 Self::new()
71 }
72}
73
74impl BlockInfo {
75 pub fn new() -> Self {
76 Self(Arc::new(RwLock::new(HashMap::new())))
77 }
78}
79
80impl PartialEq for BlockInfo {
81 fn eq(&self, other: &Self) -> bool {
82 let self_info = self.0.read().unwrap();
83 let other_info = other.0.read().unwrap();
84 *self_info == *other_info
85 }
86}
87
88#[derive(Debug, Clone)]
94pub struct NullableDataBlock {
95 pub data: Box<DataBlock>,
97 pub nulls: LanceBuffer,
99
100 pub block_info: BlockInfo,
101}
102
103impl NullableDataBlock {
104 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
105 let nulls = self.nulls.into_buffer();
106 let data = self.data.into_arrow(data_type, validate)?.into_builder();
107 let data = data.null_bit_buffer(Some(nulls));
108 if validate {
109 Ok(data.build()?)
110 } else {
111 Ok(unsafe { data.build_unchecked() })
112 }
113 }
114
115 fn into_buffers(self) -> Vec<LanceBuffer> {
116 let mut buffers = vec![self.nulls];
117 buffers.extend(self.data.into_buffers());
118 buffers
119 }
120
121 pub fn data_size(&self) -> u64 {
122 self.data.data_size() + self.nulls.len() as u64
123 }
124}
125
126#[derive(Debug, PartialEq, Clone)]
128pub struct ConstantDataBlock {
129 pub data: LanceBuffer,
131 pub num_values: u64,
133}
134
135impl ConstantDataBlock {
136 fn into_buffers(self) -> Vec<LanceBuffer> {
137 vec![self.data]
138 }
139
140 fn into_arrow(self, _data_type: DataType, _validate: bool) -> Result<ArrayData> {
141 todo!()
144 }
145
146 pub fn try_clone(&self) -> Result<Self> {
147 Ok(Self {
148 data: self.data.clone(),
149 num_values: self.num_values,
150 })
151 }
152
153 pub fn data_size(&self) -> u64 {
154 self.data.len() as u64
155 }
156}
157
158#[derive(Debug, PartialEq, Clone)]
160pub struct FixedWidthDataBlock {
161 pub data: LanceBuffer,
163 pub bits_per_value: u64,
165 pub num_values: u64,
167
168 pub block_info: BlockInfo,
169}
170
171impl FixedWidthDataBlock {
172 fn do_into_arrow(
173 self,
174 data_type: DataType,
175 num_values: u64,
176 validate: bool,
177 ) -> Result<ArrayData> {
178 let data_buffer = self.data.into_buffer();
179 let builder = ArrayDataBuilder::new(data_type)
180 .add_buffer(data_buffer)
181 .len(num_values as usize)
182 .null_count(0);
183 if validate {
184 Ok(builder.build()?)
185 } else {
186 Ok(unsafe { builder.build_unchecked() })
187 }
188 }
189
190 pub fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
191 let root_num_values = self.num_values;
192 self.do_into_arrow(data_type, root_num_values, validate)
193 }
194
195 pub fn into_buffers(self) -> Vec<LanceBuffer> {
196 vec![self.data]
197 }
198
199 pub fn try_clone(&self) -> Result<Self> {
200 Ok(Self {
201 data: self.data.clone(),
202 bits_per_value: self.bits_per_value,
203 num_values: self.num_values,
204 block_info: self.block_info.clone(),
205 })
206 }
207
208 pub fn data_size(&self) -> u64 {
209 self.data.len() as u64
210 }
211}
212
213#[derive(Debug)]
214pub struct VariableWidthDataBlockBuilder<T: OffsetSizeTrait> {
215 offsets: Vec<T>,
216 bytes: Vec<u8>,
217}
218
219impl<T: OffsetSizeTrait> VariableWidthDataBlockBuilder<T> {
220 fn new(estimated_size_bytes: u64) -> Self {
221 Self {
222 offsets: vec![T::from_usize(0).unwrap()],
223 bytes: Vec::with_capacity(estimated_size_bytes as usize),
224 }
225 }
226}
227
228impl<T: OffsetSizeTrait + bytemuck::Pod> DataBlockBuilderImpl for VariableWidthDataBlockBuilder<T> {
229 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
230 let block = data_block.as_variable_width_ref().unwrap();
231 assert!(block.bits_per_offset == T::get_byte_width() as u8 * 8);
232 let offsets = block.offsets.borrow_to_typed_view::<T>();
233
234 let start_offset = offsets[selection.start as usize];
235 let end_offset = offsets[selection.end as usize];
236 let mut previous_len = self.bytes.len();
237
238 self.bytes
239 .extend_from_slice(&block.data[start_offset.as_usize()..end_offset.as_usize()]);
240
241 self.offsets.extend(
242 offsets[selection.start as usize..selection.end as usize]
243 .iter()
244 .zip(&offsets[selection.start as usize + 1..=selection.end as usize])
245 .map(|(¤t, &next)| {
246 let this_value_len = next - current;
247 previous_len += this_value_len.as_usize();
248 T::from_usize(previous_len).unwrap()
249 }),
250 );
251 }
252
253 fn finish(self: Box<Self>) -> DataBlock {
254 let num_values = (self.offsets.len() - 1) as u64;
255 DataBlock::VariableWidth(VariableWidthBlock {
256 data: LanceBuffer::from(self.bytes),
257 offsets: LanceBuffer::reinterpret_vec(self.offsets),
258 bits_per_offset: T::get_byte_width() as u8 * 8,
259 num_values,
260 block_info: BlockInfo::new(),
261 })
262 }
263}
264
265#[derive(Debug)]
266struct BitmapDataBlockBuilder {
267 values: BooleanBufferBuilder,
268}
269
270impl BitmapDataBlockBuilder {
271 fn new(estimated_size_bytes: u64) -> Self {
272 Self {
273 values: BooleanBufferBuilder::new(estimated_size_bytes as usize * 8),
274 }
275 }
276}
277
278impl DataBlockBuilderImpl for BitmapDataBlockBuilder {
279 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
280 let bitmap_blk = data_block.as_fixed_width_ref().unwrap();
281 self.values.append_packed_range(
282 selection.start as usize..selection.end as usize,
283 &bitmap_blk.data,
284 );
285 }
286
287 fn finish(mut self: Box<Self>) -> DataBlock {
288 let bool_buf = self.values.finish();
289 let num_values = bool_buf.len() as u64;
290 let bits_buf = bool_buf.into_inner();
291 DataBlock::FixedWidth(FixedWidthDataBlock {
292 data: LanceBuffer::from(bits_buf),
293 bits_per_value: 1,
294 num_values,
295 block_info: BlockInfo::new(),
296 })
297 }
298}
299
300#[derive(Debug)]
301struct FixedWidthDataBlockBuilder {
302 bits_per_value: u64,
303 bytes_per_value: u64,
304 values: Vec<u8>,
305}
306
307impl FixedWidthDataBlockBuilder {
308 fn new(bits_per_value: u64, estimated_size_bytes: u64) -> Self {
309 assert!(bits_per_value.is_multiple_of(8));
310 Self {
311 bits_per_value,
312 bytes_per_value: bits_per_value / 8,
313 values: Vec::with_capacity(estimated_size_bytes as usize),
314 }
315 }
316}
317
318impl DataBlockBuilderImpl for FixedWidthDataBlockBuilder {
319 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
320 let block = data_block.as_fixed_width_ref().unwrap();
321 assert_eq!(self.bits_per_value, block.bits_per_value);
322 let start = selection.start as usize * self.bytes_per_value as usize;
323 let end = selection.end as usize * self.bytes_per_value as usize;
324 self.values.extend_from_slice(&block.data[start..end]);
325 }
326
327 fn finish(self: Box<Self>) -> DataBlock {
328 let num_values = (self.values.len() / self.bytes_per_value as usize) as u64;
329 DataBlock::FixedWidth(FixedWidthDataBlock {
330 data: LanceBuffer::from(self.values),
331 bits_per_value: self.bits_per_value,
332 num_values,
333 block_info: BlockInfo::new(),
334 })
335 }
336}
337
338#[derive(Debug)]
339struct StructDataBlockBuilder {
340 children: Vec<Box<dyn DataBlockBuilderImpl>>,
341}
342
343impl StructDataBlockBuilder {
344 fn new(children: Vec<Box<dyn DataBlockBuilderImpl>>) -> Self {
345 Self { children }
346 }
347}
348
349impl DataBlockBuilderImpl for StructDataBlockBuilder {
350 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
351 let data_block = data_block.as_struct_ref().unwrap();
352 for i in 0..self.children.len() {
353 self.children[i].append(&data_block.children[i], selection.clone());
354 }
355 }
356
357 fn finish(self: Box<Self>) -> DataBlock {
358 let mut children_data_block = Vec::new();
359 for child in self.children {
360 let child_data_block = child.finish();
361 children_data_block.push(child_data_block);
362 }
363 DataBlock::Struct(StructDataBlock {
364 children: children_data_block,
365 block_info: BlockInfo::new(),
366 validity: None,
367 })
368 }
369}
370
371#[derive(Debug, Default)]
372struct AllNullDataBlockBuilder {
373 num_values: u64,
374}
375
376impl DataBlockBuilderImpl for AllNullDataBlockBuilder {
377 fn append(&mut self, _data_block: &DataBlock, selection: Range<u64>) {
378 self.num_values += selection.end - selection.start;
379 }
380
381 fn finish(self: Box<Self>) -> DataBlock {
382 DataBlock::AllNull(AllNullDataBlock {
383 num_values: self.num_values,
384 })
385 }
386}
387
388#[derive(Debug, Clone)]
390pub struct FixedSizeListBlock {
391 pub child: Box<DataBlock>,
393 pub dimension: u64,
395}
396
397impl FixedSizeListBlock {
398 pub fn num_values(&self) -> u64 {
399 self.child.num_values() / self.dimension
400 }
401
402 pub fn try_into_flat(self) -> Option<FixedWidthDataBlock> {
406 match *self.child {
407 DataBlock::Nullable(_) => None,
409 DataBlock::FixedSizeList(inner) => {
410 let mut flat = inner.try_into_flat()?;
411 flat.bits_per_value *= self.dimension;
412 flat.num_values /= self.dimension;
413 Some(flat)
414 }
415 DataBlock::FixedWidth(mut inner) => {
416 inner.bits_per_value *= self.dimension;
417 inner.num_values /= self.dimension;
418 Some(inner)
419 }
420 _ => panic!(
421 "Expected FixedSizeList or FixedWidth data block but found {:?}",
422 self
423 ),
424 }
425 }
426
427 pub fn flatten_as_fixed(&mut self) -> FixedWidthDataBlock {
428 match self.child.as_mut() {
429 DataBlock::FixedSizeList(fsl) => fsl.flatten_as_fixed(),
430 DataBlock::FixedWidth(fw) => fw.clone(),
431 _ => panic!("Expected FixedSizeList or FixedWidth data block"),
432 }
433 }
434
435 pub fn from_flat(data: FixedWidthDataBlock, data_type: &DataType) -> DataBlock {
437 match data_type {
438 DataType::FixedSizeList(child_field, dimension) => {
439 let mut data = data;
440 data.bits_per_value /= *dimension as u64;
441 data.num_values *= *dimension as u64;
442 let child_data = Self::from_flat(data, child_field.data_type());
443 DataBlock::FixedSizeList(Self {
444 child: Box::new(child_data),
445 dimension: *dimension as u64,
446 })
447 }
448 _ => DataBlock::FixedWidth(data),
450 }
451 }
452
453 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
454 let num_values = self.num_values();
455 let builder = match &data_type {
456 DataType::FixedSizeList(child_field, _) => {
457 let child_data = self
458 .child
459 .into_arrow(child_field.data_type().clone(), validate)?;
460 ArrayDataBuilder::new(data_type)
461 .add_child_data(child_data)
462 .len(num_values as usize)
463 .null_count(0)
464 }
465 _ => panic!("Expected FixedSizeList data type and got {:?}", data_type),
466 };
467 if validate {
468 Ok(builder.build()?)
469 } else {
470 Ok(unsafe { builder.build_unchecked() })
471 }
472 }
473
474 fn into_buffers(self) -> Vec<LanceBuffer> {
475 self.child.into_buffers()
476 }
477
478 fn data_size(&self) -> u64 {
479 self.child.data_size()
480 }
481}
482
483#[derive(Debug)]
484struct FixedSizeListBlockBuilder {
485 inner: Box<dyn DataBlockBuilderImpl>,
486 dimension: u64,
487}
488
489impl FixedSizeListBlockBuilder {
490 fn new(inner: Box<dyn DataBlockBuilderImpl>, dimension: u64) -> Self {
491 Self { inner, dimension }
492 }
493}
494
495impl DataBlockBuilderImpl for FixedSizeListBlockBuilder {
496 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
497 let selection = selection.start * self.dimension..selection.end * self.dimension;
498 let fsl = data_block.as_fixed_size_list_ref().unwrap();
499 self.inner.append(fsl.child.as_ref(), selection);
500 }
501
502 fn finish(self: Box<Self>) -> DataBlock {
503 let inner_block = self.inner.finish();
504 DataBlock::FixedSizeList(FixedSizeListBlock {
505 child: Box::new(inner_block),
506 dimension: self.dimension,
507 })
508 }
509}
510
511#[derive(Debug)]
512struct NullableDataBlockBuilder {
513 inner: Box<dyn DataBlockBuilderImpl>,
514 validity: BooleanBufferBuilder,
515}
516
517impl NullableDataBlockBuilder {
518 fn new(inner: Box<dyn DataBlockBuilderImpl>, estimated_size_bytes: usize) -> Self {
519 Self {
520 inner,
521 validity: BooleanBufferBuilder::new(estimated_size_bytes * 8),
522 }
523 }
524}
525
526impl DataBlockBuilderImpl for NullableDataBlockBuilder {
527 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
528 let nullable = data_block.as_nullable_ref().unwrap();
529 let bool_buf = BooleanBuffer::new(
530 nullable.nulls.clone().into_buffer(),
531 selection.start as usize,
532 (selection.end - selection.start) as usize,
533 );
534 self.validity.append_buffer(&bool_buf);
535 self.inner.append(nullable.data.as_ref(), selection);
536 }
537
538 fn finish(mut self: Box<Self>) -> DataBlock {
539 let inner_block = self.inner.finish();
540 DataBlock::Nullable(NullableDataBlock {
541 data: Box::new(inner_block),
542 nulls: LanceBuffer::from(self.validity.finish().into_inner()),
543 block_info: BlockInfo::new(),
544 })
545 }
546}
547
548#[derive(Debug, Clone)]
552pub struct OpaqueBlock {
553 pub buffers: Vec<LanceBuffer>,
554 pub num_values: u64,
555 pub block_info: BlockInfo,
556}
557
558impl OpaqueBlock {
559 pub fn data_size(&self) -> u64 {
560 self.buffers.iter().map(|b| b.len() as u64).sum()
561 }
562}
563
564#[derive(Debug, Clone)]
566pub struct VariableWidthBlock {
567 pub data: LanceBuffer,
569 pub offsets: LanceBuffer,
573 pub bits_per_offset: u8,
575 pub num_values: u64,
577
578 pub block_info: BlockInfo,
579}
580
581impl VariableWidthBlock {
582 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
583 let data_buffer = self.data.into_buffer();
584 let offsets_buffer = self.offsets.into_buffer();
585 let builder = ArrayDataBuilder::new(data_type)
586 .add_buffer(offsets_buffer)
587 .add_buffer(data_buffer)
588 .len(self.num_values as usize)
589 .null_count(0);
590 if validate {
591 Ok(builder.build()?)
592 } else {
593 Ok(unsafe { builder.build_unchecked() })
594 }
595 }
596
597 fn into_buffers(self) -> Vec<LanceBuffer> {
598 vec![self.offsets, self.data]
599 }
600
601 pub fn offsets_as_block(&mut self) -> DataBlock {
602 let offsets = self.offsets.clone();
603 DataBlock::FixedWidth(FixedWidthDataBlock {
604 data: offsets,
605 bits_per_value: self.bits_per_offset as u64,
606 num_values: self.num_values + 1,
607 block_info: BlockInfo::new(),
608 })
609 }
610
611 pub fn data_size(&self) -> u64 {
612 (self.data.len() + self.offsets.len()) as u64
613 }
614}
615
616#[derive(Debug, Clone)]
618pub struct StructDataBlock {
619 pub children: Vec<DataBlock>,
621 pub block_info: BlockInfo,
622 pub validity: Option<NullBuffer>,
624}
625
626impl StructDataBlock {
627 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
628 if let DataType::Struct(fields) = &data_type {
629 let mut builder = ArrayDataBuilder::new(DataType::Struct(fields.clone()));
630 let mut num_rows = 0;
631 for (field, child) in fields.iter().zip(self.children) {
632 let child_data = child.into_arrow(field.data_type().clone(), validate)?;
633 num_rows = child_data.len();
634 builder = builder.add_child_data(child_data);
635 }
636
637 let builder = if let Some(validity) = self.validity {
639 let null_count = validity.null_count();
640 builder
641 .null_bit_buffer(Some(validity.into_inner().into_inner()))
642 .null_count(null_count)
643 } else {
644 builder.null_count(0)
645 };
646
647 let builder = builder.len(num_rows);
648 if validate {
649 Ok(builder.build()?)
650 } else {
651 Ok(unsafe { builder.build_unchecked() })
652 }
653 } else {
654 Err(Error::internal(format!(
655 "Expected Struct, got {:?}",
656 data_type
657 )))
658 }
659 }
660
661 fn remove_outer_validity(self) -> Self {
662 Self {
663 children: self
664 .children
665 .into_iter()
666 .map(|c| c.remove_outer_validity())
667 .collect(),
668 block_info: self.block_info,
669 validity: None, }
671 }
672
673 fn into_buffers(self) -> Vec<LanceBuffer> {
674 self.children
675 .into_iter()
676 .flat_map(|c| c.into_buffers())
677 .collect()
678 }
679
680 pub fn has_variable_width_child(&self) -> bool {
681 self.children
682 .iter()
683 .any(|child| !matches!(child, DataBlock::FixedWidth(_)))
684 }
685
686 pub fn data_size(&self) -> u64 {
687 self.children
688 .iter()
689 .map(|data_block| data_block.data_size())
690 .sum()
691 }
692}
693
694#[derive(Debug, Clone)]
696pub struct DictionaryDataBlock {
697 pub indices: FixedWidthDataBlock,
699 pub dictionary: Box<DataBlock>,
701}
702
703impl DictionaryDataBlock {
704 fn decode_helper<K: ArrowDictionaryKeyType>(self) -> Result<DataBlock> {
705 if self.indices.num_values == 0 {
708 return Ok(DataBlock::AllNull(AllNullDataBlock { num_values: 0 }));
709 }
710
711 let estimated_size_bytes = self.dictionary.data_size()
713 * (self.indices.num_values + self.dictionary.num_values() - 1)
714 / self.dictionary.num_values();
715 let mut data_builder = DataBlockBuilder::with_capacity_estimate(estimated_size_bytes);
716
717 let indices = self.indices.data.borrow_to_typed_slice::<K::Native>();
718 let indices = indices.as_ref();
719
720 indices
721 .iter()
722 .map(|idx| idx.to_usize().unwrap() as u64)
723 .for_each(|idx| {
724 data_builder.append(&self.dictionary, idx..idx + 1);
725 });
726
727 Ok(data_builder.finish())
728 }
729
730 pub fn decode(self) -> Result<DataBlock> {
731 match self.indices.bits_per_value {
732 8 => self.decode_helper::<UInt8Type>(),
733 16 => self.decode_helper::<UInt16Type>(),
734 32 => self.decode_helper::<UInt32Type>(),
735 64 => self.decode_helper::<UInt64Type>(),
736 _ => Err(lance_core::Error::internal(format!(
737 "Unsupported dictionary index bit width: {} bits",
738 self.indices.bits_per_value
739 ))),
740 }
741 }
742
743 fn into_arrow_dict(
744 self,
745 key_type: Box<DataType>,
746 value_type: Box<DataType>,
747 validate: bool,
748 ) -> Result<ArrayData> {
749 let indices = self.indices.into_arrow((*key_type).clone(), validate)?;
750 let dictionary = self
751 .dictionary
752 .into_arrow((*value_type).clone(), validate)?;
753
754 let builder = indices
755 .into_builder()
756 .add_child_data(dictionary)
757 .data_type(DataType::Dictionary(key_type, value_type));
758
759 if validate {
760 Ok(builder.build()?)
761 } else {
762 Ok(unsafe { builder.build_unchecked() })
763 }
764 }
765
766 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
767 if let DataType::Dictionary(key_type, value_type) = data_type {
768 self.into_arrow_dict(key_type, value_type, validate)
769 } else {
770 self.decode()?.into_arrow(data_type, validate)
771 }
772 }
773
774 fn into_buffers(self) -> Vec<LanceBuffer> {
775 let mut buffers = self.indices.into_buffers();
776 buffers.extend(self.dictionary.into_buffers());
777 buffers
778 }
779
780 pub fn into_parts(self) -> (DataBlock, DataBlock) {
781 (DataBlock::FixedWidth(self.indices), *self.dictionary)
782 }
783
784 pub fn from_parts(indices: FixedWidthDataBlock, dictionary: DataBlock) -> Self {
785 Self {
786 indices,
787 dictionary: Box::new(dictionary),
788 }
789 }
790}
791
792#[derive(Debug, Clone)]
807pub enum DataBlock {
808 Empty(),
809 Constant(ConstantDataBlock),
810 AllNull(AllNullDataBlock),
811 Nullable(NullableDataBlock),
812 FixedWidth(FixedWidthDataBlock),
813 FixedSizeList(FixedSizeListBlock),
814 VariableWidth(VariableWidthBlock),
815 Opaque(OpaqueBlock),
816 Struct(StructDataBlock),
817 Dictionary(DictionaryDataBlock),
818}
819
820impl DataBlock {
821 pub fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
823 match self {
824 Self::Empty() => Ok(new_empty_array(&data_type).to_data()),
825 Self::Constant(inner) => inner.into_arrow(data_type, validate),
826 Self::AllNull(inner) => inner.into_arrow(data_type, validate),
827 Self::Nullable(inner) => inner.into_arrow(data_type, validate),
828 Self::FixedWidth(inner) => inner.into_arrow(data_type, validate),
829 Self::FixedSizeList(inner) => inner.into_arrow(data_type, validate),
830 Self::VariableWidth(inner) => inner.into_arrow(data_type, validate),
831 Self::Struct(inner) => inner.into_arrow(data_type, validate),
832 Self::Dictionary(inner) => inner.into_arrow(data_type, validate),
833 Self::Opaque(_) => Err(Error::internal(
834 "Cannot convert OpaqueBlock to Arrow".to_string(),
835 )),
836 }
837 }
838
839 pub fn into_buffers(self) -> Vec<LanceBuffer> {
843 match self {
844 Self::Empty() => Vec::default(),
845 Self::Constant(inner) => inner.into_buffers(),
846 Self::AllNull(inner) => inner.into_buffers(),
847 Self::Nullable(inner) => inner.into_buffers(),
848 Self::FixedWidth(inner) => inner.into_buffers(),
849 Self::FixedSizeList(inner) => inner.into_buffers(),
850 Self::VariableWidth(inner) => inner.into_buffers(),
851 Self::Struct(inner) => inner.into_buffers(),
852 Self::Dictionary(inner) => inner.into_buffers(),
853 Self::Opaque(inner) => inner.buffers,
854 }
855 }
856
857 pub fn try_clone(&self) -> Result<Self> {
866 match self {
867 Self::Empty() => Ok(Self::Empty()),
868 Self::Constant(inner) => Ok(Self::Constant(inner.clone())),
869 Self::AllNull(inner) => Ok(Self::AllNull(inner.clone())),
870 Self::Nullable(inner) => Ok(Self::Nullable(inner.clone())),
871 Self::FixedWidth(inner) => Ok(Self::FixedWidth(inner.clone())),
872 Self::FixedSizeList(inner) => Ok(Self::FixedSizeList(inner.clone())),
873 Self::VariableWidth(inner) => Ok(Self::VariableWidth(inner.clone())),
874 Self::Struct(inner) => Ok(Self::Struct(inner.clone())),
875 Self::Dictionary(inner) => Ok(Self::Dictionary(inner.clone())),
876 Self::Opaque(inner) => Ok(Self::Opaque(inner.clone())),
877 }
878 }
879
880 pub fn name(&self) -> &'static str {
881 match self {
882 Self::Constant(_) => "Constant",
883 Self::Empty() => "Empty",
884 Self::AllNull(_) => "AllNull",
885 Self::Nullable(_) => "Nullable",
886 Self::FixedWidth(_) => "FixedWidth",
887 Self::FixedSizeList(_) => "FixedSizeList",
888 Self::VariableWidth(_) => "VariableWidth",
889 Self::Struct(_) => "Struct",
890 Self::Dictionary(_) => "Dictionary",
891 Self::Opaque(_) => "Opaque",
892 }
893 }
894
895 pub fn is_variable(&self) -> bool {
896 match self {
897 Self::Constant(_) => false,
898 Self::Empty() => false,
899 Self::AllNull(_) => false,
900 Self::Nullable(nullable) => nullable.data.is_variable(),
901 Self::FixedWidth(_) => false,
902 Self::FixedSizeList(fsl) => fsl.child.is_variable(),
903 Self::VariableWidth(_) => true,
904 Self::Struct(strct) => strct.children.iter().any(|c| c.is_variable()),
905 Self::Dictionary(_) => {
906 todo!("is_variable for DictionaryDataBlock is not implemented yet")
907 }
908 Self::Opaque(_) => panic!("Does not make sense to ask if an Opaque block is variable"),
909 }
910 }
911
912 pub fn is_nullable(&self) -> bool {
913 match self {
914 Self::AllNull(_) => true,
915 Self::Nullable(_) => true,
916 Self::FixedSizeList(fsl) => fsl.child.is_nullable(),
917 Self::Struct(strct) => strct.children.iter().any(|c| c.is_nullable()),
918 Self::Dictionary(_) => {
919 todo!("is_nullable for DictionaryDataBlock is not implemented yet")
920 }
921 Self::Opaque(_) => panic!("Does not make sense to ask if an Opaque block is nullable"),
922 _ => false,
923 }
924 }
925
926 pub fn num_values(&self) -> u64 {
931 match self {
932 Self::Empty() => 0,
933 Self::Constant(inner) => inner.num_values,
934 Self::AllNull(inner) => inner.num_values,
935 Self::Nullable(inner) => inner.data.num_values(),
936 Self::FixedWidth(inner) => inner.num_values,
937 Self::FixedSizeList(inner) => inner.num_values(),
938 Self::VariableWidth(inner) => inner.num_values,
939 Self::Struct(inner) => inner.children[0].num_values(),
940 Self::Dictionary(inner) => inner.indices.num_values,
941 Self::Opaque(inner) => inner.num_values,
942 }
943 }
944
945 pub fn items_per_row(&self) -> u64 {
949 match self {
950 Self::Empty() => todo!(), Self::Constant(_) => todo!(), Self::AllNull(_) => todo!(), Self::Nullable(nullable) => nullable.data.items_per_row(),
954 Self::FixedWidth(_) => 1,
955 Self::FixedSizeList(fsl) => fsl.dimension * fsl.child.items_per_row(),
956 Self::VariableWidth(_) => 1,
957 Self::Struct(_) => todo!(), Self::Dictionary(_) => 1,
959 Self::Opaque(_) => 1,
960 }
961 }
962
963 pub fn data_size(&self) -> u64 {
965 match self {
966 Self::Empty() => 0,
967 Self::Constant(inner) => inner.data_size(),
968 Self::AllNull(_) => 0,
969 Self::Nullable(inner) => inner.data_size(),
970 Self::FixedWidth(inner) => inner.data_size(),
971 Self::FixedSizeList(inner) => inner.data_size(),
972 Self::VariableWidth(inner) => inner.data_size(),
973 Self::Struct(_) => {
974 todo!("the data_size method for StructDataBlock is not implemented yet")
975 }
976 Self::Dictionary(_) => {
977 todo!("the data_size method for DictionaryDataBlock is not implemented yet")
978 }
979 Self::Opaque(inner) => inner.data_size(),
980 }
981 }
982
983 pub fn remove_outer_validity(self) -> Self {
991 match self {
992 Self::AllNull(_) => panic!("Cannot remove validity on all-null data"),
993 Self::Nullable(inner) => *inner.data,
994 Self::Struct(inner) => Self::Struct(inner.remove_outer_validity()),
995 other => other,
996 }
997 }
998
999 pub fn make_builder(&self, estimated_size_bytes: u64) -> Box<dyn DataBlockBuilderImpl> {
1000 match self {
1001 Self::FixedWidth(inner) => {
1002 if inner.bits_per_value == 1 {
1003 Box::new(BitmapDataBlockBuilder::new(estimated_size_bytes))
1004 } else {
1005 Box::new(FixedWidthDataBlockBuilder::new(
1006 inner.bits_per_value,
1007 estimated_size_bytes,
1008 ))
1009 }
1010 }
1011 Self::VariableWidth(inner) => {
1012 if inner.bits_per_offset == 32 {
1013 Box::new(VariableWidthDataBlockBuilder::<i32>::new(
1014 estimated_size_bytes,
1015 ))
1016 } else if inner.bits_per_offset == 64 {
1017 Box::new(VariableWidthDataBlockBuilder::<i64>::new(
1018 estimated_size_bytes,
1019 ))
1020 } else {
1021 todo!()
1022 }
1023 }
1024 Self::FixedSizeList(inner) => {
1025 let inner_builder = inner.child.make_builder(estimated_size_bytes);
1026 Box::new(FixedSizeListBlockBuilder::new(
1027 inner_builder,
1028 inner.dimension,
1029 ))
1030 }
1031 Self::Nullable(nullable) => {
1032 let estimated_validity_size_bytes = estimated_size_bytes / 16;
1035 let inner_builder = nullable
1036 .data
1037 .make_builder(estimated_size_bytes - estimated_validity_size_bytes);
1038 Box::new(NullableDataBlockBuilder::new(
1039 inner_builder,
1040 estimated_validity_size_bytes as usize,
1041 ))
1042 }
1043 Self::Struct(struct_data_block) => {
1044 let num_children = struct_data_block.children.len();
1045 let per_child_estimate = if num_children == 0 {
1046 0
1047 } else {
1048 estimated_size_bytes / num_children as u64
1049 };
1050 let child_builders = struct_data_block
1051 .children
1052 .iter()
1053 .map(|child| child.make_builder(per_child_estimate))
1054 .collect();
1055 Box::new(StructDataBlockBuilder::new(child_builders))
1056 }
1057 Self::AllNull(_) => Box::new(AllNullDataBlockBuilder::default()),
1058 _ => todo!("make_builder for {:?}", self),
1059 }
1060 }
1061}
1062
1063macro_rules! as_type {
1064 ($fn_name:ident, $inner:tt, $inner_type:ident) => {
1065 pub fn $fn_name(self) -> Option<$inner_type> {
1066 match self {
1067 Self::$inner(inner) => Some(inner),
1068 _ => None,
1069 }
1070 }
1071 };
1072}
1073
1074macro_rules! as_type_ref {
1075 ($fn_name:ident, $inner:tt, $inner_type:ident) => {
1076 pub fn $fn_name(&self) -> Option<&$inner_type> {
1077 match self {
1078 Self::$inner(inner) => Some(inner),
1079 _ => None,
1080 }
1081 }
1082 };
1083}
1084
1085macro_rules! as_type_ref_mut {
1086 ($fn_name:ident, $inner:tt, $inner_type:ident) => {
1087 pub fn $fn_name(&mut self) -> Option<&mut $inner_type> {
1088 match self {
1089 Self::$inner(inner) => Some(inner),
1090 _ => None,
1091 }
1092 }
1093 };
1094}
1095
1096impl DataBlock {
1098 as_type!(as_all_null, AllNull, AllNullDataBlock);
1099 as_type!(as_nullable, Nullable, NullableDataBlock);
1100 as_type!(as_fixed_width, FixedWidth, FixedWidthDataBlock);
1101 as_type!(as_fixed_size_list, FixedSizeList, FixedSizeListBlock);
1102 as_type!(as_variable_width, VariableWidth, VariableWidthBlock);
1103 as_type!(as_struct, Struct, StructDataBlock);
1104 as_type!(as_dictionary, Dictionary, DictionaryDataBlock);
1105 as_type_ref!(as_all_null_ref, AllNull, AllNullDataBlock);
1106 as_type_ref!(as_nullable_ref, Nullable, NullableDataBlock);
1107 as_type_ref!(as_fixed_width_ref, FixedWidth, FixedWidthDataBlock);
1108 as_type_ref!(as_fixed_size_list_ref, FixedSizeList, FixedSizeListBlock);
1109 as_type_ref!(as_variable_width_ref, VariableWidth, VariableWidthBlock);
1110 as_type_ref!(as_struct_ref, Struct, StructDataBlock);
1111 as_type_ref!(as_dictionary_ref, Dictionary, DictionaryDataBlock);
1112 as_type_ref_mut!(as_all_null_ref_mut, AllNull, AllNullDataBlock);
1113 as_type_ref_mut!(as_nullable_ref_mut, Nullable, NullableDataBlock);
1114 as_type_ref_mut!(as_fixed_width_ref_mut, FixedWidth, FixedWidthDataBlock);
1115 as_type_ref_mut!(
1116 as_fixed_size_list_ref_mut,
1117 FixedSizeList,
1118 FixedSizeListBlock
1119 );
1120 as_type_ref_mut!(as_variable_width_ref_mut, VariableWidth, VariableWidthBlock);
1121 as_type_ref_mut!(as_struct_ref_mut, Struct, StructDataBlock);
1122 as_type_ref_mut!(as_dictionary_ref_mut, Dictionary, DictionaryDataBlock);
1123}
1124
1125fn get_byte_range<T: ArrowNativeType>(offsets: &mut LanceBuffer) -> Range<usize> {
1128 let offsets = offsets.borrow_to_typed_slice::<T>();
1129 if offsets.as_ref().is_empty() {
1130 0..0
1131 } else {
1132 offsets.as_ref().first().unwrap().as_usize()..offsets.as_ref().last().unwrap().as_usize()
1133 }
1134}
1135
1136fn stitch_offsets<T: ArrowNativeType + std::ops::Add<Output = T> + std::ops::Sub<Output = T>>(
1142 offsets: Vec<LanceBuffer>,
1143) -> (LanceBuffer, Vec<Range<usize>>) {
1144 if offsets.is_empty() {
1145 return (LanceBuffer::empty(), Vec::default());
1146 }
1147 let len = offsets.iter().map(|b| b.len()).sum::<usize>();
1148 let mut dest = Vec::with_capacity(len);
1152 let mut byte_ranges = Vec::with_capacity(offsets.len());
1153
1154 dest.push(T::from_usize(0).unwrap());
1156
1157 for mut o in offsets.into_iter() {
1158 if !o.is_empty() {
1159 let last_offset = *dest.last().unwrap();
1160 let o = o.borrow_to_typed_slice::<T>();
1161 let start = *o.as_ref().first().unwrap();
1162 dest.extend(o.as_ref()[1..].iter().map(|&x| x + last_offset - start));
1176 }
1177 byte_ranges.push(get_byte_range::<T>(&mut o));
1178 }
1179 (LanceBuffer::reinterpret_vec(dest), byte_ranges)
1180}
1181
1182fn arrow_binary_to_data_block(
1183 arrays: &[ArrayRef],
1184 num_values: u64,
1185 bits_per_offset: u8,
1186) -> DataBlock {
1187 let data_vec = arrays.iter().map(|arr| arr.to_data()).collect::<Vec<_>>();
1188 let bytes_per_offset = bits_per_offset as usize / 8;
1189 let offsets = data_vec
1190 .iter()
1191 .map(|d| {
1192 LanceBuffer::from(
1193 d.buffers()[0].slice_with_length(d.offset(), (d.len() + 1) * bytes_per_offset),
1194 )
1195 })
1196 .collect::<Vec<_>>();
1197 let (offsets, data_ranges) = if bits_per_offset == 32 {
1198 stitch_offsets::<i32>(offsets)
1199 } else {
1200 stitch_offsets::<i64>(offsets)
1201 };
1202 let data = data_vec
1203 .iter()
1204 .zip(data_ranges)
1205 .map(|(d, byte_range)| {
1206 LanceBuffer::from(
1207 d.buffers()[1]
1208 .slice_with_length(byte_range.start, byte_range.end - byte_range.start),
1209 )
1210 })
1211 .collect::<Vec<_>>();
1212 let data = LanceBuffer::concat_into_one(data);
1213 DataBlock::VariableWidth(VariableWidthBlock {
1214 data,
1215 offsets,
1216 bits_per_offset,
1217 num_values,
1218 block_info: BlockInfo::new(),
1219 })
1220}
1221
1222fn encode_flat_data(arrays: &[ArrayRef], num_values: u64) -> LanceBuffer {
1223 let bytes_per_value = arrays[0].data_type().byte_width();
1224 let mut buffer = Vec::with_capacity(num_values as usize * bytes_per_value);
1225 for arr in arrays {
1226 let data = arr.to_data();
1227 buffer.extend_from_slice(data.buffers()[0].as_slice());
1228 }
1229 LanceBuffer::from(buffer)
1230}
1231
1232fn do_encode_bitmap_data(bitmaps: &[BooleanBuffer], num_values: u64) -> LanceBuffer {
1233 let mut builder = BooleanBufferBuilder::new(num_values as usize);
1234
1235 for buf in bitmaps {
1236 builder.append_buffer(buf);
1237 }
1238
1239 let buffer = builder.finish().into_inner();
1240 LanceBuffer::from(buffer)
1241}
1242
1243fn encode_bitmap_data(arrays: &[ArrayRef], num_values: u64) -> LanceBuffer {
1244 let bitmaps = arrays
1245 .iter()
1246 .map(|arr| arr.as_boolean().values().clone())
1247 .collect::<Vec<_>>();
1248 do_encode_bitmap_data(&bitmaps, num_values)
1249}
1250
1251fn concat_dict_arrays(arrays: &[ArrayRef]) -> ArrayRef {
1254 let value_type = arrays[0].as_any_dictionary().values().data_type();
1255 let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
1256 match arrow_select::concat::concat(&array_refs) {
1257 Ok(array) => array,
1258 Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1259 let upscaled = array_refs
1261 .iter()
1262 .map(|arr| {
1263 match arrow_cast::cast(
1264 *arr,
1265 &DataType::Dictionary(
1266 Box::new(DataType::UInt32),
1267 Box::new(value_type.clone()),
1268 ),
1269 ) {
1270 Ok(arr) => arr,
1271 Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1272 unimplemented!("Dictionary arrays with more than 2^32 unique values")
1274 }
1275 err => err.unwrap(),
1276 }
1277 })
1278 .collect::<Vec<_>>();
1279 let array_refs = upscaled.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
1280 match arrow_select::concat::concat(&array_refs) {
1282 Ok(array) => array,
1283 Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1284 unimplemented!("Dictionary arrays with more than 2^32 unique values")
1285 }
1286 err => err.unwrap(),
1287 }
1288 }
1289 err => err.unwrap(),
1291 }
1292}
1293
1294fn max_index_val(index_type: &DataType) -> u64 {
1295 match index_type {
1296 DataType::Int8 => i8::MAX as u64,
1297 DataType::Int16 => i16::MAX as u64,
1298 DataType::Int32 => i32::MAX as u64,
1299 DataType::Int64 => i64::MAX as u64,
1300 DataType::UInt8 => u8::MAX as u64,
1301 DataType::UInt16 => u16::MAX as u64,
1302 DataType::UInt32 => u32::MAX as u64,
1303 DataType::UInt64 => u64::MAX,
1304 _ => panic!("Invalid dictionary index type"),
1305 }
1306}
1307
/// Converts one or more dictionary arrays into a [`DataBlock::Dictionary`].
///
/// The inputs are first merged with `concat_dict_arrays`, which may widen the keys to
/// `UInt32`.  The keys become a [`FixedWidthDataBlock`] whose width is the key type's byte
/// width.  The dictionary values are converted with `DataBlock::from`.
///
/// When `validity` is given, nulls are not stored on the indices.  Instead, every null slot
/// is rewritten to point at a null entry in the dictionary values.  The first null already
/// present in the values is reused; if there is none, a single null is appended.  If the
/// appended entry's index exceeds the key type's maximum, the keys are cast to `UInt32`.
///
/// # Panics
///
/// Panics (`unimplemented!`) if a null entry must be appended but the keys already span the
/// `u32` range (or wider).  Arrow cast/concat failures are unwrapped.
fn arrow_dictionary_to_data_block(arrays: &[ArrayRef], validity: Option<NullBuffer>) -> DataBlock {
    let array = concat_dict_arrays(arrays);
    let array_dict = array.as_any_dictionary();
    let mut indices = array_dict.keys();
    let num_values = indices.len() as u64;
    let mut values = array_dict.values().clone();
    // Owns widened (UInt32) keys if an upcast is needed.  `indices` is then re-pointed at it,
    // so it must live in this outer scope rather than inside the closure below.
    let mut upcast = None;

    let indices_block = if let Some(validity) = validity {
        // Prefer an existing null entry in the dictionary values.
        let mut first_invalid_index = None;
        if let Some(values_validity) = values.nulls() {
            first_invalid_index = (!values_validity.inner()).set_indices().next();
        }
        let first_invalid_index = first_invalid_index.unwrap_or_else(|| {
            // No null in the values: append one and use its position as the null index.
            let null_arr = new_null_array(values.data_type(), 1);
            values = arrow_select::concat::concat(&[values.as_ref(), null_arr.as_ref()]).unwrap();
            let null_index = values.len() - 1;
            let max_index_val = max_index_val(indices.data_type());
            if null_index as u64 > max_index_val {
                // The new index does not fit the current key type; widen to UInt32 unless the
                // keys are already at least that wide.
                if max_index_val >= u32::MAX as u64 {
                    unimplemented!("Dictionary arrays with 2^32 unique value (or more) and a null")
                }
                upcast = Some(arrow_cast::cast(indices, &DataType::UInt32).unwrap());
                indices = upcast.as_ref().unwrap();
            }
            null_index
        });
        // Encode the null index in the (possibly widened) key type so its raw bytes can be
        // copied over each null slot.  The index fits: it either came from the existing
        // values or was range-checked above.
        let null_index_arr = arrow_cast::cast(
            &UInt64Array::from(vec![first_invalid_index as u64]),
            indices.data_type(),
        )
        .unwrap();

        let bytes_per_index = indices.data_type().byte_width();
        let bits_per_index = bytes_per_index as u64 * 8;

        let null_index_arr = null_index_arr.into_data();
        let null_index_bytes = &null_index_arr.buffers()[0];
        // Copy the key bytes so that null slots can be overwritten in place.
        let mut indices_bytes = indices.to_data().buffers()[0].to_vec();
        for invalid_idx in (!validity.inner()).set_indices() {
            indices_bytes[invalid_idx * bytes_per_index..(invalid_idx + 1) * bytes_per_index]
                .copy_from_slice(null_index_bytes.as_slice());
        }
        FixedWidthDataBlock {
            data: LanceBuffer::from(indices_bytes),
            bits_per_value: bits_per_index,
            num_values,
            block_info: BlockInfo::new(),
        }
    } else {
        // No nulls: the key buffer is used as-is.
        FixedWidthDataBlock {
            data: LanceBuffer::from(indices.to_data().buffers()[0].clone()),
            bits_per_value: indices.data_type().byte_width() as u64 * 8,
            num_values,
            block_info: BlockInfo::new(),
        }
    };

    let items = DataBlock::from(values);
    DataBlock::Dictionary(DictionaryDataBlock {
        indices: indices_block,
        dictionary: Box::new(items),
    })
}
1400
/// Summary of the combined validity of a set of arrays (see `extract_nulls`).
enum Nullability {
    /// No input array reported a null buffer.
    None,
    /// Every value is null.
    All,
    /// At least one input had a null buffer, but not every value is null.  Holds the
    /// concatenated validity of all values (a set bit means valid).  May contain zero nulls
    /// if an input carried a null buffer with no nulls in it.
    Some(NullBuffer),
}
1406
1407impl Nullability {
1408 fn to_option(&self) -> Option<NullBuffer> {
1409 match self {
1410 Self::Some(nulls) => Some(nulls.clone()),
1411 _ => None,
1412 }
1413 }
1414}
1415
1416fn extract_nulls(arrays: &[ArrayRef], num_values: u64) -> Nullability {
1417 let mut has_nulls = false;
1418 let nulls_and_lens = arrays
1419 .iter()
1420 .map(|arr| {
1421 let nulls = arr.logical_nulls();
1422 has_nulls |= nulls.is_some();
1423 (nulls, arr.len())
1424 })
1425 .collect::<Vec<_>>();
1426 if !has_nulls {
1427 return Nullability::None;
1428 }
1429 let mut builder = BooleanBufferBuilder::new(num_values as usize);
1430 let mut num_nulls = 0;
1431 for (null, len) in nulls_and_lens {
1432 if let Some(null) = null {
1433 num_nulls += null.null_count();
1434 builder.append_buffer(&null.into_inner());
1435 } else {
1436 builder.append_n(len, true);
1437 }
1438 }
1439 if num_nulls == num_values as usize {
1440 Nullability::All
1441 } else {
1442 Nullability::Some(NullBuffer::new(builder.finish()))
1443 }
1444}
1445
impl DataBlock {
    /// Builds one `DataBlock` from `arrays`, which are treated as consecutive chunks of a
    /// single column.  The data type is taken from `arrays[0]`; the other arrays are expected
    /// to share it.
    ///
    /// `num_values` is the total number of rows across `arrays`.
    ///
    /// - Empty `arrays`, or `num_values == 0`, yields an `AllNull` block with zero values.
    /// - Input whose values are all null yields an `AllNull` block regardless of type.
    /// - Otherwise the data is encoded according to its type and statistics are computed on
    ///   the encoded block.
    /// - Any validity is then attached by wrapping the result in a `Nullable` block.
    ///   Dictionaries are the exception: their nulls are folded into the dictionary values.
    ///
    /// # Panics
    ///
    /// Panics for list, list-view, map, run-end-encoded and union types.  `BinaryView` and
    /// `Utf8View` are not implemented yet (`todo!`).
    pub fn from_arrays(arrays: &[ArrayRef], num_values: u64) -> Self {
        if arrays.is_empty() || num_values == 0 {
            return Self::AllNull(AllNullDataBlock { num_values: 0 });
        }

        let data_type = arrays[0].data_type();
        // Validity of all inputs, concatenated.
        let nulls = extract_nulls(arrays, num_values);

        // Entirely-null input is stored without any type-specific data.
        if let Nullability::All = nulls {
            return Self::AllNull(AllNullDataBlock { num_values });
        }

        let mut encoded = match data_type {
            DataType::Binary | DataType::Utf8 => arrow_binary_to_data_block(arrays, num_values, 32),
            DataType::BinaryView | DataType::Utf8View => {
                todo!()
            }
            DataType::LargeBinary | DataType::LargeUtf8 => {
                arrow_binary_to_data_block(arrays, num_values, 64)
            }
            DataType::Boolean => {
                // Booleans are bit-packed, one bit per value.
                let data = encode_bitmap_data(arrays, num_values);
                Self::FixedWidth(FixedWidthDataBlock {
                    data,
                    bits_per_value: 1,
                    num_values,
                    block_info: BlockInfo::new(),
                })
            }
            DataType::Date32
            | DataType::Date64
            | DataType::Decimal32(_, _)
            | DataType::Decimal64(_, _)
            | DataType::Decimal128(_, _)
            | DataType::Decimal256(_, _)
            | DataType::Duration(_)
            | DataType::FixedSizeBinary(_)
            | DataType::Float16
            | DataType::Float32
            | DataType::Float64
            | DataType::Int16
            | DataType::Int32
            | DataType::Int64
            | DataType::Int8
            | DataType::Interval(_)
            | DataType::Time32(_)
            | DataType::Time64(_)
            | DataType::Timestamp(_, _)
            | DataType::UInt16
            | DataType::UInt32
            | DataType::UInt64
            | DataType::UInt8 => {
                // Fixed-width types: each input's value buffer is copied back to back.
                let data = encode_flat_data(arrays, num_values);
                Self::FixedWidth(FixedWidthDataBlock {
                    data,
                    bits_per_value: data_type.byte_width() as u64 * 8,
                    num_values,
                    block_info: BlockInfo::new(),
                })
            }
            DataType::Null => Self::AllNull(AllNullDataBlock { num_values }),
            // The validity is handed to the dictionary conversion, which folds nulls into the
            // dictionary values; the result is therefore not wrapped below.
            DataType::Dictionary(_, _) => arrow_dictionary_to_data_block(arrays, nulls.to_option()),
            DataType::Struct(fields) => {
                // Convert each child column from the matching child of every input chunk.
                let structs = arrays.iter().map(|arr| arr.as_struct()).collect::<Vec<_>>();
                let mut children = Vec::with_capacity(fields.len());
                for child_idx in 0..fields.len() {
                    let child_vec = structs
                        .iter()
                        .map(|s| s.column(child_idx).clone())
                        .collect::<Vec<_>>();
                    children.push(Self::from_arrays(&child_vec, num_values));
                }

                // The struct block also records its own validity.
                let validity = match &nulls {
                    Nullability::None => None,
                    Nullability::Some(null_buffer) => Some(null_buffer.clone()),
                    Nullability::All => unreachable!("Should have returned AllNull earlier"),
                };

                Self::Struct(StructDataBlock {
                    children,
                    block_info: BlockInfo::default(),
                    validity,
                })
            }
            DataType::FixedSizeList(_, dim) => {
                // Each list row holds `dim` child values.
                let children = arrays
                    .iter()
                    .map(|arr| arr.as_fixed_size_list().values().clone())
                    .collect::<Vec<_>>();
                let child_block = Self::from_arrays(&children, num_values * *dim as u64);
                Self::FixedSizeList(FixedSizeListBlock {
                    child: Box::new(child_block),
                    dimension: *dim as u64,
                })
            }
            DataType::LargeList(_)
            | DataType::List(_)
            | DataType::ListView(_)
            | DataType::LargeListView(_)
            | DataType::Map(_, _)
            | DataType::RunEndEncoded(_, _)
            | DataType::Union(_, _) => {
                panic!(
                    "Field with data type {} cannot be converted to data block",
                    data_type
                )
            }
        };

        // Statistics are computed on the encoded block before any nullable wrapper is added.
        encoded.compute_stat();

        if !matches!(data_type, DataType::Dictionary(_, _)) {
            match nulls {
                Nullability::None => encoded,
                Nullability::Some(nulls) => Self::Nullable(NullableDataBlock {
                    data: Box::new(encoded),
                    nulls: LanceBuffer::from(nulls.into_inner().into_inner()),
                    block_info: BlockInfo::new(),
                }),
                _ => unreachable!(),
            }
        } else {
            // Dictionaries already store their nulls in the dictionary values.
            encoded
        }
    }

    /// Converts a single array, using its length as the number of values.
    pub fn from_array<T: Array + 'static>(array: T) -> Self {
        let num_values = array.len();
        Self::from_arrays(&[Arc::new(array)], num_values as u64)
    }
}
1582
1583impl From<ArrayRef> for DataBlock {
1584 fn from(array: ArrayRef) -> Self {
1585 let num_values = array.len() as u64;
1586 Self::from_arrays(&[array], num_values)
1587 }
1588}
1589
/// A concrete builder that accumulates rows from [`DataBlock`]s into one output block.
///
/// [`DataBlockBuilder`] obtains one from `DataBlock::make_builder` and forwards calls to it.
pub trait DataBlockBuilderImpl: std::fmt::Debug {
    /// Appends the rows of `data_block` covered by `selection`.
    ///
    /// NOTE(review): `selection` is presumably a range of row indices within `data_block`;
    /// confirm against the implementations.
    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>);
    /// Consumes the builder and returns everything appended so far as a single block.
    fn finish(self: Box<Self>) -> DataBlock;
}
1594
/// Type-erased [`DataBlock`] builder.  The concrete [`DataBlockBuilderImpl`] is created
/// lazily from the first block appended (via `DataBlock::make_builder`).
#[derive(Debug)]
pub struct DataBlockBuilder {
    // Size hint passed to `make_builder` when the concrete builder is created.
    estimated_size_bytes: u64,
    // `None` until the first call to `append`.
    builder: Option<Box<dyn DataBlockBuilderImpl>>,
}
1600
1601impl DataBlockBuilder {
1602 pub fn with_capacity_estimate(estimated_size_bytes: u64) -> Self {
1603 Self {
1604 estimated_size_bytes,
1605 builder: None,
1606 }
1607 }
1608
1609 fn get_builder(&mut self, block: &DataBlock) -> &mut dyn DataBlockBuilderImpl {
1610 if self.builder.is_none() {
1611 self.builder = Some(block.make_builder(self.estimated_size_bytes));
1612 }
1613 self.builder.as_mut().unwrap().as_mut()
1614 }
1615
1616 pub fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
1617 self.get_builder(data_block).append(data_block, selection);
1618 }
1619
1620 pub fn finish(self) -> DataBlock {
1621 let builder = self.builder.expect("DataBlockBuilder didn't see any data");
1622 builder.finish()
1623 }
1624}
1625
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::{
        ArrayRef, DictionaryArray, Int8Array, LargeBinaryArray, StringArray, UInt8Array,
        UInt16Array, make_array, new_null_array,
        types::{Int8Type, Int32Type},
    };
    use arrow_buffer::{BooleanBuffer, NullBuffer};

    use arrow_schema::{DataType, Field, Fields};
    use lance_datagen::{ArrayGeneratorExt, DEFAULT_SEED, RowCount, array};
    use rand::SeedableRng;

    use crate::buffer::LanceBuffer;

    use super::{AllNullDataBlock, DataBlock};

    use arrow_array::Array;

    #[test]
    fn test_sliced_to_data_block() {
        let ints = UInt16Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);
        let ints = ints.slice(2, 4);
        let data = DataBlock::from_array(ints);

        // Only the 4 sliced u16 values (8 bytes) should be copied.
        let fixed_data = data.as_fixed_width().unwrap();
        assert_eq!(fixed_data.num_values, 4);
        assert_eq!(fixed_data.data.len(), 8);

        let nullable_ints =
            UInt16Array::from(vec![Some(0), None, Some(2), None, Some(4), None, Some(6)]);
        let nullable_ints = nullable_ints.slice(1, 3);
        let data = DataBlock::from_array(nullable_ints);

        // The validity bitmap must also start at the slice offset.
        let nullable = data.as_nullable().unwrap();
        assert_eq!(nullable.nulls, LanceBuffer::from(vec![0b00000010]));
    }

    #[test]
    fn test_string_to_data_block() {
        // Several string arrays, including nulls and an all-null chunk.
        let strings1 = StringArray::from(vec![Some("hello"), None, Some("world")]);
        let strings2 = StringArray::from(vec![Some("a"), Some("b")]);
        let strings3 = StringArray::from(vec![Option::<&'static str>::None, None]);

        let arrays = &[strings1, strings2, strings3]
            .iter()
            .map(|arr| Arc::new(arr.clone()) as ArrayRef)
            .collect::<Vec<_>>();

        let block = DataBlock::from_arrays(arrays, 7);

        assert_eq!(block.num_values(), 7);
        let block = block.as_nullable().unwrap();

        assert_eq!(block.nulls, LanceBuffer::from(vec![0b00011101]));

        let data = block.data.as_variable_width().unwrap();
        assert_eq!(
            data.offsets,
            LanceBuffer::reinterpret_vec(vec![0, 5, 5, 10, 11, 12, 12, 12])
        );

        assert_eq!(data.data, LanceBuffer::copy_slice(b"helloworldab"));

        // Several string arrays without nulls.
        let strings1 = StringArray::from(vec![Some("a"), Some("bc")]);
        let strings2 = StringArray::from(vec![Some("def")]);

        let arrays = &[strings1, strings2]
            .iter()
            .map(|arr| Arc::new(arr.clone()) as ArrayRef)
            .collect::<Vec<_>>();

        let block = DataBlock::from_arrays(arrays, 3);

        assert_eq!(block.num_values(), 3);
        // No nullable wrapper is expected.
        let data = block.as_variable_width().unwrap();
        assert_eq!(data.offsets, LanceBuffer::reinterpret_vec(vec![0, 1, 3, 6]));
        assert_eq!(data.data, LanceBuffer::copy_slice(b"abcdef"));
    }

    #[test]
    fn test_string_sliced() {
        let check = |arr: Vec<StringArray>, expected_off: Vec<i32>, expected_data: &[u8]| {
            let arrs = arr
                .into_iter()
                .map(|a| Arc::new(a) as ArrayRef)
                .collect::<Vec<_>>();
            let num_rows = arrs.iter().map(|a| a.len()).sum::<usize>() as u64;
            let data = DataBlock::from_arrays(&arrs, num_rows);

            assert_eq!(data.num_values(), num_rows);

            let data = data.as_variable_width().unwrap();
            assert_eq!(data.offsets, LanceBuffer::reinterpret_vec(expected_off));
            assert_eq!(data.data, LanceBuffer::copy_slice(expected_data));
        };

        let string = StringArray::from(vec![Some("hello"), Some("world")]);
        check(vec![string.slice(1, 1)], vec![0, 5], b"world");
        check(vec![string.slice(0, 1)], vec![0, 5], b"hello");
        check(
            vec![string.slice(0, 1), string.slice(1, 1)],
            vec![0, 5, 10],
            b"helloworld",
        );

        let string2 = StringArray::from(vec![Some("foo"), Some("bar")]);
        check(
            vec![string.slice(0, 1), string2.slice(0, 1)],
            vec![0, 5, 8],
            b"hellofoo",
        );
    }

    #[test]
    fn test_large() {
        let arr = LargeBinaryArray::from_vec(vec![b"hello", b"world"]);
        let data = DataBlock::from_array(arr);

        assert_eq!(data.num_values(), 2);
        let data = data.as_variable_width().unwrap();
        assert_eq!(data.bits_per_offset, 64);
        assert_eq!(data.num_values, 2);
        assert_eq!(data.data, LanceBuffer::copy_slice(b"helloworld"));
        assert_eq!(
            data.offsets,
            LanceBuffer::reinterpret_vec(vec![0_u64, 5, 10])
        );
    }

    #[test]
    fn test_dictionary_indices_normalized() {
        let arr1 = DictionaryArray::<Int8Type>::from_iter([Some("a"), Some("a"), Some("b")]);
        let arr2 = DictionaryArray::<Int8Type>::from_iter([Some("b"), Some("c")]);

        let data = DataBlock::from_arrays(&[Arc::new(arr1), Arc::new(arr2)], 5);

        assert_eq!(data.num_values(), 5);
        let data = data.as_dictionary().unwrap();
        let indices = data.indices;
        assert_eq!(indices.bits_per_value, 8);
        assert_eq!(indices.num_values, 5);
        // The second array's indices are shifted past the first array's dictionary.
        assert_eq!(
            indices.data,
            LanceBuffer::reinterpret_vec::<i8>(vec![0, 0, 1, 2, 3])
        );

        let items = data.dictionary.as_variable_width().unwrap();
        assert_eq!(items.bits_per_offset, 32);
        assert_eq!(items.num_values, 4);
        assert_eq!(items.data, LanceBuffer::copy_slice(b"abbc"));
        assert_eq!(
            items.offsets,
            LanceBuffer::reinterpret_vec(vec![0, 1, 2, 3, 4],)
        );
    }

    #[test]
    fn test_dictionary_nulls() {
        // Nulls in the keys: a null item must be appended to the dictionary.
        let arr1 = DictionaryArray::<Int8Type>::from_iter([None, Some("a"), Some("b")]);
        let arr2 = DictionaryArray::<Int8Type>::from_iter([Some("c"), None]);

        let data = DataBlock::from_arrays(&[Arc::new(arr1), Arc::new(arr2)], 5);

        let check_common = |data: DataBlock| {
            assert_eq!(data.num_values(), 5);
            let dict = data.as_dictionary().unwrap();

            let nullable_items = dict.dictionary.as_nullable().unwrap();
            assert_eq!(nullable_items.nulls, LanceBuffer::from(vec![0b00000111]));
            assert_eq!(nullable_items.data.num_values(), 4);

            let items = nullable_items.data.as_variable_width().unwrap();
            assert_eq!(items.bits_per_offset, 32);
            assert_eq!(items.num_values, 4);
            assert_eq!(items.data, LanceBuffer::copy_slice(b"abc"));
            assert_eq!(
                items.offsets,
                LanceBuffer::reinterpret_vec(vec![0, 1, 2, 3, 3],)
            );

            let indices = dict.indices;
            assert_eq!(indices.bits_per_value, 8);
            assert_eq!(indices.num_values, 5);
            assert_eq!(
                indices.data,
                LanceBuffer::reinterpret_vec::<i8>(vec![3, 0, 1, 2, 3])
            );
        };
        check_common(data);

        // Same logical data, but the null already lives in the dictionary items.
        let items = StringArray::from(vec![Some("a"), Some("b"), Some("c"), None]);
        let indices = Int8Array::from(vec![Some(3), Some(0), Some(1), Some(2), Some(3)]);
        let dict = DictionaryArray::new(indices, Arc::new(items));

        let data = DataBlock::from_array(dict);

        check_common(data);
    }

    #[test]
    fn test_dictionary_cannot_add_null() {
        // 256 distinct items use every u8 key, so appending a null item forces u32 keys.
        let items = StringArray::from(
            (0..256)
                .map(|i| Some(String::from_utf8(vec![0; i]).unwrap()))
                .collect::<Vec<_>>(),
        );
        let indices = UInt8Array::from(
            (0..=256)
                .map(|i| if i == 256 { None } else { Some(i as u8) })
                .collect::<Vec<_>>(),
        );
        let dict = DictionaryArray::new(indices, Arc::new(items));
        let data = DataBlock::from_array(dict);

        assert_eq!(data.num_values(), 257);

        let dict = data.as_dictionary().unwrap();

        assert_eq!(dict.indices.bits_per_value, 32);
        assert_eq!(
            dict.indices.data,
            LanceBuffer::reinterpret_vec((0_u32..257).collect::<Vec<_>>())
        );

        let nullable_items = dict.dictionary.as_nullable().unwrap();
        let null_buffer = NullBuffer::new(BooleanBuffer::new(
            nullable_items.nulls.into_buffer(),
            0,
            257,
        ));
        for i in 0..256 {
            assert!(!null_buffer.is_null(i));
        }
        assert!(null_buffer.is_null(256));

        assert_eq!(
            nullable_items.data.as_variable_width().unwrap().data.len(),
            32640
        );
    }

    #[test]
    fn test_all_null() {
        for data_type in [
            DataType::UInt32,
            DataType::FixedSizeBinary(2),
            DataType::List(Arc::new(Field::new("item", DataType::UInt32, true))),
            DataType::Struct(Fields::from(vec![Field::new("a", DataType::UInt32, true)])),
        ] {
            let block = DataBlock::AllNull(AllNullDataBlock { num_values: 10 });
            let arr = block.into_arrow(data_type.clone(), true).unwrap();
            let arr = make_array(arr);
            let expected = new_null_array(&data_type, 10);
            assert_eq!(&arr, &expected);
        }
    }

    #[test]
    fn test_dictionary_cannot_concatenate() {
        // Two u8-keyed dictionaries with 256 distinct items each cannot be merged under u8
        // keys, so the result must be widened to u32 keys.
        let items = StringArray::from(
            (0..256)
                .map(|i| Some(String::from_utf8(vec![0; i]).unwrap()))
                .collect::<Vec<_>>(),
        );
        let other_items = StringArray::from(
            (0..256)
                .map(|i| Some(String::from_utf8(vec![1; i + 1]).unwrap()))
                .collect::<Vec<_>>(),
        );
        let indices = UInt8Array::from_iter_values(0..=255);
        let dict1 = DictionaryArray::new(indices.clone(), Arc::new(items));
        let dict2 = DictionaryArray::new(indices, Arc::new(other_items));
        let data = DataBlock::from_arrays(&[Arc::new(dict1), Arc::new(dict2)], 512);
        assert_eq!(data.num_values(), 512);

        let dict = data.as_dictionary().unwrap();

        assert_eq!(dict.indices.bits_per_value, 32);
        assert_eq!(
            dict.indices.data,
            LanceBuffer::reinterpret_vec::<u32>((0..512).collect::<Vec<_>>())
        );
        assert_eq!(
            dict.dictionary.as_variable_width().unwrap().data.len(),
            65536
        );
    }

    #[test]
    fn test_data_size() {
        // Expected size of a nullable block: every value buffer plus the bit-packed validity
        // bitmap, rounded up to whole bytes.
        fn size_with_nulls(arr: &ArrayRef) -> u64 {
            let buffer_bytes: usize = arr.to_data().buffers().iter().map(|b| b.len()).sum();
            let null_bytes = arr.nulls().unwrap().len().div_ceil(8);
            (buffer_bytes + null_bytes) as u64
        }

        let mut rng = rand_xoshiro::Xoshiro256PlusPlus::seed_from_u64(DEFAULT_SEED.0);

        // No nulls: the block is exactly the array's buffers.
        let mut genn = array::rand::<Int32Type>().with_nulls(&[false, false, false]);

        let arr = genn.generate(RowCount::from(3), &mut rng).unwrap();
        let block = DataBlock::from_array(arr.clone());
        assert_eq!(block.data_size(), arr.get_buffer_memory_size() as u64);

        let arr = genn.generate(RowCount::from(400), &mut rng).unwrap();
        let block = DataBlock::from_array(arr.clone());
        assert_eq!(block.data_size(), arr.get_buffer_memory_size() as u64);

        // Some nulls.
        let mut genn = array::rand::<Int32Type>().with_nulls(&[false, true, false]);
        for num_rows in [3, 400] {
            let arr = genn.generate(RowCount::from(num_rows), &mut rng).unwrap();
            let block = DataBlock::from_array(arr.clone());
            assert_eq!(block.data_size(), size_with_nulls(&arr));
        }

        // More nulls.
        let mut genn = array::rand::<Int32Type>().with_nulls(&[true, true, false]);
        for num_rows in [3, 400] {
            let arr = genn.generate(RowCount::from(num_rows), &mut rng).unwrap();
            let block = DataBlock::from_array(arr.clone());
            assert_eq!(block.data_size(), size_with_nulls(&arr));
        }

        // Several chunks must be sized like their concatenation.
        let mut genn = array::rand::<Int32Type>().with_nulls(&[false, true, false]);
        let arr1 = genn.generate(RowCount::from(3), &mut rng).unwrap();
        let arr2 = genn.generate(RowCount::from(3), &mut rng).unwrap();
        let arr3 = genn.generate(RowCount::from(3), &mut rng).unwrap();
        let block = DataBlock::from_arrays(&[arr1.clone(), arr2.clone(), arr3.clone()], 9);

        let concatenated_array =
            arrow_select::concat::concat(&[arr1.as_ref(), arr2.as_ref(), arr3.as_ref()]).unwrap();
        assert_eq!(block.data_size(), size_with_nulls(&concatenated_array));
    }
}