1use std::{
18 ops::Range,
19 sync::{Arc, RwLock},
20};
21
22use arrow_array::{
23 cast::AsArray, new_empty_array, new_null_array, Array, ArrayRef, OffsetSizeTrait, UInt64Array,
24};
25use arrow_buffer::{
26 ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer, ScalarBuffer,
27};
28use arrow_data::{ArrayData, ArrayDataBuilder};
29use arrow_schema::DataType;
30use lance_arrow::DataTypeExt;
31use snafu::location;
32
33use lance_core::{Error, Result};
34
35use crate::{
36 buffer::LanceBuffer,
37 statistics::{ComputeStat, Stat},
38};
39
/// A block that carries only a value count and no data buffers.
///
/// Its Arrow conversion produces an all-null array with `num_values` slots.
#[derive(Debug, Clone)]
pub struct AllNullDataBlock {
    /// Number of values represented by this block.
    pub num_values: u64,
}
50
51impl AllNullDataBlock {
52 fn into_arrow(self, data_type: DataType, _validate: bool) -> Result<ArrayData> {
53 Ok(ArrayData::new_null(&data_type, self.num_values as usize))
54 }
55
56 fn into_buffers(self) -> Vec<LanceBuffer> {
57 vec![]
58 }
59}
60
61use std::collections::HashMap;
62
/// Shared, lock-protected map from a [`Stat`] key to an Arrow array value.
///
/// The map lives behind an `Arc`, so cloning a `BlockInfo` yields a handle to
/// the same underlying map rather than a copy of its entries.
#[derive(Debug, Clone)]
pub struct BlockInfo(pub Arc<RwLock<HashMap<Stat, Arc<dyn Array>>>>);
67
68impl Default for BlockInfo {
69 fn default() -> Self {
70 Self::new()
71 }
72}
73
74impl BlockInfo {
75 pub fn new() -> Self {
76 Self(Arc::new(RwLock::new(HashMap::new())))
77 }
78}
79
80impl PartialEq for BlockInfo {
81 fn eq(&self, other: &Self) -> bool {
82 let self_info = self.0.read().unwrap();
83 let other_info = other.0.read().unwrap();
84 *self_info == *other_info
85 }
86}
87
/// A data block paired with a validity buffer.
#[derive(Debug, Clone)]
pub struct NullableDataBlock {
    /// The wrapped data block.
    pub data: Box<DataBlock>,
    /// Buffer installed as the Arrow null bitmap when converting to Arrow.
    pub nulls: LanceBuffer,

    /// `Stat` entries attached to this block.
    pub block_info: BlockInfo,
}
102
103impl NullableDataBlock {
104 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
105 let nulls = self.nulls.into_buffer();
106 let data = self.data.into_arrow(data_type, validate)?.into_builder();
107 let data = data.null_bit_buffer(Some(nulls));
108 if validate {
109 Ok(data.build()?)
110 } else {
111 Ok(unsafe { data.build_unchecked() })
112 }
113 }
114
115 fn into_buffers(self) -> Vec<LanceBuffer> {
116 let mut buffers = vec![self.nulls];
117 buffers.extend(self.data.into_buffers());
118 buffers
119 }
120
121 pub fn data_size(&self) -> u64 {
122 self.data.data_size() + self.nulls.len() as u64
123 }
124}
125
/// A block described by a single buffer and a value count.
// NOTE(review): presumably `data` holds one value repeated `num_values` times — confirm.
#[derive(Debug, PartialEq, Clone)]
pub struct ConstantDataBlock {
    /// The backing buffer.
    pub data: LanceBuffer,
    /// Number of values represented by this block.
    pub num_values: u64,
}
134
135impl ConstantDataBlock {
136 fn into_buffers(self) -> Vec<LanceBuffer> {
137 vec![self.data]
138 }
139
140 fn into_arrow(self, _data_type: DataType, _validate: bool) -> Result<ArrayData> {
141 todo!()
144 }
145
146 pub fn try_clone(&self) -> Result<Self> {
147 Ok(Self {
148 data: self.data.clone(),
149 num_values: self.num_values,
150 })
151 }
152
153 pub fn data_size(&self) -> u64 {
154 self.data.len() as u64
155 }
156}
157
/// A block of values that all occupy the same number of bits.
#[derive(Debug, PartialEq, Clone)]
pub struct FixedWidthDataBlock {
    /// The backing buffer holding the values.
    pub data: LanceBuffer,
    /// Width of each value, in bits.
    pub bits_per_value: u64,
    /// Number of values in the block.
    pub num_values: u64,

    /// `Stat` entries attached to this block.
    pub block_info: BlockInfo,
}
170
171impl FixedWidthDataBlock {
172 fn do_into_arrow(
173 self,
174 data_type: DataType,
175 num_values: u64,
176 validate: bool,
177 ) -> Result<ArrayData> {
178 let data_buffer = self.data.into_buffer();
179 let builder = ArrayDataBuilder::new(data_type)
180 .add_buffer(data_buffer)
181 .len(num_values as usize)
182 .null_count(0);
183 if validate {
184 Ok(builder.build()?)
185 } else {
186 Ok(unsafe { builder.build_unchecked() })
187 }
188 }
189
190 pub fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
191 let root_num_values = self.num_values;
192 self.do_into_arrow(data_type, root_num_values, validate)
193 }
194
195 pub fn into_buffers(self) -> Vec<LanceBuffer> {
196 vec![self.data]
197 }
198
199 pub fn try_clone(&self) -> Result<Self> {
200 Ok(Self {
201 data: self.data.clone(),
202 bits_per_value: self.bits_per_value,
203 num_values: self.num_values,
204 block_info: self.block_info.clone(),
205 })
206 }
207
208 pub fn data_size(&self) -> u64 {
209 self.data.len() as u64
210 }
211}
212
213#[derive(Debug)]
214pub struct VariableWidthDataBlockBuilder<T: OffsetSizeTrait> {
215 offsets: Vec<T>,
216 bytes: Vec<u8>,
217}
218
219impl<T: OffsetSizeTrait> VariableWidthDataBlockBuilder<T> {
220 fn new(estimated_size_bytes: u64) -> Self {
221 Self {
222 offsets: vec![T::from_usize(0).unwrap()],
223 bytes: Vec::with_capacity(estimated_size_bytes as usize),
224 }
225 }
226}
227
228impl<T: OffsetSizeTrait> DataBlockBuilderImpl for VariableWidthDataBlockBuilder<T> {
229 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
230 let block = data_block.as_variable_width_ref().unwrap();
231 assert!(block.bits_per_offset == T::get_byte_width() as u8 * 8);
232
233 let offsets: ScalarBuffer<T> = block.offsets.clone().borrow_to_typed_slice();
234
235 let start_offset = offsets[selection.start as usize];
236 let end_offset = offsets[selection.end as usize];
237 let mut previous_len = self.bytes.len();
238
239 self.bytes
240 .extend_from_slice(&block.data[start_offset.as_usize()..end_offset.as_usize()]);
241
242 self.offsets.extend(
243 offsets[selection.start as usize..selection.end as usize]
244 .iter()
245 .zip(&offsets[selection.start as usize + 1..=selection.end as usize])
246 .map(|(¤t, &next)| {
247 let this_value_len = next - current;
248 previous_len += this_value_len.as_usize();
249 T::from_usize(previous_len).unwrap()
250 }),
251 );
252 }
253
254 fn finish(self: Box<Self>) -> DataBlock {
255 let num_values = (self.offsets.len() - 1) as u64;
256 DataBlock::VariableWidth(VariableWidthBlock {
257 data: LanceBuffer::from(self.bytes),
258 offsets: LanceBuffer::reinterpret_vec(self.offsets),
259 bits_per_offset: T::get_byte_width() as u8 * 8,
260 num_values,
261 block_info: BlockInfo::new(),
262 })
263 }
264}
265
266#[derive(Debug)]
267struct BitmapDataBlockBuilder {
268 values: BooleanBufferBuilder,
269}
270
271impl BitmapDataBlockBuilder {
272 fn new(estimated_size_bytes: u64) -> Self {
273 Self {
274 values: BooleanBufferBuilder::new(estimated_size_bytes as usize * 8),
275 }
276 }
277}
278
279impl DataBlockBuilderImpl for BitmapDataBlockBuilder {
280 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
281 let bitmap_blk = data_block.as_fixed_width_ref().unwrap();
282 self.values.append_packed_range(
283 selection.start as usize..selection.end as usize,
284 &bitmap_blk.data,
285 );
286 }
287
288 fn finish(mut self: Box<Self>) -> DataBlock {
289 let bool_buf = self.values.finish();
290 let num_values = bool_buf.len() as u64;
291 let bits_buf = bool_buf.into_inner();
292 DataBlock::FixedWidth(FixedWidthDataBlock {
293 data: LanceBuffer::from(bits_buf),
294 bits_per_value: 1,
295 num_values,
296 block_info: BlockInfo::new(),
297 })
298 }
299}
300
/// Accumulates byte-aligned fixed-width values in a byte vector.
#[derive(Debug)]
struct FixedWidthDataBlockBuilder {
    bits_per_value: u64,
    bytes_per_value: u64,
    values: Vec<u8>,
}

