Skip to main content

lance_encoding/
data.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Data layouts to represent encoded data in a sub-Arrow format
5//!
6//! These [`DataBlock`] structures represent physical layouts.  They fill a gap somewhere
7//! between [`arrow_data::ArrayData`] (which, as a collection of buffers, is too
8//! generic because it doesn't give us enough information about what those buffers represent)
9//! and [`arrow_array::array::Array`] (which is too specific, because it cares about the
10//! logical data type).
11//!
12//! In addition, the layouts represented here are slightly stricter than Arrow's layout rules.
13//! For example, offset buffers MUST start with 0.  These additional restrictions impose a
14//! slight penalty on encode (to normalize arrow data) but make the development of encoders
15//! and decoders easier (since they can rely on a normalized representation)
16
17use std::{
18    ops::Range,
19    sync::{Arc, RwLock},
20};
21
22use arrow_array::{
23    Array, ArrayRef, OffsetSizeTrait, UInt64Array,
24    cast::AsArray,
25    new_empty_array, new_null_array,
26    types::{ArrowDictionaryKeyType, UInt8Type, UInt16Type, UInt32Type, UInt64Type},
27};
28use arrow_buffer::{ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer};
29use arrow_data::{ArrayData, ArrayDataBuilder};
30use arrow_schema::DataType;
31use lance_arrow::DataTypeExt;
32
33use lance_core::{Error, Result};
34
35use crate::{
36    buffer::LanceBuffer,
37    statistics::{ComputeStat, Stat},
38};
39
40/// A data block with no buffers where everything is null
41///
42/// Note: this data block should not be used for future work.  It will be deprecated
43/// in the 2.1 version of the format where nullability will be handled by the structural
44/// encoders.
45#[derive(Debug, Clone)]
46pub struct AllNullDataBlock {
47    /// The number of values represented by this block
48    pub num_values: u64,
49}
50
51impl AllNullDataBlock {
52    fn into_arrow(self, data_type: DataType, _validate: bool) -> Result<ArrayData> {
53        Ok(ArrayData::new_null(&data_type, self.num_values as usize))
54    }
55
56    fn into_buffers(self) -> Vec<LanceBuffer> {
57        vec![]
58    }
59}
60
61use std::collections::HashMap;
62
63// `BlockInfo` stores the statistics of this `DataBlock`, such as `NullCount` for `NullableDataBlock`,
64// `BitWidth` for `FixedWidthDataBlock`, `Cardinality` for all `DataBlock`
65#[derive(Debug, Clone)]
66pub struct BlockInfo(pub Arc<RwLock<HashMap<Stat, Arc<dyn Array>>>>);
67
68impl Default for BlockInfo {
69    fn default() -> Self {
70        Self::new()
71    }
72}
73
74impl BlockInfo {
75    pub fn new() -> Self {
76        Self(Arc::new(RwLock::new(HashMap::new())))
77    }
78}
79
80impl PartialEq for BlockInfo {
81    fn eq(&self, other: &Self) -> bool {
82        let self_info = self.0.read().unwrap();
83        let other_info = other.0.read().unwrap();
84        *self_info == *other_info
85    }
86}
87
88/// Wraps a data block and adds nullability information to it
89///
90/// Note: this data block should not be used for future work.  It will be deprecated
91/// in the 2.1 version of the format where nullability will be handled by the structural
92/// encoders.
93#[derive(Debug, Clone)]
94pub struct NullableDataBlock {
95    /// The underlying data
96    pub data: Box<DataBlock>,
97    /// A bitmap of validity for each value
98    pub nulls: LanceBuffer,
99
100    pub block_info: BlockInfo,
101}
102
103impl NullableDataBlock {
104    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
105        let nulls = self.nulls.into_buffer();
106        let data = self.data.into_arrow(data_type, validate)?.into_builder();
107        let data = data.null_bit_buffer(Some(nulls));
108        if validate {
109            Ok(data.build()?)
110        } else {
111            Ok(unsafe { data.build_unchecked() })
112        }
113    }
114
115    fn into_buffers(self) -> Vec<LanceBuffer> {
116        let mut buffers = vec![self.nulls];
117        buffers.extend(self.data.into_buffers());
118        buffers
119    }
120
121    pub fn data_size(&self) -> u64 {
122        self.data.data_size() + self.nulls.len() as u64
123    }
124}
125
126/// A block representing the same constant value repeated many times
127#[derive(Debug, PartialEq, Clone)]
128pub struct ConstantDataBlock {
129    /// Data buffer containing the value
130    pub data: LanceBuffer,
131    /// The number of values
132    pub num_values: u64,
133}
134
135impl ConstantDataBlock {
136    fn into_buffers(self) -> Vec<LanceBuffer> {
137        vec![self.data]
138    }
139
140    fn into_arrow(self, _data_type: DataType, _validate: bool) -> Result<ArrayData> {
141        // We don't need this yet but if we come up with some way of serializing
142        // scalars to/from bytes then we could implement it.
143        todo!()
144    }
145
146    pub fn try_clone(&self) -> Result<Self> {
147        Ok(Self {
148            data: self.data.clone(),
149            num_values: self.num_values,
150        })
151    }
152
153    pub fn data_size(&self) -> u64 {
154        self.data.len() as u64
155    }
156}
157
158/// A data block for a single buffer of data where each element has a fixed number of bits
159#[derive(Debug, PartialEq, Clone)]
160pub struct FixedWidthDataBlock {
161    /// The data buffer
162    pub data: LanceBuffer,
163    /// The number of bits per value
164    pub bits_per_value: u64,
165    /// The number of values represented by this block
166    pub num_values: u64,
167
168    pub block_info: BlockInfo,
169}
170
171impl FixedWidthDataBlock {
172    fn do_into_arrow(
173        self,
174        data_type: DataType,
175        num_values: u64,
176        validate: bool,
177    ) -> Result<ArrayData> {
178        let data_buffer = self.data.into_buffer();
179        let builder = ArrayDataBuilder::new(data_type)
180            .add_buffer(data_buffer)
181            .len(num_values as usize)
182            .null_count(0);
183        if validate {
184            Ok(builder.build()?)
185        } else {
186            Ok(unsafe { builder.build_unchecked() })
187        }
188    }
189
190    pub fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
191        let root_num_values = self.num_values;
192        self.do_into_arrow(data_type, root_num_values, validate)
193    }
194
195    pub fn into_buffers(self) -> Vec<LanceBuffer> {
196        vec![self.data]
197    }
198
199    pub fn try_clone(&self) -> Result<Self> {
200        Ok(Self {
201            data: self.data.clone(),
202            bits_per_value: self.bits_per_value,
203            num_values: self.num_values,
204            block_info: self.block_info.clone(),
205        })
206    }
207
208    pub fn data_size(&self) -> u64 {
209        self.data.len() as u64
210    }
211}
212
213#[derive(Debug)]
214pub struct VariableWidthDataBlockBuilder<T: OffsetSizeTrait> {
215    offsets: Vec<T>,
216    bytes: Vec<u8>,
217}
218
219impl<T: OffsetSizeTrait> VariableWidthDataBlockBuilder<T> {
220    fn new(estimated_size_bytes: u64) -> Self {
221        Self {
222            offsets: vec![T::from_usize(0).unwrap()],
223            bytes: Vec::with_capacity(estimated_size_bytes as usize),
224        }
225    }
226}
227
228impl<T: OffsetSizeTrait + bytemuck::Pod> DataBlockBuilderImpl for VariableWidthDataBlockBuilder<T> {
229    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
230        let block = data_block.as_variable_width_ref().unwrap();
231        assert!(block.bits_per_offset == T::get_byte_width() as u8 * 8);
232        let offsets = block.offsets.borrow_to_typed_view::<T>();
233
234        let start_offset = offsets[selection.start as usize];
235        let end_offset = offsets[selection.end as usize];
236        let mut previous_len = self.bytes.len();
237
238        self.bytes
239            .extend_from_slice(&block.data[start_offset.as_usize()..end_offset.as_usize()]);
240
241        self.offsets.extend(
242            offsets[selection.start as usize..selection.end as usize]
243                .iter()
244                .zip(&offsets[selection.start as usize + 1..=selection.end as usize])
245                .map(|(&current, &next)| {
246                    let this_value_len = next - current;
247                    previous_len += this_value_len.as_usize();
248                    T::from_usize(previous_len).unwrap()
249                }),
250        );
251    }
252
253    fn finish(self: Box<Self>) -> DataBlock {
254        let num_values = (self.offsets.len() - 1) as u64;
255        DataBlock::VariableWidth(VariableWidthBlock {
256            data: LanceBuffer::from(self.bytes),
257            offsets: LanceBuffer::reinterpret_vec(self.offsets),
258            bits_per_offset: T::get_byte_width() as u8 * 8,
259            num_values,
260            block_info: BlockInfo::new(),
261        })
262    }
263}
264
265#[derive(Debug)]
266struct BitmapDataBlockBuilder {
267    values: BooleanBufferBuilder,
268}
269
270impl BitmapDataBlockBuilder {
271    fn new(estimated_size_bytes: u64) -> Self {
272        Self {
273            values: BooleanBufferBuilder::new(estimated_size_bytes as usize * 8),
274        }
275    }
276}
277
278impl DataBlockBuilderImpl for BitmapDataBlockBuilder {
279    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
280        let bitmap_blk = data_block.as_fixed_width_ref().unwrap();
281        self.values.append_packed_range(
282            selection.start as usize..selection.end as usize,
283            &bitmap_blk.data,
284        );
285    }
286
287    fn finish(mut self: Box<Self>) -> DataBlock {
288        let bool_buf = self.values.finish();
289        let num_values = bool_buf.len() as u64;
290        let bits_buf = bool_buf.into_inner();
291        DataBlock::FixedWidth(FixedWidthDataBlock {
292            data: LanceBuffer::from(bits_buf),
293            bits_per_value: 1,
294            num_values,
295            block_info: BlockInfo::new(),
296        })
297    }
298}
299
/// Accumulates fixed-width values whose width is a whole number of bytes
#[derive(Debug)]
struct FixedWidthDataBlockBuilder {
    /// Width of each value in bits
    bits_per_value: u64,
    /// Width of each value in bytes (`bits_per_value / 8`)
    bytes_per_value: u64,
    /// The value bytes collected so far
    values: Vec<u8>,
}

