lance_encoding/
data.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Data layouts to represent encoded data in a sub-Arrow format
5//!
6//! These [`DataBlock`] structures represent physical layouts.  They fill a gap somewhere
7//! between [`arrow_data::data::ArrayData`] (which, as a collection of buffers, is too
8//! generic because it doesn't give us enough information about what those buffers represent)
9//! and [`arrow_array::array::Array`] (which is too specific, because it cares about the
10//! logical data type).
11//!
12//! In addition, the layouts represented here are slightly stricter than Arrow's layout rules.
13//! For example, offset buffers MUST start with 0.  These additional restrictions impose a
14//! slight penalty on encode (to normalize arrow data) but make the development of encoders
15//! and decoders easier (since they can rely on a normalized representation)
16
17use std::{
18    ops::Range,
19    sync::{Arc, RwLock},
20};
21
22use arrow_array::{
23    cast::AsArray,
24    new_empty_array, new_null_array,
25    types::{ArrowDictionaryKeyType, UInt16Type, UInt32Type, UInt64Type, UInt8Type},
26    Array, ArrayRef, OffsetSizeTrait, UInt64Array,
27};
28use arrow_buffer::{ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer};
29use arrow_data::{ArrayData, ArrayDataBuilder};
30use arrow_schema::DataType;
31use lance_arrow::DataTypeExt;
32use snafu::location;
33
34use lance_core::{Error, Result};
35
36use crate::{
37    buffer::LanceBuffer,
38    statistics::{ComputeStat, Stat},
39};
40
41/// A data block with no buffers where everything is null
42///
43/// Note: this data block should not be used for future work.  It will be deprecated
44/// in the 2.1 version of the format where nullability will be handled by the structural
45/// encoders.
46#[derive(Debug, Clone)]
47pub struct AllNullDataBlock {
48    /// The number of values represented by this block
49    pub num_values: u64,
50}
51
52impl AllNullDataBlock {
53    fn into_arrow(self, data_type: DataType, _validate: bool) -> Result<ArrayData> {
54        Ok(ArrayData::new_null(&data_type, self.num_values as usize))
55    }
56
57    fn into_buffers(self) -> Vec<LanceBuffer> {
58        vec![]
59    }
60}
61
62use std::collections::HashMap;
63
64// `BlockInfo` stores the statistics of this `DataBlock`, such as `NullCount` for `NullableDataBlock`,
65// `BitWidth` for `FixedWidthDataBlock`, `Cardinality` for all `DataBlock`
66#[derive(Debug, Clone)]
67pub struct BlockInfo(pub Arc<RwLock<HashMap<Stat, Arc<dyn Array>>>>);
68
69impl Default for BlockInfo {
70    fn default() -> Self {
71        Self::new()
72    }
73}
74
75impl BlockInfo {
76    pub fn new() -> Self {
77        Self(Arc::new(RwLock::new(HashMap::new())))
78    }
79}
80
81impl PartialEq for BlockInfo {
82    fn eq(&self, other: &Self) -> bool {
83        let self_info = self.0.read().unwrap();
84        let other_info = other.0.read().unwrap();
85        *self_info == *other_info
86    }
87}
88
89/// Wraps a data block and adds nullability information to it
90///
91/// Note: this data block should not be used for future work.  It will be deprecated
92/// in the 2.1 version of the format where nullability will be handled by the structural
93/// encoders.
94#[derive(Debug, Clone)]
95pub struct NullableDataBlock {
96    /// The underlying data
97    pub data: Box<DataBlock>,
98    /// A bitmap of validity for each value
99    pub nulls: LanceBuffer,
100
101    pub block_info: BlockInfo,
102}
103
104impl NullableDataBlock {
105    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
106        let nulls = self.nulls.into_buffer();
107        let data = self.data.into_arrow(data_type, validate)?.into_builder();
108        let data = data.null_bit_buffer(Some(nulls));
109        if validate {
110            Ok(data.build()?)
111        } else {
112            Ok(unsafe { data.build_unchecked() })
113        }
114    }
115
116    fn into_buffers(self) -> Vec<LanceBuffer> {
117        let mut buffers = vec![self.nulls];
118        buffers.extend(self.data.into_buffers());
119        buffers
120    }
121
122    pub fn data_size(&self) -> u64 {
123        self.data.data_size() + self.nulls.len() as u64
124    }
125}
126
127/// A block representing the same constant value repeated many times
128#[derive(Debug, PartialEq, Clone)]
129pub struct ConstantDataBlock {
130    /// Data buffer containing the value
131    pub data: LanceBuffer,
132    /// The number of values
133    pub num_values: u64,
134}
135
136impl ConstantDataBlock {
137    fn into_buffers(self) -> Vec<LanceBuffer> {
138        vec![self.data]
139    }
140
141    fn into_arrow(self, _data_type: DataType, _validate: bool) -> Result<ArrayData> {
142        // We don't need this yet but if we come up with some way of serializing
143        // scalars to/from bytes then we could implement it.
144        todo!()
145    }
146
147    pub fn try_clone(&self) -> Result<Self> {
148        Ok(Self {
149            data: self.data.clone(),
150            num_values: self.num_values,
151        })
152    }
153
154    pub fn data_size(&self) -> u64 {
155        self.data.len() as u64
156    }
157}
158
159/// A data block for a single buffer of data where each element has a fixed number of bits
160#[derive(Debug, PartialEq, Clone)]
161pub struct FixedWidthDataBlock {
162    /// The data buffer
163    pub data: LanceBuffer,
164    /// The number of bits per value
165    pub bits_per_value: u64,
166    /// The number of values represented by this block
167    pub num_values: u64,
168
169    pub block_info: BlockInfo,
170}
171
172impl FixedWidthDataBlock {
173    fn do_into_arrow(
174        self,
175        data_type: DataType,
176        num_values: u64,
177        validate: bool,
178    ) -> Result<ArrayData> {
179        let data_buffer = self.data.into_buffer();
180        let builder = ArrayDataBuilder::new(data_type)
181            .add_buffer(data_buffer)
182            .len(num_values as usize)
183            .null_count(0);
184        if validate {
185            Ok(builder.build()?)
186        } else {
187            Ok(unsafe { builder.build_unchecked() })
188        }
189    }
190
191    pub fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
192        let root_num_values = self.num_values;
193        self.do_into_arrow(data_type, root_num_values, validate)
194    }
195
196    pub fn into_buffers(self) -> Vec<LanceBuffer> {
197        vec![self.data]
198    }
199
200    pub fn try_clone(&self) -> Result<Self> {
201        Ok(Self {
202            data: self.data.clone(),
203            bits_per_value: self.bits_per_value,
204            num_values: self.num_values,
205            block_info: self.block_info.clone(),
206        })
207    }
208
209    pub fn data_size(&self) -> u64 {
210        self.data.len() as u64
211    }
212}
213
214#[derive(Debug)]
215pub struct VariableWidthDataBlockBuilder<T: OffsetSizeTrait> {
216    offsets: Vec<T>,
217    bytes: Vec<u8>,
218}
219
220impl<T: OffsetSizeTrait> VariableWidthDataBlockBuilder<T> {
221    fn new(estimated_size_bytes: u64) -> Self {
222        Self {
223            offsets: vec![T::from_usize(0).unwrap()],
224            bytes: Vec::with_capacity(estimated_size_bytes as usize),
225        }
226    }
227}
228
229impl<T: OffsetSizeTrait + bytemuck::Pod> DataBlockBuilderImpl for VariableWidthDataBlockBuilder<T> {
230    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
231        let block = data_block.as_variable_width_ref().unwrap();
232        assert!(block.bits_per_offset == T::get_byte_width() as u8 * 8);
233        let offsets = block.offsets.borrow_to_typed_view::<T>();
234
235        let start_offset = offsets[selection.start as usize];
236        let end_offset = offsets[selection.end as usize];
237        let mut previous_len = self.bytes.len();
238
239        self.bytes
240            .extend_from_slice(&block.data[start_offset.as_usize()..end_offset.as_usize()]);
241
242        self.offsets.extend(
243            offsets[selection.start as usize..selection.end as usize]
244                .iter()
245                .zip(&offsets[selection.start as usize + 1..=selection.end as usize])
246                .map(|(&current, &next)| {
247                    let this_value_len = next - current;
248                    previous_len += this_value_len.as_usize();
249                    T::from_usize(previous_len).unwrap()
250                }),
251        );
252    }
253
254    fn finish(self: Box<Self>) -> DataBlock {
255        let num_values = (self.offsets.len() - 1) as u64;
256        DataBlock::VariableWidth(VariableWidthBlock {
257            data: LanceBuffer::from(self.bytes),
258            offsets: LanceBuffer::reinterpret_vec(self.offsets),
259            bits_per_offset: T::get_byte_width() as u8 * 8,
260            num_values,
261            block_info: BlockInfo::new(),
262        })
263    }
264}
265
266#[derive(Debug)]
267struct BitmapDataBlockBuilder {
268    values: BooleanBufferBuilder,
269}
270
271impl BitmapDataBlockBuilder {
272    fn new(estimated_size_bytes: u64) -> Self {
273        Self {
274            values: BooleanBufferBuilder::new(estimated_size_bytes as usize * 8),
275        }
276    }
277}
278
279impl DataBlockBuilderImpl for BitmapDataBlockBuilder {
280    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
281        let bitmap_blk = data_block.as_fixed_width_ref().unwrap();
282        self.values.append_packed_range(
283            selection.start as usize..selection.end as usize,
284            &bitmap_blk.data,
285        );
286    }
287
288    fn finish(mut self: Box<Self>) -> DataBlock {
289        let bool_buf = self.values.finish();
290        let num_values = bool_buf.len() as u64;
291        let bits_buf = bool_buf.into_inner();
292        DataBlock::FixedWidth(FixedWidthDataBlock {
293            data: LanceBuffer::from(bits_buf),
294            bits_per_value: 1,
295            num_values,
296            block_info: BlockInfo::new(),
297        })
298    }
299}
300
301#[derive(Debug)]
302struct FixedWidthDataBlockBuilder {
303    bits_per_value: u64,
304    bytes_per_value: u64,
305    values: Vec<u8>,
306}
307
308impl FixedWidthDataBlockBuilder {
309    fn new(bits_per_value: u64, estimated_size_bytes: u64) -> Self {
310        assert!(bits_per_value % 8 == 0);
311        Self {
312            bits_per_value,
313            bytes_per_value: bits_per_value / 8,
314            values: Vec::with_capacity(estimated_size_bytes as usize),
315        }
316    }
317}
318
319impl DataBlockBuilderImpl for FixedWidthDataBlockBuilder {
320    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
321        let block = data_block.as_fixed_width_ref().unwrap();
322        assert_eq!(self.bits_per_value, block.bits_per_value);
323        let start = selection.start as usize * self.bytes_per_value as usize;
324        let end = selection.end as usize * self.bytes_per_value as usize;
325        self.values.extend_from_slice(&block.data[start..end]);
326    }
327
328    fn finish(self: Box<Self>) -> DataBlock {
329        let num_values = (self.values.len() / self.bytes_per_value as usize) as u64;
330        DataBlock::FixedWidth(FixedWidthDataBlock {
331            data: LanceBuffer::from(self.values),
332            bits_per_value: self.bits_per_value,
333            num_values,
334            block_info: BlockInfo::new(),
335        })
336    }
337}
338
339#[derive(Debug)]
340struct StructDataBlockBuilder {
341    children: Vec<Box<dyn DataBlockBuilderImpl>>,
342}
343
344impl StructDataBlockBuilder {
345    fn new(children: Vec<Box<dyn DataBlockBuilderImpl>>) -> Self {
346        Self { children }
347    }
348}
349
350impl DataBlockBuilderImpl for StructDataBlockBuilder {
351    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
352        let data_block = data_block.as_struct_ref().unwrap();
353        for i in 0..self.children.len() {
354            self.children[i].append(&data_block.children[i], selection.clone());
355        }
356    }
357
358    fn finish(self: Box<Self>) -> DataBlock {
359        let mut children_data_block = Vec::new();
360        for child in self.children {
361            let child_data_block = child.finish();
362            children_data_block.push(child_data_block);
363        }
364        DataBlock::Struct(StructDataBlock {
365            children: children_data_block,
366            block_info: BlockInfo::new(),
367            validity: None,
368        })
369    }
370}
371
372#[derive(Debug, Default)]
373struct AllNullDataBlockBuilder {
374    num_values: u64,
375}
376
377impl DataBlockBuilderImpl for AllNullDataBlockBuilder {
378    fn append(&mut self, _data_block: &DataBlock, selection: Range<u64>) {
379        self.num_values += selection.end - selection.start;
380    }
381
382    fn finish(self: Box<Self>) -> DataBlock {
383        DataBlock::AllNull(AllNullDataBlock {
384            num_values: self.num_values,
385        })
386    }
387}
388
389/// A data block to represent a fixed size list
390#[derive(Debug, Clone)]
391pub struct FixedSizeListBlock {
392    /// The child data block
393    pub child: Box<DataBlock>,
394    /// The number of items in each list
395    pub dimension: u64,
396}
397
398impl FixedSizeListBlock {
399    pub fn num_values(&self) -> u64 {
400        self.child.num_values() / self.dimension
401    }
402
403    /// Try to flatten a FixedSizeListBlock into a FixedWidthDataBlock
404    ///
405    /// Returns None if any children are nullable
406    pub fn try_into_flat(self) -> Option<FixedWidthDataBlock> {
407        match *self.child {
408            // Cannot flatten a nullable child
409            DataBlock::Nullable(_) => None,
410            DataBlock::FixedSizeList(inner) => {
411                let mut flat = inner.try_into_flat()?;
412                flat.bits_per_value *= self.dimension;
413                flat.num_values /= self.dimension;
414                Some(flat)
415            }
416            DataBlock::FixedWidth(mut inner) => {
417                inner.bits_per_value *= self.dimension;
418                inner.num_values /= self.dimension;
419                Some(inner)
420            }
421            _ => panic!(
422                "Expected FixedSizeList or FixedWidth data block but found {:?}",
423                self
424            ),
425        }
426    }
427
428    pub fn flatten_as_fixed(&mut self) -> FixedWidthDataBlock {
429        match self.child.as_mut() {
430            DataBlock::FixedSizeList(fsl) => fsl.flatten_as_fixed(),
431            DataBlock::FixedWidth(fw) => fw.clone(),
432            _ => panic!("Expected FixedSizeList or FixedWidth data block"),
433        }
434    }
435
436    /// Convert a flattened values block into a FixedSizeListBlock
437    pub fn from_flat(data: FixedWidthDataBlock, data_type: &DataType) -> DataBlock {
438        match data_type {
439            DataType::FixedSizeList(child_field, dimension) => {
440                let mut data = data;
441                data.bits_per_value /= *dimension as u64;
442                data.num_values *= *dimension as u64;
443                let child_data = Self::from_flat(data, child_field.data_type());
444                DataBlock::FixedSizeList(Self {
445                    child: Box::new(child_data),
446                    dimension: *dimension as u64,
447                })
448            }
449            // Base case, we've hit a non-list type
450            _ => DataBlock::FixedWidth(data),
451        }
452    }
453
454    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
455        let num_values = self.num_values();
456        let builder = match &data_type {
457            DataType::FixedSizeList(child_field, _) => {
458                let child_data = self
459                    .child
460                    .into_arrow(child_field.data_type().clone(), validate)?;
461                ArrayDataBuilder::new(data_type)
462                    .add_child_data(child_data)
463                    .len(num_values as usize)
464                    .null_count(0)
465            }
466            _ => panic!("Expected FixedSizeList data type and got {:?}", data_type),
467        };
468        if validate {
469            Ok(builder.build()?)
470        } else {
471            Ok(unsafe { builder.build_unchecked() })
472        }
473    }
474
475    fn into_buffers(self) -> Vec<LanceBuffer> {
476        self.child.into_buffers()
477    }
478
479    fn data_size(&self) -> u64 {
480        self.child.data_size()
481    }
482}
483
484#[derive(Debug)]
485struct FixedSizeListBlockBuilder {
486    inner: Box<dyn DataBlockBuilderImpl>,
487    dimension: u64,
488}
489
490impl FixedSizeListBlockBuilder {
491    fn new(inner: Box<dyn DataBlockBuilderImpl>, dimension: u64) -> Self {
492        Self { inner, dimension }
493    }
494}
495
496impl DataBlockBuilderImpl for FixedSizeListBlockBuilder {
497    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
498        let selection = selection.start * self.dimension..selection.end * self.dimension;
499        let fsl = data_block.as_fixed_size_list_ref().unwrap();
500        self.inner.append(fsl.child.as_ref(), selection);
501    }
502
503    fn finish(self: Box<Self>) -> DataBlock {
504        let inner_block = self.inner.finish();
505        DataBlock::FixedSizeList(FixedSizeListBlock {
506            child: Box::new(inner_block),
507            dimension: self.dimension,
508        })
509    }
510}
511
512#[derive(Debug)]
513struct NullableDataBlockBuilder {
514    inner: Box<dyn DataBlockBuilderImpl>,
515    validity: BooleanBufferBuilder,
516}
517
518impl NullableDataBlockBuilder {
519    fn new(inner: Box<dyn DataBlockBuilderImpl>, estimated_size_bytes: usize) -> Self {
520        Self {
521            inner,
522            validity: BooleanBufferBuilder::new(estimated_size_bytes * 8),
523        }
524    }
525}
526
527impl DataBlockBuilderImpl for NullableDataBlockBuilder {
528    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
529        let nullable = data_block.as_nullable_ref().unwrap();
530        let bool_buf = BooleanBuffer::new(
531            nullable.nulls.clone().into_buffer(),
532            selection.start as usize,
533            (selection.end - selection.start) as usize,
534        );
535        self.validity.append_buffer(&bool_buf);
536        self.inner.append(nullable.data.as_ref(), selection);
537    }
538
539    fn finish(mut self: Box<Self>) -> DataBlock {
540        let inner_block = self.inner.finish();
541        DataBlock::Nullable(NullableDataBlock {
542            data: Box::new(inner_block),
543            nulls: LanceBuffer::from(self.validity.finish().into_inner()),
544            block_info: BlockInfo::new(),
545        })
546    }
547}
548
549/// A data block with no regular structure.  There is no available spot to attach
550/// validity / repdef information and it cannot be converted to Arrow without being
551/// decoded
552#[derive(Debug, Clone)]
553pub struct OpaqueBlock {
554    pub buffers: Vec<LanceBuffer>,
555    pub num_values: u64,
556    pub block_info: BlockInfo,
557}
558
559impl OpaqueBlock {
560    pub fn data_size(&self) -> u64 {
561        self.buffers.iter().map(|b| b.len() as u64).sum()
562    }
563}
564
565/// A data block for variable-width data (e.g. strings, packed rows, etc.)
566#[derive(Debug, Clone)]
567pub struct VariableWidthBlock {
568    /// The data buffer
569    pub data: LanceBuffer,
570    /// The offsets buffer (contains num_values + 1 offsets)
571    ///
572    /// Offsets MUST start at 0
573    pub offsets: LanceBuffer,
574    /// The number of bits per offset
575    pub bits_per_offset: u8,
576    /// The number of values represented by this block
577    pub num_values: u64,
578
579    pub block_info: BlockInfo,
580}
581
582impl VariableWidthBlock {
583    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
584        let data_buffer = self.data.into_buffer();
585        let offsets_buffer = self.offsets.into_buffer();
586        let builder = ArrayDataBuilder::new(data_type)
587            .add_buffer(offsets_buffer)
588            .add_buffer(data_buffer)
589            .len(self.num_values as usize)
590            .null_count(0);
591        if validate {
592            Ok(builder.build()?)
593        } else {
594            Ok(unsafe { builder.build_unchecked() })
595        }
596    }
597
598    fn into_buffers(self) -> Vec<LanceBuffer> {
599        vec![self.offsets, self.data]
600    }
601
602    pub fn offsets_as_block(&mut self) -> DataBlock {
603        let offsets = self.offsets.clone();
604        DataBlock::FixedWidth(FixedWidthDataBlock {
605            data: offsets,
606            bits_per_value: self.bits_per_offset as u64,
607            num_values: self.num_values + 1,
608            block_info: BlockInfo::new(),
609        })
610    }
611
612    pub fn data_size(&self) -> u64 {
613        (self.data.len() + self.offsets.len()) as u64
614    }
615}
616
617/// A data block representing a struct
618#[derive(Debug, Clone)]
619pub struct StructDataBlock {
620    /// The child arrays
621    pub children: Vec<DataBlock>,
622    pub block_info: BlockInfo,
623    /// The validity bitmap for the struct (None means all valid)
624    pub validity: Option<NullBuffer>,
625}
626
627impl StructDataBlock {
628    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
629        if let DataType::Struct(fields) = &data_type {
630            let mut builder = ArrayDataBuilder::new(DataType::Struct(fields.clone()));
631            let mut num_rows = 0;
632            for (field, child) in fields.iter().zip(self.children) {
633                let child_data = child.into_arrow(field.data_type().clone(), validate)?;
634                num_rows = child_data.len();
635                builder = builder.add_child_data(child_data);
636            }
637
638            // Apply validity if present
639            let builder = if let Some(validity) = self.validity {
640                let null_count = validity.null_count();
641                builder
642                    .null_bit_buffer(Some(validity.into_inner().into_inner()))
643                    .null_count(null_count)
644            } else {
645                builder.null_count(0)
646            };
647
648            let builder = builder.len(num_rows);
649            if validate {
650                Ok(builder.build()?)
651            } else {
652                Ok(unsafe { builder.build_unchecked() })
653            }
654        } else {
655            Err(Error::Internal {
656                message: format!("Expected Struct, got {:?}", data_type),
657                location: location!(),
658            })
659        }
660    }
661
662    fn remove_outer_validity(self) -> Self {
663        Self {
664            children: self
665                .children
666                .into_iter()
667                .map(|c| c.remove_outer_validity())
668                .collect(),
669            block_info: self.block_info,
670            validity: None, // Remove the validity
671        }
672    }
673
674    fn into_buffers(self) -> Vec<LanceBuffer> {
675        self.children
676            .into_iter()
677            .flat_map(|c| c.into_buffers())
678            .collect()
679    }
680
681    pub fn has_variable_width_child(&self) -> bool {
682        self.children
683            .iter()
684            .any(|child| !matches!(child, DataBlock::FixedWidth(_)))
685    }
686
687    pub fn data_size(&self) -> u64 {
688        self.children
689            .iter()
690            .map(|data_block| data_block.data_size())
691            .sum()
692    }
693}
694
695/// A data block for dictionary encoded data
696#[derive(Debug, Clone)]
697pub struct DictionaryDataBlock {
698    /// The indices buffer
699    pub indices: FixedWidthDataBlock,
700    /// The dictionary itself
701    pub dictionary: Box<DataBlock>,
702}
703
704impl DictionaryDataBlock {
705    fn decode_helper<K: ArrowDictionaryKeyType>(self) -> Result<DataBlock> {
706        // Handle empty batch - this can happen when decoding a range that contains
707        // only empty/null lists, or when reading sparse data
708        if self.indices.num_values == 0 {
709            return Ok(DataBlock::AllNull(AllNullDataBlock { num_values: 0 }));
710        }
711
712        // assume the indices are uniformly distributed.
713        let estimated_size_bytes = self.dictionary.data_size()
714            * (self.indices.num_values + self.dictionary.num_values() - 1)
715            / self.dictionary.num_values();
716        let mut data_builder = DataBlockBuilder::with_capacity_estimate(estimated_size_bytes);
717
718        let indices = self.indices.data.borrow_to_typed_slice::<K::Native>();
719        let indices = indices.as_ref();
720
721        indices
722            .iter()
723            .map(|idx| idx.to_usize().unwrap() as u64)
724            .for_each(|idx| {
725                data_builder.append(&self.dictionary, idx..idx + 1);
726            });
727
728        Ok(data_builder.finish())
729    }
730
731    pub fn decode(self) -> Result<DataBlock> {
732        match self.indices.bits_per_value {
733            8 => self.decode_helper::<UInt8Type>(),
734            16 => self.decode_helper::<UInt16Type>(),
735            32 => self.decode_helper::<UInt32Type>(),
736            64 => self.decode_helper::<UInt64Type>(),
737            _ => Err(lance_core::Error::Internal {
738                message: format!(
739                    "Unsupported dictionary index bit width: {} bits",
740                    self.indices.bits_per_value
741                ),
742                location: location!(),
743            }),
744        }
745    }
746
747    fn into_arrow_dict(
748        self,
749        key_type: Box<DataType>,
750        value_type: Box<DataType>,
751        validate: bool,
752    ) -> Result<ArrayData> {
753        let indices = self.indices.into_arrow((*key_type).clone(), validate)?;
754        let dictionary = self
755            .dictionary
756            .into_arrow((*value_type).clone(), validate)?;
757
758        let builder = indices
759            .into_builder()
760            .add_child_data(dictionary)
761            .data_type(DataType::Dictionary(key_type, value_type));
762
763        if validate {
764            Ok(builder.build()?)
765        } else {
766            Ok(unsafe { builder.build_unchecked() })
767        }
768    }
769
770    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
771        if let DataType::Dictionary(key_type, value_type) = data_type {
772            self.into_arrow_dict(key_type, value_type, validate)
773        } else {
774            self.decode()?.into_arrow(data_type, validate)
775        }
776    }
777
778    fn into_buffers(self) -> Vec<LanceBuffer> {
779        let mut buffers = self.indices.into_buffers();
780        buffers.extend(self.dictionary.into_buffers());
781        buffers
782    }
783
784    pub fn into_parts(self) -> (DataBlock, DataBlock) {
785        (DataBlock::FixedWidth(self.indices), *self.dictionary)
786    }
787
788    pub fn from_parts(indices: FixedWidthDataBlock, dictionary: DataBlock) -> Self {
789        Self {
790            indices,
791            dictionary: Box::new(dictionary),
792        }
793    }
794}
795
796/// A DataBlock is a collection of buffers that represents an "array" of data in very generic terms
797///
798/// The output of each decoder is a DataBlock.  Decoders can be chained together to transform
799/// one DataBlock into a different kind of DataBlock.
800///
801/// The DataBlock is somewhere in between Arrow's ArrayData and Array and represents a physical
802/// layout of the data.
803///
804/// A DataBlock can be converted into an Arrow ArrayData (and then Array) for a given array type.
805/// For example, a FixedWidthDataBlock can be converted into any primitive type or a fixed size
806/// list of a primitive type.  This is a zero-copy operation.
807///
808/// In addition, a DataBlock can be created from an Arrow array or arrays.  This is not a zero-copy
809/// operation as some normalization may be required.
810#[derive(Debug, Clone)]
811pub enum DataBlock {
812    Empty(),
813    Constant(ConstantDataBlock),
814    AllNull(AllNullDataBlock),
815    Nullable(NullableDataBlock),
816    FixedWidth(FixedWidthDataBlock),
817    FixedSizeList(FixedSizeListBlock),
818    VariableWidth(VariableWidthBlock),
819    Opaque(OpaqueBlock),
820    Struct(StructDataBlock),
821    Dictionary(DictionaryDataBlock),
822}
823
824impl DataBlock {
825    /// Convert self into an Arrow ArrayData
826    pub fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
827        match self {
828            Self::Empty() => Ok(new_empty_array(&data_type).to_data()),
829            Self::Constant(inner) => inner.into_arrow(data_type, validate),
830            Self::AllNull(inner) => inner.into_arrow(data_type, validate),
831            Self::Nullable(inner) => inner.into_arrow(data_type, validate),
832            Self::FixedWidth(inner) => inner.into_arrow(data_type, validate),
833            Self::FixedSizeList(inner) => inner.into_arrow(data_type, validate),
834            Self::VariableWidth(inner) => inner.into_arrow(data_type, validate),
835            Self::Struct(inner) => inner.into_arrow(data_type, validate),
836            Self::Dictionary(inner) => inner.into_arrow(data_type, validate),
837            Self::Opaque(_) => Err(Error::Internal {
838                message: "Cannot convert OpaqueBlock to Arrow".to_string(),
839                location: location!(),
840            }),
841        }
842    }
843
844    /// Convert the data block into a collection of buffers for serialization
845    ///
846    /// The order matters and will be used to reconstruct the data block at read time.
847    pub fn into_buffers(self) -> Vec<LanceBuffer> {
848        match self {
849            Self::Empty() => Vec::default(),
850            Self::Constant(inner) => inner.into_buffers(),
851            Self::AllNull(inner) => inner.into_buffers(),
852            Self::Nullable(inner) => inner.into_buffers(),
853            Self::FixedWidth(inner) => inner.into_buffers(),
854            Self::FixedSizeList(inner) => inner.into_buffers(),
855            Self::VariableWidth(inner) => inner.into_buffers(),
856            Self::Struct(inner) => inner.into_buffers(),
857            Self::Dictionary(inner) => inner.into_buffers(),
858            Self::Opaque(inner) => inner.buffers,
859        }
860    }
861
862    /// Converts the data buffers into borrowed mode and clones the block
863    ///
864    /// This is a zero-copy operation but requires a mutable reference to self and, afterwards,
865    /// all buffers will be in Borrowed mode.
866    /// Try and clone the block
867    ///
868    /// This will fail if any buffers are in owned mode.  You can call borrow_and_clone() to
869    /// ensure that all buffers are in borrowed mode before calling this method.
870    pub fn try_clone(&self) -> Result<Self> {
871        match self {
872            Self::Empty() => Ok(Self::Empty()),
873            Self::Constant(inner) => Ok(Self::Constant(inner.clone())),
874            Self::AllNull(inner) => Ok(Self::AllNull(inner.clone())),
875            Self::Nullable(inner) => Ok(Self::Nullable(inner.clone())),
876            Self::FixedWidth(inner) => Ok(Self::FixedWidth(inner.clone())),
877            Self::FixedSizeList(inner) => Ok(Self::FixedSizeList(inner.clone())),
878            Self::VariableWidth(inner) => Ok(Self::VariableWidth(inner.clone())),
879            Self::Struct(inner) => Ok(Self::Struct(inner.clone())),
880            Self::Dictionary(inner) => Ok(Self::Dictionary(inner.clone())),
881            Self::Opaque(inner) => Ok(Self::Opaque(inner.clone())),
882        }
883    }
884
885    pub fn name(&self) -> &'static str {
886        match self {
887            Self::Constant(_) => "Constant",
888            Self::Empty() => "Empty",
889            Self::AllNull(_) => "AllNull",
890            Self::Nullable(_) => "Nullable",
891            Self::FixedWidth(_) => "FixedWidth",
892            Self::FixedSizeList(_) => "FixedSizeList",
893            Self::VariableWidth(_) => "VariableWidth",
894            Self::Struct(_) => "Struct",
895            Self::Dictionary(_) => "Dictionary",
896            Self::Opaque(_) => "Opaque",
897        }
898    }
899
900    pub fn is_variable(&self) -> bool {
901        match self {
902            Self::Constant(_) => false,
903            Self::Empty() => false,
904            Self::AllNull(_) => false,
905            Self::Nullable(nullable) => nullable.data.is_variable(),
906            Self::FixedWidth(_) => false,
907            Self::FixedSizeList(fsl) => fsl.child.is_variable(),
908            Self::VariableWidth(_) => true,
909            Self::Struct(strct) => strct.children.iter().any(|c| c.is_variable()),
910            Self::Dictionary(_) => {
911                todo!("is_variable for DictionaryDataBlock is not implemented yet")
912            }
913            Self::Opaque(_) => panic!("Does not make sense to ask if an Opaque block is variable"),
914        }
915    }
916
917    pub fn is_nullable(&self) -> bool {
918        match self {
919            Self::AllNull(_) => true,
920            Self::Nullable(_) => true,
921            Self::FixedSizeList(fsl) => fsl.child.is_nullable(),
922            Self::Struct(strct) => strct.children.iter().any(|c| c.is_nullable()),
923            Self::Dictionary(_) => {
924                todo!("is_nullable for DictionaryDataBlock is not implemented yet")
925            }
926            Self::Opaque(_) => panic!("Does not make sense to ask if an Opaque block is nullable"),
927            _ => false,
928        }
929    }
930
931    /// The number of values in the block
932    ///
933    /// This function does not recurse into child blocks.  If this is a FSL then it will
934    /// be the number of lists and not the number of items.
935    pub fn num_values(&self) -> u64 {
936        match self {
937            Self::Empty() => 0,
938            Self::Constant(inner) => inner.num_values,
939            Self::AllNull(inner) => inner.num_values,
940            Self::Nullable(inner) => inner.data.num_values(),
941            Self::FixedWidth(inner) => inner.num_values,
942            Self::FixedSizeList(inner) => inner.num_values(),
943            Self::VariableWidth(inner) => inner.num_values,
944            Self::Struct(inner) => inner.children[0].num_values(),
945            Self::Dictionary(inner) => inner.indices.num_values,
946            Self::Opaque(inner) => inner.num_values,
947        }
948    }
949
950    /// The number of items in a single row
951    ///
952    /// This is always 1 unless there are layers of FSL
953    pub fn items_per_row(&self) -> u64 {
954        match self {
955            Self::Empty() => todo!(),     // Leave undefined until needed
956            Self::Constant(_) => todo!(), // Leave undefined until needed
957            Self::AllNull(_) => todo!(),  // Leave undefined until needed
958            Self::Nullable(nullable) => nullable.data.items_per_row(),
959            Self::FixedWidth(_) => 1,
960            Self::FixedSizeList(fsl) => fsl.dimension * fsl.child.items_per_row(),
961            Self::VariableWidth(_) => 1,
962            Self::Struct(_) => todo!(), // Leave undefined until needed
963            Self::Dictionary(_) => 1,
964            Self::Opaque(_) => 1,
965        }
966    }
967
968    /// The number of bytes in the data block (including any child blocks)
969    pub fn data_size(&self) -> u64 {
970        match self {
971            Self::Empty() => 0,
972            Self::Constant(inner) => inner.data_size(),
973            Self::AllNull(_) => 0,
974            Self::Nullable(inner) => inner.data_size(),
975            Self::FixedWidth(inner) => inner.data_size(),
976            Self::FixedSizeList(inner) => inner.data_size(),
977            Self::VariableWidth(inner) => inner.data_size(),
978            Self::Struct(_) => {
979                todo!("the data_size method for StructDataBlock is not implemented yet")
980            }
981            Self::Dictionary(_) => {
982                todo!("the data_size method for DictionaryDataBlock is not implemented yet")
983            }
984            Self::Opaque(inner) => inner.data_size(),
985        }
986    }
987
988    /// Removes any validity information from the block
989    ///
990    /// This does not filter the block (e.g. remove rows).  It only removes
991    /// the validity bitmaps (if present).  Any garbage masked by null bits
992    /// will now appear as proper values.
993    ///
994    /// If `recurse` is true, then this will also remove validity from any child blocks.
995    pub fn remove_outer_validity(self) -> Self {
996        match self {
997            Self::AllNull(_) => panic!("Cannot remove validity on all-null data"),
998            Self::Nullable(inner) => *inner.data,
999            Self::Struct(inner) => Self::Struct(inner.remove_outer_validity()),
1000            other => other,
1001        }
1002    }
1003
1004    pub fn make_builder(&self, estimated_size_bytes: u64) -> Box<dyn DataBlockBuilderImpl> {
1005        match self {
1006            Self::FixedWidth(inner) => {
1007                if inner.bits_per_value == 1 {
1008                    Box::new(BitmapDataBlockBuilder::new(estimated_size_bytes))
1009                } else {
1010                    Box::new(FixedWidthDataBlockBuilder::new(
1011                        inner.bits_per_value,
1012                        estimated_size_bytes,
1013                    ))
1014                }
1015            }
1016            Self::VariableWidth(inner) => {
1017                if inner.bits_per_offset == 32 {
1018                    Box::new(VariableWidthDataBlockBuilder::<i32>::new(
1019                        estimated_size_bytes,
1020                    ))
1021                } else if inner.bits_per_offset == 64 {
1022                    Box::new(VariableWidthDataBlockBuilder::<i64>::new(
1023                        estimated_size_bytes,
1024                    ))
1025                } else {
1026                    todo!()
1027                }
1028            }
1029            Self::FixedSizeList(inner) => {
1030                let inner_builder = inner.child.make_builder(estimated_size_bytes);
1031                Box::new(FixedSizeListBlockBuilder::new(
1032                    inner_builder,
1033                    inner.dimension,
1034                ))
1035            }
1036            Self::Nullable(nullable) => {
1037                // There's no easy way to know what percentage of the data is in the valiidty buffer
1038                // but 1/16th seems like a reasonable guess.
1039                let estimated_validity_size_bytes = estimated_size_bytes / 16;
1040                let inner_builder = nullable
1041                    .data
1042                    .make_builder(estimated_size_bytes - estimated_validity_size_bytes);
1043                Box::new(NullableDataBlockBuilder::new(
1044                    inner_builder,
1045                    estimated_validity_size_bytes as usize,
1046                ))
1047            }
1048            Self::Struct(struct_data_block) => {
1049                let num_children = struct_data_block.children.len();
1050                let per_child_estimate = if num_children == 0 {
1051                    0
1052                } else {
1053                    estimated_size_bytes / num_children as u64
1054                };
1055                let child_builders = struct_data_block
1056                    .children
1057                    .iter()
1058                    .map(|child| child.make_builder(per_child_estimate))
1059                    .collect();
1060                Box::new(StructDataBlockBuilder::new(child_builders))
1061            }
1062            Self::AllNull(_) => Box::new(AllNullDataBlockBuilder::default()),
1063            _ => todo!("make_builder for {:?}", self),
1064        }
1065    }
1066}
1067
1068macro_rules! as_type {
1069    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
1070        pub fn $fn_name(self) -> Option<$inner_type> {
1071            match self {
1072                Self::$inner(inner) => Some(inner),
1073                _ => None,
1074            }
1075        }
1076    };
1077}
1078
1079macro_rules! as_type_ref {
1080    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
1081        pub fn $fn_name(&self) -> Option<&$inner_type> {
1082            match self {
1083                Self::$inner(inner) => Some(inner),
1084                _ => None,
1085            }
1086        }
1087    };
1088}
1089
1090macro_rules! as_type_ref_mut {
1091    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
1092        pub fn $fn_name(&mut self) -> Option<&mut $inner_type> {
1093            match self {
1094                Self::$inner(inner) => Some(inner),
1095                _ => None,
1096            }
1097        }
1098    };
1099}
1100
1101// Cast implementations
1102impl DataBlock {
1103    as_type!(as_all_null, AllNull, AllNullDataBlock);
1104    as_type!(as_nullable, Nullable, NullableDataBlock);
1105    as_type!(as_fixed_width, FixedWidth, FixedWidthDataBlock);
1106    as_type!(as_fixed_size_list, FixedSizeList, FixedSizeListBlock);
1107    as_type!(as_variable_width, VariableWidth, VariableWidthBlock);
1108    as_type!(as_struct, Struct, StructDataBlock);
1109    as_type!(as_dictionary, Dictionary, DictionaryDataBlock);
1110    as_type_ref!(as_all_null_ref, AllNull, AllNullDataBlock);
1111    as_type_ref!(as_nullable_ref, Nullable, NullableDataBlock);
1112    as_type_ref!(as_fixed_width_ref, FixedWidth, FixedWidthDataBlock);
1113    as_type_ref!(as_fixed_size_list_ref, FixedSizeList, FixedSizeListBlock);
1114    as_type_ref!(as_variable_width_ref, VariableWidth, VariableWidthBlock);
1115    as_type_ref!(as_struct_ref, Struct, StructDataBlock);
1116    as_type_ref!(as_dictionary_ref, Dictionary, DictionaryDataBlock);
1117    as_type_ref_mut!(as_all_null_ref_mut, AllNull, AllNullDataBlock);
1118    as_type_ref_mut!(as_nullable_ref_mut, Nullable, NullableDataBlock);
1119    as_type_ref_mut!(as_fixed_width_ref_mut, FixedWidth, FixedWidthDataBlock);
1120    as_type_ref_mut!(
1121        as_fixed_size_list_ref_mut,
1122        FixedSizeList,
1123        FixedSizeListBlock
1124    );
1125    as_type_ref_mut!(as_variable_width_ref_mut, VariableWidth, VariableWidthBlock);
1126    as_type_ref_mut!(as_struct_ref_mut, Struct, StructDataBlock);
1127    as_type_ref_mut!(as_dictionary_ref_mut, Dictionary, DictionaryDataBlock);
1128}
1129
1130// Methods to convert from Arrow -> DataBlock
1131
1132fn get_byte_range<T: ArrowNativeType>(offsets: &mut LanceBuffer) -> Range<usize> {
1133    let offsets = offsets.borrow_to_typed_slice::<T>();
1134    if offsets.as_ref().is_empty() {
1135        0..0
1136    } else {
1137        offsets.as_ref().first().unwrap().as_usize()..offsets.as_ref().last().unwrap().as_usize()
1138    }
1139}
1140
1141// Given multiple offsets arrays [0, 5, 10], [0, 3, 7], etc. stitch
1142// them together to get [0, 5, 10, 13, 20, ...]
1143//
1144// Also returns the data range referenced by each offset array (may
1145// not be 0..len if there is slicing involved)
1146fn stitch_offsets<T: ArrowNativeType + std::ops::Add<Output = T> + std::ops::Sub<Output = T>>(
1147    offsets: Vec<LanceBuffer>,
1148) -> (LanceBuffer, Vec<Range<usize>>) {
1149    if offsets.is_empty() {
1150        return (LanceBuffer::empty(), Vec::default());
1151    }
1152    let len = offsets.iter().map(|b| b.len()).sum::<usize>();
1153    // Note: we are making a copy here, even if there is only one input, because we want to
1154    // normalize that input if it doesn't start with zero.  This could be micro-optimized out
1155    // if needed.
1156    let mut dest = Vec::with_capacity(len);
1157    let mut byte_ranges = Vec::with_capacity(offsets.len());
1158
1159    // We insert one leading 0 before processing any of the inputs
1160    dest.push(T::from_usize(0).unwrap());
1161
1162    for mut o in offsets.into_iter() {
1163        if !o.is_empty() {
1164            let last_offset = *dest.last().unwrap();
1165            let o = o.borrow_to_typed_slice::<T>();
1166            let start = *o.as_ref().first().unwrap();
1167            // First, we skip the first offset
1168            // Then, we subtract that first offset from each remaining offset
1169            //
1170            // This gives us a 0-based offset array (minus the leading 0)
1171            //
1172            // Then we add the last offset from the previous array to each offset
1173            // which shifts our offset array to the correct position
1174            //
1175            // For example, let's assume the last offset from the previous array
1176            // was 10 and we are given [13, 17, 22].  This means we have two values with
1177            // length 4 (17 - 13) and 5 (22 - 17).  The output from this step will be
1178            // [14, 19].  Combined with our last offset of 10, this gives us [10, 14, 19]
1179            // which is our same two values of length 4 and 5.
1180            dest.extend(o.as_ref()[1..].iter().map(|&x| x + last_offset - start));
1181        }
1182        byte_ranges.push(get_byte_range::<T>(&mut o));
1183    }
1184    (LanceBuffer::reinterpret_vec(dest), byte_ranges)
1185}
1186
1187fn arrow_binary_to_data_block(
1188    arrays: &[ArrayRef],
1189    num_values: u64,
1190    bits_per_offset: u8,
1191) -> DataBlock {
1192    let data_vec = arrays.iter().map(|arr| arr.to_data()).collect::<Vec<_>>();
1193    let bytes_per_offset = bits_per_offset as usize / 8;
1194    let offsets = data_vec
1195        .iter()
1196        .map(|d| {
1197            LanceBuffer::from(
1198                d.buffers()[0].slice_with_length(d.offset(), (d.len() + 1) * bytes_per_offset),
1199            )
1200        })
1201        .collect::<Vec<_>>();
1202    let (offsets, data_ranges) = if bits_per_offset == 32 {
1203        stitch_offsets::<i32>(offsets)
1204    } else {
1205        stitch_offsets::<i64>(offsets)
1206    };
1207    let data = data_vec
1208        .iter()
1209        .zip(data_ranges)
1210        .map(|(d, byte_range)| {
1211            LanceBuffer::from(
1212                d.buffers()[1]
1213                    .slice_with_length(byte_range.start, byte_range.end - byte_range.start),
1214            )
1215        })
1216        .collect::<Vec<_>>();
1217    let data = LanceBuffer::concat_into_one(data);
1218    DataBlock::VariableWidth(VariableWidthBlock {
1219        data,
1220        offsets,
1221        bits_per_offset,
1222        num_values,
1223        block_info: BlockInfo::new(),
1224    })
1225}
1226
1227fn encode_flat_data(arrays: &[ArrayRef], num_values: u64) -> LanceBuffer {
1228    let bytes_per_value = arrays[0].data_type().byte_width();
1229    let mut buffer = Vec::with_capacity(num_values as usize * bytes_per_value);
1230    for arr in arrays {
1231        let data = arr.to_data();
1232        buffer.extend_from_slice(data.buffers()[0].as_slice());
1233    }
1234    LanceBuffer::from(buffer)
1235}
1236
1237fn do_encode_bitmap_data(bitmaps: &[BooleanBuffer], num_values: u64) -> LanceBuffer {
1238    let mut builder = BooleanBufferBuilder::new(num_values as usize);
1239
1240    for buf in bitmaps {
1241        builder.append_buffer(buf);
1242    }
1243
1244    let buffer = builder.finish().into_inner();
1245    LanceBuffer::from(buffer)
1246}
1247
1248fn encode_bitmap_data(arrays: &[ArrayRef], num_values: u64) -> LanceBuffer {
1249    let bitmaps = arrays
1250        .iter()
1251        .map(|arr| arr.as_boolean().values().clone())
1252        .collect::<Vec<_>>();
1253    do_encode_bitmap_data(&bitmaps, num_values)
1254}
1255
1256// Concatenate dictionary arrays.  This is a bit tricky because we might overflow the
1257// index type.  If we do, we need to upscale the indices to a larger type.
1258fn concat_dict_arrays(arrays: &[ArrayRef]) -> ArrayRef {
1259    let value_type = arrays[0].as_any_dictionary().values().data_type();
1260    let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
1261    match arrow_select::concat::concat(&array_refs) {
1262        Ok(array) => array,
1263        Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1264            // Slow, but hopefully a corner case.  Optimize later
1265            let upscaled = array_refs
1266                .iter()
1267                .map(|arr| {
1268                    match arrow_cast::cast(
1269                        *arr,
1270                        &DataType::Dictionary(
1271                            Box::new(DataType::UInt32),
1272                            Box::new(value_type.clone()),
1273                        ),
1274                    ) {
1275                        Ok(arr) => arr,
1276                        Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1277                            // Technically I think this means the input type was u64 already
1278                            unimplemented!("Dictionary arrays with more than 2^32 unique values")
1279                        }
1280                        err => err.unwrap(),
1281                    }
1282                })
1283                .collect::<Vec<_>>();
1284            let array_refs = upscaled.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
1285            // Can still fail if concat pushes over u32 boundary
1286            match arrow_select::concat::concat(&array_refs) {
1287                Ok(array) => array,
1288                Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1289                    unimplemented!("Dictionary arrays with more than 2^32 unique values")
1290                }
1291                err => err.unwrap(),
1292            }
1293        }
1294        // Shouldn't be any other possible errors in concat
1295        err => err.unwrap(),
1296    }
1297}
1298
1299fn max_index_val(index_type: &DataType) -> u64 {
1300    match index_type {
1301        DataType::Int8 => i8::MAX as u64,
1302        DataType::Int16 => i16::MAX as u64,
1303        DataType::Int32 => i32::MAX as u64,
1304        DataType::Int64 => i64::MAX as u64,
1305        DataType::UInt8 => u8::MAX as u64,
1306        DataType::UInt16 => u16::MAX as u64,
1307        DataType::UInt32 => u32::MAX as u64,
1308        DataType::UInt64 => u64::MAX,
1309        _ => panic!("Invalid dictionary index type"),
1310    }
1311}
1312
1313// If we get multiple dictionary arrays and they don't all have the same dictionary
1314// then we need to normalize the indices.  Otherwise we might have something like:
1315//
1316// First chunk ["hello", "foo"], [0, 0, 1, 1, 1]
1317// Second chunk ["bar", "world"], [0, 1, 0, 1, 1]
1318//
1319// If we simply encode as ["hello", "foo", "bar", "world"], [0, 0, 1, 1, 1, 0, 1, 0, 1, 1]
1320// then we will get the wrong answer because the dictionaries were not merged and the indices
1321// were not remapped.
1322//
1323// A simple way to do this today is to just concatenate all the arrays.  This is because
1324// arrow's dictionary concatenation function already has the logic to merge dictionaries.
1325//
1326// TODO: We could be more efficient here by checking if the dictionaries are the same
1327//       Also, if they aren't, we can possibly do something cheaper than concatenating
1328//
1329// In addition, we want to normalize the representation of nulls.  The cheapest thing to
1330// do (space-wise) is to put the nulls in the dictionary.
1331fn arrow_dictionary_to_data_block(arrays: &[ArrayRef], validity: Option<NullBuffer>) -> DataBlock {
1332    let array = concat_dict_arrays(arrays);
1333    let array_dict = array.as_any_dictionary();
1334    let mut indices = array_dict.keys();
1335    let num_values = indices.len() as u64;
1336    let mut values = array_dict.values().clone();
1337    // Placeholder, if we need to upcast, we will initialize this and set `indices` to refer to it
1338    let mut upcast = None;
1339
1340    // TODO: Should we just always normalize indices to u32?  That would make logic simpler
1341    // and we're going to bitpack them soon anyways
1342
1343    let indices_block = if let Some(validity) = validity {
1344        // If there is validity then we find the first invalid index in the dictionary values, inserting
1345        // a new value if we need to.  Then we change all indices to point to that value.  This way we
1346        // never need to store nullability of the indices.
1347        let mut first_invalid_index = None;
1348        if let Some(values_validity) = values.nulls() {
1349            first_invalid_index = (!values_validity.inner()).set_indices().next();
1350        }
1351        let first_invalid_index = first_invalid_index.unwrap_or_else(|| {
1352            let null_arr = new_null_array(values.data_type(), 1);
1353            values = arrow_select::concat::concat(&[values.as_ref(), null_arr.as_ref()]).unwrap();
1354            let null_index = values.len() - 1;
1355            let max_index_val = max_index_val(indices.data_type());
1356            if null_index as u64 > max_index_val {
1357                // Widen the index type
1358                if max_index_val >= u32::MAX as u64 {
1359                    unimplemented!("Dictionary arrays with 2^32 unique value (or more) and a null")
1360                }
1361                upcast = Some(arrow_cast::cast(indices, &DataType::UInt32).unwrap());
1362                indices = upcast.as_ref().unwrap();
1363            }
1364            null_index
1365        });
1366        // This can't fail since we already checked for fit
1367        let null_index_arr = arrow_cast::cast(
1368            &UInt64Array::from(vec![first_invalid_index as u64]),
1369            indices.data_type(),
1370        )
1371        .unwrap();
1372
1373        let bytes_per_index = indices.data_type().byte_width();
1374        let bits_per_index = bytes_per_index as u64 * 8;
1375
1376        let null_index_arr = null_index_arr.into_data();
1377        let null_index_bytes = &null_index_arr.buffers()[0];
1378        // Need to make a copy here since indices isn't mutable, could be avoided in theory
1379        let mut indices_bytes = indices.to_data().buffers()[0].to_vec();
1380        for invalid_idx in (!validity.inner()).set_indices() {
1381            indices_bytes[invalid_idx * bytes_per_index..(invalid_idx + 1) * bytes_per_index]
1382                .copy_from_slice(null_index_bytes.as_slice());
1383        }
1384        FixedWidthDataBlock {
1385            data: LanceBuffer::from(indices_bytes),
1386            bits_per_value: bits_per_index,
1387            num_values,
1388            block_info: BlockInfo::new(),
1389        }
1390    } else {
1391        FixedWidthDataBlock {
1392            data: LanceBuffer::from(indices.to_data().buffers()[0].clone()),
1393            bits_per_value: indices.data_type().byte_width() as u64 * 8,
1394            num_values,
1395            block_info: BlockInfo::new(),
1396        }
1397    };
1398
1399    let items = DataBlock::from(values);
1400    DataBlock::Dictionary(DictionaryDataBlock {
1401        indices: indices_block,
1402        dictionary: Box::new(items),
1403    })
1404}
1405
1406enum Nullability {
1407    None,
1408    All,
1409    Some(NullBuffer),
1410}
1411
1412impl Nullability {
1413    fn to_option(&self) -> Option<NullBuffer> {
1414        match self {
1415            Self::Some(nulls) => Some(nulls.clone()),
1416            _ => None,
1417        }
1418    }
1419}
1420
1421fn extract_nulls(arrays: &[ArrayRef], num_values: u64) -> Nullability {
1422    let mut has_nulls = false;
1423    let nulls_and_lens = arrays
1424        .iter()
1425        .map(|arr| {
1426            let nulls = arr.logical_nulls();
1427            has_nulls |= nulls.is_some();
1428            (nulls, arr.len())
1429        })
1430        .collect::<Vec<_>>();
1431    if !has_nulls {
1432        return Nullability::None;
1433    }
1434    let mut builder = BooleanBufferBuilder::new(num_values as usize);
1435    let mut num_nulls = 0;
1436    for (null, len) in nulls_and_lens {
1437        if let Some(null) = null {
1438            num_nulls += null.null_count();
1439            builder.append_buffer(&null.into_inner());
1440        } else {
1441            builder.append_n(len, true);
1442        }
1443    }
1444    if num_nulls == num_values as usize {
1445        Nullability::All
1446    } else {
1447        Nullability::Some(NullBuffer::new(builder.finish()))
1448    }
1449}
1450
1451impl DataBlock {
1452    pub fn from_arrays(arrays: &[ArrayRef], num_values: u64) -> Self {
1453        if arrays.is_empty() || num_values == 0 {
1454            return Self::AllNull(AllNullDataBlock { num_values: 0 });
1455        }
1456
1457        let data_type = arrays[0].data_type();
1458        let nulls = extract_nulls(arrays, num_values);
1459
1460        if let Nullability::All = nulls {
1461            return Self::AllNull(AllNullDataBlock { num_values });
1462        }
1463
1464        let mut encoded = match data_type {
1465            DataType::Binary | DataType::Utf8 => arrow_binary_to_data_block(arrays, num_values, 32),
1466            DataType::BinaryView | DataType::Utf8View => {
1467                todo!()
1468            }
1469            DataType::LargeBinary | DataType::LargeUtf8 => {
1470                arrow_binary_to_data_block(arrays, num_values, 64)
1471            }
1472            DataType::Boolean => {
1473                let data = encode_bitmap_data(arrays, num_values);
1474                Self::FixedWidth(FixedWidthDataBlock {
1475                    data,
1476                    bits_per_value: 1,
1477                    num_values,
1478                    block_info: BlockInfo::new(),
1479                })
1480            }
1481            DataType::Date32
1482            | DataType::Date64
1483            | DataType::Decimal32(_, _)
1484            | DataType::Decimal64(_, _)
1485            | DataType::Decimal128(_, _)
1486            | DataType::Decimal256(_, _)
1487            | DataType::Duration(_)
1488            | DataType::FixedSizeBinary(_)
1489            | DataType::Float16
1490            | DataType::Float32
1491            | DataType::Float64
1492            | DataType::Int16
1493            | DataType::Int32
1494            | DataType::Int64
1495            | DataType::Int8
1496            | DataType::Interval(_)
1497            | DataType::Time32(_)
1498            | DataType::Time64(_)
1499            | DataType::Timestamp(_, _)
1500            | DataType::UInt16
1501            | DataType::UInt32
1502            | DataType::UInt64
1503            | DataType::UInt8 => {
1504                let data = encode_flat_data(arrays, num_values);
1505                Self::FixedWidth(FixedWidthDataBlock {
1506                    data,
1507                    bits_per_value: data_type.byte_width() as u64 * 8,
1508                    num_values,
1509                    block_info: BlockInfo::new(),
1510                })
1511            }
1512            DataType::Null => Self::AllNull(AllNullDataBlock { num_values }),
1513            DataType::Dictionary(_, _) => arrow_dictionary_to_data_block(arrays, nulls.to_option()),
1514            DataType::Struct(fields) => {
1515                let structs = arrays.iter().map(|arr| arr.as_struct()).collect::<Vec<_>>();
1516                let mut children = Vec::with_capacity(fields.len());
1517                for child_idx in 0..fields.len() {
1518                    let child_vec = structs
1519                        .iter()
1520                        .map(|s| s.column(child_idx).clone())
1521                        .collect::<Vec<_>>();
1522                    children.push(Self::from_arrays(&child_vec, num_values));
1523                }
1524
1525                // Extract validity for the struct array
1526                let validity = match &nulls {
1527                    Nullability::None => None,
1528                    Nullability::Some(null_buffer) => Some(null_buffer.clone()),
1529                    Nullability::All => unreachable!("Should have returned AllNull earlier"),
1530                };
1531
1532                Self::Struct(StructDataBlock {
1533                    children,
1534                    block_info: BlockInfo::default(),
1535                    validity,
1536                })
1537            }
1538            DataType::FixedSizeList(_, dim) => {
1539                let children = arrays
1540                    .iter()
1541                    .map(|arr| arr.as_fixed_size_list().values().clone())
1542                    .collect::<Vec<_>>();
1543                let child_block = Self::from_arrays(&children, num_values * *dim as u64);
1544                Self::FixedSizeList(FixedSizeListBlock {
1545                    child: Box::new(child_block),
1546                    dimension: *dim as u64,
1547                })
1548            }
1549            DataType::LargeList(_)
1550            | DataType::List(_)
1551            | DataType::ListView(_)
1552            | DataType::LargeListView(_)
1553            | DataType::Map(_, _)
1554            | DataType::RunEndEncoded(_, _)
1555            | DataType::Union(_, _) => {
1556                panic!(
1557                    "Field with data type {} cannot be converted to data block",
1558                    data_type
1559                )
1560            }
1561        };
1562
1563        // compute statistics
1564        encoded.compute_stat();
1565
1566        if !matches!(data_type, DataType::Dictionary(_, _)) {
1567            match nulls {
1568                Nullability::None => encoded,
1569                Nullability::Some(nulls) => Self::Nullable(NullableDataBlock {
1570                    data: Box::new(encoded),
1571                    nulls: LanceBuffer::from(nulls.into_inner().into_inner()),
1572                    block_info: BlockInfo::new(),
1573                }),
1574                _ => unreachable!(),
1575            }
1576        } else {
1577            // Dictionaries already insert the nulls into the dictionary items
1578            encoded
1579        }
1580    }
1581
1582    pub fn from_array<T: Array + 'static>(array: T) -> Self {
1583        let num_values = array.len();
1584        Self::from_arrays(&[Arc::new(array)], num_values as u64)
1585    }
1586}
1587
1588impl From<ArrayRef> for DataBlock {
1589    fn from(array: ArrayRef) -> Self {
1590        let num_values = array.len() as u64;
1591        Self::from_arrays(&[array], num_values)
1592    }
1593}
1594
1595pub trait DataBlockBuilderImpl: std::fmt::Debug {
1596    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>);
1597    fn finish(self: Box<Self>) -> DataBlock;
1598}
1599
1600#[derive(Debug)]
1601pub struct DataBlockBuilder {
1602    estimated_size_bytes: u64,
1603    builder: Option<Box<dyn DataBlockBuilderImpl>>,
1604}
1605
1606impl DataBlockBuilder {
1607    pub fn with_capacity_estimate(estimated_size_bytes: u64) -> Self {
1608        Self {
1609            estimated_size_bytes,
1610            builder: None,
1611        }
1612    }
1613
1614    fn get_builder(&mut self, block: &DataBlock) -> &mut dyn DataBlockBuilderImpl {
1615        if self.builder.is_none() {
1616            self.builder = Some(block.make_builder(self.estimated_size_bytes));
1617        }
1618        self.builder.as_mut().unwrap().as_mut()
1619    }
1620
1621    pub fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
1622        self.get_builder(data_block).append(data_block, selection);
1623    }
1624
1625    pub fn finish(self) -> DataBlock {
1626        let builder = self.builder.expect("DataBlockBuilder didn't see any data");
1627        builder.finish()
1628    }
1629}
1630
1631#[cfg(test)]
1632mod tests {
1633    use std::sync::Arc;
1634
1635    use arrow_array::{
1636        make_array, new_null_array,
1637        types::{Int32Type, Int8Type},
1638        ArrayRef, DictionaryArray, Int8Array, LargeBinaryArray, StringArray, UInt16Array,
1639        UInt8Array,
1640    };
1641    use arrow_buffer::{BooleanBuffer, NullBuffer};
1642
1643    use arrow_schema::{DataType, Field, Fields};
1644    use lance_datagen::{array, ArrayGeneratorExt, RowCount, DEFAULT_SEED};
1645    use rand::SeedableRng;
1646
1647    use crate::buffer::LanceBuffer;
1648
1649    use super::{AllNullDataBlock, DataBlock};
1650
1651    use arrow_array::Array;
1652
1653    #[test]
1654    fn test_sliced_to_data_block() {
1655        let ints = UInt16Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);
1656        let ints = ints.slice(2, 4);
1657        let data = DataBlock::from_array(ints);
1658
1659        let fixed_data = data.as_fixed_width().unwrap();
1660        assert_eq!(fixed_data.num_values, 4);
1661        assert_eq!(fixed_data.data.len(), 8);
1662
1663        let nullable_ints =
1664            UInt16Array::from(vec![Some(0), None, Some(2), None, Some(4), None, Some(6)]);
1665        let nullable_ints = nullable_ints.slice(1, 3);
1666        let data = DataBlock::from_array(nullable_ints);
1667
1668        let nullable = data.as_nullable().unwrap();
1669        assert_eq!(nullable.nulls, LanceBuffer::from(vec![0b00000010]));
1670    }
1671
1672    #[test]
1673    fn test_string_to_data_block() {
1674        // Converting string arrays that contain nulls to DataBlock
1675        let strings1 = StringArray::from(vec![Some("hello"), None, Some("world")]);
1676        let strings2 = StringArray::from(vec![Some("a"), Some("b")]);
1677        let strings3 = StringArray::from(vec![Option::<&'static str>::None, None]);
1678
1679        let arrays = &[strings1, strings2, strings3]
1680            .iter()
1681            .map(|arr| Arc::new(arr.clone()) as ArrayRef)
1682            .collect::<Vec<_>>();
1683
1684        let block = DataBlock::from_arrays(arrays, 7);
1685
1686        assert_eq!(block.num_values(), 7);
1687        let block = block.as_nullable().unwrap();
1688
1689        assert_eq!(block.nulls, LanceBuffer::from(vec![0b00011101]));
1690
1691        let data = block.data.as_variable_width().unwrap();
1692        assert_eq!(
1693            data.offsets,
1694            LanceBuffer::reinterpret_vec(vec![0, 5, 5, 10, 11, 12, 12, 12])
1695        );
1696
1697        assert_eq!(data.data, LanceBuffer::copy_slice(b"helloworldab"));
1698
1699        // Converting string arrays that do not contain nulls to DataBlock
1700        let strings1 = StringArray::from(vec![Some("a"), Some("bc")]);
1701        let strings2 = StringArray::from(vec![Some("def")]);
1702
1703        let arrays = &[strings1, strings2]
1704            .iter()
1705            .map(|arr| Arc::new(arr.clone()) as ArrayRef)
1706            .collect::<Vec<_>>();
1707
1708        let block = DataBlock::from_arrays(arrays, 3);
1709
1710        assert_eq!(block.num_values(), 3);
1711        // Should be no nullable wrapper
1712        let data = block.as_variable_width().unwrap();
1713        assert_eq!(data.offsets, LanceBuffer::reinterpret_vec(vec![0, 1, 3, 6]));
1714        assert_eq!(data.data, LanceBuffer::copy_slice(b"abcdef"));
1715    }
1716
1717    #[test]
1718    fn test_string_sliced() {
1719        let check = |arr: Vec<StringArray>, expected_off: Vec<i32>, expected_data: &[u8]| {
1720            let arrs = arr
1721                .into_iter()
1722                .map(|a| Arc::new(a) as ArrayRef)
1723                .collect::<Vec<_>>();
1724            let num_rows = arrs.iter().map(|a| a.len()).sum::<usize>() as u64;
1725            let data = DataBlock::from_arrays(&arrs, num_rows);
1726
1727            assert_eq!(data.num_values(), num_rows);
1728
1729            let data = data.as_variable_width().unwrap();
1730            assert_eq!(data.offsets, LanceBuffer::reinterpret_vec(expected_off));
1731            assert_eq!(data.data, LanceBuffer::copy_slice(expected_data));
1732        };
1733
1734        let string = StringArray::from(vec![Some("hello"), Some("world")]);
1735        check(vec![string.slice(1, 1)], vec![0, 5], b"world");
1736        check(vec![string.slice(0, 1)], vec![0, 5], b"hello");
1737        check(
1738            vec![string.slice(0, 1), string.slice(1, 1)],
1739            vec![0, 5, 10],
1740            b"helloworld",
1741        );
1742
1743        let string2 = StringArray::from(vec![Some("foo"), Some("bar")]);
1744        check(
1745            vec![string.slice(0, 1), string2.slice(0, 1)],
1746            vec![0, 5, 8],
1747            b"hellofoo",
1748        );
1749    }
1750
1751    #[test]
1752    fn test_large() {
1753        let arr = LargeBinaryArray::from_vec(vec![b"hello", b"world"]);
1754        let data = DataBlock::from_array(arr);
1755
1756        assert_eq!(data.num_values(), 2);
1757        let data = data.as_variable_width().unwrap();
1758        assert_eq!(data.bits_per_offset, 64);
1759        assert_eq!(data.num_values, 2);
1760        assert_eq!(data.data, LanceBuffer::copy_slice(b"helloworld"));
1761        assert_eq!(
1762            data.offsets,
1763            LanceBuffer::reinterpret_vec(vec![0_u64, 5, 10])
1764        );
1765    }
1766
1767    #[test]
1768    fn test_dictionary_indices_normalized() {
1769        let arr1 = DictionaryArray::<Int8Type>::from_iter([Some("a"), Some("a"), Some("b")]);
1770        let arr2 = DictionaryArray::<Int8Type>::from_iter([Some("b"), Some("c")]);
1771
1772        let data = DataBlock::from_arrays(&[Arc::new(arr1), Arc::new(arr2)], 5);
1773
1774        assert_eq!(data.num_values(), 5);
1775        let data = data.as_dictionary().unwrap();
1776        let indices = data.indices;
1777        assert_eq!(indices.bits_per_value, 8);
1778        assert_eq!(indices.num_values, 5);
1779        assert_eq!(
1780            indices.data,
1781            // You might expect 0, 0, 1, 1, 2 but it seems that arrow's dictionary concat does
1782            // not actually collapse dictionaries.  This is an arrow problem however, and we don't
1783            // need to fix it here.
1784            LanceBuffer::reinterpret_vec::<i8>(vec![0, 0, 1, 2, 3])
1785        );
1786
1787        let items = data.dictionary.as_variable_width().unwrap();
1788        assert_eq!(items.bits_per_offset, 32);
1789        assert_eq!(items.num_values, 4);
1790        assert_eq!(items.data, LanceBuffer::copy_slice(b"abbc"));
1791        assert_eq!(
1792            items.offsets,
1793            LanceBuffer::reinterpret_vec(vec![0, 1, 2, 3, 4],)
1794        );
1795    }
1796
1797    #[test]
1798    fn test_dictionary_nulls() {
1799        // Test both ways of encoding nulls
1800
1801        // By default, nulls get encoded into the indices
1802        let arr1 = DictionaryArray::<Int8Type>::from_iter([None, Some("a"), Some("b")]);
1803        let arr2 = DictionaryArray::<Int8Type>::from_iter([Some("c"), None]);
1804
1805        let data = DataBlock::from_arrays(&[Arc::new(arr1), Arc::new(arr2)], 5);
1806
1807        let check_common = |data: DataBlock| {
1808            assert_eq!(data.num_values(), 5);
1809            let dict = data.as_dictionary().unwrap();
1810
1811            let nullable_items = dict.dictionary.as_nullable().unwrap();
1812            assert_eq!(nullable_items.nulls, LanceBuffer::from(vec![0b00000111]));
1813            assert_eq!(nullable_items.data.num_values(), 4);
1814
1815            let items = nullable_items.data.as_variable_width().unwrap();
1816            assert_eq!(items.bits_per_offset, 32);
1817            assert_eq!(items.num_values, 4);
1818            assert_eq!(items.data, LanceBuffer::copy_slice(b"abc"));
1819            assert_eq!(
1820                items.offsets,
1821                LanceBuffer::reinterpret_vec(vec![0, 1, 2, 3, 3],)
1822            );
1823
1824            let indices = dict.indices;
1825            assert_eq!(indices.bits_per_value, 8);
1826            assert_eq!(indices.num_values, 5);
1827            assert_eq!(
1828                indices.data,
1829                LanceBuffer::reinterpret_vec::<i8>(vec![3, 0, 1, 2, 3])
1830            );
1831        };
1832        check_common(data);
1833
1834        // However, we can manually create a dictionary where nulls are in the dictionary
1835        let items = StringArray::from(vec![Some("a"), Some("b"), Some("c"), None]);
1836        let indices = Int8Array::from(vec![Some(3), Some(0), Some(1), Some(2), Some(3)]);
1837        let dict = DictionaryArray::new(indices, Arc::new(items));
1838
1839        let data = DataBlock::from_array(dict);
1840
1841        check_common(data);
1842    }
1843
1844    #[test]
1845    fn test_dictionary_cannot_add_null() {
1846        // 256 unique strings
1847        let items = StringArray::from(
1848            (0..256)
1849                .map(|i| Some(String::from_utf8(vec![0; i]).unwrap()))
1850                .collect::<Vec<_>>(),
1851        );
1852        // 257 indices, covering the whole range, plus one null
1853        let indices = UInt8Array::from(
1854            (0..=256)
1855                .map(|i| if i == 256 { None } else { Some(i as u8) })
1856                .collect::<Vec<_>>(),
1857        );
1858        // We want to normalize this by pushing nulls into the dictionary, but we cannot because
1859        // the dictionary is too large for the index type
1860        let dict = DictionaryArray::new(indices, Arc::new(items));
1861        let data = DataBlock::from_array(dict);
1862
1863        assert_eq!(data.num_values(), 257);
1864
1865        let dict = data.as_dictionary().unwrap();
1866
1867        assert_eq!(dict.indices.bits_per_value, 32);
1868        assert_eq!(
1869            dict.indices.data,
1870            LanceBuffer::reinterpret_vec((0_u32..257).collect::<Vec<_>>())
1871        );
1872
1873        let nullable_items = dict.dictionary.as_nullable().unwrap();
1874        let null_buffer = NullBuffer::new(BooleanBuffer::new(
1875            nullable_items.nulls.into_buffer(),
1876            0,
1877            257,
1878        ));
1879        for i in 0..256 {
1880            assert!(!null_buffer.is_null(i));
1881        }
1882        assert!(null_buffer.is_null(256));
1883
1884        assert_eq!(
1885            nullable_items.data.as_variable_width().unwrap().data.len(),
1886            32640
1887        );
1888    }
1889
1890    #[test]
1891    fn test_all_null() {
1892        for data_type in [
1893            DataType::UInt32,
1894            DataType::FixedSizeBinary(2),
1895            DataType::List(Arc::new(Field::new("item", DataType::UInt32, true))),
1896            DataType::Struct(Fields::from(vec![Field::new("a", DataType::UInt32, true)])),
1897        ] {
1898            let block = DataBlock::AllNull(AllNullDataBlock { num_values: 10 });
1899            let arr = block.into_arrow(data_type.clone(), true).unwrap();
1900            let arr = make_array(arr);
1901            let expected = new_null_array(&data_type, 10);
1902            assert_eq!(&arr, &expected);
1903        }
1904    }
1905
1906    #[test]
1907    fn test_dictionary_cannot_concatenate() {
1908        // 256 unique strings
1909        let items = StringArray::from(
1910            (0..256)
1911                .map(|i| Some(String::from_utf8(vec![0; i]).unwrap()))
1912                .collect::<Vec<_>>(),
1913        );
1914        // 256 different unique strings
1915        let other_items = StringArray::from(
1916            (0..256)
1917                .map(|i| Some(String::from_utf8(vec![1; i + 1]).unwrap()))
1918                .collect::<Vec<_>>(),
1919        );
1920        let indices = UInt8Array::from_iter_values(0..=255);
1921        let dict1 = DictionaryArray::new(indices.clone(), Arc::new(items));
1922        let dict2 = DictionaryArray::new(indices, Arc::new(other_items));
1923        let data = DataBlock::from_arrays(&[Arc::new(dict1), Arc::new(dict2)], 512);
1924        assert_eq!(data.num_values(), 512);
1925
1926        let dict = data.as_dictionary().unwrap();
1927
1928        assert_eq!(dict.indices.bits_per_value, 32);
1929        assert_eq!(
1930            dict.indices.data,
1931            LanceBuffer::reinterpret_vec::<u32>((0..512).collect::<Vec<_>>())
1932        );
1933        // What fun: 0 + 1 + .. + 255 + 1 + 2 + .. + 256 = 2^16
1934        assert_eq!(
1935            dict.dictionary.as_variable_width().unwrap().data.len(),
1936            65536
1937        );
1938    }
1939
1940    #[test]
1941    fn test_data_size() {
1942        let mut rng = rand_xoshiro::Xoshiro256PlusPlus::seed_from_u64(DEFAULT_SEED.0);
1943        // test data_size() when input has no nulls
1944        let mut genn = array::rand::<Int32Type>().with_nulls(&[false, false, false]);
1945
1946        let arr = genn.generate(RowCount::from(3), &mut rng).unwrap();
1947        let block = DataBlock::from_array(arr.clone());
1948        assert!(block.data_size() == arr.get_buffer_memory_size() as u64);
1949
1950        let arr = genn.generate(RowCount::from(400), &mut rng).unwrap();
1951        let block = DataBlock::from_array(arr.clone());
1952        assert!(block.data_size() == arr.get_buffer_memory_size() as u64);
1953
1954        // test data_size() when input has nulls
1955        let mut genn = array::rand::<Int32Type>().with_nulls(&[false, true, false]);
1956        let arr = genn.generate(RowCount::from(3), &mut rng).unwrap();
1957        let block = DataBlock::from_array(arr.clone());
1958
1959        let array_data = arr.to_data();
1960        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
1961        // the NullBuffer.len() returns the length in bits so we divide_round_up by 8
1962        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
1963        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
1964
1965        let arr = genn.generate(RowCount::from(400), &mut rng).unwrap();
1966        let block = DataBlock::from_array(arr.clone());
1967
1968        let array_data = arr.to_data();
1969        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
1970        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
1971        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
1972
1973        let mut genn = array::rand::<Int32Type>().with_nulls(&[true, true, false]);
1974        let arr = genn.generate(RowCount::from(3), &mut rng).unwrap();
1975        let block = DataBlock::from_array(arr.clone());
1976
1977        let array_data = arr.to_data();
1978        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
1979        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
1980        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
1981
1982        let arr = genn.generate(RowCount::from(400), &mut rng).unwrap();
1983        let block = DataBlock::from_array(arr.clone());
1984
1985        let array_data = arr.to_data();
1986        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
1987        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
1988        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
1989
1990        let mut genn = array::rand::<Int32Type>().with_nulls(&[false, true, false]);
1991        let arr1 = genn.generate(RowCount::from(3), &mut rng).unwrap();
1992        let arr2 = genn.generate(RowCount::from(3), &mut rng).unwrap();
1993        let arr3 = genn.generate(RowCount::from(3), &mut rng).unwrap();
1994        let block = DataBlock::from_arrays(&[arr1.clone(), arr2.clone(), arr3.clone()], 9);
1995
1996        let concatenated_array = arrow_select::concat::concat(&[
1997            &*Arc::new(arr1.clone()) as &dyn Array,
1998            &*Arc::new(arr2.clone()) as &dyn Array,
1999            &*Arc::new(arr3.clone()) as &dyn Array,
2000        ])
2001        .unwrap();
2002        let total_buffer_size: usize = concatenated_array
2003            .to_data()
2004            .buffers()
2005            .iter()
2006            .map(|buffer| buffer.len())
2007            .sum();
2008
2009        let total_nulls_size_in_bytes = concatenated_array.nulls().unwrap().len().div_ceil(8);
2010        assert!(block.data_size() == (total_buffer_size + total_nulls_size_in_bytes) as u64);
2011    }
2012}