impl FixedWidthDataBlockBuilder {
    /// Creates a builder for values of `bits_per_value` bits.
    ///
    /// # Panics
    ///
    /// Panics if `bits_per_value` is not a multiple of 8.
    fn new(bits_per_value: u64, estimated_size_bytes: u64) -> Self {
        assert!(bits_per_value % 8 == 0);
        let bytes_per_value = bits_per_value / 8;
        let values = Vec::with_capacity(estimated_size_bytes as usize);
        Self {
            bits_per_value,
            bytes_per_value,
            values,
        }
    }
}
318
319impl DataBlockBuilderImpl for FixedWidthDataBlockBuilder {
320 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
321 let block = data_block.as_fixed_width_ref().unwrap();
322 assert_eq!(self.bits_per_value, block.bits_per_value);
323 let start = selection.start as usize * self.bytes_per_value as usize;
324 let end = selection.end as usize * self.bytes_per_value as usize;
325 self.values.extend_from_slice(&block.data[start..end]);
326 }
327
328 fn finish(self: Box<Self>) -> DataBlock {
329 let num_values = (self.values.len() / self.bytes_per_value as usize) as u64;
330 DataBlock::FixedWidth(FixedWidthDataBlock {
331 data: LanceBuffer::from(self.values),
332 bits_per_value: self.bits_per_value,
333 num_values,
334 block_info: BlockInfo::new(),
335 })
336 }
337}
338
339#[derive(Debug)]
340struct StructDataBlockBuilder {
341 children: Vec<Box<dyn DataBlockBuilderImpl>>,
342}
343
344impl StructDataBlockBuilder {
345 fn new(bits_per_values: Vec<u32>, estimated_size_bytes: u64) -> Self {
348 let mut children = vec![];
349
350 debug_assert!(bits_per_values.iter().all(|bpv| bpv % 8 == 0));
351
352 let bytes_per_row: u32 = bits_per_values.iter().sum::<u32>() / 8;
353 let bytes_per_row = bytes_per_row as u64;
354
355 for bits_per_value in bits_per_values.iter() {
356 let this_estimated_size_bytes =
357 estimated_size_bytes / bytes_per_row * (*bits_per_value as u64) / 8;
358 let child =
359 FixedWidthDataBlockBuilder::new(*bits_per_value as u64, this_estimated_size_bytes);
360 children.push(Box::new(child) as Box<dyn DataBlockBuilderImpl>);
361 }
362 Self { children }
363 }
364}
365
366impl DataBlockBuilderImpl for StructDataBlockBuilder {
367 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
368 let data_block = data_block.as_struct_ref().unwrap();
369 for i in 0..self.children.len() {
370 self.children[i].append(&data_block.children[i], selection.clone());
371 }
372 }
373
374 fn finish(self: Box<Self>) -> DataBlock {
375 let mut children_data_block = Vec::new();
376 for child in self.children {
377 let child_data_block = child.finish();
378 children_data_block.push(child_data_block);
379 }
380 DataBlock::Struct(StructDataBlock {
381 children: children_data_block,
382 block_info: BlockInfo::new(),
383 validity: None,
384 })
385 }
386}
/// A block whose rows are fixed-length lists of the child's values.
#[derive(Debug, Clone)]
pub struct FixedSizeListBlock {
    /// The block holding the list items.
    pub child: Box<DataBlock>,
    /// Number of child values per list.
    pub dimension: u64,
}
395
396impl FixedSizeListBlock {
397 pub fn num_values(&self) -> u64 {
398 self.child.num_values() / self.dimension
399 }
400
401 pub fn try_into_flat(self) -> Option<FixedWidthDataBlock> {
405 match *self.child {
406 DataBlock::Nullable(_) => None,
408 DataBlock::FixedSizeList(inner) => {
409 let mut flat = inner.try_into_flat()?;
410 flat.bits_per_value *= self.dimension;
411 flat.num_values /= self.dimension;
412 Some(flat)
413 }
414 DataBlock::FixedWidth(mut inner) => {
415 inner.bits_per_value *= self.dimension;
416 inner.num_values /= self.dimension;
417 Some(inner)
418 }
419 _ => panic!(
420 "Expected FixedSizeList or FixedWidth data block but found {:?}",
421 self
422 ),
423 }
424 }
425
426 pub fn flatten_as_fixed(&mut self) -> FixedWidthDataBlock {
427 match self.child.as_mut() {
428 DataBlock::FixedSizeList(fsl) => fsl.flatten_as_fixed(),
429 DataBlock::FixedWidth(fw) => fw.clone(),
430 _ => panic!("Expected FixedSizeList or FixedWidth data block"),
431 }
432 }
433
434 pub fn from_flat(data: FixedWidthDataBlock, data_type: &DataType) -> DataBlock {
436 match data_type {
437 DataType::FixedSizeList(child_field, dimension) => {
438 let mut data = data;
439 data.bits_per_value /= *dimension as u64;
440 data.num_values *= *dimension as u64;
441 let child_data = Self::from_flat(data, child_field.data_type());
442 DataBlock::FixedSizeList(Self {
443 child: Box::new(child_data),
444 dimension: *dimension as u64,
445 })
446 }
447 _ => DataBlock::FixedWidth(data),
449 }
450 }
451
452 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
453 let num_values = self.num_values();
454 let builder = match &data_type {
455 DataType::FixedSizeList(child_field, _) => {
456 let child_data = self
457 .child
458 .into_arrow(child_field.data_type().clone(), validate)?;
459 ArrayDataBuilder::new(data_type)
460 .add_child_data(child_data)
461 .len(num_values as usize)
462 .null_count(0)
463 }
464 _ => panic!("Expected FixedSizeList data type and got {:?}", data_type),
465 };
466 if validate {
467 Ok(builder.build()?)
468 } else {
469 Ok(unsafe { builder.build_unchecked() })
470 }
471 }
472
473 fn into_buffers(self) -> Vec<LanceBuffer> {
474 self.child.into_buffers()
475 }
476
477 fn data_size(&self) -> u64 {
478 self.child.data_size()
479 }
480}
481
482#[derive(Debug)]
483struct FixedSizeListBlockBuilder {
484 inner: Box<dyn DataBlockBuilderImpl>,
485 dimension: u64,
486}
487
488impl FixedSizeListBlockBuilder {
489 fn new(inner: Box<dyn DataBlockBuilderImpl>, dimension: u64) -> Self {
490 Self { inner, dimension }
491 }
492}
493
494impl DataBlockBuilderImpl for FixedSizeListBlockBuilder {
495 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
496 let selection = selection.start * self.dimension..selection.end * self.dimension;
497 let fsl = data_block.as_fixed_size_list_ref().unwrap();
498 self.inner.append(fsl.child.as_ref(), selection);
499 }
500
501 fn finish(self: Box<Self>) -> DataBlock {
502 let inner_block = self.inner.finish();
503 DataBlock::FixedSizeList(FixedSizeListBlock {
504 child: Box::new(inner_block),
505 dimension: self.dimension,
506 })
507 }
508}
509
510#[derive(Debug)]
511struct NullableDataBlockBuilder {
512 inner: Box<dyn DataBlockBuilderImpl>,
513 validity: BooleanBufferBuilder,
514}
515
516impl NullableDataBlockBuilder {
517 fn new(inner: Box<dyn DataBlockBuilderImpl>, estimated_size_bytes: usize) -> Self {
518 Self {
519 inner,
520 validity: BooleanBufferBuilder::new(estimated_size_bytes * 8),
521 }
522 }
523}
524
525impl DataBlockBuilderImpl for NullableDataBlockBuilder {
526 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
527 let nullable = data_block.as_nullable_ref().unwrap();
528 let bool_buf = BooleanBuffer::new(
529 nullable.nulls.clone().into_buffer(),
530 selection.start as usize,
531 (selection.end - selection.start) as usize,
532 );
533 self.validity.append_buffer(&bool_buf);
534 self.inner.append(nullable.data.as_ref(), selection);
535 }
536
537 fn finish(mut self: Box<Self>) -> DataBlock {
538 let inner_block = self.inner.finish();
539 DataBlock::Nullable(NullableDataBlock {
540 data: Box::new(inner_block),
541 nulls: LanceBuffer::from(self.validity.finish().into_inner()),
542 block_info: BlockInfo::new(),
543 })
544 }
545}
546
/// A block made of raw buffers plus a value count.
///
/// It cannot be converted to Arrow: `DataBlock::into_arrow` returns an error
/// for this variant.
#[derive(Debug, Clone)]
pub struct OpaqueBlock {
    /// The buffers that make up the block.
    pub buffers: Vec<LanceBuffer>,
    /// Number of values represented by the block.
    pub num_values: u64,
    /// `Stat` entries attached to this block.
    pub block_info: BlockInfo,
}
556
557impl OpaqueBlock {
558 pub fn data_size(&self) -> u64 {
559 self.buffers.iter().map(|b| b.len() as u64).sum()
560 }
561}
562
/// A block of variable-width values stored as a data buffer plus an offsets buffer.
#[derive(Debug, Clone)]
pub struct VariableWidthBlock {
    /// The bytes of all values.
    pub data: LanceBuffer,
    /// The offsets buffer; holds `num_values + 1` entries (see `offsets_as_block`).
    pub offsets: LanceBuffer,
    /// Width of each offset, in bits.
    pub bits_per_offset: u8,
    /// Number of values in the block.
    pub num_values: u64,

