1use std::{
18 ops::Range,
19 sync::{Arc, RwLock},
20};
21
22use arrow_array::{
23 Array, ArrayRef, OffsetSizeTrait, UInt64Array,
24 cast::AsArray,
25 new_empty_array, new_null_array,
26 types::{ArrowDictionaryKeyType, UInt8Type, UInt16Type, UInt32Type, UInt64Type},
27};
28use arrow_buffer::{ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer};
29use arrow_data::{ArrayData, ArrayDataBuilder};
30use arrow_schema::DataType;
31use lance_arrow::DataTypeExt;
32
33use lance_core::{Error, Result};
34
35use crate::{
36 buffer::LanceBuffer,
37 statistics::{ComputeStat, Stat},
38};
39
40#[derive(Debug, Clone)]
46pub struct AllNullDataBlock {
47 pub num_values: u64,
49}
50
51impl AllNullDataBlock {
52 fn into_arrow(self, data_type: DataType, _validate: bool) -> Result<ArrayData> {
53 Ok(ArrayData::new_null(&data_type, self.num_values as usize))
54 }
55
56 fn into_buffers(self) -> Vec<LanceBuffer> {
57 vec![]
58 }
59}
60
61use std::collections::HashMap;
62
63#[derive(Debug, Clone)]
66pub struct BlockInfo(pub Arc<RwLock<HashMap<Stat, Arc<dyn Array>>>>);
67
68impl Default for BlockInfo {
69 fn default() -> Self {
70 Self::new()
71 }
72}
73
74impl BlockInfo {
75 pub fn new() -> Self {
76 Self(Arc::new(RwLock::new(HashMap::new())))
77 }
78}
79
80impl PartialEq for BlockInfo {
81 fn eq(&self, other: &Self) -> bool {
82 let self_info = self.0.read().unwrap();
83 let other_info = other.0.read().unwrap();
84 *self_info == *other_info
85 }
86}
87
88#[derive(Debug, Clone)]
94pub struct NullableDataBlock {
95 pub data: Box<DataBlock>,
97 pub nulls: LanceBuffer,
99
100 pub block_info: BlockInfo,
101}
102
103impl NullableDataBlock {
104 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
105 let nulls = self.nulls.into_buffer();
106 let data = self.data.into_arrow(data_type, validate)?.into_builder();
107 let data = data.null_bit_buffer(Some(nulls));
108 if validate {
109 Ok(data.build()?)
110 } else {
111 Ok(unsafe { data.build_unchecked() })
112 }
113 }
114
115 fn into_buffers(self) -> Vec<LanceBuffer> {
116 let mut buffers = vec![self.nulls];
117 buffers.extend(self.data.into_buffers());
118 buffers
119 }
120
121 pub fn data_size(&self) -> u64 {
122 self.data.data_size() + self.nulls.len() as u64
123 }
124}
125
126#[derive(Debug, PartialEq, Clone)]
128pub struct ConstantDataBlock {
129 pub data: LanceBuffer,
131 pub num_values: u64,
133}
134
135impl ConstantDataBlock {
136 fn into_buffers(self) -> Vec<LanceBuffer> {
137 vec![self.data]
138 }
139
140 fn into_arrow(self, _data_type: DataType, _validate: bool) -> Result<ArrayData> {
141 todo!()
144 }
145
146 pub fn try_clone(&self) -> Result<Self> {
147 Ok(Self {
148 data: self.data.clone(),
149 num_values: self.num_values,
150 })
151 }
152
153 pub fn data_size(&self) -> u64 {
154 self.data.len() as u64
155 }
156}
157
158#[derive(Debug, PartialEq, Clone)]
160pub struct FixedWidthDataBlock {
161 pub data: LanceBuffer,
163 pub bits_per_value: u64,
165 pub num_values: u64,
167
168 pub block_info: BlockInfo,
169}
170
171impl FixedWidthDataBlock {
172 fn do_into_arrow(
173 self,
174 data_type: DataType,
175 num_values: u64,
176 validate: bool,
177 ) -> Result<ArrayData> {
178 let data_buffer = self.data.into_buffer();
179 let builder = ArrayDataBuilder::new(data_type)
180 .add_buffer(data_buffer)
181 .len(num_values as usize)
182 .null_count(0);
183 if validate {
184 Ok(builder.build()?)
185 } else {
186 Ok(unsafe { builder.build_unchecked() })
187 }
188 }
189
190 pub fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
191 let root_num_values = self.num_values;
192 self.do_into_arrow(data_type, root_num_values, validate)
193 }
194
195 pub fn into_buffers(self) -> Vec<LanceBuffer> {
196 vec![self.data]
197 }
198
199 pub fn try_clone(&self) -> Result<Self> {
200 Ok(Self {
201 data: self.data.clone(),
202 bits_per_value: self.bits_per_value,
203 num_values: self.num_values,
204 block_info: self.block_info.clone(),
205 })
206 }
207
208 pub fn data_size(&self) -> u64 {
209 self.data.len() as u64
210 }
211}
212
213#[derive(Debug)]
214pub struct VariableWidthDataBlockBuilder<T: OffsetSizeTrait> {
215 offsets: Vec<T>,
216 bytes: Vec<u8>,
217}
218
219impl<T: OffsetSizeTrait> VariableWidthDataBlockBuilder<T> {
220 fn new(estimated_size_bytes: u64) -> Self {
221 Self {
222 offsets: vec![T::from_usize(0).unwrap()],
223 bytes: Vec::with_capacity(estimated_size_bytes as usize),
224 }
225 }
226}
227
228impl<T: OffsetSizeTrait + bytemuck::Pod> DataBlockBuilderImpl for VariableWidthDataBlockBuilder<T> {
229 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
230 let block = data_block.as_variable_width_ref().unwrap();
231 assert!(block.bits_per_offset == T::get_byte_width() as u8 * 8);
232 let offsets = block.offsets.borrow_to_typed_view::<T>();
233
234 let start_offset = offsets[selection.start as usize];
235 let end_offset = offsets[selection.end as usize];
236 let mut previous_len = self.bytes.len();
237
238 self.bytes
239 .extend_from_slice(&block.data[start_offset.as_usize()..end_offset.as_usize()]);
240
241 self.offsets.extend(
242 offsets[selection.start as usize..selection.end as usize]
243 .iter()
244 .zip(&offsets[selection.start as usize + 1..=selection.end as usize])
245 .map(|(¤t, &next)| {
246 let this_value_len = next - current;
247 previous_len += this_value_len.as_usize();
248 T::from_usize(previous_len).unwrap()
249 }),
250 );
251 }
252
253 fn finish(self: Box<Self>) -> DataBlock {
254 let num_values = (self.offsets.len() - 1) as u64;
255 DataBlock::VariableWidth(VariableWidthBlock {
256 data: LanceBuffer::from(self.bytes),
257 offsets: LanceBuffer::reinterpret_vec(self.offsets),
258 bits_per_offset: T::get_byte_width() as u8 * 8,
259 num_values,
260 block_info: BlockInfo::new(),
261 })
262 }
263}
264
265#[derive(Debug)]
266struct BitmapDataBlockBuilder {
267 values: BooleanBufferBuilder,
268}
269
270impl BitmapDataBlockBuilder {
271 fn new(estimated_size_bytes: u64) -> Self {
272 Self {
273 values: BooleanBufferBuilder::new(estimated_size_bytes as usize * 8),
274 }
275 }
276}
277
278impl DataBlockBuilderImpl for BitmapDataBlockBuilder {
279 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
280 let bitmap_blk = data_block.as_fixed_width_ref().unwrap();
281 self.values.append_packed_range(
282 selection.start as usize..selection.end as usize,
283 &bitmap_blk.data,
284 );
285 }
286
287 fn finish(mut self: Box<Self>) -> DataBlock {
288 let bool_buf = self.values.finish();
289 let num_values = bool_buf.len() as u64;
290 let bits_buf = bool_buf.into_inner();
291 DataBlock::FixedWidth(FixedWidthDataBlock {
292 data: LanceBuffer::from(bits_buf),
293 bits_per_value: 1,
294 num_values,
295 block_info: BlockInfo::new(),
296 })
297 }
298}
299
/// Accumulates byte-aligned fixed-width values into a flat byte vector.
#[derive(Debug)]
struct FixedWidthDataBlockBuilder {
    /// Width of each value in bits (a multiple of 8).
    bits_per_value: u64,
    /// Width of each value in bytes (`bits_per_value / 8`).
    bytes_per_value: u64,
    /// Bytes collected so far.
    values: Vec<u8>,
}