impl FixedWidthDataBlockBuilder {
    /// Creates an empty builder, reserving `estimated_size_bytes` of capacity
    ///
    /// # Panics
    ///
    /// Panics if `bits_per_value` is not a multiple of 8.
    fn new(bits_per_value: u64, estimated_size_bytes: u64) -> Self {
        assert!(bits_per_value.is_multiple_of(8));
        let bytes_per_value = bits_per_value / 8;
        let values = Vec::with_capacity(estimated_size_bytes as usize);
        Self {
            bits_per_value,
            bytes_per_value,
            values,
        }
    }
}
317
318impl DataBlockBuilderImpl for FixedWidthDataBlockBuilder {
319    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
320        let block = data_block.as_fixed_width_ref().unwrap();
321        assert_eq!(self.bits_per_value, block.bits_per_value);
322        let start = selection.start as usize * self.bytes_per_value as usize;
323        let end = selection.end as usize * self.bytes_per_value as usize;
324        self.values.extend_from_slice(&block.data[start..end]);
325    }
326
327    fn finish(self: Box<Self>) -> DataBlock {
328        let num_values = (self.values.len() / self.bytes_per_value as usize) as u64;
329        DataBlock::FixedWidth(FixedWidthDataBlock {
330            data: LanceBuffer::from(self.values),
331            bits_per_value: self.bits_per_value,
332            num_values,
333            block_info: BlockInfo::new(),
334        })
335    }
336}
337
338#[derive(Debug)]
339struct StructDataBlockBuilder {
340    children: Vec<Box<dyn DataBlockBuilderImpl>>,
341}
342
343impl StructDataBlockBuilder {
344    fn new(children: Vec<Box<dyn DataBlockBuilderImpl>>) -> Self {
345        Self { children }
346    }
347}
348
349impl DataBlockBuilderImpl for StructDataBlockBuilder {
350    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
351        let data_block = data_block.as_struct_ref().unwrap();
352        for i in 0..self.children.len() {
353            self.children[i].append(&data_block.children[i], selection.clone());
354        }
355    }
356
357    fn finish(self: Box<Self>) -> DataBlock {
358        let mut children_data_block = Vec::new();
359        for child in self.children {
360            let child_data_block = child.finish();
361            children_data_block.push(child_data_block);
362        }
363        DataBlock::Struct(StructDataBlock {
364            children: children_data_block,
365            block_info: BlockInfo::new(),
366            validity: None,
367        })
368    }
369}
370
371#[derive(Debug, Default)]
372struct AllNullDataBlockBuilder {
373    num_values: u64,
374}
375
376impl DataBlockBuilderImpl for AllNullDataBlockBuilder {
377    fn append(&mut self, _data_block: &DataBlock, selection: Range<u64>) {
378        self.num_values += selection.end - selection.start;
379    }
380
381    fn finish(self: Box<Self>) -> DataBlock {
382        DataBlock::AllNull(AllNullDataBlock {
383            num_values: self.num_values,
384        })
385    }
386}
387
388/// A data block to represent a fixed size list
389#[derive(Debug, Clone)]
390pub struct FixedSizeListBlock {
391    /// The child data block
392    pub child: Box<DataBlock>,
393    /// The number of items in each list
394    pub dimension: u64,
395}
396
397impl FixedSizeListBlock {
398    pub fn num_values(&self) -> u64 {
399        self.child.num_values() / self.dimension
400    }
401
402    /// Try to flatten a FixedSizeListBlock into a FixedWidthDataBlock
403    ///
404    /// Returns None if any children are nullable
405    pub fn try_into_flat(self) -> Option<FixedWidthDataBlock> {
406        match *self.child {
407            // Cannot flatten a nullable child
408            DataBlock::Nullable(_) => None,
409            DataBlock::FixedSizeList(inner) => {
410                let mut flat = inner.try_into_flat()?;
411                flat.bits_per_value *= self.dimension;
412                flat.num_values /= self.dimension;
413                Some(flat)
414            }
415            DataBlock::FixedWidth(mut inner) => {
416                inner.bits_per_value *= self.dimension;
417                inner.num_values /= self.dimension;
418                Some(inner)
419            }
420            _ => panic!(
421                "Expected FixedSizeList or FixedWidth data block but found {:?}",
422                self
423            ),
424        }
425    }
426
427    pub fn flatten_as_fixed(&mut self) -> FixedWidthDataBlock {
428        match self.child.as_mut() {
429            DataBlock::FixedSizeList(fsl) => fsl.flatten_as_fixed(),
430            DataBlock::FixedWidth(fw) => fw.clone(),
431            _ => panic!("Expected FixedSizeList or FixedWidth data block"),
432        }
433    }
434
435    /// Convert a flattened values block into a FixedSizeListBlock
436    pub fn from_flat(data: FixedWidthDataBlock, data_type: &DataType) -> DataBlock {
437        match data_type {
438            DataType::FixedSizeList(child_field, dimension) => {
439                let mut data = data;
440                data.bits_per_value /= *dimension as u64;
441                data.num_values *= *dimension as u64;
442                let child_data = Self::from_flat(data, child_field.data_type());
443                DataBlock::FixedSizeList(Self {
444                    child: Box::new(child_data),
445                    dimension: *dimension as u64,
446                })
447            }
448            // Base case, we've hit a non-list type
449            _ => DataBlock::FixedWidth(data),
450        }
451    }
452
453    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
454        let num_values = self.num_values();
455        let builder = match &data_type {
456            DataType::FixedSizeList(child_field, _) => {
457                let child_data = self
458                    .child
459                    .into_arrow(child_field.data_type().clone(), validate)?;
460                ArrayDataBuilder::new(data_type)
461                    .add_child_data(child_data)
462                    .len(num_values as usize)
463                    .null_count(0)
464            }
465            _ => panic!("Expected FixedSizeList data type and got {:?}", data_type),
466        };
467        if validate {
468            Ok(builder.build()?)
469        } else {
470            Ok(unsafe { builder.build_unchecked() })
471        }
472    }
473
474    fn into_buffers(self) -> Vec<LanceBuffer> {
475        self.child.into_buffers()
476    }
477
478    fn data_size(&self) -> u64 {
479        self.child.data_size()
480    }
481}
482
483#[derive(Debug)]
484struct FixedSizeListBlockBuilder {
485    inner: Box<dyn DataBlockBuilderImpl>,
486    dimension: u64,
487}
488
489impl FixedSizeListBlockBuilder {
490    fn new(inner: Box<dyn DataBlockBuilderImpl>, dimension: u64) -> Self {
491        Self { inner, dimension }
492    }
493}
494
495impl DataBlockBuilderImpl for FixedSizeListBlockBuilder {
496    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
497        let selection = selection.start * self.dimension..selection.end * self.dimension;
498        let fsl = data_block.as_fixed_size_list_ref().unwrap();
499        self.inner.append(fsl.child.as_ref(), selection);
500    }
501
502    fn finish(self: Box<Self>) -> DataBlock {
503        let inner_block = self.inner.finish();
504        DataBlock::FixedSizeList(FixedSizeListBlock {
505            child: Box::new(inner_block),
506            dimension: self.dimension,
507        })
508    }
509}
510
511#[derive(Debug)]
512struct NullableDataBlockBuilder {
513    inner: Box<dyn DataBlockBuilderImpl>,
514    validity: BooleanBufferBuilder,
515}
516
517impl NullableDataBlockBuilder {
518    fn new(inner: Box<dyn DataBlockBuilderImpl>, estimated_size_bytes: usize) -> Self {
519        Self {
520            inner,
521            validity: BooleanBufferBuilder::new(estimated_size_bytes * 8),
522        }
523    }
524}
525
526impl DataBlockBuilderImpl for NullableDataBlockBuilder {
527    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
528        let nullable = data_block.as_nullable_ref().unwrap();
529        let bool_buf = BooleanBuffer::new(
530            nullable.nulls.clone().into_buffer(),
531            selection.start as usize,
532            (selection.end - selection.start) as usize,
533        );
534        self.validity.append_buffer(&bool_buf);
535        self.inner.append(nullable.data.as_ref(), selection);
536    }
537
538    fn finish(mut self: Box<Self>) -> DataBlock {
539        let inner_block = self.inner.finish();
540        DataBlock::Nullable(NullableDataBlock {
541            data: Box::new(inner_block),
542            nulls: LanceBuffer::from(self.validity.finish().into_inner()),
543            block_info: BlockInfo::new(),
544        })
545    }
546}
547
548/// A data block with no regular structure.  There is no available spot to attach
549/// validity / repdef information and it cannot be converted to Arrow without being
550/// decoded
551#[derive(Debug, Clone)]
552pub struct OpaqueBlock {
553    pub buffers: Vec<LanceBuffer>,
554    pub num_values: u64,
555    pub block_info: BlockInfo,
556}
557
558impl OpaqueBlock {
559    pub fn data_size(&self) -> u64 {
560        self.buffers.iter().map(|b| b.len() as u64).sum()
561    }
562}
563
564/// A data block for variable-width data (e.g. strings, packed rows, etc.)
565#[derive(Debug, Clone)]
566pub struct VariableWidthBlock {
567    /// The data buffer
568    pub data: LanceBuffer,
569    /// The offsets buffer (contains num_values + 1 offsets)
570    ///
571    /// Offsets MUST start at 0
572    pub offsets: LanceBuffer,
573    /// The number of bits per offset
574    pub bits_per_offset: u8,
575    /// The number of values represented by this block
576    pub num_values: u64,
577
578    pub block_info: BlockInfo,
579}
580
581impl VariableWidthBlock {
582    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
583        let data_buffer = self.data.into_buffer();
584        let offsets_buffer = self.offsets.into_buffer();
585        let builder = ArrayDataBuilder::new(data_type)
586            .add_buffer(offsets_buffer)
587            .add_buffer(data_buffer)
588            .len(self.num_values as usize)
589            .null_count(0);
590        if validate {
591            Ok(builder.build()?)
592        } else {
593            Ok(unsafe { builder.build_unchecked() })
594        }
595    }
596
597    fn into_buffers(self) -> Vec<LanceBuffer> {
598        vec![self.offsets, self.data]
599    }
600
601    pub fn offsets_as_block(&mut self) -> DataBlock {
602        let offsets = self.offsets.clone();
603        DataBlock::FixedWidth(FixedWidthDataBlock {
604            data: offsets,
605            bits_per_value: self.bits_per_offset as u64,
606            num_values: self.num_values + 1,
607            block_info: BlockInfo::new(),
608        })
609    }
610
611    pub fn data_size(&self) -> u64 {
612        (self.data.len() + self.offsets.len()) as u64
613    }
614}
615
616/// A data block representing a struct
617#[derive(Debug, Clone)]
618pub struct StructDataBlock {
619    /// The child arrays
620    pub children: Vec<DataBlock>,
621    pub block_info: BlockInfo,
622    /// The validity bitmap for the struct (None means all valid)
623    pub validity: Option<NullBuffer>,
624}
625
626impl StructDataBlock {
627    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
628        if let DataType::Struct(fields) = &data_type {
629            let mut builder = ArrayDataBuilder::new(DataType::Struct(fields.clone()));
630            let mut num_rows = 0;
631            for (field, child) in fields.iter().zip(self.children) {
632                let child_data = child.into_arrow(field.data_type().clone(), validate)?;
633                num_rows = child_data.len();
634                builder = builder.add_child_data(child_data);
635            }
636
637            // Apply validity if present
638            let builder = if let Some(validity) = self.validity {
639                let null_count = validity.null_count();
640                builder
641                    .null_bit_buffer(Some(validity.into_inner().into_inner()))
642                    .null_count(null_count)
643            } else {
644                builder.null_count(0)
645            };
646
647            let builder = builder.len(num_rows);
648            if validate {
649                Ok(builder.build()?)
650            } else {
651                Ok(unsafe { builder.build_unchecked() })
652            }
653        } else {
654            Err(Error::internal(format!(
655                "Expected Struct, got {:?}",
656                data_type
657            )))
658        }
659    }
660
661    fn remove_outer_validity(self) -> Self {
662        Self {
663            children: self
664                .children
665                .into_iter()
666                .map(|c| c.remove_outer_validity())
667                .collect(),
668            block_info: self.block_info,
669            validity: None, // Remove the validity
670        }
671    }
672
673    fn into_buffers(self) -> Vec<LanceBuffer> {
674        self.children
675            .into_iter()
676            .flat_map(|c| c.into_buffers())
677            .collect()
678    }
679
680    pub fn has_variable_width_child(&self) -> bool {
681        self.children
682            .iter()
683            .any(|child| !matches!(child, DataBlock::FixedWidth(_)))
684    }
685
686    pub fn data_size(&self) -> u64 {
687        self.children
688            .iter()
689            .map(|data_block| data_block.data_size())
690            .sum()
691    }
692}
693
694/// A data block for dictionary encoded data
695#[derive(Debug, Clone)]
696pub struct DictionaryDataBlock {
697    /// The indices buffer
698    pub indices: FixedWidthDataBlock,
699    /// The dictionary itself
700    pub dictionary: Box<DataBlock>,
701}
702
703impl DictionaryDataBlock {
704    fn decode_helper<K: ArrowDictionaryKeyType>(self) -> Result<DataBlock> {
705        // Handle empty batch - this can happen when decoding a range that contains
706        // only empty/null lists, or when reading sparse data
707        if self.indices.num_values == 0 {
708            return Ok(DataBlock::AllNull(AllNullDataBlock { num_values: 0 }));
709        }
710
711        // assume the indices are uniformly distributed.
712        let estimated_size_bytes = self.dictionary.data_size()
713            * (self.indices.num_values + self.dictionary.num_values() - 1)
714            / self.dictionary.num_values();
715        let mut data_builder = DataBlockBuilder::with_capacity_estimate(estimated_size_bytes);
716
717        let indices = self.indices.data.borrow_to_typed_slice::<K::Native>();
718        let indices = indices.as_ref();
719
720        indices
721            .iter()
722            .map(|idx| idx.to_usize().unwrap() as u64)
723            .for_each(|idx| {
724                data_builder.append(&self.dictionary, idx..idx + 1);
725            });
726
727        Ok(data_builder.finish())
728    }
729
730    pub fn decode(self) -> Result<DataBlock> {
731        match self.indices.bits_per_value {
732            8 => self.decode_helper::<UInt8Type>(),
733            16 => self.decode_helper::<UInt16Type>(),
734            32 => self.decode_helper::<UInt32Type>(),
735            64 => self.decode_helper::<UInt64Type>(),
736            _ => Err(lance_core::Error::internal(format!(
737                "Unsupported dictionary index bit width: {} bits",
738                self.indices.bits_per_value
739            ))),
740        }
741    }
742
743    fn into_arrow_dict(
744        self,
745        key_type: Box<DataType>,
746        value_type: Box<DataType>,
747        validate: bool,
748    ) -> Result<ArrayData> {
749        let indices = self.indices.into_arrow((*key_type).clone(), validate)?;
750        let dictionary = self
751            .dictionary
752            .into_arrow((*value_type).clone(), validate)?;
753
754        let builder = indices
755            .into_builder()
756            .add_child_data(dictionary)
757            .data_type(DataType::Dictionary(key_type, value_type));
758
759        if validate {
760            Ok(builder.build()?)
761        } else {
762            Ok(unsafe { builder.build_unchecked() })
763        }
764    }
765
766    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
767        if let DataType::Dictionary(key_type, value_type) = data_type {
768            self.into_arrow_dict(key_type, value_type, validate)
769        } else {
770            self.decode()?.into_arrow(data_type, validate)
771        }
772    }
773
774    fn into_buffers(self) -> Vec<LanceBuffer> {
775        let mut buffers = self.indices.into_buffers();
776        buffers.extend(self.dictionary.into_buffers());
777        buffers
778    }
779
780    pub fn into_parts(self) -> (DataBlock, DataBlock) {
781        (DataBlock::FixedWidth(self.indices), *self.dictionary)
782    }
783
784    pub fn from_parts(indices: FixedWidthDataBlock, dictionary: DataBlock) -> Self {
785        Self {
786            indices,
787            dictionary: Box::new(dictionary),
788        }
789    }
790}
791
/// A DataBlock is a collection of buffers that represents an "array" of data in very generic terms
///
/// The output of each decoder is a DataBlock.  Decoders can be chained together to transform
/// one DataBlock into a different kind of DataBlock.
///
/// The DataBlock is somewhere in between Arrow's ArrayData and Array and represents a physical
/// layout of the data.
///
/// A DataBlock can be converted into an Arrow ArrayData (and then Array) for a given array type.
/// For example, a FixedWidthDataBlock can be converted into any primitive type or a fixed size
/// list of a primitive type.  This is a zero-copy operation.
///
/// In addition, a DataBlock can be created from an Arrow array or arrays.  This is not a zero-copy
/// operation as some normalization may be required.
#[derive(Debug, Clone)]
pub enum DataBlock {
    /// A block with no buffers; converts to an empty Arrow array
    Empty(),
    /// A single value repeated many times
    Constant(ConstantDataBlock),
    /// A block with no buffers where every value is null
    AllNull(AllNullDataBlock),
    /// Another block plus a validity bitmap
    Nullable(NullableDataBlock),
    /// A single buffer of values that all have the same bit width
    FixedWidth(FixedWidthDataBlock),
    /// Fixed size lists over a child block
    FixedSizeList(FixedSizeListBlock),
    /// Variable-width values stored as a data buffer plus offsets
    VariableWidth(VariableWidthBlock),
    /// Buffers with no regular structure; cannot be converted to Arrow
    Opaque(OpaqueBlock),
    /// A struct made of child blocks with optional validity
    Struct(StructDataBlock),
    /// Dictionary encoded data: indices plus a dictionary block
    Dictionary(DictionaryDataBlock),
}
819
820impl DataBlock {
821    /// Convert self into an Arrow ArrayData
822    pub fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
823        match self {
824            Self::Empty() => Ok(new_empty_array(&data_type).to_data()),
825            Self::Constant(inner) => inner.into_arrow(data_type, validate),
826            Self::AllNull(inner) => inner.into_arrow(data_type, validate),
827            Self::Nullable(inner) => inner.into_arrow(data_type, validate),
828            Self::FixedWidth(inner) => inner.into_arrow(data_type, validate),
829            Self::FixedSizeList(inner) => inner.into_arrow(data_type, validate),
830            Self::VariableWidth(inner) => inner.into_arrow(data_type, validate),
831            Self::Struct(inner) => inner.into_arrow(data_type, validate),
832            Self::Dictionary(inner) => inner.into_arrow(data_type, validate),
833            Self::Opaque(_) => Err(Error::internal(
834                "Cannot convert OpaqueBlock to Arrow".to_string(),
835            )),
836        }
837    }
838
839    /// Convert the data block into a collection of buffers for serialization
840    ///
841    /// The order matters and will be used to reconstruct the data block at read time.
842    pub fn into_buffers(self) -> Vec<LanceBuffer> {
843        match self {
844            Self::Empty() => Vec::default(),
845            Self::Constant(inner) => inner.into_buffers(),
846            Self::AllNull(inner) => inner.into_buffers(),
847            Self::Nullable(inner) => inner.into_buffers(),
848            Self::FixedWidth(inner) => inner.into_buffers(),
849            Self::FixedSizeList(inner) => inner.into_buffers(),
850            Self::VariableWidth(inner) => inner.into_buffers(),
851            Self::Struct(inner) => inner.into_buffers(),
852            Self::Dictionary(inner) => inner.into_buffers(),
853            Self::Opaque(inner) => inner.buffers,
854        }
855    }
856
857    /// Converts the data buffers into borrowed mode and clones the block
858    ///
859    /// This is a zero-copy operation but requires a mutable reference to self and, afterwards,
860    /// all buffers will be in Borrowed mode.
861    /// Try and clone the block
862    ///
863    /// This will fail if any buffers are in owned mode.  You can call borrow_and_clone() to
864    /// ensure that all buffers are in borrowed mode before calling this method.
865    pub fn try_clone(&self) -> Result<Self> {
866        match self {
867            Self::Empty() => Ok(Self::Empty()),
868            Self::Constant(inner) => Ok(Self::Constant(inner.clone())),
869            Self::AllNull(inner) => Ok(Self::AllNull(inner.clone())),
870            Self::Nullable(inner) => Ok(Self::Nullable(inner.clone())),
871            Self::FixedWidth(inner) => Ok(Self::FixedWidth(inner.clone())),
872            Self::FixedSizeList(inner) => Ok(Self::FixedSizeList(inner.clone())),
873            Self::VariableWidth(inner) => Ok(Self::VariableWidth(inner.clone())),
874            Self::Struct(inner) => Ok(Self::Struct(inner.clone())),
875            Self::Dictionary(inner) => Ok(Self::Dictionary(inner.clone())),
876            Self::Opaque(inner) => Ok(Self::Opaque(inner.clone())),
877        }
878    }
879
880    pub fn name(&self) -> &'static str {
881        match self {
882            Self::Constant(_) => "Constant",
883            Self::Empty() => "Empty",
884            Self::AllNull(_) => "AllNull",
885            Self::Nullable(_) => "Nullable",
886            Self::FixedWidth(_) => "FixedWidth",
887            Self::FixedSizeList(_) => "FixedSizeList",
888            Self::VariableWidth(_) => "VariableWidth",
889            Self::Struct(_) => "Struct",
890            Self::Dictionary(_) => "Dictionary",
891            Self::Opaque(_) => "Opaque",
892        }
893    }
894
895    pub fn is_variable(&self) -> bool {
896        match self {
897            Self::Constant(_) => false,
898            Self::Empty() => false,
899            Self::AllNull(_) => false,
900            Self::Nullable(nullable) => nullable.data.is_variable(),
901            Self::FixedWidth(_) => false,
902            Self::FixedSizeList(fsl) => fsl.child.is_variable(),
903            Self::VariableWidth(_) => true,
904            Self::Struct(strct) => strct.children.iter().any(|c| c.is_variable()),
905            Self::Dictionary(_) => {
906                todo!("is_variable for DictionaryDataBlock is not implemented yet")
907            }
908            Self::Opaque(_) => panic!("Does not make sense to ask if an Opaque block is variable"),
909        }
910    }
911
912    pub fn is_nullable(&self) -> bool {
913        match self {
914            Self::AllNull(_) => true,
915            Self::Nullable(_) => true,
916            Self::FixedSizeList(fsl) => fsl.child.is_nullable(),
917            Self::Struct(strct) => strct.children.iter().any(|c| c.is_nullable()),
918            Self::Dictionary(_) => {
919                todo!("is_nullable for DictionaryDataBlock is not implemented yet")
920            }
921            Self::Opaque(_) => panic!("Does not make sense to ask if an Opaque block is nullable"),
922            _ => false,
923        }
924    }
925
926    /// The number of values in the block
927    ///
928    /// This function does not recurse into child blocks.  If this is a FSL then it will
929    /// be the number of lists and not the number of items.
930    pub fn num_values(&self) -> u64 {
931        match self {
932            Self::Empty() => 0,
933            Self::Constant(inner) => inner.num_values,
934            Self::AllNull(inner) => inner.num_values,
935            Self::Nullable(inner) => inner.data.num_values(),
936            Self::FixedWidth(inner) => inner.num_values,
937            Self::FixedSizeList(inner) => inner.num_values(),
938            Self::VariableWidth(inner) => inner.num_values,
939            Self::Struct(inner) => inner.children[0].num_values(),
940            Self::Dictionary(inner) => inner.indices.num_values,
941            Self::Opaque(inner) => inner.num_values,
942        }
943    }
944
945    /// The number of items in a single row
946    ///
947    /// This is always 1 unless there are layers of FSL
948    pub fn items_per_row(&self) -> u64 {
949        match self {
950            Self::Empty() => todo!(),     // Leave undefined until needed
951            Self::Constant(_) => todo!(), // Leave undefined until needed
952            Self::AllNull(_) => todo!(),  // Leave undefined until needed
953            Self::Nullable(nullable) => nullable.data.items_per_row(),
954            Self::FixedWidth(_) => 1,
955            Self::FixedSizeList(fsl) => fsl.dimension * fsl.child.items_per_row(),
956            Self::VariableWidth(_) => 1,
957            Self::Struct(_) => todo!(), // Leave undefined until needed
958            Self::Dictionary(_) => 1,
959            Self::Opaque(_) => 1,
960        }
961    }
962
963    /// The number of bytes in the data block (including any child blocks)
964    pub fn data_size(&self) -> u64 {
965        match self {
966            Self::Empty() => 0,
967            Self::Constant(inner) => inner.data_size(),
968            Self::AllNull(_) => 0,
969            Self::Nullable(inner) => inner.data_size(),
970            Self::FixedWidth(inner) => inner.data_size(),
971            Self::FixedSizeList(inner) => inner.data_size(),
972            Self::VariableWidth(inner) => inner.data_size(),
973            Self::Struct(_) => {
974                todo!("the data_size method for StructDataBlock is not implemented yet")
975            }
976            Self::Dictionary(_) => {
977                todo!("the data_size method for DictionaryDataBlock is not implemented yet")
978            }
979            Self::Opaque(inner) => inner.data_size(),
980        }
981    }
982
983    /// Removes any validity information from the block
984    ///
985    /// This does not filter the block (e.g. remove rows).  It only removes
986    /// the validity bitmaps (if present).  Any garbage masked by null bits
987    /// will now appear as proper values.
988    ///
989    /// If `recurse` is true, then this will also remove validity from any child blocks.
990    pub fn remove_outer_validity(self) -> Self {
991        match self {
992            Self::AllNull(_) => panic!("Cannot remove validity on all-null data"),
993            Self::Nullable(inner) => *inner.data,
994            Self::Struct(inner) => Self::Struct(inner.remove_outer_validity()),
995            other => other,
996        }
997    }
998
999    pub fn make_builder(&self, estimated_size_bytes: u64) -> Box<dyn DataBlockBuilderImpl> {
1000        match self {
1001            Self::FixedWidth(inner) => {
1002                if inner.bits_per_value == 1 {
1003                    Box::new(BitmapDataBlockBuilder::new(estimated_size_bytes))
1004                } else {
1005                    Box::new(FixedWidthDataBlockBuilder::new(
1006                        inner.bits_per_value,
1007                        estimated_size_bytes,
1008                    ))
1009                }
1010            }
1011            Self::VariableWidth(inner) => {
1012                if inner.bits_per_offset == 32 {
1013                    Box::new(VariableWidthDataBlockBuilder::<i32>::new(
1014                        estimated_size_bytes,
1015                    ))
1016                } else if inner.bits_per_offset == 64 {
1017                    Box::new(VariableWidthDataBlockBuilder::<i64>::new(
1018                        estimated_size_bytes,
1019                    ))
1020                } else {
1021                    todo!()
1022                }
1023            }
1024            Self::FixedSizeList(inner) => {
1025                let inner_builder = inner.child.make_builder(estimated_size_bytes);
1026                Box::new(FixedSizeListBlockBuilder::new(
1027                    inner_builder,
1028                    inner.dimension,
1029                ))
1030            }
1031            Self::Nullable(nullable) => {
1032                // There's no easy way to know what percentage of the data is in the valiidty buffer
1033                // but 1/16th seems like a reasonable guess.
1034                let estimated_validity_size_bytes = estimated_size_bytes / 16;
1035                let inner_builder = nullable
1036                    .data
1037                    .make_builder(estimated_size_bytes - estimated_validity_size_bytes);
1038                Box::new(NullableDataBlockBuilder::new(
1039                    inner_builder,
1040                    estimated_validity_size_bytes as usize,
1041                ))
1042            }
1043            Self::Struct(struct_data_block) => {
1044                let num_children = struct_data_block.children.len();
1045                let per_child_estimate = if num_children == 0 {
1046                    0
1047                } else {
1048                    estimated_size_bytes / num_children as u64
1049                };
1050                let child_builders = struct_data_block
1051                    .children
1052                    .iter()
1053                    .map(|child| child.make_builder(per_child_estimate))
1054                    .collect();
1055                Box::new(StructDataBlockBuilder::new(child_builders))
1056            }
1057            Self::AllNull(_) => Box::new(AllNullDataBlockBuilder::default()),
1058            _ => todo!("make_builder for {:?}", self),
1059        }
1060    }
1061}
1062
// Generates a consuming accessor named `$fn_name` which returns the payload (of type
// `$inner_type`) when `self` is the `$inner` variant and `None` for any other variant
macro_rules! as_type {
    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
        pub fn $fn_name(self) -> Option<$inner_type> {
            if let Self::$inner(payload) = self {
                Some(payload)
            } else {
                None
            }
        }
    };
}
1073
// Generates a borrowing accessor named `$fn_name` which returns a shared reference to the
// payload (of type `$inner_type`) when `self` is the `$inner` variant and `None` otherwise
macro_rules! as_type_ref {
    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
        pub fn $fn_name(&self) -> Option<&$inner_type> {
            if let Self::$inner(payload) = self {
                Some(payload)
            } else {
                None
            }
        }
    };
}
1084
// Generates a mutably borrowing accessor named `$fn_name` which returns an exclusive
// reference to the payload (of type `$inner_type`) when `self` is the `$inner` variant and
// `None` otherwise
macro_rules! as_type_ref_mut {
    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
        pub fn $fn_name(&mut self) -> Option<&mut $inner_type> {
            if let Self::$inner(payload) = self {
                Some(payload)
            } else {
                None
            }
        }
    };
}
1095
1096// Cast implementations
1097impl DataBlock {
1098    as_type!(as_all_null, AllNull, AllNullDataBlock);
1099    as_type!(as_nullable, Nullable, NullableDataBlock);
1100    as_type!(as_fixed_width, FixedWidth, FixedWidthDataBlock);
1101    as_type!(as_fixed_size_list, FixedSizeList, FixedSizeListBlock);
1102    as_type!(as_variable_width, VariableWidth, VariableWidthBlock);
1103    as_type!(as_struct, Struct, StructDataBlock);
1104    as_type!(as_dictionary, Dictionary, DictionaryDataBlock);
1105    as_type_ref!(as_all_null_ref, AllNull, AllNullDataBlock);
1106    as_type_ref!(as_nullable_ref, Nullable, NullableDataBlock);
1107    as_type_ref!(as_fixed_width_ref, FixedWidth, FixedWidthDataBlock);
1108    as_type_ref!(as_fixed_size_list_ref, FixedSizeList, FixedSizeListBlock);
1109    as_type_ref!(as_variable_width_ref, VariableWidth, VariableWidthBlock);
1110    as_type_ref!(as_struct_ref, Struct, StructDataBlock);
1111    as_type_ref!(as_dictionary_ref, Dictionary, DictionaryDataBlock);
1112    as_type_ref_mut!(as_all_null_ref_mut, AllNull, AllNullDataBlock);
1113    as_type_ref_mut!(as_nullable_ref_mut, Nullable, NullableDataBlock);
1114    as_type_ref_mut!(as_fixed_width_ref_mut, FixedWidth, FixedWidthDataBlock);
1115    as_type_ref_mut!(
1116        as_fixed_size_list_ref_mut,
1117        FixedSizeList,
1118        FixedSizeListBlock
1119    );
1120    as_type_ref_mut!(as_variable_width_ref_mut, VariableWidth, VariableWidthBlock);
1121    as_type_ref_mut!(as_struct_ref_mut, Struct, StructDataBlock);
1122    as_type_ref_mut!(as_dictionary_ref_mut, Dictionary, DictionaryDataBlock);
1123}
1124
1125// Methods to convert from Arrow -> DataBlock
1126
1127fn get_byte_range<T: ArrowNativeType>(offsets: &mut LanceBuffer) -> Range<usize> {
1128    let offsets = offsets.borrow_to_typed_slice::<T>();
1129    if offsets.as_ref().is_empty() {
1130        0..0
1131    } else {
1132        offsets.as_ref().first().unwrap().as_usize()..offsets.as_ref().last().unwrap().as_usize()
1133    }
1134}
1135
1136// Given multiple offsets arrays [0, 5, 10], [0, 3, 7], etc. stitch
1137// them together to get [0, 5, 10, 13, 20, ...]
1138//
1139// Also returns the data range referenced by each offset array (may
1140// not be 0..len if there is slicing involved)
1141fn stitch_offsets<T: ArrowNativeType + std::ops::Add<Output = T> + std::ops::Sub<Output = T>>(
1142    offsets: Vec<LanceBuffer>,
1143) -> (LanceBuffer, Vec<Range<usize>>) {
1144    if offsets.is_empty() {
1145        return (LanceBuffer::empty(), Vec::default());
1146    }
1147    let len = offsets.iter().map(|b| b.len()).sum::<usize>();
1148    // Note: we are making a copy here, even if there is only one input, because we want to
1149    // normalize that input if it doesn't start with zero.  This could be micro-optimized out
1150    // if needed.
1151    let mut dest = Vec::with_capacity(len);
1152    let mut byte_ranges = Vec::with_capacity(offsets.len());
1153
1154    // We insert one leading 0 before processing any of the inputs
1155    dest.push(T::from_usize(0).unwrap());
1156
1157    for mut o in offsets.into_iter() {
1158        if !o.is_empty() {
1159            let last_offset = *dest.last().unwrap();
1160            let o = o.borrow_to_typed_slice::<T>();
1161            let start = *o.as_ref().first().unwrap();
1162            // First, we skip the first offset
1163            // Then, we subtract that first offset from each remaining offset
1164            //
1165            // This gives us a 0-based offset array (minus the leading 0)
1166            //
1167            // Then we add the last offset from the previous array to each offset
1168            // which shifts our offset array to the correct position
1169            //
1170            // For example, let's assume the last offset from the previous array
1171            // was 10 and we are given [13, 17, 22].  This means we have two values with
1172            // length 4 (17 - 13) and 5 (22 - 17).  The output from this step will be
1173            // [14, 19].  Combined with our last offset of 10, this gives us [10, 14, 19]
1174            // which is our same two values of length 4 and 5.
1175            dest.extend(o.as_ref()[1..].iter().map(|&x| x + last_offset - start));
1176        }
1177        byte_ranges.push(get_byte_range::<T>(&mut o));
1178    }
1179    (LanceBuffer::reinterpret_vec(dest), byte_ranges)
1180}
1181
1182fn arrow_binary_to_data_block(
1183    arrays: &[ArrayRef],
1184    num_values: u64,
1185    bits_per_offset: u8,
1186) -> DataBlock {
1187    let data_vec = arrays.iter().map(|arr| arr.to_data()).collect::<Vec<_>>();
1188    let bytes_per_offset = bits_per_offset as usize / 8;
1189    let offsets = data_vec
1190        .iter()
1191        .map(|d| {
1192            LanceBuffer::from(
1193                d.buffers()[0].slice_with_length(d.offset(), (d.len() + 1) * bytes_per_offset),
1194            )
1195        })
1196        .collect::<Vec<_>>();
1197    let (offsets, data_ranges) = if bits_per_offset == 32 {
1198        stitch_offsets::<i32>(offsets)
1199    } else {
1200        stitch_offsets::<i64>(offsets)
1201    };
1202    let data = data_vec
1203        .iter()
1204        .zip(data_ranges)
1205        .map(|(d, byte_range)| {
1206            LanceBuffer::from(
1207                d.buffers()[1]
1208                    .slice_with_length(byte_range.start, byte_range.end - byte_range.start),
1209            )
1210        })
1211        .collect::<Vec<_>>();
1212    let data = LanceBuffer::concat_into_one(data);
1213    DataBlock::VariableWidth(VariableWidthBlock {
1214        data,
1215        offsets,
1216        bits_per_offset,
1217        num_values,
1218        block_info: BlockInfo::new(),
1219    })
1220}
1221
1222fn encode_flat_data(arrays: &[ArrayRef], num_values: u64) -> LanceBuffer {
1223    let bytes_per_value = arrays[0].data_type().byte_width();
1224    let mut buffer = Vec::with_capacity(num_values as usize * bytes_per_value);
1225    for arr in arrays {
1226        let data = arr.to_data();
1227        buffer.extend_from_slice(data.buffers()[0].as_slice());
1228    }
1229    LanceBuffer::from(buffer)
1230}
1231
1232fn do_encode_bitmap_data(bitmaps: &[BooleanBuffer], num_values: u64) -> LanceBuffer {
1233    let mut builder = BooleanBufferBuilder::new(num_values as usize);
1234
1235    for buf in bitmaps {
1236        builder.append_buffer(buf);
1237    }
1238
1239    let buffer = builder.finish().into_inner();
1240    LanceBuffer::from(buffer)
1241}
1242
1243fn encode_bitmap_data(arrays: &[ArrayRef], num_values: u64) -> LanceBuffer {
1244    let bitmaps = arrays
1245        .iter()
1246        .map(|arr| arr.as_boolean().values().clone())
1247        .collect::<Vec<_>>();
1248    do_encode_bitmap_data(&bitmaps, num_values)
1249}
1250
1251// Concatenate dictionary arrays.  This is a bit tricky because we might overflow the
1252// index type.  If we do, we need to upscale the indices to a larger type.
1253fn concat_dict_arrays(arrays: &[ArrayRef]) -> ArrayRef {
1254    let value_type = arrays[0].as_any_dictionary().values().data_type();
1255    let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
1256    match arrow_select::concat::concat(&array_refs) {
1257        Ok(array) => array,
1258        Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1259            // Slow, but hopefully a corner case.  Optimize later
1260            let upscaled = array_refs
1261                .iter()
1262                .map(|arr| {
1263                    match arrow_cast::cast(
1264                        *arr,
1265                        &DataType::Dictionary(
1266                            Box::new(DataType::UInt32),
1267                            Box::new(value_type.clone()),
1268                        ),
1269                    ) {
1270                        Ok(arr) => arr,
1271                        Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1272                            // Technically I think this means the input type was u64 already
1273                            unimplemented!("Dictionary arrays with more than 2^32 unique values")
1274                        }
1275                        err => err.unwrap(),
1276                    }
1277                })
1278                .collect::<Vec<_>>();
1279            let array_refs = upscaled.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
1280            // Can still fail if concat pushes over u32 boundary
1281            match arrow_select::concat::concat(&array_refs) {
1282                Ok(array) => array,
1283                Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1284                    unimplemented!("Dictionary arrays with more than 2^32 unique values")
1285                }
1286                err => err.unwrap(),
1287            }
1288        }
1289        // Shouldn't be any other possible errors in concat
1290        err => err.unwrap(),
1291    }
1292}
1293
1294fn max_index_val(index_type: &DataType) -> u64 {
1295    match index_type {
1296        DataType::Int8 => i8::MAX as u64,
1297        DataType::Int16 => i16::MAX as u64,
1298        DataType::Int32 => i32::MAX as u64,
1299        DataType::Int64 => i64::MAX as u64,
1300        DataType::UInt8 => u8::MAX as u64,
1301        DataType::UInt16 => u16::MAX as u64,
1302        DataType::UInt32 => u32::MAX as u64,
1303        DataType::UInt64 => u64::MAX,
1304        _ => panic!("Invalid dictionary index type"),
1305    }
1306}
1307
1308// If we get multiple dictionary arrays and they don't all have the same dictionary
1309// then we need to normalize the indices.  Otherwise we might have something like:
1310//
1311// First chunk ["hello", "foo"], [0, 0, 1, 1, 1]
1312// Second chunk ["bar", "world"], [0, 1, 0, 1, 1]
1313//
1314// If we simply encode as ["hello", "foo", "bar", "world"], [0, 0, 1, 1, 1, 0, 1, 0, 1, 1]
1315// then we will get the wrong answer because the dictionaries were not merged and the indices
1316// were not remapped.
1317//
1318// A simple way to do this today is to just concatenate all the arrays.  This is because
1319// arrow's dictionary concatenation function already has the logic to merge dictionaries.
1320//
1321// TODO: We could be more efficient here by checking if the dictionaries are the same
1322//       Also, if they aren't, we can possibly do something cheaper than concatenating
1323//
1324// In addition, we want to normalize the representation of nulls.  The cheapest thing to
1325// do (space-wise) is to put the nulls in the dictionary.
1326fn arrow_dictionary_to_data_block(arrays: &[ArrayRef], validity: Option<NullBuffer>) -> DataBlock {
1327    let array = concat_dict_arrays(arrays);
1328    let array_dict = array.as_any_dictionary();
1329    let mut indices = array_dict.keys();
1330    let num_values = indices.len() as u64;
1331    let mut values = array_dict.values().clone();
1332    // Placeholder, if we need to upcast, we will initialize this and set `indices` to refer to it
1333    let mut upcast = None;
1334
1335    // TODO: Should we just always normalize indices to u32?  That would make logic simpler
1336    // and we're going to bitpack them soon anyways
1337
1338    let indices_block = if let Some(validity) = validity {
1339        // If there is validity then we find the first invalid index in the dictionary values, inserting
1340        // a new value if we need to.  Then we change all indices to point to that value.  This way we
1341        // never need to store nullability of the indices.
1342        let mut first_invalid_index = None;
1343        if let Some(values_validity) = values.nulls() {
1344            first_invalid_index = (!values_validity.inner()).set_indices().next();
1345        }
1346        let first_invalid_index = first_invalid_index.unwrap_or_else(|| {
1347            let null_arr = new_null_array(values.data_type(), 1);
1348            values = arrow_select::concat::concat(&[values.as_ref(), null_arr.as_ref()]).unwrap();
1349            let null_index = values.len() - 1;
1350            let max_index_val = max_index_val(indices.data_type());
1351            if null_index as u64 > max_index_val {
1352                // Widen the index type
1353                if max_index_val >= u32::MAX as u64 {
1354                    unimplemented!("Dictionary arrays with 2^32 unique value (or more) and a null")
1355                }
1356                upcast = Some(arrow_cast::cast(indices, &DataType::UInt32).unwrap());
1357                indices = upcast.as_ref().unwrap();
1358            }
1359            null_index
1360        });
1361        // This can't fail since we already checked for fit
1362        let null_index_arr = arrow_cast::cast(
1363            &UInt64Array::from(vec![first_invalid_index as u64]),
1364            indices.data_type(),
1365        )
1366        .unwrap();
1367
1368        let bytes_per_index = indices.data_type().byte_width();
1369        let bits_per_index = bytes_per_index as u64 * 8;
1370
1371        let null_index_arr = null_index_arr.into_data();
1372        let null_index_bytes = &null_index_arr.buffers()[0];
1373        // Need to make a copy here since indices isn't mutable, could be avoided in theory
1374        let mut indices_bytes = indices.to_data().buffers()[0].to_vec();
1375        for invalid_idx in (!validity.inner()).set_indices() {
1376            indices_bytes[invalid_idx * bytes_per_index..(invalid_idx + 1) * bytes_per_index]
1377                .copy_from_slice(null_index_bytes.as_slice());
1378        }
1379        FixedWidthDataBlock {
1380            data: LanceBuffer::from(indices_bytes),
1381            bits_per_value: bits_per_index,
1382            num_values,
1383            block_info: BlockInfo::new(),
1384        }
1385    } else {
1386        FixedWidthDataBlock {
1387            data: LanceBuffer::from(indices.to_data().buffers()[0].clone()),
1388            bits_per_value: indices.data_type().byte_width() as u64 * 8,
1389            num_values,
1390            block_info: BlockInfo::new(),
1391        }
1392    };
1393
1394    let items = DataBlock::from(values);
1395    DataBlock::Dictionary(DictionaryDataBlock {
1396        indices: indices_block,
1397        dictionary: Box::new(items),
1398    })
1399}
1400
1401enum Nullability {
1402    None,
1403    All,
1404    Some(NullBuffer),
1405}
1406
1407impl Nullability {
1408    fn to_option(&self) -> Option<NullBuffer> {
1409        match self {
1410            Self::Some(nulls) => Some(nulls.clone()),
1411            _ => None,
1412        }
1413    }
1414}
1415
1416fn extract_nulls(arrays: &[ArrayRef], num_values: u64) -> Nullability {
1417    let mut has_nulls = false;
1418    let nulls_and_lens = arrays
1419        .iter()
1420        .map(|arr| {
1421            let nulls = arr.logical_nulls();
1422            has_nulls |= nulls.is_some();
1423            (nulls, arr.len())
1424        })
1425        .collect::<Vec<_>>();
1426    if !has_nulls {
1427        return Nullability::None;
1428    }
1429    let mut builder = BooleanBufferBuilder::new(num_values as usize);
1430    let mut num_nulls = 0;
1431    for (null, len) in nulls_and_lens {
1432        if let Some(null) = null {
1433            num_nulls += null.null_count();
1434            builder.append_buffer(&null.into_inner());
1435        } else {
1436            builder.append_n(len, true);
1437        }
1438    }
1439    if num_nulls == num_values as usize {
1440        Nullability::All
1441    } else {
1442        Nullability::Some(NullBuffer::new(builder.finish()))
1443    }
1444}
1445
1446impl DataBlock {
1447    pub fn from_arrays(arrays: &[ArrayRef], num_values: u64) -> Self {
1448        if arrays.is_empty() || num_values == 0 {
1449            return Self::AllNull(AllNullDataBlock { num_values: 0 });
1450        }
1451
1452        let data_type = arrays[0].data_type();
1453        let nulls = extract_nulls(arrays, num_values);
1454
1455        if let Nullability::All = nulls {
1456            return Self::AllNull(AllNullDataBlock { num_values });
1457        }
1458
1459        let mut encoded = match data_type {
1460            DataType::Binary | DataType::Utf8 => arrow_binary_to_data_block(arrays, num_values, 32),
1461            DataType::BinaryView | DataType::Utf8View => {
1462                todo!()
1463            }
1464            DataType::LargeBinary | DataType::LargeUtf8 => {
1465                arrow_binary_to_data_block(arrays, num_values, 64)
1466            }
1467            DataType::Boolean => {
1468                let data = encode_bitmap_data(arrays, num_values);
1469                Self::FixedWidth(FixedWidthDataBlock {
1470                    data,
1471                    bits_per_value: 1,
1472                    num_values,
1473                    block_info: BlockInfo::new(),
1474                })
1475            }
1476            DataType::Date32
1477            | DataType::Date64
1478            | DataType::Decimal32(_, _)
1479            | DataType::Decimal64(_, _)
1480            | DataType::Decimal128(_, _)
1481            | DataType::Decimal256(_, _)
1482            | DataType::Duration(_)
1483            | DataType::FixedSizeBinary(_)
1484            | DataType::Float16
1485            | DataType::Float32
1486            | DataType::Float64
1487            | DataType::Int16
1488            | DataType::Int32
1489            | DataType::Int64
1490            | DataType::Int8
1491            | DataType::Interval(_)
1492            | DataType::Time32(_)
1493            | DataType::Time64(_)
1494            | DataType::Timestamp(_, _)
1495            | DataType::UInt16
1496            | DataType::UInt32
1497            | DataType::UInt64
1498            | DataType::UInt8 => {
1499                let data = encode_flat_data(arrays, num_values);
1500                Self::FixedWidth(FixedWidthDataBlock {
1501                    data,
1502                    bits_per_value: data_type.byte_width() as u64 * 8,
1503                    num_values,
1504                    block_info: BlockInfo::new(),
1505                })
1506            }
1507            DataType::Null => Self::AllNull(AllNullDataBlock { num_values }),
1508            DataType::Dictionary(_, _) => arrow_dictionary_to_data_block(arrays, nulls.to_option()),
1509            DataType::Struct(fields) => {
1510                let structs = arrays.iter().map(|arr| arr.as_struct()).collect::<Vec<_>>();
1511                let mut children = Vec::with_capacity(fields.len());
1512                for child_idx in 0..fields.len() {
1513                    let child_vec = structs
1514                        .iter()
1515                        .map(|s| s.column(child_idx).clone())
1516                        .collect::<Vec<_>>();
1517                    children.push(Self::from_arrays(&child_vec, num_values));
1518                }
1519
1520                // Extract validity for the struct array
1521                let validity = match &nulls {
1522                    Nullability::None => None,
1523                    Nullability::Some(null_buffer) => Some(null_buffer.clone()),
1524                    Nullability::All => unreachable!("Should have returned AllNull earlier"),
1525                };
1526
1527                Self::Struct(StructDataBlock {
1528                    children,
1529                    block_info: BlockInfo::default(),
1530                    validity,
1531                })
1532            }
1533            DataType::FixedSizeList(_, dim) => {
1534                let children = arrays
1535                    .iter()
1536                    .map(|arr| arr.as_fixed_size_list().values().clone())
1537                    .collect::<Vec<_>>();
1538                let child_block = Self::from_arrays(&children, num_values * *dim as u64);
1539                Self::FixedSizeList(FixedSizeListBlock {
1540                    child: Box::new(child_block),
1541                    dimension: *dim as u64,
1542                })
1543            }
1544            DataType::LargeList(_)
1545            | DataType::List(_)
1546            | DataType::ListView(_)
1547            | DataType::LargeListView(_)
1548            | DataType::Map(_, _)
1549            | DataType::RunEndEncoded(_, _)
1550            | DataType::Union(_, _) => {
1551                panic!(
1552                    "Field with data type {} cannot be converted to data block",
1553                    data_type
1554                )
1555            }
1556        };
1557
1558        // compute statistics
1559        encoded.compute_stat();
1560
1561        if !matches!(data_type, DataType::Dictionary(_, _)) {
1562            match nulls {
1563                Nullability::None => encoded,
1564                Nullability::Some(nulls) => Self::Nullable(NullableDataBlock {
1565                    data: Box::new(encoded),
1566                    nulls: LanceBuffer::from(nulls.into_inner().into_inner()),
1567                    block_info: BlockInfo::new(),
1568                }),
1569                _ => unreachable!(),
1570            }
1571        } else {
1572            // Dictionaries already insert the nulls into the dictionary items
1573            encoded
1574        }
1575    }
1576
1577    pub fn from_array<T: Array + 'static>(array: T) -> Self {
1578        let num_values = array.len();
1579        Self::from_arrays(&[Arc::new(array)], num_values as u64)
1580    }
1581}
1582
1583impl From<ArrayRef> for DataBlock {
1584    fn from(array: ArrayRef) -> Self {
1585        let num_values = array.len() as u64;
1586        Self::from_arrays(&[array], num_values)
1587    }
1588}
1589
/// Type-erased builder that is fed data blocks through `append` and yields one block
/// from `finish`
///
/// Boxed implementations are handed out by `DataBlock::make_builder`.
pub trait DataBlockBuilderImpl: std::fmt::Debug {
    /// Appends the part of `data_block` picked out by `selection`
    ///
    /// NOTE(review): `selection` is presumably a range of value (row) indices within
    /// `data_block` -- confirm against the implementations.
    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>);
    /// Consumes the boxed builder and returns the block it built
    fn finish(self: Box<Self>) -> DataBlock;
}
1594
1595#[derive(Debug)]
1596pub struct DataBlockBuilder {
1597    estimated_size_bytes: u64,
1598    builder: Option<Box<dyn DataBlockBuilderImpl>>,
1599}
1600
1601impl DataBlockBuilder {
1602    pub fn with_capacity_estimate(estimated_size_bytes: u64) -> Self {
1603        Self {
1604            estimated_size_bytes,
1605            builder: None,
1606        }
1607    }
1608
1609    fn get_builder(&mut self, block: &DataBlock) -> &mut dyn DataBlockBuilderImpl {
1610        if self.builder.is_none() {
1611            self.builder = Some(block.make_builder(self.estimated_size_bytes));
1612        }
1613        self.builder.as_mut().unwrap().as_mut()
1614    }
1615
1616    pub fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
1617        self.get_builder(data_block).append(data_block, selection);
1618    }
1619
1620    pub fn finish(self) -> DataBlock {
1621        let builder = self.builder.expect("DataBlockBuilder didn't see any data");
1622        builder.finish()
1623    }
1624}
1625
1626#[cfg(test)]
1627mod tests {
1628    use std::sync::Arc;
1629
1630    use arrow_array::{
1631        ArrayRef, DictionaryArray, Int8Array, LargeBinaryArray, StringArray, UInt8Array,
1632        UInt16Array, make_array, new_null_array,
1633        types::{Int8Type, Int32Type},
1634    };
1635    use arrow_buffer::{BooleanBuffer, NullBuffer};
1636
1637    use arrow_schema::{DataType, Field, Fields};
1638    use lance_datagen::{ArrayGeneratorExt, DEFAULT_SEED, RowCount, array};
1639    use rand::SeedableRng;
1640
1641    use crate::buffer::LanceBuffer;
1642
1643    use super::{AllNullDataBlock, DataBlock};
1644
1645    use arrow_array::Array;
1646
1647    #[test]
1648    fn test_sliced_to_data_block() {
1649        let ints = UInt16Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);
1650        let ints = ints.slice(2, 4);
1651        let data = DataBlock::from_array(ints);
1652
1653        let fixed_data = data.as_fixed_width().unwrap();
1654        assert_eq!(fixed_data.num_values, 4);
1655        assert_eq!(fixed_data.data.len(), 8);
1656
1657        let nullable_ints =
1658            UInt16Array::from(vec![Some(0), None, Some(2), None, Some(4), None, Some(6)]);
1659        let nullable_ints = nullable_ints.slice(1, 3);
1660        let data = DataBlock::from_array(nullable_ints);
1661
1662        let nullable = data.as_nullable().unwrap();
1663        assert_eq!(nullable.nulls, LanceBuffer::from(vec![0b00000010]));
1664    }
1665
1666    #[test]
1667    fn test_string_to_data_block() {
1668        // Converting string arrays that contain nulls to DataBlock
1669        let strings1 = StringArray::from(vec![Some("hello"), None, Some("world")]);
1670        let strings2 = StringArray::from(vec![Some("a"), Some("b")]);
1671        let strings3 = StringArray::from(vec![Option::<&'static str>::None, None]);
1672
1673        let arrays = &[strings1, strings2, strings3]
1674            .iter()
1675            .map(|arr| Arc::new(arr.clone()) as ArrayRef)
1676            .collect::<Vec<_>>();
1677
1678        let block = DataBlock::from_arrays(arrays, 7);
1679
1680        assert_eq!(block.num_values(), 7);
1681        let block = block.as_nullable().unwrap();
1682
1683        assert_eq!(block.nulls, LanceBuffer::from(vec![0b00011101]));
1684
1685        let data = block.data.as_variable_width().unwrap();
1686        assert_eq!(
1687            data.offsets,
1688            LanceBuffer::reinterpret_vec(vec![0, 5, 5, 10, 11, 12, 12, 12])
1689        );
1690
1691        assert_eq!(data.data, LanceBuffer::copy_slice(b"helloworldab"));
1692
1693        // Converting string arrays that do not contain nulls to DataBlock
1694        let strings1 = StringArray::from(vec![Some("a"), Some("bc")]);
1695        let strings2 = StringArray::from(vec![Some("def")]);
1696
1697        let arrays = &[strings1, strings2]
1698            .iter()
1699            .map(|arr| Arc::new(arr.clone()) as ArrayRef)
1700            .collect::<Vec<_>>();
1701
1702        let block = DataBlock::from_arrays(arrays, 3);
1703
1704        assert_eq!(block.num_values(), 3);
1705        // Should be no nullable wrapper
1706        let data = block.as_variable_width().unwrap();
1707        assert_eq!(data.offsets, LanceBuffer::reinterpret_vec(vec![0, 1, 3, 6]));
1708        assert_eq!(data.data, LanceBuffer::copy_slice(b"abcdef"));
1709    }
1710
1711    #[test]
1712    fn test_string_sliced() {
1713        let check = |arr: Vec<StringArray>, expected_off: Vec<i32>, expected_data: &[u8]| {
1714            let arrs = arr
1715                .into_iter()
1716                .map(|a| Arc::new(a) as ArrayRef)
1717                .collect::<Vec<_>>();
1718            let num_rows = arrs.iter().map(|a| a.len()).sum::<usize>() as u64;
1719            let data = DataBlock::from_arrays(&arrs, num_rows);
1720
1721            assert_eq!(data.num_values(), num_rows);
1722
1723            let data = data.as_variable_width().unwrap();
1724            assert_eq!(data.offsets, LanceBuffer::reinterpret_vec(expected_off));
1725            assert_eq!(data.data, LanceBuffer::copy_slice(expected_data));
1726        };
1727
1728        let string = StringArray::from(vec![Some("hello"), Some("world")]);
1729        check(vec![string.slice(1, 1)], vec![0, 5], b"world");
1730        check(vec![string.slice(0, 1)], vec![0, 5], b"hello");
1731        check(
1732            vec![string.slice(0, 1), string.slice(1, 1)],
1733            vec![0, 5, 10],
1734            b"helloworld",
1735        );
1736
1737        let string2 = StringArray::from(vec![Some("foo"), Some("bar")]);
1738        check(
1739            vec![string.slice(0, 1), string2.slice(0, 1)],
1740            vec![0, 5, 8],
1741            b"hellofoo",
1742        );
1743    }
1744
1745    #[test]
1746    fn test_large() {
1747        let arr = LargeBinaryArray::from_vec(vec![b"hello", b"world"]);
1748        let data = DataBlock::from_array(arr);
1749
1750        assert_eq!(data.num_values(), 2);
1751        let data = data.as_variable_width().unwrap();
1752        assert_eq!(data.bits_per_offset, 64);
1753        assert_eq!(data.num_values, 2);
1754        assert_eq!(data.data, LanceBuffer::copy_slice(b"helloworld"));
1755        assert_eq!(
1756            data.offsets,
1757            LanceBuffer::reinterpret_vec(vec![0_u64, 5, 10])
1758        );
1759    }
1760
1761    #[test]
1762    fn test_dictionary_indices_normalized() {
1763        let arr1 = DictionaryArray::<Int8Type>::from_iter([Some("a"), Some("a"), Some("b")]);
1764        let arr2 = DictionaryArray::<Int8Type>::from_iter([Some("b"), Some("c")]);
1765
1766        let data = DataBlock::from_arrays(&[Arc::new(arr1), Arc::new(arr2)], 5);
1767
1768        assert_eq!(data.num_values(), 5);
1769        let data = data.as_dictionary().unwrap();
1770        let indices = data.indices;
1771        assert_eq!(indices.bits_per_value, 8);
1772        assert_eq!(indices.num_values, 5);
1773        assert_eq!(
1774            indices.data,
1775            // You might expect 0, 0, 1, 1, 2 but it seems that arrow's dictionary concat does
1776            // not actually collapse dictionaries.  This is an arrow problem however, and we don't
1777            // need to fix it here.
1778            LanceBuffer::reinterpret_vec::<i8>(vec![0, 0, 1, 2, 3])
1779        );
1780
1781        let items = data.dictionary.as_variable_width().unwrap();
1782        assert_eq!(items.bits_per_offset, 32);
1783        assert_eq!(items.num_values, 4);
1784        assert_eq!(items.data, LanceBuffer::copy_slice(b"abbc"));
1785        assert_eq!(
1786            items.offsets,
1787            LanceBuffer::reinterpret_vec(vec![0, 1, 2, 3, 4],)
1788        );
1789    }
1790
1791    #[test]
1792    fn test_dictionary_nulls() {
1793        // Test both ways of encoding nulls
1794
1795        // By default, nulls get encoded into the indices
1796        let arr1 = DictionaryArray::<Int8Type>::from_iter([None, Some("a"), Some("b")]);
1797        let arr2 = DictionaryArray::<Int8Type>::from_iter([Some("c"), None]);
1798
1799        let data = DataBlock::from_arrays(&[Arc::new(arr1), Arc::new(arr2)], 5);
1800
1801        let check_common = |data: DataBlock| {
1802            assert_eq!(data.num_values(), 5);
1803            let dict = data.as_dictionary().unwrap();
1804
1805            let nullable_items = dict.dictionary.as_nullable().unwrap();
1806            assert_eq!(nullable_items.nulls, LanceBuffer::from(vec![0b00000111]));
1807            assert_eq!(nullable_items.data.num_values(), 4);
1808
1809            let items = nullable_items.data.as_variable_width().unwrap();
1810            assert_eq!(items.bits_per_offset, 32);
1811            assert_eq!(items.num_values, 4);
1812            assert_eq!(items.data, LanceBuffer::copy_slice(b"abc"));
1813            assert_eq!(
1814                items.offsets,
1815                LanceBuffer::reinterpret_vec(vec![0, 1, 2, 3, 3],)
1816            );
1817
1818            let indices = dict.indices;
1819            assert_eq!(indices.bits_per_value, 8);
1820            assert_eq!(indices.num_values, 5);
1821            assert_eq!(
1822                indices.data,
1823                LanceBuffer::reinterpret_vec::<i8>(vec![3, 0, 1, 2, 3])
1824            );
1825        };
1826        check_common(data);
1827
1828        // However, we can manually create a dictionary where nulls are in the dictionary
1829        let items = StringArray::from(vec![Some("a"), Some("b"), Some("c"), None]);
1830        let indices = Int8Array::from(vec![Some(3), Some(0), Some(1), Some(2), Some(3)]);
1831        let dict = DictionaryArray::new(indices, Arc::new(items));
1832
1833        let data = DataBlock::from_array(dict);
1834
1835        check_common(data);
1836    }
1837
1838    #[test]
1839    fn test_dictionary_cannot_add_null() {
1840        // 256 unique strings
1841        let items = StringArray::from(
1842            (0..256)
1843                .map(|i| Some(String::from_utf8(vec![0; i]).unwrap()))
1844                .collect::<Vec<_>>(),
1845        );
1846        // 257 indices, covering the whole range, plus one null
1847        let indices = UInt8Array::from(
1848            (0..=256)
1849                .map(|i| if i == 256 { None } else { Some(i as u8) })
1850                .collect::<Vec<_>>(),
1851        );
1852        // We want to normalize this by pushing nulls into the dictionary, but we cannot because
1853        // the dictionary is too large for the index type
1854        let dict = DictionaryArray::new(indices, Arc::new(items));
1855        let data = DataBlock::from_array(dict);
1856
1857        assert_eq!(data.num_values(), 257);
1858
1859        let dict = data.as_dictionary().unwrap();
1860
1861        assert_eq!(dict.indices.bits_per_value, 32);
1862        assert_eq!(
1863            dict.indices.data,
1864            LanceBuffer::reinterpret_vec((0_u32..257).collect::<Vec<_>>())
1865        );
1866
1867        let nullable_items = dict.dictionary.as_nullable().unwrap();
1868        let null_buffer = NullBuffer::new(BooleanBuffer::new(
1869            nullable_items.nulls.into_buffer(),
1870            0,
1871            257,
1872        ));
1873        for i in 0..256 {
1874            assert!(!null_buffer.is_null(i));
1875        }
1876        assert!(null_buffer.is_null(256));
1877
1878        assert_eq!(
1879            nullable_items.data.as_variable_width().unwrap().data.len(),
1880            32640
1881        );
1882    }
1883
1884    #[test]
1885    fn test_all_null() {
1886        for data_type in [
1887            DataType::UInt32,
1888            DataType::FixedSizeBinary(2),
1889            DataType::List(Arc::new(Field::new("item", DataType::UInt32, true))),
1890            DataType::Struct(Fields::from(vec![Field::new("a", DataType::UInt32, true)])),
1891        ] {
1892            let block = DataBlock::AllNull(AllNullDataBlock { num_values: 10 });
1893            let arr = block.into_arrow(data_type.clone(), true).unwrap();
1894            let arr = make_array(arr);
1895            let expected = new_null_array(&data_type, 10);
1896            assert_eq!(&arr, &expected);
1897        }
1898    }
1899
1900    #[test]
1901    fn test_dictionary_cannot_concatenate() {
1902        // 256 unique strings
1903        let items = StringArray::from(
1904            (0..256)
1905                .map(|i| Some(String::from_utf8(vec![0; i]).unwrap()))
1906                .collect::<Vec<_>>(),
1907        );
1908        // 256 different unique strings
1909        let other_items = StringArray::from(
1910            (0..256)
1911                .map(|i| Some(String::from_utf8(vec![1; i + 1]).unwrap()))
1912                .collect::<Vec<_>>(),
1913        );
1914        let indices = UInt8Array::from_iter_values(0..=255);
1915        let dict1 = DictionaryArray::new(indices.clone(), Arc::new(items));
1916        let dict2 = DictionaryArray::new(indices, Arc::new(other_items));
1917        let data = DataBlock::from_arrays(&[Arc::new(dict1), Arc::new(dict2)], 512);
1918        assert_eq!(data.num_values(), 512);
1919
1920        let dict = data.as_dictionary().unwrap();
1921
1922        assert_eq!(dict.indices.bits_per_value, 32);
1923        assert_eq!(
1924            dict.indices.data,
1925            LanceBuffer::reinterpret_vec::<u32>((0..512).collect::<Vec<_>>())
1926        );
1927        // What fun: 0 + 1 + .. + 255 + 1 + 2 + .. + 256 = 2^16
1928        assert_eq!(
1929            dict.dictionary.as_variable_width().unwrap().data.len(),
1930            65536
1931        );
1932    }
1933
1934    #[test]
1935    fn test_data_size() {
1936        let mut rng = rand_xoshiro::Xoshiro256PlusPlus::seed_from_u64(DEFAULT_SEED.0);
1937        // test data_size() when input has no nulls
1938        let mut genn = array::rand::<Int32Type>().with_nulls(&[false, false, false]);
1939
1940        let arr = genn.generate(RowCount::from(3), &mut rng).unwrap();
1941        let block = DataBlock::from_array(arr.clone());
1942        assert!(block.data_size() == arr.get_buffer_memory_size() as u64);
1943
1944        let arr = genn.generate(RowCount::from(400), &mut rng).unwrap();
1945        let block = DataBlock::from_array(arr.clone());
1946        assert!(block.data_size() == arr.get_buffer_memory_size() as u64);
1947
1948        // test data_size() when input has nulls
1949        let mut genn = array::rand::<Int32Type>().with_nulls(&[false, true, false]);
1950        let arr = genn.generate(RowCount::from(3), &mut rng).unwrap();
1951        let block = DataBlock::from_array(arr.clone());
1952
1953        let array_data = arr.to_data();
1954        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
1955        // the NullBuffer.len() returns the length in bits so we divide_round_up by 8
1956        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
1957        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
1958
1959        let arr = genn.generate(RowCount::from(400), &mut rng).unwrap();
1960        let block = DataBlock::from_array(arr.clone());
1961
1962        let array_data = arr.to_data();
1963        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
1964        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
1965        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
1966
1967        let mut genn = array::rand::<Int32Type>().with_nulls(&[true, true, false]);
1968        let arr = genn.generate(RowCount::from(3), &mut rng).unwrap();
1969        let block = DataBlock::from_array(arr.clone());
1970
1971        let array_data = arr.to_data();
1972        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
1973        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
1974        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
1975
1976        let arr = genn.generate(RowCount::from(400), &mut rng).unwrap();
1977        let block = DataBlock::from_array(arr.clone());
1978
1979        let array_data = arr.to_data();
1980        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
1981        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
1982        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
1983
1984        let mut genn = array::rand::<Int32Type>().with_nulls(&[false, true, false]);
1985        let arr1 = genn.generate(RowCount::from(3), &mut rng).unwrap();
1986        let arr2 = genn.generate(RowCount::from(3), &mut rng).unwrap();
1987        let arr3 = genn.generate(RowCount::from(3), &mut rng).unwrap();
1988        let block = DataBlock::from_arrays(&[arr1.clone(), arr2.clone(), arr3.clone()], 9);
1989
1990        let concatenated_array = arrow_select::concat::concat(&[
1991            &*Arc::new(arr1.clone()) as &dyn Array,
1992            &*Arc::new(arr2.clone()) as &dyn Array,
1993            &*Arc::new(arr3.clone()) as &dyn Array,
1994        ])
1995        .unwrap();
1996        let total_buffer_size: usize = concatenated_array
1997            .to_data()
1998            .buffers()
1999            .iter()
2000            .map(|buffer| buffer.len())
2001            .sum();
2002
2003        let total_nulls_size_in_bytes = concatenated_array.nulls().unwrap().len().div_ceil(8);
2004        assert!(block.data_size() == (total_buffer_size + total_nulls_size_in_bytes) as u64);
2005    }
2006}