    /// `Stat` entries attached to this block.
    pub block_info: BlockInfo,
}
579
580impl VariableWidthBlock {
581 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
582 let data_buffer = self.data.into_buffer();
583 let offsets_buffer = self.offsets.into_buffer();
584 let builder = ArrayDataBuilder::new(data_type)
585 .add_buffer(offsets_buffer)
586 .add_buffer(data_buffer)
587 .len(self.num_values as usize)
588 .null_count(0);
589 if validate {
590 Ok(builder.build()?)
591 } else {
592 Ok(unsafe { builder.build_unchecked() })
593 }
594 }
595
596 fn into_buffers(self) -> Vec<LanceBuffer> {
597 vec![self.offsets, self.data]
598 }
599
600 pub fn offsets_as_block(&mut self) -> DataBlock {
601 let offsets = self.offsets.clone();
602 DataBlock::FixedWidth(FixedWidthDataBlock {
603 data: offsets,
604 bits_per_value: self.bits_per_offset as u64,
605 num_values: self.num_values + 1,
606 block_info: BlockInfo::new(),
607 })
608 }
609
610 pub fn data_size(&self) -> u64 {
611 (self.data.len() + self.offsets.len()) as u64
612 }
613}
614
/// A block made of one child block per struct field.
#[derive(Debug, Clone)]
pub struct StructDataBlock {
    /// The child blocks, in the same order as the struct's fields.
    pub children: Vec<DataBlock>,
    /// `Stat` entries attached to this block.
    pub block_info: BlockInfo,
    /// Optional struct-level validity; `None` is converted to a null count of zero.
    pub validity: Option<NullBuffer>,
}
624
625impl StructDataBlock {
626 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
627 if let DataType::Struct(fields) = &data_type {
628 let mut builder = ArrayDataBuilder::new(DataType::Struct(fields.clone()));
629 let mut num_rows = 0;
630 for (field, child) in fields.iter().zip(self.children) {
631 let child_data = child.into_arrow(field.data_type().clone(), validate)?;
632 num_rows = child_data.len();
633 builder = builder.add_child_data(child_data);
634 }
635
636 let builder = if let Some(validity) = self.validity {
638 let null_count = validity.null_count();
639 builder
640 .null_bit_buffer(Some(validity.into_inner().into_inner()))
641 .null_count(null_count)
642 } else {
643 builder.null_count(0)
644 };
645
646 let builder = builder.len(num_rows);
647 if validate {
648 Ok(builder.build()?)
649 } else {
650 Ok(unsafe { builder.build_unchecked() })
651 }
652 } else {
653 Err(Error::Internal {
654 message: format!("Expected Struct, got {:?}", data_type),
655 location: location!(),
656 })
657 }
658 }
659
660 fn remove_outer_validity(self) -> Self {
661 Self {
662 children: self
663 .children
664 .into_iter()
665 .map(|c| c.remove_outer_validity())
666 .collect(),
667 block_info: self.block_info,
668 validity: None, }
670 }
671
672 fn into_buffers(self) -> Vec<LanceBuffer> {
673 self.children
674 .into_iter()
675 .flat_map(|c| c.into_buffers())
676 .collect()
677 }
678
679 pub fn data_size(&self) -> u64 {
680 self.children
681 .iter()
682 .map(|data_block| data_block.data_size())
683 .sum()
684 }
685}
686
/// A dictionary-encoded block: fixed-width indices plus a dictionary block.
#[derive(Debug, Clone)]
pub struct DictionaryDataBlock {
    /// The indices; converted to Arrow using the dictionary key type.
    pub indices: FixedWidthDataBlock,
    /// The dictionary; converted to Arrow using the dictionary value type.
    pub dictionary: Box<DataBlock>,
}
695
696impl DictionaryDataBlock {
697 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
698 let (key_type, value_type) = if let DataType::Dictionary(key_type, value_type) = &data_type
699 {
700 (key_type.as_ref().clone(), value_type.as_ref().clone())
701 } else {
702 return Err(Error::Internal {
703 message: format!("Expected Dictionary, got {:?}", data_type),
704 location: location!(),
705 });
706 };
707
708 let indices = self.indices.into_arrow(key_type, validate)?;
709 let dictionary = self.dictionary.into_arrow(value_type, validate)?;
710
711 let builder = indices
712 .into_builder()
713 .add_child_data(dictionary)
714 .data_type(data_type);
715
716 if validate {
717 Ok(builder.build()?)
718 } else {
719 Ok(unsafe { builder.build_unchecked() })
720 }
721 }
722
723 fn into_buffers(self) -> Vec<LanceBuffer> {
724 let mut buffers = self.indices.into_buffers();
725 buffers.extend(self.dictionary.into_buffers());
726 buffers
727 }
728}
729
/// The set of block layouts handled by this module.
///
/// Each variant wraps the struct that carries that layout's buffers and metadata.
#[derive(Debug, Clone)]
pub enum DataBlock {
    /// A block with no values and no buffers.
    Empty(),
    /// See [`ConstantDataBlock`].
    Constant(ConstantDataBlock),
    /// A block in which every value is null.
    AllNull(AllNullDataBlock),
    /// A block paired with a validity buffer.
    Nullable(NullableDataBlock),
    /// Values that all have the same bit width.
    FixedWidth(FixedWidthDataBlock),
    /// Fixed-length lists over a child block.
    FixedSizeList(FixedSizeListBlock),
    /// Values addressed through an offsets buffer.
    VariableWidth(VariableWidthBlock),
    /// Raw buffers that cannot be converted to Arrow.
    Opaque(OpaqueBlock),
    /// One child block per struct field.
    Struct(StructDataBlock),
    /// Indices plus a dictionary block.
    Dictionary(DictionaryDataBlock),
}
757
impl DataBlock {
    /// Converts the block into Arrow `ArrayData` of the given `data_type`.
    ///
    /// `validate` is forwarded to the variant-specific conversion. `Empty`
    /// yields an empty array; `Opaque` always returns `Error::Internal`.
    pub fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
        match self {
            Self::Empty() => Ok(new_empty_array(&data_type).to_data()),
            Self::Constant(inner) => inner.into_arrow(data_type, validate),
            Self::AllNull(inner) => inner.into_arrow(data_type, validate),
            Self::Nullable(inner) => inner.into_arrow(data_type, validate),
            Self::FixedWidth(inner) => inner.into_arrow(data_type, validate),
            Self::FixedSizeList(inner) => inner.into_arrow(data_type, validate),
            Self::VariableWidth(inner) => inner.into_arrow(data_type, validate),
            Self::Struct(inner) => inner.into_arrow(data_type, validate),
            Self::Dictionary(inner) => inner.into_arrow(data_type, validate),
            Self::Opaque(_) => Err(Error::Internal {
                message: "Cannot convert OpaqueBlock to Arrow".to_string(),
                location: location!(),
            }),
        }
    }

    /// Consumes the block and returns its buffers.
    ///
    /// `Empty` (and `AllNull`, via its own implementation) return no buffers.
    pub fn into_buffers(self) -> Vec<LanceBuffer> {
        match self {
            Self::Empty() => Vec::default(),
            Self::Constant(inner) => inner.into_buffers(),
            Self::AllNull(inner) => inner.into_buffers(),
            Self::Nullable(inner) => inner.into_buffers(),
            Self::FixedWidth(inner) => inner.into_buffers(),
            Self::FixedSizeList(inner) => inner.into_buffers(),
            Self::VariableWidth(inner) => inner.into_buffers(),
            Self::Struct(inner) => inner.into_buffers(),
            Self::Dictionary(inner) => inner.into_buffers(),
            Self::Opaque(inner) => inner.buffers,
        }
    }

    /// Clones the block via each variant's `Clone` implementation.
    ///
    /// Every arm returns `Ok`; this function never produces an error.
    pub fn try_clone(&self) -> Result<Self> {
        match self {
            Self::Empty() => Ok(Self::Empty()),
            Self::Constant(inner) => Ok(Self::Constant(inner.clone())),
            Self::AllNull(inner) => Ok(Self::AllNull(inner.clone())),
            Self::Nullable(inner) => Ok(Self::Nullable(inner.clone())),
            Self::FixedWidth(inner) => Ok(Self::FixedWidth(inner.clone())),
            Self::FixedSizeList(inner) => Ok(Self::FixedSizeList(inner.clone())),
            Self::VariableWidth(inner) => Ok(Self::VariableWidth(inner.clone())),
            Self::Struct(inner) => Ok(Self::Struct(inner.clone())),
            Self::Dictionary(inner) => Ok(Self::Dictionary(inner.clone())),
            Self::Opaque(inner) => Ok(Self::Opaque(inner.clone())),
        }
    }

    /// Returns the variant's name as a static string.
    pub fn name(&self) -> &'static str {
        match self {
            Self::Constant(_) => "Constant",
            Self::Empty() => "Empty",
            Self::AllNull(_) => "AllNull",
            Self::Nullable(_) => "Nullable",
            Self::FixedWidth(_) => "FixedWidth",
            Self::FixedSizeList(_) => "FixedSizeList",
            Self::VariableWidth(_) => "VariableWidth",
            Self::Struct(_) => "Struct",
            Self::Dictionary(_) => "Dictionary",
            Self::Opaque(_) => "Opaque",
        }
    }

    /// Reports whether the block is, or contains, a `VariableWidth` block.
    ///
    /// `Nullable` and `FixedSizeList` defer to their inner block; `Struct` is
    /// variable if any child is.
    ///
    /// # Panics
    ///
    /// Panics for `Opaque`, and via `todo!` for `Dictionary`.
    pub fn is_variable(&self) -> bool {
        match self {
            Self::Constant(_) => false,
            Self::Empty() => false,
            Self::AllNull(_) => false,
            Self::Nullable(nullable) => nullable.data.is_variable(),
            Self::FixedWidth(_) => false,
            Self::FixedSizeList(fsl) => fsl.child.is_variable(),
            Self::VariableWidth(_) => true,
            Self::Struct(strct) => strct.children.iter().any(|c| c.is_variable()),
            Self::Dictionary(_) => {
                todo!("is_variable for DictionaryDataBlock is not implemented yet")
            }
            Self::Opaque(_) => panic!("Does not make sense to ask if an Opaque block is variable"),
        }
    }

    /// Reports whether the block is, or contains, an `AllNull` or `Nullable` block.
    ///
    /// `FixedSizeList` defers to its child; `Struct` is nullable if any child
    /// is. Variants not listed explicitly return `false`.
    ///
    /// # Panics
    ///
    /// Panics for `Opaque`, and via `todo!` for `Dictionary`.
    pub fn is_nullable(&self) -> bool {
        match self {
            Self::AllNull(_) => true,
            Self::Nullable(_) => true,
            Self::FixedSizeList(fsl) => fsl.child.is_nullable(),
            Self::Struct(strct) => strct.children.iter().any(|c| c.is_nullable()),
            Self::Dictionary(_) => {
                todo!("is_nullable for DictionaryDataBlock is not implemented yet")
            }
            Self::Opaque(_) => panic!("Does not make sense to ask if an Opaque block is nullable"),
            _ => false,
        }
    }

    /// Returns the number of values in the block.
    ///
    /// `Nullable` reports its inner block's count, `Struct` its first child's
    /// count, and `Dictionary` the number of indices.
    ///
    /// # Panics
    ///
    /// Panics for a `Struct` block with no children (indexing `children[0]`).
    pub fn num_values(&self) -> u64 {
        match self {
            Self::Empty() => 0,
            Self::Constant(inner) => inner.num_values,
            Self::AllNull(inner) => inner.num_values,
            Self::Nullable(inner) => inner.data.num_values(),
            Self::FixedWidth(inner) => inner.num_values,
            Self::FixedSizeList(inner) => inner.num_values(),
            Self::VariableWidth(inner) => inner.num_values,
            Self::Struct(inner) => inner.children[0].num_values(),
            Self::Dictionary(inner) => inner.indices.num_values,
            Self::Opaque(inner) => inner.num_values,
        }
    }

    /// Returns how many leaf items make up one row.
    ///
    /// This is 1 for flat layouts; a `FixedSizeList` multiplies its dimension
    /// by its child's items-per-row, and `Nullable` defers to its inner block.
    ///
    /// # Panics
    ///
    /// Panics via `todo!` for `Empty`, `Constant`, `AllNull` and `Struct`.
    pub fn items_per_row(&self) -> u64 {
        match self {
            Self::Empty() => todo!(),
            Self::Constant(_) => todo!(),
            Self::AllNull(_) => todo!(),
            Self::Nullable(nullable) => nullable.data.items_per_row(),
            Self::FixedWidth(_) => 1,
            Self::FixedSizeList(fsl) => fsl.dimension * fsl.child.items_per_row(),
            Self::VariableWidth(_) => 1,
            Self::Struct(_) => todo!(),
            Self::Dictionary(_) => 1,
            Self::Opaque(_) => 1,
        }
    }

    /// Returns the size of the block's buffers as reported by each variant.
    ///
    /// `Empty` and `AllNull` report 0.
    ///
    /// # Panics
    ///
    /// Panics via `todo!` for `Struct` and `Dictionary`.
    pub fn data_size(&self) -> u64 {
        match self {
            Self::Empty() => 0,
            Self::Constant(inner) => inner.data_size(),
            Self::AllNull(_) => 0,
            Self::Nullable(inner) => inner.data_size(),
            Self::FixedWidth(inner) => inner.data_size(),
            Self::FixedSizeList(inner) => inner.data_size(),
            Self::VariableWidth(inner) => inner.data_size(),
            Self::Struct(_) => {
                todo!("the data_size method for StructDataBlock is not implemented yet")
            }
            Self::Dictionary(_) => {
                todo!("the data_size method for DictionaryDataBlock is not implemented yet")
            }
            Self::Opaque(inner) => inner.data_size(),
        }
    }

    /// Strips the outermost validity from the block.
    ///
    /// `Nullable` is replaced by its inner block, `Struct` drops its own
    /// validity and recurses into its children, and every other variant is
    /// returned unchanged.
    ///
    /// # Panics
    ///
    /// Panics for `AllNull`.
    pub fn remove_outer_validity(self) -> Self {
        match self {
            Self::AllNull(_) => panic!("Cannot remove validity on all-null data"),
            Self::Nullable(inner) => *inner.data,
            Self::Struct(inner) => Self::Struct(inner.remove_outer_validity()),
            other => other,
        }
    }

    /// Creates a builder that can accumulate blocks with the same layout as `self`.
    ///
    /// `estimated_size_bytes` is a capacity hint passed on to the builder(s).
    ///
    /// # Panics
    ///
    /// Panics via `todo!` for variants without a builder and for
    /// variable-width offsets that are neither 32 nor 64 bits, and via
    /// `expect` when a struct child is not fixed-width.
    pub fn make_builder(&self, estimated_size_bytes: u64) -> Box<dyn DataBlockBuilderImpl> {
        match self {
            Self::FixedWidth(inner) => {
                // One-bit values are bit-packed, so they need the bitmap builder.
                if inner.bits_per_value == 1 {
                    Box::new(BitmapDataBlockBuilder::new(estimated_size_bytes))
                } else {
                    Box::new(FixedWidthDataBlockBuilder::new(
                        inner.bits_per_value,
                        estimated_size_bytes,
                    ))
                }
            }
            Self::VariableWidth(inner) => {
                if inner.bits_per_offset == 32 {
                    Box::new(VariableWidthDataBlockBuilder::<i32>::new(
                        estimated_size_bytes,
                    ))
                } else if inner.bits_per_offset == 64 {
                    Box::new(VariableWidthDataBlockBuilder::<i64>::new(
                        estimated_size_bytes,
                    ))
                } else {
                    todo!()
                }
            }
            Self::FixedSizeList(inner) => {
                let inner_builder = inner.child.make_builder(estimated_size_bytes);
                Box::new(FixedSizeListBlockBuilder::new(
                    inner_builder,
                    inner.dimension,
                ))
            }
            Self::Nullable(nullable) => {
                // Set aside 1/16 of the estimate for validity; the rest goes to the data.
                let estimated_validity_size_bytes = estimated_size_bytes / 16;
                let inner_builder = nullable
                    .data
                    .make_builder(estimated_size_bytes - estimated_validity_size_bytes);
                Box::new(NullableDataBlockBuilder::new(
                    inner_builder,
                    estimated_validity_size_bytes as usize,
                ))
            }
            Self::Struct(struct_data_block) => {
                let mut bits_per_values = vec![];
                for child in struct_data_block.children.iter() {
                    let child = child.as_fixed_width_ref().
                        expect("Currently StructDataBlockBuilder is only used in packed-struct encoding, and currently in packed-struct encoding, only fixed-width fields are supported.");
                    bits_per_values.push(child.bits_per_value as u32);
                }
                Box::new(StructDataBlockBuilder::new(
                    bits_per_values,
                    estimated_size_bytes,
                ))
            }
            _ => todo!("make_builder for {:?}", self),
        }
    }
}
998
// Generates `pub fn $fn_name(self) -> Option<$inner_type>`, which consumes the
// enum and yields the payload of variant `$inner`, or `None` for any other variant.
macro_rules! as_type {
    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
        pub fn $fn_name(self) -> Option<$inner_type> {
            if let Self::$inner(payload) = self {
                Some(payload)
            } else {
                None
            }
        }
    };
}
1009
// Generates `pub fn $fn_name(&self) -> Option<&$inner_type>`, which borrows the
// payload of variant `$inner`, or returns `None` for any other variant.
macro_rules! as_type_ref {
    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
        pub fn $fn_name(&self) -> Option<&$inner_type> {
            if let Self::$inner(payload) = self {
                Some(payload)
            } else {
                None
            }
        }
    };
}
1020
// Generates `pub fn $fn_name(&mut self) -> Option<&mut $inner_type>`, which
// mutably borrows the payload of variant `$inner`, or returns `None` for any
// other variant.
macro_rules! as_type_ref_mut {
    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
        pub fn $fn_name(&mut self) -> Option<&mut $inner_type> {
            if let Self::$inner(payload) = self {
                Some(payload)
            } else {
                None
            }
        }
    };
}
1031
1032impl DataBlock {
1034 as_type!(as_all_null, AllNull, AllNullDataBlock);
1035 as_type!(as_nullable, Nullable, NullableDataBlock);
1036 as_type!(as_fixed_width, FixedWidth, FixedWidthDataBlock);
1037 as_type!(as_fixed_size_list, FixedSizeList, FixedSizeListBlock);
1038 as_type!(as_variable_width, VariableWidth, VariableWidthBlock);
1039 as_type!(as_struct, Struct, StructDataBlock);
1040 as_type!(as_dictionary, Dictionary, DictionaryDataBlock);
1041 as_type_ref!(as_all_null_ref, AllNull, AllNullDataBlock);
1042 as_type_ref!(as_nullable_ref, Nullable, NullableDataBlock);
1043 as_type_ref!(as_fixed_width_ref, FixedWidth, FixedWidthDataBlock);
1044 as_type_ref!(as_fixed_size_list_ref, FixedSizeList, FixedSizeListBlock);
1045 as_type_ref!(as_variable_width_ref, VariableWidth, VariableWidthBlock);
1046 as_type_ref!(as_struct_ref, Struct, StructDataBlock);
1047 as_type_ref!(as_dictionary_ref, Dictionary, DictionaryDataBlock);
1048 as_type_ref_mut!(as_all_null_ref_mut, AllNull, AllNullDataBlock);
1049 as_type_ref_mut!(as_nullable_ref_mut, Nullable, NullableDataBlock);
1050 as_type_ref_mut!(as_fixed_width_ref_mut, FixedWidth, FixedWidthDataBlock);
1051 as_type_ref_mut!(
1052 as_fixed_size_list_ref_mut,
1053 FixedSizeList,
1054 FixedSizeListBlock
1055 );
1056 as_type_ref_mut!(as_variable_width_ref_mut, VariableWidth, VariableWidthBlock);
1057 as_type_ref_mut!(as_struct_ref_mut, Struct, StructDataBlock);
1058 as_type_ref_mut!(as_dictionary_ref_mut, Dictionary, DictionaryDataBlock);
1059}
1060
1061fn get_byte_range<T: ArrowNativeType>(offsets: &mut LanceBuffer) -> Range<usize> {
1064 let offsets = offsets.borrow_to_typed_slice::<T>();
1065 if offsets.as_ref().is_empty() {
1066 0..0
1067 } else {
1068 offsets.as_ref().first().unwrap().as_usize()..offsets.as_ref().last().unwrap().as_usize()
1069 }
1070}
1071
1072fn stitch_offsets<T: ArrowNativeType + std::ops::Add<Output = T> + std::ops::Sub<Output = T>>(
1078 offsets: Vec<LanceBuffer>,
1079) -> (LanceBuffer, Vec<Range<usize>>) {
1080 if offsets.is_empty() {
1081 return (LanceBuffer::empty(), Vec::default());
1082 }
1083 let len = offsets.iter().map(|b| b.len()).sum::<usize>();
1084 let mut dest = Vec::with_capacity(len);
1088 let mut byte_ranges = Vec::with_capacity(offsets.len());
1089
1090 dest.push(T::from_usize(0).unwrap());
1092
1093 for mut o in offsets.into_iter() {
1094 if !o.is_empty() {
1095 let last_offset = *dest.last().unwrap();
1096 let o = o.borrow_to_typed_slice::<T>();
1097 let start = *o.as_ref().first().unwrap();
1098 dest.extend(o.as_ref()[1..].iter().map(|&x| x + last_offset - start));
1112 }
1113 byte_ranges.push(get_byte_range::<T>(&mut o));
1114 }
1115 (LanceBuffer::reinterpret_vec(dest), byte_ranges)
1116}
1117
1118fn arrow_binary_to_data_block(
1119 arrays: &[ArrayRef],
1120 num_values: u64,
1121 bits_per_offset: u8,
1122) -> DataBlock {
1123 let data_vec = arrays.iter().map(|arr| arr.to_data()).collect::<Vec<_>>();
1124 let bytes_per_offset = bits_per_offset as usize / 8;
1125 let offsets = data_vec
1126 .iter()
1127 .map(|d| {
1128 LanceBuffer::from(
1129 d.buffers()[0].slice_with_length(d.offset(), (d.len() + 1) * bytes_per_offset),
1130 )
1131 })
1132 .collect::<Vec<_>>();
1133 let (offsets, data_ranges) = if bits_per_offset == 32 {
1134 stitch_offsets::<i32>(offsets)
1135 } else {
1136 stitch_offsets::<i64>(offsets)
1137 };
1138 let data = data_vec
1139 .iter()
1140 .zip(data_ranges)
1141 .map(|(d, byte_range)| {
1142 LanceBuffer::from(
1143 d.buffers()[1]
1144 .slice_with_length(byte_range.start, byte_range.end - byte_range.start),
1145 )
1146 })
1147 .collect::<Vec<_>>();
1148 let data = LanceBuffer::concat_into_one(data);
1149 DataBlock::VariableWidth(VariableWidthBlock {
1150 data,
1151 offsets,
1152 bits_per_offset,
1153 num_values,
1154 block_info: BlockInfo::new(),
1155 })
1156}
1157
1158fn encode_flat_data(arrays: &[ArrayRef], num_values: u64) -> LanceBuffer {
1159 let bytes_per_value = arrays[0].data_type().byte_width();
1160 let mut buffer = Vec::with_capacity(num_values as usize * bytes_per_value);
1161 for arr in arrays {
1162 let data = arr.to_data();
1163 buffer.extend_from_slice(data.buffers()[0].as_slice());
1164 }
1165 LanceBuffer::from(buffer)
1166}
1167
1168fn do_encode_bitmap_data(bitmaps: &[BooleanBuffer], num_values: u64) -> LanceBuffer {
1169 let mut builder = BooleanBufferBuilder::new(num_values as usize);
1170
1171 for buf in bitmaps {
1172 builder.append_buffer(buf);
1173 }
1174
1175 let buffer = builder.finish().into_inner();
1176 LanceBuffer::from(buffer)
1177}
1178
1179fn encode_bitmap_data(arrays: &[ArrayRef], num_values: u64) -> LanceBuffer {
1180 let bitmaps = arrays
1181 .iter()
1182 .map(|arr| arr.as_boolean().values().clone())
1183 .collect::<Vec<_>>();
1184 do_encode_bitmap_data(&bitmaps, num_values)
1185}
1186
1187fn concat_dict_arrays(arrays: &[ArrayRef]) -> ArrayRef {
1190 let value_type = arrays[0].as_any_dictionary().values().data_type();
1191 let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
1192 match arrow_select::concat::concat(&array_refs) {
1193 Ok(array) => array,
1194 Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1195 let upscaled = array_refs
1197 .iter()
1198 .map(|arr| {
1199 match arrow_cast::cast(
1200 *arr,
1201 &DataType::Dictionary(
1202 Box::new(DataType::UInt32),
1203 Box::new(value_type.clone()),
1204 ),
1205 ) {
1206 Ok(arr) => arr,
1207 Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1208 unimplemented!("Dictionary arrays with more than 2^32 unique values")
1210 }
1211 err => err.unwrap(),
1212 }
1213 })
1214 .collect::<Vec<_>>();
1215 let array_refs = upscaled.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
1216 match arrow_select::concat::concat(&array_refs) {
1218 Ok(array) => array,
1219 Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1220 unimplemented!("Dictionary arrays with more than 2^32 unique values")
1221 }
1222 err => err.unwrap(),
1223 }
1224 }
1225 err => err.unwrap(),
1227 }
1228}
1229
1230fn max_index_val(index_type: &DataType) -> u64 {
1231 match index_type {
1232 DataType::Int8 => i8::MAX as u64,
1233 DataType::Int16 => i16::MAX as u64,
1234 DataType::Int32 => i32::MAX as u64,
1235 DataType::Int64 => i64::MAX as u64,
1236 DataType::UInt8 => u8::MAX as u64,
1237 DataType::UInt16 => u16::MAX as u64,
1238 DataType::UInt32 => u32::MAX as u64,
1239 DataType::UInt64 => u64::MAX,
1240 _ => panic!("Invalid dictionary index type"),
1241 }
1242}
1243
1244fn arrow_dictionary_to_data_block(arrays: &[ArrayRef], validity: Option<NullBuffer>) -> DataBlock {
1263 let array = concat_dict_arrays(arrays);
1264 let array_dict = array.as_any_dictionary();
1265 let mut indices = array_dict.keys();
1266 let num_values = indices.len() as u64;
1267 let mut values = array_dict.values().clone();
1268 let mut upcast = None;
1270
1271 let indices_block = if let Some(validity) = validity {
1275 let mut first_invalid_index = None;
1279 if let Some(values_validity) = values.nulls() {
1280 first_invalid_index = (!values_validity.inner()).set_indices().next();
1281 }
1282 let first_invalid_index = first_invalid_index.unwrap_or_else(|| {
1283 let null_arr = new_null_array(values.data_type(), 1);
1284 values = arrow_select::concat::concat(&[values.as_ref(), null_arr.as_ref()]).unwrap();
1285 let null_index = values.len() - 1;
1286 let max_index_val = max_index_val(indices.data_type());
1287 if null_index as u64 > max_index_val {
1288 if max_index_val >= u32::MAX as u64 {
1290 unimplemented!("Dictionary arrays with 2^32 unique value (or more) and a null")
1291 }
1292 upcast = Some(arrow_cast::cast(indices, &DataType::UInt32).unwrap());
1293 indices = upcast.as_ref().unwrap();
1294 }
1295 null_index
1296 });
1297 let null_index_arr = arrow_cast::cast(
1299 &UInt64Array::from(vec![first_invalid_index as u64]),
1300 indices.data_type(),
1301 )
1302 .unwrap();
1303
1304 let bytes_per_index = indices.data_type().byte_width();
1305 let bits_per_index = bytes_per_index as u64 * 8;
1306
1307 let null_index_arr = null_index_arr.into_data();
1308 let null_index_bytes = &null_index_arr.buffers()[0];
1309 let mut indices_bytes = indices.to_data().buffers()[0].to_vec();
1311 for invalid_idx in (!validity.inner()).set_indices() {
1312 indices_bytes[invalid_idx * bytes_per_index..(invalid_idx + 1) * bytes_per_index]
1313 .copy_from_slice(null_index_bytes.as_slice());
1314 }
1315 FixedWidthDataBlock {
1316 data: LanceBuffer::from(indices_bytes),
1317 bits_per_value: bits_per_index,
1318 num_values,
1319 block_info: BlockInfo::new(),
1320 }
1321 } else {
1322 FixedWidthDataBlock {
1323 data: LanceBuffer::from(indices.to_data().buffers()[0].clone()),
1324 bits_per_value: indices.data_type().byte_width() as u64 * 8,
1325 num_values,
1326 block_info: BlockInfo::new(),
1327 }
1328 };
1329
1330 let items = DataBlock::from(values);
1331 DataBlock::Dictionary(DictionaryDataBlock {
1332 indices: indices_block,
1333 dictionary: Box::new(items),
1334 })
1335}
1336
1337enum Nullability {
1338 None,
1339 All,
1340 Some(NullBuffer),
1341}
1342
1343impl Nullability {
1344 fn to_option(&self) -> Option<NullBuffer> {
1345 match self {
1346 Self::Some(nulls) => Some(nulls.clone()),
1347 _ => None,
1348 }
1349 }
1350}
1351
1352fn extract_nulls(arrays: &[ArrayRef], num_values: u64) -> Nullability {
1353 let mut has_nulls = false;
1354 let nulls_and_lens = arrays
1355 .iter()
1356 .map(|arr| {
1357 let nulls = arr.logical_nulls();
1358 has_nulls |= nulls.is_some();
1359 (nulls, arr.len())
1360 })
1361 .collect::<Vec<_>>();
1362 if !has_nulls {
1363 return Nullability::None;
1364 }
1365 let mut builder = BooleanBufferBuilder::new(num_values as usize);
1366 let mut num_nulls = 0;
1367 for (null, len) in nulls_and_lens {
1368 if let Some(null) = null {
1369 num_nulls += null.null_count();
1370 builder.append_buffer(&null.into_inner());
1371 } else {
1372 builder.append_n(len, true);
1373 }
1374 }
1375 if num_nulls == num_values as usize {
1376 Nullability::All
1377 } else {
1378 Nullability::Some(NullBuffer::new(builder.finish()))
1379 }
1380}
1381
1382impl DataBlock {
1383 pub fn from_arrays(arrays: &[ArrayRef], num_values: u64) -> Self {
1384 if arrays.is_empty() || num_values == 0 {
1385 return Self::AllNull(AllNullDataBlock { num_values: 0 });
1386 }
1387
1388 let data_type = arrays[0].data_type();
1389 let nulls = extract_nulls(arrays, num_values);
1390
1391 if let Nullability::All = nulls {
1392 return Self::AllNull(AllNullDataBlock { num_values });
1393 }
1394
1395 let mut encoded = match data_type {
1396 DataType::Binary | DataType::Utf8 => arrow_binary_to_data_block(arrays, num_values, 32),
1397 DataType::BinaryView | DataType::Utf8View => {
1398 todo!()
1399 }
1400 DataType::LargeBinary | DataType::LargeUtf8 => {
1401 arrow_binary_to_data_block(arrays, num_values, 64)
1402 }
1403 DataType::Boolean => {
1404 let data = encode_bitmap_data(arrays, num_values);
1405 Self::FixedWidth(FixedWidthDataBlock {
1406 data,
1407 bits_per_value: 1,
1408 num_values,
1409 block_info: BlockInfo::new(),
1410 })
1411 }
1412 DataType::Date32
1413 | DataType::Date64
1414 | DataType::Decimal128(_, _)
1415 | DataType::Decimal256(_, _)
1416 | DataType::Duration(_)
1417 | DataType::FixedSizeBinary(_)
1418 | DataType::Float16
1419 | DataType::Float32
1420 | DataType::Float64
1421 | DataType::Int16
1422 | DataType::Int32
1423 | DataType::Int64
1424 | DataType::Int8
1425 | DataType::Interval(_)
1426 | DataType::Time32(_)
1427 | DataType::Time64(_)
1428 | DataType::Timestamp(_, _)
1429 | DataType::UInt16
1430 | DataType::UInt32
1431 | DataType::UInt64
1432 | DataType::UInt8 => {
1433 let data = encode_flat_data(arrays, num_values);
1434 Self::FixedWidth(FixedWidthDataBlock {
1435 data,
1436 bits_per_value: data_type.byte_width() as u64 * 8,
1437 num_values,
1438 block_info: BlockInfo::new(),
1439 })
1440 }
1441 DataType::Null => Self::AllNull(AllNullDataBlock { num_values }),
1442 DataType::Dictionary(_, _) => arrow_dictionary_to_data_block(arrays, nulls.to_option()),
1443 DataType::Struct(fields) => {
1444 let structs = arrays.iter().map(|arr| arr.as_struct()).collect::<Vec<_>>();
1445 let mut children = Vec::with_capacity(fields.len());
1446 for child_idx in 0..fields.len() {
1447 let child_vec = structs
1448 .iter()
1449 .map(|s| s.column(child_idx).clone())
1450 .collect::<Vec<_>>();
1451 children.push(Self::from_arrays(&child_vec, num_values));
1452 }
1453
1454 let validity = match &nulls {
1456 Nullability::None => None,
1457 Nullability::Some(null_buffer) => Some(null_buffer.clone()),
1458 Nullability::All => unreachable!("Should have returned AllNull earlier"),
1459 };
1460
1461 Self::Struct(StructDataBlock {
1462 children,
1463 block_info: BlockInfo::default(),
1464 validity,
1465 })
1466 }
1467 DataType::FixedSizeList(_, dim) => {
1468 let children = arrays
1469 .iter()
1470 .map(|arr| arr.as_fixed_size_list().values().clone())
1471 .collect::<Vec<_>>();
1472 let child_block = Self::from_arrays(&children, num_values * *dim as u64);
1473 Self::FixedSizeList(FixedSizeListBlock {
1474 child: Box::new(child_block),
1475 dimension: *dim as u64,
1476 })
1477 }
1478 DataType::LargeList(_)
1479 | DataType::List(_)
1480 | DataType::ListView(_)
1481 | DataType::LargeListView(_)
1482 | DataType::Map(_, _)
1483 | DataType::RunEndEncoded(_, _)
1484 | DataType::Union(_, _) => {
1485 panic!(
1486 "Field with data type {} cannot be converted to data block",
1487 data_type
1488 )
1489 }
1490 };
1491
1492 encoded.compute_stat();
1494
1495 if !matches!(data_type, DataType::Dictionary(_, _)) {
1496 match nulls {
1497 Nullability::None => encoded,
1498 Nullability::Some(nulls) => Self::Nullable(NullableDataBlock {
1499 data: Box::new(encoded),
1500 nulls: LanceBuffer::from(nulls.into_inner().into_inner()),
1501 block_info: BlockInfo::new(),
1502 }),
1503 _ => unreachable!(),
1504 }
1505 } else {
1506 encoded
1508 }
1509 }
1510
1511 pub fn from_array<T: Array + 'static>(array: T) -> Self {
1512 let num_values = array.len();
1513 Self::from_arrays(&[Arc::new(array)], num_values as u64)
1514 }
1515}
1516
1517impl From<ArrayRef> for DataBlock {
1518 fn from(array: ArrayRef) -> Self {
1519 let num_values = array.len() as u64;
1520 Self::from_arrays(&[array], num_values)
1521 }
1522}
1523
1524pub trait DataBlockBuilderImpl: std::fmt::Debug {
1525 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>);
1526 fn finish(self: Box<Self>) -> DataBlock;
1527}
1528
1529#[derive(Debug)]
1530pub struct DataBlockBuilder {
1531 estimated_size_bytes: u64,
1532 builder: Option<Box<dyn DataBlockBuilderImpl>>,
1533}
1534
1535impl DataBlockBuilder {
1536 pub fn with_capacity_estimate(estimated_size_bytes: u64) -> Self {
1537 Self {
1538 estimated_size_bytes,
1539 builder: None,
1540 }
1541 }
1542
1543 fn get_builder(&mut self, block: &DataBlock) -> &mut dyn DataBlockBuilderImpl {
1544 if self.builder.is_none() {
1545 self.builder = Some(block.make_builder(self.estimated_size_bytes));
1546 }
1547 self.builder.as_mut().unwrap().as_mut()
1548 }
1549
1550 pub fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
1551 self.get_builder(data_block).append(data_block, selection);
1552 }
1553
1554 pub fn finish(self) -> DataBlock {
1555 let builder = self.builder.expect("DataBlockBuilder didn't see any data");
1556 builder.finish()
1557 }
1558}
1559
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::{
        make_array, new_null_array,
        types::{Int32Type, Int8Type},
        Array, ArrayRef, DictionaryArray, Int8Array, LargeBinaryArray, StringArray, UInt16Array,
        UInt8Array,
    };
    use arrow_buffer::{BooleanBuffer, NullBuffer};
    use arrow_schema::{DataType, Field, Fields};
    use lance_datagen::{array, ArrayGeneratorExt, RowCount, DEFAULT_SEED};
    use rand::SeedableRng;

    use crate::buffer::LanceBuffer;

    use super::{AllNullDataBlock, DataBlock};

    /// Only the sliced region of an array (values and validity) ends up in the block.
    #[test]
    fn test_sliced_to_data_block() {
        let source = UInt16Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);
        let block = DataBlock::from_array(source.slice(2, 4));

        let fixed = block.as_fixed_width().unwrap();
        assert_eq!(fixed.num_values, 4);
        // 4 values * 2 bytes each
        assert_eq!(fixed.data.len(), 8);

        let source =
            UInt16Array::from(vec![Some(0), None, Some(2), None, Some(4), None, Some(6)]);
        let block = DataBlock::from_array(source.slice(1, 3));

        let nullable = block.as_nullable().unwrap();
        // The slice is [null, 2, null], so only the middle bit is set.
        assert_eq!(nullable.nulls, LanceBuffer::from(vec![0b00000010]));
    }

    /// Several string arrays are stitched into one offsets buffer and one data buffer.
    #[test]
    fn test_string_to_data_block() {
        // With nulls: the result is a nullable block wrapping variable-width data.
        let arrays: Vec<ArrayRef> = vec![
            Arc::new(StringArray::from(vec![Some("hello"), None, Some("world")])),
            Arc::new(StringArray::from(vec![Some("a"), Some("b")])),
            Arc::new(StringArray::from(vec![Option::<&'static str>::None, None])),
        ];

        let block = DataBlock::from_arrays(&arrays, 7);
        assert_eq!(block.num_values(), 7);

        let nullable = block.as_nullable().unwrap();
        assert_eq!(nullable.nulls, LanceBuffer::from(vec![0b00011101]));

        let variable = nullable.data.as_variable_width().unwrap();
        assert_eq!(
            variable.offsets,
            LanceBuffer::reinterpret_vec(vec![0, 5, 5, 10, 11, 12, 12, 12])
        );
        assert_eq!(variable.data, LanceBuffer::copy_slice(b"helloworldab"));

        // Without nulls: the result is the variable-width block itself.
        let arrays: Vec<ArrayRef> = vec![
            Arc::new(StringArray::from(vec![Some("a"), Some("bc")])),
            Arc::new(StringArray::from(vec![Some("def")])),
        ];

        let block = DataBlock::from_arrays(&arrays, 3);
        assert_eq!(block.num_values(), 3);

        let variable = block.as_variable_width().unwrap();
        assert_eq!(
            variable.offsets,
            LanceBuffer::reinterpret_vec(vec![0, 1, 3, 6])
        );
        assert_eq!(variable.data, LanceBuffer::copy_slice(b"abcdef"));
    }

    /// Sliced string arrays produce offsets starting at zero and only the referenced bytes.
    #[test]
    fn test_string_sliced() {
        fn check(arr: Vec<StringArray>, expected_off: Vec<i32>, expected_data: &[u8]) {
            let arrays = arr
                .into_iter()
                .map(|a| Arc::new(a) as ArrayRef)
                .collect::<Vec<_>>();
            let row_count = arrays.iter().map(|a| a.len()).sum::<usize>() as u64;
            let block = DataBlock::from_arrays(&arrays, row_count);

            assert_eq!(block.num_values(), row_count);

            let variable = block.as_variable_width().unwrap();
            assert_eq!(variable.offsets, LanceBuffer::reinterpret_vec(expected_off));
            assert_eq!(variable.data, LanceBuffer::copy_slice(expected_data));
        }

        let greeting = StringArray::from(vec![Some("hello"), Some("world")]);
        check(vec![greeting.slice(1, 1)], vec![0, 5], b"world");
        check(vec![greeting.slice(0, 1)], vec![0, 5], b"hello");
        check(
            vec![greeting.slice(0, 1), greeting.slice(1, 1)],
            vec![0, 5, 10],
            b"helloworld",
        );

        let other = StringArray::from(vec![Some("foo"), Some("bar")]);
        check(
            vec![greeting.slice(0, 1), other.slice(0, 1)],
            vec![0, 5, 8],
            b"hellofoo",
        );
    }

    /// Large binary arrays keep 64-bit offsets.
    #[test]
    fn test_large() {
        let source = LargeBinaryArray::from_vec(vec![b"hello", b"world"]);
        let block = DataBlock::from_array(source);
        assert_eq!(block.num_values(), 2);

        let variable = block.as_variable_width().unwrap();
        assert_eq!(variable.bits_per_offset, 64);
        assert_eq!(variable.num_values, 2);
        assert_eq!(variable.data, LanceBuffer::copy_slice(b"helloworld"));
        assert_eq!(
            variable.offsets,
            LanceBuffer::reinterpret_vec(vec![0_u64, 5, 10])
        );
    }

    /// Concatenated dictionaries share one value list and their indices are rewritten
    /// to point into it.
    #[test]
    fn test_dictionary_indices_normalized() {
        let first = DictionaryArray::<Int8Type>::from_iter([Some("a"), Some("a"), Some("b")]);
        let second = DictionaryArray::<Int8Type>::from_iter([Some("b"), Some("c")]);

        let block = DataBlock::from_arrays(&[Arc::new(first), Arc::new(second)], 5);
        assert_eq!(block.num_values(), 5);

        let dict = block.as_dictionary().unwrap();
        let indices = dict.indices;
        assert_eq!(indices.bits_per_value, 8);
        assert_eq!(indices.num_values, 5);
        assert_eq!(
            indices.data,
            LanceBuffer::reinterpret_vec::<i8>(vec![0, 0, 1, 2, 3])
        );

        // The expected values show that "b" appears once per input dictionary.
        let items = dict.dictionary.as_variable_width().unwrap();
        assert_eq!(items.bits_per_offset, 32);
        assert_eq!(items.num_values, 4);
        assert_eq!(items.data, LanceBuffer::copy_slice(b"abbc"));
        assert_eq!(
            items.offsets,
            LanceBuffer::reinterpret_vec(vec![0, 1, 2, 3, 4],)
        );
    }

    /// Null rows are represented by an index that points at a null dictionary value.
    #[test]
    fn test_dictionary_nulls() {
        fn check_common(data: DataBlock) {
            assert_eq!(data.num_values(), 5);
            let dict = data.as_dictionary().unwrap();

            let nullable_items = dict.dictionary.as_nullable().unwrap();
            assert_eq!(nullable_items.nulls, LanceBuffer::from(vec![0b00000111]));
            assert_eq!(nullable_items.data.num_values(), 4);

            let items = nullable_items.data.as_variable_width().unwrap();
            assert_eq!(items.bits_per_offset, 32);
            assert_eq!(items.num_values, 4);
            assert_eq!(items.data, LanceBuffer::copy_slice(b"abc"));
            assert_eq!(
                items.offsets,
                LanceBuffer::reinterpret_vec(vec![0, 1, 2, 3, 3],)
            );

            let indices = dict.indices;
            assert_eq!(indices.bits_per_value, 8);
            assert_eq!(indices.num_values, 5);
            assert_eq!(
                indices.data,
                LanceBuffer::reinterpret_vec::<i8>(vec![3, 0, 1, 2, 3])
            );
        }

        // Case 1: the nulls are in the keys.
        let first = DictionaryArray::<Int8Type>::from_iter([None, Some("a"), Some("b")]);
        let second = DictionaryArray::<Int8Type>::from_iter([Some("c"), None]);
        check_common(DataBlock::from_arrays(
            &[Arc::new(first), Arc::new(second)],
            5,
        ));

        // Case 2: the null is a dictionary value and the keys are all valid.
        let items = StringArray::from(vec![Some("a"), Some("b"), Some("c"), None]);
        let keys = Int8Array::from(vec![Some(3), Some(0), Some(1), Some(2), Some(3)]);
        let dict = DictionaryArray::new(keys, Arc::new(items));
        check_common(DataBlock::from_array(dict));
    }

    /// When the key type has no room for an extra null slot the keys are widened to 32 bits.
    #[test]
    fn test_dictionary_cannot_add_null() {
        // 256 distinct values: strings of 0..=255 NUL bytes
        let items = StringArray::from(
            (0..256usize)
                .map(|len| Some("\0".repeat(len)))
                .collect::<Vec<_>>(),
        );
        // Keys 0..=255 followed by one null key
        let keys = UInt8Array::from(
            (0..=255u8)
                .map(Some)
                .chain(std::iter::once(None))
                .collect::<Vec<_>>(),
        );
        let dict = DictionaryArray::new(keys, Arc::new(items));
        let block = DataBlock::from_array(dict);

        assert_eq!(block.num_values(), 257);

        let dict = block.as_dictionary().unwrap();

        assert_eq!(dict.indices.bits_per_value, 32);
        assert_eq!(
            dict.indices.data,
            LanceBuffer::reinterpret_vec((0_u32..257).collect::<Vec<_>>())
        );

        let nullable_items = dict.dictionary.as_nullable().unwrap();
        let validity = NullBuffer::new(BooleanBuffer::new(
            nullable_items.nulls.into_buffer(),
            0,
            257,
        ));
        // Only the appended slot (256) is null.
        assert!((0..256).all(|slot| !validity.is_null(slot)));
        assert!(validity.is_null(256));

        assert_eq!(
            nullable_items.data.as_variable_width().unwrap().data.len(),
            32640
        );
    }

    /// An all-null block converts to an all-null arrow array for a variety of types.
    #[test]
    fn test_all_null() {
        let data_types = [
            DataType::UInt32,
            DataType::FixedSizeBinary(2),
            DataType::List(Arc::new(Field::new("item", DataType::UInt32, true))),
            DataType::Struct(Fields::from(vec![Field::new("a", DataType::UInt32, true)])),
        ];
        for data_type in data_types {
            let block = DataBlock::AllNull(AllNullDataBlock { num_values: 10 });
            let actual = make_array(block.into_arrow(data_type.clone(), true).unwrap());
            let expected = new_null_array(&data_type, 10);
            assert_eq!(&actual, &expected);
        }
    }

    /// When the combined dictionary overflows the key type the keys are widened to 32 bits.
    #[test]
    fn test_dictionary_cannot_concatenate() {
        // Strings of 0..=255 NUL bytes
        let items = StringArray::from(
            (0..256usize)
                .map(|len| Some("\0".repeat(len)))
                .collect::<Vec<_>>(),
        );
        // Strings of 1..=256 bytes of value 1
        let other_items = StringArray::from(
            (0..256usize)
                .map(|len| Some("\u{1}".repeat(len + 1)))
                .collect::<Vec<_>>(),
        );
        let keys = UInt8Array::from_iter_values(0..=255);
        let first = DictionaryArray::new(keys.clone(), Arc::new(items));
        let second = DictionaryArray::new(keys, Arc::new(other_items));
        let block = DataBlock::from_arrays(&[Arc::new(first), Arc::new(second)], 512);
        assert_eq!(block.num_values(), 512);

        let dict = block.as_dictionary().unwrap();

        assert_eq!(dict.indices.bits_per_value, 32);
        assert_eq!(
            dict.indices.data,
            LanceBuffer::reinterpret_vec::<u32>((0..512).collect::<Vec<_>>())
        );
        assert_eq!(
            dict.dictionary.as_variable_width().unwrap().data.len(),
            65536
        );
    }

    /// `data_size` matches the size of the arrow buffers (plus validity bytes when present).
    #[test]
    fn test_data_size() {
        // Null patterns used for the nullable cases below.
        const NULL_PATTERNS: [&[bool]; 2] = [&[false, true, false], &[true, true, false]];

        // Expected size of a nullable array: all data buffers plus the validity bitmap.
        let buffers_plus_validity = |arr: &ArrayRef| -> u64 {
            let buffer_bytes: usize = arr.to_data().buffers().iter().map(|b| b.len()).sum();
            let validity_bytes = arr.nulls().unwrap().len().div_ceil(8);
            (buffer_bytes + validity_bytes) as u64
        };

        let mut rng = rand_xoshiro::Xoshiro256PlusPlus::seed_from_u64(DEFAULT_SEED.0);

        // No nulls at all
        let mut generator = array::rand::<Int32Type>().with_nulls(&[false, false, false]);
        for rows in [3_u64, 400] {
            let arr = generator.generate(RowCount::from(rows), &mut rng).unwrap();
            let block = DataBlock::from_array(arr.clone());
            assert_eq!(block.data_size(), arr.get_buffer_memory_size() as u64);
        }

        // Some nulls, single array
        for pattern in NULL_PATTERNS {
            let mut generator = array::rand::<Int32Type>().with_nulls(pattern);
            for rows in [3_u64, 400] {
                let arr = generator.generate(RowCount::from(rows), &mut rng).unwrap();
                let block = DataBlock::from_array(arr.clone());
                assert_eq!(block.data_size(), buffers_plus_validity(&arr));
            }
        }

        // Some nulls, several arrays: compare against arrow's own concatenation
        let mut generator = array::rand::<Int32Type>().with_nulls(&[false, true, false]);
        let parts = (0..3)
            .map(|_| generator.generate(RowCount::from(3), &mut rng).unwrap())
            .collect::<Vec<_>>();
        let block = DataBlock::from_arrays(&parts, 9);

        let part_refs = parts.iter().map(|part| part.as_ref()).collect::<Vec<_>>();
        let concatenated = arrow_select::concat::concat(&part_refs).unwrap();
        assert_eq!(block.data_size(), buffers_plus_validity(&concatenated));
    }
}