impl FixedWidthDataBlockBuilder {
    /// Creates a builder for values of `bits_per_value` bits, pre-sizing the
    /// byte storage to `estimated_size_bytes`.
    ///
    /// # Panics
    ///
    /// Panics if `bits_per_value` is not a multiple of 8.
    fn new(bits_per_value: u64, estimated_size_bytes: u64) -> Self {
        assert!(bits_per_value.is_multiple_of(8));
        let bytes_per_value = bits_per_value / 8;
        let values = Vec::with_capacity(estimated_size_bytes as usize);
        Self {
            bits_per_value,
            bytes_per_value,
            values,
        }
    }
}
317
318impl DataBlockBuilderImpl for FixedWidthDataBlockBuilder {
319 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
320 let block = data_block.as_fixed_width_ref().unwrap();
321 assert_eq!(self.bits_per_value, block.bits_per_value);
322 let start = selection.start as usize * self.bytes_per_value as usize;
323 let end = selection.end as usize * self.bytes_per_value as usize;
324 self.values.extend_from_slice(&block.data[start..end]);
325 }
326
327 fn finish(self: Box<Self>) -> DataBlock {
328 let num_values = (self.values.len() / self.bytes_per_value as usize) as u64;
329 DataBlock::FixedWidth(FixedWidthDataBlock {
330 data: LanceBuffer::from(self.values),
331 bits_per_value: self.bits_per_value,
332 num_values,
333 block_info: BlockInfo::new(),
334 })
335 }
336}
337
338#[derive(Debug)]
339struct StructDataBlockBuilder {
340 children: Vec<Box<dyn DataBlockBuilderImpl>>,
341}
342
343impl StructDataBlockBuilder {
344 fn new(children: Vec<Box<dyn DataBlockBuilderImpl>>) -> Self {
345 Self { children }
346 }
347}
348
349impl DataBlockBuilderImpl for StructDataBlockBuilder {
350 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
351 let data_block = data_block.as_struct_ref().unwrap();
352 for i in 0..self.children.len() {
353 self.children[i].append(&data_block.children[i], selection.clone());
354 }
355 }
356
357 fn finish(self: Box<Self>) -> DataBlock {
358 let mut children_data_block = Vec::new();
359 for child in self.children {
360 let child_data_block = child.finish();
361 children_data_block.push(child_data_block);
362 }
363 DataBlock::Struct(StructDataBlock {
364 children: children_data_block,
365 block_info: BlockInfo::new(),
366 validity: None,
367 })
368 }
369}
370
/// Builder for all-null blocks; it only needs to count values.
#[derive(Debug, Default)]
struct AllNullDataBlockBuilder {
    /// Running total of appended values.
    num_values: u64,
}
375
376impl DataBlockBuilderImpl for AllNullDataBlockBuilder {
377 fn append(&mut self, _data_block: &DataBlock, selection: Range<u64>) {
378 self.num_values += selection.end - selection.start;
379 }
380
381 fn finish(self: Box<Self>) -> DataBlock {
382 DataBlock::AllNull(AllNullDataBlock {
383 num_values: self.num_values,
384 })
385 }
386}
387
388#[derive(Debug, Clone)]
390pub struct FixedSizeListBlock {
391 pub child: Box<DataBlock>,
393 pub dimension: u64,
395}
396
397impl FixedSizeListBlock {
398 pub fn num_values(&self) -> u64 {
399 self.child.num_values() / self.dimension
400 }
401
402 pub fn try_into_flat(self) -> Option<FixedWidthDataBlock> {
406 match *self.child {
407 DataBlock::Nullable(_) => None,
409 DataBlock::FixedSizeList(inner) => {
410 let mut flat = inner.try_into_flat()?;
411 flat.bits_per_value *= self.dimension;
412 flat.num_values /= self.dimension;
413 Some(flat)
414 }
415 DataBlock::FixedWidth(mut inner) => {
416 inner.bits_per_value *= self.dimension;
417 inner.num_values /= self.dimension;
418 Some(inner)
419 }
420 _ => panic!(
421 "Expected FixedSizeList or FixedWidth data block but found {:?}",
422 self
423 ),
424 }
425 }
426
427 pub fn flatten_as_fixed(&mut self) -> FixedWidthDataBlock {
428 match self.child.as_mut() {
429 DataBlock::FixedSizeList(fsl) => fsl.flatten_as_fixed(),
430 DataBlock::FixedWidth(fw) => fw.clone(),
431 _ => panic!("Expected FixedSizeList or FixedWidth data block"),
432 }
433 }
434
435 pub fn from_flat(data: FixedWidthDataBlock, data_type: &DataType) -> DataBlock {
437 match data_type {
438 DataType::FixedSizeList(child_field, dimension) => {
439 let mut data = data;
440 data.bits_per_value /= *dimension as u64;
441 data.num_values *= *dimension as u64;
442 let child_data = Self::from_flat(data, child_field.data_type());
443 DataBlock::FixedSizeList(Self {
444 child: Box::new(child_data),
445 dimension: *dimension as u64,
446 })
447 }
448 _ => DataBlock::FixedWidth(data),
450 }
451 }
452
453 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
454 let num_values = self.num_values();
455 let builder = match &data_type {
456 DataType::FixedSizeList(child_field, _) => {
457 let child_data = self
458 .child
459 .into_arrow(child_field.data_type().clone(), validate)?;
460 ArrayDataBuilder::new(data_type)
461 .add_child_data(child_data)
462 .len(num_values as usize)
463 .null_count(0)
464 }
465 _ => panic!("Expected FixedSizeList data type and got {:?}", data_type),
466 };
467 if validate {
468 Ok(builder.build()?)
469 } else {
470 Ok(unsafe { builder.build_unchecked() })
471 }
472 }
473
474 fn into_buffers(self) -> Vec<LanceBuffer> {
475 self.child.into_buffers()
476 }
477
478 fn data_size(&self) -> u64 {
479 self.child.data_size()
480 }
481}
482
483#[derive(Debug)]
484struct FixedSizeListBlockBuilder {
485 inner: Box<dyn DataBlockBuilderImpl>,
486 dimension: u64,
487}
488
489impl FixedSizeListBlockBuilder {
490 fn new(inner: Box<dyn DataBlockBuilderImpl>, dimension: u64) -> Self {
491 Self { inner, dimension }
492 }
493}
494
495impl DataBlockBuilderImpl for FixedSizeListBlockBuilder {
496 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
497 let selection = selection.start * self.dimension..selection.end * self.dimension;
498 let fsl = data_block.as_fixed_size_list_ref().unwrap();
499 self.inner.append(fsl.child.as_ref(), selection);
500 }
501
502 fn finish(self: Box<Self>) -> DataBlock {
503 let inner_block = self.inner.finish();
504 DataBlock::FixedSizeList(FixedSizeListBlock {
505 child: Box::new(inner_block),
506 dimension: self.dimension,
507 })
508 }
509}
510
511#[derive(Debug)]
512struct NullableDataBlockBuilder {
513 inner: Box<dyn DataBlockBuilderImpl>,
514 validity: BooleanBufferBuilder,
515}
516
517impl NullableDataBlockBuilder {
518 fn new(inner: Box<dyn DataBlockBuilderImpl>, estimated_size_bytes: usize) -> Self {
519 Self {
520 inner,
521 validity: BooleanBufferBuilder::new(estimated_size_bytes * 8),
522 }
523 }
524}
525
526impl DataBlockBuilderImpl for NullableDataBlockBuilder {
527 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
528 let nullable = data_block.as_nullable_ref().unwrap();
529 let bool_buf = BooleanBuffer::new(
530 nullable.nulls.clone().into_buffer(),
531 selection.start as usize,
532 (selection.end - selection.start) as usize,
533 );
534 self.validity.append_buffer(&bool_buf);
535 self.inner.append(nullable.data.as_ref(), selection);
536 }
537
538 fn finish(mut self: Box<Self>) -> DataBlock {
539 let inner_block = self.inner.finish();
540 DataBlock::Nullable(NullableDataBlock {
541 data: Box::new(inner_block),
542 nulls: LanceBuffer::from(self.validity.finish().into_inner()),
543 block_info: BlockInfo::new(),
544 })
545 }
546}
547
548#[derive(Debug, Clone)]
552pub struct OpaqueBlock {
553 pub buffers: Vec<LanceBuffer>,
554 pub num_values: u64,
555 pub block_info: BlockInfo,
556}
557
558impl OpaqueBlock {
559 pub fn data_size(&self) -> u64 {
560 self.buffers.iter().map(|b| b.len() as u64).sum()
561 }
562}
563
564#[derive(Debug, Clone)]
566pub struct VariableWidthBlock {
567 pub data: LanceBuffer,
569 pub offsets: LanceBuffer,
573 pub bits_per_offset: u8,
575 pub num_values: u64,
577
578 pub block_info: BlockInfo,
579}
580
581impl VariableWidthBlock {
582 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
583 let data_buffer = self.data.into_buffer();
584 let offsets_buffer = self.offsets.into_buffer();
585 let builder = ArrayDataBuilder::new(data_type)
586 .add_buffer(offsets_buffer)
587 .add_buffer(data_buffer)
588 .len(self.num_values as usize)
589 .null_count(0);
590 if validate {
591 Ok(builder.build()?)
592 } else {
593 Ok(unsafe { builder.build_unchecked() })
594 }
595 }
596
597 fn into_buffers(self) -> Vec<LanceBuffer> {
598 vec![self.offsets, self.data]
599 }
600
601 pub fn offsets_as_block(&mut self) -> DataBlock {
602 let offsets = self.offsets.clone();
603 DataBlock::FixedWidth(FixedWidthDataBlock {
604 data: offsets,
605 bits_per_value: self.bits_per_offset as u64,
606 num_values: self.num_values + 1,
607 block_info: BlockInfo::new(),
608 })
609 }
610
611 pub fn data_size(&self) -> u64 {
612 (self.data.len() + self.offsets.len()) as u64
613 }
614}
615
616#[derive(Debug, Clone)]
618pub struct StructDataBlock {
619 pub children: Vec<DataBlock>,
621 pub block_info: BlockInfo,
622 pub validity: Option<NullBuffer>,
624}
625
626impl StructDataBlock {
627 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
628 if let DataType::Struct(fields) = &data_type {
629 let mut builder = ArrayDataBuilder::new(DataType::Struct(fields.clone()));
630 let mut num_rows = 0;
631 for (field, child) in fields.iter().zip(self.children) {
632 let child_data = child.into_arrow(field.data_type().clone(), validate)?;
633 num_rows = child_data.len();
634 builder = builder.add_child_data(child_data);
635 }
636
637 let builder = if let Some(validity) = self.validity {
639 let null_count = validity.null_count();
640 builder
641 .null_bit_buffer(Some(validity.into_inner().into_inner()))
642 .null_count(null_count)
643 } else {
644 builder.null_count(0)
645 };
646
647 let builder = builder.len(num_rows);
648 if validate {
649 Ok(builder.build()?)
650 } else {
651 Ok(unsafe { builder.build_unchecked() })
652 }
653 } else {
654 Err(Error::internal(format!(
655 "Expected Struct, got {:?}",
656 data_type
657 )))
658 }
659 }
660
661 fn remove_outer_validity(self) -> Self {
662 Self {
663 children: self
664 .children
665 .into_iter()
666 .map(|c| c.remove_outer_validity())
667 .collect(),
668 block_info: self.block_info,
669 validity: None, }
671 }
672
673 fn into_buffers(self) -> Vec<LanceBuffer> {
674 self.children
675 .into_iter()
676 .flat_map(|c| c.into_buffers())
677 .collect()
678 }
679
680 pub fn has_variable_width_child(&self) -> bool {
681 self.children
682 .iter()
683 .any(|child| !matches!(child, DataBlock::FixedWidth(_)))
684 }
685
686 pub fn data_size(&self) -> u64 {
687 self.children
688 .iter()
689 .map(|data_block| data_block.data_size())
690 .sum()
691 }
692}
693
694#[derive(Debug, Clone)]
696pub struct DictionaryDataBlock {
697 pub indices: FixedWidthDataBlock,
699 pub dictionary: Box<DataBlock>,
701}
702
703impl DictionaryDataBlock {
704 fn decode_helper<K: ArrowDictionaryKeyType>(self) -> Result<DataBlock> {
705 if self.indices.num_values == 0 {
708 return Ok(DataBlock::AllNull(AllNullDataBlock { num_values: 0 }));
709 }
710
711 let estimated_size_bytes = self.dictionary.data_size()
713 * (self.indices.num_values + self.dictionary.num_values() - 1)
714 / self.dictionary.num_values();
715 let mut data_builder = DataBlockBuilder::with_capacity_estimate(estimated_size_bytes);
716
717 let indices = self.indices.data.borrow_to_typed_slice::<K::Native>();
718 let indices = indices.as_ref();
719
720 indices
721 .iter()
722 .map(|idx| idx.to_usize().unwrap() as u64)
723 .for_each(|idx| {
724 data_builder.append(&self.dictionary, idx..idx + 1);
725 });
726
727 Ok(data_builder.finish())
728 }
729
730 pub fn decode(self) -> Result<DataBlock> {
731 match self.indices.bits_per_value {
732 8 => self.decode_helper::<UInt8Type>(),
733 16 => self.decode_helper::<UInt16Type>(),
734 32 => self.decode_helper::<UInt32Type>(),
735 64 => self.decode_helper::<UInt64Type>(),
736 _ => Err(lance_core::Error::internal(format!(
737 "Unsupported dictionary index bit width: {} bits",
738 self.indices.bits_per_value
739 ))),
740 }
741 }
742
743 fn into_arrow_dict(
744 self,
745 key_type: Box<DataType>,
746 value_type: Box<DataType>,
747 validate: bool,
748 ) -> Result<ArrayData> {
749 let indices = self.indices.into_arrow((*key_type).clone(), validate)?;
750 let dictionary = self
751 .dictionary
752 .into_arrow((*value_type).clone(), validate)?;
753
754 let builder = indices
755 .into_builder()
756 .add_child_data(dictionary)
757 .data_type(DataType::Dictionary(key_type, value_type));
758
759 if validate {
760 Ok(builder.build()?)
761 } else {
762 Ok(unsafe { builder.build_unchecked() })
763 }
764 }
765
766 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
767 if let DataType::Dictionary(key_type, value_type) = data_type {
768 self.into_arrow_dict(key_type, value_type, validate)
769 } else {
770 self.decode()?.into_arrow(data_type, validate)
771 }
772 }
773
774 fn into_buffers(self) -> Vec<LanceBuffer> {
775 let mut buffers = self.indices.into_buffers();
776 buffers.extend(self.dictionary.into_buffers());
777 buffers
778 }
779
780 pub fn into_parts(self) -> (DataBlock, DataBlock) {
781 (DataBlock::FixedWidth(self.indices), *self.dictionary)
782 }
783
784 pub fn from_parts(indices: FixedWidthDataBlock, dictionary: DataBlock) -> Self {
785 Self {
786 indices,
787 dictionary: Box::new(dictionary),
788 }
789 }
790}
791
792#[derive(Debug, Clone)]
807pub enum DataBlock {
808 Empty(),
809 Constant(ConstantDataBlock),
810 AllNull(AllNullDataBlock),
811 Nullable(NullableDataBlock),
812 FixedWidth(FixedWidthDataBlock),
813 FixedSizeList(FixedSizeListBlock),
814 VariableWidth(VariableWidthBlock),
815 Opaque(OpaqueBlock),
816 Struct(StructDataBlock),
817 Dictionary(DictionaryDataBlock),
818}
819
820impl DataBlock {
821 pub fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
823 match self {
824 Self::Empty() => Ok(new_empty_array(&data_type).to_data()),
825 Self::Constant(inner) => inner.into_arrow(data_type, validate),
826 Self::AllNull(inner) => inner.into_arrow(data_type, validate),
827 Self::Nullable(inner) => inner.into_arrow(data_type, validate),
828 Self::FixedWidth(inner) => inner.into_arrow(data_type, validate),
829 Self::FixedSizeList(inner) => inner.into_arrow(data_type, validate),
830 Self::VariableWidth(inner) => inner.into_arrow(data_type, validate),
831 Self::Struct(inner) => inner.into_arrow(data_type, validate),
832 Self::Dictionary(inner) => inner.into_arrow(data_type, validate),
833 Self::Opaque(_) => Err(Error::internal(
834 "Cannot convert OpaqueBlock to Arrow".to_string(),
835 )),
836 }
837 }
838
839 pub fn into_buffers(self) -> Vec<LanceBuffer> {
843 match self {
844 Self::Empty() => Vec::default(),
845 Self::Constant(inner) => inner.into_buffers(),
846 Self::AllNull(inner) => inner.into_buffers(),
847 Self::Nullable(inner) => inner.into_buffers(),
848 Self::FixedWidth(inner) => inner.into_buffers(),
849 Self::FixedSizeList(inner) => inner.into_buffers(),
850 Self::VariableWidth(inner) => inner.into_buffers(),
851 Self::Struct(inner) => inner.into_buffers(),
852 Self::Dictionary(inner) => inner.into_buffers(),
853 Self::Opaque(inner) => inner.buffers,
854 }
855 }
856
857 pub fn try_clone(&self) -> Result<Self> {
866 match self {
867 Self::Empty() => Ok(Self::Empty()),
868 Self::Constant(inner) => Ok(Self::Constant(inner.clone())),
869 Self::AllNull(inner) => Ok(Self::AllNull(inner.clone())),
870 Self::Nullable(inner) => Ok(Self::Nullable(inner.clone())),
871 Self::FixedWidth(inner) => Ok(Self::FixedWidth(inner.clone())),
872 Self::FixedSizeList(inner) => Ok(Self::FixedSizeList(inner.clone())),
873 Self::VariableWidth(inner) => Ok(Self::VariableWidth(inner.clone())),
874 Self::Struct(inner) => Ok(Self::Struct(inner.clone())),
875 Self::Dictionary(inner) => Ok(Self::Dictionary(inner.clone())),
876 Self::Opaque(inner) => Ok(Self::Opaque(inner.clone())),
877 }
878 }
879
880 pub fn name(&self) -> &'static str {
881 match self {
882 Self::Constant(_) => "Constant",
883 Self::Empty() => "Empty",
884 Self::AllNull(_) => "AllNull",
885 Self::Nullable(_) => "Nullable",
886 Self::FixedWidth(_) => "FixedWidth",
887 Self::FixedSizeList(_) => "FixedSizeList",
888 Self::VariableWidth(_) => "VariableWidth",
889 Self::Struct(_) => "Struct",
890 Self::Dictionary(_) => "Dictionary",
891 Self::Opaque(_) => "Opaque",
892 }
893 }
894
895 pub fn is_variable(&self) -> bool {
896 match self {
897 Self::Constant(_) => false,
898 Self::Empty() => false,
899 Self::AllNull(_) => false,
900 Self::Nullable(nullable) => nullable.data.is_variable(),
901 Self::FixedWidth(_) => false,
902 Self::FixedSizeList(fsl) => fsl.child.is_variable(),
903 Self::VariableWidth(_) => true,
904 Self::Struct(strct) => strct.children.iter().any(|c| c.is_variable()),
905 Self::Dictionary(_) => {
906 todo!("is_variable for DictionaryDataBlock is not implemented yet")
907 }
908 Self::Opaque(_) => panic!("Does not make sense to ask if an Opaque block is variable"),
909 }
910 }
911
912 pub fn is_nullable(&self) -> bool {
913 match self {
914 Self::AllNull(_) => true,
915 Self::Nullable(_) => true,
916 Self::FixedSizeList(fsl) => fsl.child.is_nullable(),
917 Self::Struct(strct) => strct.children.iter().any(|c| c.is_nullable()),
918 Self::Dictionary(_) => {
919 todo!("is_nullable for DictionaryDataBlock is not implemented yet")
920 }
921 Self::Opaque(_) => panic!("Does not make sense to ask if an Opaque block is nullable"),
922 _ => false,
923 }
924 }
925
926 pub fn num_values(&self) -> u64 {
931 match self {
932 Self::Empty() => 0,
933 Self::Constant(inner) => inner.num_values,
934 Self::AllNull(inner) => inner.num_values,
935 Self::Nullable(inner) => inner.data.num_values(),
936 Self::FixedWidth(inner) => inner.num_values,
937 Self::FixedSizeList(inner) => inner.num_values(),
938 Self::VariableWidth(inner) => inner.num_values,
939 Self::Struct(inner) => inner.children[0].num_values(),
940 Self::Dictionary(inner) => inner.indices.num_values,
941 Self::Opaque(inner) => inner.num_values,
942 }
943 }
944
945 pub fn items_per_row(&self) -> u64 {
949 match self {
950 Self::Empty() => todo!(), Self::Constant(_) => todo!(), Self::AllNull(_) => todo!(), Self::Nullable(nullable) => nullable.data.items_per_row(),
954 Self::FixedWidth(_) => 1,
955 Self::FixedSizeList(fsl) => fsl.dimension * fsl.child.items_per_row(),
956 Self::VariableWidth(_) => 1,
957 Self::Struct(_) => todo!(), Self::Dictionary(_) => 1,
959 Self::Opaque(_) => 1,
960 }
961 }
962
963 pub fn data_size(&self) -> u64 {
965 match self {
966 Self::Empty() => 0,
967 Self::Constant(inner) => inner.data_size(),
968 Self::AllNull(_) => 0,
969 Self::Nullable(inner) => inner.data_size(),
970 Self::FixedWidth(inner) => inner.data_size(),
971 Self::FixedSizeList(inner) => inner.data_size(),
972 Self::VariableWidth(inner) => inner.data_size(),
973 Self::Struct(inner) => inner.children.iter().map(|child| child.data_size()).sum(),
974 Self::Dictionary(inner) => inner.indices.data_size() + inner.dictionary.data_size(),
975 Self::Opaque(inner) => inner.data_size(),
976 }
977 }
978
979 pub fn remove_outer_validity(self) -> Self {
987 match self {
988 Self::AllNull(_) => panic!("Cannot remove validity on all-null data"),
989 Self::Nullable(inner) => *inner.data,
990 Self::Struct(inner) => Self::Struct(inner.remove_outer_validity()),
991 other => other,
992 }
993 }
994
995 pub fn make_builder(&self, estimated_size_bytes: u64) -> Box<dyn DataBlockBuilderImpl> {
996 match self {
997 Self::FixedWidth(inner) => {
998 if inner.bits_per_value == 1 {
999 Box::new(BitmapDataBlockBuilder::new(estimated_size_bytes))
1000 } else {
1001 Box::new(FixedWidthDataBlockBuilder::new(
1002 inner.bits_per_value,
1003 estimated_size_bytes,
1004 ))
1005 }
1006 }
1007 Self::VariableWidth(inner) => {
1008 if inner.bits_per_offset == 32 {
1009 Box::new(VariableWidthDataBlockBuilder::<i32>::new(
1010 estimated_size_bytes,
1011 ))
1012 } else if inner.bits_per_offset == 64 {
1013 Box::new(VariableWidthDataBlockBuilder::<i64>::new(
1014 estimated_size_bytes,
1015 ))
1016 } else {
1017 todo!()
1018 }
1019 }
1020 Self::FixedSizeList(inner) => {
1021 let inner_builder = inner.child.make_builder(estimated_size_bytes);
1022 Box::new(FixedSizeListBlockBuilder::new(
1023 inner_builder,
1024 inner.dimension,
1025 ))
1026 }
1027 Self::Nullable(nullable) => {
1028 let estimated_validity_size_bytes = estimated_size_bytes / 16;
1031 let inner_builder = nullable
1032 .data
1033 .make_builder(estimated_size_bytes - estimated_validity_size_bytes);
1034 Box::new(NullableDataBlockBuilder::new(
1035 inner_builder,
1036 estimated_validity_size_bytes as usize,
1037 ))
1038 }
1039 Self::Struct(struct_data_block) => {
1040 let num_children = struct_data_block.children.len();
1041 let per_child_estimate = if num_children == 0 {
1042 0
1043 } else {
1044 estimated_size_bytes / num_children as u64
1045 };
1046 let child_builders = struct_data_block
1047 .children
1048 .iter()
1049 .map(|child| child.make_builder(per_child_estimate))
1050 .collect();
1051 Box::new(StructDataBlockBuilder::new(child_builders))
1052 }
1053 Self::AllNull(_) => Box::new(AllNullDataBlockBuilder::default()),
1054 _ => todo!("make_builder for {:?}", self),
1055 }
1056 }
1057}
1058
// Generates a consuming accessor `fn $fn_name(self) -> Option<$payload>` that
// yields the payload when `self` is the `$variant` variant and `None` otherwise.
macro_rules! as_type {
    ($fn_name:ident, $variant:tt, $payload:ident) => {
        pub fn $fn_name(self) -> Option<$payload> {
            if let Self::$variant(inner) = self {
                Some(inner)
            } else {
                None
            }
        }
    };
}
1069
// Generates a borrowing accessor `fn $fn_name(&self) -> Option<&$payload>` that
// yields a reference to the payload when `self` is the `$variant` variant.
macro_rules! as_type_ref {
    ($fn_name:ident, $variant:tt, $payload:ident) => {
        pub fn $fn_name(&self) -> Option<&$payload> {
            if let Self::$variant(inner) = self {
                Some(inner)
            } else {
                None
            }
        }
    };
}
1080
// Generates a mutably borrowing accessor
// `fn $fn_name(&mut self) -> Option<&mut $payload>` that yields the payload
// when `self` is the `$variant` variant and `None` otherwise.
macro_rules! as_type_ref_mut {
    ($fn_name:ident, $variant:tt, $payload:ident) => {
        pub fn $fn_name(&mut self) -> Option<&mut $payload> {
            if let Self::$variant(inner) = self {
                Some(inner)
            } else {
                None
            }
        }
    };
}
1091
1092impl DataBlock {
1094 as_type!(as_all_null, AllNull, AllNullDataBlock);
1095 as_type!(as_nullable, Nullable, NullableDataBlock);
1096 as_type!(as_fixed_width, FixedWidth, FixedWidthDataBlock);
1097 as_type!(as_fixed_size_list, FixedSizeList, FixedSizeListBlock);
1098 as_type!(as_variable_width, VariableWidth, VariableWidthBlock);
1099 as_type!(as_struct, Struct, StructDataBlock);
1100 as_type!(as_dictionary, Dictionary, DictionaryDataBlock);
1101 as_type_ref!(as_all_null_ref, AllNull, AllNullDataBlock);
1102 as_type_ref!(as_nullable_ref, Nullable, NullableDataBlock);
1103 as_type_ref!(as_fixed_width_ref, FixedWidth, FixedWidthDataBlock);
1104 as_type_ref!(as_fixed_size_list_ref, FixedSizeList, FixedSizeListBlock);
1105 as_type_ref!(as_variable_width_ref, VariableWidth, VariableWidthBlock);
1106 as_type_ref!(as_struct_ref, Struct, StructDataBlock);
1107 as_type_ref!(as_dictionary_ref, Dictionary, DictionaryDataBlock);
1108 as_type_ref_mut!(as_all_null_ref_mut, AllNull, AllNullDataBlock);
1109 as_type_ref_mut!(as_nullable_ref_mut, Nullable, NullableDataBlock);
1110 as_type_ref_mut!(as_fixed_width_ref_mut, FixedWidth, FixedWidthDataBlock);
1111 as_type_ref_mut!(
1112 as_fixed_size_list_ref_mut,
1113 FixedSizeList,
1114 FixedSizeListBlock
1115 );
1116 as_type_ref_mut!(as_variable_width_ref_mut, VariableWidth, VariableWidthBlock);
1117 as_type_ref_mut!(as_struct_ref_mut, Struct, StructDataBlock);
1118 as_type_ref_mut!(as_dictionary_ref_mut, Dictionary, DictionaryDataBlock);
1119}
1120
1121fn get_byte_range<T: ArrowNativeType>(offsets: &mut LanceBuffer) -> Range<usize> {
1124 let offsets = offsets.borrow_to_typed_slice::<T>();
1125 if offsets.as_ref().is_empty() {
1126 0..0
1127 } else {
1128 offsets.as_ref().first().unwrap().as_usize()..offsets.as_ref().last().unwrap().as_usize()
1129 }
1130}
1131
1132fn stitch_offsets<T: ArrowNativeType + std::ops::Add<Output = T> + std::ops::Sub<Output = T>>(
1138 offsets: Vec<LanceBuffer>,
1139) -> (LanceBuffer, Vec<Range<usize>>) {
1140 if offsets.is_empty() {
1141 return (LanceBuffer::empty(), Vec::default());
1142 }
1143 let len = offsets.iter().map(|b| b.len()).sum::<usize>();
1144 let mut dest = Vec::with_capacity(len);
1148 let mut byte_ranges = Vec::with_capacity(offsets.len());
1149
1150 dest.push(T::from_usize(0).unwrap());
1152
1153 for mut o in offsets.into_iter() {
1154 if !o.is_empty() {
1155 let last_offset = *dest.last().unwrap();
1156 let o = o.borrow_to_typed_slice::<T>();
1157 let start = *o.as_ref().first().unwrap();
1158 dest.extend(o.as_ref()[1..].iter().map(|&x| x + last_offset - start));
1172 }
1173 byte_ranges.push(get_byte_range::<T>(&mut o));
1174 }
1175 (LanceBuffer::reinterpret_vec(dest), byte_ranges)
1176}
1177
1178fn arrow_binary_to_data_block(
1179 arrays: &[ArrayRef],
1180 num_values: u64,
1181 bits_per_offset: u8,
1182) -> DataBlock {
1183 let data_vec = arrays.iter().map(|arr| arr.to_data()).collect::<Vec<_>>();
1184 let bytes_per_offset = bits_per_offset as usize / 8;
1185 let offsets = data_vec
1186 .iter()
1187 .map(|d| {
1188 LanceBuffer::from(
1189 d.buffers()[0].slice_with_length(d.offset(), (d.len() + 1) * bytes_per_offset),
1190 )
1191 })
1192 .collect::<Vec<_>>();
1193 let (offsets, data_ranges) = if bits_per_offset == 32 {
1194 stitch_offsets::<i32>(offsets)
1195 } else {
1196 stitch_offsets::<i64>(offsets)
1197 };
1198 let data = data_vec
1199 .iter()
1200 .zip(data_ranges)
1201 .map(|(d, byte_range)| {
1202 LanceBuffer::from(
1203 d.buffers()[1]
1204 .slice_with_length(byte_range.start, byte_range.end - byte_range.start),
1205 )
1206 })
1207 .collect::<Vec<_>>();
1208 let data = LanceBuffer::concat_into_one(data);
1209 DataBlock::VariableWidth(VariableWidthBlock {
1210 data,
1211 offsets,
1212 bits_per_offset,
1213 num_values,
1214 block_info: BlockInfo::new(),
1215 })
1216}
1217
1218fn encode_flat_data(arrays: &[ArrayRef], num_values: u64) -> LanceBuffer {
1219 let bytes_per_value = arrays[0].data_type().byte_width();
1220 let mut buffer = Vec::with_capacity(num_values as usize * bytes_per_value);
1221 for arr in arrays {
1222 let data = arr.to_data();
1223 buffer.extend_from_slice(data.buffers()[0].as_slice());
1224 }
1225 LanceBuffer::from(buffer)
1226}
1227
1228fn do_encode_bitmap_data(bitmaps: &[BooleanBuffer], num_values: u64) -> LanceBuffer {
1229 let mut builder = BooleanBufferBuilder::new(num_values as usize);
1230
1231 for buf in bitmaps {
1232 builder.append_buffer(buf);
1233 }
1234
1235 let buffer = builder.finish().into_inner();
1236 LanceBuffer::from(buffer)
1237}
1238
1239fn encode_bitmap_data(arrays: &[ArrayRef], num_values: u64) -> LanceBuffer {
1240 let bitmaps = arrays
1241 .iter()
1242 .map(|arr| arr.as_boolean().values().clone())
1243 .collect::<Vec<_>>();
1244 do_encode_bitmap_data(&bitmaps, num_values)
1245}
1246
1247fn concat_dict_arrays(arrays: &[ArrayRef]) -> ArrayRef {
1250 let value_type = arrays[0].as_any_dictionary().values().data_type();
1251 let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
1252 match arrow_select::concat::concat(&array_refs) {
1253 Ok(array) => array,
1254 Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1255 let upscaled = array_refs
1257 .iter()
1258 .map(|arr| {
1259 match arrow_cast::cast(
1260 *arr,
1261 &DataType::Dictionary(
1262 Box::new(DataType::UInt32),
1263 Box::new(value_type.clone()),
1264 ),
1265 ) {
1266 Ok(arr) => arr,
1267 Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1268 unimplemented!("Dictionary arrays with more than 2^32 unique values")
1270 }
1271 err => err.unwrap(),
1272 }
1273 })
1274 .collect::<Vec<_>>();
1275 let array_refs = upscaled.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
1276 match arrow_select::concat::concat(&array_refs) {
1278 Ok(array) => array,
1279 Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1280 unimplemented!("Dictionary arrays with more than 2^32 unique values")
1281 }
1282 err => err.unwrap(),
1283 }
1284 }
1285 err => err.unwrap(),
1287 }
1288}
1289
1290fn max_index_val(index_type: &DataType) -> u64 {
1291 match index_type {
1292 DataType::Int8 => i8::MAX as u64,
1293 DataType::Int16 => i16::MAX as u64,
1294 DataType::Int32 => i32::MAX as u64,
1295 DataType::Int64 => i64::MAX as u64,
1296 DataType::UInt8 => u8::MAX as u64,
1297 DataType::UInt16 => u16::MAX as u64,
1298 DataType::UInt32 => u32::MAX as u64,
1299 DataType::UInt64 => u64::MAX,
1300 _ => panic!("Invalid dictionary index type"),
1301 }
1302}
1303
1304fn arrow_dictionary_to_data_block(arrays: &[ArrayRef], validity: Option<NullBuffer>) -> DataBlock {
1323 let array = concat_dict_arrays(arrays);
1324 let array_dict = array.as_any_dictionary();
1325 let mut indices = array_dict.keys();
1326 let num_values = indices.len() as u64;
1327 let mut values = array_dict.values().clone();
1328 let mut upcast = None;
1330
1331 let indices_block = if let Some(validity) = validity {
1335 let mut first_invalid_index = None;
1339 if let Some(values_validity) = values.nulls() {
1340 first_invalid_index = (!values_validity.inner()).set_indices().next();
1341 }
1342 let first_invalid_index = first_invalid_index.unwrap_or_else(|| {
1343 let null_arr = new_null_array(values.data_type(), 1);
1344 values = arrow_select::concat::concat(&[values.as_ref(), null_arr.as_ref()]).unwrap();
1345 let null_index = values.len() - 1;
1346 let max_index_val = max_index_val(indices.data_type());
1347 if null_index as u64 > max_index_val {
1348 if max_index_val >= u32::MAX as u64 {
1350 unimplemented!("Dictionary arrays with 2^32 unique value (or more) and a null")
1351 }
1352 upcast = Some(arrow_cast::cast(indices, &DataType::UInt32).unwrap());
1353 indices = upcast.as_ref().unwrap();
1354 }
1355 null_index
1356 });
1357 let null_index_arr = arrow_cast::cast(
1359 &UInt64Array::from(vec![first_invalid_index as u64]),
1360 indices.data_type(),
1361 )
1362 .unwrap();
1363
1364 let bytes_per_index = indices.data_type().byte_width();
1365 let bits_per_index = bytes_per_index as u64 * 8;
1366
1367 let null_index_arr = null_index_arr.into_data();
1368 let null_index_bytes = &null_index_arr.buffers()[0];
1369 let mut indices_bytes = indices.to_data().buffers()[0].to_vec();
1371 for invalid_idx in (!validity.inner()).set_indices() {
1372 indices_bytes[invalid_idx * bytes_per_index..(invalid_idx + 1) * bytes_per_index]
1373 .copy_from_slice(null_index_bytes.as_slice());
1374 }
1375 FixedWidthDataBlock {
1376 data: LanceBuffer::from(indices_bytes),
1377 bits_per_value: bits_per_index,
1378 num_values,
1379 block_info: BlockInfo::new(),
1380 }
1381 } else {
1382 FixedWidthDataBlock {
1383 data: LanceBuffer::from(indices.to_data().buffers()[0].clone()),
1384 bits_per_value: indices.data_type().byte_width() as u64 * 8,
1385 num_values,
1386 block_info: BlockInfo::new(),
1387 }
1388 };
1389
1390 let items = DataBlock::from(values);
1391 DataBlock::Dictionary(DictionaryDataBlock {
1392 indices: indices_block,
1393 dictionary: Box::new(items),
1394 })
1395}
1396
1397enum Nullability {
1398 None,
1399 All,
1400 Some(NullBuffer),
1401}
1402
1403impl Nullability {
1404 fn to_option(&self) -> Option<NullBuffer> {
1405 match self {
1406 Self::Some(nulls) => Some(nulls.clone()),
1407 _ => None,
1408 }
1409 }
1410}
1411
1412fn extract_nulls(arrays: &[ArrayRef], num_values: u64) -> Nullability {
1413 let mut has_nulls = false;
1414 let nulls_and_lens = arrays
1415 .iter()
1416 .map(|arr| {
1417 let nulls = arr.logical_nulls();
1418 has_nulls |= nulls.is_some();
1419 (nulls, arr.len())
1420 })
1421 .collect::<Vec<_>>();
1422 if !has_nulls {
1423 return Nullability::None;
1424 }
1425 let mut builder = BooleanBufferBuilder::new(num_values as usize);
1426 let mut num_nulls = 0;
1427 for (null, len) in nulls_and_lens {
1428 if let Some(null) = null {
1429 num_nulls += null.null_count();
1430 builder.append_buffer(&null.into_inner());
1431 } else {
1432 builder.append_n(len, true);
1433 }
1434 }
1435 if num_nulls == num_values as usize {
1436 Nullability::All
1437 } else {
1438 Nullability::Some(NullBuffer::new(builder.finish()))
1439 }
1440}
1441
1442impl DataBlock {
1443 pub fn from_arrays(arrays: &[ArrayRef], num_values: u64) -> Self {
1444 if arrays.is_empty() || num_values == 0 {
1445 return Self::AllNull(AllNullDataBlock { num_values: 0 });
1446 }
1447
1448 let data_type = arrays[0].data_type();
1449 let nulls = extract_nulls(arrays, num_values);
1450
1451 if let Nullability::All = nulls {
1452 return Self::AllNull(AllNullDataBlock { num_values });
1453 }
1454
1455 let mut encoded = match data_type {
1456 DataType::Binary | DataType::Utf8 => arrow_binary_to_data_block(arrays, num_values, 32),
1457 DataType::BinaryView | DataType::Utf8View => {
1458 todo!()
1459 }
1460 DataType::LargeBinary | DataType::LargeUtf8 => {
1461 arrow_binary_to_data_block(arrays, num_values, 64)
1462 }
1463 DataType::Boolean => {
1464 let data = encode_bitmap_data(arrays, num_values);
1465 Self::FixedWidth(FixedWidthDataBlock {
1466 data,
1467 bits_per_value: 1,
1468 num_values,
1469 block_info: BlockInfo::new(),
1470 })
1471 }
1472 DataType::Date32
1473 | DataType::Date64
1474 | DataType::Decimal32(_, _)
1475 | DataType::Decimal64(_, _)
1476 | DataType::Decimal128(_, _)
1477 | DataType::Decimal256(_, _)
1478 | DataType::Duration(_)
1479 | DataType::FixedSizeBinary(_)
1480 | DataType::Float16
1481 | DataType::Float32
1482 | DataType::Float64
1483 | DataType::Int16
1484 | DataType::Int32
1485 | DataType::Int64
1486 | DataType::Int8
1487 | DataType::Interval(_)
1488 | DataType::Time32(_)
1489 | DataType::Time64(_)
1490 | DataType::Timestamp(_, _)
1491 | DataType::UInt16
1492 | DataType::UInt32
1493 | DataType::UInt64
1494 | DataType::UInt8 => {
1495 let data = encode_flat_data(arrays, num_values);
1496 Self::FixedWidth(FixedWidthDataBlock {
1497 data,
1498 bits_per_value: data_type.byte_width() as u64 * 8,
1499 num_values,
1500 block_info: BlockInfo::new(),
1501 })
1502 }
1503 DataType::Null => Self::AllNull(AllNullDataBlock { num_values }),
1504 DataType::Dictionary(_, _) => arrow_dictionary_to_data_block(arrays, nulls.to_option()),
1505 DataType::Struct(fields) => {
1506 let structs = arrays.iter().map(|arr| arr.as_struct()).collect::<Vec<_>>();
1507 let mut children = Vec::with_capacity(fields.len());
1508 for child_idx in 0..fields.len() {
1509 let child_vec = structs
1510 .iter()
1511 .map(|s| s.column(child_idx).clone())
1512 .collect::<Vec<_>>();
1513 children.push(Self::from_arrays(&child_vec, num_values));
1514 }
1515
1516 let validity = match &nulls {
1518 Nullability::None => None,
1519 Nullability::Some(null_buffer) => Some(null_buffer.clone()),
1520 Nullability::All => unreachable!("Should have returned AllNull earlier"),
1521 };
1522
1523 Self::Struct(StructDataBlock {
1524 children,
1525 block_info: BlockInfo::default(),
1526 validity,
1527 })
1528 }
1529 DataType::FixedSizeList(_, dim) => {
1530 let children = arrays
1531 .iter()
1532 .map(|arr| arr.as_fixed_size_list().values().clone())
1533 .collect::<Vec<_>>();
1534 let child_block = Self::from_arrays(&children, num_values * *dim as u64);
1535 Self::FixedSizeList(FixedSizeListBlock {
1536 child: Box::new(child_block),
1537 dimension: *dim as u64,
1538 })
1539 }
1540 DataType::LargeList(_)
1541 | DataType::List(_)
1542 | DataType::ListView(_)
1543 | DataType::LargeListView(_)
1544 | DataType::Map(_, _)
1545 | DataType::RunEndEncoded(_, _)
1546 | DataType::Union(_, _) => {
1547 panic!(
1548 "Field with data type {} cannot be converted to data block",
1549 data_type
1550 )
1551 }
1552 };
1553
1554 encoded.compute_stat();
1556
1557 if !matches!(data_type, DataType::Dictionary(_, _)) {
1558 match nulls {
1559 Nullability::None => encoded,
1560 Nullability::Some(nulls) => Self::Nullable(NullableDataBlock {
1561 data: Box::new(encoded),
1562 nulls: LanceBuffer::from(nulls.into_inner().into_inner()),
1563 block_info: BlockInfo::new(),
1564 }),
1565 _ => unreachable!(),
1566 }
1567 } else {
1568 encoded
1570 }
1571 }
1572
1573 pub fn from_array<T: Array + 'static>(array: T) -> Self {
1574 let num_values = array.len();
1575 Self::from_arrays(&[Arc::new(array)], num_values as u64)
1576 }
1577}
1578
1579impl From<ArrayRef> for DataBlock {
1580 fn from(array: ArrayRef) -> Self {
1581 let num_values = array.len() as u64;
1582 Self::from_arrays(&[array], num_values)
1583 }
1584}
1585
/// Object-safe interface behind [`DataBlockBuilder`], which stores an implementor as a
/// `Box<dyn DataBlockBuilderImpl>` and forwards `append` / `finish` to it.
pub trait DataBlockBuilderImpl: std::fmt::Debug {
    /// Appends the part of `data_block` identified by `selection` to the builder.
    ///
    /// NOTE(review): `selection` is presumably a range of value indices within
    /// `data_block` — confirm against the implementors.
    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>);
    /// Consumes the boxed builder and returns the accumulated block.
    fn finish(self: Box<Self>) -> DataBlock;
}
1590
1591#[derive(Debug)]
1592pub struct DataBlockBuilder {
1593 estimated_size_bytes: u64,
1594 builder: Option<Box<dyn DataBlockBuilderImpl>>,
1595}
1596
1597impl DataBlockBuilder {
1598 pub fn with_capacity_estimate(estimated_size_bytes: u64) -> Self {
1599 Self {
1600 estimated_size_bytes,
1601 builder: None,
1602 }
1603 }
1604
1605 fn get_builder(&mut self, block: &DataBlock) -> &mut dyn DataBlockBuilderImpl {
1606 if self.builder.is_none() {
1607 self.builder = Some(block.make_builder(self.estimated_size_bytes));
1608 }
1609 self.builder.as_mut().unwrap().as_mut()
1610 }
1611
1612 pub fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
1613 self.get_builder(data_block).append(data_block, selection);
1614 }
1615
1616 pub fn finish(self) -> DataBlock {
1617 let builder = self.builder.expect("DataBlockBuilder didn't see any data");
1618 builder.finish()
1619 }
1620}
1621
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::{
        ArrayRef, DictionaryArray, Int8Array, LargeBinaryArray, StringArray, UInt8Array,
        UInt16Array, make_array, new_null_array,
        types::{Int8Type, Int32Type},
    };
    use arrow_buffer::{BooleanBuffer, NullBuffer};

    use arrow_schema::{DataType, Field, Fields};
    use lance_datagen::{ArrayGeneratorExt, DEFAULT_SEED, RowCount, array};
    use rand::SeedableRng;

    use crate::buffer::LanceBuffer;

    use super::{AllNullDataBlock, DataBlock};

    use arrow_array::Array;

    /// Sliced primitive arrays must only contribute the sliced values and validity bits.
    #[test]
    fn test_sliced_to_data_block() {
        let ints = UInt16Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);
        let ints = ints.slice(2, 4);
        let data = DataBlock::from_array(ints);

        let fixed_data = data.as_fixed_width().unwrap();
        assert_eq!(fixed_data.num_values, 4);
        // 4 values * 2 bytes each
        assert_eq!(fixed_data.data.len(), 8);

        let nullable_ints =
            UInt16Array::from(vec![Some(0), None, Some(2), None, Some(4), None, Some(6)]);
        let nullable_ints = nullable_ints.slice(1, 3);
        let data = DataBlock::from_array(nullable_ints);

        // The slice is [None, Some(2), None], so only bit 1 is set.
        let nullable = data.as_nullable().unwrap();
        assert_eq!(nullable.nulls, LanceBuffer::from(vec![0b00000010]));
    }

    /// Several string arrays are stitched into one set of offsets and one data buffer.
    #[test]
    fn test_string_to_data_block() {
        // With nulls: the result is a nullable block wrapping a variable-width block.
        let strings1 = StringArray::from(vec![Some("hello"), None, Some("world")]);
        let strings2 = StringArray::from(vec![Some("a"), Some("b")]);
        let strings3 = StringArray::from(vec![Option::<&'static str>::None, None]);

        let arrays = &[strings1, strings2, strings3]
            .iter()
            .map(|arr| Arc::new(arr.clone()) as ArrayRef)
            .collect::<Vec<_>>();

        let block = DataBlock::from_arrays(arrays, 7);

        assert_eq!(block.num_values(), 7);
        let block = block.as_nullable().unwrap();

        assert_eq!(block.nulls, LanceBuffer::from(vec![0b00011101]));

        let data = block.data.as_variable_width().unwrap();
        assert_eq!(
            data.offsets,
            LanceBuffer::reinterpret_vec(vec![0, 5, 5, 10, 11, 12, 12, 12])
        );

        assert_eq!(data.data, LanceBuffer::copy_slice(b"helloworldab"));

        // Without nulls: the result is a bare variable-width block.
        let strings1 = StringArray::from(vec![Some("a"), Some("bc")]);
        let strings2 = StringArray::from(vec![Some("def")]);

        let arrays = &[strings1, strings2]
            .iter()
            .map(|arr| Arc::new(arr.clone()) as ArrayRef)
            .collect::<Vec<_>>();

        let block = DataBlock::from_arrays(arrays, 3);

        assert_eq!(block.num_values(), 3);
        let data = block.as_variable_width().unwrap();
        assert_eq!(data.offsets, LanceBuffer::reinterpret_vec(vec![0, 1, 3, 6]));
        assert_eq!(data.data, LanceBuffer::copy_slice(b"abcdef"));
    }

    /// Sliced string arrays produce zero-based offsets and only the referenced bytes.
    #[test]
    fn test_string_sliced() {
        let check = |arr: Vec<StringArray>, expected_off: Vec<i32>, expected_data: &[u8]| {
            let arrs = arr
                .into_iter()
                .map(|a| Arc::new(a) as ArrayRef)
                .collect::<Vec<_>>();
            let num_rows = arrs.iter().map(|a| a.len()).sum::<usize>() as u64;
            let data = DataBlock::from_arrays(&arrs, num_rows);

            assert_eq!(data.num_values(), num_rows);

            let data = data.as_variable_width().unwrap();
            assert_eq!(data.offsets, LanceBuffer::reinterpret_vec(expected_off));
            assert_eq!(data.data, LanceBuffer::copy_slice(expected_data));
        };

        let string = StringArray::from(vec![Some("hello"), Some("world")]);
        check(vec![string.slice(1, 1)], vec![0, 5], b"world");
        check(vec![string.slice(0, 1)], vec![0, 5], b"hello");
        check(
            vec![string.slice(0, 1), string.slice(1, 1)],
            vec![0, 5, 10],
            b"helloworld",
        );

        let string2 = StringArray::from(vec![Some("foo"), Some("bar")]);
        check(
            vec![string.slice(0, 1), string2.slice(0, 1)],
            vec![0, 5, 8],
            b"hellofoo",
        );
    }

    /// Large binary arrays keep 64-bit offsets.
    #[test]
    fn test_large() {
        let arr = LargeBinaryArray::from_vec(vec![b"hello", b"world"]);
        let data = DataBlock::from_array(arr);

        assert_eq!(data.num_values(), 2);
        let data = data.as_variable_width().unwrap();
        assert_eq!(data.bits_per_offset, 64);
        assert_eq!(data.num_values, 2);
        assert_eq!(data.data, LanceBuffer::copy_slice(b"helloworld"));
        assert_eq!(
            data.offsets,
            LanceBuffer::reinterpret_vec(vec![0_u64, 5, 10])
        );
    }

    /// Concatenating dictionaries rebases the indices of later arrays onto the merged values.
    #[test]
    fn test_dictionary_indices_normalized() {
        let arr1 = DictionaryArray::<Int8Type>::from_iter([Some("a"), Some("a"), Some("b")]);
        let arr2 = DictionaryArray::<Int8Type>::from_iter([Some("b"), Some("c")]);

        let data = DataBlock::from_arrays(&[Arc::new(arr1), Arc::new(arr2)], 5);

        assert_eq!(data.num_values(), 5);
        let data = data.as_dictionary().unwrap();
        let indices = data.indices;
        assert_eq!(indices.bits_per_value, 8);
        assert_eq!(indices.num_values, 5);
        assert_eq!(
            indices.data,
            LanceBuffer::reinterpret_vec::<i8>(vec![0, 0, 1, 2, 3])
        );

        // The merged values are not deduplicated ("b" appears twice).
        let items = data.dictionary.as_variable_width().unwrap();
        assert_eq!(items.bits_per_offset, 32);
        assert_eq!(items.num_values, 4);
        assert_eq!(items.data, LanceBuffer::copy_slice(b"abbc"));
        assert_eq!(
            items.offsets,
            LanceBuffer::reinterpret_vec(vec![0, 1, 2, 3, 4])
        );
    }

    /// Null rows are expressed as indices that point at a null entry of the dictionary.
    #[test]
    fn test_dictionary_nulls() {
        // Nulls in the keys: a null entry gets appended to the values.
        let arr1 = DictionaryArray::<Int8Type>::from_iter([None, Some("a"), Some("b")]);
        let arr2 = DictionaryArray::<Int8Type>::from_iter([Some("c"), None]);

        let data = DataBlock::from_arrays(&[Arc::new(arr1), Arc::new(arr2)], 5);

        let check_common = |data: DataBlock| {
            assert_eq!(data.num_values(), 5);
            let dict = data.as_dictionary().unwrap();

            // Values are ["a", "b", "c", null]
            let nullable_items = dict.dictionary.as_nullable().unwrap();
            assert_eq!(nullable_items.nulls, LanceBuffer::from(vec![0b00000111]));
            assert_eq!(nullable_items.data.num_values(), 4);

            let items = nullable_items.data.as_variable_width().unwrap();
            assert_eq!(items.bits_per_offset, 32);
            assert_eq!(items.num_values, 4);
            assert_eq!(items.data, LanceBuffer::copy_slice(b"abc"));
            assert_eq!(
                items.offsets,
                LanceBuffer::reinterpret_vec(vec![0, 1, 2, 3, 3])
            );

            // Null rows (first and last) point at the null entry, index 3.
            let indices = dict.indices;
            assert_eq!(indices.bits_per_value, 8);
            assert_eq!(indices.num_values, 5);
            assert_eq!(
                indices.data,
                LanceBuffer::reinterpret_vec::<i8>(vec![3, 0, 1, 2, 3])
            );
        };
        check_common(data);

        // Nulls already in the values: the existing null entry is reused.
        let items = StringArray::from(vec![Some("a"), Some("b"), Some("c"), None]);
        let indices = Int8Array::from(vec![Some(3), Some(0), Some(1), Some(2), Some(3)]);
        let dict = DictionaryArray::new(indices, Arc::new(items));

        let data = DataBlock::from_array(dict);

        check_common(data);
    }

    /// A full u8 dictionary has no spare key for the null entry, so the keys are widened.
    #[test]
    fn test_dictionary_cannot_add_null() {
        // 256 unique values: every u8 key is taken.
        let items = StringArray::from(
            (0..256)
                .map(|i| Some(String::from_utf8(vec![0; i]).unwrap()))
                .collect::<Vec<_>>(),
        );
        // 257 rows: one per value plus a trailing null.
        let indices = UInt8Array::from(
            (0..=256)
                .map(|i| if i == 256 { None } else { Some(i as u8) })
                .collect::<Vec<_>>(),
        );
        let dict = DictionaryArray::new(indices, Arc::new(items));
        let data = DataBlock::from_array(dict);

        assert_eq!(data.num_values(), 257);

        let dict = data.as_dictionary().unwrap();

        // The null row points at the appended entry (index 256), which needs wider keys.
        assert_eq!(dict.indices.bits_per_value, 32);
        assert_eq!(
            dict.indices.data,
            LanceBuffer::reinterpret_vec((0_u32..257).collect::<Vec<_>>())
        );

        let nullable_items = dict.dictionary.as_nullable().unwrap();
        let null_buffer = NullBuffer::new(BooleanBuffer::new(
            nullable_items.nulls.into_buffer(),
            0,
            257,
        ));
        for i in 0..256 {
            assert!(!null_buffer.is_null(i));
        }
        assert!(null_buffer.is_null(256));

        // 0 + 1 + ... + 255 bytes of value data
        assert_eq!(
            nullable_items.data.as_variable_width().unwrap().data.len(),
            32640
        );
    }

    /// An all-null block converts to a null array of any requested type.
    #[test]
    fn test_all_null() {
        for data_type in [
            DataType::UInt32,
            DataType::FixedSizeBinary(2),
            DataType::List(Arc::new(Field::new("item", DataType::UInt32, true))),
            DataType::Struct(Fields::from(vec![Field::new("a", DataType::UInt32, true)])),
        ] {
            let block = DataBlock::AllNull(AllNullDataBlock { num_values: 10 });
            let arr = block.into_arrow(data_type.clone(), true).unwrap();
            let arr = make_array(arr);
            let expected = new_null_array(&data_type, 10);
            assert_eq!(&arr, &expected);
        }
    }

    /// Two full u8 dictionaries cannot be merged under u8 keys, so the keys are widened.
    #[test]
    fn test_dictionary_cannot_concatenate() {
        // 256 unique values made of 0-bytes
        let items = StringArray::from(
            (0..256)
                .map(|i| Some(String::from_utf8(vec![0; i]).unwrap()))
                .collect::<Vec<_>>(),
        );
        // 256 different unique values made of 1-bytes
        let other_items = StringArray::from(
            (0..256)
                .map(|i| Some(String::from_utf8(vec![1; i + 1]).unwrap()))
                .collect::<Vec<_>>(),
        );
        let indices = UInt8Array::from_iter_values(0..=255);
        let dict1 = DictionaryArray::new(indices.clone(), Arc::new(items));
        let dict2 = DictionaryArray::new(indices, Arc::new(other_items));
        let data = DataBlock::from_arrays(&[Arc::new(dict1), Arc::new(dict2)], 512);
        assert_eq!(data.num_values(), 512);

        let dict = data.as_dictionary().unwrap();

        assert_eq!(dict.indices.bits_per_value, 32);
        assert_eq!(
            dict.indices.data,
            LanceBuffer::reinterpret_vec::<u32>((0..512).collect::<Vec<_>>())
        );
        // (0 + ... + 255) + (1 + ... + 256) bytes of value data
        assert_eq!(
            dict.dictionary.as_variable_width().unwrap().data.len(),
            65536
        );
    }

    /// `data_size` accounts for the value buffers plus, when present, the validity bitmap.
    #[test]
    fn test_data_size() {
        /// Expected size of a block with nulls: all Arrow buffers plus one validity bit per row.
        fn size_with_validity(arr: &ArrayRef) -> u64 {
            let buffer_bytes: usize = arr
                .to_data()
                .buffers()
                .iter()
                .map(|buffer| buffer.len())
                .sum();
            let validity_bytes = arr.nulls().unwrap().len().div_ceil(8);
            (buffer_bytes + validity_bytes) as u64
        }

        let mut rng = rand_xoshiro::Xoshiro256PlusPlus::seed_from_u64(DEFAULT_SEED.0);

        // No nulls: the block is exactly as large as the array's buffers.
        let mut genn = array::rand::<Int32Type>().with_nulls(&[false, false, false]);
        for num_rows in [3, 400] {
            let arr = genn.generate(RowCount::from(num_rows), &mut rng).unwrap();
            let block = DataBlock::from_array(arr.clone());
            assert_eq!(block.data_size(), arr.get_buffer_memory_size() as u64);
        }

        // Some nulls: the validity bitmap is counted as well.
        for null_pattern in [[false, true, false], [true, true, false]] {
            let mut genn = array::rand::<Int32Type>().with_nulls(&null_pattern);
            for num_rows in [3, 400] {
                let arr = genn.generate(RowCount::from(num_rows), &mut rng).unwrap();
                let block = DataBlock::from_array(arr.clone());
                assert_eq!(block.data_size(), size_with_validity(&arr));
            }
        }

        // Several arrays: the size matches that of their concatenation.
        let mut genn = array::rand::<Int32Type>().with_nulls(&[false, true, false]);
        let arr1 = genn.generate(RowCount::from(3), &mut rng).unwrap();
        let arr2 = genn.generate(RowCount::from(3), &mut rng).unwrap();
        let arr3 = genn.generate(RowCount::from(3), &mut rng).unwrap();
        let block = DataBlock::from_arrays(&[arr1.clone(), arr2.clone(), arr3.clone()], 9);

        let concatenated_array =
            arrow_select::concat::concat(&[arr1.as_ref(), arr2.as_ref(), arr3.as_ref()]).unwrap();
        assert_eq!(block.data_size(), size_with_validity(&concatenated_array));
    }
}