Skip to main content

lance_encoding/
data.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Data layouts to represent encoded data in a sub-Arrow format
5//!
6//! These [`DataBlock`] structures represent physical layouts.  They fill a gap somewhere
7//! between [`arrow_data::ArrayData`] (which, as a collection of buffers, is too
8//! generic because it doesn't give us enough information about what those buffers represent)
9//! and [`arrow_array::array::Array`] (which is too specific, because it cares about the
10//! logical data type).
11//!
12//! In addition, the layouts represented here are slightly stricter than Arrow's layout rules.
13//! For example, offset buffers MUST start with 0.  These additional restrictions impose a
14//! slight penalty on encode (to normalize arrow data) but make the development of encoders
15//! and decoders easier (since they can rely on a normalized representation)
16
17use std::{
18    ops::Range,
19    sync::{Arc, RwLock},
20};
21
22use arrow_array::{
23    Array, ArrayRef, OffsetSizeTrait, UInt64Array,
24    cast::AsArray,
25    new_empty_array, new_null_array,
26    types::{ArrowDictionaryKeyType, UInt8Type, UInt16Type, UInt32Type, UInt64Type},
27};
28use arrow_buffer::{ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer};
29use arrow_data::{ArrayData, ArrayDataBuilder};
30use arrow_schema::DataType;
31use lance_arrow::DataTypeExt;
32
33use lance_core::{Error, Result};
34
35use crate::{
36    buffer::LanceBuffer,
37    statistics::{ComputeStat, Stat},
38};
39
40/// A data block with no buffers where everything is null
41///
42/// Note: this data block should not be used for future work.  It will be deprecated
43/// in the 2.1 version of the format where nullability will be handled by the structural
44/// encoders.
45#[derive(Debug, Clone)]
46pub struct AllNullDataBlock {
47    /// The number of values represented by this block
48    pub num_values: u64,
49}
50
51impl AllNullDataBlock {
52    fn into_arrow(self, data_type: DataType, _validate: bool) -> Result<ArrayData> {
53        Ok(ArrayData::new_null(&data_type, self.num_values as usize))
54    }
55
56    fn into_buffers(self) -> Vec<LanceBuffer> {
57        vec![]
58    }
59}
60
61use std::collections::HashMap;
62
63// `BlockInfo` stores the statistics of this `DataBlock`, such as `NullCount` for `NullableDataBlock`,
64// `BitWidth` for `FixedWidthDataBlock`, `Cardinality` for all `DataBlock`
65#[derive(Debug, Clone)]
66pub struct BlockInfo(pub Arc<RwLock<HashMap<Stat, Arc<dyn Array>>>>);
67
68impl Default for BlockInfo {
69    fn default() -> Self {
70        Self::new()
71    }
72}
73
74impl BlockInfo {
75    pub fn new() -> Self {
76        Self(Arc::new(RwLock::new(HashMap::new())))
77    }
78}
79
80impl PartialEq for BlockInfo {
81    fn eq(&self, other: &Self) -> bool {
82        let self_info = self.0.read().unwrap();
83        let other_info = other.0.read().unwrap();
84        *self_info == *other_info
85    }
86}
87
88/// Wraps a data block and adds nullability information to it
89///
90/// Note: this data block should not be used for future work.  It will be deprecated
91/// in the 2.1 version of the format where nullability will be handled by the structural
92/// encoders.
93#[derive(Debug, Clone)]
94pub struct NullableDataBlock {
95    /// The underlying data
96    pub data: Box<DataBlock>,
97    /// A bitmap of validity for each value
98    pub nulls: LanceBuffer,
99
100    pub block_info: BlockInfo,
101}
102
103impl NullableDataBlock {
104    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
105        let nulls = self.nulls.into_buffer();
106        let data = self.data.into_arrow(data_type, validate)?.into_builder();
107        let data = data.null_bit_buffer(Some(nulls));
108        if validate {
109            Ok(data.build()?)
110        } else {
111            Ok(unsafe { data.build_unchecked() })
112        }
113    }
114
115    fn into_buffers(self) -> Vec<LanceBuffer> {
116        let mut buffers = vec![self.nulls];
117        buffers.extend(self.data.into_buffers());
118        buffers
119    }
120
121    pub fn data_size(&self) -> u64 {
122        self.data.data_size() + self.nulls.len() as u64
123    }
124}
125
126/// A block representing the same constant value repeated many times
127#[derive(Debug, PartialEq, Clone)]
128pub struct ConstantDataBlock {
129    /// Data buffer containing the value
130    pub data: LanceBuffer,
131    /// The number of values
132    pub num_values: u64,
133}
134
135impl ConstantDataBlock {
136    fn into_buffers(self) -> Vec<LanceBuffer> {
137        vec![self.data]
138    }
139
140    fn into_arrow(self, _data_type: DataType, _validate: bool) -> Result<ArrayData> {
141        // We don't need this yet but if we come up with some way of serializing
142        // scalars to/from bytes then we could implement it.
143        todo!()
144    }
145
146    pub fn try_clone(&self) -> Result<Self> {
147        Ok(Self {
148            data: self.data.clone(),
149            num_values: self.num_values,
150        })
151    }
152
153    pub fn data_size(&self) -> u64 {
154        self.data.len() as u64
155    }
156}
157
158/// A data block for a single buffer of data where each element has a fixed number of bits
159#[derive(Debug, PartialEq, Clone)]
160pub struct FixedWidthDataBlock {
161    /// The data buffer
162    pub data: LanceBuffer,
163    /// The number of bits per value
164    pub bits_per_value: u64,
165    /// The number of values represented by this block
166    pub num_values: u64,
167
168    pub block_info: BlockInfo,
169}
170
171impl FixedWidthDataBlock {
172    fn do_into_arrow(
173        self,
174        data_type: DataType,
175        num_values: u64,
176        validate: bool,
177    ) -> Result<ArrayData> {
178        // Booleans expanded for full-zip (bits_per_value==8, one byte each) need re-packing to
179        // Arrow's bit-packed format.
180        let data_buffer = if matches!(data_type, DataType::Boolean) && self.bits_per_value == 8 {
181            let mut builder = BooleanBufferBuilder::new(num_values as usize);
182            for &byte in self.data.as_ref().iter().take(num_values as usize) {
183                builder.append(byte != 0);
184            }
185            builder.finish().into_inner()
186        } else {
187            self.data.into_buffer()
188        };
189        let builder = ArrayDataBuilder::new(data_type)
190            .add_buffer(data_buffer)
191            .len(num_values as usize)
192            .null_count(0);
193        if validate {
194            Ok(builder.build()?)
195        } else {
196            Ok(unsafe { builder.build_unchecked() })
197        }
198    }
199
200    pub fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
201        let root_num_values = self.num_values;
202        self.do_into_arrow(data_type, root_num_values, validate)
203    }
204
205    pub fn into_buffers(self) -> Vec<LanceBuffer> {
206        vec![self.data]
207    }
208
209    pub fn try_clone(&self) -> Result<Self> {
210        Ok(Self {
211            data: self.data.clone(),
212            bits_per_value: self.bits_per_value,
213            num_values: self.num_values,
214            block_info: self.block_info.clone(),
215        })
216    }
217
218    pub fn data_size(&self) -> u64 {
219        self.data.len() as u64
220    }
221}
222
223#[derive(Debug)]
224pub struct VariableWidthDataBlockBuilder<T: OffsetSizeTrait> {
225    offsets: Vec<T>,
226    bytes: Vec<u8>,
227}
228
229impl<T: OffsetSizeTrait> VariableWidthDataBlockBuilder<T> {
230    fn new(estimated_size_bytes: u64) -> Self {
231        Self {
232            offsets: vec![T::from_usize(0).unwrap()],
233            bytes: Vec::with_capacity(estimated_size_bytes as usize),
234        }
235    }
236}
237
238impl<T: OffsetSizeTrait + bytemuck::Pod> DataBlockBuilderImpl for VariableWidthDataBlockBuilder<T> {
239    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
240        let block = data_block.as_variable_width_ref().unwrap();
241        assert!(block.bits_per_offset == T::get_byte_width() as u8 * 8);
242        let offsets = block.offsets.borrow_to_typed_view::<T>();
243
244        let start_offset = offsets[selection.start as usize];
245        let end_offset = offsets[selection.end as usize];
246        let mut previous_len = self.bytes.len();
247
248        self.bytes
249            .extend_from_slice(&block.data[start_offset.as_usize()..end_offset.as_usize()]);
250
251        self.offsets.extend(
252            offsets[selection.start as usize..selection.end as usize]
253                .iter()
254                .zip(&offsets[selection.start as usize + 1..=selection.end as usize])
255                .map(|(&current, &next)| {
256                    let this_value_len = next - current;
257                    previous_len += this_value_len.as_usize();
258                    T::from_usize(previous_len).unwrap()
259                }),
260        );
261    }
262
263    fn finish(self: Box<Self>) -> DataBlock {
264        let num_values = (self.offsets.len() - 1) as u64;
265        DataBlock::VariableWidth(VariableWidthBlock {
266            data: LanceBuffer::from(self.bytes),
267            offsets: LanceBuffer::reinterpret_vec(self.offsets),
268            bits_per_offset: T::get_byte_width() as u8 * 8,
269            num_values,
270            block_info: BlockInfo::new(),
271        })
272    }
273}
274
275#[derive(Debug)]
276struct BitmapDataBlockBuilder {
277    values: BooleanBufferBuilder,
278}
279
280impl BitmapDataBlockBuilder {
281    fn new(estimated_size_bytes: u64) -> Self {
282        Self {
283            values: BooleanBufferBuilder::new(estimated_size_bytes as usize * 8),
284        }
285    }
286}
287
288impl DataBlockBuilderImpl for BitmapDataBlockBuilder {
289    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
290        let bitmap_blk = data_block.as_fixed_width_ref().unwrap();
291        self.values.append_packed_range(
292            selection.start as usize..selection.end as usize,
293            &bitmap_blk.data,
294        );
295    }
296
297    fn finish(mut self: Box<Self>) -> DataBlock {
298        let bool_buf = self.values.finish();
299        let num_values = bool_buf.len() as u64;
300        let bits_buf = bool_buf.into_inner();
301        DataBlock::FixedWidth(FixedWidthDataBlock {
302            data: LanceBuffer::from(bits_buf),
303            bits_per_value: 1,
304            num_values,
305            block_info: BlockInfo::new(),
306        })
307    }
308}
309
310#[derive(Debug)]
311struct FixedWidthDataBlockBuilder {
312    bits_per_value: u64,
313    bytes_per_value: u64,
314    values: Vec<u8>,
315}
316
317impl FixedWidthDataBlockBuilder {
318    fn new(bits_per_value: u64, estimated_size_bytes: u64) -> Self {
319        assert!(bits_per_value.is_multiple_of(8));
320        Self {
321            bits_per_value,
322            bytes_per_value: bits_per_value / 8,
323            values: Vec::with_capacity(estimated_size_bytes as usize),
324        }
325    }
326}
327
328impl DataBlockBuilderImpl for FixedWidthDataBlockBuilder {
329    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
330        let block = data_block.as_fixed_width_ref().unwrap();
331        assert_eq!(self.bits_per_value, block.bits_per_value);
332        let start = selection.start as usize * self.bytes_per_value as usize;
333        let end = selection.end as usize * self.bytes_per_value as usize;
334        self.values.extend_from_slice(&block.data[start..end]);
335    }
336
337    fn finish(self: Box<Self>) -> DataBlock {
338        let num_values = (self.values.len() / self.bytes_per_value as usize) as u64;
339        DataBlock::FixedWidth(FixedWidthDataBlock {
340            data: LanceBuffer::from(self.values),
341            bits_per_value: self.bits_per_value,
342            num_values,
343            block_info: BlockInfo::new(),
344        })
345    }
346}
347
348#[derive(Debug)]
349struct StructDataBlockBuilder {
350    children: Vec<Box<dyn DataBlockBuilderImpl>>,
351}
352
353impl StructDataBlockBuilder {
354    fn new(children: Vec<Box<dyn DataBlockBuilderImpl>>) -> Self {
355        Self { children }
356    }
357}
358
359impl DataBlockBuilderImpl for StructDataBlockBuilder {
360    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
361        let data_block = data_block.as_struct_ref().unwrap();
362        for i in 0..self.children.len() {
363            self.children[i].append(&data_block.children[i], selection.clone());
364        }
365    }
366
367    fn finish(self: Box<Self>) -> DataBlock {
368        let mut children_data_block = Vec::new();
369        for child in self.children {
370            let child_data_block = child.finish();
371            children_data_block.push(child_data_block);
372        }
373        DataBlock::Struct(StructDataBlock {
374            children: children_data_block,
375            block_info: BlockInfo::new(),
376            validity: None,
377        })
378    }
379}
380
381#[derive(Debug, Default)]
382struct AllNullDataBlockBuilder {
383    num_values: u64,
384}
385
386impl DataBlockBuilderImpl for AllNullDataBlockBuilder {
387    fn append(&mut self, _data_block: &DataBlock, selection: Range<u64>) {
388        self.num_values += selection.end - selection.start;
389    }
390
391    fn finish(self: Box<Self>) -> DataBlock {
392        DataBlock::AllNull(AllNullDataBlock {
393            num_values: self.num_values,
394        })
395    }
396}
397
398/// A data block to represent a fixed size list
399#[derive(Debug, Clone)]
400pub struct FixedSizeListBlock {
401    /// The child data block
402    pub child: Box<DataBlock>,
403    /// The number of items in each list
404    pub dimension: u64,
405}
406
407impl FixedSizeListBlock {
408    pub fn num_values(&self) -> u64 {
409        self.child.num_values() / self.dimension
410    }
411
412    /// Try to flatten a FixedSizeListBlock into a FixedWidthDataBlock
413    ///
414    /// Returns None if any children are nullable
415    pub fn try_into_flat(self) -> Option<FixedWidthDataBlock> {
416        match *self.child {
417            // Cannot flatten a nullable child
418            DataBlock::Nullable(_) => None,
419            DataBlock::FixedSizeList(inner) => {
420                let mut flat = inner.try_into_flat()?;
421                flat.bits_per_value *= self.dimension;
422                flat.num_values /= self.dimension;
423                Some(flat)
424            }
425            DataBlock::FixedWidth(mut inner) => {
426                inner.bits_per_value *= self.dimension;
427                inner.num_values /= self.dimension;
428                Some(inner)
429            }
430            _ => panic!(
431                "Expected FixedSizeList or FixedWidth data block but found {:?}",
432                self
433            ),
434        }
435    }
436
437    pub fn flatten_as_fixed(&mut self) -> FixedWidthDataBlock {
438        match self.child.as_mut() {
439            DataBlock::FixedSizeList(fsl) => fsl.flatten_as_fixed(),
440            DataBlock::FixedWidth(fw) => fw.clone(),
441            _ => panic!("Expected FixedSizeList or FixedWidth data block"),
442        }
443    }
444
445    /// Convert a flattened values block into a FixedSizeListBlock
446    pub fn from_flat(data: FixedWidthDataBlock, data_type: &DataType) -> DataBlock {
447        match data_type {
448            DataType::FixedSizeList(child_field, dimension) => {
449                let mut data = data;
450                data.bits_per_value /= *dimension as u64;
451                data.num_values *= *dimension as u64;
452                let child_data = Self::from_flat(data, child_field.data_type());
453                DataBlock::FixedSizeList(Self {
454                    child: Box::new(child_data),
455                    dimension: *dimension as u64,
456                })
457            }
458            // Base case, we've hit a non-list type
459            _ => DataBlock::FixedWidth(data),
460        }
461    }
462
463    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
464        let num_values = self.num_values();
465        let builder = match &data_type {
466            DataType::FixedSizeList(child_field, _) => {
467                let child_data = self
468                    .child
469                    .into_arrow(child_field.data_type().clone(), validate)?;
470                ArrayDataBuilder::new(data_type)
471                    .add_child_data(child_data)
472                    .len(num_values as usize)
473                    .null_count(0)
474            }
475            _ => panic!("Expected FixedSizeList data type and got {:?}", data_type),
476        };
477        if validate {
478            Ok(builder.build()?)
479        } else {
480            Ok(unsafe { builder.build_unchecked() })
481        }
482    }
483
484    fn into_buffers(self) -> Vec<LanceBuffer> {
485        self.child.into_buffers()
486    }
487
488    fn data_size(&self) -> u64 {
489        self.child.data_size()
490    }
491}
492
493#[derive(Debug)]
494struct FixedSizeListBlockBuilder {
495    inner: Box<dyn DataBlockBuilderImpl>,
496    dimension: u64,
497}
498
499impl FixedSizeListBlockBuilder {
500    fn new(inner: Box<dyn DataBlockBuilderImpl>, dimension: u64) -> Self {
501        Self { inner, dimension }
502    }
503}
504
505impl DataBlockBuilderImpl for FixedSizeListBlockBuilder {
506    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
507        let selection = selection.start * self.dimension..selection.end * self.dimension;
508        let fsl = data_block.as_fixed_size_list_ref().unwrap();
509        self.inner.append(fsl.child.as_ref(), selection);
510    }
511
512    fn finish(self: Box<Self>) -> DataBlock {
513        let inner_block = self.inner.finish();
514        DataBlock::FixedSizeList(FixedSizeListBlock {
515            child: Box::new(inner_block),
516            dimension: self.dimension,
517        })
518    }
519}
520
521#[derive(Debug)]
522struct NullableDataBlockBuilder {
523    inner: Box<dyn DataBlockBuilderImpl>,
524    validity: BooleanBufferBuilder,
525}
526
527impl NullableDataBlockBuilder {
528    fn new(inner: Box<dyn DataBlockBuilderImpl>, estimated_size_bytes: usize) -> Self {
529        Self {
530            inner,
531            validity: BooleanBufferBuilder::new(estimated_size_bytes * 8),
532        }
533    }
534}
535
536impl DataBlockBuilderImpl for NullableDataBlockBuilder {
537    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
538        let nullable = data_block.as_nullable_ref().unwrap();
539        let bool_buf = BooleanBuffer::new(
540            nullable.nulls.clone().into_buffer(),
541            selection.start as usize,
542            (selection.end - selection.start) as usize,
543        );
544        self.validity.append_buffer(&bool_buf);
545        self.inner.append(nullable.data.as_ref(), selection);
546    }
547
548    fn finish(mut self: Box<Self>) -> DataBlock {
549        let inner_block = self.inner.finish();
550        DataBlock::Nullable(NullableDataBlock {
551            data: Box::new(inner_block),
552            nulls: LanceBuffer::from(self.validity.finish().into_inner()),
553            block_info: BlockInfo::new(),
554        })
555    }
556}
557
558/// A data block with no regular structure.  There is no available spot to attach
559/// validity / repdef information and it cannot be converted to Arrow without being
560/// decoded
561#[derive(Debug, Clone)]
562pub struct OpaqueBlock {
563    pub buffers: Vec<LanceBuffer>,
564    pub num_values: u64,
565    pub block_info: BlockInfo,
566}
567
568impl OpaqueBlock {
569    pub fn data_size(&self) -> u64 {
570        self.buffers.iter().map(|b| b.len() as u64).sum()
571    }
572}
573
574/// A data block for variable-width data (e.g. strings, packed rows, etc.)
575#[derive(Debug, Clone)]
576pub struct VariableWidthBlock {
577    /// The data buffer
578    pub data: LanceBuffer,
579    /// The offsets buffer (contains num_values + 1 offsets)
580    ///
581    /// Offsets MUST start at 0
582    pub offsets: LanceBuffer,
583    /// The number of bits per offset
584    pub bits_per_offset: u8,
585    /// The number of values represented by this block
586    pub num_values: u64,
587
588    pub block_info: BlockInfo,
589}
590
591impl VariableWidthBlock {
592    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
593        let data_buffer = self.data.into_buffer();
594        let offsets_buffer = self.offsets.into_buffer();
595        let builder = ArrayDataBuilder::new(data_type)
596            .add_buffer(offsets_buffer)
597            .add_buffer(data_buffer)
598            .len(self.num_values as usize)
599            .null_count(0);
600        if validate {
601            Ok(builder.build()?)
602        } else {
603            Ok(unsafe { builder.build_unchecked() })
604        }
605    }
606
607    fn into_buffers(self) -> Vec<LanceBuffer> {
608        vec![self.offsets, self.data]
609    }
610
611    pub fn offsets_as_block(&mut self) -> DataBlock {
612        let offsets = self.offsets.clone();
613        DataBlock::FixedWidth(FixedWidthDataBlock {
614            data: offsets,
615            bits_per_value: self.bits_per_offset as u64,
616            num_values: self.num_values + 1,
617            block_info: BlockInfo::new(),
618        })
619    }
620
621    pub fn data_size(&self) -> u64 {
622        (self.data.len() + self.offsets.len()) as u64
623    }
624}
625
626/// A data block representing a struct
627#[derive(Debug, Clone)]
628pub struct StructDataBlock {
629    /// The child arrays
630    pub children: Vec<DataBlock>,
631    pub block_info: BlockInfo,
632    /// The validity bitmap for the struct (None means all valid)
633    pub validity: Option<NullBuffer>,
634}
635
636impl StructDataBlock {
637    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
638        if let DataType::Struct(fields) = &data_type {
639            let mut builder = ArrayDataBuilder::new(DataType::Struct(fields.clone()));
640            let mut num_rows = 0;
641            for (field, child) in fields.iter().zip(self.children) {
642                let child_data = child.into_arrow(field.data_type().clone(), validate)?;
643                num_rows = child_data.len();
644                builder = builder.add_child_data(child_data);
645            }
646
647            // Apply validity if present
648            let builder = if let Some(validity) = self.validity {
649                let null_count = validity.null_count();
650                builder
651                    .null_bit_buffer(Some(validity.into_inner().into_inner()))
652                    .null_count(null_count)
653            } else {
654                builder.null_count(0)
655            };
656
657            let builder = builder.len(num_rows);
658            if validate {
659                Ok(builder.build()?)
660            } else {
661                Ok(unsafe { builder.build_unchecked() })
662            }
663        } else {
664            Err(Error::internal(format!(
665                "Expected Struct, got {:?}",
666                data_type
667            )))
668        }
669    }
670
671    fn remove_outer_validity(self) -> Self {
672        Self {
673            children: self
674                .children
675                .into_iter()
676                .map(|c| c.remove_outer_validity())
677                .collect(),
678            block_info: self.block_info,
679            validity: None, // Remove the validity
680        }
681    }
682
683    fn into_buffers(self) -> Vec<LanceBuffer> {
684        self.children
685            .into_iter()
686            .flat_map(|c| c.into_buffers())
687            .collect()
688    }
689
690    pub fn has_variable_width_child(&self) -> bool {
691        self.children
692            .iter()
693            .any(|child| !matches!(child, DataBlock::FixedWidth(_)))
694    }
695
696    pub fn data_size(&self) -> u64 {
697        self.children
698            .iter()
699            .map(|data_block| data_block.data_size())
700            .sum()
701    }
702}
703
704/// A data block for dictionary encoded data
705#[derive(Debug, Clone)]
706pub struct DictionaryDataBlock {
707    /// The indices buffer
708    pub indices: FixedWidthDataBlock,
709    /// The dictionary itself
710    pub dictionary: Box<DataBlock>,
711}
712
713impl DictionaryDataBlock {
714    fn decode_helper<K: ArrowDictionaryKeyType>(self) -> Result<DataBlock> {
715        // Handle empty batch - this can happen when decoding a range that contains
716        // only empty/null lists, or when reading sparse data
717        if self.indices.num_values == 0 {
718            return Ok(DataBlock::AllNull(AllNullDataBlock { num_values: 0 }));
719        }
720
721        // assume the indices are uniformly distributed.
722        let estimated_size_bytes = self.dictionary.data_size()
723            * (self.indices.num_values + self.dictionary.num_values() - 1)
724            / self.dictionary.num_values();
725        let mut data_builder = DataBlockBuilder::with_capacity_estimate(estimated_size_bytes);
726
727        let indices = self.indices.data.borrow_to_typed_slice::<K::Native>();
728        let indices = indices.as_ref();
729
730        indices
731            .iter()
732            .map(|idx| idx.to_usize().unwrap() as u64)
733            .for_each(|idx| {
734                data_builder.append(&self.dictionary, idx..idx + 1);
735            });
736
737        Ok(data_builder.finish())
738    }
739
740    pub fn decode(self) -> Result<DataBlock> {
741        match self.indices.bits_per_value {
742            8 => self.decode_helper::<UInt8Type>(),
743            16 => self.decode_helper::<UInt16Type>(),
744            32 => self.decode_helper::<UInt32Type>(),
745            64 => self.decode_helper::<UInt64Type>(),
746            _ => Err(lance_core::Error::internal(format!(
747                "Unsupported dictionary index bit width: {} bits",
748                self.indices.bits_per_value
749            ))),
750        }
751    }
752
753    fn into_arrow_dict(
754        self,
755        key_type: Box<DataType>,
756        value_type: Box<DataType>,
757        validate: bool,
758    ) -> Result<ArrayData> {
759        let indices = self.indices.into_arrow((*key_type).clone(), validate)?;
760        let dictionary = self
761            .dictionary
762            .into_arrow((*value_type).clone(), validate)?;
763
764        let builder = indices
765            .into_builder()
766            .add_child_data(dictionary)
767            .data_type(DataType::Dictionary(key_type, value_type));
768
769        if validate {
770            Ok(builder.build()?)
771        } else {
772            Ok(unsafe { builder.build_unchecked() })
773        }
774    }
775
776    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
777        if let DataType::Dictionary(key_type, value_type) = data_type {
778            self.into_arrow_dict(key_type, value_type, validate)
779        } else {
780            self.decode()?.into_arrow(data_type, validate)
781        }
782    }
783
784    fn into_buffers(self) -> Vec<LanceBuffer> {
785        let mut buffers = self.indices.into_buffers();
786        buffers.extend(self.dictionary.into_buffers());
787        buffers
788    }
789
790    pub fn into_parts(self) -> (DataBlock, DataBlock) {
791        (DataBlock::FixedWidth(self.indices), *self.dictionary)
792    }
793
794    pub fn from_parts(indices: FixedWidthDataBlock, dictionary: DataBlock) -> Self {
795        Self {
796            indices,
797            dictionary: Box::new(dictionary),
798        }
799    }
800}
801
802/// A DataBlock is a collection of buffers that represents an "array" of data in very generic terms
803///
804/// The output of each decoder is a DataBlock.  Decoders can be chained together to transform
805/// one DataBlock into a different kind of DataBlock.
806///
807/// The DataBlock is somewhere in between Arrow's ArrayData and Array and represents a physical
808/// layout of the data.
809///
810/// A DataBlock can be converted into an Arrow ArrayData (and then Array) for a given array type.
811/// For example, a FixedWidthDataBlock can be converted into any primitive type or a fixed size
812/// list of a primitive type.  This is a zero-copy operation.
813///
814/// In addition, a DataBlock can be created from an Arrow array or arrays.  This is not a zero-copy
815/// operation as some normalization may be required.
816#[derive(Debug, Clone)]
817pub enum DataBlock {
818    Empty(),
819    Constant(ConstantDataBlock),
820    AllNull(AllNullDataBlock),
821    Nullable(NullableDataBlock),
822    FixedWidth(FixedWidthDataBlock),
823    FixedSizeList(FixedSizeListBlock),
824    VariableWidth(VariableWidthBlock),
825    Opaque(OpaqueBlock),
826    Struct(StructDataBlock),
827    Dictionary(DictionaryDataBlock),
828}
829
830impl DataBlock {
831    /// Convert self into an Arrow ArrayData
832    pub fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
833        match self {
834            Self::Empty() => Ok(new_empty_array(&data_type).to_data()),
835            Self::Constant(inner) => inner.into_arrow(data_type, validate),
836            Self::AllNull(inner) => inner.into_arrow(data_type, validate),
837            Self::Nullable(inner) => inner.into_arrow(data_type, validate),
838            Self::FixedWidth(inner) => inner.into_arrow(data_type, validate),
839            Self::FixedSizeList(inner) => inner.into_arrow(data_type, validate),
840            Self::VariableWidth(inner) => inner.into_arrow(data_type, validate),
841            Self::Struct(inner) => inner.into_arrow(data_type, validate),
842            Self::Dictionary(inner) => inner.into_arrow(data_type, validate),
843            Self::Opaque(_) => Err(Error::internal(
844                "Cannot convert OpaqueBlock to Arrow".to_string(),
845            )),
846        }
847    }
848
849    /// Convert the data block into a collection of buffers for serialization
850    ///
851    /// The order matters and will be used to reconstruct the data block at read time.
852    pub fn into_buffers(self) -> Vec<LanceBuffer> {
853        match self {
854            Self::Empty() => Vec::default(),
855            Self::Constant(inner) => inner.into_buffers(),
856            Self::AllNull(inner) => inner.into_buffers(),
857            Self::Nullable(inner) => inner.into_buffers(),
858            Self::FixedWidth(inner) => inner.into_buffers(),
859            Self::FixedSizeList(inner) => inner.into_buffers(),
860            Self::VariableWidth(inner) => inner.into_buffers(),
861            Self::Struct(inner) => inner.into_buffers(),
862            Self::Dictionary(inner) => inner.into_buffers(),
863            Self::Opaque(inner) => inner.buffers,
864        }
865    }
866
867    /// Converts the data buffers into borrowed mode and clones the block
868    ///
869    /// This is a zero-copy operation but requires a mutable reference to self and, afterwards,
870    /// all buffers will be in Borrowed mode.
871    /// Try and clone the block
872    ///
873    /// This will fail if any buffers are in owned mode.  You can call borrow_and_clone() to
874    /// ensure that all buffers are in borrowed mode before calling this method.
875    pub fn try_clone(&self) -> Result<Self> {
876        match self {
877            Self::Empty() => Ok(Self::Empty()),
878            Self::Constant(inner) => Ok(Self::Constant(inner.clone())),
879            Self::AllNull(inner) => Ok(Self::AllNull(inner.clone())),
880            Self::Nullable(inner) => Ok(Self::Nullable(inner.clone())),
881            Self::FixedWidth(inner) => Ok(Self::FixedWidth(inner.clone())),
882            Self::FixedSizeList(inner) => Ok(Self::FixedSizeList(inner.clone())),
883            Self::VariableWidth(inner) => Ok(Self::VariableWidth(inner.clone())),
884            Self::Struct(inner) => Ok(Self::Struct(inner.clone())),
885            Self::Dictionary(inner) => Ok(Self::Dictionary(inner.clone())),
886            Self::Opaque(inner) => Ok(Self::Opaque(inner.clone())),
887        }
888    }
889
890    pub fn name(&self) -> &'static str {
891        match self {
892            Self::Constant(_) => "Constant",
893            Self::Empty() => "Empty",
894            Self::AllNull(_) => "AllNull",
895            Self::Nullable(_) => "Nullable",
896            Self::FixedWidth(_) => "FixedWidth",
897            Self::FixedSizeList(_) => "FixedSizeList",
898            Self::VariableWidth(_) => "VariableWidth",
899            Self::Struct(_) => "Struct",
900            Self::Dictionary(_) => "Dictionary",
901            Self::Opaque(_) => "Opaque",
902        }
903    }
904
905    pub fn is_variable(&self) -> bool {
906        match self {
907            Self::Constant(_) => false,
908            Self::Empty() => false,
909            Self::AllNull(_) => false,
910            Self::Nullable(nullable) => nullable.data.is_variable(),
911            Self::FixedWidth(_) => false,
912            Self::FixedSizeList(fsl) => fsl.child.is_variable(),
913            Self::VariableWidth(_) => true,
914            Self::Struct(strct) => strct.children.iter().any(|c| c.is_variable()),
915            Self::Dictionary(_) => {
916                todo!("is_variable for DictionaryDataBlock is not implemented yet")
917            }
918            Self::Opaque(_) => panic!("Does not make sense to ask if an Opaque block is variable"),
919        }
920    }
921
922    pub fn is_nullable(&self) -> bool {
923        match self {
924            Self::AllNull(_) => true,
925            Self::Nullable(_) => true,
926            Self::FixedSizeList(fsl) => fsl.child.is_nullable(),
927            Self::Struct(strct) => strct.children.iter().any(|c| c.is_nullable()),
928            Self::Dictionary(_) => {
929                todo!("is_nullable for DictionaryDataBlock is not implemented yet")
930            }
931            Self::Opaque(_) => panic!("Does not make sense to ask if an Opaque block is nullable"),
932            _ => false,
933        }
934    }
935
936    /// The number of values in the block
937    ///
938    /// This function does not recurse into child blocks.  If this is a FSL then it will
939    /// be the number of lists and not the number of items.
940    pub fn num_values(&self) -> u64 {
941        match self {
942            Self::Empty() => 0,
943            Self::Constant(inner) => inner.num_values,
944            Self::AllNull(inner) => inner.num_values,
945            Self::Nullable(inner) => inner.data.num_values(),
946            Self::FixedWidth(inner) => inner.num_values,
947            Self::FixedSizeList(inner) => inner.num_values(),
948            Self::VariableWidth(inner) => inner.num_values,
949            Self::Struct(inner) => inner.children[0].num_values(),
950            Self::Dictionary(inner) => inner.indices.num_values,
951            Self::Opaque(inner) => inner.num_values,
952        }
953    }
954
955    /// The number of items in a single row
956    ///
957    /// This is always 1 unless there are layers of FSL
958    pub fn items_per_row(&self) -> u64 {
959        match self {
960            Self::Empty() => todo!(),     // Leave undefined until needed
961            Self::Constant(_) => todo!(), // Leave undefined until needed
962            Self::AllNull(_) => todo!(),  // Leave undefined until needed
963            Self::Nullable(nullable) => nullable.data.items_per_row(),
964            Self::FixedWidth(_) => 1,
965            Self::FixedSizeList(fsl) => fsl.dimension * fsl.child.items_per_row(),
966            Self::VariableWidth(_) => 1,
967            Self::Struct(_) => todo!(), // Leave undefined until needed
968            Self::Dictionary(_) => 1,
969            Self::Opaque(_) => 1,
970        }
971    }
972
973    /// The number of bytes in the data block (including any child blocks)
974    pub fn data_size(&self) -> u64 {
975        match self {
976            Self::Empty() => 0,
977            Self::Constant(inner) => inner.data_size(),
978            Self::AllNull(_) => 0,
979            Self::Nullable(inner) => inner.data_size(),
980            Self::FixedWidth(inner) => inner.data_size(),
981            Self::FixedSizeList(inner) => inner.data_size(),
982            Self::VariableWidth(inner) => inner.data_size(),
983            Self::Struct(inner) => inner.children.iter().map(|child| child.data_size()).sum(),
984            Self::Dictionary(inner) => inner.indices.data_size() + inner.dictionary.data_size(),
985            Self::Opaque(inner) => inner.data_size(),
986        }
987    }
988
989    /// Removes any validity information from the block
990    ///
991    /// This does not filter the block (e.g. remove rows).  It only removes
992    /// the validity bitmaps (if present).  Any garbage masked by null bits
993    /// will now appear as proper values.
994    ///
995    /// If `recurse` is true, then this will also remove validity from any child blocks.
996    pub fn remove_outer_validity(self) -> Self {
997        match self {
998            Self::AllNull(_) => panic!("Cannot remove validity on all-null data"),
999            Self::Nullable(inner) => *inner.data,
1000            Self::Struct(inner) => Self::Struct(inner.remove_outer_validity()),
1001            other => other,
1002        }
1003    }
1004
1005    pub fn make_builder(&self, estimated_size_bytes: u64) -> Box<dyn DataBlockBuilderImpl> {
1006        match self {
1007            Self::FixedWidth(inner) => {
1008                if inner.bits_per_value == 1 {
1009                    Box::new(BitmapDataBlockBuilder::new(estimated_size_bytes))
1010                } else {
1011                    Box::new(FixedWidthDataBlockBuilder::new(
1012                        inner.bits_per_value,
1013                        estimated_size_bytes,
1014                    ))
1015                }
1016            }
1017            Self::VariableWidth(inner) => {
1018                if inner.bits_per_offset == 32 {
1019                    Box::new(VariableWidthDataBlockBuilder::<i32>::new(
1020                        estimated_size_bytes,
1021                    ))
1022                } else if inner.bits_per_offset == 64 {
1023                    Box::new(VariableWidthDataBlockBuilder::<i64>::new(
1024                        estimated_size_bytes,
1025                    ))
1026                } else {
1027                    todo!()
1028                }
1029            }
1030            Self::FixedSizeList(inner) => {
1031                let inner_builder = inner.child.make_builder(estimated_size_bytes);
1032                Box::new(FixedSizeListBlockBuilder::new(
1033                    inner_builder,
1034                    inner.dimension,
1035                ))
1036            }
1037            Self::Nullable(nullable) => {
1038                // There's no easy way to know what percentage of the data is in the valiidty buffer
1039                // but 1/16th seems like a reasonable guess.
1040                let estimated_validity_size_bytes = estimated_size_bytes / 16;
1041                let inner_builder = nullable
1042                    .data
1043                    .make_builder(estimated_size_bytes - estimated_validity_size_bytes);
1044                Box::new(NullableDataBlockBuilder::new(
1045                    inner_builder,
1046                    estimated_validity_size_bytes as usize,
1047                ))
1048            }
1049            Self::Struct(struct_data_block) => {
1050                let num_children = struct_data_block.children.len();
1051                let per_child_estimate = if num_children == 0 {
1052                    0
1053                } else {
1054                    estimated_size_bytes / num_children as u64
1055                };
1056                let child_builders = struct_data_block
1057                    .children
1058                    .iter()
1059                    .map(|child| child.make_builder(per_child_estimate))
1060                    .collect();
1061                Box::new(StructDataBlockBuilder::new(child_builders))
1062            }
1063            Self::AllNull(_) => Box::new(AllNullDataBlockBuilder::default()),
1064            _ => todo!("make_builder for {:?}", self),
1065        }
1066    }
1067}
1068
// Generates `fn $fn_name(self) -> Option<$inner_type>`, a consuming accessor that
// yields the payload when `self` is the `$inner` variant and `None` otherwise.
macro_rules! as_type {
    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
        pub fn $fn_name(self) -> Option<$inner_type> {
            if let Self::$inner(payload) = self {
                Some(payload)
            } else {
                None
            }
        }
    };
}
1079
// Generates `fn $fn_name(&self) -> Option<&$inner_type>`, a borrowing accessor that
// yields a reference to the payload when `self` is the `$inner` variant and `None`
// otherwise.
macro_rules! as_type_ref {
    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
        pub fn $fn_name(&self) -> Option<&$inner_type> {
            if let Self::$inner(payload) = self {
                Some(payload)
            } else {
                None
            }
        }
    };
}
1090
// Generates `fn $fn_name(&mut self) -> Option<&mut $inner_type>`, a mutable accessor
// that yields a mutable reference to the payload when `self` is the `$inner` variant
// and `None` otherwise.
macro_rules! as_type_ref_mut {
    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
        pub fn $fn_name(&mut self) -> Option<&mut $inner_type> {
            if let Self::$inner(payload) = self {
                Some(payload)
            } else {
                None
            }
        }
    };
}
1101
1102// Cast implementations
1103impl DataBlock {
1104    as_type!(as_all_null, AllNull, AllNullDataBlock);
1105    as_type!(as_nullable, Nullable, NullableDataBlock);
1106    as_type!(as_fixed_width, FixedWidth, FixedWidthDataBlock);
1107    as_type!(as_fixed_size_list, FixedSizeList, FixedSizeListBlock);
1108    as_type!(as_variable_width, VariableWidth, VariableWidthBlock);
1109    as_type!(as_struct, Struct, StructDataBlock);
1110    as_type!(as_dictionary, Dictionary, DictionaryDataBlock);
1111    as_type_ref!(as_all_null_ref, AllNull, AllNullDataBlock);
1112    as_type_ref!(as_nullable_ref, Nullable, NullableDataBlock);
1113    as_type_ref!(as_fixed_width_ref, FixedWidth, FixedWidthDataBlock);
1114    as_type_ref!(as_fixed_size_list_ref, FixedSizeList, FixedSizeListBlock);
1115    as_type_ref!(as_variable_width_ref, VariableWidth, VariableWidthBlock);
1116    as_type_ref!(as_struct_ref, Struct, StructDataBlock);
1117    as_type_ref!(as_dictionary_ref, Dictionary, DictionaryDataBlock);
1118    as_type_ref_mut!(as_all_null_ref_mut, AllNull, AllNullDataBlock);
1119    as_type_ref_mut!(as_nullable_ref_mut, Nullable, NullableDataBlock);
1120    as_type_ref_mut!(as_fixed_width_ref_mut, FixedWidth, FixedWidthDataBlock);
1121    as_type_ref_mut!(
1122        as_fixed_size_list_ref_mut,
1123        FixedSizeList,
1124        FixedSizeListBlock
1125    );
1126    as_type_ref_mut!(as_variable_width_ref_mut, VariableWidth, VariableWidthBlock);
1127    as_type_ref_mut!(as_struct_ref_mut, Struct, StructDataBlock);
1128    as_type_ref_mut!(as_dictionary_ref_mut, Dictionary, DictionaryDataBlock);
1129}
1130
1131// Methods to convert from Arrow -> DataBlock
1132
1133fn get_byte_range<T: ArrowNativeType>(offsets: &mut LanceBuffer) -> Range<usize> {
1134    let offsets = offsets.borrow_to_typed_slice::<T>();
1135    if offsets.as_ref().is_empty() {
1136        0..0
1137    } else {
1138        offsets.as_ref().first().unwrap().as_usize()..offsets.as_ref().last().unwrap().as_usize()
1139    }
1140}
1141
1142// Given multiple offsets arrays [0, 5, 10], [0, 3, 7], etc. stitch
1143// them together to get [0, 5, 10, 13, 20, ...]
1144//
1145// Also returns the data range referenced by each offset array (may
1146// not be 0..len if there is slicing involved)
1147fn stitch_offsets<T: ArrowNativeType + std::ops::Add<Output = T> + std::ops::Sub<Output = T>>(
1148    offsets: Vec<LanceBuffer>,
1149) -> (LanceBuffer, Vec<Range<usize>>) {
1150    if offsets.is_empty() {
1151        return (LanceBuffer::empty(), Vec::default());
1152    }
1153    let len = offsets.iter().map(|b| b.len()).sum::<usize>();
1154    // Note: we are making a copy here, even if there is only one input, because we want to
1155    // normalize that input if it doesn't start with zero.  This could be micro-optimized out
1156    // if needed.
1157    let mut dest = Vec::with_capacity(len);
1158    let mut byte_ranges = Vec::with_capacity(offsets.len());
1159
1160    // We insert one leading 0 before processing any of the inputs
1161    dest.push(T::from_usize(0).unwrap());
1162
1163    for mut o in offsets.into_iter() {
1164        if !o.is_empty() {
1165            let last_offset = *dest.last().unwrap();
1166            let o = o.borrow_to_typed_slice::<T>();
1167            let start = *o.as_ref().first().unwrap();
1168            // First, we skip the first offset
1169            // Then, we subtract that first offset from each remaining offset
1170            //
1171            // This gives us a 0-based offset array (minus the leading 0)
1172            //
1173            // Then we add the last offset from the previous array to each offset
1174            // which shifts our offset array to the correct position
1175            //
1176            // For example, let's assume the last offset from the previous array
1177            // was 10 and we are given [13, 17, 22].  This means we have two values with
1178            // length 4 (17 - 13) and 5 (22 - 17).  The output from this step will be
1179            // [14, 19].  Combined with our last offset of 10, this gives us [10, 14, 19]
1180            // which is our same two values of length 4 and 5.
1181            dest.extend(o.as_ref()[1..].iter().map(|&x| x + last_offset - start));
1182        }
1183        byte_ranges.push(get_byte_range::<T>(&mut o));
1184    }
1185    (LanceBuffer::reinterpret_vec(dest), byte_ranges)
1186}
1187
1188fn arrow_binary_to_data_block(
1189    arrays: &[ArrayRef],
1190    num_values: u64,
1191    bits_per_offset: u8,
1192) -> DataBlock {
1193    let data_vec = arrays.iter().map(|arr| arr.to_data()).collect::<Vec<_>>();
1194    let bytes_per_offset = bits_per_offset as usize / 8;
1195    let offsets = data_vec
1196        .iter()
1197        .map(|d| {
1198            LanceBuffer::from(
1199                d.buffers()[0].slice_with_length(d.offset(), (d.len() + 1) * bytes_per_offset),
1200            )
1201        })
1202        .collect::<Vec<_>>();
1203    let (offsets, data_ranges) = if bits_per_offset == 32 {
1204        stitch_offsets::<i32>(offsets)
1205    } else {
1206        stitch_offsets::<i64>(offsets)
1207    };
1208    let data = data_vec
1209        .iter()
1210        .zip(data_ranges)
1211        .map(|(d, byte_range)| {
1212            LanceBuffer::from(
1213                d.buffers()[1]
1214                    .slice_with_length(byte_range.start, byte_range.end - byte_range.start),
1215            )
1216        })
1217        .collect::<Vec<_>>();
1218    let data = LanceBuffer::concat_into_one(data);
1219    DataBlock::VariableWidth(VariableWidthBlock {
1220        data,
1221        offsets,
1222        bits_per_offset,
1223        num_values,
1224        block_info: BlockInfo::new(),
1225    })
1226}
1227
1228fn encode_flat_data(arrays: &[ArrayRef], num_values: u64) -> LanceBuffer {
1229    let bytes_per_value = arrays[0].data_type().byte_width();
1230    let mut buffer = Vec::with_capacity(num_values as usize * bytes_per_value);
1231    for arr in arrays {
1232        let data = arr.to_data();
1233        buffer.extend_from_slice(data.buffers()[0].as_slice());
1234    }
1235    LanceBuffer::from(buffer)
1236}
1237
1238fn do_encode_bitmap_data(bitmaps: &[BooleanBuffer], num_values: u64) -> LanceBuffer {
1239    let mut builder = BooleanBufferBuilder::new(num_values as usize);
1240
1241    for buf in bitmaps {
1242        builder.append_buffer(buf);
1243    }
1244
1245    let buffer = builder.finish().into_inner();
1246    LanceBuffer::from(buffer)
1247}
1248
1249fn encode_bitmap_data(arrays: &[ArrayRef], num_values: u64) -> LanceBuffer {
1250    let bitmaps = arrays
1251        .iter()
1252        .map(|arr| arr.as_boolean().values().clone())
1253        .collect::<Vec<_>>();
1254    do_encode_bitmap_data(&bitmaps, num_values)
1255}
1256
1257// Concatenate dictionary arrays.  This is a bit tricky because we might overflow the
1258// index type.  If we do, we need to upscale the indices to a larger type.
1259fn concat_dict_arrays(arrays: &[ArrayRef]) -> ArrayRef {
1260    let value_type = arrays[0].as_any_dictionary().values().data_type();
1261    let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
1262    match arrow_select::concat::concat(&array_refs) {
1263        Ok(array) => array,
1264        Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1265            // Slow, but hopefully a corner case.  Optimize later
1266            let upscaled = array_refs
1267                .iter()
1268                .map(|arr| {
1269                    match arrow_cast::cast(
1270                        *arr,
1271                        &DataType::Dictionary(
1272                            Box::new(DataType::UInt32),
1273                            Box::new(value_type.clone()),
1274                        ),
1275                    ) {
1276                        Ok(arr) => arr,
1277                        Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1278                            // Technically I think this means the input type was u64 already
1279                            unimplemented!("Dictionary arrays with more than 2^32 unique values")
1280                        }
1281                        err => err.unwrap(),
1282                    }
1283                })
1284                .collect::<Vec<_>>();
1285            let array_refs = upscaled.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
1286            // Can still fail if concat pushes over u32 boundary
1287            match arrow_select::concat::concat(&array_refs) {
1288                Ok(array) => array,
1289                Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1290                    unimplemented!("Dictionary arrays with more than 2^32 unique values")
1291                }
1292                err => err.unwrap(),
1293            }
1294        }
1295        // Shouldn't be any other possible errors in concat
1296        err => err.unwrap(),
1297    }
1298}
1299
1300fn max_index_val(index_type: &DataType) -> u64 {
1301    match index_type {
1302        DataType::Int8 => i8::MAX as u64,
1303        DataType::Int16 => i16::MAX as u64,
1304        DataType::Int32 => i32::MAX as u64,
1305        DataType::Int64 => i64::MAX as u64,
1306        DataType::UInt8 => u8::MAX as u64,
1307        DataType::UInt16 => u16::MAX as u64,
1308        DataType::UInt32 => u32::MAX as u64,
1309        DataType::UInt64 => u64::MAX,
1310        _ => panic!("Invalid dictionary index type"),
1311    }
1312}
1313
1314// If we get multiple dictionary arrays and they don't all have the same dictionary
1315// then we need to normalize the indices.  Otherwise we might have something like:
1316//
1317// First chunk ["hello", "foo"], [0, 0, 1, 1, 1]
1318// Second chunk ["bar", "world"], [0, 1, 0, 1, 1]
1319//
1320// If we simply encode as ["hello", "foo", "bar", "world"], [0, 0, 1, 1, 1, 0, 1, 0, 1, 1]
1321// then we will get the wrong answer because the dictionaries were not merged and the indices
1322// were not remapped.
1323//
1324// A simple way to do this today is to just concatenate all the arrays.  This is because
1325// arrow's dictionary concatenation function already has the logic to merge dictionaries.
1326//
1327// TODO: We could be more efficient here by checking if the dictionaries are the same
1328//       Also, if they aren't, we can possibly do something cheaper than concatenating
1329//
1330// In addition, we want to normalize the representation of nulls.  The cheapest thing to
1331// do (space-wise) is to put the nulls in the dictionary.
1332fn arrow_dictionary_to_data_block(arrays: &[ArrayRef], validity: Option<NullBuffer>) -> DataBlock {
1333    let array = concat_dict_arrays(arrays);
1334    let array_dict = array.as_any_dictionary();
1335    let mut indices = array_dict.keys();
1336    let num_values = indices.len() as u64;
1337    let mut values = array_dict.values().clone();
1338    // Placeholder, if we need to upcast, we will initialize this and set `indices` to refer to it
1339    let mut upcast = None;
1340
1341    // TODO: Should we just always normalize indices to u32?  That would make logic simpler
1342    // and we're going to bitpack them soon anyways
1343
1344    let indices_block = if let Some(validity) = validity {
1345        // If there is validity then we find the first invalid index in the dictionary values, inserting
1346        // a new value if we need to.  Then we change all indices to point to that value.  This way we
1347        // never need to store nullability of the indices.
1348        let mut first_invalid_index = None;
1349        if let Some(values_validity) = values.nulls() {
1350            first_invalid_index = (!values_validity.inner()).set_indices().next();
1351        }
1352        let first_invalid_index = first_invalid_index.unwrap_or_else(|| {
1353            let null_arr = new_null_array(values.data_type(), 1);
1354            values = arrow_select::concat::concat(&[values.as_ref(), null_arr.as_ref()]).unwrap();
1355            let null_index = values.len() - 1;
1356            let max_index_val = max_index_val(indices.data_type());
1357            if null_index as u64 > max_index_val {
1358                // Widen the index type
1359                if max_index_val >= u32::MAX as u64 {
1360                    unimplemented!("Dictionary arrays with 2^32 unique value (or more) and a null")
1361                }
1362                upcast = Some(arrow_cast::cast(indices, &DataType::UInt32).unwrap());
1363                indices = upcast.as_ref().unwrap();
1364            }
1365            null_index
1366        });
1367        // This can't fail since we already checked for fit
1368        let null_index_arr = arrow_cast::cast(
1369            &UInt64Array::from(vec![first_invalid_index as u64]),
1370            indices.data_type(),
1371        )
1372        .unwrap();
1373
1374        let bytes_per_index = indices.data_type().byte_width();
1375        let bits_per_index = bytes_per_index as u64 * 8;
1376
1377        let null_index_arr = null_index_arr.into_data();
1378        let null_index_bytes = &null_index_arr.buffers()[0];
1379        // Need to make a copy here since indices isn't mutable, could be avoided in theory
1380        let mut indices_bytes = indices.to_data().buffers()[0].to_vec();
1381        for invalid_idx in (!validity.inner()).set_indices() {
1382            indices_bytes[invalid_idx * bytes_per_index..(invalid_idx + 1) * bytes_per_index]
1383                .copy_from_slice(null_index_bytes.as_slice());
1384        }
1385        FixedWidthDataBlock {
1386            data: LanceBuffer::from(indices_bytes),
1387            bits_per_value: bits_per_index,
1388            num_values,
1389            block_info: BlockInfo::new(),
1390        }
1391    } else {
1392        FixedWidthDataBlock {
1393            data: LanceBuffer::from(indices.to_data().buffers()[0].clone()),
1394            bits_per_value: indices.data_type().byte_width() as u64 * 8,
1395            num_values,
1396            block_info: BlockInfo::new(),
1397        }
1398    };
1399
1400    let items = DataBlock::from(values);
1401    DataBlock::Dictionary(DictionaryDataBlock {
1402        indices: indices_block,
1403        dictionary: Box::new(items),
1404    })
1405}
1406
1407enum Nullability {
1408    None,
1409    All,
1410    Some(NullBuffer),
1411}
1412
1413impl Nullability {
1414    fn to_option(&self) -> Option<NullBuffer> {
1415        match self {
1416            Self::Some(nulls) => Some(nulls.clone()),
1417            _ => None,
1418        }
1419    }
1420}
1421
1422fn extract_nulls(arrays: &[ArrayRef], num_values: u64) -> Nullability {
1423    let mut has_nulls = false;
1424    let nulls_and_lens = arrays
1425        .iter()
1426        .map(|arr| {
1427            let nulls = arr.logical_nulls();
1428            has_nulls |= nulls.is_some();
1429            (nulls, arr.len())
1430        })
1431        .collect::<Vec<_>>();
1432    if !has_nulls {
1433        return Nullability::None;
1434    }
1435    let mut builder = BooleanBufferBuilder::new(num_values as usize);
1436    let mut num_nulls = 0;
1437    for (null, len) in nulls_and_lens {
1438        if let Some(null) = null {
1439            num_nulls += null.null_count();
1440            builder.append_buffer(&null.into_inner());
1441        } else {
1442            builder.append_n(len, true);
1443        }
1444    }
1445    if num_nulls == num_values as usize {
1446        Nullability::All
1447    } else {
1448        Nullability::Some(NullBuffer::new(builder.finish()))
1449    }
1450}
1451
1452impl DataBlock {
1453    pub fn from_arrays(arrays: &[ArrayRef], num_values: u64) -> Self {
1454        if arrays.is_empty() || num_values == 0 {
1455            return Self::AllNull(AllNullDataBlock { num_values: 0 });
1456        }
1457
1458        let data_type = arrays[0].data_type();
1459        let nulls = extract_nulls(arrays, num_values);
1460
1461        if let Nullability::All = nulls {
1462            return Self::AllNull(AllNullDataBlock { num_values });
1463        }
1464
1465        let mut encoded = match data_type {
1466            DataType::Binary | DataType::Utf8 => arrow_binary_to_data_block(arrays, num_values, 32),
1467            DataType::BinaryView | DataType::Utf8View => {
1468                todo!()
1469            }
1470            DataType::LargeBinary | DataType::LargeUtf8 => {
1471                arrow_binary_to_data_block(arrays, num_values, 64)
1472            }
1473            DataType::Boolean => {
1474                let data = encode_bitmap_data(arrays, num_values);
1475                Self::FixedWidth(FixedWidthDataBlock {
1476                    data,
1477                    bits_per_value: 1,
1478                    num_values,
1479                    block_info: BlockInfo::new(),
1480                })
1481            }
1482            DataType::Date32
1483            | DataType::Date64
1484            | DataType::Decimal32(_, _)
1485            | DataType::Decimal64(_, _)
1486            | DataType::Decimal128(_, _)
1487            | DataType::Decimal256(_, _)
1488            | DataType::Duration(_)
1489            | DataType::FixedSizeBinary(_)
1490            | DataType::Float16
1491            | DataType::Float32
1492            | DataType::Float64
1493            | DataType::Int16
1494            | DataType::Int32
1495            | DataType::Int64
1496            | DataType::Int8
1497            | DataType::Interval(_)
1498            | DataType::Time32(_)
1499            | DataType::Time64(_)
1500            | DataType::Timestamp(_, _)
1501            | DataType::UInt16
1502            | DataType::UInt32
1503            | DataType::UInt64
1504            | DataType::UInt8 => {
1505                let data = encode_flat_data(arrays, num_values);
1506                Self::FixedWidth(FixedWidthDataBlock {
1507                    data,
1508                    bits_per_value: data_type.byte_width() as u64 * 8,
1509                    num_values,
1510                    block_info: BlockInfo::new(),
1511                })
1512            }
1513            DataType::Null => Self::AllNull(AllNullDataBlock { num_values }),
1514            DataType::Dictionary(_, _) => arrow_dictionary_to_data_block(arrays, nulls.to_option()),
1515            DataType::Struct(fields) => {
1516                let structs = arrays.iter().map(|arr| arr.as_struct()).collect::<Vec<_>>();
1517                let mut children = Vec::with_capacity(fields.len());
1518                for child_idx in 0..fields.len() {
1519                    let child_vec = structs
1520                        .iter()
1521                        .map(|s| s.column(child_idx).clone())
1522                        .collect::<Vec<_>>();
1523                    children.push(Self::from_arrays(&child_vec, num_values));
1524                }
1525
1526                // Extract validity for the struct array
1527                let validity = match &nulls {
1528                    Nullability::None => None,
1529                    Nullability::Some(null_buffer) => Some(null_buffer.clone()),
1530                    Nullability::All => unreachable!("Should have returned AllNull earlier"),
1531                };
1532
1533                Self::Struct(StructDataBlock {
1534                    children,
1535                    block_info: BlockInfo::default(),
1536                    validity,
1537                })
1538            }
1539            DataType::FixedSizeList(_, dim) => {
1540                let children = arrays
1541                    .iter()
1542                    .map(|arr| arr.as_fixed_size_list().values().clone())
1543                    .collect::<Vec<_>>();
1544                let child_block = Self::from_arrays(&children, num_values * *dim as u64);
1545                Self::FixedSizeList(FixedSizeListBlock {
1546                    child: Box::new(child_block),
1547                    dimension: *dim as u64,
1548                })
1549            }
1550            DataType::LargeList(_)
1551            | DataType::List(_)
1552            | DataType::ListView(_)
1553            | DataType::LargeListView(_)
1554            | DataType::Map(_, _)
1555            | DataType::RunEndEncoded(_, _)
1556            | DataType::Union(_, _) => {
1557                panic!(
1558                    "Field with data type {} cannot be converted to data block",
1559                    data_type
1560                )
1561            }
1562        };
1563
1564        // compute statistics
1565        encoded.compute_stat();
1566
1567        if !matches!(data_type, DataType::Dictionary(_, _)) {
1568            match nulls {
1569                Nullability::None => encoded,
1570                Nullability::Some(nulls) => Self::Nullable(NullableDataBlock {
1571                    data: Box::new(encoded),
1572                    nulls: LanceBuffer::from(nulls.into_inner().into_inner()),
1573                    block_info: BlockInfo::new(),
1574                }),
1575                _ => unreachable!(),
1576            }
1577        } else {
1578            // Dictionaries already insert the nulls into the dictionary items
1579            encoded
1580        }
1581    }
1582
1583    pub fn from_array<T: Array + 'static>(array: T) -> Self {
1584        let num_values = array.len();
1585        Self::from_arrays(&[Arc::new(array)], num_values as u64)
1586    }
1587}
1588
1589impl From<ArrayRef> for DataBlock {
1590    fn from(array: ArrayRef) -> Self {
1591        let num_values = array.len() as u64;
1592        Self::from_arrays(&[array], num_values)
1593    }
1594}
1595
1596pub trait DataBlockBuilderImpl: std::fmt::Debug {
1597    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>);
1598    fn finish(self: Box<Self>) -> DataBlock;
1599}
1600
1601#[derive(Debug)]
1602pub struct DataBlockBuilder {
1603    estimated_size_bytes: u64,
1604    builder: Option<Box<dyn DataBlockBuilderImpl>>,
1605}
1606
1607impl DataBlockBuilder {
1608    pub fn with_capacity_estimate(estimated_size_bytes: u64) -> Self {
1609        Self {
1610            estimated_size_bytes,
1611            builder: None,
1612        }
1613    }
1614
1615    fn get_builder(&mut self, block: &DataBlock) -> &mut dyn DataBlockBuilderImpl {
1616        if self.builder.is_none() {
1617            self.builder = Some(block.make_builder(self.estimated_size_bytes));
1618        }
1619        self.builder.as_mut().unwrap().as_mut()
1620    }
1621
1622    pub fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
1623        self.get_builder(data_block).append(data_block, selection);
1624    }
1625
1626    pub fn finish(self) -> DataBlock {
1627        let builder = self.builder.expect("DataBlockBuilder didn't see any data");
1628        builder.finish()
1629    }
1630}
1631
1632#[cfg(test)]
1633mod tests {
1634    use std::sync::Arc;
1635
1636    use arrow_array::{
1637        ArrayRef, DictionaryArray, Int8Array, LargeBinaryArray, StringArray, UInt8Array,
1638        UInt16Array, make_array, new_null_array,
1639        types::{Int8Type, Int32Type},
1640    };
1641    use arrow_buffer::{BooleanBuffer, NullBuffer};
1642
1643    use arrow_schema::{DataType, Field, Fields};
1644    use lance_datagen::{ArrayGeneratorExt, DEFAULT_SEED, RowCount, array};
1645    use rand::SeedableRng;
1646
1647    use crate::buffer::LanceBuffer;
1648
1649    use super::{AllNullDataBlock, DataBlock};
1650
1651    use arrow_array::Array;
1652
1653    #[test]
1654    fn test_sliced_to_data_block() {
1655        let ints = UInt16Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);
1656        let ints = ints.slice(2, 4);
1657        let data = DataBlock::from_array(ints);
1658
1659        let fixed_data = data.as_fixed_width().unwrap();
1660        assert_eq!(fixed_data.num_values, 4);
1661        assert_eq!(fixed_data.data.len(), 8);
1662
1663        let nullable_ints =
1664            UInt16Array::from(vec![Some(0), None, Some(2), None, Some(4), None, Some(6)]);
1665        let nullable_ints = nullable_ints.slice(1, 3);
1666        let data = DataBlock::from_array(nullable_ints);
1667
1668        let nullable = data.as_nullable().unwrap();
1669        assert_eq!(nullable.nulls, LanceBuffer::from(vec![0b00000010]));
1670    }
1671
1672    #[test]
1673    fn test_string_to_data_block() {
1674        // Converting string arrays that contain nulls to DataBlock
1675        let strings1 = StringArray::from(vec![Some("hello"), None, Some("world")]);
1676        let strings2 = StringArray::from(vec![Some("a"), Some("b")]);
1677        let strings3 = StringArray::from(vec![Option::<&'static str>::None, None]);
1678
1679        let arrays = &[strings1, strings2, strings3]
1680            .iter()
1681            .map(|arr| Arc::new(arr.clone()) as ArrayRef)
1682            .collect::<Vec<_>>();
1683
1684        let block = DataBlock::from_arrays(arrays, 7);
1685
1686        assert_eq!(block.num_values(), 7);
1687        let block = block.as_nullable().unwrap();
1688
1689        assert_eq!(block.nulls, LanceBuffer::from(vec![0b00011101]));
1690
1691        let data = block.data.as_variable_width().unwrap();
1692        assert_eq!(
1693            data.offsets,
1694            LanceBuffer::reinterpret_vec(vec![0, 5, 5, 10, 11, 12, 12, 12])
1695        );
1696
1697        assert_eq!(data.data, LanceBuffer::copy_slice(b"helloworldab"));
1698
1699        // Converting string arrays that do not contain nulls to DataBlock
1700        let strings1 = StringArray::from(vec![Some("a"), Some("bc")]);
1701        let strings2 = StringArray::from(vec![Some("def")]);
1702
1703        let arrays = &[strings1, strings2]
1704            .iter()
1705            .map(|arr| Arc::new(arr.clone()) as ArrayRef)
1706            .collect::<Vec<_>>();
1707
1708        let block = DataBlock::from_arrays(arrays, 3);
1709
1710        assert_eq!(block.num_values(), 3);
1711        // Should be no nullable wrapper
1712        let data = block.as_variable_width().unwrap();
1713        assert_eq!(data.offsets, LanceBuffer::reinterpret_vec(vec![0, 1, 3, 6]));
1714        assert_eq!(data.data, LanceBuffer::copy_slice(b"abcdef"));
1715    }
1716
1717    #[test]
1718    fn test_string_sliced() {
1719        let check = |arr: Vec<StringArray>, expected_off: Vec<i32>, expected_data: &[u8]| {
1720            let arrs = arr
1721                .into_iter()
1722                .map(|a| Arc::new(a) as ArrayRef)
1723                .collect::<Vec<_>>();
1724            let num_rows = arrs.iter().map(|a| a.len()).sum::<usize>() as u64;
1725            let data = DataBlock::from_arrays(&arrs, num_rows);
1726
1727            assert_eq!(data.num_values(), num_rows);
1728
1729            let data = data.as_variable_width().unwrap();
1730            assert_eq!(data.offsets, LanceBuffer::reinterpret_vec(expected_off));
1731            assert_eq!(data.data, LanceBuffer::copy_slice(expected_data));
1732        };
1733
1734        let string = StringArray::from(vec![Some("hello"), Some("world")]);
1735        check(vec![string.slice(1, 1)], vec![0, 5], b"world");
1736        check(vec![string.slice(0, 1)], vec![0, 5], b"hello");
1737        check(
1738            vec![string.slice(0, 1), string.slice(1, 1)],
1739            vec![0, 5, 10],
1740            b"helloworld",
1741        );
1742
1743        let string2 = StringArray::from(vec![Some("foo"), Some("bar")]);
1744        check(
1745            vec![string.slice(0, 1), string2.slice(0, 1)],
1746            vec![0, 5, 8],
1747            b"hellofoo",
1748        );
1749    }
1750
1751    #[test]
1752    fn test_large() {
1753        let arr = LargeBinaryArray::from_vec(vec![b"hello", b"world"]);
1754        let data = DataBlock::from_array(arr);
1755
1756        assert_eq!(data.num_values(), 2);
1757        let data = data.as_variable_width().unwrap();
1758        assert_eq!(data.bits_per_offset, 64);
1759        assert_eq!(data.num_values, 2);
1760        assert_eq!(data.data, LanceBuffer::copy_slice(b"helloworld"));
1761        assert_eq!(
1762            data.offsets,
1763            LanceBuffer::reinterpret_vec(vec![0_u64, 5, 10])
1764        );
1765    }
1766
1767    #[test]
1768    fn test_dictionary_indices_normalized() {
1769        let arr1 = DictionaryArray::<Int8Type>::from_iter([Some("a"), Some("a"), Some("b")]);
1770        let arr2 = DictionaryArray::<Int8Type>::from_iter([Some("b"), Some("c")]);
1771
1772        let data = DataBlock::from_arrays(&[Arc::new(arr1), Arc::new(arr2)], 5);
1773
1774        assert_eq!(data.num_values(), 5);
1775        let data = data.as_dictionary().unwrap();
1776        let indices = data.indices;
1777        assert_eq!(indices.bits_per_value, 8);
1778        assert_eq!(indices.num_values, 5);
1779        assert_eq!(
1780            indices.data,
1781            // You might expect 0, 0, 1, 1, 2 but it seems that arrow's dictionary concat does
1782            // not actually collapse dictionaries.  This is an arrow problem however, and we don't
1783            // need to fix it here.
1784            LanceBuffer::reinterpret_vec::<i8>(vec![0, 0, 1, 2, 3])
1785        );
1786
1787        let items = data.dictionary.as_variable_width().unwrap();
1788        assert_eq!(items.bits_per_offset, 32);
1789        assert_eq!(items.num_values, 4);
1790        assert_eq!(items.data, LanceBuffer::copy_slice(b"abbc"));
1791        assert_eq!(
1792            items.offsets,
1793            LanceBuffer::reinterpret_vec(vec![0, 1, 2, 3, 4],)
1794        );
1795    }
1796
1797    #[test]
1798    fn test_dictionary_nulls() {
1799        // Test both ways of encoding nulls
1800
1801        // By default, nulls get encoded into the indices
1802        let arr1 = DictionaryArray::<Int8Type>::from_iter([None, Some("a"), Some("b")]);
1803        let arr2 = DictionaryArray::<Int8Type>::from_iter([Some("c"), None]);
1804
1805        let data = DataBlock::from_arrays(&[Arc::new(arr1), Arc::new(arr2)], 5);
1806
1807        let check_common = |data: DataBlock| {
1808            assert_eq!(data.num_values(), 5);
1809            let dict = data.as_dictionary().unwrap();
1810
1811            let nullable_items = dict.dictionary.as_nullable().unwrap();
1812            assert_eq!(nullable_items.nulls, LanceBuffer::from(vec![0b00000111]));
1813            assert_eq!(nullable_items.data.num_values(), 4);
1814
1815            let items = nullable_items.data.as_variable_width().unwrap();
1816            assert_eq!(items.bits_per_offset, 32);
1817            assert_eq!(items.num_values, 4);
1818            assert_eq!(items.data, LanceBuffer::copy_slice(b"abc"));
1819            assert_eq!(
1820                items.offsets,
1821                LanceBuffer::reinterpret_vec(vec![0, 1, 2, 3, 3],)
1822            );
1823
1824            let indices = dict.indices;
1825            assert_eq!(indices.bits_per_value, 8);
1826            assert_eq!(indices.num_values, 5);
1827            assert_eq!(
1828                indices.data,
1829                LanceBuffer::reinterpret_vec::<i8>(vec![3, 0, 1, 2, 3])
1830            );
1831        };
1832        check_common(data);
1833
1834        // However, we can manually create a dictionary where nulls are in the dictionary
1835        let items = StringArray::from(vec![Some("a"), Some("b"), Some("c"), None]);
1836        let indices = Int8Array::from(vec![Some(3), Some(0), Some(1), Some(2), Some(3)]);
1837        let dict = DictionaryArray::new(indices, Arc::new(items));
1838
1839        let data = DataBlock::from_array(dict);
1840
1841        check_common(data);
1842    }
1843
1844    #[test]
1845    fn test_dictionary_cannot_add_null() {
1846        // 256 unique strings
1847        let items = StringArray::from(
1848            (0..256)
1849                .map(|i| Some(String::from_utf8(vec![0; i]).unwrap()))
1850                .collect::<Vec<_>>(),
1851        );
1852        // 257 indices, covering the whole range, plus one null
1853        let indices = UInt8Array::from(
1854            (0..=256)
1855                .map(|i| if i == 256 { None } else { Some(i as u8) })
1856                .collect::<Vec<_>>(),
1857        );
1858        // We want to normalize this by pushing nulls into the dictionary, but we cannot because
1859        // the dictionary is too large for the index type
1860        let dict = DictionaryArray::new(indices, Arc::new(items));
1861        let data = DataBlock::from_array(dict);
1862
1863        assert_eq!(data.num_values(), 257);
1864
1865        let dict = data.as_dictionary().unwrap();
1866
1867        assert_eq!(dict.indices.bits_per_value, 32);
1868        assert_eq!(
1869            dict.indices.data,
1870            LanceBuffer::reinterpret_vec((0_u32..257).collect::<Vec<_>>())
1871        );
1872
1873        let nullable_items = dict.dictionary.as_nullable().unwrap();
1874        let null_buffer = NullBuffer::new(BooleanBuffer::new(
1875            nullable_items.nulls.into_buffer(),
1876            0,
1877            257,
1878        ));
1879        for i in 0..256 {
1880            assert!(!null_buffer.is_null(i));
1881        }
1882        assert!(null_buffer.is_null(256));
1883
1884        assert_eq!(
1885            nullable_items.data.as_variable_width().unwrap().data.len(),
1886            32640
1887        );
1888    }
1889
1890    #[test]
1891    fn test_all_null() {
1892        for data_type in [
1893            DataType::UInt32,
1894            DataType::FixedSizeBinary(2),
1895            DataType::List(Arc::new(Field::new("item", DataType::UInt32, true))),
1896            DataType::Struct(Fields::from(vec![Field::new("a", DataType::UInt32, true)])),
1897        ] {
1898            let block = DataBlock::AllNull(AllNullDataBlock { num_values: 10 });
1899            let arr = block.into_arrow(data_type.clone(), true).unwrap();
1900            let arr = make_array(arr);
1901            let expected = new_null_array(&data_type, 10);
1902            assert_eq!(&arr, &expected);
1903        }
1904    }
1905
1906    #[test]
1907    fn test_dictionary_cannot_concatenate() {
1908        // 256 unique strings
1909        let items = StringArray::from(
1910            (0..256)
1911                .map(|i| Some(String::from_utf8(vec![0; i]).unwrap()))
1912                .collect::<Vec<_>>(),
1913        );
1914        // 256 different unique strings
1915        let other_items = StringArray::from(
1916            (0..256)
1917                .map(|i| Some(String::from_utf8(vec![1; i + 1]).unwrap()))
1918                .collect::<Vec<_>>(),
1919        );
1920        let indices = UInt8Array::from_iter_values(0..=255);
1921        let dict1 = DictionaryArray::new(indices.clone(), Arc::new(items));
1922        let dict2 = DictionaryArray::new(indices, Arc::new(other_items));
1923        let data = DataBlock::from_arrays(&[Arc::new(dict1), Arc::new(dict2)], 512);
1924        assert_eq!(data.num_values(), 512);
1925
1926        let dict = data.as_dictionary().unwrap();
1927
1928        assert_eq!(dict.indices.bits_per_value, 32);
1929        assert_eq!(
1930            dict.indices.data,
1931            LanceBuffer::reinterpret_vec::<u32>((0..512).collect::<Vec<_>>())
1932        );
1933        // What fun: 0 + 1 + .. + 255 + 1 + 2 + .. + 256 = 2^16
1934        assert_eq!(
1935            dict.dictionary.as_variable_width().unwrap().data.len(),
1936            65536
1937        );
1938    }
1939
1940    #[test]
1941    fn test_data_size() {
1942        let mut rng = rand_xoshiro::Xoshiro256PlusPlus::seed_from_u64(DEFAULT_SEED.0);
1943        // test data_size() when input has no nulls
1944        let mut genn = array::rand::<Int32Type>().with_nulls(&[false, false, false]);
1945
1946        let arr = genn.generate(RowCount::from(3), &mut rng).unwrap();
1947        let block = DataBlock::from_array(arr.clone());
1948        assert!(block.data_size() == arr.get_buffer_memory_size() as u64);
1949
1950        let arr = genn.generate(RowCount::from(400), &mut rng).unwrap();
1951        let block = DataBlock::from_array(arr.clone());
1952        assert!(block.data_size() == arr.get_buffer_memory_size() as u64);
1953
1954        // test data_size() when input has nulls
1955        let mut genn = array::rand::<Int32Type>().with_nulls(&[false, true, false]);
1956        let arr = genn.generate(RowCount::from(3), &mut rng).unwrap();
1957        let block = DataBlock::from_array(arr.clone());
1958
1959        let array_data = arr.to_data();
1960        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
1961        // the NullBuffer.len() returns the length in bits so we divide_round_up by 8
1962        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
1963        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
1964
1965        let arr = genn.generate(RowCount::from(400), &mut rng).unwrap();
1966        let block = DataBlock::from_array(arr.clone());
1967
1968        let array_data = arr.to_data();
1969        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
1970        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
1971        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
1972
1973        let mut genn = array::rand::<Int32Type>().with_nulls(&[true, true, false]);
1974        let arr = genn.generate(RowCount::from(3), &mut rng).unwrap();
1975        let block = DataBlock::from_array(arr.clone());
1976
1977        let array_data = arr.to_data();
1978        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
1979        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
1980        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
1981
1982        let arr = genn.generate(RowCount::from(400), &mut rng).unwrap();
1983        let block = DataBlock::from_array(arr.clone());
1984
1985        let array_data = arr.to_data();
1986        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
1987        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
1988        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
1989
1990        let mut genn = array::rand::<Int32Type>().with_nulls(&[false, true, false]);
1991        let arr1 = genn.generate(RowCount::from(3), &mut rng).unwrap();
1992        let arr2 = genn.generate(RowCount::from(3), &mut rng).unwrap();
1993        let arr3 = genn.generate(RowCount::from(3), &mut rng).unwrap();
1994        let block = DataBlock::from_arrays(&[arr1.clone(), arr2.clone(), arr3.clone()], 9);
1995
1996        let concatenated_array = arrow_select::concat::concat(&[
1997            &*Arc::new(arr1.clone()) as &dyn Array,
1998            &*Arc::new(arr2.clone()) as &dyn Array,
1999            &*Arc::new(arr3.clone()) as &dyn Array,
2000        ])
2001        .unwrap();
2002        let total_buffer_size: usize = concatenated_array
2003            .to_data()
2004            .buffers()
2005            .iter()
2006            .map(|buffer| buffer.len())
2007            .sum();
2008
2009        let total_nulls_size_in_bytes = concatenated_array.nulls().unwrap().len().div_ceil(8);
2010        assert!(block.data_size() == (total_buffer_size + total_nulls_size_in_bytes) as u64);
2011    }
2012}