lance_encoding/
data.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Data layouts to represent encoded data in a sub-Arrow format
5//!
6//! These [`DataBlock`] structures represent physical layouts.  They fill a gap somewhere
7//! between [`arrow_data::data::ArrayData`] (which, as a collection of buffers, is too
8//! generic because it doesn't give us enough information about what those buffers represent)
9//! and [`arrow_array::array::Array`] (which is too specific, because it cares about the
10//! logical data type).
11//!
12//! In addition, the layouts represented here are slightly stricter than Arrow's layout rules.
13//! For example, offset buffers MUST start with 0.  These additional restrictions impose a
14//! slight penalty on encode (to normalize arrow data) but make the development of encoders
15//! and decoders easier (since they can rely on a normalized representation)
16
17use std::{
18    ops::Range,
19    sync::{Arc, RwLock},
20};
21
22use arrow::array::{ArrayData, ArrayDataBuilder, AsArray};
23use arrow_array::{new_empty_array, new_null_array, Array, ArrayRef, OffsetSizeTrait, UInt64Array};
24use arrow_buffer::{
25    ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer, ScalarBuffer,
26};
27use arrow_schema::DataType;
28use lance_arrow::DataTypeExt;
29use snafu::location;
30
31use lance_core::{Error, Result};
32
33use crate::{
34    buffer::LanceBuffer,
35    statistics::{ComputeStat, Stat},
36};
37
38/// A data block with no buffers where everything is null
39///
40/// Note: this data block should not be used for future work.  It will be deprecated
41/// in the 2.1 version of the format where nullability will be handled by the structural
42/// encoders.
43#[derive(Debug)]
44pub struct AllNullDataBlock {
45    /// The number of values represented by this block
46    pub num_values: u64,
47}
48
49impl AllNullDataBlock {
50    fn into_arrow(self, data_type: DataType, _validate: bool) -> Result<ArrayData> {
51        Ok(ArrayData::new_null(&data_type, self.num_values as usize))
52    }
53
54    fn into_buffers(self) -> Vec<LanceBuffer> {
55        vec![]
56    }
57
58    fn borrow_and_clone(&mut self) -> Self {
59        Self {
60            num_values: self.num_values,
61        }
62    }
63
64    fn try_clone(&self) -> Result<Self> {
65        Ok(Self {
66            num_values: self.num_values,
67        })
68    }
69}
70
71use std::collections::HashMap;
72
73// `BlockInfo` stores the statistics of this `DataBlock`, such as `NullCount` for `NullableDataBlock`,
74// `BitWidth` for `FixedWidthDataBlock`, `Cardinality` for all `DataBlock`
75#[derive(Debug, Clone)]
76pub struct BlockInfo(pub Arc<RwLock<HashMap<Stat, Arc<dyn Array>>>>);
77
78impl Default for BlockInfo {
79    fn default() -> Self {
80        Self::new()
81    }
82}
83
84impl BlockInfo {
85    pub fn new() -> Self {
86        Self(Arc::new(RwLock::new(HashMap::new())))
87    }
88}
89
90impl PartialEq for BlockInfo {
91    fn eq(&self, other: &Self) -> bool {
92        let self_info = self.0.read().unwrap();
93        let other_info = other.0.read().unwrap();
94        *self_info == *other_info
95    }
96}
97
98/// Wraps a data block and adds nullability information to it
99///
100/// Note: this data block should not be used for future work.  It will be deprecated
101/// in the 2.1 version of the format where nullability will be handled by the structural
102/// encoders.
103#[derive(Debug)]
104pub struct NullableDataBlock {
105    /// The underlying data
106    pub data: Box<DataBlock>,
107    /// A bitmap of validity for each value
108    pub nulls: LanceBuffer,
109
110    pub block_info: BlockInfo,
111}
112
113impl NullableDataBlock {
114    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
115        let nulls = self.nulls.into_buffer();
116        let data = self.data.into_arrow(data_type, validate)?.into_builder();
117        let data = data.null_bit_buffer(Some(nulls));
118        if validate {
119            Ok(data.build()?)
120        } else {
121            Ok(unsafe { data.build_unchecked() })
122        }
123    }
124
125    fn into_buffers(self) -> Vec<LanceBuffer> {
126        let mut buffers = vec![self.nulls];
127        buffers.extend(self.data.into_buffers());
128        buffers
129    }
130
131    fn borrow_and_clone(&mut self) -> Self {
132        Self {
133            data: Box::new(self.data.borrow_and_clone()),
134            nulls: self.nulls.borrow_and_clone(),
135            block_info: self.block_info.clone(),
136        }
137    }
138
139    fn try_clone(&self) -> Result<Self> {
140        Ok(Self {
141            data: Box::new(self.data.try_clone()?),
142            nulls: self.nulls.try_clone()?,
143            block_info: self.block_info.clone(),
144        })
145    }
146
147    pub fn data_size(&self) -> u64 {
148        self.data.data_size() + self.nulls.len() as u64
149    }
150}
151
152/// A block representing the same constant value repeated many times
153#[derive(Debug, PartialEq)]
154pub struct ConstantDataBlock {
155    /// Data buffer containing the value
156    pub data: LanceBuffer,
157    /// The number of values
158    pub num_values: u64,
159}
160
161impl ConstantDataBlock {
162    fn into_buffers(self) -> Vec<LanceBuffer> {
163        vec![self.data]
164    }
165
166    fn into_arrow(self, _data_type: DataType, _validate: bool) -> Result<ArrayData> {
167        // We don't need this yet but if we come up with some way of serializing
168        // scalars to/from bytes then we could implement it.
169        todo!()
170    }
171
172    pub fn borrow_and_clone(&mut self) -> Self {
173        Self {
174            data: self.data.borrow_and_clone(),
175            num_values: self.num_values,
176        }
177    }
178
179    pub fn try_clone(&self) -> Result<Self> {
180        Ok(Self {
181            data: self.data.try_clone()?,
182            num_values: self.num_values,
183        })
184    }
185
186    pub fn data_size(&self) -> u64 {
187        self.data.len() as u64
188    }
189}
190
191/// A data block for a single buffer of data where each element has a fixed number of bits
192#[derive(Debug, PartialEq)]
193pub struct FixedWidthDataBlock {
194    /// The data buffer
195    pub data: LanceBuffer,
196    /// The number of bits per value
197    pub bits_per_value: u64,
198    /// The number of values represented by this block
199    pub num_values: u64,
200
201    pub block_info: BlockInfo,
202}
203
204impl FixedWidthDataBlock {
205    fn do_into_arrow(
206        self,
207        data_type: DataType,
208        num_values: u64,
209        validate: bool,
210    ) -> Result<ArrayData> {
211        let data_buffer = self.data.into_buffer();
212        let builder = ArrayDataBuilder::new(data_type)
213            .add_buffer(data_buffer)
214            .len(num_values as usize)
215            .null_count(0);
216        if validate {
217            Ok(builder.build()?)
218        } else {
219            Ok(unsafe { builder.build_unchecked() })
220        }
221    }
222
223    pub fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
224        let root_num_values = self.num_values;
225        self.do_into_arrow(data_type, root_num_values, validate)
226    }
227
228    pub fn into_buffers(self) -> Vec<LanceBuffer> {
229        vec![self.data]
230    }
231
232    pub fn borrow_and_clone(&mut self) -> Self {
233        Self {
234            data: self.data.borrow_and_clone(),
235            bits_per_value: self.bits_per_value,
236            num_values: self.num_values,
237            block_info: self.block_info.clone(),
238        }
239    }
240
241    pub fn try_clone(&self) -> Result<Self> {
242        Ok(Self {
243            data: self.data.try_clone()?,
244            bits_per_value: self.bits_per_value,
245            num_values: self.num_values,
246            block_info: self.block_info.clone(),
247        })
248    }
249
250    pub fn data_size(&self) -> u64 {
251        self.data.len() as u64
252    }
253}
254
255#[derive(Debug)]
256pub struct VariableWidthDataBlockBuilder<T: OffsetSizeTrait> {
257    offsets: Vec<T>,
258    bytes: Vec<u8>,
259}
260
261impl<T: OffsetSizeTrait> VariableWidthDataBlockBuilder<T> {
262    fn new(estimated_size_bytes: u64) -> Self {
263        Self {
264            offsets: vec![T::from_usize(0).unwrap()],
265            bytes: Vec::with_capacity(estimated_size_bytes as usize),
266        }
267    }
268}
269
270impl<T: OffsetSizeTrait> DataBlockBuilderImpl for VariableWidthDataBlockBuilder<T> {
271    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
272        let block = data_block.as_variable_width_ref().unwrap();
273        assert!(block.bits_per_offset == T::get_byte_width() as u8 * 8);
274
275        let offsets: ScalarBuffer<T> = block.offsets.try_clone().unwrap().borrow_to_typed_slice();
276
277        let start_offset = offsets[selection.start as usize];
278        let end_offset = offsets[selection.end as usize];
279        let mut previous_len = self.bytes.len();
280
281        self.bytes
282            .extend_from_slice(&block.data[start_offset.as_usize()..end_offset.as_usize()]);
283
284        self.offsets.extend(
285            offsets[selection.start as usize..selection.end as usize]
286                .iter()
287                .zip(&offsets[selection.start as usize + 1..=selection.end as usize])
288                .map(|(&current, &next)| {
289                    let this_value_len = next - current;
290                    previous_len += this_value_len.as_usize();
291                    T::from_usize(previous_len).unwrap()
292                }),
293        );
294    }
295
296    fn finish(self: Box<Self>) -> DataBlock {
297        let num_values = (self.offsets.len() - 1) as u64;
298        DataBlock::VariableWidth(VariableWidthBlock {
299            data: LanceBuffer::Owned(self.bytes),
300            offsets: LanceBuffer::reinterpret_vec(self.offsets),
301            bits_per_offset: T::get_byte_width() as u8 * 8,
302            num_values,
303            block_info: BlockInfo::new(),
304        })
305    }
306}
307
308#[derive(Debug)]
309struct BitmapDataBlockBuilder {
310    values: BooleanBufferBuilder,
311}
312
313impl BitmapDataBlockBuilder {
314    fn new(estimated_size_bytes: u64) -> Self {
315        Self {
316            values: BooleanBufferBuilder::new(estimated_size_bytes as usize * 8),
317        }
318    }
319}
320
321impl DataBlockBuilderImpl for BitmapDataBlockBuilder {
322    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
323        let bitmap_blk = data_block.as_fixed_width_ref().unwrap();
324        self.values.append_packed_range(
325            selection.start as usize..selection.end as usize,
326            &bitmap_blk.data,
327        );
328    }
329
330    fn finish(mut self: Box<Self>) -> DataBlock {
331        let bool_buf = self.values.finish();
332        let num_values = bool_buf.len() as u64;
333        let bits_buf = bool_buf.into_inner();
334        DataBlock::FixedWidth(FixedWidthDataBlock {
335            data: LanceBuffer::from(bits_buf),
336            bits_per_value: 1,
337            num_values,
338            block_info: BlockInfo::new(),
339        })
340    }
341}
342
343#[derive(Debug)]
344struct FixedWidthDataBlockBuilder {
345    bits_per_value: u64,
346    bytes_per_value: u64,
347    values: Vec<u8>,
348}
349
350impl FixedWidthDataBlockBuilder {
351    fn new(bits_per_value: u64, estimated_size_bytes: u64) -> Self {
352        assert!(bits_per_value % 8 == 0);
353        Self {
354            bits_per_value,
355            bytes_per_value: bits_per_value / 8,
356            values: Vec::with_capacity(estimated_size_bytes as usize),
357        }
358    }
359}
360
361impl DataBlockBuilderImpl for FixedWidthDataBlockBuilder {
362    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
363        let block = data_block.as_fixed_width_ref().unwrap();
364        assert_eq!(self.bits_per_value, block.bits_per_value);
365        let start = selection.start as usize * self.bytes_per_value as usize;
366        let end = selection.end as usize * self.bytes_per_value as usize;
367        self.values.extend_from_slice(&block.data[start..end]);
368    }
369
370    fn finish(self: Box<Self>) -> DataBlock {
371        let num_values = (self.values.len() / self.bytes_per_value as usize) as u64;
372        DataBlock::FixedWidth(FixedWidthDataBlock {
373            data: LanceBuffer::Owned(self.values),
374            bits_per_value: self.bits_per_value,
375            num_values,
376            block_info: BlockInfo::new(),
377        })
378    }
379}
380
381#[derive(Debug)]
382struct StructDataBlockBuilder {
383    children: Vec<Box<dyn DataBlockBuilderImpl>>,
384}
385
386impl StructDataBlockBuilder {
387    // Currently only Struct with fixed-width fields are supported.
388    // And the assumption that all fields have `bits_per_value % 8 == 0` is made here.
389    fn new(bits_per_values: Vec<u32>, estimated_size_bytes: u64) -> Self {
390        let mut children = vec![];
391
392        debug_assert!(bits_per_values.iter().all(|bpv| bpv % 8 == 0));
393
394        let bytes_per_row: u32 = bits_per_values.iter().sum::<u32>() / 8;
395        let bytes_per_row = bytes_per_row as u64;
396
397        for bits_per_value in bits_per_values.iter() {
398            let this_estimated_size_bytes =
399                estimated_size_bytes / bytes_per_row * (*bits_per_value as u64) / 8;
400            let child =
401                FixedWidthDataBlockBuilder::new(*bits_per_value as u64, this_estimated_size_bytes);
402            children.push(Box::new(child) as Box<dyn DataBlockBuilderImpl>);
403        }
404        Self { children }
405    }
406}
407
408impl DataBlockBuilderImpl for StructDataBlockBuilder {
409    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
410        let data_block = data_block.as_struct_ref().unwrap();
411        for i in 0..self.children.len() {
412            self.children[i].append(&data_block.children[i], selection.clone());
413        }
414    }
415
416    fn finish(self: Box<Self>) -> DataBlock {
417        let mut children_data_block = Vec::new();
418        for child in self.children {
419            let child_data_block = child.finish();
420            children_data_block.push(child_data_block);
421        }
422        DataBlock::Struct(StructDataBlock {
423            children: children_data_block,
424            block_info: BlockInfo::new(),
425            validity: None,
426        })
427    }
428}
429/// A data block to represent a fixed size list
430#[derive(Debug)]
431pub struct FixedSizeListBlock {
432    /// The child data block
433    pub child: Box<DataBlock>,
434    /// The number of items in each list
435    pub dimension: u64,
436}
437
438impl FixedSizeListBlock {
439    fn borrow_and_clone(&mut self) -> Self {
440        Self {
441            child: Box::new(self.child.borrow_and_clone()),
442            dimension: self.dimension,
443        }
444    }
445
446    fn try_clone(&self) -> Result<Self> {
447        Ok(Self {
448            child: Box::new(self.child.try_clone()?),
449            dimension: self.dimension,
450        })
451    }
452
453    pub fn num_values(&self) -> u64 {
454        self.child.num_values() / self.dimension
455    }
456
457    /// Try to flatten a FixedSizeListBlock into a FixedWidthDataBlock
458    ///
459    /// Returns None if any children are nullable
460    pub fn try_into_flat(self) -> Option<FixedWidthDataBlock> {
461        match *self.child {
462            // Cannot flatten a nullable child
463            DataBlock::Nullable(_) => None,
464            DataBlock::FixedSizeList(inner) => {
465                let mut flat = inner.try_into_flat()?;
466                flat.bits_per_value *= self.dimension;
467                flat.num_values /= self.dimension;
468                Some(flat)
469            }
470            DataBlock::FixedWidth(mut inner) => {
471                inner.bits_per_value *= self.dimension;
472                inner.num_values /= self.dimension;
473                Some(inner)
474            }
475            _ => panic!(
476                "Expected FixedSizeList or FixedWidth data block but found {:?}",
477                self
478            ),
479        }
480    }
481
482    pub fn flatten_as_fixed(&mut self) -> FixedWidthDataBlock {
483        match self.child.as_mut() {
484            DataBlock::FixedSizeList(fsl) => fsl.flatten_as_fixed(),
485            DataBlock::FixedWidth(fw) => fw.borrow_and_clone(),
486            _ => panic!("Expected FixedSizeList or FixedWidth data block"),
487        }
488    }
489
490    /// Convert a flattened values block into a FixedSizeListBlock
491    pub fn from_flat(data: FixedWidthDataBlock, data_type: &DataType) -> DataBlock {
492        match data_type {
493            DataType::FixedSizeList(child_field, dimension) => {
494                let mut data = data;
495                data.bits_per_value /= *dimension as u64;
496                data.num_values *= *dimension as u64;
497                let child_data = Self::from_flat(data, child_field.data_type());
498                DataBlock::FixedSizeList(Self {
499                    child: Box::new(child_data),
500                    dimension: *dimension as u64,
501                })
502            }
503            // Base case, we've hit a non-list type
504            _ => DataBlock::FixedWidth(data),
505        }
506    }
507
508    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
509        let num_values = self.num_values();
510        let builder = match &data_type {
511            DataType::FixedSizeList(child_field, _) => {
512                let child_data = self
513                    .child
514                    .into_arrow(child_field.data_type().clone(), validate)?;
515                ArrayDataBuilder::new(data_type)
516                    .add_child_data(child_data)
517                    .len(num_values as usize)
518                    .null_count(0)
519            }
520            _ => panic!("Expected FixedSizeList data type and got {:?}", data_type),
521        };
522        if validate {
523            Ok(builder.build()?)
524        } else {
525            Ok(unsafe { builder.build_unchecked() })
526        }
527    }
528
529    fn into_buffers(self) -> Vec<LanceBuffer> {
530        self.child.into_buffers()
531    }
532
533    fn data_size(&self) -> u64 {
534        self.child.data_size()
535    }
536}
537
538#[derive(Debug)]
539struct FixedSizeListBlockBuilder {
540    inner: Box<dyn DataBlockBuilderImpl>,
541    dimension: u64,
542}
543
544impl FixedSizeListBlockBuilder {
545    fn new(inner: Box<dyn DataBlockBuilderImpl>, dimension: u64) -> Self {
546        Self { inner, dimension }
547    }
548}
549
550impl DataBlockBuilderImpl for FixedSizeListBlockBuilder {
551    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
552        let selection = selection.start * self.dimension..selection.end * self.dimension;
553        let fsl = data_block.as_fixed_size_list_ref().unwrap();
554        self.inner.append(fsl.child.as_ref(), selection);
555    }
556
557    fn finish(self: Box<Self>) -> DataBlock {
558        let inner_block = self.inner.finish();
559        DataBlock::FixedSizeList(FixedSizeListBlock {
560            child: Box::new(inner_block),
561            dimension: self.dimension,
562        })
563    }
564}
565
566#[derive(Debug)]
567struct NullableDataBlockBuilder {
568    inner: Box<dyn DataBlockBuilderImpl>,
569    validity: BooleanBufferBuilder,
570}
571
572impl NullableDataBlockBuilder {
573    fn new(inner: Box<dyn DataBlockBuilderImpl>, estimated_size_bytes: usize) -> Self {
574        Self {
575            inner,
576            validity: BooleanBufferBuilder::new(estimated_size_bytes * 8),
577        }
578    }
579}
580
581impl DataBlockBuilderImpl for NullableDataBlockBuilder {
582    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
583        let nullable = data_block.as_nullable_ref().unwrap();
584        let bool_buf = BooleanBuffer::new(
585            nullable.nulls.try_clone().unwrap().into_buffer(),
586            selection.start as usize,
587            (selection.end - selection.start) as usize,
588        );
589        self.validity.append_buffer(&bool_buf);
590        self.inner.append(nullable.data.as_ref(), selection);
591    }
592
593    fn finish(mut self: Box<Self>) -> DataBlock {
594        let inner_block = self.inner.finish();
595        DataBlock::Nullable(NullableDataBlock {
596            data: Box::new(inner_block),
597            nulls: LanceBuffer::Borrowed(self.validity.finish().into_inner()),
598            block_info: BlockInfo::new(),
599        })
600    }
601}
602
603/// A data block with no regular structure.  There is no available spot to attach
604/// validity / repdef information and it cannot be converted to Arrow without being
605/// decoded
606#[derive(Debug)]
607pub struct OpaqueBlock {
608    pub buffers: Vec<LanceBuffer>,
609    pub num_values: u64,
610    pub block_info: BlockInfo,
611}
612
613impl OpaqueBlock {
614    fn borrow_and_clone(&mut self) -> Self {
615        Self {
616            buffers: self
617                .buffers
618                .iter_mut()
619                .map(|b| b.borrow_and_clone())
620                .collect(),
621            num_values: self.num_values,
622            block_info: self.block_info.clone(),
623        }
624    }
625
626    fn try_clone(&self) -> Result<Self> {
627        Ok(Self {
628            buffers: self
629                .buffers
630                .iter()
631                .map(|b| b.try_clone())
632                .collect::<Result<_>>()?,
633            num_values: self.num_values,
634            block_info: self.block_info.clone(),
635        })
636    }
637
638    pub fn data_size(&self) -> u64 {
639        self.buffers.iter().map(|b| b.len() as u64).sum()
640    }
641}
642
643/// A data block for variable-width data (e.g. strings, packed rows, etc.)
644#[derive(Debug)]
645pub struct VariableWidthBlock {
646    /// The data buffer
647    pub data: LanceBuffer,
648    /// The offsets buffer (contains num_values + 1 offsets)
649    ///
650    /// Offsets MUST start at 0
651    pub offsets: LanceBuffer,
652    /// The number of bits per offset
653    pub bits_per_offset: u8,
654    /// The number of values represented by this block
655    pub num_values: u64,
656
657    pub block_info: BlockInfo,
658}
659
660impl VariableWidthBlock {
661    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
662        let data_buffer = self.data.into_buffer();
663        let offsets_buffer = self.offsets.into_buffer();
664        let builder = ArrayDataBuilder::new(data_type)
665            .add_buffer(offsets_buffer)
666            .add_buffer(data_buffer)
667            .len(self.num_values as usize)
668            .null_count(0);
669        if validate {
670            Ok(builder.build()?)
671        } else {
672            Ok(unsafe { builder.build_unchecked() })
673        }
674    }
675
676    fn into_buffers(self) -> Vec<LanceBuffer> {
677        vec![self.offsets, self.data]
678    }
679
680    fn borrow_and_clone(&mut self) -> Self {
681        Self {
682            data: self.data.borrow_and_clone(),
683            offsets: self.offsets.borrow_and_clone(),
684            bits_per_offset: self.bits_per_offset,
685            num_values: self.num_values,
686            block_info: self.block_info.clone(),
687        }
688    }
689
690    fn try_clone(&self) -> Result<Self> {
691        Ok(Self {
692            data: self.data.try_clone()?,
693            offsets: self.offsets.try_clone()?,
694            bits_per_offset: self.bits_per_offset,
695            num_values: self.num_values,
696            block_info: self.block_info.clone(),
697        })
698    }
699
700    pub fn offsets_as_block(&mut self) -> DataBlock {
701        let offsets = self.offsets.borrow_and_clone();
702        DataBlock::FixedWidth(FixedWidthDataBlock {
703            data: offsets,
704            bits_per_value: self.bits_per_offset as u64,
705            num_values: self.num_values + 1,
706            block_info: BlockInfo::new(),
707        })
708    }
709
710    pub fn data_size(&self) -> u64 {
711        (self.data.len() + self.offsets.len()) as u64
712    }
713}
714
715/// A data block representing a struct
716#[derive(Debug)]
717pub struct StructDataBlock {
718    /// The child arrays
719    pub children: Vec<DataBlock>,
720    pub block_info: BlockInfo,
721    /// The validity bitmap for the struct (None means all valid)
722    pub validity: Option<NullBuffer>,
723}
724
725impl StructDataBlock {
726    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
727        if let DataType::Struct(fields) = &data_type {
728            let mut builder = ArrayDataBuilder::new(DataType::Struct(fields.clone()));
729            let mut num_rows = 0;
730            for (field, child) in fields.iter().zip(self.children) {
731                let child_data = child.into_arrow(field.data_type().clone(), validate)?;
732                num_rows = child_data.len();
733                builder = builder.add_child_data(child_data);
734            }
735
736            // Apply validity if present
737            let builder = if let Some(validity) = self.validity {
738                let null_count = validity.null_count();
739                builder
740                    .null_bit_buffer(Some(validity.into_inner().into_inner()))
741                    .null_count(null_count)
742            } else {
743                builder.null_count(0)
744            };
745
746            let builder = builder.len(num_rows);
747            if validate {
748                Ok(builder.build()?)
749            } else {
750                Ok(unsafe { builder.build_unchecked() })
751            }
752        } else {
753            Err(Error::Internal {
754                message: format!("Expected Struct, got {:?}", data_type),
755                location: location!(),
756            })
757        }
758    }
759
760    fn remove_outer_validity(self) -> Self {
761        Self {
762            children: self
763                .children
764                .into_iter()
765                .map(|c| c.remove_outer_validity())
766                .collect(),
767            block_info: self.block_info,
768            validity: None, // Remove the validity
769        }
770    }
771
772    fn into_buffers(self) -> Vec<LanceBuffer> {
773        self.children
774            .into_iter()
775            .flat_map(|c| c.into_buffers())
776            .collect()
777    }
778
779    fn borrow_and_clone(&mut self) -> Self {
780        Self {
781            children: self
782                .children
783                .iter_mut()
784                .map(|c| c.borrow_and_clone())
785                .collect(),
786            block_info: self.block_info.clone(),
787            validity: self.validity.clone(),
788        }
789    }
790
791    fn try_clone(&self) -> Result<Self> {
792        Ok(Self {
793            children: self
794                .children
795                .iter()
796                .map(|c| c.try_clone())
797                .collect::<Result<_>>()?,
798            block_info: self.block_info.clone(),
799            validity: self.validity.clone(),
800        })
801    }
802
803    pub fn data_size(&self) -> u64 {
804        self.children
805            .iter()
806            .map(|data_block| data_block.data_size())
807            .sum()
808    }
809}
810
811/// A data block for dictionary encoded data
812#[derive(Debug)]
813pub struct DictionaryDataBlock {
814    /// The indices buffer
815    pub indices: FixedWidthDataBlock,
816    /// The dictionary itself
817    pub dictionary: Box<DataBlock>,
818}
819
820impl DictionaryDataBlock {
821    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
822        let (key_type, value_type) = if let DataType::Dictionary(key_type, value_type) = &data_type
823        {
824            (key_type.as_ref().clone(), value_type.as_ref().clone())
825        } else {
826            return Err(Error::Internal {
827                message: format!("Expected Dictionary, got {:?}", data_type),
828                location: location!(),
829            });
830        };
831
832        let indices = self.indices.into_arrow(key_type, validate)?;
833        let dictionary = self.dictionary.into_arrow(value_type, validate)?;
834
835        let builder = indices
836            .into_builder()
837            .add_child_data(dictionary)
838            .data_type(data_type);
839
840        if validate {
841            Ok(builder.build()?)
842        } else {
843            Ok(unsafe { builder.build_unchecked() })
844        }
845    }
846
847    fn into_buffers(self) -> Vec<LanceBuffer> {
848        let mut buffers = self.indices.into_buffers();
849        buffers.extend(self.dictionary.into_buffers());
850        buffers
851    }
852
853    fn borrow_and_clone(&mut self) -> Self {
854        Self {
855            indices: self.indices.borrow_and_clone(),
856            dictionary: Box::new(self.dictionary.borrow_and_clone()),
857        }
858    }
859
860    fn try_clone(&self) -> Result<Self> {
861        Ok(Self {
862            indices: self.indices.try_clone()?,
863            dictionary: Box::new(self.dictionary.try_clone()?),
864        })
865    }
866}
867
868/// A DataBlock is a collection of buffers that represents an "array" of data in very generic terms
869///
870/// The output of each decoder is a DataBlock.  Decoders can be chained together to transform
871/// one DataBlock into a different kind of DataBlock.
872///
873/// The DataBlock is somewhere in between Arrow's ArrayData and Array and represents a physical
874/// layout of the data.
875///
876/// A DataBlock can be converted into an Arrow ArrayData (and then Array) for a given array type.
877/// For example, a FixedWidthDataBlock can be converted into any primitive type or a fixed size
878/// list of a primitive type.  This is a zero-copy operation.
879///
880/// In addition, a DataBlock can be created from an Arrow array or arrays.  This is not a zero-copy
881/// operation as some normalization may be required.
882#[derive(Debug)]
883pub enum DataBlock {
884    Empty(),
885    Constant(ConstantDataBlock),
886    AllNull(AllNullDataBlock),
887    Nullable(NullableDataBlock),
888    FixedWidth(FixedWidthDataBlock),
889    FixedSizeList(FixedSizeListBlock),
890    VariableWidth(VariableWidthBlock),
891    Opaque(OpaqueBlock),
892    Struct(StructDataBlock),
893    Dictionary(DictionaryDataBlock),
894}
895
896impl DataBlock {
897    /// Convert self into an Arrow ArrayData
898    pub fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
899        match self {
900            Self::Empty() => Ok(new_empty_array(&data_type).to_data()),
901            Self::Constant(inner) => inner.into_arrow(data_type, validate),
902            Self::AllNull(inner) => inner.into_arrow(data_type, validate),
903            Self::Nullable(inner) => inner.into_arrow(data_type, validate),
904            Self::FixedWidth(inner) => inner.into_arrow(data_type, validate),
905            Self::FixedSizeList(inner) => inner.into_arrow(data_type, validate),
906            Self::VariableWidth(inner) => inner.into_arrow(data_type, validate),
907            Self::Struct(inner) => inner.into_arrow(data_type, validate),
908            Self::Dictionary(inner) => inner.into_arrow(data_type, validate),
909            Self::Opaque(_) => Err(Error::Internal {
910                message: "Cannot convert OpaqueBlock to Arrow".to_string(),
911                location: location!(),
912            }),
913        }
914    }
915
916    /// Convert the data block into a collection of buffers for serialization
917    ///
918    /// The order matters and will be used to reconstruct the data block at read time.
919    pub fn into_buffers(self) -> Vec<LanceBuffer> {
920        match self {
921            Self::Empty() => Vec::default(),
922            Self::Constant(inner) => inner.into_buffers(),
923            Self::AllNull(inner) => inner.into_buffers(),
924            Self::Nullable(inner) => inner.into_buffers(),
925            Self::FixedWidth(inner) => inner.into_buffers(),
926            Self::FixedSizeList(inner) => inner.into_buffers(),
927            Self::VariableWidth(inner) => inner.into_buffers(),
928            Self::Struct(inner) => inner.into_buffers(),
929            Self::Dictionary(inner) => inner.into_buffers(),
930            Self::Opaque(inner) => inner.buffers,
931        }
932    }
933
934    /// Converts the data buffers into borrowed mode and clones the block
935    ///
936    /// This is a zero-copy operation but requires a mutable reference to self and, afterwards,
937    /// all buffers will be in Borrowed mode.
938    pub fn borrow_and_clone(&mut self) -> Self {
939        match self {
940            Self::Empty() => Self::Empty(),
941            Self::Constant(inner) => Self::Constant(inner.borrow_and_clone()),
942            Self::AllNull(inner) => Self::AllNull(inner.borrow_and_clone()),
943            Self::Nullable(inner) => Self::Nullable(inner.borrow_and_clone()),
944            Self::FixedWidth(inner) => Self::FixedWidth(inner.borrow_and_clone()),
945            Self::FixedSizeList(inner) => Self::FixedSizeList(inner.borrow_and_clone()),
946            Self::VariableWidth(inner) => Self::VariableWidth(inner.borrow_and_clone()),
947            Self::Struct(inner) => Self::Struct(inner.borrow_and_clone()),
948            Self::Dictionary(inner) => Self::Dictionary(inner.borrow_and_clone()),
949            Self::Opaque(inner) => Self::Opaque(inner.borrow_and_clone()),
950        }
951    }
952
953    /// Try and clone the block
954    ///
955    /// This will fail if any buffers are in owned mode.  You can call borrow_and_clone() to
956    /// ensure that all buffers are in borrowed mode before calling this method.
957    pub fn try_clone(&self) -> Result<Self> {
958        match self {
959            Self::Empty() => Ok(Self::Empty()),
960            Self::Constant(inner) => Ok(Self::Constant(inner.try_clone()?)),
961            Self::AllNull(inner) => Ok(Self::AllNull(inner.try_clone()?)),
962            Self::Nullable(inner) => Ok(Self::Nullable(inner.try_clone()?)),
963            Self::FixedWidth(inner) => Ok(Self::FixedWidth(inner.try_clone()?)),
964            Self::FixedSizeList(inner) => Ok(Self::FixedSizeList(inner.try_clone()?)),
965            Self::VariableWidth(inner) => Ok(Self::VariableWidth(inner.try_clone()?)),
966            Self::Struct(inner) => Ok(Self::Struct(inner.try_clone()?)),
967            Self::Dictionary(inner) => Ok(Self::Dictionary(inner.try_clone()?)),
968            Self::Opaque(inner) => Ok(Self::Opaque(inner.try_clone()?)),
969        }
970    }
971
972    pub fn name(&self) -> &'static str {
973        match self {
974            Self::Constant(_) => "Constant",
975            Self::Empty() => "Empty",
976            Self::AllNull(_) => "AllNull",
977            Self::Nullable(_) => "Nullable",
978            Self::FixedWidth(_) => "FixedWidth",
979            Self::FixedSizeList(_) => "FixedSizeList",
980            Self::VariableWidth(_) => "VariableWidth",
981            Self::Struct(_) => "Struct",
982            Self::Dictionary(_) => "Dictionary",
983            Self::Opaque(_) => "Opaque",
984        }
985    }
986
987    pub fn is_variable(&self) -> bool {
988        match self {
989            Self::Constant(_) => false,
990            Self::Empty() => false,
991            Self::AllNull(_) => false,
992            Self::Nullable(nullable) => nullable.data.is_variable(),
993            Self::FixedWidth(_) => false,
994            Self::FixedSizeList(fsl) => fsl.child.is_variable(),
995            Self::VariableWidth(_) => true,
996            Self::Struct(strct) => strct.children.iter().any(|c| c.is_variable()),
997            Self::Dictionary(_) => {
998                todo!("is_variable for DictionaryDataBlock is not implemented yet")
999            }
1000            Self::Opaque(_) => panic!("Does not make sense to ask if an Opaque block is variable"),
1001        }
1002    }
1003
1004    pub fn is_nullable(&self) -> bool {
1005        match self {
1006            Self::AllNull(_) => true,
1007            Self::Nullable(_) => true,
1008            Self::FixedSizeList(fsl) => fsl.child.is_nullable(),
1009            Self::Struct(strct) => strct.children.iter().any(|c| c.is_nullable()),
1010            Self::Dictionary(_) => {
1011                todo!("is_nullable for DictionaryDataBlock is not implemented yet")
1012            }
1013            Self::Opaque(_) => panic!("Does not make sense to ask if an Opaque block is nullable"),
1014            _ => false,
1015        }
1016    }
1017
1018    /// The number of values in the block
1019    ///
1020    /// This function does not recurse into child blocks.  If this is a FSL then it will
1021    /// be the number of lists and not the number of items.
1022    pub fn num_values(&self) -> u64 {
1023        match self {
1024            Self::Empty() => 0,
1025            Self::Constant(inner) => inner.num_values,
1026            Self::AllNull(inner) => inner.num_values,
1027            Self::Nullable(inner) => inner.data.num_values(),
1028            Self::FixedWidth(inner) => inner.num_values,
1029            Self::FixedSizeList(inner) => inner.num_values(),
1030            Self::VariableWidth(inner) => inner.num_values,
1031            Self::Struct(inner) => inner.children[0].num_values(),
1032            Self::Dictionary(inner) => inner.indices.num_values,
1033            Self::Opaque(inner) => inner.num_values,
1034        }
1035    }
1036
1037    /// The number of items in a single row
1038    ///
1039    /// This is always 1 unless there are layers of FSL
1040    pub fn items_per_row(&self) -> u64 {
1041        match self {
1042            Self::Empty() => todo!(),     // Leave undefined until needed
1043            Self::Constant(_) => todo!(), // Leave undefined until needed
1044            Self::AllNull(_) => todo!(),  // Leave undefined until needed
1045            Self::Nullable(nullable) => nullable.data.items_per_row(),
1046            Self::FixedWidth(_) => 1,
1047            Self::FixedSizeList(fsl) => fsl.dimension * fsl.child.items_per_row(),
1048            Self::VariableWidth(_) => 1,
1049            Self::Struct(_) => todo!(), // Leave undefined until needed
1050            Self::Dictionary(_) => 1,
1051            Self::Opaque(_) => 1,
1052        }
1053    }
1054
1055    /// The number of bytes in the data block (including any child blocks)
1056    pub fn data_size(&self) -> u64 {
1057        match self {
1058            Self::Empty() => 0,
1059            Self::Constant(inner) => inner.data_size(),
1060            Self::AllNull(_) => 0,
1061            Self::Nullable(inner) => inner.data_size(),
1062            Self::FixedWidth(inner) => inner.data_size(),
1063            Self::FixedSizeList(inner) => inner.data_size(),
1064            Self::VariableWidth(inner) => inner.data_size(),
1065            Self::Struct(_) => {
1066                todo!("the data_size method for StructDataBlock is not implemented yet")
1067            }
1068            Self::Dictionary(_) => {
1069                todo!("the data_size method for DictionaryDataBlock is not implemented yet")
1070            }
1071            Self::Opaque(inner) => inner.data_size(),
1072        }
1073    }
1074
1075    /// Removes any validity information from the block
1076    ///
1077    /// This does not filter the block (e.g. remove rows).  It only removes
1078    /// the validity bitmaps (if present).  Any garbage masked by null bits
1079    /// will now appear as proper values.
1080    ///
1081    /// If `recurse` is true, then this will also remove validity from any child blocks.
1082    pub fn remove_outer_validity(self) -> Self {
1083        match self {
1084            Self::AllNull(_) => panic!("Cannot remove validity on all-null data"),
1085            Self::Nullable(inner) => *inner.data,
1086            Self::Struct(inner) => Self::Struct(inner.remove_outer_validity()),
1087            other => other,
1088        }
1089    }
1090
1091    pub fn make_builder(&self, estimated_size_bytes: u64) -> Box<dyn DataBlockBuilderImpl> {
1092        match self {
1093            Self::FixedWidth(inner) => {
1094                if inner.bits_per_value == 1 {
1095                    Box::new(BitmapDataBlockBuilder::new(estimated_size_bytes))
1096                } else {
1097                    Box::new(FixedWidthDataBlockBuilder::new(
1098                        inner.bits_per_value,
1099                        estimated_size_bytes,
1100                    ))
1101                }
1102            }
1103            Self::VariableWidth(inner) => {
1104                if inner.bits_per_offset == 32 {
1105                    Box::new(VariableWidthDataBlockBuilder::<i32>::new(
1106                        estimated_size_bytes,
1107                    ))
1108                } else if inner.bits_per_offset == 64 {
1109                    Box::new(VariableWidthDataBlockBuilder::<i64>::new(
1110                        estimated_size_bytes,
1111                    ))
1112                } else {
1113                    todo!()
1114                }
1115            }
1116            Self::FixedSizeList(inner) => {
1117                let inner_builder = inner.child.make_builder(estimated_size_bytes);
1118                Box::new(FixedSizeListBlockBuilder::new(
1119                    inner_builder,
1120                    inner.dimension,
1121                ))
1122            }
1123            Self::Nullable(nullable) => {
1124                // There's no easy way to know what percentage of the data is in the valiidty buffer
1125                // but 1/16th seems like a reasonable guess.
1126                let estimated_validity_size_bytes = estimated_size_bytes / 16;
1127                let inner_builder = nullable
1128                    .data
1129                    .make_builder(estimated_size_bytes - estimated_validity_size_bytes);
1130                Box::new(NullableDataBlockBuilder::new(
1131                    inner_builder,
1132                    estimated_validity_size_bytes as usize,
1133                ))
1134            }
1135            Self::Struct(struct_data_block) => {
1136                let mut bits_per_values = vec![];
1137                for child in struct_data_block.children.iter() {
1138                    let child = child.as_fixed_width_ref().
1139                        expect("Currently StructDataBlockBuilder is only used in packed-struct encoding, and currently in packed-struct encoding, only fixed-width fields are supported.");
1140                    bits_per_values.push(child.bits_per_value as u32);
1141                }
1142                Box::new(StructDataBlockBuilder::new(
1143                    bits_per_values,
1144                    estimated_size_bytes,
1145                ))
1146            }
1147            _ => todo!("make_builder for {:?}", self),
1148        }
1149    }
1150}
1151
1152macro_rules! as_type {
1153    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
1154        pub fn $fn_name(self) -> Option<$inner_type> {
1155            match self {
1156                Self::$inner(inner) => Some(inner),
1157                _ => None,
1158            }
1159        }
1160    };
1161}
1162
1163macro_rules! as_type_ref {
1164    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
1165        pub fn $fn_name(&self) -> Option<&$inner_type> {
1166            match self {
1167                Self::$inner(inner) => Some(inner),
1168                _ => None,
1169            }
1170        }
1171    };
1172}
1173
1174macro_rules! as_type_ref_mut {
1175    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
1176        pub fn $fn_name(&mut self) -> Option<&mut $inner_type> {
1177            match self {
1178                Self::$inner(inner) => Some(inner),
1179                _ => None,
1180            }
1181        }
1182    };
1183}
1184
1185// Cast implementations
1186impl DataBlock {
1187    as_type!(as_all_null, AllNull, AllNullDataBlock);
1188    as_type!(as_nullable, Nullable, NullableDataBlock);
1189    as_type!(as_fixed_width, FixedWidth, FixedWidthDataBlock);
1190    as_type!(as_fixed_size_list, FixedSizeList, FixedSizeListBlock);
1191    as_type!(as_variable_width, VariableWidth, VariableWidthBlock);
1192    as_type!(as_struct, Struct, StructDataBlock);
1193    as_type!(as_dictionary, Dictionary, DictionaryDataBlock);
1194    as_type_ref!(as_all_null_ref, AllNull, AllNullDataBlock);
1195    as_type_ref!(as_nullable_ref, Nullable, NullableDataBlock);
1196    as_type_ref!(as_fixed_width_ref, FixedWidth, FixedWidthDataBlock);
1197    as_type_ref!(as_fixed_size_list_ref, FixedSizeList, FixedSizeListBlock);
1198    as_type_ref!(as_variable_width_ref, VariableWidth, VariableWidthBlock);
1199    as_type_ref!(as_struct_ref, Struct, StructDataBlock);
1200    as_type_ref!(as_dictionary_ref, Dictionary, DictionaryDataBlock);
1201    as_type_ref_mut!(as_all_null_ref_mut, AllNull, AllNullDataBlock);
1202    as_type_ref_mut!(as_nullable_ref_mut, Nullable, NullableDataBlock);
1203    as_type_ref_mut!(as_fixed_width_ref_mut, FixedWidth, FixedWidthDataBlock);
1204    as_type_ref_mut!(
1205        as_fixed_size_list_ref_mut,
1206        FixedSizeList,
1207        FixedSizeListBlock
1208    );
1209    as_type_ref_mut!(as_variable_width_ref_mut, VariableWidth, VariableWidthBlock);
1210    as_type_ref_mut!(as_struct_ref_mut, Struct, StructDataBlock);
1211    as_type_ref_mut!(as_dictionary_ref_mut, Dictionary, DictionaryDataBlock);
1212}
1213
1214// Methods to convert from Arrow -> DataBlock
1215
1216fn get_byte_range<T: ArrowNativeType>(offsets: &mut LanceBuffer) -> Range<usize> {
1217    let offsets = offsets.borrow_to_typed_slice::<T>();
1218    if offsets.as_ref().is_empty() {
1219        0..0
1220    } else {
1221        offsets.as_ref().first().unwrap().as_usize()..offsets.as_ref().last().unwrap().as_usize()
1222    }
1223}
1224
1225// Given multiple offsets arrays [0, 5, 10], [0, 3, 7], etc. stitch
1226// them together to get [0, 5, 10, 13, 20, ...]
1227//
1228// Also returns the data range referenced by each offset array (may
1229// not be 0..len if there is slicing involved)
1230fn stitch_offsets<T: ArrowNativeType + std::ops::Add<Output = T> + std::ops::Sub<Output = T>>(
1231    offsets: Vec<LanceBuffer>,
1232) -> (LanceBuffer, Vec<Range<usize>>) {
1233    if offsets.is_empty() {
1234        return (LanceBuffer::empty(), Vec::default());
1235    }
1236    let len = offsets.iter().map(|b| b.len()).sum::<usize>();
1237    // Note: we are making a copy here, even if there is only one input, because we want to
1238    // normalize that input if it doesn't start with zero.  This could be micro-optimized out
1239    // if needed.
1240    let mut dest = Vec::with_capacity(len);
1241    let mut byte_ranges = Vec::with_capacity(offsets.len());
1242
1243    // We insert one leading 0 before processing any of the inputs
1244    dest.push(T::from_usize(0).unwrap());
1245
1246    for mut o in offsets.into_iter() {
1247        if !o.is_empty() {
1248            let last_offset = *dest.last().unwrap();
1249            let o = o.borrow_to_typed_slice::<T>();
1250            let start = *o.as_ref().first().unwrap();
1251            // First, we skip the first offset
1252            // Then, we subtract that first offset from each remaining offset
1253            //
1254            // This gives us a 0-based offset array (minus the leading 0)
1255            //
1256            // Then we add the last offset from the previous array to each offset
1257            // which shifts our offset array to the correct position
1258            //
1259            // For example, let's assume the last offset from the previous array
1260            // was 10 and we are given [13, 17, 22].  This means we have two values with
1261            // length 4 (17 - 13) and 5 (22 - 17).  The output from this step will be
1262            // [14, 19].  Combined with our last offset of 10, this gives us [10, 14, 19]
1263            // which is our same two values of length 4 and 5.
1264            dest.extend(o.as_ref()[1..].iter().map(|&x| x + last_offset - start));
1265        }
1266        byte_ranges.push(get_byte_range::<T>(&mut o));
1267    }
1268    (LanceBuffer::reinterpret_vec(dest), byte_ranges)
1269}
1270
1271fn arrow_binary_to_data_block(
1272    arrays: &[ArrayRef],
1273    num_values: u64,
1274    bits_per_offset: u8,
1275) -> DataBlock {
1276    let data_vec = arrays.iter().map(|arr| arr.to_data()).collect::<Vec<_>>();
1277    let bytes_per_offset = bits_per_offset as usize / 8;
1278    let offsets = data_vec
1279        .iter()
1280        .map(|d| {
1281            LanceBuffer::Borrowed(
1282                d.buffers()[0].slice_with_length(d.offset(), (d.len() + 1) * bytes_per_offset),
1283            )
1284        })
1285        .collect::<Vec<_>>();
1286    let (offsets, data_ranges) = if bits_per_offset == 32 {
1287        stitch_offsets::<i32>(offsets)
1288    } else {
1289        stitch_offsets::<i64>(offsets)
1290    };
1291    let data = data_vec
1292        .iter()
1293        .zip(data_ranges)
1294        .map(|(d, byte_range)| {
1295            LanceBuffer::Borrowed(
1296                d.buffers()[1]
1297                    .slice_with_length(byte_range.start, byte_range.end - byte_range.start),
1298            )
1299        })
1300        .collect::<Vec<_>>();
1301    let data = LanceBuffer::concat_into_one(data);
1302    DataBlock::VariableWidth(VariableWidthBlock {
1303        data,
1304        offsets,
1305        bits_per_offset,
1306        num_values,
1307        block_info: BlockInfo::new(),
1308    })
1309}
1310
1311fn encode_flat_data(arrays: &[ArrayRef], num_values: u64) -> LanceBuffer {
1312    let bytes_per_value = arrays[0].data_type().byte_width();
1313    let mut buffer = Vec::with_capacity(num_values as usize * bytes_per_value);
1314    for arr in arrays {
1315        let data = arr.to_data();
1316        buffer.extend_from_slice(data.buffers()[0].as_slice());
1317    }
1318    LanceBuffer::Owned(buffer)
1319}
1320
1321fn do_encode_bitmap_data(bitmaps: &[BooleanBuffer], num_values: u64) -> LanceBuffer {
1322    let mut builder = BooleanBufferBuilder::new(num_values as usize);
1323
1324    for buf in bitmaps {
1325        builder.append_buffer(buf);
1326    }
1327
1328    let buffer = builder.finish().into_inner();
1329    LanceBuffer::Borrowed(buffer)
1330}
1331
1332fn encode_bitmap_data(arrays: &[ArrayRef], num_values: u64) -> LanceBuffer {
1333    let bitmaps = arrays
1334        .iter()
1335        .map(|arr| arr.as_boolean().values().clone())
1336        .collect::<Vec<_>>();
1337    do_encode_bitmap_data(&bitmaps, num_values)
1338}
1339
1340// Concatenate dictionary arrays.  This is a bit tricky because we might overflow the
1341// index type.  If we do, we need to upscale the indices to a larger type.
1342fn concat_dict_arrays(arrays: &[ArrayRef]) -> ArrayRef {
1343    let value_type = arrays[0].as_any_dictionary().values().data_type();
1344    let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
1345    match arrow_select::concat::concat(&array_refs) {
1346        Ok(array) => array,
1347        Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1348            // Slow, but hopefully a corner case.  Optimize later
1349            let upscaled = array_refs
1350                .iter()
1351                .map(|arr| {
1352                    match arrow_cast::cast(
1353                        *arr,
1354                        &DataType::Dictionary(
1355                            Box::new(DataType::UInt32),
1356                            Box::new(value_type.clone()),
1357                        ),
1358                    ) {
1359                        Ok(arr) => arr,
1360                        Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1361                            // Technically I think this means the input type was u64 already
1362                            unimplemented!("Dictionary arrays with more than 2^32 unique values")
1363                        }
1364                        err => err.unwrap(),
1365                    }
1366                })
1367                .collect::<Vec<_>>();
1368            let array_refs = upscaled.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
1369            // Can still fail if concat pushes over u32 boundary
1370            match arrow_select::concat::concat(&array_refs) {
1371                Ok(array) => array,
1372                Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1373                    unimplemented!("Dictionary arrays with more than 2^32 unique values")
1374                }
1375                err => err.unwrap(),
1376            }
1377        }
1378        // Shouldn't be any other possible errors in concat
1379        err => err.unwrap(),
1380    }
1381}
1382
1383fn max_index_val(index_type: &DataType) -> u64 {
1384    match index_type {
1385        DataType::Int8 => i8::MAX as u64,
1386        DataType::Int16 => i16::MAX as u64,
1387        DataType::Int32 => i32::MAX as u64,
1388        DataType::Int64 => i64::MAX as u64,
1389        DataType::UInt8 => u8::MAX as u64,
1390        DataType::UInt16 => u16::MAX as u64,
1391        DataType::UInt32 => u32::MAX as u64,
1392        DataType::UInt64 => u64::MAX,
1393        _ => panic!("Invalid dictionary index type"),
1394    }
1395}
1396
1397// If we get multiple dictionary arrays and they don't all have the same dictionary
1398// then we need to normalize the indices.  Otherwise we might have something like:
1399//
1400// First chunk ["hello", "foo"], [0, 0, 1, 1, 1]
1401// Second chunk ["bar", "world"], [0, 1, 0, 1, 1]
1402//
1403// If we simply encode as ["hello", "foo", "bar", "world"], [0, 0, 1, 1, 1, 0, 1, 0, 1, 1]
1404// then we will get the wrong answer because the dictionaries were not merged and the indices
1405// were not remapped.
1406//
1407// A simple way to do this today is to just concatenate all the arrays.  This is because
1408// arrow's dictionary concatenation function already has the logic to merge dictionaries.
1409//
1410// TODO: We could be more efficient here by checking if the dictionaries are the same
1411//       Also, if they aren't, we can possibly do something cheaper than concatenating
1412//
1413// In addition, we want to normalize the representation of nulls.  The cheapest thing to
1414// do (space-wise) is to put the nulls in the dictionary.
1415fn arrow_dictionary_to_data_block(arrays: &[ArrayRef], validity: Option<NullBuffer>) -> DataBlock {
1416    let array = concat_dict_arrays(arrays);
1417    let array_dict = array.as_any_dictionary();
1418    let mut indices = array_dict.keys();
1419    let num_values = indices.len() as u64;
1420    let mut values = array_dict.values().clone();
1421    // Placeholder, if we need to upcast, we will initialize this and set `indices` to refer to it
1422    let mut upcast = None;
1423
1424    // TODO: Should we just always normalize indices to u32?  That would make logic simpler
1425    // and we're going to bitpack them soon anyways
1426
1427    let indices_block = if let Some(validity) = validity {
1428        // If there is validity then we find the first invalid index in the dictionary values, inserting
1429        // a new value if we need to.  Then we change all indices to point to that value.  This way we
1430        // never need to store nullability of the indices.
1431        let mut first_invalid_index = None;
1432        if let Some(values_validity) = values.nulls() {
1433            first_invalid_index = (!values_validity.inner()).set_indices().next();
1434        }
1435        let first_invalid_index = first_invalid_index.unwrap_or_else(|| {
1436            let null_arr = new_null_array(values.data_type(), 1);
1437            values = arrow_select::concat::concat(&[values.as_ref(), null_arr.as_ref()]).unwrap();
1438            let null_index = values.len() - 1;
1439            let max_index_val = max_index_val(indices.data_type());
1440            if null_index as u64 > max_index_val {
1441                // Widen the index type
1442                if max_index_val >= u32::MAX as u64 {
1443                    unimplemented!("Dictionary arrays with 2^32 unique value (or more) and a null")
1444                }
1445                upcast = Some(arrow_cast::cast(indices, &DataType::UInt32).unwrap());
1446                indices = upcast.as_ref().unwrap();
1447            }
1448            null_index
1449        });
1450        // This can't fail since we already checked for fit
1451        let null_index_arr = arrow_cast::cast(
1452            &UInt64Array::from(vec![first_invalid_index as u64]),
1453            indices.data_type(),
1454        )
1455        .unwrap();
1456
1457        let bytes_per_index = indices.data_type().byte_width();
1458        let bits_per_index = bytes_per_index as u64 * 8;
1459
1460        let null_index_arr = null_index_arr.into_data();
1461        let null_index_bytes = &null_index_arr.buffers()[0];
1462        // Need to make a copy here since indices isn't mutable, could be avoided in theory
1463        let mut indices_bytes = indices.to_data().buffers()[0].to_vec();
1464        for invalid_idx in (!validity.inner()).set_indices() {
1465            indices_bytes[invalid_idx * bytes_per_index..(invalid_idx + 1) * bytes_per_index]
1466                .copy_from_slice(null_index_bytes.as_slice());
1467        }
1468        FixedWidthDataBlock {
1469            data: LanceBuffer::Owned(indices_bytes),
1470            bits_per_value: bits_per_index,
1471            num_values,
1472            block_info: BlockInfo::new(),
1473        }
1474    } else {
1475        FixedWidthDataBlock {
1476            data: LanceBuffer::Borrowed(indices.to_data().buffers()[0].clone()),
1477            bits_per_value: indices.data_type().byte_width() as u64 * 8,
1478            num_values,
1479            block_info: BlockInfo::new(),
1480        }
1481    };
1482
1483    let items = DataBlock::from(values);
1484    DataBlock::Dictionary(DictionaryDataBlock {
1485        indices: indices_block,
1486        dictionary: Box::new(items),
1487    })
1488}
1489
1490enum Nullability {
1491    None,
1492    All,
1493    Some(NullBuffer),
1494}
1495
1496impl Nullability {
1497    fn to_option(&self) -> Option<NullBuffer> {
1498        match self {
1499            Self::Some(nulls) => Some(nulls.clone()),
1500            _ => None,
1501        }
1502    }
1503}
1504
1505fn extract_nulls(arrays: &[ArrayRef], num_values: u64) -> Nullability {
1506    let mut has_nulls = false;
1507    let nulls_and_lens = arrays
1508        .iter()
1509        .map(|arr| {
1510            let nulls = arr.logical_nulls();
1511            has_nulls |= nulls.is_some();
1512            (nulls, arr.len())
1513        })
1514        .collect::<Vec<_>>();
1515    if !has_nulls {
1516        return Nullability::None;
1517    }
1518    let mut builder = BooleanBufferBuilder::new(num_values as usize);
1519    let mut num_nulls = 0;
1520    for (null, len) in nulls_and_lens {
1521        if let Some(null) = null {
1522            num_nulls += null.null_count();
1523            builder.append_buffer(&null.into_inner());
1524        } else {
1525            builder.append_n(len, true);
1526        }
1527    }
1528    if num_nulls == num_values as usize {
1529        Nullability::All
1530    } else {
1531        Nullability::Some(NullBuffer::new(builder.finish()))
1532    }
1533}
1534
1535impl DataBlock {
1536    pub fn from_arrays(arrays: &[ArrayRef], num_values: u64) -> Self {
1537        if arrays.is_empty() || num_values == 0 {
1538            return Self::AllNull(AllNullDataBlock { num_values: 0 });
1539        }
1540
1541        let data_type = arrays[0].data_type();
1542        let nulls = extract_nulls(arrays, num_values);
1543
1544        if let Nullability::All = nulls {
1545            return Self::AllNull(AllNullDataBlock { num_values });
1546        }
1547
1548        let mut encoded = match data_type {
1549            DataType::Binary | DataType::Utf8 => arrow_binary_to_data_block(arrays, num_values, 32),
1550            DataType::BinaryView | DataType::Utf8View => {
1551                todo!()
1552            }
1553            DataType::LargeBinary | DataType::LargeUtf8 => {
1554                arrow_binary_to_data_block(arrays, num_values, 64)
1555            }
1556            DataType::Boolean => {
1557                let data = encode_bitmap_data(arrays, num_values);
1558                Self::FixedWidth(FixedWidthDataBlock {
1559                    data,
1560                    bits_per_value: 1,
1561                    num_values,
1562                    block_info: BlockInfo::new(),
1563                })
1564            }
1565            DataType::Date32
1566            | DataType::Date64
1567            | DataType::Decimal128(_, _)
1568            | DataType::Decimal256(_, _)
1569            | DataType::Duration(_)
1570            | DataType::FixedSizeBinary(_)
1571            | DataType::Float16
1572            | DataType::Float32
1573            | DataType::Float64
1574            | DataType::Int16
1575            | DataType::Int32
1576            | DataType::Int64
1577            | DataType::Int8
1578            | DataType::Interval(_)
1579            | DataType::Time32(_)
1580            | DataType::Time64(_)
1581            | DataType::Timestamp(_, _)
1582            | DataType::UInt16
1583            | DataType::UInt32
1584            | DataType::UInt64
1585            | DataType::UInt8 => {
1586                let data = encode_flat_data(arrays, num_values);
1587                Self::FixedWidth(FixedWidthDataBlock {
1588                    data,
1589                    bits_per_value: data_type.byte_width() as u64 * 8,
1590                    num_values,
1591                    block_info: BlockInfo::new(),
1592                })
1593            }
1594            DataType::Null => Self::AllNull(AllNullDataBlock { num_values }),
1595            DataType::Dictionary(_, _) => arrow_dictionary_to_data_block(arrays, nulls.to_option()),
1596            DataType::Struct(fields) => {
1597                let structs = arrays.iter().map(|arr| arr.as_struct()).collect::<Vec<_>>();
1598                let mut children = Vec::with_capacity(fields.len());
1599                for child_idx in 0..fields.len() {
1600                    let child_vec = structs
1601                        .iter()
1602                        .map(|s| s.column(child_idx).clone())
1603                        .collect::<Vec<_>>();
1604                    children.push(Self::from_arrays(&child_vec, num_values));
1605                }
1606
1607                // Extract validity for the struct array
1608                let validity = match &nulls {
1609                    Nullability::None => None,
1610                    Nullability::Some(null_buffer) => Some(null_buffer.clone()),
1611                    Nullability::All => unreachable!("Should have returned AllNull earlier"),
1612                };
1613
1614                Self::Struct(StructDataBlock {
1615                    children,
1616                    block_info: BlockInfo::default(),
1617                    validity,
1618                })
1619            }
1620            DataType::FixedSizeList(_, dim) => {
1621                let children = arrays
1622                    .iter()
1623                    .map(|arr| arr.as_fixed_size_list().values().clone())
1624                    .collect::<Vec<_>>();
1625                let child_block = Self::from_arrays(&children, num_values * *dim as u64);
1626                Self::FixedSizeList(FixedSizeListBlock {
1627                    child: Box::new(child_block),
1628                    dimension: *dim as u64,
1629                })
1630            }
1631            DataType::LargeList(_)
1632            | DataType::List(_)
1633            | DataType::ListView(_)
1634            | DataType::LargeListView(_)
1635            | DataType::Map(_, _)
1636            | DataType::RunEndEncoded(_, _)
1637            | DataType::Union(_, _) => {
1638                panic!(
1639                    "Field with data type {} cannot be converted to data block",
1640                    data_type
1641                )
1642            }
1643        };
1644
1645        // compute statistics
1646        encoded.compute_stat();
1647
1648        if !matches!(data_type, DataType::Dictionary(_, _)) {
1649            match nulls {
1650                Nullability::None => encoded,
1651                Nullability::Some(nulls) => Self::Nullable(NullableDataBlock {
1652                    data: Box::new(encoded),
1653                    nulls: LanceBuffer::Borrowed(nulls.into_inner().into_inner()),
1654                    block_info: BlockInfo::new(),
1655                }),
1656                _ => unreachable!(),
1657            }
1658        } else {
1659            // Dictionaries already insert the nulls into the dictionary items
1660            encoded
1661        }
1662    }
1663
1664    pub fn from_array<T: Array + 'static>(array: T) -> Self {
1665        let num_values = array.len();
1666        Self::from_arrays(&[Arc::new(array)], num_values as u64)
1667    }
1668}
1669
1670impl From<ArrayRef> for DataBlock {
1671    fn from(array: ArrayRef) -> Self {
1672        let num_values = array.len() as u64;
1673        Self::from_arrays(&[array], num_values)
1674    }
1675}
1676
1677pub trait DataBlockBuilderImpl: std::fmt::Debug {
1678    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>);
1679    fn finish(self: Box<Self>) -> DataBlock;
1680}
1681
1682#[derive(Debug)]
1683pub struct DataBlockBuilder {
1684    estimated_size_bytes: u64,
1685    builder: Option<Box<dyn DataBlockBuilderImpl>>,
1686}
1687
1688impl DataBlockBuilder {
1689    pub fn with_capacity_estimate(estimated_size_bytes: u64) -> Self {
1690        Self {
1691            estimated_size_bytes,
1692            builder: None,
1693        }
1694    }
1695
1696    fn get_builder(&mut self, block: &DataBlock) -> &mut dyn DataBlockBuilderImpl {
1697        if self.builder.is_none() {
1698            self.builder = Some(block.make_builder(self.estimated_size_bytes));
1699        }
1700        self.builder.as_mut().unwrap().as_mut()
1701    }
1702
1703    pub fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
1704        self.get_builder(data_block).append(data_block, selection);
1705    }
1706
1707    pub fn finish(self) -> DataBlock {
1708        let builder = self.builder.expect("DataBlockBuilder didn't see any data");
1709        builder.finish()
1710    }
1711}
1712
1713#[cfg(test)]
1714mod tests {
1715    use std::sync::Arc;
1716
1717    use arrow::datatypes::{Int32Type, Int8Type};
1718    use arrow_array::{
1719        make_array, new_null_array, ArrayRef, DictionaryArray, Int8Array, LargeBinaryArray,
1720        StringArray, UInt16Array, UInt8Array,
1721    };
1722    use arrow_buffer::{BooleanBuffer, NullBuffer};
1723
1724    use arrow_schema::{DataType, Field, Fields};
1725    use lance_datagen::{array, ArrayGeneratorExt, RowCount, DEFAULT_SEED};
1726    use rand::SeedableRng;
1727
1728    use crate::buffer::LanceBuffer;
1729
1730    use super::{AllNullDataBlock, DataBlock};
1731
1732    use arrow::compute::concat;
1733    use arrow_array::Array;
1734
1735    #[test]
1736    fn test_sliced_to_data_block() {
1737        let ints = UInt16Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);
1738        let ints = ints.slice(2, 4);
1739        let data = DataBlock::from_array(ints);
1740
1741        let fixed_data = data.as_fixed_width().unwrap();
1742        assert_eq!(fixed_data.num_values, 4);
1743        assert_eq!(fixed_data.data.len(), 8);
1744
1745        let nullable_ints =
1746            UInt16Array::from(vec![Some(0), None, Some(2), None, Some(4), None, Some(6)]);
1747        let nullable_ints = nullable_ints.slice(1, 3);
1748        let data = DataBlock::from_array(nullable_ints);
1749
1750        let nullable = data.as_nullable().unwrap();
1751        assert_eq!(nullable.nulls, LanceBuffer::Owned(vec![0b00000010]));
1752    }
1753
1754    #[test]
1755    fn test_string_to_data_block() {
1756        // Converting string arrays that contain nulls to DataBlock
1757        let strings1 = StringArray::from(vec![Some("hello"), None, Some("world")]);
1758        let strings2 = StringArray::from(vec![Some("a"), Some("b")]);
1759        let strings3 = StringArray::from(vec![Option::<&'static str>::None, None]);
1760
1761        let arrays = &[strings1, strings2, strings3]
1762            .iter()
1763            .map(|arr| Arc::new(arr.clone()) as ArrayRef)
1764            .collect::<Vec<_>>();
1765
1766        let block = DataBlock::from_arrays(arrays, 7);
1767
1768        assert_eq!(block.num_values(), 7);
1769        let block = block.as_nullable().unwrap();
1770
1771        assert_eq!(block.nulls, LanceBuffer::Owned(vec![0b00011101]));
1772
1773        let data = block.data.as_variable_width().unwrap();
1774        assert_eq!(
1775            data.offsets,
1776            LanceBuffer::reinterpret_vec(vec![0, 5, 5, 10, 11, 12, 12, 12])
1777        );
1778
1779        assert_eq!(data.data, LanceBuffer::copy_slice(b"helloworldab"));
1780
1781        // Converting string arrays that do not contain nulls to DataBlock
1782        let strings1 = StringArray::from(vec![Some("a"), Some("bc")]);
1783        let strings2 = StringArray::from(vec![Some("def")]);
1784
1785        let arrays = &[strings1, strings2]
1786            .iter()
1787            .map(|arr| Arc::new(arr.clone()) as ArrayRef)
1788            .collect::<Vec<_>>();
1789
1790        let block = DataBlock::from_arrays(arrays, 3);
1791
1792        assert_eq!(block.num_values(), 3);
1793        // Should be no nullable wrapper
1794        let data = block.as_variable_width().unwrap();
1795        assert_eq!(data.offsets, LanceBuffer::reinterpret_vec(vec![0, 1, 3, 6]));
1796        assert_eq!(data.data, LanceBuffer::copy_slice(b"abcdef"));
1797    }
1798
1799    #[test]
1800    fn test_string_sliced() {
1801        let check = |arr: Vec<StringArray>, expected_off: Vec<i32>, expected_data: &[u8]| {
1802            let arrs = arr
1803                .into_iter()
1804                .map(|a| Arc::new(a) as ArrayRef)
1805                .collect::<Vec<_>>();
1806            let num_rows = arrs.iter().map(|a| a.len()).sum::<usize>() as u64;
1807            let data = DataBlock::from_arrays(&arrs, num_rows);
1808
1809            assert_eq!(data.num_values(), num_rows);
1810
1811            let data = data.as_variable_width().unwrap();
1812            assert_eq!(data.offsets, LanceBuffer::reinterpret_vec(expected_off));
1813            assert_eq!(data.data, LanceBuffer::copy_slice(expected_data));
1814        };
1815
1816        let string = StringArray::from(vec![Some("hello"), Some("world")]);
1817        check(vec![string.slice(1, 1)], vec![0, 5], b"world");
1818        check(vec![string.slice(0, 1)], vec![0, 5], b"hello");
1819        check(
1820            vec![string.slice(0, 1), string.slice(1, 1)],
1821            vec![0, 5, 10],
1822            b"helloworld",
1823        );
1824
1825        let string2 = StringArray::from(vec![Some("foo"), Some("bar")]);
1826        check(
1827            vec![string.slice(0, 1), string2.slice(0, 1)],
1828            vec![0, 5, 8],
1829            b"hellofoo",
1830        );
1831    }
1832
1833    #[test]
1834    fn test_large() {
1835        let arr = LargeBinaryArray::from_vec(vec![b"hello", b"world"]);
1836        let data = DataBlock::from_array(arr);
1837
1838        assert_eq!(data.num_values(), 2);
1839        let data = data.as_variable_width().unwrap();
1840        assert_eq!(data.bits_per_offset, 64);
1841        assert_eq!(data.num_values, 2);
1842        assert_eq!(data.data, LanceBuffer::copy_slice(b"helloworld"));
1843        assert_eq!(
1844            data.offsets,
1845            LanceBuffer::reinterpret_vec(vec![0_u64, 5, 10])
1846        );
1847    }
1848
1849    #[test]
1850    fn test_dictionary_indices_normalized() {
1851        let arr1 = DictionaryArray::<Int8Type>::from_iter([Some("a"), Some("a"), Some("b")]);
1852        let arr2 = DictionaryArray::<Int8Type>::from_iter([Some("b"), Some("c")]);
1853
1854        let data = DataBlock::from_arrays(&[Arc::new(arr1), Arc::new(arr2)], 5);
1855
1856        assert_eq!(data.num_values(), 5);
1857        let data = data.as_dictionary().unwrap();
1858        let indices = data.indices;
1859        assert_eq!(indices.bits_per_value, 8);
1860        assert_eq!(indices.num_values, 5);
1861        assert_eq!(
1862            indices.data,
1863            // You might expect 0, 0, 1, 1, 2 but it seems that arrow's dictionary concat does
1864            // not actually collapse dictionaries.  This is an arrow problem however, and we don't
1865            // need to fix it here.
1866            LanceBuffer::reinterpret_vec::<i8>(vec![0, 0, 1, 2, 3])
1867        );
1868
1869        let items = data.dictionary.as_variable_width().unwrap();
1870        assert_eq!(items.bits_per_offset, 32);
1871        assert_eq!(items.num_values, 4);
1872        assert_eq!(items.data, LanceBuffer::copy_slice(b"abbc"));
1873        assert_eq!(
1874            items.offsets,
1875            LanceBuffer::reinterpret_vec(vec![0, 1, 2, 3, 4],)
1876        );
1877    }
1878
1879    #[test]
1880    fn test_dictionary_nulls() {
1881        // Test both ways of encoding nulls
1882
1883        // By default, nulls get encoded into the indices
1884        let arr1 = DictionaryArray::<Int8Type>::from_iter([None, Some("a"), Some("b")]);
1885        let arr2 = DictionaryArray::<Int8Type>::from_iter([Some("c"), None]);
1886
1887        let data = DataBlock::from_arrays(&[Arc::new(arr1), Arc::new(arr2)], 5);
1888
1889        let check_common = |data: DataBlock| {
1890            assert_eq!(data.num_values(), 5);
1891            let dict = data.as_dictionary().unwrap();
1892
1893            let nullable_items = dict.dictionary.as_nullable().unwrap();
1894            assert_eq!(nullable_items.nulls, LanceBuffer::Owned(vec![0b00000111]));
1895            assert_eq!(nullable_items.data.num_values(), 4);
1896
1897            let items = nullable_items.data.as_variable_width().unwrap();
1898            assert_eq!(items.bits_per_offset, 32);
1899            assert_eq!(items.num_values, 4);
1900            assert_eq!(items.data, LanceBuffer::copy_slice(b"abc"));
1901            assert_eq!(
1902                items.offsets,
1903                LanceBuffer::reinterpret_vec(vec![0, 1, 2, 3, 3],)
1904            );
1905
1906            let indices = dict.indices;
1907            assert_eq!(indices.bits_per_value, 8);
1908            assert_eq!(indices.num_values, 5);
1909            assert_eq!(
1910                indices.data,
1911                LanceBuffer::reinterpret_vec::<i8>(vec![3, 0, 1, 2, 3])
1912            );
1913        };
1914        check_common(data);
1915
1916        // However, we can manually create a dictionary where nulls are in the dictionary
1917        let items = StringArray::from(vec![Some("a"), Some("b"), Some("c"), None]);
1918        let indices = Int8Array::from(vec![Some(3), Some(0), Some(1), Some(2), Some(3)]);
1919        let dict = DictionaryArray::new(indices, Arc::new(items));
1920
1921        let data = DataBlock::from_array(dict);
1922
1923        check_common(data);
1924    }
1925
1926    #[test]
1927    fn test_dictionary_cannot_add_null() {
1928        // 256 unique strings
1929        let items = StringArray::from(
1930            (0..256)
1931                .map(|i| Some(String::from_utf8(vec![0; i]).unwrap()))
1932                .collect::<Vec<_>>(),
1933        );
1934        // 257 indices, covering the whole range, plus one null
1935        let indices = UInt8Array::from(
1936            (0..=256)
1937                .map(|i| if i == 256 { None } else { Some(i as u8) })
1938                .collect::<Vec<_>>(),
1939        );
1940        // We want to normalize this by pushing nulls into the dictionary, but we cannot because
1941        // the dictionary is too large for the index type
1942        let dict = DictionaryArray::new(indices, Arc::new(items));
1943        let data = DataBlock::from_array(dict);
1944
1945        assert_eq!(data.num_values(), 257);
1946
1947        let dict = data.as_dictionary().unwrap();
1948
1949        assert_eq!(dict.indices.bits_per_value, 32);
1950        assert_eq!(
1951            dict.indices.data,
1952            LanceBuffer::reinterpret_vec((0_u32..257).collect::<Vec<_>>())
1953        );
1954
1955        let nullable_items = dict.dictionary.as_nullable().unwrap();
1956        let null_buffer = NullBuffer::new(BooleanBuffer::new(
1957            nullable_items.nulls.into_buffer(),
1958            0,
1959            257,
1960        ));
1961        for i in 0..256 {
1962            assert!(!null_buffer.is_null(i));
1963        }
1964        assert!(null_buffer.is_null(256));
1965
1966        assert_eq!(
1967            nullable_items.data.as_variable_width().unwrap().data.len(),
1968            32640
1969        );
1970    }
1971
1972    #[test]
1973    fn test_all_null() {
1974        for data_type in [
1975            DataType::UInt32,
1976            DataType::FixedSizeBinary(2),
1977            DataType::List(Arc::new(Field::new("item", DataType::UInt32, true))),
1978            DataType::Struct(Fields::from(vec![Field::new("a", DataType::UInt32, true)])),
1979        ] {
1980            let block = DataBlock::AllNull(AllNullDataBlock { num_values: 10 });
1981            let arr = block.into_arrow(data_type.clone(), true).unwrap();
1982            let arr = make_array(arr);
1983            let expected = new_null_array(&data_type, 10);
1984            assert_eq!(&arr, &expected);
1985        }
1986    }
1987
1988    #[test]
1989    fn test_dictionary_cannot_concatenate() {
1990        // 256 unique strings
1991        let items = StringArray::from(
1992            (0..256)
1993                .map(|i| Some(String::from_utf8(vec![0; i]).unwrap()))
1994                .collect::<Vec<_>>(),
1995        );
1996        // 256 different unique strings
1997        let other_items = StringArray::from(
1998            (0..256)
1999                .map(|i| Some(String::from_utf8(vec![1; i + 1]).unwrap()))
2000                .collect::<Vec<_>>(),
2001        );
2002        let indices = UInt8Array::from_iter_values(0..=255);
2003        let dict1 = DictionaryArray::new(indices.clone(), Arc::new(items));
2004        let dict2 = DictionaryArray::new(indices, Arc::new(other_items));
2005        let data = DataBlock::from_arrays(&[Arc::new(dict1), Arc::new(dict2)], 512);
2006        assert_eq!(data.num_values(), 512);
2007
2008        let dict = data.as_dictionary().unwrap();
2009
2010        assert_eq!(dict.indices.bits_per_value, 32);
2011        assert_eq!(
2012            dict.indices.data,
2013            LanceBuffer::reinterpret_vec::<u32>((0..512).collect::<Vec<_>>())
2014        );
2015        // What fun: 0 + 1 + .. + 255 + 1 + 2 + .. + 256 = 2^16
2016        assert_eq!(
2017            dict.dictionary.as_variable_width().unwrap().data.len(),
2018            65536
2019        );
2020    }
2021
2022    #[test]
2023    fn test_data_size() {
2024        let mut rng = rand_xoshiro::Xoshiro256PlusPlus::seed_from_u64(DEFAULT_SEED.0);
2025        // test data_size() when input has no nulls
2026        let mut gen = array::rand::<Int32Type>().with_nulls(&[false, false, false]);
2027
2028        let arr = gen.generate(RowCount::from(3), &mut rng).unwrap();
2029        let block = DataBlock::from_array(arr.clone());
2030        assert!(block.data_size() == arr.get_buffer_memory_size() as u64);
2031
2032        let arr = gen.generate(RowCount::from(400), &mut rng).unwrap();
2033        let block = DataBlock::from_array(arr.clone());
2034        assert!(block.data_size() == arr.get_buffer_memory_size() as u64);
2035
2036        // test data_size() when input has nulls
2037        let mut gen = array::rand::<Int32Type>().with_nulls(&[false, true, false]);
2038        let arr = gen.generate(RowCount::from(3), &mut rng).unwrap();
2039        let block = DataBlock::from_array(arr.clone());
2040
2041        let array_data = arr.to_data();
2042        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
2043        // the NullBuffer.len() returns the length in bits so we divide_round_up by 8
2044        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
2045        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
2046
2047        let arr = gen.generate(RowCount::from(400), &mut rng).unwrap();
2048        let block = DataBlock::from_array(arr.clone());
2049
2050        let array_data = arr.to_data();
2051        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
2052        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
2053        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
2054
2055        let mut gen = array::rand::<Int32Type>().with_nulls(&[true, true, false]);
2056        let arr = gen.generate(RowCount::from(3), &mut rng).unwrap();
2057        let block = DataBlock::from_array(arr.clone());
2058
2059        let array_data = arr.to_data();
2060        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
2061        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
2062        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
2063
2064        let arr = gen.generate(RowCount::from(400), &mut rng).unwrap();
2065        let block = DataBlock::from_array(arr.clone());
2066
2067        let array_data = arr.to_data();
2068        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
2069        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
2070        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
2071
2072        let mut gen = array::rand::<Int32Type>().with_nulls(&[false, true, false]);
2073        let arr1 = gen.generate(RowCount::from(3), &mut rng).unwrap();
2074        let arr2 = gen.generate(RowCount::from(3), &mut rng).unwrap();
2075        let arr3 = gen.generate(RowCount::from(3), &mut rng).unwrap();
2076        let block = DataBlock::from_arrays(&[arr1.clone(), arr2.clone(), arr3.clone()], 9);
2077
2078        let concatenated_array = concat(&[
2079            &*Arc::new(arr1.clone()) as &dyn Array,
2080            &*Arc::new(arr2.clone()) as &dyn Array,
2081            &*Arc::new(arr3.clone()) as &dyn Array,
2082        ])
2083        .unwrap();
2084        let total_buffer_size: usize = concatenated_array
2085            .to_data()
2086            .buffers()
2087            .iter()
2088            .map(|buffer| buffer.len())
2089            .sum();
2090
2091        let total_nulls_size_in_bytes = concatenated_array.nulls().unwrap().len().div_ceil(8);
2092        assert!(block.data_size() == (total_buffer_size + total_nulls_size_in_bytes) as u64);
2093    }
2094}