lance_encoding/
data.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Data layouts to represent encoded data in a sub-Arrow format
5//!
6//! These [`DataBlock`] structures represent physical layouts.  They fill a gap somewhere
7//! between [`arrow_data::data::ArrayData`] (which, as a collection of buffers, is too
8//! generic because it doesn't give us enough information about what those buffers represent)
9//! and [`arrow_array::array::Array`] (which is too specific, because it cares about the
10//! logical data type).
11//!
12//! In addition, the layouts represented here are slightly stricter than Arrow's layout rules.
13//! For example, offset buffers MUST start with 0.  These additional restrictions impose a
14//! slight penalty on encode (to normalize arrow data) but make the development of encoders
15//! and decoders easier (since they can rely on a normalized representation)
16
17use std::{
18    ops::Range,
19    sync::{Arc, RwLock},
20};
21
22use arrow_array::{
23    cast::AsArray,
24    new_empty_array, new_null_array,
25    types::{ArrowDictionaryKeyType, UInt16Type, UInt32Type, UInt64Type, UInt8Type},
26    Array, ArrayRef, OffsetSizeTrait, UInt64Array,
27};
28use arrow_buffer::{
29    ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer, ScalarBuffer,
30};
31use arrow_data::{ArrayData, ArrayDataBuilder};
32use arrow_schema::DataType;
33use lance_arrow::DataTypeExt;
34use snafu::location;
35
36use lance_core::{Error, Result};
37
38use crate::{
39    buffer::LanceBuffer,
40    statistics::{ComputeStat, Stat},
41};
42
43/// A data block with no buffers where everything is null
44///
45/// Note: this data block should not be used for future work.  It will be deprecated
46/// in the 2.1 version of the format where nullability will be handled by the structural
47/// encoders.
48#[derive(Debug, Clone)]
49pub struct AllNullDataBlock {
50    /// The number of values represented by this block
51    pub num_values: u64,
52}
53
54impl AllNullDataBlock {
55    fn into_arrow(self, data_type: DataType, _validate: bool) -> Result<ArrayData> {
56        Ok(ArrayData::new_null(&data_type, self.num_values as usize))
57    }
58
59    fn into_buffers(self) -> Vec<LanceBuffer> {
60        vec![]
61    }
62}
63
64use std::collections::HashMap;
65
66// `BlockInfo` stores the statistics of this `DataBlock`, such as `NullCount` for `NullableDataBlock`,
67// `BitWidth` for `FixedWidthDataBlock`, `Cardinality` for all `DataBlock`
68#[derive(Debug, Clone)]
69pub struct BlockInfo(pub Arc<RwLock<HashMap<Stat, Arc<dyn Array>>>>);
70
71impl Default for BlockInfo {
72    fn default() -> Self {
73        Self::new()
74    }
75}
76
77impl BlockInfo {
78    pub fn new() -> Self {
79        Self(Arc::new(RwLock::new(HashMap::new())))
80    }
81}
82
83impl PartialEq for BlockInfo {
84    fn eq(&self, other: &Self) -> bool {
85        let self_info = self.0.read().unwrap();
86        let other_info = other.0.read().unwrap();
87        *self_info == *other_info
88    }
89}
90
91/// Wraps a data block and adds nullability information to it
92///
93/// Note: this data block should not be used for future work.  It will be deprecated
94/// in the 2.1 version of the format where nullability will be handled by the structural
95/// encoders.
96#[derive(Debug, Clone)]
97pub struct NullableDataBlock {
98    /// The underlying data
99    pub data: Box<DataBlock>,
100    /// A bitmap of validity for each value
101    pub nulls: LanceBuffer,
102
103    pub block_info: BlockInfo,
104}
105
106impl NullableDataBlock {
107    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
108        let nulls = self.nulls.into_buffer();
109        let data = self.data.into_arrow(data_type, validate)?.into_builder();
110        let data = data.null_bit_buffer(Some(nulls));
111        if validate {
112            Ok(data.build()?)
113        } else {
114            Ok(unsafe { data.build_unchecked() })
115        }
116    }
117
118    fn into_buffers(self) -> Vec<LanceBuffer> {
119        let mut buffers = vec![self.nulls];
120        buffers.extend(self.data.into_buffers());
121        buffers
122    }
123
124    pub fn data_size(&self) -> u64 {
125        self.data.data_size() + self.nulls.len() as u64
126    }
127}
128
129/// A block representing the same constant value repeated many times
130#[derive(Debug, PartialEq, Clone)]
131pub struct ConstantDataBlock {
132    /// Data buffer containing the value
133    pub data: LanceBuffer,
134    /// The number of values
135    pub num_values: u64,
136}
137
138impl ConstantDataBlock {
139    fn into_buffers(self) -> Vec<LanceBuffer> {
140        vec![self.data]
141    }
142
143    fn into_arrow(self, _data_type: DataType, _validate: bool) -> Result<ArrayData> {
144        // We don't need this yet but if we come up with some way of serializing
145        // scalars to/from bytes then we could implement it.
146        todo!()
147    }
148
149    pub fn try_clone(&self) -> Result<Self> {
150        Ok(Self {
151            data: self.data.clone(),
152            num_values: self.num_values,
153        })
154    }
155
156    pub fn data_size(&self) -> u64 {
157        self.data.len() as u64
158    }
159}
160
161/// A data block for a single buffer of data where each element has a fixed number of bits
162#[derive(Debug, PartialEq, Clone)]
163pub struct FixedWidthDataBlock {
164    /// The data buffer
165    pub data: LanceBuffer,
166    /// The number of bits per value
167    pub bits_per_value: u64,
168    /// The number of values represented by this block
169    pub num_values: u64,
170
171    pub block_info: BlockInfo,
172}
173
174impl FixedWidthDataBlock {
175    fn do_into_arrow(
176        self,
177        data_type: DataType,
178        num_values: u64,
179        validate: bool,
180    ) -> Result<ArrayData> {
181        let data_buffer = self.data.into_buffer();
182        let builder = ArrayDataBuilder::new(data_type)
183            .add_buffer(data_buffer)
184            .len(num_values as usize)
185            .null_count(0);
186        if validate {
187            Ok(builder.build()?)
188        } else {
189            Ok(unsafe { builder.build_unchecked() })
190        }
191    }
192
193    pub fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
194        let root_num_values = self.num_values;
195        self.do_into_arrow(data_type, root_num_values, validate)
196    }
197
198    pub fn into_buffers(self) -> Vec<LanceBuffer> {
199        vec![self.data]
200    }
201
202    pub fn try_clone(&self) -> Result<Self> {
203        Ok(Self {
204            data: self.data.clone(),
205            bits_per_value: self.bits_per_value,
206            num_values: self.num_values,
207            block_info: self.block_info.clone(),
208        })
209    }
210
211    pub fn data_size(&self) -> u64 {
212        self.data.len() as u64
213    }
214}
215
216#[derive(Debug)]
217pub struct VariableWidthDataBlockBuilder<T: OffsetSizeTrait> {
218    offsets: Vec<T>,
219    bytes: Vec<u8>,
220}
221
222impl<T: OffsetSizeTrait> VariableWidthDataBlockBuilder<T> {
223    fn new(estimated_size_bytes: u64) -> Self {
224        Self {
225            offsets: vec![T::from_usize(0).unwrap()],
226            bytes: Vec::with_capacity(estimated_size_bytes as usize),
227        }
228    }
229}
230
231impl<T: OffsetSizeTrait> DataBlockBuilderImpl for VariableWidthDataBlockBuilder<T> {
232    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
233        let block = data_block.as_variable_width_ref().unwrap();
234        assert!(block.bits_per_offset == T::get_byte_width() as u8 * 8);
235
236        let offsets: ScalarBuffer<T> = block.offsets.clone().borrow_to_typed_slice();
237
238        let start_offset = offsets[selection.start as usize];
239        let end_offset = offsets[selection.end as usize];
240        let mut previous_len = self.bytes.len();
241
242        self.bytes
243            .extend_from_slice(&block.data[start_offset.as_usize()..end_offset.as_usize()]);
244
245        self.offsets.extend(
246            offsets[selection.start as usize..selection.end as usize]
247                .iter()
248                .zip(&offsets[selection.start as usize + 1..=selection.end as usize])
249                .map(|(&current, &next)| {
250                    let this_value_len = next - current;
251                    previous_len += this_value_len.as_usize();
252                    T::from_usize(previous_len).unwrap()
253                }),
254        );
255    }
256
257    fn finish(self: Box<Self>) -> DataBlock {
258        let num_values = (self.offsets.len() - 1) as u64;
259        DataBlock::VariableWidth(VariableWidthBlock {
260            data: LanceBuffer::from(self.bytes),
261            offsets: LanceBuffer::reinterpret_vec(self.offsets),
262            bits_per_offset: T::get_byte_width() as u8 * 8,
263            num_values,
264            block_info: BlockInfo::new(),
265        })
266    }
267}
268
269#[derive(Debug)]
270struct BitmapDataBlockBuilder {
271    values: BooleanBufferBuilder,
272}
273
274impl BitmapDataBlockBuilder {
275    fn new(estimated_size_bytes: u64) -> Self {
276        Self {
277            values: BooleanBufferBuilder::new(estimated_size_bytes as usize * 8),
278        }
279    }
280}
281
282impl DataBlockBuilderImpl for BitmapDataBlockBuilder {
283    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
284        let bitmap_blk = data_block.as_fixed_width_ref().unwrap();
285        self.values.append_packed_range(
286            selection.start as usize..selection.end as usize,
287            &bitmap_blk.data,
288        );
289    }
290
291    fn finish(mut self: Box<Self>) -> DataBlock {
292        let bool_buf = self.values.finish();
293        let num_values = bool_buf.len() as u64;
294        let bits_buf = bool_buf.into_inner();
295        DataBlock::FixedWidth(FixedWidthDataBlock {
296            data: LanceBuffer::from(bits_buf),
297            bits_per_value: 1,
298            num_values,
299            block_info: BlockInfo::new(),
300        })
301    }
302}
303
/// Accumulates selected ranges of byte-aligned fixed-width blocks
#[derive(Debug)]
struct FixedWidthDataBlockBuilder {
    // Width of one value in bits (a multiple of 8)
    bits_per_value: u64,
    // Width of one value in bytes
    bytes_per_value: u64,
    // Raw bytes collected so far
    values: Vec<u8>,
}

