lance_encoding/
data.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Data layouts to represent encoded data in a sub-Arrow format
5//!
6//! These [`DataBlock`] structures represent physical layouts.  They fill a gap somewhere
7//! between [`arrow_data::data::ArrayData`] (which, as a collection of buffers, is too
8//! generic because it doesn't give us enough information about what those buffers represent)
9//! and [`arrow_array::array::Array`] (which is too specific, because it cares about the
10//! logical data type).
11//!
12//! In addition, the layouts represented here are slightly stricter than Arrow's layout rules.
13//! For example, offset buffers MUST start with 0.  These additional restrictions impose a
14//! slight penalty on encode (to normalize arrow data) but make the development of encoders
15//! and decoders easier (since they can rely on a normalized representation)
16
17use std::{
18    ops::Range,
19    sync::{Arc, RwLock},
20};
21
22use arrow_array::{
23    cast::AsArray, new_empty_array, new_null_array, Array, ArrayRef, OffsetSizeTrait, UInt64Array,
24};
25use arrow_buffer::{
26    ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer, ScalarBuffer,
27};
28use arrow_data::{ArrayData, ArrayDataBuilder};
29use arrow_schema::DataType;
30use lance_arrow::DataTypeExt;
31use snafu::location;
32
33use lance_core::{Error, Result};
34
35use crate::{
36    buffer::LanceBuffer,
37    statistics::{ComputeStat, Stat},
38};
39
40/// A data block with no buffers where everything is null
41///
42/// Note: this data block should not be used for future work.  It will be deprecated
43/// in the 2.1 version of the format where nullability will be handled by the structural
44/// encoders.
45#[derive(Debug, Clone)]
46pub struct AllNullDataBlock {
47    /// The number of values represented by this block
48    pub num_values: u64,
49}
50
51impl AllNullDataBlock {
52    fn into_arrow(self, data_type: DataType, _validate: bool) -> Result<ArrayData> {
53        Ok(ArrayData::new_null(&data_type, self.num_values as usize))
54    }
55
56    fn into_buffers(self) -> Vec<LanceBuffer> {
57        vec![]
58    }
59}
60
61use std::collections::HashMap;
62
63// `BlockInfo` stores the statistics of this `DataBlock`, such as `NullCount` for `NullableDataBlock`,
64// `BitWidth` for `FixedWidthDataBlock`, `Cardinality` for all `DataBlock`
65#[derive(Debug, Clone)]
66pub struct BlockInfo(pub Arc<RwLock<HashMap<Stat, Arc<dyn Array>>>>);
67
68impl Default for BlockInfo {
69    fn default() -> Self {
70        Self::new()
71    }
72}
73
74impl BlockInfo {
75    pub fn new() -> Self {
76        Self(Arc::new(RwLock::new(HashMap::new())))
77    }
78}
79
80impl PartialEq for BlockInfo {
81    fn eq(&self, other: &Self) -> bool {
82        let self_info = self.0.read().unwrap();
83        let other_info = other.0.read().unwrap();
84        *self_info == *other_info
85    }
86}
87
88/// Wraps a data block and adds nullability information to it
89///
90/// Note: this data block should not be used for future work.  It will be deprecated
91/// in the 2.1 version of the format where nullability will be handled by the structural
92/// encoders.
93#[derive(Debug, Clone)]
94pub struct NullableDataBlock {
95    /// The underlying data
96    pub data: Box<DataBlock>,
97    /// A bitmap of validity for each value
98    pub nulls: LanceBuffer,
99
100    pub block_info: BlockInfo,
101}
102
103impl NullableDataBlock {
104    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
105        let nulls = self.nulls.into_buffer();
106        let data = self.data.into_arrow(data_type, validate)?.into_builder();
107        let data = data.null_bit_buffer(Some(nulls));
108        if validate {
109            Ok(data.build()?)
110        } else {
111            Ok(unsafe { data.build_unchecked() })
112        }
113    }
114
115    fn into_buffers(self) -> Vec<LanceBuffer> {
116        let mut buffers = vec![self.nulls];
117        buffers.extend(self.data.into_buffers());
118        buffers
119    }
120
121    pub fn data_size(&self) -> u64 {
122        self.data.data_size() + self.nulls.len() as u64
123    }
124}
125
126/// A block representing the same constant value repeated many times
127#[derive(Debug, PartialEq, Clone)]
128pub struct ConstantDataBlock {
129    /// Data buffer containing the value
130    pub data: LanceBuffer,
131    /// The number of values
132    pub num_values: u64,
133}
134
135impl ConstantDataBlock {
136    fn into_buffers(self) -> Vec<LanceBuffer> {
137        vec![self.data]
138    }
139
140    fn into_arrow(self, _data_type: DataType, _validate: bool) -> Result<ArrayData> {
141        // We don't need this yet but if we come up with some way of serializing
142        // scalars to/from bytes then we could implement it.
143        todo!()
144    }
145
146    pub fn try_clone(&self) -> Result<Self> {
147        Ok(Self {
148            data: self.data.clone(),
149            num_values: self.num_values,
150        })
151    }
152
153    pub fn data_size(&self) -> u64 {
154        self.data.len() as u64
155    }
156}
157
158/// A data block for a single buffer of data where each element has a fixed number of bits
159#[derive(Debug, PartialEq, Clone)]
160pub struct FixedWidthDataBlock {
161    /// The data buffer
162    pub data: LanceBuffer,
163    /// The number of bits per value
164    pub bits_per_value: u64,
165    /// The number of values represented by this block
166    pub num_values: u64,
167
168    pub block_info: BlockInfo,
169}
170
171impl FixedWidthDataBlock {
172    fn do_into_arrow(
173        self,
174        data_type: DataType,
175        num_values: u64,
176        validate: bool,
177    ) -> Result<ArrayData> {
178        let data_buffer = self.data.into_buffer();
179        let builder = ArrayDataBuilder::new(data_type)
180            .add_buffer(data_buffer)
181            .len(num_values as usize)
182            .null_count(0);
183        if validate {
184            Ok(builder.build()?)
185        } else {
186            Ok(unsafe { builder.build_unchecked() })
187        }
188    }
189
190    pub fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
191        let root_num_values = self.num_values;
192        self.do_into_arrow(data_type, root_num_values, validate)
193    }
194
195    pub fn into_buffers(self) -> Vec<LanceBuffer> {
196        vec![self.data]
197    }
198
199    pub fn try_clone(&self) -> Result<Self> {
200        Ok(Self {
201            data: self.data.clone(),
202            bits_per_value: self.bits_per_value,
203            num_values: self.num_values,
204            block_info: self.block_info.clone(),
205        })
206    }
207
208    pub fn data_size(&self) -> u64 {
209        self.data.len() as u64
210    }
211}
212
213#[derive(Debug)]
214pub struct VariableWidthDataBlockBuilder<T: OffsetSizeTrait> {
215    offsets: Vec<T>,
216    bytes: Vec<u8>,
217}
218
219impl<T: OffsetSizeTrait> VariableWidthDataBlockBuilder<T> {
220    fn new(estimated_size_bytes: u64) -> Self {
221        Self {
222            offsets: vec![T::from_usize(0).unwrap()],
223            bytes: Vec::with_capacity(estimated_size_bytes as usize),
224        }
225    }
226}
227
228impl<T: OffsetSizeTrait> DataBlockBuilderImpl for VariableWidthDataBlockBuilder<T> {
229    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
230        let block = data_block.as_variable_width_ref().unwrap();
231        assert!(block.bits_per_offset == T::get_byte_width() as u8 * 8);
232
233        let offsets: ScalarBuffer<T> = block.offsets.clone().borrow_to_typed_slice();
234
235        let start_offset = offsets[selection.start as usize];
236        let end_offset = offsets[selection.end as usize];
237        let mut previous_len = self.bytes.len();
238
239        self.bytes
240            .extend_from_slice(&block.data[start_offset.as_usize()..end_offset.as_usize()]);
241
242        self.offsets.extend(
243            offsets[selection.start as usize..selection.end as usize]
244                .iter()
245                .zip(&offsets[selection.start as usize + 1..=selection.end as usize])
246                .map(|(&current, &next)| {
247                    let this_value_len = next - current;
248                    previous_len += this_value_len.as_usize();
249                    T::from_usize(previous_len).unwrap()
250                }),
251        );
252    }
253
254    fn finish(self: Box<Self>) -> DataBlock {
255        let num_values = (self.offsets.len() - 1) as u64;
256        DataBlock::VariableWidth(VariableWidthBlock {
257            data: LanceBuffer::from(self.bytes),
258            offsets: LanceBuffer::reinterpret_vec(self.offsets),
259            bits_per_offset: T::get_byte_width() as u8 * 8,
260            num_values,
261            block_info: BlockInfo::new(),
262        })
263    }
264}
265
266#[derive(Debug)]
267struct BitmapDataBlockBuilder {
268    values: BooleanBufferBuilder,
269}
270
271impl BitmapDataBlockBuilder {
272    fn new(estimated_size_bytes: u64) -> Self {
273        Self {
274            values: BooleanBufferBuilder::new(estimated_size_bytes as usize * 8),
275        }
276    }
277}
278
279impl DataBlockBuilderImpl for BitmapDataBlockBuilder {
280    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
281        let bitmap_blk = data_block.as_fixed_width_ref().unwrap();
282        self.values.append_packed_range(
283            selection.start as usize..selection.end as usize,
284            &bitmap_blk.data,
285        );
286    }
287
288    fn finish(mut self: Box<Self>) -> DataBlock {
289        let bool_buf = self.values.finish();
290        let num_values = bool_buf.len() as u64;
291        let bits_buf = bool_buf.into_inner();
292        DataBlock::FixedWidth(FixedWidthDataBlock {
293            data: LanceBuffer::from(bits_buf),
294            bits_per_value: 1,
295            num_values,
296            block_info: BlockInfo::new(),
297        })
298    }
299}
300
301#[derive(Debug)]
302struct FixedWidthDataBlockBuilder {
303    bits_per_value: u64,
304    bytes_per_value: u64,
305    values: Vec<u8>,
306}
307
308impl FixedWidthDataBlockBuilder {
309    fn new(bits_per_value: u64, estimated_size_bytes: u64) -> Self {
310        assert!(bits_per_value % 8 == 0);
311        Self {
312            bits_per_value,
313            bytes_per_value: bits_per_value / 8,
314            values: Vec::with_capacity(estimated_size_bytes as usize),
315        }
316    }
317}
318
319impl DataBlockBuilderImpl for FixedWidthDataBlockBuilder {
320    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
321        let block = data_block.as_fixed_width_ref().unwrap();
322        assert_eq!(self.bits_per_value, block.bits_per_value);
323        let start = selection.start as usize * self.bytes_per_value as usize;
324        let end = selection.end as usize * self.bytes_per_value as usize;
325        self.values.extend_from_slice(&block.data[start..end]);
326    }
327
328    fn finish(self: Box<Self>) -> DataBlock {
329        let num_values = (self.values.len() / self.bytes_per_value as usize) as u64;
330        DataBlock::FixedWidth(FixedWidthDataBlock {
331            data: LanceBuffer::from(self.values),
332            bits_per_value: self.bits_per_value,
333            num_values,
334            block_info: BlockInfo::new(),
335        })
336    }
337}
338
339#[derive(Debug)]
340struct StructDataBlockBuilder {
341    children: Vec<Box<dyn DataBlockBuilderImpl>>,
342}
343
344impl StructDataBlockBuilder {
345    // Currently only Struct with fixed-width fields are supported.
346    // And the assumption that all fields have `bits_per_value % 8 == 0` is made here.
347    fn new(bits_per_values: Vec<u32>, estimated_size_bytes: u64) -> Self {
348        let mut children = vec![];
349
350        debug_assert!(bits_per_values.iter().all(|bpv| bpv % 8 == 0));
351
352        let bytes_per_row: u32 = bits_per_values.iter().sum::<u32>() / 8;
353        let bytes_per_row = bytes_per_row as u64;
354
355        for bits_per_value in bits_per_values.iter() {
356            let this_estimated_size_bytes =
357                estimated_size_bytes / bytes_per_row * (*bits_per_value as u64) / 8;
358            let child =
359                FixedWidthDataBlockBuilder::new(*bits_per_value as u64, this_estimated_size_bytes);
360            children.push(Box::new(child) as Box<dyn DataBlockBuilderImpl>);
361        }
362        Self { children }
363    }
364}
365
366impl DataBlockBuilderImpl for StructDataBlockBuilder {
367    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
368        let data_block = data_block.as_struct_ref().unwrap();
369        for i in 0..self.children.len() {
370            self.children[i].append(&data_block.children[i], selection.clone());
371        }
372    }
373
374    fn finish(self: Box<Self>) -> DataBlock {
375        let mut children_data_block = Vec::new();
376        for child in self.children {
377            let child_data_block = child.finish();
378            children_data_block.push(child_data_block);
379        }
380        DataBlock::Struct(StructDataBlock {
381            children: children_data_block,
382            block_info: BlockInfo::new(),
383            validity: None,
384        })
385    }
386}
387/// A data block to represent a fixed size list
388#[derive(Debug, Clone)]
389pub struct FixedSizeListBlock {
390    /// The child data block
391    pub child: Box<DataBlock>,
392    /// The number of items in each list
393    pub dimension: u64,
394}
395
396impl FixedSizeListBlock {
397    pub fn num_values(&self) -> u64 {
398        self.child.num_values() / self.dimension
399    }
400
401    /// Try to flatten a FixedSizeListBlock into a FixedWidthDataBlock
402    ///
403    /// Returns None if any children are nullable
404    pub fn try_into_flat(self) -> Option<FixedWidthDataBlock> {
405        match *self.child {
406            // Cannot flatten a nullable child
407            DataBlock::Nullable(_) => None,
408            DataBlock::FixedSizeList(inner) => {
409                let mut flat = inner.try_into_flat()?;
410                flat.bits_per_value *= self.dimension;
411                flat.num_values /= self.dimension;
412                Some(flat)
413            }
414            DataBlock::FixedWidth(mut inner) => {
415                inner.bits_per_value *= self.dimension;
416                inner.num_values /= self.dimension;
417                Some(inner)
418            }
419            _ => panic!(
420                "Expected FixedSizeList or FixedWidth data block but found {:?}",
421                self
422            ),
423        }
424    }
425
426    pub fn flatten_as_fixed(&mut self) -> FixedWidthDataBlock {
427        match self.child.as_mut() {
428            DataBlock::FixedSizeList(fsl) => fsl.flatten_as_fixed(),
429            DataBlock::FixedWidth(fw) => fw.clone(),
430            _ => panic!("Expected FixedSizeList or FixedWidth data block"),
431        }
432    }
433
434    /// Convert a flattened values block into a FixedSizeListBlock
435    pub fn from_flat(data: FixedWidthDataBlock, data_type: &DataType) -> DataBlock {
436        match data_type {
437            DataType::FixedSizeList(child_field, dimension) => {
438                let mut data = data;
439                data.bits_per_value /= *dimension as u64;
440                data.num_values *= *dimension as u64;
441                let child_data = Self::from_flat(data, child_field.data_type());
442                DataBlock::FixedSizeList(Self {
443                    child: Box::new(child_data),
444                    dimension: *dimension as u64,
445                })
446            }
447            // Base case, we've hit a non-list type
448            _ => DataBlock::FixedWidth(data),
449        }
450    }
451
452    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
453        let num_values = self.num_values();
454        let builder = match &data_type {
455            DataType::FixedSizeList(child_field, _) => {
456                let child_data = self
457                    .child
458                    .into_arrow(child_field.data_type().clone(), validate)?;
459                ArrayDataBuilder::new(data_type)
460                    .add_child_data(child_data)
461                    .len(num_values as usize)
462                    .null_count(0)
463            }
464            _ => panic!("Expected FixedSizeList data type and got {:?}", data_type),
465        };
466        if validate {
467            Ok(builder.build()?)
468        } else {
469            Ok(unsafe { builder.build_unchecked() })
470        }
471    }
472
473    fn into_buffers(self) -> Vec<LanceBuffer> {
474        self.child.into_buffers()
475    }
476
477    fn data_size(&self) -> u64 {
478        self.child.data_size()
479    }
480}
481
482#[derive(Debug)]
483struct FixedSizeListBlockBuilder {
484    inner: Box<dyn DataBlockBuilderImpl>,
485    dimension: u64,
486}
487
488impl FixedSizeListBlockBuilder {
489    fn new(inner: Box<dyn DataBlockBuilderImpl>, dimension: u64) -> Self {
490        Self { inner, dimension }
491    }
492}
493
494impl DataBlockBuilderImpl for FixedSizeListBlockBuilder {
495    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
496        let selection = selection.start * self.dimension..selection.end * self.dimension;
497        let fsl = data_block.as_fixed_size_list_ref().unwrap();
498        self.inner.append(fsl.child.as_ref(), selection);
499    }
500
501    fn finish(self: Box<Self>) -> DataBlock {
502        let inner_block = self.inner.finish();
503        DataBlock::FixedSizeList(FixedSizeListBlock {
504            child: Box::new(inner_block),
505            dimension: self.dimension,
506        })
507    }
508}
509
510#[derive(Debug)]
511struct NullableDataBlockBuilder {
512    inner: Box<dyn DataBlockBuilderImpl>,
513    validity: BooleanBufferBuilder,
514}
515
516impl NullableDataBlockBuilder {
517    fn new(inner: Box<dyn DataBlockBuilderImpl>, estimated_size_bytes: usize) -> Self {
518        Self {
519            inner,
520            validity: BooleanBufferBuilder::new(estimated_size_bytes * 8),
521        }
522    }
523}
524
525impl DataBlockBuilderImpl for NullableDataBlockBuilder {
526    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
527        let nullable = data_block.as_nullable_ref().unwrap();
528        let bool_buf = BooleanBuffer::new(
529            nullable.nulls.clone().into_buffer(),
530            selection.start as usize,
531            (selection.end - selection.start) as usize,
532        );
533        self.validity.append_buffer(&bool_buf);
534        self.inner.append(nullable.data.as_ref(), selection);
535    }
536
537    fn finish(mut self: Box<Self>) -> DataBlock {
538        let inner_block = self.inner.finish();
539        DataBlock::Nullable(NullableDataBlock {
540            data: Box::new(inner_block),
541            nulls: LanceBuffer::from(self.validity.finish().into_inner()),
542            block_info: BlockInfo::new(),
543        })
544    }
545}
546
547/// A data block with no regular structure.  There is no available spot to attach
548/// validity / repdef information and it cannot be converted to Arrow without being
549/// decoded
550#[derive(Debug, Clone)]
551pub struct OpaqueBlock {
552    pub buffers: Vec<LanceBuffer>,
553    pub num_values: u64,
554    pub block_info: BlockInfo,
555}
556
557impl OpaqueBlock {
558    pub fn data_size(&self) -> u64 {
559        self.buffers.iter().map(|b| b.len() as u64).sum()
560    }
561}
562
563/// A data block for variable-width data (e.g. strings, packed rows, etc.)
564#[derive(Debug, Clone)]
565pub struct VariableWidthBlock {
566    /// The data buffer
567    pub data: LanceBuffer,
568    /// The offsets buffer (contains num_values + 1 offsets)
569    ///
570    /// Offsets MUST start at 0
571    pub offsets: LanceBuffer,
572    /// The number of bits per offset
573    pub bits_per_offset: u8,
574    /// The number of values represented by this block
575    pub num_values: u64,
576
577    pub block_info: BlockInfo,
578}
579
580impl VariableWidthBlock {
581    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
582        let data_buffer = self.data.into_buffer();
583        let offsets_buffer = self.offsets.into_buffer();
584        let builder = ArrayDataBuilder::new(data_type)
585            .add_buffer(offsets_buffer)
586            .add_buffer(data_buffer)
587            .len(self.num_values as usize)
588            .null_count(0);
589        if validate {
590            Ok(builder.build()?)
591        } else {
592            Ok(unsafe { builder.build_unchecked() })
593        }
594    }
595
596    fn into_buffers(self) -> Vec<LanceBuffer> {
597        vec![self.offsets, self.data]
598    }
599
600    pub fn offsets_as_block(&mut self) -> DataBlock {
601        let offsets = self.offsets.clone();
602        DataBlock::FixedWidth(FixedWidthDataBlock {
603            data: offsets,
604            bits_per_value: self.bits_per_offset as u64,
605            num_values: self.num_values + 1,
606            block_info: BlockInfo::new(),
607        })
608    }
609
610    pub fn data_size(&self) -> u64 {
611        (self.data.len() + self.offsets.len()) as u64
612    }
613}
614
615/// A data block representing a struct
616#[derive(Debug, Clone)]
617pub struct StructDataBlock {
618    /// The child arrays
619    pub children: Vec<DataBlock>,
620    pub block_info: BlockInfo,
621    /// The validity bitmap for the struct (None means all valid)
622    pub validity: Option<NullBuffer>,
623}
624
625impl StructDataBlock {
626    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
627        if let DataType::Struct(fields) = &data_type {
628            let mut builder = ArrayDataBuilder::new(DataType::Struct(fields.clone()));
629            let mut num_rows = 0;
630            for (field, child) in fields.iter().zip(self.children) {
631                let child_data = child.into_arrow(field.data_type().clone(), validate)?;
632                num_rows = child_data.len();
633                builder = builder.add_child_data(child_data);
634            }
635
636            // Apply validity if present
637            let builder = if let Some(validity) = self.validity {
638                let null_count = validity.null_count();
639                builder
640                    .null_bit_buffer(Some(validity.into_inner().into_inner()))
641                    .null_count(null_count)
642            } else {
643                builder.null_count(0)
644            };
645
646            let builder = builder.len(num_rows);
647            if validate {
648                Ok(builder.build()?)
649            } else {
650                Ok(unsafe { builder.build_unchecked() })
651            }
652        } else {
653            Err(Error::Internal {
654                message: format!("Expected Struct, got {:?}", data_type),
655                location: location!(),
656            })
657        }
658    }
659
660    fn remove_outer_validity(self) -> Self {
661        Self {
662            children: self
663                .children
664                .into_iter()
665                .map(|c| c.remove_outer_validity())
666                .collect(),
667            block_info: self.block_info,
668            validity: None, // Remove the validity
669        }
670    }
671
672    fn into_buffers(self) -> Vec<LanceBuffer> {
673        self.children
674            .into_iter()
675            .flat_map(|c| c.into_buffers())
676            .collect()
677    }
678
679    pub fn data_size(&self) -> u64 {
680        self.children
681            .iter()
682            .map(|data_block| data_block.data_size())
683            .sum()
684    }
685}
686
687/// A data block for dictionary encoded data
688#[derive(Debug, Clone)]
689pub struct DictionaryDataBlock {
690    /// The indices buffer
691    pub indices: FixedWidthDataBlock,
692    /// The dictionary itself
693    pub dictionary: Box<DataBlock>,
694}
695
696impl DictionaryDataBlock {
697    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
698        let (key_type, value_type) = if let DataType::Dictionary(key_type, value_type) = &data_type
699        {
700            (key_type.as_ref().clone(), value_type.as_ref().clone())
701        } else {
702            return Err(Error::Internal {
703                message: format!("Expected Dictionary, got {:?}", data_type),
704                location: location!(),
705            });
706        };
707
708        let indices = self.indices.into_arrow(key_type, validate)?;
709        let dictionary = self.dictionary.into_arrow(value_type, validate)?;
710
711        let builder = indices
712            .into_builder()
713            .add_child_data(dictionary)
714            .data_type(data_type);
715
716        if validate {
717            Ok(builder.build()?)
718        } else {
719            Ok(unsafe { builder.build_unchecked() })
720        }
721    }
722
723    fn into_buffers(self) -> Vec<LanceBuffer> {
724        let mut buffers = self.indices.into_buffers();
725        buffers.extend(self.dictionary.into_buffers());
726        buffers
727    }
728}
729
/// A DataBlock is a collection of buffers that represents an "array" of data in very generic terms
///
/// The output of each decoder is a DataBlock.  Decoders can be chained together to transform
/// one DataBlock into a different kind of DataBlock.
///
/// The DataBlock is somewhere in between Arrow's ArrayData and Array and represents a physical
/// layout of the data.
///
/// A DataBlock can be converted into an Arrow ArrayData (and then Array) for a given array type.
/// For example, a FixedWidthDataBlock can be converted into any primitive type or a fixed size
/// list of a primitive type.  This is a zero-copy operation.
///
/// In addition, a DataBlock can be created from an Arrow array or arrays.  This is not a zero-copy
/// operation as some normalization may be required.
#[derive(Debug, Clone)]
pub enum DataBlock {
    /// A block with no values and no buffers
    Empty(),
    /// A single value repeated many times
    Constant(ConstantDataBlock),
    /// A block with no buffers where every value is null
    AllNull(AllNullDataBlock),
    /// An inner block plus a validity bitmap
    Nullable(NullableDataBlock),
    /// A single buffer where each element has a fixed number of bits
    FixedWidth(FixedWidthDataBlock),
    /// A child block grouped into lists of a fixed dimension
    FixedSizeList(FixedSizeListBlock),
    /// A data buffer plus an offsets buffer (e.g. strings)
    VariableWidth(VariableWidthBlock),
    /// Buffers with no regular structure; cannot be converted to Arrow
    Opaque(OpaqueBlock),
    /// One child block per struct field, with optional struct-level validity
    Struct(StructDataBlock),
    /// Fixed-width indices plus a dictionary block
    Dictionary(DictionaryDataBlock),
}
757
758impl DataBlock {
759    /// Convert self into an Arrow ArrayData
760    pub fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
761        match self {
762            Self::Empty() => Ok(new_empty_array(&data_type).to_data()),
763            Self::Constant(inner) => inner.into_arrow(data_type, validate),
764            Self::AllNull(inner) => inner.into_arrow(data_type, validate),
765            Self::Nullable(inner) => inner.into_arrow(data_type, validate),
766            Self::FixedWidth(inner) => inner.into_arrow(data_type, validate),
767            Self::FixedSizeList(inner) => inner.into_arrow(data_type, validate),
768            Self::VariableWidth(inner) => inner.into_arrow(data_type, validate),
769            Self::Struct(inner) => inner.into_arrow(data_type, validate),
770            Self::Dictionary(inner) => inner.into_arrow(data_type, validate),
771            Self::Opaque(_) => Err(Error::Internal {
772                message: "Cannot convert OpaqueBlock to Arrow".to_string(),
773                location: location!(),
774            }),
775        }
776    }
777
778    /// Convert the data block into a collection of buffers for serialization
779    ///
780    /// The order matters and will be used to reconstruct the data block at read time.
781    pub fn into_buffers(self) -> Vec<LanceBuffer> {
782        match self {
783            Self::Empty() => Vec::default(),
784            Self::Constant(inner) => inner.into_buffers(),
785            Self::AllNull(inner) => inner.into_buffers(),
786            Self::Nullable(inner) => inner.into_buffers(),
787            Self::FixedWidth(inner) => inner.into_buffers(),
788            Self::FixedSizeList(inner) => inner.into_buffers(),
789            Self::VariableWidth(inner) => inner.into_buffers(),
790            Self::Struct(inner) => inner.into_buffers(),
791            Self::Dictionary(inner) => inner.into_buffers(),
792            Self::Opaque(inner) => inner.buffers,
793        }
794    }
795
796    /// Converts the data buffers into borrowed mode and clones the block
797    ///
798    /// This is a zero-copy operation but requires a mutable reference to self and, afterwards,
799    /// all buffers will be in Borrowed mode.
800    /// Try and clone the block
801    ///
802    /// This will fail if any buffers are in owned mode.  You can call borrow_and_clone() to
803    /// ensure that all buffers are in borrowed mode before calling this method.
804    pub fn try_clone(&self) -> Result<Self> {
805        match self {
806            Self::Empty() => Ok(Self::Empty()),
807            Self::Constant(inner) => Ok(Self::Constant(inner.clone())),
808            Self::AllNull(inner) => Ok(Self::AllNull(inner.clone())),
809            Self::Nullable(inner) => Ok(Self::Nullable(inner.clone())),
810            Self::FixedWidth(inner) => Ok(Self::FixedWidth(inner.clone())),
811            Self::FixedSizeList(inner) => Ok(Self::FixedSizeList(inner.clone())),
812            Self::VariableWidth(inner) => Ok(Self::VariableWidth(inner.clone())),
813            Self::Struct(inner) => Ok(Self::Struct(inner.clone())),
814            Self::Dictionary(inner) => Ok(Self::Dictionary(inner.clone())),
815            Self::Opaque(inner) => Ok(Self::Opaque(inner.clone())),
816        }
817    }
818
819    pub fn name(&self) -> &'static str {
820        match self {
821            Self::Constant(_) => "Constant",
822            Self::Empty() => "Empty",
823            Self::AllNull(_) => "AllNull",
824            Self::Nullable(_) => "Nullable",
825            Self::FixedWidth(_) => "FixedWidth",
826            Self::FixedSizeList(_) => "FixedSizeList",
827            Self::VariableWidth(_) => "VariableWidth",
828            Self::Struct(_) => "Struct",
829            Self::Dictionary(_) => "Dictionary",
830            Self::Opaque(_) => "Opaque",
831        }
832    }
833
834    pub fn is_variable(&self) -> bool {
835        match self {
836            Self::Constant(_) => false,
837            Self::Empty() => false,
838            Self::AllNull(_) => false,
839            Self::Nullable(nullable) => nullable.data.is_variable(),
840            Self::FixedWidth(_) => false,
841            Self::FixedSizeList(fsl) => fsl.child.is_variable(),
842            Self::VariableWidth(_) => true,
843            Self::Struct(strct) => strct.children.iter().any(|c| c.is_variable()),
844            Self::Dictionary(_) => {
845                todo!("is_variable for DictionaryDataBlock is not implemented yet")
846            }
847            Self::Opaque(_) => panic!("Does not make sense to ask if an Opaque block is variable"),
848        }
849    }
850
851    pub fn is_nullable(&self) -> bool {
852        match self {
853            Self::AllNull(_) => true,
854            Self::Nullable(_) => true,
855            Self::FixedSizeList(fsl) => fsl.child.is_nullable(),
856            Self::Struct(strct) => strct.children.iter().any(|c| c.is_nullable()),
857            Self::Dictionary(_) => {
858                todo!("is_nullable for DictionaryDataBlock is not implemented yet")
859            }
860            Self::Opaque(_) => panic!("Does not make sense to ask if an Opaque block is nullable"),
861            _ => false,
862        }
863    }
864
865    /// The number of values in the block
866    ///
867    /// This function does not recurse into child blocks.  If this is a FSL then it will
868    /// be the number of lists and not the number of items.
869    pub fn num_values(&self) -> u64 {
870        match self {
871            Self::Empty() => 0,
872            Self::Constant(inner) => inner.num_values,
873            Self::AllNull(inner) => inner.num_values,
874            Self::Nullable(inner) => inner.data.num_values(),
875            Self::FixedWidth(inner) => inner.num_values,
876            Self::FixedSizeList(inner) => inner.num_values(),
877            Self::VariableWidth(inner) => inner.num_values,
878            Self::Struct(inner) => inner.children[0].num_values(),
879            Self::Dictionary(inner) => inner.indices.num_values,
880            Self::Opaque(inner) => inner.num_values,
881        }
882    }
883
884    /// The number of items in a single row
885    ///
886    /// This is always 1 unless there are layers of FSL
887    pub fn items_per_row(&self) -> u64 {
888        match self {
889            Self::Empty() => todo!(),     // Leave undefined until needed
890            Self::Constant(_) => todo!(), // Leave undefined until needed
891            Self::AllNull(_) => todo!(),  // Leave undefined until needed
892            Self::Nullable(nullable) => nullable.data.items_per_row(),
893            Self::FixedWidth(_) => 1,
894            Self::FixedSizeList(fsl) => fsl.dimension * fsl.child.items_per_row(),
895            Self::VariableWidth(_) => 1,
896            Self::Struct(_) => todo!(), // Leave undefined until needed
897            Self::Dictionary(_) => 1,
898            Self::Opaque(_) => 1,
899        }
900    }
901
902    /// The number of bytes in the data block (including any child blocks)
903    pub fn data_size(&self) -> u64 {
904        match self {
905            Self::Empty() => 0,
906            Self::Constant(inner) => inner.data_size(),
907            Self::AllNull(_) => 0,
908            Self::Nullable(inner) => inner.data_size(),
909            Self::FixedWidth(inner) => inner.data_size(),
910            Self::FixedSizeList(inner) => inner.data_size(),
911            Self::VariableWidth(inner) => inner.data_size(),
912            Self::Struct(_) => {
913                todo!("the data_size method for StructDataBlock is not implemented yet")
914            }
915            Self::Dictionary(_) => {
916                todo!("the data_size method for DictionaryDataBlock is not implemented yet")
917            }
918            Self::Opaque(inner) => inner.data_size(),
919        }
920    }
921
922    /// Removes any validity information from the block
923    ///
924    /// This does not filter the block (e.g. remove rows).  It only removes
925    /// the validity bitmaps (if present).  Any garbage masked by null bits
926    /// will now appear as proper values.
927    ///
928    /// If `recurse` is true, then this will also remove validity from any child blocks.
929    pub fn remove_outer_validity(self) -> Self {
930        match self {
931            Self::AllNull(_) => panic!("Cannot remove validity on all-null data"),
932            Self::Nullable(inner) => *inner.data,
933            Self::Struct(inner) => Self::Struct(inner.remove_outer_validity()),
934            other => other,
935        }
936    }
937
938    pub fn make_builder(&self, estimated_size_bytes: u64) -> Box<dyn DataBlockBuilderImpl> {
939        match self {
940            Self::FixedWidth(inner) => {
941                if inner.bits_per_value == 1 {
942                    Box::new(BitmapDataBlockBuilder::new(estimated_size_bytes))
943                } else {
944                    Box::new(FixedWidthDataBlockBuilder::new(
945                        inner.bits_per_value,
946                        estimated_size_bytes,
947                    ))
948                }
949            }
950            Self::VariableWidth(inner) => {
951                if inner.bits_per_offset == 32 {
952                    Box::new(VariableWidthDataBlockBuilder::<i32>::new(
953                        estimated_size_bytes,
954                    ))
955                } else if inner.bits_per_offset == 64 {
956                    Box::new(VariableWidthDataBlockBuilder::<i64>::new(
957                        estimated_size_bytes,
958                    ))
959                } else {
960                    todo!()
961                }
962            }
963            Self::FixedSizeList(inner) => {
964                let inner_builder = inner.child.make_builder(estimated_size_bytes);
965                Box::new(FixedSizeListBlockBuilder::new(
966                    inner_builder,
967                    inner.dimension,
968                ))
969            }
970            Self::Nullable(nullable) => {
971                // There's no easy way to know what percentage of the data is in the valiidty buffer
972                // but 1/16th seems like a reasonable guess.
973                let estimated_validity_size_bytes = estimated_size_bytes / 16;
974                let inner_builder = nullable
975                    .data
976                    .make_builder(estimated_size_bytes - estimated_validity_size_bytes);
977                Box::new(NullableDataBlockBuilder::new(
978                    inner_builder,
979                    estimated_validity_size_bytes as usize,
980                ))
981            }
982            Self::Struct(struct_data_block) => {
983                let mut bits_per_values = vec![];
984                for child in struct_data_block.children.iter() {
985                    let child = child.as_fixed_width_ref().
986                        expect("Currently StructDataBlockBuilder is only used in packed-struct encoding, and currently in packed-struct encoding, only fixed-width fields are supported.");
987                    bits_per_values.push(child.bits_per_value as u32);
988                }
989                Box::new(StructDataBlockBuilder::new(
990                    bits_per_values,
991                    estimated_size_bytes,
992                ))
993            }
994            _ => todo!("make_builder for {:?}", self),
995        }
996    }
997}
998
// Generates a consuming accessor named `$method` that returns the payload of the
// `$variant` variant as `Some(..)`, or `None` when `self` is any other variant.
macro_rules! as_type {
    ($method:ident, $variant:tt, $payload:ident) => {
        pub fn $method(self) -> Option<$payload> {
            if let Self::$variant(value) = self {
                Some(value)
            } else {
                None
            }
        }
    };
}
1009
// Generates a borrowing accessor named `$method` that returns a shared reference to
// the payload of the `$variant` variant, or `None` when `self` is any other variant.
macro_rules! as_type_ref {
    ($method:ident, $variant:tt, $payload:ident) => {
        pub fn $method(&self) -> Option<&$payload> {
            if let Self::$variant(value) = self {
                Some(value)
            } else {
                None
            }
        }
    };
}
1020
// Generates a mutably-borrowing accessor named `$method` that returns an exclusive
// reference to the payload of the `$variant` variant, or `None` when `self` is any
// other variant.
macro_rules! as_type_ref_mut {
    ($method:ident, $variant:tt, $payload:ident) => {
        pub fn $method(&mut self) -> Option<&mut $payload> {
            if let Self::$variant(value) = self {
                Some(value)
            } else {
                None
            }
        }
    };
}
1031
1032// Cast implementations
1033impl DataBlock {
1034    as_type!(as_all_null, AllNull, AllNullDataBlock);
1035    as_type!(as_nullable, Nullable, NullableDataBlock);
1036    as_type!(as_fixed_width, FixedWidth, FixedWidthDataBlock);
1037    as_type!(as_fixed_size_list, FixedSizeList, FixedSizeListBlock);
1038    as_type!(as_variable_width, VariableWidth, VariableWidthBlock);
1039    as_type!(as_struct, Struct, StructDataBlock);
1040    as_type!(as_dictionary, Dictionary, DictionaryDataBlock);
1041    as_type_ref!(as_all_null_ref, AllNull, AllNullDataBlock);
1042    as_type_ref!(as_nullable_ref, Nullable, NullableDataBlock);
1043    as_type_ref!(as_fixed_width_ref, FixedWidth, FixedWidthDataBlock);
1044    as_type_ref!(as_fixed_size_list_ref, FixedSizeList, FixedSizeListBlock);
1045    as_type_ref!(as_variable_width_ref, VariableWidth, VariableWidthBlock);
1046    as_type_ref!(as_struct_ref, Struct, StructDataBlock);
1047    as_type_ref!(as_dictionary_ref, Dictionary, DictionaryDataBlock);
1048    as_type_ref_mut!(as_all_null_ref_mut, AllNull, AllNullDataBlock);
1049    as_type_ref_mut!(as_nullable_ref_mut, Nullable, NullableDataBlock);
1050    as_type_ref_mut!(as_fixed_width_ref_mut, FixedWidth, FixedWidthDataBlock);
1051    as_type_ref_mut!(
1052        as_fixed_size_list_ref_mut,
1053        FixedSizeList,
1054        FixedSizeListBlock
1055    );
1056    as_type_ref_mut!(as_variable_width_ref_mut, VariableWidth, VariableWidthBlock);
1057    as_type_ref_mut!(as_struct_ref_mut, Struct, StructDataBlock);
1058    as_type_ref_mut!(as_dictionary_ref_mut, Dictionary, DictionaryDataBlock);
1059}
1060
1061// Methods to convert from Arrow -> DataBlock
1062
1063fn get_byte_range<T: ArrowNativeType>(offsets: &mut LanceBuffer) -> Range<usize> {
1064    let offsets = offsets.borrow_to_typed_slice::<T>();
1065    if offsets.as_ref().is_empty() {
1066        0..0
1067    } else {
1068        offsets.as_ref().first().unwrap().as_usize()..offsets.as_ref().last().unwrap().as_usize()
1069    }
1070}
1071
1072// Given multiple offsets arrays [0, 5, 10], [0, 3, 7], etc. stitch
1073// them together to get [0, 5, 10, 13, 20, ...]
1074//
1075// Also returns the data range referenced by each offset array (may
1076// not be 0..len if there is slicing involved)
1077fn stitch_offsets<T: ArrowNativeType + std::ops::Add<Output = T> + std::ops::Sub<Output = T>>(
1078    offsets: Vec<LanceBuffer>,
1079) -> (LanceBuffer, Vec<Range<usize>>) {
1080    if offsets.is_empty() {
1081        return (LanceBuffer::empty(), Vec::default());
1082    }
1083    let len = offsets.iter().map(|b| b.len()).sum::<usize>();
1084    // Note: we are making a copy here, even if there is only one input, because we want to
1085    // normalize that input if it doesn't start with zero.  This could be micro-optimized out
1086    // if needed.
1087    let mut dest = Vec::with_capacity(len);
1088    let mut byte_ranges = Vec::with_capacity(offsets.len());
1089
1090    // We insert one leading 0 before processing any of the inputs
1091    dest.push(T::from_usize(0).unwrap());
1092
1093    for mut o in offsets.into_iter() {
1094        if !o.is_empty() {
1095            let last_offset = *dest.last().unwrap();
1096            let o = o.borrow_to_typed_slice::<T>();
1097            let start = *o.as_ref().first().unwrap();
1098            // First, we skip the first offset
1099            // Then, we subtract that first offset from each remaining offset
1100            //
1101            // This gives us a 0-based offset array (minus the leading 0)
1102            //
1103            // Then we add the last offset from the previous array to each offset
1104            // which shifts our offset array to the correct position
1105            //
1106            // For example, let's assume the last offset from the previous array
1107            // was 10 and we are given [13, 17, 22].  This means we have two values with
1108            // length 4 (17 - 13) and 5 (22 - 17).  The output from this step will be
1109            // [14, 19].  Combined with our last offset of 10, this gives us [10, 14, 19]
1110            // which is our same two values of length 4 and 5.
1111            dest.extend(o.as_ref()[1..].iter().map(|&x| x + last_offset - start));
1112        }
1113        byte_ranges.push(get_byte_range::<T>(&mut o));
1114    }
1115    (LanceBuffer::reinterpret_vec(dest), byte_ranges)
1116}
1117
1118fn arrow_binary_to_data_block(
1119    arrays: &[ArrayRef],
1120    num_values: u64,
1121    bits_per_offset: u8,
1122) -> DataBlock {
1123    let data_vec = arrays.iter().map(|arr| arr.to_data()).collect::<Vec<_>>();
1124    let bytes_per_offset = bits_per_offset as usize / 8;
1125    let offsets = data_vec
1126        .iter()
1127        .map(|d| {
1128            LanceBuffer::from(
1129                d.buffers()[0].slice_with_length(d.offset(), (d.len() + 1) * bytes_per_offset),
1130            )
1131        })
1132        .collect::<Vec<_>>();
1133    let (offsets, data_ranges) = if bits_per_offset == 32 {
1134        stitch_offsets::<i32>(offsets)
1135    } else {
1136        stitch_offsets::<i64>(offsets)
1137    };
1138    let data = data_vec
1139        .iter()
1140        .zip(data_ranges)
1141        .map(|(d, byte_range)| {
1142            LanceBuffer::from(
1143                d.buffers()[1]
1144                    .slice_with_length(byte_range.start, byte_range.end - byte_range.start),
1145            )
1146        })
1147        .collect::<Vec<_>>();
1148    let data = LanceBuffer::concat_into_one(data);
1149    DataBlock::VariableWidth(VariableWidthBlock {
1150        data,
1151        offsets,
1152        bits_per_offset,
1153        num_values,
1154        block_info: BlockInfo::new(),
1155    })
1156}
1157
1158fn encode_flat_data(arrays: &[ArrayRef], num_values: u64) -> LanceBuffer {
1159    let bytes_per_value = arrays[0].data_type().byte_width();
1160    let mut buffer = Vec::with_capacity(num_values as usize * bytes_per_value);
1161    for arr in arrays {
1162        let data = arr.to_data();
1163        buffer.extend_from_slice(data.buffers()[0].as_slice());
1164    }
1165    LanceBuffer::from(buffer)
1166}
1167
1168fn do_encode_bitmap_data(bitmaps: &[BooleanBuffer], num_values: u64) -> LanceBuffer {
1169    let mut builder = BooleanBufferBuilder::new(num_values as usize);
1170
1171    for buf in bitmaps {
1172        builder.append_buffer(buf);
1173    }
1174
1175    let buffer = builder.finish().into_inner();
1176    LanceBuffer::from(buffer)
1177}
1178
1179fn encode_bitmap_data(arrays: &[ArrayRef], num_values: u64) -> LanceBuffer {
1180    let bitmaps = arrays
1181        .iter()
1182        .map(|arr| arr.as_boolean().values().clone())
1183        .collect::<Vec<_>>();
1184    do_encode_bitmap_data(&bitmaps, num_values)
1185}
1186
1187// Concatenate dictionary arrays.  This is a bit tricky because we might overflow the
1188// index type.  If we do, we need to upscale the indices to a larger type.
1189fn concat_dict_arrays(arrays: &[ArrayRef]) -> ArrayRef {
1190    let value_type = arrays[0].as_any_dictionary().values().data_type();
1191    let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
1192    match arrow_select::concat::concat(&array_refs) {
1193        Ok(array) => array,
1194        Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1195            // Slow, but hopefully a corner case.  Optimize later
1196            let upscaled = array_refs
1197                .iter()
1198                .map(|arr| {
1199                    match arrow_cast::cast(
1200                        *arr,
1201                        &DataType::Dictionary(
1202                            Box::new(DataType::UInt32),
1203                            Box::new(value_type.clone()),
1204                        ),
1205                    ) {
1206                        Ok(arr) => arr,
1207                        Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1208                            // Technically I think this means the input type was u64 already
1209                            unimplemented!("Dictionary arrays with more than 2^32 unique values")
1210                        }
1211                        err => err.unwrap(),
1212                    }
1213                })
1214                .collect::<Vec<_>>();
1215            let array_refs = upscaled.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
1216            // Can still fail if concat pushes over u32 boundary
1217            match arrow_select::concat::concat(&array_refs) {
1218                Ok(array) => array,
1219                Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1220                    unimplemented!("Dictionary arrays with more than 2^32 unique values")
1221                }
1222                err => err.unwrap(),
1223            }
1224        }
1225        // Shouldn't be any other possible errors in concat
1226        err => err.unwrap(),
1227    }
1228}
1229
1230fn max_index_val(index_type: &DataType) -> u64 {
1231    match index_type {
1232        DataType::Int8 => i8::MAX as u64,
1233        DataType::Int16 => i16::MAX as u64,
1234        DataType::Int32 => i32::MAX as u64,
1235        DataType::Int64 => i64::MAX as u64,
1236        DataType::UInt8 => u8::MAX as u64,
1237        DataType::UInt16 => u16::MAX as u64,
1238        DataType::UInt32 => u32::MAX as u64,
1239        DataType::UInt64 => u64::MAX,
1240        _ => panic!("Invalid dictionary index type"),
1241    }
1242}
1243
// Converts a set of dictionary arrays (plus their combined logical validity, if any)
// into a single `DataBlock::Dictionary`.
//
// If we get multiple dictionary arrays and they don't all have the same dictionary
// then we need to normalize the indices.  Otherwise we might have something like:
//
// First chunk ["hello", "foo"], [0, 0, 1, 1, 1]
// Second chunk ["bar", "world"], [0, 1, 0, 1, 1]
//
// If we simply encode as ["hello", "foo", "bar", "world"], [0, 0, 1, 1, 1, 0, 1, 0, 1, 1]
// then we will get the wrong answer because the dictionaries were not merged and the indices
// were not remapped.
//
// A simple way to do this today is to just concatenate all the arrays.  This is because
// arrow's dictionary concatenation function already has the logic to merge dictionaries.
//
// TODO: We could be more efficient here by checking if the dictionaries are the same
//       Also, if they aren't, we can possibly do something cheaper than concatenating
//
// In addition, we want to normalize the representation of nulls.  The cheapest thing to
// do (space-wise) is to put the nulls in the dictionary.
//
// Panics (`unimplemented!`) if a null entry has to be appended to the dictionary and
// its index does not fit in the current key type, which is already at least 32 bits wide.
fn arrow_dictionary_to_data_block(arrays: &[ArrayRef], validity: Option<NullBuffer>) -> DataBlock {
    let array = concat_dict_arrays(arrays);
    let array_dict = array.as_any_dictionary();
    let mut indices = array_dict.keys();
    let num_values = indices.len() as u64;
    let mut values = array_dict.values().clone();
    // Placeholder, if we need to upcast, we will initialize this and set `indices` to refer to it
    let mut upcast = None;

    // TODO: Should we just always normalize indices to u32?  That would make logic simpler
    // and we're going to bitpack them soon anyways

    let indices_block = if let Some(validity) = validity {
        // If there is validity then we find the first invalid index in the dictionary values, inserting
        // a new value if we need to.  Then we change all indices to point to that value.  This way we
        // never need to store nullability of the indices.
        let mut first_invalid_index = None;
        if let Some(values_validity) = values.nulls() {
            // Negating the validity bitmap turns null slots into set bits
            first_invalid_index = (!values_validity.inner()).set_indices().next();
        }
        let first_invalid_index = first_invalid_index.unwrap_or_else(|| {
            // The dictionary has no null entry yet, append one at the end
            let null_arr = new_null_array(values.data_type(), 1);
            values = arrow_select::concat::concat(&[values.as_ref(), null_arr.as_ref()]).unwrap();
            let null_index = values.len() - 1;
            let max_index_val = max_index_val(indices.data_type());
            if null_index as u64 > max_index_val {
                // Widen the index type
                if max_index_val >= u32::MAX as u64 {
                    unimplemented!("Dictionary arrays with 2^32 unique value (or more) and a null")
                }
                upcast = Some(arrow_cast::cast(indices, &DataType::UInt32).unwrap());
                indices = upcast.as_ref().unwrap();
            }
            null_index
        });
        // This can't fail since we already checked for fit
        let null_index_arr = arrow_cast::cast(
            &UInt64Array::from(vec![first_invalid_index as u64]),
            indices.data_type(),
        )
        .unwrap();

        let bytes_per_index = indices.data_type().byte_width();
        let bits_per_index = bytes_per_index as u64 * 8;

        // The raw bytes of the null index, encoded with the (possibly widened) key type
        let null_index_arr = null_index_arr.into_data();
        let null_index_bytes = &null_index_arr.buffers()[0];
        // Need to make a copy here since indices isn't mutable, could be avoided in theory
        let mut indices_bytes = indices.to_data().buffers()[0].to_vec();
        // Overwrite the index of every null row so that it points at the null dictionary entry
        for invalid_idx in (!validity.inner()).set_indices() {
            indices_bytes[invalid_idx * bytes_per_index..(invalid_idx + 1) * bytes_per_index]
                .copy_from_slice(null_index_bytes.as_slice());
        }
        FixedWidthDataBlock {
            data: LanceBuffer::from(indices_bytes),
            bits_per_value: bits_per_index,
            num_values,
            block_info: BlockInfo::new(),
        }
    } else {
        // No validity, the indices can be used as they are
        FixedWidthDataBlock {
            data: LanceBuffer::from(indices.to_data().buffers()[0].clone()),
            bits_per_value: indices.data_type().byte_width() as u64 * 8,
            num_values,
            block_info: BlockInfo::new(),
        }
    };

    // The dictionary values are converted like any other array
    let items = DataBlock::from(values);
    DataBlock::Dictionary(DictionaryDataBlock {
        indices: indices_block,
        dictionary: Box::new(items),
    })
}
1336
// Summary of the validity found across a set of arrays (see `extract_nulls`)
enum Nullability {
    // None of the arrays reported any logical nulls
    None,
    // Every value is null
    All,
    // At least one array reported logical nulls; holds the combined validity of
    // all the arrays
    Some(NullBuffer),
}
1342
1343impl Nullability {
1344    fn to_option(&self) -> Option<NullBuffer> {
1345        match self {
1346            Self::Some(nulls) => Some(nulls.clone()),
1347            _ => None,
1348        }
1349    }
1350}
1351
1352fn extract_nulls(arrays: &[ArrayRef], num_values: u64) -> Nullability {
1353    let mut has_nulls = false;
1354    let nulls_and_lens = arrays
1355        .iter()
1356        .map(|arr| {
1357            let nulls = arr.logical_nulls();
1358            has_nulls |= nulls.is_some();
1359            (nulls, arr.len())
1360        })
1361        .collect::<Vec<_>>();
1362    if !has_nulls {
1363        return Nullability::None;
1364    }
1365    let mut builder = BooleanBufferBuilder::new(num_values as usize);
1366    let mut num_nulls = 0;
1367    for (null, len) in nulls_and_lens {
1368        if let Some(null) = null {
1369            num_nulls += null.null_count();
1370            builder.append_buffer(&null.into_inner());
1371        } else {
1372            builder.append_n(len, true);
1373        }
1374    }
1375    if num_nulls == num_values as usize {
1376        Nullability::All
1377    } else {
1378        Nullability::Some(NullBuffer::new(builder.finish()))
1379    }
1380}
1381
1382impl DataBlock {
1383    pub fn from_arrays(arrays: &[ArrayRef], num_values: u64) -> Self {
1384        if arrays.is_empty() || num_values == 0 {
1385            return Self::AllNull(AllNullDataBlock { num_values: 0 });
1386        }
1387
1388        let data_type = arrays[0].data_type();
1389        let nulls = extract_nulls(arrays, num_values);
1390
1391        if let Nullability::All = nulls {
1392            return Self::AllNull(AllNullDataBlock { num_values });
1393        }
1394
1395        let mut encoded = match data_type {
1396            DataType::Binary | DataType::Utf8 => arrow_binary_to_data_block(arrays, num_values, 32),
1397            DataType::BinaryView | DataType::Utf8View => {
1398                todo!()
1399            }
1400            DataType::LargeBinary | DataType::LargeUtf8 => {
1401                arrow_binary_to_data_block(arrays, num_values, 64)
1402            }
1403            DataType::Boolean => {
1404                let data = encode_bitmap_data(arrays, num_values);
1405                Self::FixedWidth(FixedWidthDataBlock {
1406                    data,
1407                    bits_per_value: 1,
1408                    num_values,
1409                    block_info: BlockInfo::new(),
1410                })
1411            }
1412            DataType::Date32
1413            | DataType::Date64
1414            | DataType::Decimal128(_, _)
1415            | DataType::Decimal256(_, _)
1416            | DataType::Duration(_)
1417            | DataType::FixedSizeBinary(_)
1418            | DataType::Float16
1419            | DataType::Float32
1420            | DataType::Float64
1421            | DataType::Int16
1422            | DataType::Int32
1423            | DataType::Int64
1424            | DataType::Int8
1425            | DataType::Interval(_)
1426            | DataType::Time32(_)
1427            | DataType::Time64(_)
1428            | DataType::Timestamp(_, _)
1429            | DataType::UInt16
1430            | DataType::UInt32
1431            | DataType::UInt64
1432            | DataType::UInt8 => {
1433                let data = encode_flat_data(arrays, num_values);
1434                Self::FixedWidth(FixedWidthDataBlock {
1435                    data,
1436                    bits_per_value: data_type.byte_width() as u64 * 8,
1437                    num_values,
1438                    block_info: BlockInfo::new(),
1439                })
1440            }
1441            DataType::Null => Self::AllNull(AllNullDataBlock { num_values }),
1442            DataType::Dictionary(_, _) => arrow_dictionary_to_data_block(arrays, nulls.to_option()),
1443            DataType::Struct(fields) => {
1444                let structs = arrays.iter().map(|arr| arr.as_struct()).collect::<Vec<_>>();
1445                let mut children = Vec::with_capacity(fields.len());
1446                for child_idx in 0..fields.len() {
1447                    let child_vec = structs
1448                        .iter()
1449                        .map(|s| s.column(child_idx).clone())
1450                        .collect::<Vec<_>>();
1451                    children.push(Self::from_arrays(&child_vec, num_values));
1452                }
1453
1454                // Extract validity for the struct array
1455                let validity = match &nulls {
1456                    Nullability::None => None,
1457                    Nullability::Some(null_buffer) => Some(null_buffer.clone()),
1458                    Nullability::All => unreachable!("Should have returned AllNull earlier"),
1459                };
1460
1461                Self::Struct(StructDataBlock {
1462                    children,
1463                    block_info: BlockInfo::default(),
1464                    validity,
1465                })
1466            }
1467            DataType::FixedSizeList(_, dim) => {
1468                let children = arrays
1469                    .iter()
1470                    .map(|arr| arr.as_fixed_size_list().values().clone())
1471                    .collect::<Vec<_>>();
1472                let child_block = Self::from_arrays(&children, num_values * *dim as u64);
1473                Self::FixedSizeList(FixedSizeListBlock {
1474                    child: Box::new(child_block),
1475                    dimension: *dim as u64,
1476                })
1477            }
1478            DataType::LargeList(_)
1479            | DataType::List(_)
1480            | DataType::ListView(_)
1481            | DataType::LargeListView(_)
1482            | DataType::Map(_, _)
1483            | DataType::RunEndEncoded(_, _)
1484            | DataType::Union(_, _) => {
1485                panic!(
1486                    "Field with data type {} cannot be converted to data block",
1487                    data_type
1488                )
1489            }
1490        };
1491
1492        // compute statistics
1493        encoded.compute_stat();
1494
1495        if !matches!(data_type, DataType::Dictionary(_, _)) {
1496            match nulls {
1497                Nullability::None => encoded,
1498                Nullability::Some(nulls) => Self::Nullable(NullableDataBlock {
1499                    data: Box::new(encoded),
1500                    nulls: LanceBuffer::from(nulls.into_inner().into_inner()),
1501                    block_info: BlockInfo::new(),
1502                }),
1503                _ => unreachable!(),
1504            }
1505        } else {
1506            // Dictionaries already insert the nulls into the dictionary items
1507            encoded
1508        }
1509    }
1510
1511    pub fn from_array<T: Array + 'static>(array: T) -> Self {
1512        let num_values = array.len();
1513        Self::from_arrays(&[Arc::new(array)], num_values as u64)
1514    }
1515}
1516
1517impl From<ArrayRef> for DataBlock {
1518    fn from(array: ArrayRef) -> Self {
1519        let num_values = array.len() as u64;
1520        Self::from_arrays(&[array], num_values)
1521    }
1522}
1523
/// Interface implemented by the concrete, layout-specific data block builders
///
/// Instances are created by [`DataBlock::make_builder`] and are used through a
/// `Box<dyn DataBlockBuilderImpl>`.
pub trait DataBlockBuilderImpl: std::fmt::Debug {
    /// Appends the part of `data_block` identified by `selection` to the builder
    ///
    /// NOTE(review): `selection` is presumably a range of value indices within
    /// `data_block`; confirm against the implementations.
    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>);
    /// Consumes the (boxed) builder and returns the block that was built
    fn finish(self: Box<Self>) -> DataBlock;
}
1528
/// A builder that picks its concrete implementation from the first block appended
///
/// The inner builder does not exist until the first call to `append`, at which
/// point it is created by calling `make_builder` on the block being appended.
#[derive(Debug)]
pub struct DataBlockBuilder {
    /// Size estimate (in bytes) handed to `make_builder` when the inner builder is created
    estimated_size_bytes: u64,
    /// The concrete builder, `None` until the first block has been appended
    builder: Option<Box<dyn DataBlockBuilderImpl>>,
}
1534
1535impl DataBlockBuilder {
1536    pub fn with_capacity_estimate(estimated_size_bytes: u64) -> Self {
1537        Self {
1538            estimated_size_bytes,
1539            builder: None,
1540        }
1541    }
1542
1543    fn get_builder(&mut self, block: &DataBlock) -> &mut dyn DataBlockBuilderImpl {
1544        if self.builder.is_none() {
1545            self.builder = Some(block.make_builder(self.estimated_size_bytes));
1546        }
1547        self.builder.as_mut().unwrap().as_mut()
1548    }
1549
1550    pub fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
1551        self.get_builder(data_block).append(data_block, selection);
1552    }
1553
1554    pub fn finish(self) -> DataBlock {
1555        let builder = self.builder.expect("DataBlockBuilder didn't see any data");
1556        builder.finish()
1557    }
1558}
1559
1560#[cfg(test)]
1561mod tests {
1562    use std::sync::Arc;
1563
1564    use arrow_array::{
1565        make_array, new_null_array,
1566        types::{Int32Type, Int8Type},
1567        ArrayRef, DictionaryArray, Int8Array, LargeBinaryArray, StringArray, UInt16Array,
1568        UInt8Array,
1569    };
1570    use arrow_buffer::{BooleanBuffer, NullBuffer};
1571
1572    use arrow_schema::{DataType, Field, Fields};
1573    use lance_datagen::{array, ArrayGeneratorExt, RowCount, DEFAULT_SEED};
1574    use rand::SeedableRng;
1575
1576    use crate::buffer::LanceBuffer;
1577
1578    use super::{AllNullDataBlock, DataBlock};
1579
1580    use arrow_array::Array;
1581
1582    #[test]
1583    fn test_sliced_to_data_block() {
1584        let ints = UInt16Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);
1585        let ints = ints.slice(2, 4);
1586        let data = DataBlock::from_array(ints);
1587
1588        let fixed_data = data.as_fixed_width().unwrap();
1589        assert_eq!(fixed_data.num_values, 4);
1590        assert_eq!(fixed_data.data.len(), 8);
1591
1592        let nullable_ints =
1593            UInt16Array::from(vec![Some(0), None, Some(2), None, Some(4), None, Some(6)]);
1594        let nullable_ints = nullable_ints.slice(1, 3);
1595        let data = DataBlock::from_array(nullable_ints);
1596
1597        let nullable = data.as_nullable().unwrap();
1598        assert_eq!(nullable.nulls, LanceBuffer::from(vec![0b00000010]));
1599    }
1600
1601    #[test]
1602    fn test_string_to_data_block() {
1603        // Converting string arrays that contain nulls to DataBlock
1604        let strings1 = StringArray::from(vec![Some("hello"), None, Some("world")]);
1605        let strings2 = StringArray::from(vec![Some("a"), Some("b")]);
1606        let strings3 = StringArray::from(vec![Option::<&'static str>::None, None]);
1607
1608        let arrays = &[strings1, strings2, strings3]
1609            .iter()
1610            .map(|arr| Arc::new(arr.clone()) as ArrayRef)
1611            .collect::<Vec<_>>();
1612
1613        let block = DataBlock::from_arrays(arrays, 7);
1614
1615        assert_eq!(block.num_values(), 7);
1616        let block = block.as_nullable().unwrap();
1617
1618        assert_eq!(block.nulls, LanceBuffer::from(vec![0b00011101]));
1619
1620        let data = block.data.as_variable_width().unwrap();
1621        assert_eq!(
1622            data.offsets,
1623            LanceBuffer::reinterpret_vec(vec![0, 5, 5, 10, 11, 12, 12, 12])
1624        );
1625
1626        assert_eq!(data.data, LanceBuffer::copy_slice(b"helloworldab"));
1627
1628        // Converting string arrays that do not contain nulls to DataBlock
1629        let strings1 = StringArray::from(vec![Some("a"), Some("bc")]);
1630        let strings2 = StringArray::from(vec![Some("def")]);
1631
1632        let arrays = &[strings1, strings2]
1633            .iter()
1634            .map(|arr| Arc::new(arr.clone()) as ArrayRef)
1635            .collect::<Vec<_>>();
1636
1637        let block = DataBlock::from_arrays(arrays, 3);
1638
1639        assert_eq!(block.num_values(), 3);
1640        // Should be no nullable wrapper
1641        let data = block.as_variable_width().unwrap();
1642        assert_eq!(data.offsets, LanceBuffer::reinterpret_vec(vec![0, 1, 3, 6]));
1643        assert_eq!(data.data, LanceBuffer::copy_slice(b"abcdef"));
1644    }
1645
1646    #[test]
1647    fn test_string_sliced() {
1648        let check = |arr: Vec<StringArray>, expected_off: Vec<i32>, expected_data: &[u8]| {
1649            let arrs = arr
1650                .into_iter()
1651                .map(|a| Arc::new(a) as ArrayRef)
1652                .collect::<Vec<_>>();
1653            let num_rows = arrs.iter().map(|a| a.len()).sum::<usize>() as u64;
1654            let data = DataBlock::from_arrays(&arrs, num_rows);
1655
1656            assert_eq!(data.num_values(), num_rows);
1657
1658            let data = data.as_variable_width().unwrap();
1659            assert_eq!(data.offsets, LanceBuffer::reinterpret_vec(expected_off));
1660            assert_eq!(data.data, LanceBuffer::copy_slice(expected_data));
1661        };
1662
1663        let string = StringArray::from(vec![Some("hello"), Some("world")]);
1664        check(vec![string.slice(1, 1)], vec![0, 5], b"world");
1665        check(vec![string.slice(0, 1)], vec![0, 5], b"hello");
1666        check(
1667            vec![string.slice(0, 1), string.slice(1, 1)],
1668            vec![0, 5, 10],
1669            b"helloworld",
1670        );
1671
1672        let string2 = StringArray::from(vec![Some("foo"), Some("bar")]);
1673        check(
1674            vec![string.slice(0, 1), string2.slice(0, 1)],
1675            vec![0, 5, 8],
1676            b"hellofoo",
1677        );
1678    }
1679
1680    #[test]
1681    fn test_large() {
1682        let arr = LargeBinaryArray::from_vec(vec![b"hello", b"world"]);
1683        let data = DataBlock::from_array(arr);
1684
1685        assert_eq!(data.num_values(), 2);
1686        let data = data.as_variable_width().unwrap();
1687        assert_eq!(data.bits_per_offset, 64);
1688        assert_eq!(data.num_values, 2);
1689        assert_eq!(data.data, LanceBuffer::copy_slice(b"helloworld"));
1690        assert_eq!(
1691            data.offsets,
1692            LanceBuffer::reinterpret_vec(vec![0_u64, 5, 10])
1693        );
1694    }
1695
1696    #[test]
1697    fn test_dictionary_indices_normalized() {
1698        let arr1 = DictionaryArray::<Int8Type>::from_iter([Some("a"), Some("a"), Some("b")]);
1699        let arr2 = DictionaryArray::<Int8Type>::from_iter([Some("b"), Some("c")]);
1700
1701        let data = DataBlock::from_arrays(&[Arc::new(arr1), Arc::new(arr2)], 5);
1702
1703        assert_eq!(data.num_values(), 5);
1704        let data = data.as_dictionary().unwrap();
1705        let indices = data.indices;
1706        assert_eq!(indices.bits_per_value, 8);
1707        assert_eq!(indices.num_values, 5);
1708        assert_eq!(
1709            indices.data,
1710            // You might expect 0, 0, 1, 1, 2 but it seems that arrow's dictionary concat does
1711            // not actually collapse dictionaries.  This is an arrow problem however, and we don't
1712            // need to fix it here.
1713            LanceBuffer::reinterpret_vec::<i8>(vec![0, 0, 1, 2, 3])
1714        );
1715
1716        let items = data.dictionary.as_variable_width().unwrap();
1717        assert_eq!(items.bits_per_offset, 32);
1718        assert_eq!(items.num_values, 4);
1719        assert_eq!(items.data, LanceBuffer::copy_slice(b"abbc"));
1720        assert_eq!(
1721            items.offsets,
1722            LanceBuffer::reinterpret_vec(vec![0, 1, 2, 3, 4],)
1723        );
1724    }
1725
1726    #[test]
1727    fn test_dictionary_nulls() {
1728        // Test both ways of encoding nulls
1729
1730        // By default, nulls get encoded into the indices
1731        let arr1 = DictionaryArray::<Int8Type>::from_iter([None, Some("a"), Some("b")]);
1732        let arr2 = DictionaryArray::<Int8Type>::from_iter([Some("c"), None]);
1733
1734        let data = DataBlock::from_arrays(&[Arc::new(arr1), Arc::new(arr2)], 5);
1735
1736        let check_common = |data: DataBlock| {
1737            assert_eq!(data.num_values(), 5);
1738            let dict = data.as_dictionary().unwrap();
1739
1740            let nullable_items = dict.dictionary.as_nullable().unwrap();
1741            assert_eq!(nullable_items.nulls, LanceBuffer::from(vec![0b00000111]));
1742            assert_eq!(nullable_items.data.num_values(), 4);
1743
1744            let items = nullable_items.data.as_variable_width().unwrap();
1745            assert_eq!(items.bits_per_offset, 32);
1746            assert_eq!(items.num_values, 4);
1747            assert_eq!(items.data, LanceBuffer::copy_slice(b"abc"));
1748            assert_eq!(
1749                items.offsets,
1750                LanceBuffer::reinterpret_vec(vec![0, 1, 2, 3, 3],)
1751            );
1752
1753            let indices = dict.indices;
1754            assert_eq!(indices.bits_per_value, 8);
1755            assert_eq!(indices.num_values, 5);
1756            assert_eq!(
1757                indices.data,
1758                LanceBuffer::reinterpret_vec::<i8>(vec![3, 0, 1, 2, 3])
1759            );
1760        };
1761        check_common(data);
1762
1763        // However, we can manually create a dictionary where nulls are in the dictionary
1764        let items = StringArray::from(vec![Some("a"), Some("b"), Some("c"), None]);
1765        let indices = Int8Array::from(vec![Some(3), Some(0), Some(1), Some(2), Some(3)]);
1766        let dict = DictionaryArray::new(indices, Arc::new(items));
1767
1768        let data = DataBlock::from_array(dict);
1769
1770        check_common(data);
1771    }
1772
1773    #[test]
1774    fn test_dictionary_cannot_add_null() {
1775        // 256 unique strings
1776        let items = StringArray::from(
1777            (0..256)
1778                .map(|i| Some(String::from_utf8(vec![0; i]).unwrap()))
1779                .collect::<Vec<_>>(),
1780        );
1781        // 257 indices, covering the whole range, plus one null
1782        let indices = UInt8Array::from(
1783            (0..=256)
1784                .map(|i| if i == 256 { None } else { Some(i as u8) })
1785                .collect::<Vec<_>>(),
1786        );
1787        // We want to normalize this by pushing nulls into the dictionary, but we cannot because
1788        // the dictionary is too large for the index type
1789        let dict = DictionaryArray::new(indices, Arc::new(items));
1790        let data = DataBlock::from_array(dict);
1791
1792        assert_eq!(data.num_values(), 257);
1793
1794        let dict = data.as_dictionary().unwrap();
1795
1796        assert_eq!(dict.indices.bits_per_value, 32);
1797        assert_eq!(
1798            dict.indices.data,
1799            LanceBuffer::reinterpret_vec((0_u32..257).collect::<Vec<_>>())
1800        );
1801
1802        let nullable_items = dict.dictionary.as_nullable().unwrap();
1803        let null_buffer = NullBuffer::new(BooleanBuffer::new(
1804            nullable_items.nulls.into_buffer(),
1805            0,
1806            257,
1807        ));
1808        for i in 0..256 {
1809            assert!(!null_buffer.is_null(i));
1810        }
1811        assert!(null_buffer.is_null(256));
1812
1813        assert_eq!(
1814            nullable_items.data.as_variable_width().unwrap().data.len(),
1815            32640
1816        );
1817    }
1818
1819    #[test]
1820    fn test_all_null() {
1821        for data_type in [
1822            DataType::UInt32,
1823            DataType::FixedSizeBinary(2),
1824            DataType::List(Arc::new(Field::new("item", DataType::UInt32, true))),
1825            DataType::Struct(Fields::from(vec![Field::new("a", DataType::UInt32, true)])),
1826        ] {
1827            let block = DataBlock::AllNull(AllNullDataBlock { num_values: 10 });
1828            let arr = block.into_arrow(data_type.clone(), true).unwrap();
1829            let arr = make_array(arr);
1830            let expected = new_null_array(&data_type, 10);
1831            assert_eq!(&arr, &expected);
1832        }
1833    }
1834
1835    #[test]
1836    fn test_dictionary_cannot_concatenate() {
1837        // 256 unique strings
1838        let items = StringArray::from(
1839            (0..256)
1840                .map(|i| Some(String::from_utf8(vec![0; i]).unwrap()))
1841                .collect::<Vec<_>>(),
1842        );
1843        // 256 different unique strings
1844        let other_items = StringArray::from(
1845            (0..256)
1846                .map(|i| Some(String::from_utf8(vec![1; i + 1]).unwrap()))
1847                .collect::<Vec<_>>(),
1848        );
1849        let indices = UInt8Array::from_iter_values(0..=255);
1850        let dict1 = DictionaryArray::new(indices.clone(), Arc::new(items));
1851        let dict2 = DictionaryArray::new(indices, Arc::new(other_items));
1852        let data = DataBlock::from_arrays(&[Arc::new(dict1), Arc::new(dict2)], 512);
1853        assert_eq!(data.num_values(), 512);
1854
1855        let dict = data.as_dictionary().unwrap();
1856
1857        assert_eq!(dict.indices.bits_per_value, 32);
1858        assert_eq!(
1859            dict.indices.data,
1860            LanceBuffer::reinterpret_vec::<u32>((0..512).collect::<Vec<_>>())
1861        );
1862        // What fun: 0 + 1 + .. + 255 + 1 + 2 + .. + 256 = 2^16
1863        assert_eq!(
1864            dict.dictionary.as_variable_width().unwrap().data.len(),
1865            65536
1866        );
1867    }
1868
1869    #[test]
1870    fn test_data_size() {
1871        let mut rng = rand_xoshiro::Xoshiro256PlusPlus::seed_from_u64(DEFAULT_SEED.0);
1872        // test data_size() when input has no nulls
1873        let mut genn = array::rand::<Int32Type>().with_nulls(&[false, false, false]);
1874
1875        let arr = genn.generate(RowCount::from(3), &mut rng).unwrap();
1876        let block = DataBlock::from_array(arr.clone());
1877        assert!(block.data_size() == arr.get_buffer_memory_size() as u64);
1878
1879        let arr = genn.generate(RowCount::from(400), &mut rng).unwrap();
1880        let block = DataBlock::from_array(arr.clone());
1881        assert!(block.data_size() == arr.get_buffer_memory_size() as u64);
1882
1883        // test data_size() when input has nulls
1884        let mut genn = array::rand::<Int32Type>().with_nulls(&[false, true, false]);
1885        let arr = genn.generate(RowCount::from(3), &mut rng).unwrap();
1886        let block = DataBlock::from_array(arr.clone());
1887
1888        let array_data = arr.to_data();
1889        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
1890        // the NullBuffer.len() returns the length in bits so we divide_round_up by 8
1891        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
1892        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
1893
1894        let arr = genn.generate(RowCount::from(400), &mut rng).unwrap();
1895        let block = DataBlock::from_array(arr.clone());
1896
1897        let array_data = arr.to_data();
1898        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
1899        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
1900        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
1901
1902        let mut genn = array::rand::<Int32Type>().with_nulls(&[true, true, false]);
1903        let arr = genn.generate(RowCount::from(3), &mut rng).unwrap();
1904        let block = DataBlock::from_array(arr.clone());
1905
1906        let array_data = arr.to_data();
1907        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
1908        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
1909        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
1910
1911        let arr = genn.generate(RowCount::from(400), &mut rng).unwrap();
1912        let block = DataBlock::from_array(arr.clone());
1913
1914        let array_data = arr.to_data();
1915        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
1916        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
1917        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
1918
1919        let mut genn = array::rand::<Int32Type>().with_nulls(&[false, true, false]);
1920        let arr1 = genn.generate(RowCount::from(3), &mut rng).unwrap();
1921        let arr2 = genn.generate(RowCount::from(3), &mut rng).unwrap();
1922        let arr3 = genn.generate(RowCount::from(3), &mut rng).unwrap();
1923        let block = DataBlock::from_arrays(&[arr1.clone(), arr2.clone(), arr3.clone()], 9);
1924
1925        let concatenated_array = arrow_select::concat::concat(&[
1926            &*Arc::new(arr1.clone()) as &dyn Array,
1927            &*Arc::new(arr2.clone()) as &dyn Array,
1928            &*Arc::new(arr3.clone()) as &dyn Array,
1929        ])
1930        .unwrap();
1931        let total_buffer_size: usize = concatenated_array
1932            .to_data()
1933            .buffers()
1934            .iter()
1935            .map(|buffer| buffer.len())
1936            .sum();
1937
1938        let total_nulls_size_in_bytes = concatenated_array.nulls().unwrap().len().div_ceil(8);
1939        assert!(block.data_size() == (total_buffer_size + total_nulls_size_in_bytes) as u64);
1940    }
1941}