1use std::{
18 ops::Range,
19 sync::{Arc, RwLock},
20};
21
22use arrow::array::{ArrayData, ArrayDataBuilder, AsArray};
23use arrow_array::{new_empty_array, new_null_array, Array, ArrayRef, OffsetSizeTrait, UInt64Array};
24use arrow_buffer::{
25 ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer, ScalarBuffer,
26};
27use arrow_schema::DataType;
28use lance_arrow::DataTypeExt;
29use snafu::location;
30
31use lance_core::{Error, Result};
32
33use crate::{
34 buffer::LanceBuffer,
35 statistics::{ComputeStat, Stat},
36};
37
38#[derive(Debug)]
44pub struct AllNullDataBlock {
45 pub num_values: u64,
47}
48
49impl AllNullDataBlock {
50 fn into_arrow(self, data_type: DataType, _validate: bool) -> Result<ArrayData> {
51 Ok(ArrayData::new_null(&data_type, self.num_values as usize))
52 }
53
54 fn into_buffers(self) -> Vec<LanceBuffer> {
55 vec![]
56 }
57
58 fn borrow_and_clone(&mut self) -> Self {
59 Self {
60 num_values: self.num_values,
61 }
62 }
63
64 fn try_clone(&self) -> Result<Self> {
65 Ok(Self {
66 num_values: self.num_values,
67 })
68 }
69}
70
71use std::collections::HashMap;
72
73#[derive(Debug, Clone)]
76pub struct BlockInfo(pub Arc<RwLock<HashMap<Stat, Arc<dyn Array>>>>);
77
78impl Default for BlockInfo {
79 fn default() -> Self {
80 Self::new()
81 }
82}
83
84impl BlockInfo {
85 pub fn new() -> Self {
86 Self(Arc::new(RwLock::new(HashMap::new())))
87 }
88}
89
90impl PartialEq for BlockInfo {
91 fn eq(&self, other: &Self) -> bool {
92 let self_info = self.0.read().unwrap();
93 let other_info = other.0.read().unwrap();
94 *self_info == *other_info
95 }
96}
97
98#[derive(Debug)]
104pub struct NullableDataBlock {
105 pub data: Box<DataBlock>,
107 pub nulls: LanceBuffer,
109
110 pub block_info: BlockInfo,
111}
112
113impl NullableDataBlock {
114 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
115 let nulls = self.nulls.into_buffer();
116 let data = self.data.into_arrow(data_type, validate)?.into_builder();
117 let data = data.null_bit_buffer(Some(nulls));
118 if validate {
119 Ok(data.build()?)
120 } else {
121 Ok(unsafe { data.build_unchecked() })
122 }
123 }
124
125 fn into_buffers(self) -> Vec<LanceBuffer> {
126 let mut buffers = vec![self.nulls];
127 buffers.extend(self.data.into_buffers());
128 buffers
129 }
130
131 fn borrow_and_clone(&mut self) -> Self {
132 Self {
133 data: Box::new(self.data.borrow_and_clone()),
134 nulls: self.nulls.borrow_and_clone(),
135 block_info: self.block_info.clone(),
136 }
137 }
138
139 fn try_clone(&self) -> Result<Self> {
140 Ok(Self {
141 data: Box::new(self.data.try_clone()?),
142 nulls: self.nulls.try_clone()?,
143 block_info: self.block_info.clone(),
144 })
145 }
146
147 pub fn data_size(&self) -> u64 {
148 self.data.data_size() + self.nulls.len() as u64
149 }
150}
151
152#[derive(Debug, PartialEq)]
154pub struct ConstantDataBlock {
155 pub data: LanceBuffer,
157 pub num_values: u64,
159}
160
161impl ConstantDataBlock {
162 fn into_buffers(self) -> Vec<LanceBuffer> {
163 vec![self.data]
164 }
165
166 fn into_arrow(self, _data_type: DataType, _validate: bool) -> Result<ArrayData> {
167 todo!()
170 }
171
172 pub fn borrow_and_clone(&mut self) -> Self {
173 Self {
174 data: self.data.borrow_and_clone(),
175 num_values: self.num_values,
176 }
177 }
178
179 pub fn try_clone(&self) -> Result<Self> {
180 Ok(Self {
181 data: self.data.try_clone()?,
182 num_values: self.num_values,
183 })
184 }
185
186 pub fn data_size(&self) -> u64 {
187 self.data.len() as u64
188 }
189}
190
191#[derive(Debug, PartialEq)]
193pub struct FixedWidthDataBlock {
194 pub data: LanceBuffer,
196 pub bits_per_value: u64,
198 pub num_values: u64,
200
201 pub block_info: BlockInfo,
202}
203
204impl FixedWidthDataBlock {
205 fn do_into_arrow(
206 self,
207 data_type: DataType,
208 num_values: u64,
209 validate: bool,
210 ) -> Result<ArrayData> {
211 let data_buffer = self.data.into_buffer();
212 let builder = ArrayDataBuilder::new(data_type)
213 .add_buffer(data_buffer)
214 .len(num_values as usize)
215 .null_count(0);
216 if validate {
217 Ok(builder.build()?)
218 } else {
219 Ok(unsafe { builder.build_unchecked() })
220 }
221 }
222
223 pub fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
224 let root_num_values = self.num_values;
225 self.do_into_arrow(data_type, root_num_values, validate)
226 }
227
228 pub fn into_buffers(self) -> Vec<LanceBuffer> {
229 vec![self.data]
230 }
231
232 pub fn borrow_and_clone(&mut self) -> Self {
233 Self {
234 data: self.data.borrow_and_clone(),
235 bits_per_value: self.bits_per_value,
236 num_values: self.num_values,
237 block_info: self.block_info.clone(),
238 }
239 }
240
241 pub fn try_clone(&self) -> Result<Self> {
242 Ok(Self {
243 data: self.data.try_clone()?,
244 bits_per_value: self.bits_per_value,
245 num_values: self.num_values,
246 block_info: self.block_info.clone(),
247 })
248 }
249
250 pub fn data_size(&self) -> u64 {
251 self.data.len() as u64
252 }
253}
254
255#[derive(Debug)]
256pub struct VariableWidthDataBlockBuilder<T: OffsetSizeTrait> {
257 offsets: Vec<T>,
258 bytes: Vec<u8>,
259}
260
261impl<T: OffsetSizeTrait> VariableWidthDataBlockBuilder<T> {
262 fn new(estimated_size_bytes: u64) -> Self {
263 Self {
264 offsets: vec![T::from_usize(0).unwrap()],
265 bytes: Vec::with_capacity(estimated_size_bytes as usize),
266 }
267 }
268}
269
270impl<T: OffsetSizeTrait> DataBlockBuilderImpl for VariableWidthDataBlockBuilder<T> {
271 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
272 let block = data_block.as_variable_width_ref().unwrap();
273 assert!(block.bits_per_offset == T::get_byte_width() as u8 * 8);
274
275 let offsets: ScalarBuffer<T> = block.offsets.try_clone().unwrap().borrow_to_typed_slice();
276
277 let start_offset = offsets[selection.start as usize];
278 let end_offset = offsets[selection.end as usize];
279 let mut previous_len = self.bytes.len();
280
281 self.bytes
282 .extend_from_slice(&block.data[start_offset.as_usize()..end_offset.as_usize()]);
283
284 self.offsets.extend(
285 offsets[selection.start as usize..selection.end as usize]
286 .iter()
287 .zip(&offsets[selection.start as usize + 1..=selection.end as usize])
288 .map(|(¤t, &next)| {
289 let this_value_len = next - current;
290 previous_len += this_value_len.as_usize();
291 T::from_usize(previous_len).unwrap()
292 }),
293 );
294 }
295
296 fn finish(self: Box<Self>) -> DataBlock {
297 let num_values = (self.offsets.len() - 1) as u64;
298 DataBlock::VariableWidth(VariableWidthBlock {
299 data: LanceBuffer::Owned(self.bytes),
300 offsets: LanceBuffer::reinterpret_vec(self.offsets),
301 bits_per_offset: T::get_byte_width() as u8 * 8,
302 num_values,
303 block_info: BlockInfo::new(),
304 })
305 }
306}
307
308#[derive(Debug)]
309struct BitmapDataBlockBuilder {
310 values: BooleanBufferBuilder,
311}
312
313impl BitmapDataBlockBuilder {
314 fn new(estimated_size_bytes: u64) -> Self {
315 Self {
316 values: BooleanBufferBuilder::new(estimated_size_bytes as usize * 8),
317 }
318 }
319}
320
321impl DataBlockBuilderImpl for BitmapDataBlockBuilder {
322 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
323 let bitmap_blk = data_block.as_fixed_width_ref().unwrap();
324 self.values.append_packed_range(
325 selection.start as usize..selection.end as usize,
326 &bitmap_blk.data,
327 );
328 }
329
330 fn finish(mut self: Box<Self>) -> DataBlock {
331 let bool_buf = self.values.finish();
332 let num_values = bool_buf.len() as u64;
333 let bits_buf = bool_buf.into_inner();
334 DataBlock::FixedWidth(FixedWidthDataBlock {
335 data: LanceBuffer::from(bits_buf),
336 bits_per_value: 1,
337 num_values,
338 block_info: BlockInfo::new(),
339 })
340 }
341}
342
343#[derive(Debug)]
344struct FixedWidthDataBlockBuilder {
345 bits_per_value: u64,
346 bytes_per_value: u64,
347 values: Vec<u8>,
348}
349
350impl FixedWidthDataBlockBuilder {
351 fn new(bits_per_value: u64, estimated_size_bytes: u64) -> Self {
352 assert!(bits_per_value % 8 == 0);
353 Self {
354 bits_per_value,
355 bytes_per_value: bits_per_value / 8,
356 values: Vec::with_capacity(estimated_size_bytes as usize),
357 }
358 }
359}
360
361impl DataBlockBuilderImpl for FixedWidthDataBlockBuilder {
362 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
363 let block = data_block.as_fixed_width_ref().unwrap();
364 assert_eq!(self.bits_per_value, block.bits_per_value);
365 let start = selection.start as usize * self.bytes_per_value as usize;
366 let end = selection.end as usize * self.bytes_per_value as usize;
367 self.values.extend_from_slice(&block.data[start..end]);
368 }
369
370 fn finish(self: Box<Self>) -> DataBlock {
371 let num_values = (self.values.len() / self.bytes_per_value as usize) as u64;
372 DataBlock::FixedWidth(FixedWidthDataBlock {
373 data: LanceBuffer::Owned(self.values),
374 bits_per_value: self.bits_per_value,
375 num_values,
376 block_info: BlockInfo::new(),
377 })
378 }
379}
380
381#[derive(Debug)]
382struct StructDataBlockBuilder {
383 children: Vec<Box<dyn DataBlockBuilderImpl>>,
384}
385
386impl StructDataBlockBuilder {
387 fn new(bits_per_values: Vec<u32>, estimated_size_bytes: u64) -> Self {
390 let mut children = vec![];
391
392 debug_assert!(bits_per_values.iter().all(|bpv| bpv % 8 == 0));
393
394 let bytes_per_row: u32 = bits_per_values.iter().sum::<u32>() / 8;
395 let bytes_per_row = bytes_per_row as u64;
396
397 for bits_per_value in bits_per_values.iter() {
398 let this_estimated_size_bytes =
399 estimated_size_bytes / bytes_per_row * (*bits_per_value as u64) / 8;
400 let child =
401 FixedWidthDataBlockBuilder::new(*bits_per_value as u64, this_estimated_size_bytes);
402 children.push(Box::new(child) as Box<dyn DataBlockBuilderImpl>);
403 }
404 Self { children }
405 }
406}
407
408impl DataBlockBuilderImpl for StructDataBlockBuilder {
409 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
410 let data_block = data_block.as_struct_ref().unwrap();
411 for i in 0..self.children.len() {
412 self.children[i].append(&data_block.children[i], selection.clone());
413 }
414 }
415
416 fn finish(self: Box<Self>) -> DataBlock {
417 let mut children_data_block = Vec::new();
418 for child in self.children {
419 let child_data_block = child.finish();
420 children_data_block.push(child_data_block);
421 }
422 DataBlock::Struct(StructDataBlock {
423 children: children_data_block,
424 block_info: BlockInfo::new(),
425 })
426 }
427}
428#[derive(Debug)]
430pub struct FixedSizeListBlock {
431 pub child: Box<DataBlock>,
433 pub dimension: u64,
435}
436
437impl FixedSizeListBlock {
438 fn borrow_and_clone(&mut self) -> Self {
439 Self {
440 child: Box::new(self.child.borrow_and_clone()),
441 dimension: self.dimension,
442 }
443 }
444
445 fn try_clone(&self) -> Result<Self> {
446 Ok(Self {
447 child: Box::new(self.child.try_clone()?),
448 dimension: self.dimension,
449 })
450 }
451
452 pub fn num_values(&self) -> u64 {
453 self.child.num_values() / self.dimension
454 }
455
456 pub fn try_into_flat(self) -> Option<FixedWidthDataBlock> {
460 match *self.child {
461 DataBlock::Nullable(_) => None,
463 DataBlock::FixedSizeList(inner) => {
464 let mut flat = inner.try_into_flat()?;
465 flat.bits_per_value *= self.dimension;
466 flat.num_values /= self.dimension;
467 Some(flat)
468 }
469 DataBlock::FixedWidth(mut inner) => {
470 inner.bits_per_value *= self.dimension;
471 inner.num_values /= self.dimension;
472 Some(inner)
473 }
474 _ => panic!(
475 "Expected FixedSizeList or FixedWidth data block but found {:?}",
476 self
477 ),
478 }
479 }
480
481 pub fn flatten_as_fixed(&mut self) -> FixedWidthDataBlock {
482 match self.child.as_mut() {
483 DataBlock::FixedSizeList(fsl) => fsl.flatten_as_fixed(),
484 DataBlock::FixedWidth(fw) => fw.borrow_and_clone(),
485 _ => panic!("Expected FixedSizeList or FixedWidth data block"),
486 }
487 }
488
489 pub fn from_flat(data: FixedWidthDataBlock, data_type: &DataType) -> DataBlock {
491 match data_type {
492 DataType::FixedSizeList(child_field, dimension) => {
493 let mut data = data;
494 data.bits_per_value /= *dimension as u64;
495 data.num_values *= *dimension as u64;
496 let child_data = Self::from_flat(data, child_field.data_type());
497 DataBlock::FixedSizeList(Self {
498 child: Box::new(child_data),
499 dimension: *dimension as u64,
500 })
501 }
502 _ => DataBlock::FixedWidth(data),
504 }
505 }
506
507 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
508 let num_values = self.num_values();
509 let builder = match &data_type {
510 DataType::FixedSizeList(child_field, _) => {
511 let child_data = self
512 .child
513 .into_arrow(child_field.data_type().clone(), validate)?;
514 ArrayDataBuilder::new(data_type)
515 .add_child_data(child_data)
516 .len(num_values as usize)
517 .null_count(0)
518 }
519 _ => panic!("Expected FixedSizeList data type and got {:?}", data_type),
520 };
521 if validate {
522 Ok(builder.build()?)
523 } else {
524 Ok(unsafe { builder.build_unchecked() })
525 }
526 }
527
528 fn into_buffers(self) -> Vec<LanceBuffer> {
529 self.child.into_buffers()
530 }
531
532 fn data_size(&self) -> u64 {
533 self.child.data_size()
534 }
535}
536
537#[derive(Debug)]
538struct FixedSizeListBlockBuilder {
539 inner: Box<dyn DataBlockBuilderImpl>,
540 dimension: u64,
541}
542
543impl FixedSizeListBlockBuilder {
544 fn new(inner: Box<dyn DataBlockBuilderImpl>, dimension: u64) -> Self {
545 Self { inner, dimension }
546 }
547}
548
549impl DataBlockBuilderImpl for FixedSizeListBlockBuilder {
550 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
551 let selection = selection.start * self.dimension..selection.end * self.dimension;
552 let fsl = data_block.as_fixed_size_list_ref().unwrap();
553 self.inner.append(fsl.child.as_ref(), selection);
554 }
555
556 fn finish(self: Box<Self>) -> DataBlock {
557 let inner_block = self.inner.finish();
558 DataBlock::FixedSizeList(FixedSizeListBlock {
559 child: Box::new(inner_block),
560 dimension: self.dimension,
561 })
562 }
563}
564
565#[derive(Debug)]
566struct NullableDataBlockBuilder {
567 inner: Box<dyn DataBlockBuilderImpl>,
568 validity: BooleanBufferBuilder,
569}
570
571impl NullableDataBlockBuilder {
572 fn new(inner: Box<dyn DataBlockBuilderImpl>, estimated_size_bytes: usize) -> Self {
573 Self {
574 inner,
575 validity: BooleanBufferBuilder::new(estimated_size_bytes * 8),
576 }
577 }
578}
579
580impl DataBlockBuilderImpl for NullableDataBlockBuilder {
581 fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
582 let nullable = data_block.as_nullable_ref().unwrap();
583 let bool_buf = BooleanBuffer::new(
584 nullable.nulls.try_clone().unwrap().into_buffer(),
585 selection.start as usize,
586 (selection.end - selection.start) as usize,
587 );
588 self.validity.append_buffer(&bool_buf);
589 self.inner.append(nullable.data.as_ref(), selection);
590 }
591
592 fn finish(mut self: Box<Self>) -> DataBlock {
593 let inner_block = self.inner.finish();
594 DataBlock::Nullable(NullableDataBlock {
595 data: Box::new(inner_block),
596 nulls: LanceBuffer::Borrowed(self.validity.finish().into_inner()),
597 block_info: BlockInfo::new(),
598 })
599 }
600}
601
602#[derive(Debug)]
606pub struct OpaqueBlock {
607 pub buffers: Vec<LanceBuffer>,
608 pub num_values: u64,
609 pub block_info: BlockInfo,
610}
611
612impl OpaqueBlock {
613 fn borrow_and_clone(&mut self) -> Self {
614 Self {
615 buffers: self
616 .buffers
617 .iter_mut()
618 .map(|b| b.borrow_and_clone())
619 .collect(),
620 num_values: self.num_values,
621 block_info: self.block_info.clone(),
622 }
623 }
624
625 fn try_clone(&self) -> Result<Self> {
626 Ok(Self {
627 buffers: self
628 .buffers
629 .iter()
630 .map(|b| b.try_clone())
631 .collect::<Result<_>>()?,
632 num_values: self.num_values,
633 block_info: self.block_info.clone(),
634 })
635 }
636
637 pub fn data_size(&self) -> u64 {
638 self.buffers.iter().map(|b| b.len() as u64).sum()
639 }
640}
641
642#[derive(Debug)]
644pub struct VariableWidthBlock {
645 pub data: LanceBuffer,
647 pub offsets: LanceBuffer,
651 pub bits_per_offset: u8,
653 pub num_values: u64,
655
656 pub block_info: BlockInfo,
657}
658
659impl VariableWidthBlock {
660 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
661 let data_buffer = self.data.into_buffer();
662 let offsets_buffer = self.offsets.into_buffer();
663 let builder = ArrayDataBuilder::new(data_type)
664 .add_buffer(offsets_buffer)
665 .add_buffer(data_buffer)
666 .len(self.num_values as usize)
667 .null_count(0);
668 if validate {
669 Ok(builder.build()?)
670 } else {
671 Ok(unsafe { builder.build_unchecked() })
672 }
673 }
674
675 fn into_buffers(self) -> Vec<LanceBuffer> {
676 vec![self.offsets, self.data]
677 }
678
679 fn borrow_and_clone(&mut self) -> Self {
680 Self {
681 data: self.data.borrow_and_clone(),
682 offsets: self.offsets.borrow_and_clone(),
683 bits_per_offset: self.bits_per_offset,
684 num_values: self.num_values,
685 block_info: self.block_info.clone(),
686 }
687 }
688
689 fn try_clone(&self) -> Result<Self> {
690 Ok(Self {
691 data: self.data.try_clone()?,
692 offsets: self.offsets.try_clone()?,
693 bits_per_offset: self.bits_per_offset,
694 num_values: self.num_values,
695 block_info: self.block_info.clone(),
696 })
697 }
698
699 pub fn offsets_as_block(&mut self) -> DataBlock {
700 let offsets = self.offsets.borrow_and_clone();
701 DataBlock::FixedWidth(FixedWidthDataBlock {
702 data: offsets,
703 bits_per_value: self.bits_per_offset as u64,
704 num_values: self.num_values + 1,
705 block_info: BlockInfo::new(),
706 })
707 }
708
709 pub fn data_size(&self) -> u64 {
710 (self.data.len() + self.offsets.len()) as u64
711 }
712}
713
714#[derive(Debug)]
716pub struct StructDataBlock {
717 pub children: Vec<DataBlock>,
719 pub block_info: BlockInfo,
720}
721
722impl StructDataBlock {
723 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
724 if let DataType::Struct(fields) = &data_type {
725 let mut builder = ArrayDataBuilder::new(DataType::Struct(fields.clone()));
726 let mut num_rows = 0;
727 for (field, child) in fields.iter().zip(self.children) {
728 let child_data = child.into_arrow(field.data_type().clone(), validate)?;
729 num_rows = child_data.len();
730 builder = builder.add_child_data(child_data);
731 }
732 let builder = builder.null_count(0).len(num_rows);
733 if validate {
734 Ok(builder.build()?)
735 } else {
736 Ok(unsafe { builder.build_unchecked() })
737 }
738 } else {
739 Err(Error::Internal {
740 message: format!("Expected Struct, got {:?}", data_type),
741 location: location!(),
742 })
743 }
744 }
745
746 fn remove_outer_validity(self) -> Self {
747 Self {
748 children: self
749 .children
750 .into_iter()
751 .map(|c| c.remove_outer_validity())
752 .collect(),
753 block_info: self.block_info,
754 }
755 }
756
757 fn into_buffers(self) -> Vec<LanceBuffer> {
758 self.children
759 .into_iter()
760 .flat_map(|c| c.into_buffers())
761 .collect()
762 }
763
764 fn borrow_and_clone(&mut self) -> Self {
765 Self {
766 children: self
767 .children
768 .iter_mut()
769 .map(|c| c.borrow_and_clone())
770 .collect(),
771 block_info: self.block_info.clone(),
772 }
773 }
774
775 fn try_clone(&self) -> Result<Self> {
776 Ok(Self {
777 children: self
778 .children
779 .iter()
780 .map(|c| c.try_clone())
781 .collect::<Result<_>>()?,
782 block_info: self.block_info.clone(),
783 })
784 }
785
786 pub fn data_size(&self) -> u64 {
787 self.children
788 .iter()
789 .map(|data_block| data_block.data_size())
790 .sum()
791 }
792}
793
794#[derive(Debug)]
796pub struct DictionaryDataBlock {
797 pub indices: FixedWidthDataBlock,
799 pub dictionary: Box<DataBlock>,
801}
802
803impl DictionaryDataBlock {
804 fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
805 let (key_type, value_type) = if let DataType::Dictionary(key_type, value_type) = &data_type
806 {
807 (key_type.as_ref().clone(), value_type.as_ref().clone())
808 } else {
809 return Err(Error::Internal {
810 message: format!("Expected Dictionary, got {:?}", data_type),
811 location: location!(),
812 });
813 };
814
815 let indices = self.indices.into_arrow(key_type, validate)?;
816 let dictionary = self.dictionary.into_arrow(value_type, validate)?;
817
818 let builder = indices
819 .into_builder()
820 .add_child_data(dictionary)
821 .data_type(data_type);
822
823 if validate {
824 Ok(builder.build()?)
825 } else {
826 Ok(unsafe { builder.build_unchecked() })
827 }
828 }
829
830 fn into_buffers(self) -> Vec<LanceBuffer> {
831 let mut buffers = self.indices.into_buffers();
832 buffers.extend(self.dictionary.into_buffers());
833 buffers
834 }
835
836 fn borrow_and_clone(&mut self) -> Self {
837 Self {
838 indices: self.indices.borrow_and_clone(),
839 dictionary: Box::new(self.dictionary.borrow_and_clone()),
840 }
841 }
842
843 fn try_clone(&self) -> Result<Self> {
844 Ok(Self {
845 indices: self.indices.try_clone()?,
846 dictionary: Box::new(self.dictionary.try_clone()?),
847 })
848 }
849}
850
851#[derive(Debug)]
866pub enum DataBlock {
867 Empty(),
868 Constant(ConstantDataBlock),
869 AllNull(AllNullDataBlock),
870 Nullable(NullableDataBlock),
871 FixedWidth(FixedWidthDataBlock),
872 FixedSizeList(FixedSizeListBlock),
873 VariableWidth(VariableWidthBlock),
874 Opaque(OpaqueBlock),
875 Struct(StructDataBlock),
876 Dictionary(DictionaryDataBlock),
877}
878
879impl DataBlock {
880 pub fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
882 match self {
883 Self::Empty() => Ok(new_empty_array(&data_type).to_data()),
884 Self::Constant(inner) => inner.into_arrow(data_type, validate),
885 Self::AllNull(inner) => inner.into_arrow(data_type, validate),
886 Self::Nullable(inner) => inner.into_arrow(data_type, validate),
887 Self::FixedWidth(inner) => inner.into_arrow(data_type, validate),
888 Self::FixedSizeList(inner) => inner.into_arrow(data_type, validate),
889 Self::VariableWidth(inner) => inner.into_arrow(data_type, validate),
890 Self::Struct(inner) => inner.into_arrow(data_type, validate),
891 Self::Dictionary(inner) => inner.into_arrow(data_type, validate),
892 Self::Opaque(_) => Err(Error::Internal {
893 message: "Cannot convert OpaqueBlock to Arrow".to_string(),
894 location: location!(),
895 }),
896 }
897 }
898
899 pub fn into_buffers(self) -> Vec<LanceBuffer> {
903 match self {
904 Self::Empty() => Vec::default(),
905 Self::Constant(inner) => inner.into_buffers(),
906 Self::AllNull(inner) => inner.into_buffers(),
907 Self::Nullable(inner) => inner.into_buffers(),
908 Self::FixedWidth(inner) => inner.into_buffers(),
909 Self::FixedSizeList(inner) => inner.into_buffers(),
910 Self::VariableWidth(inner) => inner.into_buffers(),
911 Self::Struct(inner) => inner.into_buffers(),
912 Self::Dictionary(inner) => inner.into_buffers(),
913 Self::Opaque(inner) => inner.buffers,
914 }
915 }
916
917 pub fn borrow_and_clone(&mut self) -> Self {
922 match self {
923 Self::Empty() => Self::Empty(),
924 Self::Constant(inner) => Self::Constant(inner.borrow_and_clone()),
925 Self::AllNull(inner) => Self::AllNull(inner.borrow_and_clone()),
926 Self::Nullable(inner) => Self::Nullable(inner.borrow_and_clone()),
927 Self::FixedWidth(inner) => Self::FixedWidth(inner.borrow_and_clone()),
928 Self::FixedSizeList(inner) => Self::FixedSizeList(inner.borrow_and_clone()),
929 Self::VariableWidth(inner) => Self::VariableWidth(inner.borrow_and_clone()),
930 Self::Struct(inner) => Self::Struct(inner.borrow_and_clone()),
931 Self::Dictionary(inner) => Self::Dictionary(inner.borrow_and_clone()),
932 Self::Opaque(inner) => Self::Opaque(inner.borrow_and_clone()),
933 }
934 }
935
936 pub fn try_clone(&self) -> Result<Self> {
941 match self {
942 Self::Empty() => Ok(Self::Empty()),
943 Self::Constant(inner) => Ok(Self::Constant(inner.try_clone()?)),
944 Self::AllNull(inner) => Ok(Self::AllNull(inner.try_clone()?)),
945 Self::Nullable(inner) => Ok(Self::Nullable(inner.try_clone()?)),
946 Self::FixedWidth(inner) => Ok(Self::FixedWidth(inner.try_clone()?)),
947 Self::FixedSizeList(inner) => Ok(Self::FixedSizeList(inner.try_clone()?)),
948 Self::VariableWidth(inner) => Ok(Self::VariableWidth(inner.try_clone()?)),
949 Self::Struct(inner) => Ok(Self::Struct(inner.try_clone()?)),
950 Self::Dictionary(inner) => Ok(Self::Dictionary(inner.try_clone()?)),
951 Self::Opaque(inner) => Ok(Self::Opaque(inner.try_clone()?)),
952 }
953 }
954
955 pub fn name(&self) -> &'static str {
956 match self {
957 Self::Constant(_) => "Constant",
958 Self::Empty() => "Empty",
959 Self::AllNull(_) => "AllNull",
960 Self::Nullable(_) => "Nullable",
961 Self::FixedWidth(_) => "FixedWidth",
962 Self::FixedSizeList(_) => "FixedSizeList",
963 Self::VariableWidth(_) => "VariableWidth",
964 Self::Struct(_) => "Struct",
965 Self::Dictionary(_) => "Dictionary",
966 Self::Opaque(_) => "Opaque",
967 }
968 }
969
970 pub fn is_variable(&self) -> bool {
971 match self {
972 Self::Constant(_) => false,
973 Self::Empty() => false,
974 Self::AllNull(_) => false,
975 Self::Nullable(nullable) => nullable.data.is_variable(),
976 Self::FixedWidth(_) => false,
977 Self::FixedSizeList(fsl) => fsl.child.is_variable(),
978 Self::VariableWidth(_) => true,
979 Self::Struct(strct) => strct.children.iter().any(|c| c.is_variable()),
980 Self::Dictionary(_) => {
981 todo!("is_variable for DictionaryDataBlock is not implemented yet")
982 }
983 Self::Opaque(_) => panic!("Does not make sense to ask if an Opaque block is variable"),
984 }
985 }
986
987 pub fn is_nullable(&self) -> bool {
988 match self {
989 Self::AllNull(_) => true,
990 Self::Nullable(_) => true,
991 Self::FixedSizeList(fsl) => fsl.child.is_nullable(),
992 Self::Struct(strct) => strct.children.iter().any(|c| c.is_nullable()),
993 Self::Dictionary(_) => {
994 todo!("is_nullable for DictionaryDataBlock is not implemented yet")
995 }
996 Self::Opaque(_) => panic!("Does not make sense to ask if an Opaque block is nullable"),
997 _ => false,
998 }
999 }
1000
1001 pub fn num_values(&self) -> u64 {
1006 match self {
1007 Self::Empty() => 0,
1008 Self::Constant(inner) => inner.num_values,
1009 Self::AllNull(inner) => inner.num_values,
1010 Self::Nullable(inner) => inner.data.num_values(),
1011 Self::FixedWidth(inner) => inner.num_values,
1012 Self::FixedSizeList(inner) => inner.num_values(),
1013 Self::VariableWidth(inner) => inner.num_values,
1014 Self::Struct(inner) => inner.children[0].num_values(),
1015 Self::Dictionary(inner) => inner.indices.num_values,
1016 Self::Opaque(inner) => inner.num_values,
1017 }
1018 }
1019
1020 pub fn items_per_row(&self) -> u64 {
1024 match self {
1025 Self::Empty() => todo!(), Self::Constant(_) => todo!(), Self::AllNull(_) => todo!(), Self::Nullable(nullable) => nullable.data.items_per_row(),
1029 Self::FixedWidth(_) => 1,
1030 Self::FixedSizeList(fsl) => fsl.dimension * fsl.child.items_per_row(),
1031 Self::VariableWidth(_) => 1,
1032 Self::Struct(_) => todo!(), Self::Dictionary(_) => 1,
1034 Self::Opaque(_) => 1,
1035 }
1036 }
1037
1038 pub fn data_size(&self) -> u64 {
1040 match self {
1041 Self::Empty() => 0,
1042 Self::Constant(inner) => inner.data_size(),
1043 Self::AllNull(_) => 0,
1044 Self::Nullable(inner) => inner.data_size(),
1045 Self::FixedWidth(inner) => inner.data_size(),
1046 Self::FixedSizeList(inner) => inner.data_size(),
1047 Self::VariableWidth(inner) => inner.data_size(),
1048 Self::Struct(_) => {
1049 todo!("the data_size method for StructDataBlock is not implemented yet")
1050 }
1051 Self::Dictionary(_) => {
1052 todo!("the data_size method for DictionaryDataBlock is not implemented yet")
1053 }
1054 Self::Opaque(inner) => inner.data_size(),
1055 }
1056 }
1057
1058 pub fn remove_outer_validity(self) -> Self {
1066 match self {
1067 Self::AllNull(_) => panic!("Cannot remove validity on all-null data"),
1068 Self::Nullable(inner) => *inner.data,
1069 Self::Struct(inner) => Self::Struct(inner.remove_outer_validity()),
1070 other => other,
1071 }
1072 }
1073
1074 pub fn make_builder(&self, estimated_size_bytes: u64) -> Box<dyn DataBlockBuilderImpl> {
1075 match self {
1076 Self::FixedWidth(inner) => {
1077 if inner.bits_per_value == 1 {
1078 Box::new(BitmapDataBlockBuilder::new(estimated_size_bytes))
1079 } else {
1080 Box::new(FixedWidthDataBlockBuilder::new(
1081 inner.bits_per_value,
1082 estimated_size_bytes,
1083 ))
1084 }
1085 }
1086 Self::VariableWidth(inner) => {
1087 if inner.bits_per_offset == 32 {
1088 Box::new(VariableWidthDataBlockBuilder::<i32>::new(
1089 estimated_size_bytes,
1090 ))
1091 } else if inner.bits_per_offset == 64 {
1092 Box::new(VariableWidthDataBlockBuilder::<i64>::new(
1093 estimated_size_bytes,
1094 ))
1095 } else {
1096 todo!()
1097 }
1098 }
1099 Self::FixedSizeList(inner) => {
1100 let inner_builder = inner.child.make_builder(estimated_size_bytes);
1101 Box::new(FixedSizeListBlockBuilder::new(
1102 inner_builder,
1103 inner.dimension,
1104 ))
1105 }
1106 Self::Nullable(nullable) => {
1107 let estimated_validity_size_bytes = estimated_size_bytes / 16;
1110 let inner_builder = nullable
1111 .data
1112 .make_builder(estimated_size_bytes - estimated_validity_size_bytes);
1113 Box::new(NullableDataBlockBuilder::new(
1114 inner_builder,
1115 estimated_validity_size_bytes as usize,
1116 ))
1117 }
1118 Self::Struct(struct_data_block) => {
1119 let mut bits_per_values = vec![];
1120 for child in struct_data_block.children.iter() {
1121 let child = child.as_fixed_width_ref().
1122 expect("Currently StructDataBlockBuilder is only used in packed-struct encoding, and currently in packed-struct encoding, only fixed-width fields are supported.");
1123 bits_per_values.push(child.bits_per_value as u32);
1124 }
1125 Box::new(StructDataBlockBuilder::new(
1126 bits_per_values,
1127 estimated_size_bytes,
1128 ))
1129 }
1130 _ => todo!("make_builder for {:?}", self),
1131 }
1132 }
1133}
1134
// Generates a consuming accessor `$fn_name(self) -> Option<$inner_type>` that
// yields the payload when `self` is the `$inner` variant and `None` otherwise.
macro_rules! as_type {
    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
        pub fn $fn_name(self) -> Option<$inner_type> {
            if let Self::$inner(payload) = self {
                Some(payload)
            } else {
                None
            }
        }
    };
}
1145
// Generates a borrowing accessor `$fn_name(&self) -> Option<&$inner_type>` that
// yields a reference to the payload when `self` is the `$inner` variant and
// `None` otherwise.
macro_rules! as_type_ref {
    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
        pub fn $fn_name(&self) -> Option<&$inner_type> {
            if let Self::$inner(payload) = self {
                Some(payload)
            } else {
                None
            }
        }
    };
}
1156
// Generates a mutable accessor `$fn_name(&mut self) -> Option<&mut $inner_type>`
// that yields a mutable reference to the payload when `self` is the `$inner`
// variant and `None` otherwise.
macro_rules! as_type_ref_mut {
    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
        pub fn $fn_name(&mut self) -> Option<&mut $inner_type> {
            if let Self::$inner(payload) = self {
                Some(payload)
            } else {
                None
            }
        }
    };
}
1167
1168impl DataBlock {
1170 as_type!(as_all_null, AllNull, AllNullDataBlock);
1171 as_type!(as_nullable, Nullable, NullableDataBlock);
1172 as_type!(as_fixed_width, FixedWidth, FixedWidthDataBlock);
1173 as_type!(as_fixed_size_list, FixedSizeList, FixedSizeListBlock);
1174 as_type!(as_variable_width, VariableWidth, VariableWidthBlock);
1175 as_type!(as_struct, Struct, StructDataBlock);
1176 as_type!(as_dictionary, Dictionary, DictionaryDataBlock);
1177 as_type_ref!(as_all_null_ref, AllNull, AllNullDataBlock);
1178 as_type_ref!(as_nullable_ref, Nullable, NullableDataBlock);
1179 as_type_ref!(as_fixed_width_ref, FixedWidth, FixedWidthDataBlock);
1180 as_type_ref!(as_fixed_size_list_ref, FixedSizeList, FixedSizeListBlock);
1181 as_type_ref!(as_variable_width_ref, VariableWidth, VariableWidthBlock);
1182 as_type_ref!(as_struct_ref, Struct, StructDataBlock);
1183 as_type_ref!(as_dictionary_ref, Dictionary, DictionaryDataBlock);
1184 as_type_ref_mut!(as_all_null_ref_mut, AllNull, AllNullDataBlock);
1185 as_type_ref_mut!(as_nullable_ref_mut, Nullable, NullableDataBlock);
1186 as_type_ref_mut!(as_fixed_width_ref_mut, FixedWidth, FixedWidthDataBlock);
1187 as_type_ref_mut!(
1188 as_fixed_size_list_ref_mut,
1189 FixedSizeList,
1190 FixedSizeListBlock
1191 );
1192 as_type_ref_mut!(as_variable_width_ref_mut, VariableWidth, VariableWidthBlock);
1193 as_type_ref_mut!(as_struct_ref_mut, Struct, StructDataBlock);
1194 as_type_ref_mut!(as_dictionary_ref_mut, Dictionary, DictionaryDataBlock);
1195}
1196
1197fn get_byte_range<T: ArrowNativeType>(offsets: &mut LanceBuffer) -> Range<usize> {
1200 let offsets = offsets.borrow_to_typed_slice::<T>();
1201 if offsets.as_ref().is_empty() {
1202 0..0
1203 } else {
1204 offsets.as_ref().first().unwrap().as_usize()..offsets.as_ref().last().unwrap().as_usize()
1205 }
1206}
1207
1208fn stitch_offsets<T: ArrowNativeType + std::ops::Add<Output = T> + std::ops::Sub<Output = T>>(
1214 offsets: Vec<LanceBuffer>,
1215) -> (LanceBuffer, Vec<Range<usize>>) {
1216 if offsets.is_empty() {
1217 return (LanceBuffer::empty(), Vec::default());
1218 }
1219 let len = offsets.iter().map(|b| b.len()).sum::<usize>();
1220 let mut dest = Vec::with_capacity(len);
1224 let mut byte_ranges = Vec::with_capacity(offsets.len());
1225
1226 dest.push(T::from_usize(0).unwrap());
1228
1229 for mut o in offsets.into_iter() {
1230 if !o.is_empty() {
1231 let last_offset = *dest.last().unwrap();
1232 let o = o.borrow_to_typed_slice::<T>();
1233 let start = *o.as_ref().first().unwrap();
1234 dest.extend(o.as_ref()[1..].iter().map(|&x| x + last_offset - start));
1248 }
1249 byte_ranges.push(get_byte_range::<T>(&mut o));
1250 }
1251 (LanceBuffer::reinterpret_vec(dest), byte_ranges)
1252}
1253
1254fn arrow_binary_to_data_block(
1255 arrays: &[ArrayRef],
1256 num_values: u64,
1257 bits_per_offset: u8,
1258) -> DataBlock {
1259 let data_vec = arrays.iter().map(|arr| arr.to_data()).collect::<Vec<_>>();
1260 let bytes_per_offset = bits_per_offset as usize / 8;
1261 let offsets = data_vec
1262 .iter()
1263 .map(|d| {
1264 LanceBuffer::Borrowed(
1265 d.buffers()[0].slice_with_length(d.offset(), (d.len() + 1) * bytes_per_offset),
1266 )
1267 })
1268 .collect::<Vec<_>>();
1269 let (offsets, data_ranges) = if bits_per_offset == 32 {
1270 stitch_offsets::<i32>(offsets)
1271 } else {
1272 stitch_offsets::<i64>(offsets)
1273 };
1274 let data = data_vec
1275 .iter()
1276 .zip(data_ranges)
1277 .map(|(d, byte_range)| {
1278 LanceBuffer::Borrowed(
1279 d.buffers()[1]
1280 .slice_with_length(byte_range.start, byte_range.end - byte_range.start),
1281 )
1282 })
1283 .collect::<Vec<_>>();
1284 let data = LanceBuffer::concat_into_one(data);
1285 DataBlock::VariableWidth(VariableWidthBlock {
1286 data,
1287 offsets,
1288 bits_per_offset,
1289 num_values,
1290 block_info: BlockInfo::new(),
1291 })
1292}
1293
1294fn encode_flat_data(arrays: &[ArrayRef], num_values: u64) -> LanceBuffer {
1295 let bytes_per_value = arrays[0].data_type().byte_width();
1296 let mut buffer = Vec::with_capacity(num_values as usize * bytes_per_value);
1297 for arr in arrays {
1298 let data = arr.to_data();
1299 buffer.extend_from_slice(data.buffers()[0].as_slice());
1300 }
1301 LanceBuffer::Owned(buffer)
1302}
1303
1304fn do_encode_bitmap_data(bitmaps: &[BooleanBuffer], num_values: u64) -> LanceBuffer {
1305 let mut builder = BooleanBufferBuilder::new(num_values as usize);
1306
1307 for buf in bitmaps {
1308 builder.append_buffer(buf);
1309 }
1310
1311 let buffer = builder.finish().into_inner();
1312 LanceBuffer::Borrowed(buffer)
1313}
1314
1315fn encode_bitmap_data(arrays: &[ArrayRef], num_values: u64) -> LanceBuffer {
1316 let bitmaps = arrays
1317 .iter()
1318 .map(|arr| arr.as_boolean().values().clone())
1319 .collect::<Vec<_>>();
1320 do_encode_bitmap_data(&bitmaps, num_values)
1321}
1322
1323fn concat_dict_arrays(arrays: &[ArrayRef]) -> ArrayRef {
1326 let value_type = arrays[0].as_any_dictionary().values().data_type();
1327 let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
1328 match arrow_select::concat::concat(&array_refs) {
1329 Ok(array) => array,
1330 Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1331 let upscaled = array_refs
1333 .iter()
1334 .map(|arr| {
1335 match arrow_cast::cast(
1336 *arr,
1337 &DataType::Dictionary(
1338 Box::new(DataType::UInt32),
1339 Box::new(value_type.clone()),
1340 ),
1341 ) {
1342 Ok(arr) => arr,
1343 Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1344 unimplemented!("Dictionary arrays with more than 2^32 unique values")
1346 }
1347 err => err.unwrap(),
1348 }
1349 })
1350 .collect::<Vec<_>>();
1351 let array_refs = upscaled.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
1352 match arrow_select::concat::concat(&array_refs) {
1354 Ok(array) => array,
1355 Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1356 unimplemented!("Dictionary arrays with more than 2^32 unique values")
1357 }
1358 err => err.unwrap(),
1359 }
1360 }
1361 err => err.unwrap(),
1363 }
1364}
1365
1366fn max_index_val(index_type: &DataType) -> u64 {
1367 match index_type {
1368 DataType::Int8 => i8::MAX as u64,
1369 DataType::Int16 => i16::MAX as u64,
1370 DataType::Int32 => i32::MAX as u64,
1371 DataType::Int64 => i64::MAX as u64,
1372 DataType::UInt8 => u8::MAX as u64,
1373 DataType::UInt16 => u16::MAX as u64,
1374 DataType::UInt32 => u32::MAX as u64,
1375 DataType::UInt64 => u64::MAX,
1376 _ => panic!("Invalid dictionary index type"),
1377 }
1378}
1379
/// Converts one or more dictionary arrays into a single [`DataBlock::Dictionary`].
///
/// The inputs are first combined with `concat_dict_arrays`. When `validity` is
/// `Some`, every row it marks as null has its index overwritten so that it points
/// at a null entry of the dictionary values: the first existing null value is
/// reused, or a null value is appended when the values contain none. The returned
/// indices block therefore carries no validity of its own. When `validity` is
/// `None` the index buffer is used unchanged.
///
/// # Panics
///
/// Panics with `unimplemented!` if a null value has to be appended, its position
/// does not fit the index type, and that type's maximum is already at least
/// `u32::MAX`. Also panics (via `unwrap`) if any of the casts or the
/// concatenation fails.
fn arrow_dictionary_to_data_block(arrays: &[ArrayRef], validity: Option<NullBuffer>) -> DataBlock {
    let array = concat_dict_arrays(arrays);
    let array_dict = array.as_any_dictionary();
    let mut indices = array_dict.keys();
    let num_values = indices.len() as u64;
    let mut values = array_dict.values().clone();
    // Owns the widened indices (if widening is needed) so `indices` can borrow them.
    let mut upcast = None;

    let indices_block = if let Some(validity) = validity {
        // Look for a null already present in the dictionary values.
        let mut first_invalid_index = None;
        if let Some(values_validity) = values.nulls() {
            // Negating the validity bitmap turns null slots into set bits.
            first_invalid_index = (!values_validity.inner()).set_indices().next();
        }
        let first_invalid_index = first_invalid_index.unwrap_or_else(|| {
            // No null value exists yet: append one at the end of the values.
            let null_arr = new_null_array(values.data_type(), 1);
            values = arrow_select::concat::concat(&[values.as_ref(), null_arr.as_ref()]).unwrap();
            let null_index = values.len() - 1;
            let max_index_val = max_index_val(indices.data_type());
            if null_index as u64 > max_index_val {
                // The new entry is not addressable with the current index type.
                if max_index_val >= u32::MAX as u64 {
                    unimplemented!("Dictionary arrays with 2^32 unique value (or more) and a null")
                }
                // Widen the indices to UInt32 so the new entry becomes addressable.
                upcast = Some(arrow_cast::cast(indices, &DataType::UInt32).unwrap());
                indices = upcast.as_ref().unwrap();
            }
            null_index
        });
        // Encode the null index using the (possibly widened) index type so its raw
        // bytes can be copied straight into the index buffer.
        let null_index_arr = arrow_cast::cast(
            &UInt64Array::from(vec![first_invalid_index as u64]),
            indices.data_type(),
        )
        .unwrap();

        let bytes_per_index = indices.data_type().byte_width();
        let bits_per_index = bytes_per_index as u64 * 8;

        let null_index_arr = null_index_arr.into_data();
        let null_index_bytes = &null_index_arr.buffers()[0];
        let mut indices_bytes = indices.to_data().buffers()[0].to_vec();
        // Overwrite the index of every null row with the null entry's index.
        for invalid_idx in (!validity.inner()).set_indices() {
            indices_bytes[invalid_idx * bytes_per_index..(invalid_idx + 1) * bytes_per_index]
                .copy_from_slice(null_index_bytes.as_slice());
        }
        FixedWidthDataBlock {
            data: LanceBuffer::Owned(indices_bytes),
            bits_per_value: bits_per_index,
            num_values,
            block_info: BlockInfo::new(),
        }
    } else {
        // No validity was supplied: the index buffer is used as-is.
        FixedWidthDataBlock {
            data: LanceBuffer::Borrowed(indices.to_data().buffers()[0].clone()),
            bits_per_value: indices.data_type().byte_width() as u64 * 8,
            num_values,
            block_info: BlockInfo::new(),
        }
    };

    let items = DataBlock::from(values);
    DataBlock::Dictionary(DictionaryDataBlock {
        indices: indices_block,
        dictionary: Box::new(items),
    })
}
1472
/// Summary of the null state across a set of arrays, as produced by
/// `extract_nulls`.
enum Nullability {
    /// None of the arrays reported a logical null buffer.
    None,
    /// The total null count equals the number of values.
    All,
    /// A combined validity buffer covering all of the arrays.
    Some(NullBuffer),
}
1478
1479impl Nullability {
1480 fn to_option(&self) -> Option<NullBuffer> {
1481 match self {
1482 Self::Some(nulls) => Some(nulls.clone()),
1483 _ => None,
1484 }
1485 }
1486}
1487
1488fn extract_nulls(arrays: &[ArrayRef], num_values: u64) -> Nullability {
1489 let mut has_nulls = false;
1490 let nulls_and_lens = arrays
1491 .iter()
1492 .map(|arr| {
1493 let nulls = arr.logical_nulls();
1494 has_nulls |= nulls.is_some();
1495 (nulls, arr.len())
1496 })
1497 .collect::<Vec<_>>();
1498 if !has_nulls {
1499 return Nullability::None;
1500 }
1501 let mut builder = BooleanBufferBuilder::new(num_values as usize);
1502 let mut num_nulls = 0;
1503 for (null, len) in nulls_and_lens {
1504 if let Some(null) = null {
1505 num_nulls += null.null_count();
1506 builder.append_buffer(&null.into_inner());
1507 } else {
1508 builder.append_n(len, true);
1509 }
1510 }
1511 if num_nulls == num_values as usize {
1512 Nullability::All
1513 } else {
1514 Nullability::Some(NullBuffer::new(builder.finish()))
1515 }
1516}
1517
1518impl DataBlock {
1519 pub fn from_arrays(arrays: &[ArrayRef], num_values: u64) -> Self {
1520 if arrays.is_empty() || num_values == 0 {
1521 return Self::AllNull(AllNullDataBlock { num_values: 0 });
1522 }
1523
1524 let data_type = arrays[0].data_type();
1525 let nulls = extract_nulls(arrays, num_values);
1526
1527 if let Nullability::All = nulls {
1528 return Self::AllNull(AllNullDataBlock { num_values });
1529 }
1530
1531 let mut encoded = match data_type {
1532 DataType::Binary | DataType::Utf8 => arrow_binary_to_data_block(arrays, num_values, 32),
1533 DataType::BinaryView | DataType::Utf8View => {
1534 todo!()
1535 }
1536 DataType::LargeBinary | DataType::LargeUtf8 => {
1537 arrow_binary_to_data_block(arrays, num_values, 64)
1538 }
1539 DataType::Boolean => {
1540 let data = encode_bitmap_data(arrays, num_values);
1541 Self::FixedWidth(FixedWidthDataBlock {
1542 data,
1543 bits_per_value: 1,
1544 num_values,
1545 block_info: BlockInfo::new(),
1546 })
1547 }
1548 DataType::Date32
1549 | DataType::Date64
1550 | DataType::Decimal128(_, _)
1551 | DataType::Decimal256(_, _)
1552 | DataType::Duration(_)
1553 | DataType::FixedSizeBinary(_)
1554 | DataType::Float16
1555 | DataType::Float32
1556 | DataType::Float64
1557 | DataType::Int16
1558 | DataType::Int32
1559 | DataType::Int64
1560 | DataType::Int8
1561 | DataType::Interval(_)
1562 | DataType::Time32(_)
1563 | DataType::Time64(_)
1564 | DataType::Timestamp(_, _)
1565 | DataType::UInt16
1566 | DataType::UInt32
1567 | DataType::UInt64
1568 | DataType::UInt8 => {
1569 let data = encode_flat_data(arrays, num_values);
1570 Self::FixedWidth(FixedWidthDataBlock {
1571 data,
1572 bits_per_value: data_type.byte_width() as u64 * 8,
1573 num_values,
1574 block_info: BlockInfo::new(),
1575 })
1576 }
1577 DataType::Null => Self::AllNull(AllNullDataBlock { num_values }),
1578 DataType::Dictionary(_, _) => arrow_dictionary_to_data_block(arrays, nulls.to_option()),
1579 DataType::Struct(fields) => {
1580 let structs = arrays.iter().map(|arr| arr.as_struct()).collect::<Vec<_>>();
1581 let mut children = Vec::with_capacity(fields.len());
1582 for child_idx in 0..fields.len() {
1583 let child_vec = structs
1584 .iter()
1585 .map(|s| s.column(child_idx).clone())
1586 .collect::<Vec<_>>();
1587 children.push(Self::from_arrays(&child_vec, num_values));
1588 }
1589 Self::Struct(StructDataBlock {
1590 children,
1591 block_info: BlockInfo::default(),
1592 })
1593 }
1594 DataType::FixedSizeList(_, dim) => {
1595 let children = arrays
1596 .iter()
1597 .map(|arr| arr.as_fixed_size_list().values().clone())
1598 .collect::<Vec<_>>();
1599 let child_block = Self::from_arrays(&children, num_values * *dim as u64);
1600 Self::FixedSizeList(FixedSizeListBlock {
1601 child: Box::new(child_block),
1602 dimension: *dim as u64,
1603 })
1604 }
1605 DataType::LargeList(_)
1606 | DataType::List(_)
1607 | DataType::ListView(_)
1608 | DataType::LargeListView(_)
1609 | DataType::Map(_, _)
1610 | DataType::RunEndEncoded(_, _)
1611 | DataType::Union(_, _) => {
1612 panic!(
1613 "Field with data type {} cannot be converted to data block",
1614 data_type
1615 )
1616 }
1617 };
1618
1619 encoded.compute_stat();
1621
1622 if !matches!(data_type, DataType::Dictionary(_, _)) {
1623 match nulls {
1624 Nullability::None => encoded,
1625 Nullability::Some(nulls) => Self::Nullable(NullableDataBlock {
1626 data: Box::new(encoded),
1627 nulls: LanceBuffer::Borrowed(nulls.into_inner().into_inner()),
1628 block_info: BlockInfo::new(),
1629 }),
1630 _ => unreachable!(),
1631 }
1632 } else {
1633 encoded
1635 }
1636 }
1637
1638 pub fn from_array<T: Array + 'static>(array: T) -> Self {
1639 let num_values = array.len();
1640 Self::from_arrays(&[Arc::new(array)], num_values as u64)
1641 }
1642}
1643
1644impl From<ArrayRef> for DataBlock {
1645 fn from(array: ArrayRef) -> Self {
1646 let num_values = array.len() as u64;
1647 Self::from_arrays(&[array], num_values)
1648 }
1649}
1650
/// Incremental builder backing a [`DataBlockBuilder`].
///
/// An implementation is obtained from `DataBlock::make_builder` and receives
/// every subsequent `append` call made on the owning `DataBlockBuilder`.
pub trait DataBlockBuilderImpl: std::fmt::Debug {
    /// Appends the part of `data_block` identified by `selection` to the builder.
    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>);
    /// Consumes the builder and returns the accumulated block.
    fn finish(self: Box<Self>) -> DataBlock;
}
1655
/// Builds a `DataBlock` from selections of other data blocks.
///
/// The concrete builder is created lazily from the first block that is appended.
#[derive(Debug)]
pub struct DataBlockBuilder {
    // Size estimate forwarded to `DataBlock::make_builder` when the builder is created.
    estimated_size_bytes: u64,
    // `None` until the first block has been appended.
    builder: Option<Box<dyn DataBlockBuilderImpl>>,
}
1661
1662impl DataBlockBuilder {
1663 pub fn with_capacity_estimate(estimated_size_bytes: u64) -> Self {
1664 Self {
1665 estimated_size_bytes,
1666 builder: None,
1667 }
1668 }
1669
1670 fn get_builder(&mut self, block: &DataBlock) -> &mut dyn DataBlockBuilderImpl {
1671 if self.builder.is_none() {
1672 self.builder = Some(block.make_builder(self.estimated_size_bytes));
1673 }
1674 self.builder.as_mut().unwrap().as_mut()
1675 }
1676
1677 pub fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
1678 self.get_builder(data_block).append(data_block, selection);
1679 }
1680
1681 pub fn finish(self) -> DataBlock {
1682 let builder = self.builder.expect("DataBlockBuilder didn't see any data");
1683 builder.finish()
1684 }
1685}
1686
// Unit tests for converting Arrow arrays into `DataBlock`s.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::datatypes::{Int32Type, Int8Type};
    use arrow_array::{
        make_array, new_null_array, ArrayRef, DictionaryArray, Int8Array, LargeBinaryArray,
        StringArray, UInt16Array, UInt8Array,
    };
    use arrow_buffer::{BooleanBuffer, NullBuffer};

    use arrow_schema::{DataType, Field, Fields};
    use lance_datagen::{array, ArrayGeneratorExt, RowCount, DEFAULT_SEED};
    use rand::SeedableRng;

    use crate::buffer::LanceBuffer;

    use super::{AllNullDataBlock, DataBlock};

    use arrow::compute::concat;
    use arrow_array::Array;

    /// Sliced arrays must contribute only the sliced values and validity bits.
    #[test]
    fn test_sliced_to_data_block() {
        let ints = UInt16Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);
        let ints = ints.slice(2, 4);
        let data = DataBlock::from_array(ints);

        // 4 values of 2 bytes each
        let fixed_data = data.as_fixed_width().unwrap();
        assert_eq!(fixed_data.num_values, 4);
        assert_eq!(fixed_data.data.len(), 8);

        let nullable_ints =
            UInt16Array::from(vec![Some(0), None, Some(2), None, Some(4), None, Some(6)]);
        let nullable_ints = nullable_ints.slice(1, 3);
        let data = DataBlock::from_array(nullable_ints);

        // The slice is [None, Some(2), None], so only bit 1 is set
        let nullable = data.as_nullable().unwrap();
        assert_eq!(nullable.nulls, LanceBuffer::Owned(vec![0b00000010]));
    }

    /// Several string arrays are stitched into one block of offsets and bytes.
    #[test]
    fn test_string_to_data_block() {
        // Inputs with some nulls, no nulls, and only nulls
        let strings1 = StringArray::from(vec![Some("hello"), None, Some("world")]);
        let strings2 = StringArray::from(vec![Some("a"), Some("b")]);
        let strings3 = StringArray::from(vec![Option::<&'static str>::None, None]);

        let arrays = &[strings1, strings2, strings3]
            .iter()
            .map(|arr| Arc::new(arr.clone()) as ArrayRef)
            .collect::<Vec<_>>();

        let block = DataBlock::from_arrays(arrays, 7);

        assert_eq!(block.num_values(), 7);
        let block = block.as_nullable().unwrap();

        // Validity, least significant bit first: 1, 0, 1, 1, 1, 0, 0
        assert_eq!(block.nulls, LanceBuffer::Owned(vec![0b00011101]));

        let data = block.data.as_variable_width().unwrap();
        assert_eq!(
            data.offsets,
            LanceBuffer::reinterpret_vec(vec![0, 5, 5, 10, 11, 12, 12, 12])
        );

        assert_eq!(data.data, LanceBuffer::copy_slice(b"helloworldab"));

        // Without any nulls the result is a plain variable-width block
        let strings1 = StringArray::from(vec![Some("a"), Some("bc")]);
        let strings2 = StringArray::from(vec![Some("def")]);

        let arrays = &[strings1, strings2]
            .iter()
            .map(|arr| Arc::new(arr.clone()) as ArrayRef)
            .collect::<Vec<_>>();

        let block = DataBlock::from_arrays(arrays, 3);

        assert_eq!(block.num_values(), 3);
        let data = block.as_variable_width().unwrap();
        assert_eq!(data.offsets, LanceBuffer::reinterpret_vec(vec![0, 1, 3, 6]));
        assert_eq!(data.data, LanceBuffer::copy_slice(b"abcdef"));
    }

    /// Sliced string arrays yield zero-based offsets and only the referenced bytes.
    #[test]
    fn test_string_sliced() {
        // Converts `arr` and compares the resulting offsets and data bytes
        let check = |arr: Vec<StringArray>, expected_off: Vec<i32>, expected_data: &[u8]| {
            let arrs = arr
                .into_iter()
                .map(|a| Arc::new(a) as ArrayRef)
                .collect::<Vec<_>>();
            let num_rows = arrs.iter().map(|a| a.len()).sum::<usize>() as u64;
            let data = DataBlock::from_arrays(&arrs, num_rows);

            assert_eq!(data.num_values(), num_rows);

            let data = data.as_variable_width().unwrap();
            assert_eq!(data.offsets, LanceBuffer::reinterpret_vec(expected_off));
            assert_eq!(data.data, LanceBuffer::copy_slice(expected_data));
        };

        let string = StringArray::from(vec![Some("hello"), Some("world")]);
        check(vec![string.slice(1, 1)], vec![0, 5], b"world");
        check(vec![string.slice(0, 1)], vec![0, 5], b"hello");
        check(
            vec![string.slice(0, 1), string.slice(1, 1)],
            vec![0, 5, 10],
            b"helloworld",
        );

        // Slices taken from two different arrays
        let string2 = StringArray::from(vec![Some("foo"), Some("bar")]);
        check(
            vec![string.slice(0, 1), string2.slice(0, 1)],
            vec![0, 5, 8],
            b"hellofoo",
        );
    }

    /// Large binary arrays produce 64-bit offsets.
    #[test]
    fn test_large() {
        let arr = LargeBinaryArray::from_vec(vec![b"hello", b"world"]);
        let data = DataBlock::from_array(arr);

        assert_eq!(data.num_values(), 2);
        let data = data.as_variable_width().unwrap();
        assert_eq!(data.bits_per_offset, 64);
        assert_eq!(data.num_values, 2);
        assert_eq!(data.data, LanceBuffer::copy_slice(b"helloworld"));
        assert_eq!(
            data.offsets,
            LanceBuffer::reinterpret_vec(vec![0_u64, 5, 10])
        );
    }

    /// Indices of concatenated dictionaries refer to the combined dictionary.
    #[test]
    fn test_dictionary_indices_normalized() {
        let arr1 = DictionaryArray::<Int8Type>::from_iter([Some("a"), Some("a"), Some("b")]);
        let arr2 = DictionaryArray::<Int8Type>::from_iter([Some("b"), Some("c")]);

        let data = DataBlock::from_arrays(&[Arc::new(arr1), Arc::new(arr2)], 5);

        assert_eq!(data.num_values(), 5);
        let data = data.as_dictionary().unwrap();
        let indices = data.indices;
        assert_eq!(indices.bits_per_value, 8);
        assert_eq!(indices.num_values, 5);
        // The second array's indices are shifted past the first array's two values
        assert_eq!(
            indices.data,
            LanceBuffer::reinterpret_vec::<i8>(vec![0, 0, 1, 2, 3])
        );

        // The expected dictionary keeps "b" from both inputs (values are not deduplicated)
        let items = data.dictionary.as_variable_width().unwrap();
        assert_eq!(items.bits_per_offset, 32);
        assert_eq!(items.num_values, 4);
        assert_eq!(items.data, LanceBuffer::copy_slice(b"abbc"));
        assert_eq!(
            items.offsets,
            LanceBuffer::reinterpret_vec(vec![0, 1, 2, 3, 4],)
        );
    }

    /// Null rows are redirected to a null entry in the dictionary values.
    #[test]
    fn test_dictionary_nulls() {
        // Nulls expressed through the indices
        let arr1 = DictionaryArray::<Int8Type>::from_iter([None, Some("a"), Some("b")]);
        let arr2 = DictionaryArray::<Int8Type>::from_iter([Some("c"), None]);

        let data = DataBlock::from_arrays(&[Arc::new(arr1), Arc::new(arr2)], 5);

        // Both ways of expressing the nulls must produce this same block
        let check_common = |data: DataBlock| {
            assert_eq!(data.num_values(), 5);
            let dict = data.as_dictionary().unwrap();

            // Four items: "a", "b", "c" are valid and the last one is null
            let nullable_items = dict.dictionary.as_nullable().unwrap();
            assert_eq!(nullable_items.nulls, LanceBuffer::Owned(vec![0b00000111]));
            assert_eq!(nullable_items.data.num_values(), 4);

            let items = nullable_items.data.as_variable_width().unwrap();
            assert_eq!(items.bits_per_offset, 32);
            assert_eq!(items.num_values, 4);
            assert_eq!(items.data, LanceBuffer::copy_slice(b"abc"));
            assert_eq!(
                items.offsets,
                LanceBuffer::reinterpret_vec(vec![0, 1, 2, 3, 3],)
            );

            // Null rows point at index 3, the null item
            let indices = dict.indices;
            assert_eq!(indices.bits_per_value, 8);
            assert_eq!(indices.num_values, 5);
            assert_eq!(
                indices.data,
                LanceBuffer::reinterpret_vec::<i8>(vec![3, 0, 1, 2, 3])
            );
        };
        check_common(data);

        // Nulls expressed through a null item in the dictionary values
        let items = StringArray::from(vec![Some("a"), Some("b"), Some("c"), None]);
        let indices = Int8Array::from(vec![Some(3), Some(0), Some(1), Some(2), Some(3)]);
        let dict = DictionaryArray::new(indices, Arc::new(items));

        let data = DataBlock::from_array(dict);

        check_common(data);
    }

    /// When the appended null item does not fit the index type, indices are widened.
    #[test]
    fn test_dictionary_cannot_add_null() {
        // 256 distinct values: every u8 index is already in use
        let items = StringArray::from(
            (0..256)
                .map(|i| Some(String::from_utf8(vec![0; i]).unwrap()))
                .collect::<Vec<_>>(),
        );
        // 257 rows: one per value plus a trailing null row
        let indices = UInt8Array::from(
            (0..=256)
                .map(|i| if i == 256 { None } else { Some(i as u8) })
                .collect::<Vec<_>>(),
        );
        let dict = DictionaryArray::new(indices, Arc::new(items));
        let data = DataBlock::from_array(dict);

        assert_eq!(data.num_values(), 257);

        let dict = data.as_dictionary().unwrap();

        // The null item lands at index 256, so indices become 32 bits wide
        assert_eq!(dict.indices.bits_per_value, 32);
        assert_eq!(
            dict.indices.data,
            LanceBuffer::reinterpret_vec((0_u32..257).collect::<Vec<_>>())
        );

        let nullable_items = dict.dictionary.as_nullable().unwrap();
        let null_buffer = NullBuffer::new(BooleanBuffer::new(
            nullable_items.nulls.into_buffer(),
            0,
            257,
        ));
        // Only the appended item is null
        for i in 0..256 {
            assert!(!null_buffer.is_null(i));
        }
        assert!(null_buffer.is_null(256));

        // 0 + 1 + ... + 255 bytes of item data
        assert_eq!(
            nullable_items.data.as_variable_width().unwrap().data.len(),
            32640
        );
    }

    /// An all-null block converts to a null array of any requested type.
    #[test]
    fn test_all_null() {
        for data_type in [
            DataType::UInt32,
            DataType::FixedSizeBinary(2),
            DataType::List(Arc::new(Field::new("item", DataType::UInt32, true))),
            DataType::Struct(Fields::from(vec![Field::new("a", DataType::UInt32, true)])),
        ] {
            let block = DataBlock::AllNull(AllNullDataBlock { num_values: 10 });
            let arr = block.into_arrow(data_type.clone(), true).unwrap();
            let arr = make_array(arr);
            let expected = new_null_array(&data_type, 10);
            assert_eq!(&arr, &expected);
        }
    }

    /// Dictionaries whose combined values overflow the index type get wider indices.
    #[test]
    fn test_dictionary_cannot_concatenate() {
        // Two dictionaries of 256 values each, both indexed with u8
        let items = StringArray::from(
            (0..256)
                .map(|i| Some(String::from_utf8(vec![0; i]).unwrap()))
                .collect::<Vec<_>>(),
        );
        let other_items = StringArray::from(
            (0..256)
                .map(|i| Some(String::from_utf8(vec![1; i + 1]).unwrap()))
                .collect::<Vec<_>>(),
        );
        let indices = UInt8Array::from_iter_values(0..=255);
        let dict1 = DictionaryArray::new(indices.clone(), Arc::new(items));
        let dict2 = DictionaryArray::new(indices, Arc::new(other_items));
        let data = DataBlock::from_arrays(&[Arc::new(dict1), Arc::new(dict2)], 512);
        assert_eq!(data.num_values(), 512);

        let dict = data.as_dictionary().unwrap();

        // 512 distinct values need indices wider than 8 bits
        assert_eq!(dict.indices.bits_per_value, 32);
        assert_eq!(
            dict.indices.data,
            LanceBuffer::reinterpret_vec::<u32>((0..512).collect::<Vec<_>>())
        );
        // (0 + ... + 255) + (1 + ... + 256) bytes of item data
        assert_eq!(
            dict.dictionary.as_variable_width().unwrap().data.len(),
            65536
        );
    }

    /// `data_size` must match the sizes of the underlying Arrow buffers.
    #[test]
    fn test_data_size() {
        let mut rng = rand_xoshiro::Xoshiro256PlusPlus::seed_from_u64(DEFAULT_SEED.0);
        // Null pattern with no nulls at all
        let mut gen = array::rand::<Int32Type>().with_nulls(&[false, false, false]);

        let arr = gen.generate(RowCount::from(3), &mut rng).unwrap();
        let block = DataBlock::from_array(arr.clone());
        assert!(block.data_size() == arr.get_buffer_memory_size() as u64);

        let arr = gen.generate(RowCount::from(400), &mut rng).unwrap();
        let block = DataBlock::from_array(arr.clone());
        assert!(block.data_size() == arr.get_buffer_memory_size() as u64);

        // Null pattern with one null in three: the validity bitmap is counted too
        let mut gen = array::rand::<Int32Type>().with_nulls(&[false, true, false]);
        let arr = gen.generate(RowCount::from(3), &mut rng).unwrap();
        let block = DataBlock::from_array(arr.clone());

        let array_data = arr.to_data();
        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
        // Validity takes one bit per value, rounded up to whole bytes
        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);

        let arr = gen.generate(RowCount::from(400), &mut rng).unwrap();
        let block = DataBlock::from_array(arr.clone());

        let array_data = arr.to_data();
        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);

        // Null pattern with two nulls in three
        let mut gen = array::rand::<Int32Type>().with_nulls(&[true, true, false]);
        let arr = gen.generate(RowCount::from(3), &mut rng).unwrap();
        let block = DataBlock::from_array(arr.clone());

        let array_data = arr.to_data();
        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);

        let arr = gen.generate(RowCount::from(400), &mut rng).unwrap();
        let block = DataBlock::from_array(arr.clone());

        let array_data = arr.to_data();
        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);

        // Several arrays at once: compare against the size of their concatenation
        let mut gen = array::rand::<Int32Type>().with_nulls(&[false, true, false]);
        let arr1 = gen.generate(RowCount::from(3), &mut rng).unwrap();
        let arr2 = gen.generate(RowCount::from(3), &mut rng).unwrap();
        let arr3 = gen.generate(RowCount::from(3), &mut rng).unwrap();
        let block = DataBlock::from_arrays(&[arr1.clone(), arr2.clone(), arr3.clone()], 9);

        let concatenated_array = concat(&[
            &*Arc::new(arr1.clone()) as &dyn Array,
            &*Arc::new(arr2.clone()) as &dyn Array,
            &*Arc::new(arr3.clone()) as &dyn Array,
        ])
        .unwrap();
        let total_buffer_size: usize = concatenated_array
            .to_data()
            .buffers()
            .iter()
            .map(|buffer| buffer.len())
            .sum();

        let total_nulls_size_in_bytes = concatenated_array.nulls().unwrap().len().div_ceil(8);
        assert!(block.data_size() == (total_buffer_size + total_nulls_size_in_bytes) as u64);
    }
}