impl FixedWidthDataBlockBuilder {
    /// Creates a builder for values of `bits_per_value` bits
    ///
    /// # Panics
    ///
    /// Panics if `bits_per_value` is not a multiple of 8.
    fn new(bits_per_value: u64, estimated_size_bytes: u64) -> Self {
        assert!(bits_per_value % 8 == 0);
        let bytes_per_value = bits_per_value / 8;
        let values = Vec::with_capacity(estimated_size_bytes as usize);
        Self {
            bits_per_value,
            bytes_per_value,
            values,
        }
    }
}
321
322impl DataBlockBuilderImpl for FixedWidthDataBlockBuilder {
323    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
324        let block = data_block.as_fixed_width_ref().unwrap();
325        assert_eq!(self.bits_per_value, block.bits_per_value);
326        let start = selection.start as usize * self.bytes_per_value as usize;
327        let end = selection.end as usize * self.bytes_per_value as usize;
328        self.values.extend_from_slice(&block.data[start..end]);
329    }
330
331    fn finish(self: Box<Self>) -> DataBlock {
332        let num_values = (self.values.len() / self.bytes_per_value as usize) as u64;
333        DataBlock::FixedWidth(FixedWidthDataBlock {
334            data: LanceBuffer::from(self.values),
335            bits_per_value: self.bits_per_value,
336            num_values,
337            block_info: BlockInfo::new(),
338        })
339    }
340}
341
342#[derive(Debug)]
343struct StructDataBlockBuilder {
344    children: Vec<Box<dyn DataBlockBuilderImpl>>,
345}
346
347impl StructDataBlockBuilder {
348    // Currently only Struct with fixed-width fields are supported.
349    // And the assumption that all fields have `bits_per_value % 8 == 0` is made here.
350    fn new(bits_per_values: Vec<u32>, estimated_size_bytes: u64) -> Self {
351        let mut children = vec![];
352
353        debug_assert!(bits_per_values.iter().all(|bpv| bpv % 8 == 0));
354
355        let bytes_per_row: u32 = bits_per_values.iter().sum::<u32>() / 8;
356        let bytes_per_row = bytes_per_row as u64;
357
358        for bits_per_value in bits_per_values.iter() {
359            let this_estimated_size_bytes =
360                estimated_size_bytes / bytes_per_row * (*bits_per_value as u64) / 8;
361            let child =
362                FixedWidthDataBlockBuilder::new(*bits_per_value as u64, this_estimated_size_bytes);
363            children.push(Box::new(child) as Box<dyn DataBlockBuilderImpl>);
364        }
365        Self { children }
366    }
367}
368
369impl DataBlockBuilderImpl for StructDataBlockBuilder {
370    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
371        let data_block = data_block.as_struct_ref().unwrap();
372        for i in 0..self.children.len() {
373            self.children[i].append(&data_block.children[i], selection.clone());
374        }
375    }
376
377    fn finish(self: Box<Self>) -> DataBlock {
378        let mut children_data_block = Vec::new();
379        for child in self.children {
380            let child_data_block = child.finish();
381            children_data_block.push(child_data_block);
382        }
383        DataBlock::Struct(StructDataBlock {
384            children: children_data_block,
385            block_info: BlockInfo::new(),
386            validity: None,
387        })
388    }
389}
390
391#[derive(Debug, Default)]
392struct AllNullDataBlockBuilder {
393    num_values: u64,
394}
395
396impl DataBlockBuilderImpl for AllNullDataBlockBuilder {
397    fn append(&mut self, _data_block: &DataBlock, selection: Range<u64>) {
398        self.num_values += selection.end - selection.start;
399    }
400
401    fn finish(self: Box<Self>) -> DataBlock {
402        DataBlock::AllNull(AllNullDataBlock {
403            num_values: self.num_values,
404        })
405    }
406}
407
408/// A data block to represent a fixed size list
409#[derive(Debug, Clone)]
410pub struct FixedSizeListBlock {
411    /// The child data block
412    pub child: Box<DataBlock>,
413    /// The number of items in each list
414    pub dimension: u64,
415}
416
417impl FixedSizeListBlock {
418    pub fn num_values(&self) -> u64 {
419        self.child.num_values() / self.dimension
420    }
421
422    /// Try to flatten a FixedSizeListBlock into a FixedWidthDataBlock
423    ///
424    /// Returns None if any children are nullable
425    pub fn try_into_flat(self) -> Option<FixedWidthDataBlock> {
426        match *self.child {
427            // Cannot flatten a nullable child
428            DataBlock::Nullable(_) => None,
429            DataBlock::FixedSizeList(inner) => {
430                let mut flat = inner.try_into_flat()?;
431                flat.bits_per_value *= self.dimension;
432                flat.num_values /= self.dimension;
433                Some(flat)
434            }
435            DataBlock::FixedWidth(mut inner) => {
436                inner.bits_per_value *= self.dimension;
437                inner.num_values /= self.dimension;
438                Some(inner)
439            }
440            _ => panic!(
441                "Expected FixedSizeList or FixedWidth data block but found {:?}",
442                self
443            ),
444        }
445    }
446
447    pub fn flatten_as_fixed(&mut self) -> FixedWidthDataBlock {
448        match self.child.as_mut() {
449            DataBlock::FixedSizeList(fsl) => fsl.flatten_as_fixed(),
450            DataBlock::FixedWidth(fw) => fw.clone(),
451            _ => panic!("Expected FixedSizeList or FixedWidth data block"),
452        }
453    }
454
455    /// Convert a flattened values block into a FixedSizeListBlock
456    pub fn from_flat(data: FixedWidthDataBlock, data_type: &DataType) -> DataBlock {
457        match data_type {
458            DataType::FixedSizeList(child_field, dimension) => {
459                let mut data = data;
460                data.bits_per_value /= *dimension as u64;
461                data.num_values *= *dimension as u64;
462                let child_data = Self::from_flat(data, child_field.data_type());
463                DataBlock::FixedSizeList(Self {
464                    child: Box::new(child_data),
465                    dimension: *dimension as u64,
466                })
467            }
468            // Base case, we've hit a non-list type
469            _ => DataBlock::FixedWidth(data),
470        }
471    }
472
473    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
474        let num_values = self.num_values();
475        let builder = match &data_type {
476            DataType::FixedSizeList(child_field, _) => {
477                let child_data = self
478                    .child
479                    .into_arrow(child_field.data_type().clone(), validate)?;
480                ArrayDataBuilder::new(data_type)
481                    .add_child_data(child_data)
482                    .len(num_values as usize)
483                    .null_count(0)
484            }
485            _ => panic!("Expected FixedSizeList data type and got {:?}", data_type),
486        };
487        if validate {
488            Ok(builder.build()?)
489        } else {
490            Ok(unsafe { builder.build_unchecked() })
491        }
492    }
493
494    fn into_buffers(self) -> Vec<LanceBuffer> {
495        self.child.into_buffers()
496    }
497
498    fn data_size(&self) -> u64 {
499        self.child.data_size()
500    }
501}
502
503#[derive(Debug)]
504struct FixedSizeListBlockBuilder {
505    inner: Box<dyn DataBlockBuilderImpl>,
506    dimension: u64,
507}
508
509impl FixedSizeListBlockBuilder {
510    fn new(inner: Box<dyn DataBlockBuilderImpl>, dimension: u64) -> Self {
511        Self { inner, dimension }
512    }
513}
514
515impl DataBlockBuilderImpl for FixedSizeListBlockBuilder {
516    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
517        let selection = selection.start * self.dimension..selection.end * self.dimension;
518        let fsl = data_block.as_fixed_size_list_ref().unwrap();
519        self.inner.append(fsl.child.as_ref(), selection);
520    }
521
522    fn finish(self: Box<Self>) -> DataBlock {
523        let inner_block = self.inner.finish();
524        DataBlock::FixedSizeList(FixedSizeListBlock {
525            child: Box::new(inner_block),
526            dimension: self.dimension,
527        })
528    }
529}
530
531#[derive(Debug)]
532struct NullableDataBlockBuilder {
533    inner: Box<dyn DataBlockBuilderImpl>,
534    validity: BooleanBufferBuilder,
535}
536
537impl NullableDataBlockBuilder {
538    fn new(inner: Box<dyn DataBlockBuilderImpl>, estimated_size_bytes: usize) -> Self {
539        Self {
540            inner,
541            validity: BooleanBufferBuilder::new(estimated_size_bytes * 8),
542        }
543    }
544}
545
546impl DataBlockBuilderImpl for NullableDataBlockBuilder {
547    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
548        let nullable = data_block.as_nullable_ref().unwrap();
549        let bool_buf = BooleanBuffer::new(
550            nullable.nulls.clone().into_buffer(),
551            selection.start as usize,
552            (selection.end - selection.start) as usize,
553        );
554        self.validity.append_buffer(&bool_buf);
555        self.inner.append(nullable.data.as_ref(), selection);
556    }
557
558    fn finish(mut self: Box<Self>) -> DataBlock {
559        let inner_block = self.inner.finish();
560        DataBlock::Nullable(NullableDataBlock {
561            data: Box::new(inner_block),
562            nulls: LanceBuffer::from(self.validity.finish().into_inner()),
563            block_info: BlockInfo::new(),
564        })
565    }
566}
567
568/// A data block with no regular structure.  There is no available spot to attach
569/// validity / repdef information and it cannot be converted to Arrow without being
570/// decoded
571#[derive(Debug, Clone)]
572pub struct OpaqueBlock {
573    pub buffers: Vec<LanceBuffer>,
574    pub num_values: u64,
575    pub block_info: BlockInfo,
576}
577
578impl OpaqueBlock {
579    pub fn data_size(&self) -> u64 {
580        self.buffers.iter().map(|b| b.len() as u64).sum()
581    }
582}
583
584/// A data block for variable-width data (e.g. strings, packed rows, etc.)
585#[derive(Debug, Clone)]
586pub struct VariableWidthBlock {
587    /// The data buffer
588    pub data: LanceBuffer,
589    /// The offsets buffer (contains num_values + 1 offsets)
590    ///
591    /// Offsets MUST start at 0
592    pub offsets: LanceBuffer,
593    /// The number of bits per offset
594    pub bits_per_offset: u8,
595    /// The number of values represented by this block
596    pub num_values: u64,
597
598    pub block_info: BlockInfo,
599}
600
601impl VariableWidthBlock {
602    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
603        let data_buffer = self.data.into_buffer();
604        let offsets_buffer = self.offsets.into_buffer();
605        let builder = ArrayDataBuilder::new(data_type)
606            .add_buffer(offsets_buffer)
607            .add_buffer(data_buffer)
608            .len(self.num_values as usize)
609            .null_count(0);
610        if validate {
611            Ok(builder.build()?)
612        } else {
613            Ok(unsafe { builder.build_unchecked() })
614        }
615    }
616
617    fn into_buffers(self) -> Vec<LanceBuffer> {
618        vec![self.offsets, self.data]
619    }
620
621    pub fn offsets_as_block(&mut self) -> DataBlock {
622        let offsets = self.offsets.clone();
623        DataBlock::FixedWidth(FixedWidthDataBlock {
624            data: offsets,
625            bits_per_value: self.bits_per_offset as u64,
626            num_values: self.num_values + 1,
627            block_info: BlockInfo::new(),
628        })
629    }
630
631    pub fn data_size(&self) -> u64 {
632        (self.data.len() + self.offsets.len()) as u64
633    }
634}
635
636/// A data block representing a struct
637#[derive(Debug, Clone)]
638pub struct StructDataBlock {
639    /// The child arrays
640    pub children: Vec<DataBlock>,
641    pub block_info: BlockInfo,
642    /// The validity bitmap for the struct (None means all valid)
643    pub validity: Option<NullBuffer>,
644}
645
646impl StructDataBlock {
647    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
648        if let DataType::Struct(fields) = &data_type {
649            let mut builder = ArrayDataBuilder::new(DataType::Struct(fields.clone()));
650            let mut num_rows = 0;
651            for (field, child) in fields.iter().zip(self.children) {
652                let child_data = child.into_arrow(field.data_type().clone(), validate)?;
653                num_rows = child_data.len();
654                builder = builder.add_child_data(child_data);
655            }
656
657            // Apply validity if present
658            let builder = if let Some(validity) = self.validity {
659                let null_count = validity.null_count();
660                builder
661                    .null_bit_buffer(Some(validity.into_inner().into_inner()))
662                    .null_count(null_count)
663            } else {
664                builder.null_count(0)
665            };
666
667            let builder = builder.len(num_rows);
668            if validate {
669                Ok(builder.build()?)
670            } else {
671                Ok(unsafe { builder.build_unchecked() })
672            }
673        } else {
674            Err(Error::Internal {
675                message: format!("Expected Struct, got {:?}", data_type),
676                location: location!(),
677            })
678        }
679    }
680
681    fn remove_outer_validity(self) -> Self {
682        Self {
683            children: self
684                .children
685                .into_iter()
686                .map(|c| c.remove_outer_validity())
687                .collect(),
688            block_info: self.block_info,
689            validity: None, // Remove the validity
690        }
691    }
692
693    fn into_buffers(self) -> Vec<LanceBuffer> {
694        self.children
695            .into_iter()
696            .flat_map(|c| c.into_buffers())
697            .collect()
698    }
699
700    pub fn data_size(&self) -> u64 {
701        self.children
702            .iter()
703            .map(|data_block| data_block.data_size())
704            .sum()
705    }
706}
707
708/// A data block for dictionary encoded data
709#[derive(Debug, Clone)]
710pub struct DictionaryDataBlock {
711    /// The indices buffer
712    pub indices: FixedWidthDataBlock,
713    /// The dictionary itself
714    pub dictionary: Box<DataBlock>,
715}
716
717impl DictionaryDataBlock {
718    fn decode_helper<K: ArrowDictionaryKeyType>(self) -> Result<DataBlock> {
719        // assume the indices are uniformly distributed.
720        let estimated_size_bytes = self.dictionary.data_size()
721            * (self.indices.num_values + self.dictionary.num_values() - 1)
722            / self.dictionary.num_values();
723        let mut data_builder = DataBlockBuilder::with_capacity_estimate(estimated_size_bytes);
724
725        let indices = self.indices.data.borrow_to_typed_slice::<K::Native>();
726        let indices = indices.as_ref();
727
728        indices
729            .iter()
730            .map(|idx| idx.to_usize().unwrap() as u64)
731            .for_each(|idx| {
732                data_builder.append(&self.dictionary, idx..idx + 1);
733            });
734
735        Ok(data_builder.finish())
736    }
737
738    pub fn decode(self) -> Result<DataBlock> {
739        match self.indices.bits_per_value {
740            8 => self.decode_helper::<UInt8Type>(),
741            16 => self.decode_helper::<UInt16Type>(),
742            32 => self.decode_helper::<UInt32Type>(),
743            64 => self.decode_helper::<UInt64Type>(),
744            _ => Err(lance_core::Error::Internal {
745                message: format!(
746                    "Unsupported dictionary index bit width: {} bits",
747                    self.indices.bits_per_value
748                ),
749                location: location!(),
750            }),
751        }
752    }
753
754    fn into_arrow_dict(
755        self,
756        key_type: Box<DataType>,
757        value_type: Box<DataType>,
758        validate: bool,
759    ) -> Result<ArrayData> {
760        let indices = self.indices.into_arrow((*key_type).clone(), validate)?;
761        let dictionary = self
762            .dictionary
763            .into_arrow((*value_type).clone(), validate)?;
764
765        let builder = indices
766            .into_builder()
767            .add_child_data(dictionary)
768            .data_type(DataType::Dictionary(key_type, value_type));
769
770        if validate {
771            Ok(builder.build()?)
772        } else {
773            Ok(unsafe { builder.build_unchecked() })
774        }
775    }
776
777    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
778        if let DataType::Dictionary(key_type, value_type) = data_type {
779            self.into_arrow_dict(key_type, value_type, validate)
780        } else {
781            self.decode()?.into_arrow(data_type, validate)
782        }
783    }
784
785    fn into_buffers(self) -> Vec<LanceBuffer> {
786        let mut buffers = self.indices.into_buffers();
787        buffers.extend(self.dictionary.into_buffers());
788        buffers
789    }
790
791    pub fn into_parts(self) -> (DataBlock, DataBlock) {
792        (DataBlock::FixedWidth(self.indices), *self.dictionary)
793    }
794
795    pub fn from_parts(indices: FixedWidthDataBlock, dictionary: DataBlock) -> Self {
796        Self {
797            indices,
798            dictionary: Box::new(dictionary),
799        }
800    }
801}
802
803/// A DataBlock is a collection of buffers that represents an "array" of data in very generic terms
804///
805/// The output of each decoder is a DataBlock.  Decoders can be chained together to transform
806/// one DataBlock into a different kind of DataBlock.
807///
808/// The DataBlock is somewhere in between Arrow's ArrayData and Array and represents a physical
809/// layout of the data.
810///
811/// A DataBlock can be converted into an Arrow ArrayData (and then Array) for a given array type.
812/// For example, a FixedWidthDataBlock can be converted into any primitive type or a fixed size
813/// list of a primitive type.  This is a zero-copy operation.
814///
815/// In addition, a DataBlock can be created from an Arrow array or arrays.  This is not a zero-copy
816/// operation as some normalization may be required.
817#[derive(Debug, Clone)]
818pub enum DataBlock {
819    Empty(),
820    Constant(ConstantDataBlock),
821    AllNull(AllNullDataBlock),
822    Nullable(NullableDataBlock),
823    FixedWidth(FixedWidthDataBlock),
824    FixedSizeList(FixedSizeListBlock),
825    VariableWidth(VariableWidthBlock),
826    Opaque(OpaqueBlock),
827    Struct(StructDataBlock),
828    Dictionary(DictionaryDataBlock),
829}
830
831impl DataBlock {
832    /// Convert self into an Arrow ArrayData
833    pub fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
834        match self {
835            Self::Empty() => Ok(new_empty_array(&data_type).to_data()),
836            Self::Constant(inner) => inner.into_arrow(data_type, validate),
837            Self::AllNull(inner) => inner.into_arrow(data_type, validate),
838            Self::Nullable(inner) => inner.into_arrow(data_type, validate),
839            Self::FixedWidth(inner) => inner.into_arrow(data_type, validate),
840            Self::FixedSizeList(inner) => inner.into_arrow(data_type, validate),
841            Self::VariableWidth(inner) => inner.into_arrow(data_type, validate),
842            Self::Struct(inner) => inner.into_arrow(data_type, validate),
843            Self::Dictionary(inner) => inner.into_arrow(data_type, validate),
844            Self::Opaque(_) => Err(Error::Internal {
845                message: "Cannot convert OpaqueBlock to Arrow".to_string(),
846                location: location!(),
847            }),
848        }
849    }
850
851    /// Convert the data block into a collection of buffers for serialization
852    ///
853    /// The order matters and will be used to reconstruct the data block at read time.
854    pub fn into_buffers(self) -> Vec<LanceBuffer> {
855        match self {
856            Self::Empty() => Vec::default(),
857            Self::Constant(inner) => inner.into_buffers(),
858            Self::AllNull(inner) => inner.into_buffers(),
859            Self::Nullable(inner) => inner.into_buffers(),
860            Self::FixedWidth(inner) => inner.into_buffers(),
861            Self::FixedSizeList(inner) => inner.into_buffers(),
862            Self::VariableWidth(inner) => inner.into_buffers(),
863            Self::Struct(inner) => inner.into_buffers(),
864            Self::Dictionary(inner) => inner.into_buffers(),
865            Self::Opaque(inner) => inner.buffers,
866        }
867    }
868
869    /// Converts the data buffers into borrowed mode and clones the block
870    ///
871    /// This is a zero-copy operation but requires a mutable reference to self and, afterwards,
872    /// all buffers will be in Borrowed mode.
873    /// Try and clone the block
874    ///
875    /// This will fail if any buffers are in owned mode.  You can call borrow_and_clone() to
876    /// ensure that all buffers are in borrowed mode before calling this method.
877    pub fn try_clone(&self) -> Result<Self> {
878        match self {
879            Self::Empty() => Ok(Self::Empty()),
880            Self::Constant(inner) => Ok(Self::Constant(inner.clone())),
881            Self::AllNull(inner) => Ok(Self::AllNull(inner.clone())),
882            Self::Nullable(inner) => Ok(Self::Nullable(inner.clone())),
883            Self::FixedWidth(inner) => Ok(Self::FixedWidth(inner.clone())),
884            Self::FixedSizeList(inner) => Ok(Self::FixedSizeList(inner.clone())),
885            Self::VariableWidth(inner) => Ok(Self::VariableWidth(inner.clone())),
886            Self::Struct(inner) => Ok(Self::Struct(inner.clone())),
887            Self::Dictionary(inner) => Ok(Self::Dictionary(inner.clone())),
888            Self::Opaque(inner) => Ok(Self::Opaque(inner.clone())),
889        }
890    }
891
892    pub fn name(&self) -> &'static str {
893        match self {
894            Self::Constant(_) => "Constant",
895            Self::Empty() => "Empty",
896            Self::AllNull(_) => "AllNull",
897            Self::Nullable(_) => "Nullable",
898            Self::FixedWidth(_) => "FixedWidth",
899            Self::FixedSizeList(_) => "FixedSizeList",
900            Self::VariableWidth(_) => "VariableWidth",
901            Self::Struct(_) => "Struct",
902            Self::Dictionary(_) => "Dictionary",
903            Self::Opaque(_) => "Opaque",
904        }
905    }
906
907    pub fn is_variable(&self) -> bool {
908        match self {
909            Self::Constant(_) => false,
910            Self::Empty() => false,
911            Self::AllNull(_) => false,
912            Self::Nullable(nullable) => nullable.data.is_variable(),
913            Self::FixedWidth(_) => false,
914            Self::FixedSizeList(fsl) => fsl.child.is_variable(),
915            Self::VariableWidth(_) => true,
916            Self::Struct(strct) => strct.children.iter().any(|c| c.is_variable()),
917            Self::Dictionary(_) => {
918                todo!("is_variable for DictionaryDataBlock is not implemented yet")
919            }
920            Self::Opaque(_) => panic!("Does not make sense to ask if an Opaque block is variable"),
921        }
922    }
923
924    pub fn is_nullable(&self) -> bool {
925        match self {
926            Self::AllNull(_) => true,
927            Self::Nullable(_) => true,
928            Self::FixedSizeList(fsl) => fsl.child.is_nullable(),
929            Self::Struct(strct) => strct.children.iter().any(|c| c.is_nullable()),
930            Self::Dictionary(_) => {
931                todo!("is_nullable for DictionaryDataBlock is not implemented yet")
932            }
933            Self::Opaque(_) => panic!("Does not make sense to ask if an Opaque block is nullable"),
934            _ => false,
935        }
936    }
937
938    /// The number of values in the block
939    ///
940    /// This function does not recurse into child blocks.  If this is a FSL then it will
941    /// be the number of lists and not the number of items.
942    pub fn num_values(&self) -> u64 {
943        match self {
944            Self::Empty() => 0,
945            Self::Constant(inner) => inner.num_values,
946            Self::AllNull(inner) => inner.num_values,
947            Self::Nullable(inner) => inner.data.num_values(),
948            Self::FixedWidth(inner) => inner.num_values,
949            Self::FixedSizeList(inner) => inner.num_values(),
950            Self::VariableWidth(inner) => inner.num_values,
951            Self::Struct(inner) => inner.children[0].num_values(),
952            Self::Dictionary(inner) => inner.indices.num_values,
953            Self::Opaque(inner) => inner.num_values,
954        }
955    }
956
957    /// The number of items in a single row
958    ///
959    /// This is always 1 unless there are layers of FSL
960    pub fn items_per_row(&self) -> u64 {
961        match self {
962            Self::Empty() => todo!(),     // Leave undefined until needed
963            Self::Constant(_) => todo!(), // Leave undefined until needed
964            Self::AllNull(_) => todo!(),  // Leave undefined until needed
965            Self::Nullable(nullable) => nullable.data.items_per_row(),
966            Self::FixedWidth(_) => 1,
967            Self::FixedSizeList(fsl) => fsl.dimension * fsl.child.items_per_row(),
968            Self::VariableWidth(_) => 1,
969            Self::Struct(_) => todo!(), // Leave undefined until needed
970            Self::Dictionary(_) => 1,
971            Self::Opaque(_) => 1,
972        }
973    }
974
975    /// The number of bytes in the data block (including any child blocks)
976    pub fn data_size(&self) -> u64 {
977        match self {
978            Self::Empty() => 0,
979            Self::Constant(inner) => inner.data_size(),
980            Self::AllNull(_) => 0,
981            Self::Nullable(inner) => inner.data_size(),
982            Self::FixedWidth(inner) => inner.data_size(),
983            Self::FixedSizeList(inner) => inner.data_size(),
984            Self::VariableWidth(inner) => inner.data_size(),
985            Self::Struct(_) => {
986                todo!("the data_size method for StructDataBlock is not implemented yet")
987            }
988            Self::Dictionary(_) => {
989                todo!("the data_size method for DictionaryDataBlock is not implemented yet")
990            }
991            Self::Opaque(inner) => inner.data_size(),
992        }
993    }
994
995    /// Removes any validity information from the block
996    ///
997    /// This does not filter the block (e.g. remove rows).  It only removes
998    /// the validity bitmaps (if present).  Any garbage masked by null bits
999    /// will now appear as proper values.
1000    ///
1001    /// If `recurse` is true, then this will also remove validity from any child blocks.
1002    pub fn remove_outer_validity(self) -> Self {
1003        match self {
1004            Self::AllNull(_) => panic!("Cannot remove validity on all-null data"),
1005            Self::Nullable(inner) => *inner.data,
1006            Self::Struct(inner) => Self::Struct(inner.remove_outer_validity()),
1007            other => other,
1008        }
1009    }
1010
1011    pub fn make_builder(&self, estimated_size_bytes: u64) -> Box<dyn DataBlockBuilderImpl> {
1012        match self {
1013            Self::FixedWidth(inner) => {
1014                if inner.bits_per_value == 1 {
1015                    Box::new(BitmapDataBlockBuilder::new(estimated_size_bytes))
1016                } else {
1017                    Box::new(FixedWidthDataBlockBuilder::new(
1018                        inner.bits_per_value,
1019                        estimated_size_bytes,
1020                    ))
1021                }
1022            }
1023            Self::VariableWidth(inner) => {
1024                if inner.bits_per_offset == 32 {
1025                    Box::new(VariableWidthDataBlockBuilder::<i32>::new(
1026                        estimated_size_bytes,
1027                    ))
1028                } else if inner.bits_per_offset == 64 {
1029                    Box::new(VariableWidthDataBlockBuilder::<i64>::new(
1030                        estimated_size_bytes,
1031                    ))
1032                } else {
1033                    todo!()
1034                }
1035            }
1036            Self::FixedSizeList(inner) => {
1037                let inner_builder = inner.child.make_builder(estimated_size_bytes);
1038                Box::new(FixedSizeListBlockBuilder::new(
1039                    inner_builder,
1040                    inner.dimension,
1041                ))
1042            }
1043            Self::Nullable(nullable) => {
1044                // There's no easy way to know what percentage of the data is in the valiidty buffer
1045                // but 1/16th seems like a reasonable guess.
1046                let estimated_validity_size_bytes = estimated_size_bytes / 16;
1047                let inner_builder = nullable
1048                    .data
1049                    .make_builder(estimated_size_bytes - estimated_validity_size_bytes);
1050                Box::new(NullableDataBlockBuilder::new(
1051                    inner_builder,
1052                    estimated_validity_size_bytes as usize,
1053                ))
1054            }
1055            Self::Struct(struct_data_block) => {
1056                let mut bits_per_values = vec![];
1057                for child in struct_data_block.children.iter() {
1058                    let child = child.as_fixed_width_ref().
1059                        expect("Currently StructDataBlockBuilder is only used in packed-struct encoding, and currently in packed-struct encoding, only fixed-width fields are supported.");
1060                    bits_per_values.push(child.bits_per_value as u32);
1061                }
1062                Box::new(StructDataBlockBuilder::new(
1063                    bits_per_values,
1064                    estimated_size_bytes,
1065                ))
1066            }
1067            Self::AllNull(_) => Box::new(AllNullDataBlockBuilder::default()),
1068            _ => todo!("make_builder for {:?}", self),
1069        }
1070    }
1071}
1072
// Generates a consuming accessor named `$fn_name` which returns the payload of
// the `$inner` variant (of type `$inner_type`) or `None` for any other variant
macro_rules! as_type {
    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
        pub fn $fn_name(self) -> Option<$inner_type> {
            if let Self::$inner(inner) = self {
                Some(inner)
            } else {
                None
            }
        }
    };
}
1083
// Generates a borrowing accessor named `$fn_name` which returns a shared
// reference to the payload of the `$inner` variant (of type `$inner_type`) or
// `None` for any other variant
macro_rules! as_type_ref {
    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
        pub fn $fn_name(&self) -> Option<&$inner_type> {
            if let Self::$inner(inner) = self {
                Some(inner)
            } else {
                None
            }
        }
    };
}
1094
// Generates a mutably borrowing accessor named `$fn_name` which returns an
// exclusive reference to the payload of the `$inner` variant (of type
// `$inner_type`) or `None` for any other variant
macro_rules! as_type_ref_mut {
    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
        pub fn $fn_name(&mut self) -> Option<&mut $inner_type> {
            if let Self::$inner(inner) = self {
                Some(inner)
            } else {
                None
            }
        }
    };
}
1105
1106// Cast implementations
1107impl DataBlock {
1108    as_type!(as_all_null, AllNull, AllNullDataBlock);
1109    as_type!(as_nullable, Nullable, NullableDataBlock);
1110    as_type!(as_fixed_width, FixedWidth, FixedWidthDataBlock);
1111    as_type!(as_fixed_size_list, FixedSizeList, FixedSizeListBlock);
1112    as_type!(as_variable_width, VariableWidth, VariableWidthBlock);
1113    as_type!(as_struct, Struct, StructDataBlock);
1114    as_type!(as_dictionary, Dictionary, DictionaryDataBlock);
1115    as_type_ref!(as_all_null_ref, AllNull, AllNullDataBlock);
1116    as_type_ref!(as_nullable_ref, Nullable, NullableDataBlock);
1117    as_type_ref!(as_fixed_width_ref, FixedWidth, FixedWidthDataBlock);
1118    as_type_ref!(as_fixed_size_list_ref, FixedSizeList, FixedSizeListBlock);
1119    as_type_ref!(as_variable_width_ref, VariableWidth, VariableWidthBlock);
1120    as_type_ref!(as_struct_ref, Struct, StructDataBlock);
1121    as_type_ref!(as_dictionary_ref, Dictionary, DictionaryDataBlock);
1122    as_type_ref_mut!(as_all_null_ref_mut, AllNull, AllNullDataBlock);
1123    as_type_ref_mut!(as_nullable_ref_mut, Nullable, NullableDataBlock);
1124    as_type_ref_mut!(as_fixed_width_ref_mut, FixedWidth, FixedWidthDataBlock);
1125    as_type_ref_mut!(
1126        as_fixed_size_list_ref_mut,
1127        FixedSizeList,
1128        FixedSizeListBlock
1129    );
1130    as_type_ref_mut!(as_variable_width_ref_mut, VariableWidth, VariableWidthBlock);
1131    as_type_ref_mut!(as_struct_ref_mut, Struct, StructDataBlock);
1132    as_type_ref_mut!(as_dictionary_ref_mut, Dictionary, DictionaryDataBlock);
1133}
1134
1135// Methods to convert from Arrow -> DataBlock
1136
1137fn get_byte_range<T: ArrowNativeType>(offsets: &mut LanceBuffer) -> Range<usize> {
1138    let offsets = offsets.borrow_to_typed_slice::<T>();
1139    if offsets.as_ref().is_empty() {
1140        0..0
1141    } else {
1142        offsets.as_ref().first().unwrap().as_usize()..offsets.as_ref().last().unwrap().as_usize()
1143    }
1144}
1145
1146// Given multiple offsets arrays [0, 5, 10], [0, 3, 7], etc. stitch
1147// them together to get [0, 5, 10, 13, 20, ...]
1148//
1149// Also returns the data range referenced by each offset array (may
1150// not be 0..len if there is slicing involved)
1151fn stitch_offsets<T: ArrowNativeType + std::ops::Add<Output = T> + std::ops::Sub<Output = T>>(
1152    offsets: Vec<LanceBuffer>,
1153) -> (LanceBuffer, Vec<Range<usize>>) {
1154    if offsets.is_empty() {
1155        return (LanceBuffer::empty(), Vec::default());
1156    }
1157    let len = offsets.iter().map(|b| b.len()).sum::<usize>();
1158    // Note: we are making a copy here, even if there is only one input, because we want to
1159    // normalize that input if it doesn't start with zero.  This could be micro-optimized out
1160    // if needed.
1161    let mut dest = Vec::with_capacity(len);
1162    let mut byte_ranges = Vec::with_capacity(offsets.len());
1163
1164    // We insert one leading 0 before processing any of the inputs
1165    dest.push(T::from_usize(0).unwrap());
1166
1167    for mut o in offsets.into_iter() {
1168        if !o.is_empty() {
1169            let last_offset = *dest.last().unwrap();
1170            let o = o.borrow_to_typed_slice::<T>();
1171            let start = *o.as_ref().first().unwrap();
1172            // First, we skip the first offset
1173            // Then, we subtract that first offset from each remaining offset
1174            //
1175            // This gives us a 0-based offset array (minus the leading 0)
1176            //
1177            // Then we add the last offset from the previous array to each offset
1178            // which shifts our offset array to the correct position
1179            //
1180            // For example, let's assume the last offset from the previous array
1181            // was 10 and we are given [13, 17, 22].  This means we have two values with
1182            // length 4 (17 - 13) and 5 (22 - 17).  The output from this step will be
1183            // [14, 19].  Combined with our last offset of 10, this gives us [10, 14, 19]
1184            // which is our same two values of length 4 and 5.
1185            dest.extend(o.as_ref()[1..].iter().map(|&x| x + last_offset - start));
1186        }
1187        byte_ranges.push(get_byte_range::<T>(&mut o));
1188    }
1189    (LanceBuffer::reinterpret_vec(dest), byte_ranges)
1190}
1191
1192fn arrow_binary_to_data_block(
1193    arrays: &[ArrayRef],
1194    num_values: u64,
1195    bits_per_offset: u8,
1196) -> DataBlock {
1197    let data_vec = arrays.iter().map(|arr| arr.to_data()).collect::<Vec<_>>();
1198    let bytes_per_offset = bits_per_offset as usize / 8;
1199    let offsets = data_vec
1200        .iter()
1201        .map(|d| {
1202            LanceBuffer::from(
1203                d.buffers()[0].slice_with_length(d.offset(), (d.len() + 1) * bytes_per_offset),
1204            )
1205        })
1206        .collect::<Vec<_>>();
1207    let (offsets, data_ranges) = if bits_per_offset == 32 {
1208        stitch_offsets::<i32>(offsets)
1209    } else {
1210        stitch_offsets::<i64>(offsets)
1211    };
1212    let data = data_vec
1213        .iter()
1214        .zip(data_ranges)
1215        .map(|(d, byte_range)| {
1216            LanceBuffer::from(
1217                d.buffers()[1]
1218                    .slice_with_length(byte_range.start, byte_range.end - byte_range.start),
1219            )
1220        })
1221        .collect::<Vec<_>>();
1222    let data = LanceBuffer::concat_into_one(data);
1223    DataBlock::VariableWidth(VariableWidthBlock {
1224        data,
1225        offsets,
1226        bits_per_offset,
1227        num_values,
1228        block_info: BlockInfo::new(),
1229    })
1230}
1231
1232fn encode_flat_data(arrays: &[ArrayRef], num_values: u64) -> LanceBuffer {
1233    let bytes_per_value = arrays[0].data_type().byte_width();
1234    let mut buffer = Vec::with_capacity(num_values as usize * bytes_per_value);
1235    for arr in arrays {
1236        let data = arr.to_data();
1237        buffer.extend_from_slice(data.buffers()[0].as_slice());
1238    }
1239    LanceBuffer::from(buffer)
1240}
1241
1242fn do_encode_bitmap_data(bitmaps: &[BooleanBuffer], num_values: u64) -> LanceBuffer {
1243    let mut builder = BooleanBufferBuilder::new(num_values as usize);
1244
1245    for buf in bitmaps {
1246        builder.append_buffer(buf);
1247    }
1248
1249    let buffer = builder.finish().into_inner();
1250    LanceBuffer::from(buffer)
1251}
1252
1253fn encode_bitmap_data(arrays: &[ArrayRef], num_values: u64) -> LanceBuffer {
1254    let bitmaps = arrays
1255        .iter()
1256        .map(|arr| arr.as_boolean().values().clone())
1257        .collect::<Vec<_>>();
1258    do_encode_bitmap_data(&bitmaps, num_values)
1259}
1260
1261// Concatenate dictionary arrays.  This is a bit tricky because we might overflow the
1262// index type.  If we do, we need to upscale the indices to a larger type.
1263fn concat_dict_arrays(arrays: &[ArrayRef]) -> ArrayRef {
1264    let value_type = arrays[0].as_any_dictionary().values().data_type();
1265    let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
1266    match arrow_select::concat::concat(&array_refs) {
1267        Ok(array) => array,
1268        Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1269            // Slow, but hopefully a corner case.  Optimize later
1270            let upscaled = array_refs
1271                .iter()
1272                .map(|arr| {
1273                    match arrow_cast::cast(
1274                        *arr,
1275                        &DataType::Dictionary(
1276                            Box::new(DataType::UInt32),
1277                            Box::new(value_type.clone()),
1278                        ),
1279                    ) {
1280                        Ok(arr) => arr,
1281                        Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1282                            // Technically I think this means the input type was u64 already
1283                            unimplemented!("Dictionary arrays with more than 2^32 unique values")
1284                        }
1285                        err => err.unwrap(),
1286                    }
1287                })
1288                .collect::<Vec<_>>();
1289            let array_refs = upscaled.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
1290            // Can still fail if concat pushes over u32 boundary
1291            match arrow_select::concat::concat(&array_refs) {
1292                Ok(array) => array,
1293                Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1294                    unimplemented!("Dictionary arrays with more than 2^32 unique values")
1295                }
1296                err => err.unwrap(),
1297            }
1298        }
1299        // Shouldn't be any other possible errors in concat
1300        err => err.unwrap(),
1301    }
1302}
1303
1304fn max_index_val(index_type: &DataType) -> u64 {
1305    match index_type {
1306        DataType::Int8 => i8::MAX as u64,
1307        DataType::Int16 => i16::MAX as u64,
1308        DataType::Int32 => i32::MAX as u64,
1309        DataType::Int64 => i64::MAX as u64,
1310        DataType::UInt8 => u8::MAX as u64,
1311        DataType::UInt16 => u16::MAX as u64,
1312        DataType::UInt32 => u32::MAX as u64,
1313        DataType::UInt64 => u64::MAX,
1314        _ => panic!("Invalid dictionary index type"),
1315    }
1316}
1317
// If we get multiple dictionary arrays and they don't all have the same dictionary
// then we need to normalize the indices.  Otherwise we might have something like:
//
// First chunk ["hello", "foo"], [0, 0, 1, 1, 1]
// Second chunk ["bar", "world"], [0, 1, 0, 1, 1]
//
// If we simply encode as ["hello", "foo", "bar", "world"], [0, 0, 1, 1, 1, 0, 1, 0, 1, 1]
// then we will get the wrong answer because the dictionaries were not merged and the indices
// were not remapped.
//
// A simple way to do this today is to just concatenate all the arrays.  This is because
// arrow's dictionary concatenation function already has the logic to merge dictionaries.
//
// TODO: We could be more efficient here by checking if the dictionaries are the same
//       Also, if they aren't, we can possibly do something cheaper than concatenating
//
// In addition, we want to normalize the representation of nulls.  The cheapest thing to
// do (space-wise) is to put the nulls in the dictionary.
//
// `validity`, if present, marks which rows are null.  Every null row has its index
// rewritten to point at a null entry of the dictionary values, so the returned
// indices block carries no validity of its own.
fn arrow_dictionary_to_data_block(arrays: &[ArrayRef], validity: Option<NullBuffer>) -> DataBlock {
    let array = concat_dict_arrays(arrays);
    let array_dict = array.as_any_dictionary();
    let mut indices = array_dict.keys();
    let num_values = indices.len() as u64;
    let mut values = array_dict.values().clone();
    // Placeholder, if we need to upcast, we will initialize this and set `indices` to refer to it
    let mut upcast = None;

    // TODO: Should we just always normalize indices to u32?  That would make logic simpler
    // and we're going to bitpack them soon anyways

    let indices_block = if let Some(validity) = validity {
        // If there is validity then we find the first invalid index in the dictionary values, inserting
        // a new value if we need to.  Then we change all indices to point to that value.  This way we
        // never need to store nullability of the indices.
        let mut first_invalid_index = None;
        if let Some(values_validity) = values.nulls() {
            // Inverting the validity bitmap turns null slots into set bits
            first_invalid_index = (!values_validity.inner()).set_indices().next();
        }
        let first_invalid_index = first_invalid_index.unwrap_or_else(|| {
            // The dictionary has no null entry yet, append one to the end of the values
            let null_arr = new_null_array(values.data_type(), 1);
            values = arrow_select::concat::concat(&[values.as_ref(), null_arr.as_ref()]).unwrap();
            let null_index = values.len() - 1;
            let max_index_val = max_index_val(indices.data_type());
            if null_index as u64 > max_index_val {
                // Widen the index type
                if max_index_val >= u32::MAX as u64 {
                    unimplemented!("Dictionary arrays with 2^32 unique value (or more) and a null")
                }
                upcast = Some(arrow_cast::cast(indices, &DataType::UInt32).unwrap());
                indices = upcast.as_ref().unwrap();
            }
            null_index
        });
        // This can't fail since we already checked for fit
        let null_index_arr = arrow_cast::cast(
            &UInt64Array::from(vec![first_invalid_index as u64]),
            indices.data_type(),
        )
        .unwrap();

        let bytes_per_index = indices.data_type().byte_width();
        let bits_per_index = bytes_per_index as u64 * 8;

        let null_index_arr = null_index_arr.into_data();
        let null_index_bytes = &null_index_arr.buffers()[0];
        // Need to make a copy here since indices isn't mutable, could be avoided in theory
        let mut indices_bytes = indices.to_data().buffers()[0].to_vec();
        // Overwrite the raw bytes of every null row's index with the bytes of the null index
        for invalid_idx in (!validity.inner()).set_indices() {
            indices_bytes[invalid_idx * bytes_per_index..(invalid_idx + 1) * bytes_per_index]
                .copy_from_slice(null_index_bytes.as_slice());
        }
        FixedWidthDataBlock {
            data: LanceBuffer::from(indices_bytes),
            bits_per_value: bits_per_index,
            num_values,
            block_info: BlockInfo::new(),
        }
    } else {
        // No validity was supplied so the indices are used as-is
        FixedWidthDataBlock {
            data: LanceBuffer::from(indices.to_data().buffers()[0].clone()),
            bits_per_value: indices.data_type().byte_width() as u64 * 8,
            num_values,
            block_info: BlockInfo::new(),
        }
    };

    // The dictionary values are themselves converted to a data block
    let items = DataBlock::from(values);
    DataBlock::Dictionary(DictionaryDataBlock {
        indices: indices_block,
        dictionary: Box::new(items),
    })
}
1410
/// Summary of the validity found across a collection of arrays (see `extract_nulls`)
enum Nullability {
    /// None of the arrays reported a null buffer
    None,
    /// Every value is null
    All,
    /// A single validity buffer covering all of the arrays (not every value is null)
    Some(NullBuffer),
}
1416
1417impl Nullability {
1418    fn to_option(&self) -> Option<NullBuffer> {
1419        match self {
1420            Self::Some(nulls) => Some(nulls.clone()),
1421            _ => None,
1422        }
1423    }
1424}
1425
1426fn extract_nulls(arrays: &[ArrayRef], num_values: u64) -> Nullability {
1427    let mut has_nulls = false;
1428    let nulls_and_lens = arrays
1429        .iter()
1430        .map(|arr| {
1431            let nulls = arr.logical_nulls();
1432            has_nulls |= nulls.is_some();
1433            (nulls, arr.len())
1434        })
1435        .collect::<Vec<_>>();
1436    if !has_nulls {
1437        return Nullability::None;
1438    }
1439    let mut builder = BooleanBufferBuilder::new(num_values as usize);
1440    let mut num_nulls = 0;
1441    for (null, len) in nulls_and_lens {
1442        if let Some(null) = null {
1443            num_nulls += null.null_count();
1444            builder.append_buffer(&null.into_inner());
1445        } else {
1446            builder.append_n(len, true);
1447        }
1448    }
1449    if num_nulls == num_values as usize {
1450        Nullability::All
1451    } else {
1452        Nullability::Some(NullBuffer::new(builder.finish()))
1453    }
1454}
1455
1456impl DataBlock {
1457    pub fn from_arrays(arrays: &[ArrayRef], num_values: u64) -> Self {
1458        if arrays.is_empty() || num_values == 0 {
1459            return Self::AllNull(AllNullDataBlock { num_values: 0 });
1460        }
1461
1462        let data_type = arrays[0].data_type();
1463        let nulls = extract_nulls(arrays, num_values);
1464
1465        if let Nullability::All = nulls {
1466            return Self::AllNull(AllNullDataBlock { num_values });
1467        }
1468
1469        let mut encoded = match data_type {
1470            DataType::Binary | DataType::Utf8 => arrow_binary_to_data_block(arrays, num_values, 32),
1471            DataType::BinaryView | DataType::Utf8View => {
1472                todo!()
1473            }
1474            DataType::LargeBinary | DataType::LargeUtf8 => {
1475                arrow_binary_to_data_block(arrays, num_values, 64)
1476            }
1477            DataType::Boolean => {
1478                let data = encode_bitmap_data(arrays, num_values);
1479                Self::FixedWidth(FixedWidthDataBlock {
1480                    data,
1481                    bits_per_value: 1,
1482                    num_values,
1483                    block_info: BlockInfo::new(),
1484                })
1485            }
1486            DataType::Date32
1487            | DataType::Date64
1488            | DataType::Decimal128(_, _)
1489            | DataType::Decimal256(_, _)
1490            | DataType::Duration(_)
1491            | DataType::FixedSizeBinary(_)
1492            | DataType::Float16
1493            | DataType::Float32
1494            | DataType::Float64
1495            | DataType::Int16
1496            | DataType::Int32
1497            | DataType::Int64
1498            | DataType::Int8
1499            | DataType::Interval(_)
1500            | DataType::Time32(_)
1501            | DataType::Time64(_)
1502            | DataType::Timestamp(_, _)
1503            | DataType::UInt16
1504            | DataType::UInt32
1505            | DataType::UInt64
1506            | DataType::UInt8 => {
1507                let data = encode_flat_data(arrays, num_values);
1508                Self::FixedWidth(FixedWidthDataBlock {
1509                    data,
1510                    bits_per_value: data_type.byte_width() as u64 * 8,
1511                    num_values,
1512                    block_info: BlockInfo::new(),
1513                })
1514            }
1515            DataType::Null => Self::AllNull(AllNullDataBlock { num_values }),
1516            DataType::Dictionary(_, _) => arrow_dictionary_to_data_block(arrays, nulls.to_option()),
1517            DataType::Struct(fields) => {
1518                let structs = arrays.iter().map(|arr| arr.as_struct()).collect::<Vec<_>>();
1519                let mut children = Vec::with_capacity(fields.len());
1520                for child_idx in 0..fields.len() {
1521                    let child_vec = structs
1522                        .iter()
1523                        .map(|s| s.column(child_idx).clone())
1524                        .collect::<Vec<_>>();
1525                    children.push(Self::from_arrays(&child_vec, num_values));
1526                }
1527
1528                // Extract validity for the struct array
1529                let validity = match &nulls {
1530                    Nullability::None => None,
1531                    Nullability::Some(null_buffer) => Some(null_buffer.clone()),
1532                    Nullability::All => unreachable!("Should have returned AllNull earlier"),
1533                };
1534
1535                Self::Struct(StructDataBlock {
1536                    children,
1537                    block_info: BlockInfo::default(),
1538                    validity,
1539                })
1540            }
1541            DataType::FixedSizeList(_, dim) => {
1542                let children = arrays
1543                    .iter()
1544                    .map(|arr| arr.as_fixed_size_list().values().clone())
1545                    .collect::<Vec<_>>();
1546                let child_block = Self::from_arrays(&children, num_values * *dim as u64);
1547                Self::FixedSizeList(FixedSizeListBlock {
1548                    child: Box::new(child_block),
1549                    dimension: *dim as u64,
1550                })
1551            }
1552            DataType::LargeList(_)
1553            | DataType::List(_)
1554            | DataType::ListView(_)
1555            | DataType::LargeListView(_)
1556            | DataType::Map(_, _)
1557            | DataType::RunEndEncoded(_, _)
1558            | DataType::Union(_, _) => {
1559                panic!(
1560                    "Field with data type {} cannot be converted to data block",
1561                    data_type
1562                )
1563            }
1564        };
1565
1566        // compute statistics
1567        encoded.compute_stat();
1568
1569        if !matches!(data_type, DataType::Dictionary(_, _)) {
1570            match nulls {
1571                Nullability::None => encoded,
1572                Nullability::Some(nulls) => Self::Nullable(NullableDataBlock {
1573                    data: Box::new(encoded),
1574                    nulls: LanceBuffer::from(nulls.into_inner().into_inner()),
1575                    block_info: BlockInfo::new(),
1576                }),
1577                _ => unreachable!(),
1578            }
1579        } else {
1580            // Dictionaries already insert the nulls into the dictionary items
1581            encoded
1582        }
1583    }
1584
1585    pub fn from_array<T: Array + 'static>(array: T) -> Self {
1586        let num_values = array.len();
1587        Self::from_arrays(&[Arc::new(array)], num_values as u64)
1588    }
1589}
1590
1591impl From<ArrayRef> for DataBlock {
1592    fn from(array: ArrayRef) -> Self {
1593        let num_values = array.len() as u64;
1594        Self::from_arrays(&[array], num_values)
1595    }
1596}
1597
/// Interface implemented by the concrete data block builders
///
/// [`DataBlock::make_builder`] returns implementations of this trait as boxed
/// trait objects.
pub trait DataBlockBuilderImpl: std::fmt::Debug {
    /// Appends data from `data_block` to the builder
    ///
    /// NOTE(review): `selection` is presumably the range of values within
    /// `data_block` that should be copied; confirm against the implementations.
    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>);
    /// Consumes the boxed builder and returns the resulting [`DataBlock`]
    fn finish(self: Box<Self>) -> DataBlock;
}
1602
/// A builder that picks its concrete implementation from the first block it is given
#[derive(Debug)]
pub struct DataBlockBuilder {
    // Size hint (in bytes) passed to `DataBlock::make_builder` when the inner builder is created
    estimated_size_bytes: u64,
    // The concrete builder, `None` until a block has been appended
    builder: Option<Box<dyn DataBlockBuilderImpl>>,
}
1608
1609impl DataBlockBuilder {
1610    pub fn with_capacity_estimate(estimated_size_bytes: u64) -> Self {
1611        Self {
1612            estimated_size_bytes,
1613            builder: None,
1614        }
1615    }
1616
1617    fn get_builder(&mut self, block: &DataBlock) -> &mut dyn DataBlockBuilderImpl {
1618        if self.builder.is_none() {
1619            self.builder = Some(block.make_builder(self.estimated_size_bytes));
1620        }
1621        self.builder.as_mut().unwrap().as_mut()
1622    }
1623
1624    pub fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
1625        self.get_builder(data_block).append(data_block, selection);
1626    }
1627
1628    pub fn finish(self) -> DataBlock {
1629        let builder = self.builder.expect("DataBlockBuilder didn't see any data");
1630        builder.finish()
1631    }
1632}
1633
1634#[cfg(test)]
1635mod tests {
1636    use std::sync::Arc;
1637
1638    use arrow_array::{
1639        make_array, new_null_array,
1640        types::{Int32Type, Int8Type},
1641        ArrayRef, DictionaryArray, Int8Array, LargeBinaryArray, StringArray, UInt16Array,
1642        UInt8Array,
1643    };
1644    use arrow_buffer::{BooleanBuffer, NullBuffer};
1645
1646    use arrow_schema::{DataType, Field, Fields};
1647    use lance_datagen::{array, ArrayGeneratorExt, RowCount, DEFAULT_SEED};
1648    use rand::SeedableRng;
1649
1650    use crate::buffer::LanceBuffer;
1651
1652    use super::{AllNullDataBlock, DataBlock};
1653
1654    use arrow_array::Array;
1655
1656    #[test]
1657    fn test_sliced_to_data_block() {
1658        let ints = UInt16Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);
1659        let ints = ints.slice(2, 4);
1660        let data = DataBlock::from_array(ints);
1661
1662        let fixed_data = data.as_fixed_width().unwrap();
1663        assert_eq!(fixed_data.num_values, 4);
1664        assert_eq!(fixed_data.data.len(), 8);
1665
1666        let nullable_ints =
1667            UInt16Array::from(vec![Some(0), None, Some(2), None, Some(4), None, Some(6)]);
1668        let nullable_ints = nullable_ints.slice(1, 3);
1669        let data = DataBlock::from_array(nullable_ints);
1670
1671        let nullable = data.as_nullable().unwrap();
1672        assert_eq!(nullable.nulls, LanceBuffer::from(vec![0b00000010]));
1673    }
1674
1675    #[test]
1676    fn test_string_to_data_block() {
1677        // Converting string arrays that contain nulls to DataBlock
1678        let strings1 = StringArray::from(vec![Some("hello"), None, Some("world")]);
1679        let strings2 = StringArray::from(vec![Some("a"), Some("b")]);
1680        let strings3 = StringArray::from(vec![Option::<&'static str>::None, None]);
1681
1682        let arrays = &[strings1, strings2, strings3]
1683            .iter()
1684            .map(|arr| Arc::new(arr.clone()) as ArrayRef)
1685            .collect::<Vec<_>>();
1686
1687        let block = DataBlock::from_arrays(arrays, 7);
1688
1689        assert_eq!(block.num_values(), 7);
1690        let block = block.as_nullable().unwrap();
1691
1692        assert_eq!(block.nulls, LanceBuffer::from(vec![0b00011101]));
1693
1694        let data = block.data.as_variable_width().unwrap();
1695        assert_eq!(
1696            data.offsets,
1697            LanceBuffer::reinterpret_vec(vec![0, 5, 5, 10, 11, 12, 12, 12])
1698        );
1699
1700        assert_eq!(data.data, LanceBuffer::copy_slice(b"helloworldab"));
1701
1702        // Converting string arrays that do not contain nulls to DataBlock
1703        let strings1 = StringArray::from(vec![Some("a"), Some("bc")]);
1704        let strings2 = StringArray::from(vec![Some("def")]);
1705
1706        let arrays = &[strings1, strings2]
1707            .iter()
1708            .map(|arr| Arc::new(arr.clone()) as ArrayRef)
1709            .collect::<Vec<_>>();
1710
1711        let block = DataBlock::from_arrays(arrays, 3);
1712
1713        assert_eq!(block.num_values(), 3);
1714        // Should be no nullable wrapper
1715        let data = block.as_variable_width().unwrap();
1716        assert_eq!(data.offsets, LanceBuffer::reinterpret_vec(vec![0, 1, 3, 6]));
1717        assert_eq!(data.data, LanceBuffer::copy_slice(b"abcdef"));
1718    }
1719
1720    #[test]
1721    fn test_string_sliced() {
1722        let check = |arr: Vec<StringArray>, expected_off: Vec<i32>, expected_data: &[u8]| {
1723            let arrs = arr
1724                .into_iter()
1725                .map(|a| Arc::new(a) as ArrayRef)
1726                .collect::<Vec<_>>();
1727            let num_rows = arrs.iter().map(|a| a.len()).sum::<usize>() as u64;
1728            let data = DataBlock::from_arrays(&arrs, num_rows);
1729
1730            assert_eq!(data.num_values(), num_rows);
1731
1732            let data = data.as_variable_width().unwrap();
1733            assert_eq!(data.offsets, LanceBuffer::reinterpret_vec(expected_off));
1734            assert_eq!(data.data, LanceBuffer::copy_slice(expected_data));
1735        };
1736
1737        let string = StringArray::from(vec![Some("hello"), Some("world")]);
1738        check(vec![string.slice(1, 1)], vec![0, 5], b"world");
1739        check(vec![string.slice(0, 1)], vec![0, 5], b"hello");
1740        check(
1741            vec![string.slice(0, 1), string.slice(1, 1)],
1742            vec![0, 5, 10],
1743            b"helloworld",
1744        );
1745
1746        let string2 = StringArray::from(vec![Some("foo"), Some("bar")]);
1747        check(
1748            vec![string.slice(0, 1), string2.slice(0, 1)],
1749            vec![0, 5, 8],
1750            b"hellofoo",
1751        );
1752    }
1753
1754    #[test]
1755    fn test_large() {
1756        let arr = LargeBinaryArray::from_vec(vec![b"hello", b"world"]);
1757        let data = DataBlock::from_array(arr);
1758
1759        assert_eq!(data.num_values(), 2);
1760        let data = data.as_variable_width().unwrap();
1761        assert_eq!(data.bits_per_offset, 64);
1762        assert_eq!(data.num_values, 2);
1763        assert_eq!(data.data, LanceBuffer::copy_slice(b"helloworld"));
1764        assert_eq!(
1765            data.offsets,
1766            LanceBuffer::reinterpret_vec(vec![0_u64, 5, 10])
1767        );
1768    }
1769
1770    #[test]
1771    fn test_dictionary_indices_normalized() {
1772        let arr1 = DictionaryArray::<Int8Type>::from_iter([Some("a"), Some("a"), Some("b")]);
1773        let arr2 = DictionaryArray::<Int8Type>::from_iter([Some("b"), Some("c")]);
1774
1775        let data = DataBlock::from_arrays(&[Arc::new(arr1), Arc::new(arr2)], 5);
1776
1777        assert_eq!(data.num_values(), 5);
1778        let data = data.as_dictionary().unwrap();
1779        let indices = data.indices;
1780        assert_eq!(indices.bits_per_value, 8);
1781        assert_eq!(indices.num_values, 5);
1782        assert_eq!(
1783            indices.data,
1784            // You might expect 0, 0, 1, 1, 2 but it seems that arrow's dictionary concat does
1785            // not actually collapse dictionaries.  This is an arrow problem however, and we don't
1786            // need to fix it here.
1787            LanceBuffer::reinterpret_vec::<i8>(vec![0, 0, 1, 2, 3])
1788        );
1789
1790        let items = data.dictionary.as_variable_width().unwrap();
1791        assert_eq!(items.bits_per_offset, 32);
1792        assert_eq!(items.num_values, 4);
1793        assert_eq!(items.data, LanceBuffer::copy_slice(b"abbc"));
1794        assert_eq!(
1795            items.offsets,
1796            LanceBuffer::reinterpret_vec(vec![0, 1, 2, 3, 4],)
1797        );
1798    }
1799
1800    #[test]
1801    fn test_dictionary_nulls() {
1802        // Test both ways of encoding nulls
1803
1804        // By default, nulls get encoded into the indices
1805        let arr1 = DictionaryArray::<Int8Type>::from_iter([None, Some("a"), Some("b")]);
1806        let arr2 = DictionaryArray::<Int8Type>::from_iter([Some("c"), None]);
1807
1808        let data = DataBlock::from_arrays(&[Arc::new(arr1), Arc::new(arr2)], 5);
1809
1810        let check_common = |data: DataBlock| {
1811            assert_eq!(data.num_values(), 5);
1812            let dict = data.as_dictionary().unwrap();
1813
1814            let nullable_items = dict.dictionary.as_nullable().unwrap();
1815            assert_eq!(nullable_items.nulls, LanceBuffer::from(vec![0b00000111]));
1816            assert_eq!(nullable_items.data.num_values(), 4);
1817
1818            let items = nullable_items.data.as_variable_width().unwrap();
1819            assert_eq!(items.bits_per_offset, 32);
1820            assert_eq!(items.num_values, 4);
1821            assert_eq!(items.data, LanceBuffer::copy_slice(b"abc"));
1822            assert_eq!(
1823                items.offsets,
1824                LanceBuffer::reinterpret_vec(vec![0, 1, 2, 3, 3],)
1825            );
1826
1827            let indices = dict.indices;
1828            assert_eq!(indices.bits_per_value, 8);
1829            assert_eq!(indices.num_values, 5);
1830            assert_eq!(
1831                indices.data,
1832                LanceBuffer::reinterpret_vec::<i8>(vec![3, 0, 1, 2, 3])
1833            );
1834        };
1835        check_common(data);
1836
1837        // However, we can manually create a dictionary where nulls are in the dictionary
1838        let items = StringArray::from(vec![Some("a"), Some("b"), Some("c"), None]);
1839        let indices = Int8Array::from(vec![Some(3), Some(0), Some(1), Some(2), Some(3)]);
1840        let dict = DictionaryArray::new(indices, Arc::new(items));
1841
1842        let data = DataBlock::from_array(dict);
1843
1844        check_common(data);
1845    }
1846
1847    #[test]
1848    fn test_dictionary_cannot_add_null() {
1849        // 256 unique strings
1850        let items = StringArray::from(
1851            (0..256)
1852                .map(|i| Some(String::from_utf8(vec![0; i]).unwrap()))
1853                .collect::<Vec<_>>(),
1854        );
1855        // 257 indices, covering the whole range, plus one null
1856        let indices = UInt8Array::from(
1857            (0..=256)
1858                .map(|i| if i == 256 { None } else { Some(i as u8) })
1859                .collect::<Vec<_>>(),
1860        );
1861        // We want to normalize this by pushing nulls into the dictionary, but we cannot because
1862        // the dictionary is too large for the index type
1863        let dict = DictionaryArray::new(indices, Arc::new(items));
1864        let data = DataBlock::from_array(dict);
1865
1866        assert_eq!(data.num_values(), 257);
1867
1868        let dict = data.as_dictionary().unwrap();
1869
1870        assert_eq!(dict.indices.bits_per_value, 32);
1871        assert_eq!(
1872            dict.indices.data,
1873            LanceBuffer::reinterpret_vec((0_u32..257).collect::<Vec<_>>())
1874        );
1875
1876        let nullable_items = dict.dictionary.as_nullable().unwrap();
1877        let null_buffer = NullBuffer::new(BooleanBuffer::new(
1878            nullable_items.nulls.into_buffer(),
1879            0,
1880            257,
1881        ));
1882        for i in 0..256 {
1883            assert!(!null_buffer.is_null(i));
1884        }
1885        assert!(null_buffer.is_null(256));
1886
1887        assert_eq!(
1888            nullable_items.data.as_variable_width().unwrap().data.len(),
1889            32640
1890        );
1891    }
1892
1893    #[test]
1894    fn test_all_null() {
1895        for data_type in [
1896            DataType::UInt32,
1897            DataType::FixedSizeBinary(2),
1898            DataType::List(Arc::new(Field::new("item", DataType::UInt32, true))),
1899            DataType::Struct(Fields::from(vec![Field::new("a", DataType::UInt32, true)])),
1900        ] {
1901            let block = DataBlock::AllNull(AllNullDataBlock { num_values: 10 });
1902            let arr = block.into_arrow(data_type.clone(), true).unwrap();
1903            let arr = make_array(arr);
1904            let expected = new_null_array(&data_type, 10);
1905            assert_eq!(&arr, &expected);
1906        }
1907    }
1908
1909    #[test]
1910    fn test_dictionary_cannot_concatenate() {
1911        // 256 unique strings
1912        let items = StringArray::from(
1913            (0..256)
1914                .map(|i| Some(String::from_utf8(vec![0; i]).unwrap()))
1915                .collect::<Vec<_>>(),
1916        );
1917        // 256 different unique strings
1918        let other_items = StringArray::from(
1919            (0..256)
1920                .map(|i| Some(String::from_utf8(vec![1; i + 1]).unwrap()))
1921                .collect::<Vec<_>>(),
1922        );
1923        let indices = UInt8Array::from_iter_values(0..=255);
1924        let dict1 = DictionaryArray::new(indices.clone(), Arc::new(items));
1925        let dict2 = DictionaryArray::new(indices, Arc::new(other_items));
1926        let data = DataBlock::from_arrays(&[Arc::new(dict1), Arc::new(dict2)], 512);
1927        assert_eq!(data.num_values(), 512);
1928
1929        let dict = data.as_dictionary().unwrap();
1930
1931        assert_eq!(dict.indices.bits_per_value, 32);
1932        assert_eq!(
1933            dict.indices.data,
1934            LanceBuffer::reinterpret_vec::<u32>((0..512).collect::<Vec<_>>())
1935        );
1936        // What fun: 0 + 1 + .. + 255 + 1 + 2 + .. + 256 = 2^16
1937        assert_eq!(
1938            dict.dictionary.as_variable_width().unwrap().data.len(),
1939            65536
1940        );
1941    }
1942
1943    #[test]
1944    fn test_data_size() {
1945        let mut rng = rand_xoshiro::Xoshiro256PlusPlus::seed_from_u64(DEFAULT_SEED.0);
1946        // test data_size() when input has no nulls
1947        let mut genn = array::rand::<Int32Type>().with_nulls(&[false, false, false]);
1948
1949        let arr = genn.generate(RowCount::from(3), &mut rng).unwrap();
1950        let block = DataBlock::from_array(arr.clone());
1951        assert!(block.data_size() == arr.get_buffer_memory_size() as u64);
1952
1953        let arr = genn.generate(RowCount::from(400), &mut rng).unwrap();
1954        let block = DataBlock::from_array(arr.clone());
1955        assert!(block.data_size() == arr.get_buffer_memory_size() as u64);
1956
1957        // test data_size() when input has nulls
1958        let mut genn = array::rand::<Int32Type>().with_nulls(&[false, true, false]);
1959        let arr = genn.generate(RowCount::from(3), &mut rng).unwrap();
1960        let block = DataBlock::from_array(arr.clone());
1961
1962        let array_data = arr.to_data();
1963        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
1964        // the NullBuffer.len() returns the length in bits so we divide_round_up by 8
1965        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
1966        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
1967
1968        let arr = genn.generate(RowCount::from(400), &mut rng).unwrap();
1969        let block = DataBlock::from_array(arr.clone());
1970
1971        let array_data = arr.to_data();
1972        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
1973        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
1974        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
1975
1976        let mut genn = array::rand::<Int32Type>().with_nulls(&[true, true, false]);
1977        let arr = genn.generate(RowCount::from(3), &mut rng).unwrap();
1978        let block = DataBlock::from_array(arr.clone());
1979
1980        let array_data = arr.to_data();
1981        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
1982        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
1983        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
1984
1985        let arr = genn.generate(RowCount::from(400), &mut rng).unwrap();
1986        let block = DataBlock::from_array(arr.clone());
1987
1988        let array_data = arr.to_data();
1989        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
1990        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
1991        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
1992
1993        let mut genn = array::rand::<Int32Type>().with_nulls(&[false, true, false]);
1994        let arr1 = genn.generate(RowCount::from(3), &mut rng).unwrap();
1995        let arr2 = genn.generate(RowCount::from(3), &mut rng).unwrap();
1996        let arr3 = genn.generate(RowCount::from(3), &mut rng).unwrap();
1997        let block = DataBlock::from_arrays(&[arr1.clone(), arr2.clone(), arr3.clone()], 9);
1998
1999        let concatenated_array = arrow_select::concat::concat(&[
2000            &*Arc::new(arr1.clone()) as &dyn Array,
2001            &*Arc::new(arr2.clone()) as &dyn Array,
2002            &*Arc::new(arr3.clone()) as &dyn Array,
2003        ])
2004        .unwrap();
2005        let total_buffer_size: usize = concatenated_array
2006            .to_data()
2007            .buffers()
2008            .iter()
2009            .map(|buffer| buffer.len())
2010            .sum();
2011
2012        let total_nulls_size_in_bytes = concatenated_array.nulls().unwrap().len().div_ceil(8);
2013        assert!(block.data_size() == (total_buffer_size + total_nulls_size_in_bytes) as u64);
2014    }
2015}