lance_encoding/
data.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Data layouts to represent encoded data in a sub-Arrow format
5//!
6//! These [`DataBlock`] structures represent physical layouts.  They fill a gap somewhere
7//! between [`arrow_data::data::ArrayData`] (which, as a collection of buffers, is too
8//! generic because it doesn't give us enough information about what those buffers represent)
9//! and [`arrow_array::array::Array`] (which is too specific, because it cares about the
10//! logical data type).
11//!
12//! In addition, the layouts represented here are slightly stricter than Arrow's layout rules.
13//! For example, offset buffers MUST start with 0.  These additional restrictions impose a
14//! slight penalty on encode (to normalize arrow data) but make the development of encoders
15//! and decoders easier (since they can rely on a normalized representation)
16
17use std::{
18    ops::Range,
19    sync::{Arc, RwLock},
20};
21
22use arrow::array::{ArrayData, ArrayDataBuilder, AsArray};
23use arrow_array::{new_empty_array, new_null_array, Array, ArrayRef, UInt64Array};
24use arrow_buffer::{ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer};
25use arrow_schema::DataType;
26use bytemuck::try_cast_slice;
27use lance_arrow::DataTypeExt;
28use snafu::location;
29
30use lance_core::{Error, Result};
31
32use crate::{
33    buffer::LanceBuffer,
34    statistics::{ComputeStat, Stat},
35};
36
37/// A data block with no buffers where everything is null
38///
39/// Note: this data block should not be used for future work.  It will be deprecated
40/// in the 2.1 version of the format where nullability will be handled by the structural
41/// encoders.
42#[derive(Debug)]
43pub struct AllNullDataBlock {
44    /// The number of values represented by this block
45    pub num_values: u64,
46}
47
48impl AllNullDataBlock {
49    fn into_arrow(self, data_type: DataType, _validate: bool) -> Result<ArrayData> {
50        Ok(ArrayData::new_null(&data_type, self.num_values as usize))
51    }
52
53    fn into_buffers(self) -> Vec<LanceBuffer> {
54        vec![]
55    }
56
57    fn borrow_and_clone(&mut self) -> Self {
58        Self {
59            num_values: self.num_values,
60        }
61    }
62
63    fn try_clone(&self) -> Result<Self> {
64        Ok(Self {
65            num_values: self.num_values,
66        })
67    }
68}
69
70use std::collections::HashMap;
71
72// `BlockInfo` stores the statistics of this `DataBlock`, such as `NullCount` for `NullableDataBlock`,
73// `BitWidth` for `FixedWidthDataBlock`, `Cardinality` for all `DataBlock`
74#[derive(Debug, Clone)]
75pub struct BlockInfo(pub Arc<RwLock<HashMap<Stat, Arc<dyn Array>>>>);
76
77impl Default for BlockInfo {
78    fn default() -> Self {
79        Self::new()
80    }
81}
82
83impl BlockInfo {
84    pub fn new() -> Self {
85        Self(Arc::new(RwLock::new(HashMap::new())))
86    }
87}
88
89impl PartialEq for BlockInfo {
90    fn eq(&self, other: &Self) -> bool {
91        let self_info = self.0.read().unwrap();
92        let other_info = other.0.read().unwrap();
93        *self_info == *other_info
94    }
95}
96
97/// Wraps a data block and adds nullability information to it
98///
99/// Note: this data block should not be used for future work.  It will be deprecated
100/// in the 2.1 version of the format where nullability will be handled by the structural
101/// encoders.
102#[derive(Debug)]
103pub struct NullableDataBlock {
104    /// The underlying data
105    pub data: Box<DataBlock>,
106    /// A bitmap of validity for each value
107    pub nulls: LanceBuffer,
108
109    pub block_info: BlockInfo,
110}
111
112impl NullableDataBlock {
113    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
114        let nulls = self.nulls.into_buffer();
115        let data = self.data.into_arrow(data_type, validate)?.into_builder();
116        let data = data.null_bit_buffer(Some(nulls));
117        if validate {
118            Ok(data.build()?)
119        } else {
120            Ok(unsafe { data.build_unchecked() })
121        }
122    }
123
124    fn into_buffers(self) -> Vec<LanceBuffer> {
125        let mut buffers = vec![self.nulls];
126        buffers.extend(self.data.into_buffers());
127        buffers
128    }
129
130    fn borrow_and_clone(&mut self) -> Self {
131        Self {
132            data: Box::new(self.data.borrow_and_clone()),
133            nulls: self.nulls.borrow_and_clone(),
134            block_info: self.block_info.clone(),
135        }
136    }
137
138    fn try_clone(&self) -> Result<Self> {
139        Ok(Self {
140            data: Box::new(self.data.try_clone()?),
141            nulls: self.nulls.try_clone()?,
142            block_info: self.block_info.clone(),
143        })
144    }
145
146    pub fn data_size(&self) -> u64 {
147        self.data.data_size() + self.nulls.len() as u64
148    }
149}
150
151/// A block representing the same constant value repeated many times
152#[derive(Debug, PartialEq)]
153pub struct ConstantDataBlock {
154    /// Data buffer containing the value
155    pub data: LanceBuffer,
156    /// The number of values
157    pub num_values: u64,
158}
159
160impl ConstantDataBlock {
161    fn into_buffers(self) -> Vec<LanceBuffer> {
162        vec![self.data]
163    }
164
165    fn into_arrow(self, _data_type: DataType, _validate: bool) -> Result<ArrayData> {
166        // We don't need this yet but if we come up with some way of serializing
167        // scalars to/from bytes then we could implement it.
168        todo!()
169    }
170
171    pub fn borrow_and_clone(&mut self) -> Self {
172        Self {
173            data: self.data.borrow_and_clone(),
174            num_values: self.num_values,
175        }
176    }
177
178    pub fn try_clone(&self) -> Result<Self> {
179        Ok(Self {
180            data: self.data.try_clone()?,
181            num_values: self.num_values,
182        })
183    }
184
185    pub fn data_size(&self) -> u64 {
186        self.data.len() as u64
187    }
188}
189
190/// A data block for a single buffer of data where each element has a fixed number of bits
191#[derive(Debug, PartialEq)]
192pub struct FixedWidthDataBlock {
193    /// The data buffer
194    pub data: LanceBuffer,
195    /// The number of bits per value
196    pub bits_per_value: u64,
197    /// The number of values represented by this block
198    pub num_values: u64,
199
200    pub block_info: BlockInfo,
201}
202
203impl FixedWidthDataBlock {
204    fn do_into_arrow(
205        self,
206        data_type: DataType,
207        num_values: u64,
208        validate: bool,
209    ) -> Result<ArrayData> {
210        let data_buffer = self.data.into_buffer();
211        let builder = ArrayDataBuilder::new(data_type)
212            .add_buffer(data_buffer)
213            .len(num_values as usize)
214            .null_count(0);
215        if validate {
216            Ok(builder.build()?)
217        } else {
218            Ok(unsafe { builder.build_unchecked() })
219        }
220    }
221
222    pub fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
223        let root_num_values = self.num_values;
224        self.do_into_arrow(data_type, root_num_values, validate)
225    }
226
227    pub fn into_buffers(self) -> Vec<LanceBuffer> {
228        vec![self.data]
229    }
230
231    pub fn borrow_and_clone(&mut self) -> Self {
232        Self {
233            data: self.data.borrow_and_clone(),
234            bits_per_value: self.bits_per_value,
235            num_values: self.num_values,
236            block_info: self.block_info.clone(),
237        }
238    }
239
240    pub fn try_clone(&self) -> Result<Self> {
241        Ok(Self {
242            data: self.data.try_clone()?,
243            bits_per_value: self.bits_per_value,
244            num_values: self.num_values,
245            block_info: self.block_info.clone(),
246        })
247    }
248
249    pub fn data_size(&self) -> u64 {
250        self.data.len() as u64
251    }
252}
253
254#[derive(Debug)]
255pub struct VariableWidthDataBlockBuilder {
256    offsets: Vec<u32>,
257    bytes: Vec<u8>,
258}
259
260impl VariableWidthDataBlockBuilder {
261    fn new(estimated_size_bytes: u64) -> Self {
262        Self {
263            offsets: vec![0u32],
264            bytes: Vec::with_capacity(estimated_size_bytes as usize),
265        }
266    }
267}
268
269impl DataBlockBuilderImpl for VariableWidthDataBlockBuilder {
270    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
271        let block = data_block.as_variable_width_ref().unwrap();
272        assert!(block.bits_per_offset == 32);
273
274        let offsets: &[u32] = try_cast_slice(&block.offsets)
275            .expect("cast from a bits_per_offset=32 `VariableWidthDataBlock's offsets field field to &[32] should be fine.");
276
277        let start_offset = offsets[selection.start as usize];
278        let end_offset = offsets[selection.end as usize];
279        let mut previous_len = self.bytes.len();
280
281        self.bytes
282            .extend_from_slice(&block.data[start_offset as usize..end_offset as usize]);
283
284        self.offsets.extend(
285            offsets[selection.start as usize..selection.end as usize]
286                .iter()
287                .zip(&offsets[selection.start as usize + 1..=selection.end as usize])
288                .map(|(&current, &next)| {
289                    let this_value_len = next - current;
290                    previous_len += this_value_len as usize;
291                    previous_len as u32
292                }),
293        );
294    }
295
296    fn finish(self: Box<Self>) -> DataBlock {
297        let num_values = (self.offsets.len() - 1) as u64;
298        DataBlock::VariableWidth(VariableWidthBlock {
299            data: LanceBuffer::Owned(self.bytes),
300            offsets: LanceBuffer::reinterpret_vec(self.offsets),
301            bits_per_offset: 32,
302            num_values,
303            block_info: BlockInfo::new(),
304        })
305    }
306}
307
308#[derive(Debug)]
309struct BitmapDataBlockBuilder {
310    values: BooleanBufferBuilder,
311}
312
313impl BitmapDataBlockBuilder {
314    fn new(estimated_size_bytes: u64) -> Self {
315        Self {
316            values: BooleanBufferBuilder::new(estimated_size_bytes as usize * 8),
317        }
318    }
319}
320
321impl DataBlockBuilderImpl for BitmapDataBlockBuilder {
322    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
323        let bitmap_blk = data_block.as_fixed_width_ref().unwrap();
324        self.values.append_packed_range(
325            selection.start as usize..selection.end as usize,
326            &bitmap_blk.data,
327        );
328    }
329
330    fn finish(mut self: Box<Self>) -> DataBlock {
331        let bool_buf = self.values.finish();
332        let num_values = bool_buf.len() as u64;
333        let bits_buf = bool_buf.into_inner();
334        DataBlock::FixedWidth(FixedWidthDataBlock {
335            data: LanceBuffer::from(bits_buf),
336            bits_per_value: 1,
337            num_values,
338            block_info: BlockInfo::new(),
339        })
340    }
341}
342
343#[derive(Debug)]
344struct FixedWidthDataBlockBuilder {
345    bits_per_value: u64,
346    bytes_per_value: u64,
347    values: Vec<u8>,
348}
349
350impl FixedWidthDataBlockBuilder {
351    fn new(bits_per_value: u64, estimated_size_bytes: u64) -> Self {
352        assert!(bits_per_value % 8 == 0);
353        Self {
354            bits_per_value,
355            bytes_per_value: bits_per_value / 8,
356            values: Vec::with_capacity(estimated_size_bytes as usize),
357        }
358    }
359}
360
361impl DataBlockBuilderImpl for FixedWidthDataBlockBuilder {
362    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
363        let block = data_block.as_fixed_width_ref().unwrap();
364        assert_eq!(self.bits_per_value, block.bits_per_value);
365        let start = selection.start as usize * self.bytes_per_value as usize;
366        let end = selection.end as usize * self.bytes_per_value as usize;
367        self.values.extend_from_slice(&block.data[start..end]);
368    }
369
370    fn finish(self: Box<Self>) -> DataBlock {
371        let num_values = (self.values.len() / self.bytes_per_value as usize) as u64;
372        DataBlock::FixedWidth(FixedWidthDataBlock {
373            data: LanceBuffer::Owned(self.values),
374            bits_per_value: self.bits_per_value,
375            num_values,
376            block_info: BlockInfo::new(),
377        })
378    }
379}
380
381#[derive(Debug)]
382struct StructDataBlockBuilder {
383    children: Vec<Box<dyn DataBlockBuilderImpl>>,
384}
385
386impl StructDataBlockBuilder {
387    // Currently only Struct with fixed-width fields are supported.
388    // And the assumption that all fields have `bits_per_value % 8 == 0` is made here.
389    fn new(bits_per_values: Vec<u32>, estimated_size_bytes: u64) -> Self {
390        let mut children = vec![];
391
392        debug_assert!(bits_per_values.iter().all(|bpv| bpv % 8 == 0));
393
394        let bytes_per_row: u32 = bits_per_values.iter().sum::<u32>() / 8;
395        let bytes_per_row = bytes_per_row as u64;
396
397        for bits_per_value in bits_per_values.iter() {
398            let this_estimated_size_bytes =
399                estimated_size_bytes / bytes_per_row * (*bits_per_value as u64) / 8;
400            let child =
401                FixedWidthDataBlockBuilder::new(*bits_per_value as u64, this_estimated_size_bytes);
402            children.push(Box::new(child) as Box<dyn DataBlockBuilderImpl>);
403        }
404        Self { children }
405    }
406}
407
408impl DataBlockBuilderImpl for StructDataBlockBuilder {
409    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
410        let data_block = data_block.as_struct_ref().unwrap();
411        for i in 0..self.children.len() {
412            self.children[i].append(&data_block.children[i], selection.clone());
413        }
414    }
415
416    fn finish(self: Box<Self>) -> DataBlock {
417        let mut children_data_block = Vec::new();
418        for child in self.children {
419            let child_data_block = child.finish();
420            children_data_block.push(child_data_block);
421        }
422        DataBlock::Struct(StructDataBlock {
423            children: children_data_block,
424            block_info: BlockInfo::new(),
425        })
426    }
427}
428/// A data block to represent a fixed size list
429#[derive(Debug)]
430pub struct FixedSizeListBlock {
431    /// The child data block
432    pub child: Box<DataBlock>,
433    /// The number of items in each list
434    pub dimension: u64,
435}
436
437impl FixedSizeListBlock {
438    fn borrow_and_clone(&mut self) -> Self {
439        Self {
440            child: Box::new(self.child.borrow_and_clone()),
441            dimension: self.dimension,
442        }
443    }
444
445    fn try_clone(&self) -> Result<Self> {
446        Ok(Self {
447            child: Box::new(self.child.try_clone()?),
448            dimension: self.dimension,
449        })
450    }
451
452    pub fn num_values(&self) -> u64 {
453        self.child.num_values() / self.dimension
454    }
455
456    /// Try to flatten a FixedSizeListBlock into a FixedWidthDataBlock
457    ///
458    /// Returns None if any children are nullable
459    pub fn try_into_flat(self) -> Option<FixedWidthDataBlock> {
460        match *self.child {
461            // Cannot flatten a nullable child
462            DataBlock::Nullable(_) => None,
463            DataBlock::FixedSizeList(inner) => {
464                let mut flat = inner.try_into_flat()?;
465                flat.bits_per_value *= self.dimension;
466                flat.num_values /= self.dimension;
467                Some(flat)
468            }
469            DataBlock::FixedWidth(mut inner) => {
470                inner.bits_per_value *= self.dimension;
471                inner.num_values /= self.dimension;
472                Some(inner)
473            }
474            _ => panic!(
475                "Expected FixedSizeList or FixedWidth data block but found {:?}",
476                self
477            ),
478        }
479    }
480
481    pub fn flatten_as_fixed(&mut self) -> FixedWidthDataBlock {
482        match self.child.as_mut() {
483            DataBlock::FixedSizeList(fsl) => fsl.flatten_as_fixed(),
484            DataBlock::FixedWidth(fw) => fw.borrow_and_clone(),
485            _ => panic!("Expected FixedSizeList or FixedWidth data block"),
486        }
487    }
488
489    /// Convert a flattened values block into a FixedSizeListBlock
490    pub fn from_flat(data: FixedWidthDataBlock, data_type: &DataType) -> DataBlock {
491        match data_type {
492            DataType::FixedSizeList(child_field, dimension) => {
493                let mut data = data;
494                data.bits_per_value /= *dimension as u64;
495                data.num_values *= *dimension as u64;
496                let child_data = Self::from_flat(data, child_field.data_type());
497                DataBlock::FixedSizeList(Self {
498                    child: Box::new(child_data),
499                    dimension: *dimension as u64,
500                })
501            }
502            // Base case, we've hit a non-list type
503            _ => DataBlock::FixedWidth(data),
504        }
505    }
506
507    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
508        let num_values = self.num_values();
509        let builder = match &data_type {
510            DataType::FixedSizeList(child_field, _) => {
511                let child_data = self
512                    .child
513                    .into_arrow(child_field.data_type().clone(), validate)?;
514                ArrayDataBuilder::new(data_type)
515                    .add_child_data(child_data)
516                    .len(num_values as usize)
517                    .null_count(0)
518            }
519            _ => panic!("Expected FixedSizeList data type and got {:?}", data_type),
520        };
521        if validate {
522            Ok(builder.build()?)
523        } else {
524            Ok(unsafe { builder.build_unchecked() })
525        }
526    }
527
528    fn into_buffers(self) -> Vec<LanceBuffer> {
529        self.child.into_buffers()
530    }
531
532    fn data_size(&self) -> u64 {
533        self.child.data_size()
534    }
535}
536
537#[derive(Debug)]
538struct FixedSizeListBlockBuilder {
539    inner: Box<dyn DataBlockBuilderImpl>,
540    dimension: u64,
541}
542
543impl FixedSizeListBlockBuilder {
544    fn new(inner: Box<dyn DataBlockBuilderImpl>, dimension: u64) -> Self {
545        Self { inner, dimension }
546    }
547}
548
549impl DataBlockBuilderImpl for FixedSizeListBlockBuilder {
550    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
551        let selection = selection.start * self.dimension..selection.end * self.dimension;
552        let fsl = data_block.as_fixed_size_list_ref().unwrap();
553        self.inner.append(fsl.child.as_ref(), selection);
554    }
555
556    fn finish(self: Box<Self>) -> DataBlock {
557        let inner_block = self.inner.finish();
558        DataBlock::FixedSizeList(FixedSizeListBlock {
559            child: Box::new(inner_block),
560            dimension: self.dimension,
561        })
562    }
563}
564
565#[derive(Debug)]
566struct NullableDataBlockBuilder {
567    inner: Box<dyn DataBlockBuilderImpl>,
568    validity: BooleanBufferBuilder,
569}
570
571impl NullableDataBlockBuilder {
572    fn new(inner: Box<dyn DataBlockBuilderImpl>, estimated_size_bytes: usize) -> Self {
573        Self {
574            inner,
575            validity: BooleanBufferBuilder::new(estimated_size_bytes * 8),
576        }
577    }
578}
579
580impl DataBlockBuilderImpl for NullableDataBlockBuilder {
581    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
582        let nullable = data_block.as_nullable_ref().unwrap();
583        let bool_buf = BooleanBuffer::new(
584            nullable.nulls.try_clone().unwrap().into_buffer(),
585            selection.start as usize,
586            (selection.end - selection.start) as usize,
587        );
588        self.validity.append_buffer(&bool_buf);
589        self.inner.append(nullable.data.as_ref(), selection);
590    }
591
592    fn finish(mut self: Box<Self>) -> DataBlock {
593        let inner_block = self.inner.finish();
594        DataBlock::Nullable(NullableDataBlock {
595            data: Box::new(inner_block),
596            nulls: LanceBuffer::Borrowed(self.validity.finish().into_inner()),
597            block_info: BlockInfo::new(),
598        })
599    }
600}
601
602/// A data block with no regular structure.  There is no available spot to attach
603/// validity / repdef information and it cannot be converted to Arrow without being
604/// decoded
605#[derive(Debug)]
606pub struct OpaqueBlock {
607    pub buffers: Vec<LanceBuffer>,
608    pub num_values: u64,
609    pub block_info: BlockInfo,
610}
611
612impl OpaqueBlock {
613    fn borrow_and_clone(&mut self) -> Self {
614        Self {
615            buffers: self
616                .buffers
617                .iter_mut()
618                .map(|b| b.borrow_and_clone())
619                .collect(),
620            num_values: self.num_values,
621            block_info: self.block_info.clone(),
622        }
623    }
624
625    fn try_clone(&self) -> Result<Self> {
626        Ok(Self {
627            buffers: self
628                .buffers
629                .iter()
630                .map(|b| b.try_clone())
631                .collect::<Result<_>>()?,
632            num_values: self.num_values,
633            block_info: self.block_info.clone(),
634        })
635    }
636
637    pub fn data_size(&self) -> u64 {
638        self.buffers.iter().map(|b| b.len() as u64).sum()
639    }
640}
641
642/// A data block for variable-width data (e.g. strings, packed rows, etc.)
643#[derive(Debug)]
644pub struct VariableWidthBlock {
645    /// The data buffer
646    pub data: LanceBuffer,
647    /// The offsets buffer (contains num_values + 1 offsets)
648    ///
649    /// Offsets MUST start at 0
650    pub offsets: LanceBuffer,
651    /// The number of bits per offset
652    pub bits_per_offset: u8,
653    /// The number of values represented by this block
654    pub num_values: u64,
655
656    pub block_info: BlockInfo,
657}
658
659impl VariableWidthBlock {
660    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
661        let data_buffer = self.data.into_buffer();
662        let offsets_buffer = self.offsets.into_buffer();
663        let builder = ArrayDataBuilder::new(data_type)
664            .add_buffer(offsets_buffer)
665            .add_buffer(data_buffer)
666            .len(self.num_values as usize)
667            .null_count(0);
668        if validate {
669            Ok(builder.build()?)
670        } else {
671            Ok(unsafe { builder.build_unchecked() })
672        }
673    }
674
675    fn into_buffers(self) -> Vec<LanceBuffer> {
676        vec![self.offsets, self.data]
677    }
678
679    fn borrow_and_clone(&mut self) -> Self {
680        Self {
681            data: self.data.borrow_and_clone(),
682            offsets: self.offsets.borrow_and_clone(),
683            bits_per_offset: self.bits_per_offset,
684            num_values: self.num_values,
685            block_info: self.block_info.clone(),
686        }
687    }
688
689    fn try_clone(&self) -> Result<Self> {
690        Ok(Self {
691            data: self.data.try_clone()?,
692            offsets: self.offsets.try_clone()?,
693            bits_per_offset: self.bits_per_offset,
694            num_values: self.num_values,
695            block_info: self.block_info.clone(),
696        })
697    }
698
699    pub fn offsets_as_block(&mut self) -> DataBlock {
700        let offsets = self.offsets.borrow_and_clone();
701        DataBlock::FixedWidth(FixedWidthDataBlock {
702            data: offsets,
703            bits_per_value: self.bits_per_offset as u64,
704            num_values: self.num_values + 1,
705            block_info: BlockInfo::new(),
706        })
707    }
708
709    pub fn data_size(&self) -> u64 {
710        (self.data.len() + self.offsets.len()) as u64
711    }
712}
713
714/// A data block representing a struct
715#[derive(Debug)]
716pub struct StructDataBlock {
717    /// The child arrays
718    pub children: Vec<DataBlock>,
719    pub block_info: BlockInfo,
720}
721
722impl StructDataBlock {
723    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
724        if let DataType::Struct(fields) = &data_type {
725            let mut builder = ArrayDataBuilder::new(DataType::Struct(fields.clone()));
726            let mut num_rows = 0;
727            for (field, child) in fields.iter().zip(self.children) {
728                let child_data = child.into_arrow(field.data_type().clone(), validate)?;
729                num_rows = child_data.len();
730                builder = builder.add_child_data(child_data);
731            }
732            let builder = builder.null_count(0).len(num_rows);
733            if validate {
734                Ok(builder.build()?)
735            } else {
736                Ok(unsafe { builder.build_unchecked() })
737            }
738        } else {
739            Err(Error::Internal {
740                message: format!("Expected Struct, got {:?}", data_type),
741                location: location!(),
742            })
743        }
744    }
745
746    fn remove_outer_validity(self) -> Self {
747        Self {
748            children: self
749                .children
750                .into_iter()
751                .map(|c| c.remove_outer_validity())
752                .collect(),
753            block_info: self.block_info,
754        }
755    }
756
757    fn into_buffers(self) -> Vec<LanceBuffer> {
758        self.children
759            .into_iter()
760            .flat_map(|c| c.into_buffers())
761            .collect()
762    }
763
764    fn borrow_and_clone(&mut self) -> Self {
765        Self {
766            children: self
767                .children
768                .iter_mut()
769                .map(|c| c.borrow_and_clone())
770                .collect(),
771            block_info: self.block_info.clone(),
772        }
773    }
774
775    fn try_clone(&self) -> Result<Self> {
776        Ok(Self {
777            children: self
778                .children
779                .iter()
780                .map(|c| c.try_clone())
781                .collect::<Result<_>>()?,
782            block_info: self.block_info.clone(),
783        })
784    }
785
786    pub fn data_size(&self) -> u64 {
787        self.children
788            .iter()
789            .map(|data_block| data_block.data_size())
790            .sum()
791    }
792}
793
794/// A data block for dictionary encoded data
795#[derive(Debug)]
796pub struct DictionaryDataBlock {
797    /// The indices buffer
798    pub indices: FixedWidthDataBlock,
799    /// The dictionary itself
800    pub dictionary: Box<DataBlock>,
801}
802
803impl DictionaryDataBlock {
804    fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
805        let (key_type, value_type) = if let DataType::Dictionary(key_type, value_type) = &data_type
806        {
807            (key_type.as_ref().clone(), value_type.as_ref().clone())
808        } else {
809            return Err(Error::Internal {
810                message: format!("Expected Dictionary, got {:?}", data_type),
811                location: location!(),
812            });
813        };
814
815        let indices = self.indices.into_arrow(key_type, validate)?;
816        let dictionary = self.dictionary.into_arrow(value_type, validate)?;
817
818        let builder = indices
819            .into_builder()
820            .add_child_data(dictionary)
821            .data_type(data_type);
822
823        if validate {
824            Ok(builder.build()?)
825        } else {
826            Ok(unsafe { builder.build_unchecked() })
827        }
828    }
829
830    fn into_buffers(self) -> Vec<LanceBuffer> {
831        let mut buffers = self.indices.into_buffers();
832        buffers.extend(self.dictionary.into_buffers());
833        buffers
834    }
835
836    fn borrow_and_clone(&mut self) -> Self {
837        Self {
838            indices: self.indices.borrow_and_clone(),
839            dictionary: Box::new(self.dictionary.borrow_and_clone()),
840        }
841    }
842
843    fn try_clone(&self) -> Result<Self> {
844        Ok(Self {
845            indices: self.indices.try_clone()?,
846            dictionary: Box::new(self.dictionary.try_clone()?),
847        })
848    }
849}
850
/// A DataBlock is a collection of buffers that represents an "array" of data in very generic terms
///
/// The output of each decoder is a DataBlock.  Decoders can be chained together to transform
/// one DataBlock into a different kind of DataBlock.
///
/// The DataBlock is somewhere in between Arrow's ArrayData and Array and represents a physical
/// layout of the data.
///
/// A DataBlock can be converted into an Arrow ArrayData (and then Array) for a given array type.
/// For example, a FixedWidthDataBlock can be converted into any primitive type or a fixed size
/// list of a primitive type.  This is a zero-copy operation.
///
/// In addition, a DataBlock can be created from an Arrow array or arrays.  This is not a zero-copy
/// operation as some normalization may be required.
#[derive(Debug)]
pub enum DataBlock {
    /// A block with no data; it converts to an empty Arrow array and has no buffers
    Empty(),
    /// The same constant value repeated many times
    Constant(ConstantDataBlock),
    /// A block with no buffers where everything is null
    AllNull(AllNullDataBlock),
    /// Another data block together with a validity bitmap
    Nullable(NullableDataBlock),
    /// A single buffer of data where each element has a fixed number of bits
    FixedWidth(FixedWidthDataBlock),
    /// Fixed size lists over a child data block
    FixedSizeList(FixedSizeListBlock),
    /// Variable-width data (e.g. strings, packed rows, etc.) stored as offsets and data
    VariableWidth(VariableWidthBlock),
    /// Buffers with no regular structure; cannot be converted to Arrow
    Opaque(OpaqueBlock),
    /// A struct made up of child data blocks
    Struct(StructDataBlock),
    /// Dictionary encoded data (indices plus a dictionary)
    Dictionary(DictionaryDataBlock),
}
878
879impl DataBlock {
880    /// Convert self into an Arrow ArrayData
881    pub fn into_arrow(self, data_type: DataType, validate: bool) -> Result<ArrayData> {
882        match self {
883            Self::Empty() => Ok(new_empty_array(&data_type).to_data()),
884            Self::Constant(inner) => inner.into_arrow(data_type, validate),
885            Self::AllNull(inner) => inner.into_arrow(data_type, validate),
886            Self::Nullable(inner) => inner.into_arrow(data_type, validate),
887            Self::FixedWidth(inner) => inner.into_arrow(data_type, validate),
888            Self::FixedSizeList(inner) => inner.into_arrow(data_type, validate),
889            Self::VariableWidth(inner) => inner.into_arrow(data_type, validate),
890            Self::Struct(inner) => inner.into_arrow(data_type, validate),
891            Self::Dictionary(inner) => inner.into_arrow(data_type, validate),
892            Self::Opaque(_) => Err(Error::Internal {
893                message: "Cannot convert OpaqueBlock to Arrow".to_string(),
894                location: location!(),
895            }),
896        }
897    }
898
899    /// Convert the data block into a collection of buffers for serialization
900    ///
901    /// The order matters and will be used to reconstruct the data block at read time.
902    pub fn into_buffers(self) -> Vec<LanceBuffer> {
903        match self {
904            Self::Empty() => Vec::default(),
905            Self::Constant(inner) => inner.into_buffers(),
906            Self::AllNull(inner) => inner.into_buffers(),
907            Self::Nullable(inner) => inner.into_buffers(),
908            Self::FixedWidth(inner) => inner.into_buffers(),
909            Self::FixedSizeList(inner) => inner.into_buffers(),
910            Self::VariableWidth(inner) => inner.into_buffers(),
911            Self::Struct(inner) => inner.into_buffers(),
912            Self::Dictionary(inner) => inner.into_buffers(),
913            Self::Opaque(inner) => inner.buffers,
914        }
915    }
916
917    /// Converts the data buffers into borrowed mode and clones the block
918    ///
919    /// This is a zero-copy operation but requires a mutable reference to self and, afterwards,
920    /// all buffers will be in Borrowed mode.
921    pub fn borrow_and_clone(&mut self) -> Self {
922        match self {
923            Self::Empty() => Self::Empty(),
924            Self::Constant(inner) => Self::Constant(inner.borrow_and_clone()),
925            Self::AllNull(inner) => Self::AllNull(inner.borrow_and_clone()),
926            Self::Nullable(inner) => Self::Nullable(inner.borrow_and_clone()),
927            Self::FixedWidth(inner) => Self::FixedWidth(inner.borrow_and_clone()),
928            Self::FixedSizeList(inner) => Self::FixedSizeList(inner.borrow_and_clone()),
929            Self::VariableWidth(inner) => Self::VariableWidth(inner.borrow_and_clone()),
930            Self::Struct(inner) => Self::Struct(inner.borrow_and_clone()),
931            Self::Dictionary(inner) => Self::Dictionary(inner.borrow_and_clone()),
932            Self::Opaque(inner) => Self::Opaque(inner.borrow_and_clone()),
933        }
934    }
935
936    /// Try and clone the block
937    ///
938    /// This will fail if any buffers are in owned mode.  You can call borrow_and_clone() to
939    /// ensure that all buffers are in borrowed mode before calling this method.
940    pub fn try_clone(&self) -> Result<Self> {
941        match self {
942            Self::Empty() => Ok(Self::Empty()),
943            Self::Constant(inner) => Ok(Self::Constant(inner.try_clone()?)),
944            Self::AllNull(inner) => Ok(Self::AllNull(inner.try_clone()?)),
945            Self::Nullable(inner) => Ok(Self::Nullable(inner.try_clone()?)),
946            Self::FixedWidth(inner) => Ok(Self::FixedWidth(inner.try_clone()?)),
947            Self::FixedSizeList(inner) => Ok(Self::FixedSizeList(inner.try_clone()?)),
948            Self::VariableWidth(inner) => Ok(Self::VariableWidth(inner.try_clone()?)),
949            Self::Struct(inner) => Ok(Self::Struct(inner.try_clone()?)),
950            Self::Dictionary(inner) => Ok(Self::Dictionary(inner.try_clone()?)),
951            Self::Opaque(inner) => Ok(Self::Opaque(inner.try_clone()?)),
952        }
953    }
954
955    pub fn name(&self) -> &'static str {
956        match self {
957            Self::Constant(_) => "Constant",
958            Self::Empty() => "Empty",
959            Self::AllNull(_) => "AllNull",
960            Self::Nullable(_) => "Nullable",
961            Self::FixedWidth(_) => "FixedWidth",
962            Self::FixedSizeList(_) => "FixedSizeList",
963            Self::VariableWidth(_) => "VariableWidth",
964            Self::Struct(_) => "Struct",
965            Self::Dictionary(_) => "Dictionary",
966            Self::Opaque(_) => "Opaque",
967        }
968    }
969
970    pub fn is_variable(&self) -> bool {
971        match self {
972            Self::Constant(_) => false,
973            Self::Empty() => false,
974            Self::AllNull(_) => false,
975            Self::Nullable(nullable) => nullable.data.is_variable(),
976            Self::FixedWidth(_) => false,
977            Self::FixedSizeList(fsl) => fsl.child.is_variable(),
978            Self::VariableWidth(_) => true,
979            Self::Struct(strct) => strct.children.iter().any(|c| c.is_variable()),
980            Self::Dictionary(_) => {
981                todo!("is_variable for DictionaryDataBlock is not implemented yet")
982            }
983            Self::Opaque(_) => panic!("Does not make sense to ask if an Opaque block is variable"),
984        }
985    }
986
987    pub fn is_nullable(&self) -> bool {
988        match self {
989            Self::AllNull(_) => true,
990            Self::Nullable(_) => true,
991            Self::FixedSizeList(fsl) => fsl.child.is_nullable(),
992            Self::Struct(strct) => strct.children.iter().any(|c| c.is_nullable()),
993            Self::Dictionary(_) => {
994                todo!("is_nullable for DictionaryDataBlock is not implemented yet")
995            }
996            Self::Opaque(_) => panic!("Does not make sense to ask if an Opaque block is nullable"),
997            _ => false,
998        }
999    }
1000
1001    /// The number of values in the block
1002    ///
1003    /// This function does not recurse into child blocks.  If this is a FSL then it will
1004    /// be the number of lists and not the number of items.
1005    pub fn num_values(&self) -> u64 {
1006        match self {
1007            Self::Empty() => 0,
1008            Self::Constant(inner) => inner.num_values,
1009            Self::AllNull(inner) => inner.num_values,
1010            Self::Nullable(inner) => inner.data.num_values(),
1011            Self::FixedWidth(inner) => inner.num_values,
1012            Self::FixedSizeList(inner) => inner.num_values(),
1013            Self::VariableWidth(inner) => inner.num_values,
1014            Self::Struct(inner) => inner.children[0].num_values(),
1015            Self::Dictionary(inner) => inner.indices.num_values,
1016            Self::Opaque(inner) => inner.num_values,
1017        }
1018    }
1019
1020    /// The number of items in a single row
1021    ///
1022    /// This is always 1 unless there are layers of FSL
1023    pub fn items_per_row(&self) -> u64 {
1024        match self {
1025            Self::Empty() => todo!(),     // Leave undefined until needed
1026            Self::Constant(_) => todo!(), // Leave undefined until needed
1027            Self::AllNull(_) => todo!(),  // Leave undefined until needed
1028            Self::Nullable(nullable) => nullable.data.items_per_row(),
1029            Self::FixedWidth(_) => 1,
1030            Self::FixedSizeList(fsl) => fsl.dimension * fsl.child.items_per_row(),
1031            Self::VariableWidth(_) => 1,
1032            Self::Struct(_) => todo!(), // Leave undefined until needed
1033            Self::Dictionary(_) => 1,
1034            Self::Opaque(_) => 1,
1035        }
1036    }
1037
1038    /// The number of bytes in the data block (including any child blocks)
1039    pub fn data_size(&self) -> u64 {
1040        match self {
1041            Self::Empty() => 0,
1042            Self::Constant(inner) => inner.data_size(),
1043            Self::AllNull(_) => 0,
1044            Self::Nullable(inner) => inner.data_size(),
1045            Self::FixedWidth(inner) => inner.data_size(),
1046            Self::FixedSizeList(inner) => inner.data_size(),
1047            Self::VariableWidth(inner) => inner.data_size(),
1048            Self::Struct(_) => {
1049                todo!("the data_size method for StructDataBlock is not implemented yet")
1050            }
1051            Self::Dictionary(_) => {
1052                todo!("the data_size method for DictionaryDataBlock is not implemented yet")
1053            }
1054            Self::Opaque(inner) => inner.data_size(),
1055        }
1056    }
1057
1058    /// Removes any validity information from the block
1059    ///
1060    /// This does not filter the block (e.g. remove rows).  It only removes
1061    /// the validity bitmaps (if present).  Any garbage masked by null bits
1062    /// will now appear as proper values.
1063    ///
1064    /// If `recurse` is true, then this will also remove validity from any child blocks.
1065    pub fn remove_outer_validity(self) -> Self {
1066        match self {
1067            Self::AllNull(_) => panic!("Cannot remove validity on all-null data"),
1068            Self::Nullable(inner) => *inner.data,
1069            Self::Struct(inner) => Self::Struct(inner.remove_outer_validity()),
1070            other => other,
1071        }
1072    }
1073
1074    pub fn make_builder(&self, estimated_size_bytes: u64) -> Box<dyn DataBlockBuilderImpl> {
1075        match self {
1076            Self::FixedWidth(inner) => {
1077                if inner.bits_per_value == 1 {
1078                    Box::new(BitmapDataBlockBuilder::new(estimated_size_bytes))
1079                } else {
1080                    Box::new(FixedWidthDataBlockBuilder::new(
1081                        inner.bits_per_value,
1082                        estimated_size_bytes,
1083                    ))
1084                }
1085            }
1086            Self::VariableWidth(inner) => {
1087                if inner.bits_per_offset == 32 {
1088                    Box::new(VariableWidthDataBlockBuilder::new(estimated_size_bytes))
1089                } else {
1090                    todo!()
1091                }
1092            }
1093            Self::FixedSizeList(inner) => {
1094                let inner_builder = inner.child.make_builder(estimated_size_bytes);
1095                Box::new(FixedSizeListBlockBuilder::new(
1096                    inner_builder,
1097                    inner.dimension,
1098                ))
1099            }
1100            Self::Nullable(nullable) => {
1101                // There's no easy way to know what percentage of the data is in the valiidty buffer
1102                // but 1/16th seems like a reasonable guess.
1103                let estimated_validity_size_bytes = estimated_size_bytes / 16;
1104                let inner_builder = nullable
1105                    .data
1106                    .make_builder(estimated_size_bytes - estimated_validity_size_bytes);
1107                Box::new(NullableDataBlockBuilder::new(
1108                    inner_builder,
1109                    estimated_validity_size_bytes as usize,
1110                ))
1111            }
1112            Self::Struct(struct_data_block) => {
1113                let mut bits_per_values = vec![];
1114                for child in struct_data_block.children.iter() {
1115                    let child = child.as_fixed_width_ref().
1116                        expect("Currently StructDataBlockBuilder is only used in packed-struct encoding, and currently in packed-struct encoding, only fixed-width fields are supported.");
1117                    bits_per_values.push(child.bits_per_value as u32);
1118                }
1119                Box::new(StructDataBlockBuilder::new(
1120                    bits_per_values,
1121                    estimated_size_bytes,
1122                ))
1123            }
1124            _ => todo!("make_builder for {:?}", self),
1125        }
1126    }
1127}
1128
// Generates a consuming accessor `$fn_name(self) -> Option<$inner_type>` that yields
// the payload when `self` is the `$inner` variant and `None` for every other variant.
macro_rules! as_type {
    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
        pub fn $fn_name(self) -> Option<$inner_type> {
            if let Self::$inner(payload) = self {
                Some(payload)
            } else {
                None
            }
        }
    };
}
1139
// Generates a borrowing accessor `$fn_name(&self) -> Option<&$inner_type>` that yields
// a reference to the payload when `self` is the `$inner` variant and `None` otherwise.
macro_rules! as_type_ref {
    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
        pub fn $fn_name(&self) -> Option<&$inner_type> {
            if let Self::$inner(payload) = self {
                Some(payload)
            } else {
                None
            }
        }
    };
}
1150
// Generates a mutable accessor `$fn_name(&mut self) -> Option<&mut $inner_type>` that
// yields a mutable reference to the payload when `self` is the `$inner` variant and
// `None` otherwise.
macro_rules! as_type_ref_mut {
    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
        pub fn $fn_name(&mut self) -> Option<&mut $inner_type> {
            if let Self::$inner(payload) = self {
                Some(payload)
            } else {
                None
            }
        }
    };
}
1161
1162// Cast implementations
1163impl DataBlock {
1164    as_type!(as_all_null, AllNull, AllNullDataBlock);
1165    as_type!(as_nullable, Nullable, NullableDataBlock);
1166    as_type!(as_fixed_width, FixedWidth, FixedWidthDataBlock);
1167    as_type!(as_fixed_size_list, FixedSizeList, FixedSizeListBlock);
1168    as_type!(as_variable_width, VariableWidth, VariableWidthBlock);
1169    as_type!(as_struct, Struct, StructDataBlock);
1170    as_type!(as_dictionary, Dictionary, DictionaryDataBlock);
1171    as_type_ref!(as_all_null_ref, AllNull, AllNullDataBlock);
1172    as_type_ref!(as_nullable_ref, Nullable, NullableDataBlock);
1173    as_type_ref!(as_fixed_width_ref, FixedWidth, FixedWidthDataBlock);
1174    as_type_ref!(as_fixed_size_list_ref, FixedSizeList, FixedSizeListBlock);
1175    as_type_ref!(as_variable_width_ref, VariableWidth, VariableWidthBlock);
1176    as_type_ref!(as_struct_ref, Struct, StructDataBlock);
1177    as_type_ref!(as_dictionary_ref, Dictionary, DictionaryDataBlock);
1178    as_type_ref_mut!(as_all_null_ref_mut, AllNull, AllNullDataBlock);
1179    as_type_ref_mut!(as_nullable_ref_mut, Nullable, NullableDataBlock);
1180    as_type_ref_mut!(as_fixed_width_ref_mut, FixedWidth, FixedWidthDataBlock);
1181    as_type_ref_mut!(
1182        as_fixed_size_list_ref_mut,
1183        FixedSizeList,
1184        FixedSizeListBlock
1185    );
1186    as_type_ref_mut!(as_variable_width_ref_mut, VariableWidth, VariableWidthBlock);
1187    as_type_ref_mut!(as_struct_ref_mut, Struct, StructDataBlock);
1188    as_type_ref_mut!(as_dictionary_ref_mut, Dictionary, DictionaryDataBlock);
1189}
1190
1191// Methods to convert from Arrow -> DataBlock
1192
1193fn get_byte_range<T: ArrowNativeType>(offsets: &mut LanceBuffer) -> Range<usize> {
1194    let offsets = offsets.borrow_to_typed_slice::<T>();
1195    if offsets.as_ref().is_empty() {
1196        0..0
1197    } else {
1198        offsets.as_ref().first().unwrap().as_usize()..offsets.as_ref().last().unwrap().as_usize()
1199    }
1200}
1201
1202// Given multiple offsets arrays [0, 5, 10], [0, 3, 7], etc. stitch
1203// them together to get [0, 5, 10, 13, 20, ...]
1204//
1205// Also returns the data range referenced by each offset array (may
1206// not be 0..len if there is slicing involved)
1207fn stitch_offsets<T: ArrowNativeType + std::ops::Add<Output = T> + std::ops::Sub<Output = T>>(
1208    offsets: Vec<LanceBuffer>,
1209) -> (LanceBuffer, Vec<Range<usize>>) {
1210    if offsets.is_empty() {
1211        return (LanceBuffer::empty(), Vec::default());
1212    }
1213    let len = offsets.iter().map(|b| b.len()).sum::<usize>();
1214    // Note: we are making a copy here, even if there is only one input, because we want to
1215    // normalize that input if it doesn't start with zero.  This could be micro-optimized out
1216    // if needed.
1217    let mut dest = Vec::with_capacity(len);
1218    let mut byte_ranges = Vec::with_capacity(offsets.len());
1219
1220    // We insert one leading 0 before processing any of the inputs
1221    dest.push(T::from_usize(0).unwrap());
1222
1223    for mut o in offsets.into_iter() {
1224        if !o.is_empty() {
1225            let last_offset = *dest.last().unwrap();
1226            let o = o.borrow_to_typed_slice::<T>();
1227            let start = *o.as_ref().first().unwrap();
1228            // First, we skip the first offset
1229            // Then, we subtract that first offset from each remaining offset
1230            //
1231            // This gives us a 0-based offset array (minus the leading 0)
1232            //
1233            // Then we add the last offset from the previous array to each offset
1234            // which shifts our offset array to the correct position
1235            //
1236            // For example, let's assume the last offset from the previous array
1237            // was 10 and we are given [13, 17, 22].  This means we have two values with
1238            // length 4 (17 - 13) and 5 (22 - 17).  The output from this step will be
1239            // [14, 19].  Combined with our last offset of 10, this gives us [10, 14, 19]
1240            // which is our same two values of length 4 and 5.
1241            dest.extend(o.as_ref()[1..].iter().map(|&x| x + last_offset - start));
1242        }
1243        byte_ranges.push(get_byte_range::<T>(&mut o));
1244    }
1245    (LanceBuffer::reinterpret_vec(dest), byte_ranges)
1246}
1247
1248fn arrow_binary_to_data_block(
1249    arrays: &[ArrayRef],
1250    num_values: u64,
1251    bits_per_offset: u8,
1252) -> DataBlock {
1253    let data_vec = arrays.iter().map(|arr| arr.to_data()).collect::<Vec<_>>();
1254    let bytes_per_offset = bits_per_offset as usize / 8;
1255    let offsets = data_vec
1256        .iter()
1257        .map(|d| {
1258            LanceBuffer::Borrowed(
1259                d.buffers()[0].slice_with_length(d.offset(), (d.len() + 1) * bytes_per_offset),
1260            )
1261        })
1262        .collect::<Vec<_>>();
1263    let (offsets, data_ranges) = if bits_per_offset == 32 {
1264        stitch_offsets::<i32>(offsets)
1265    } else {
1266        stitch_offsets::<i64>(offsets)
1267    };
1268    let data = data_vec
1269        .iter()
1270        .zip(data_ranges)
1271        .map(|(d, byte_range)| {
1272            LanceBuffer::Borrowed(
1273                d.buffers()[1]
1274                    .slice_with_length(byte_range.start, byte_range.end - byte_range.start),
1275            )
1276        })
1277        .collect::<Vec<_>>();
1278    let data = LanceBuffer::concat_into_one(data);
1279    DataBlock::VariableWidth(VariableWidthBlock {
1280        data,
1281        offsets,
1282        bits_per_offset,
1283        num_values,
1284        block_info: BlockInfo::new(),
1285    })
1286}
1287
1288fn encode_flat_data(arrays: &[ArrayRef], num_values: u64) -> LanceBuffer {
1289    let bytes_per_value = arrays[0].data_type().byte_width();
1290    let mut buffer = Vec::with_capacity(num_values as usize * bytes_per_value);
1291    for arr in arrays {
1292        let data = arr.to_data();
1293        buffer.extend_from_slice(data.buffers()[0].as_slice());
1294    }
1295    LanceBuffer::Owned(buffer)
1296}
1297
1298fn do_encode_bitmap_data(bitmaps: &[BooleanBuffer], num_values: u64) -> LanceBuffer {
1299    let mut builder = BooleanBufferBuilder::new(num_values as usize);
1300
1301    for buf in bitmaps {
1302        builder.append_buffer(buf);
1303    }
1304
1305    let buffer = builder.finish().into_inner();
1306    LanceBuffer::Borrowed(buffer)
1307}
1308
1309fn encode_bitmap_data(arrays: &[ArrayRef], num_values: u64) -> LanceBuffer {
1310    let bitmaps = arrays
1311        .iter()
1312        .map(|arr| arr.as_boolean().values().clone())
1313        .collect::<Vec<_>>();
1314    do_encode_bitmap_data(&bitmaps, num_values)
1315}
1316
1317// Concatenate dictionary arrays.  This is a bit tricky because we might overflow the
1318// index type.  If we do, we need to upscale the indices to a larger type.
1319fn concat_dict_arrays(arrays: &[ArrayRef]) -> ArrayRef {
1320    let value_type = arrays[0].as_any_dictionary().values().data_type();
1321    let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
1322    match arrow_select::concat::concat(&array_refs) {
1323        Ok(array) => array,
1324        Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1325            // Slow, but hopefully a corner case.  Optimize later
1326            let upscaled = array_refs
1327                .iter()
1328                .map(|arr| {
1329                    match arrow_cast::cast(
1330                        *arr,
1331                        &DataType::Dictionary(
1332                            Box::new(DataType::UInt32),
1333                            Box::new(value_type.clone()),
1334                        ),
1335                    ) {
1336                        Ok(arr) => arr,
1337                        Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1338                            // Technically I think this means the input type was u64 already
1339                            unimplemented!("Dictionary arrays with more than 2^32 unique values")
1340                        }
1341                        err => err.unwrap(),
1342                    }
1343                })
1344                .collect::<Vec<_>>();
1345            let array_refs = upscaled.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
1346            // Can still fail if concat pushes over u32 boundary
1347            match arrow_select::concat::concat(&array_refs) {
1348                Ok(array) => array,
1349                Err(arrow_schema::ArrowError::DictionaryKeyOverflowError) => {
1350                    unimplemented!("Dictionary arrays with more than 2^32 unique values")
1351                }
1352                err => err.unwrap(),
1353            }
1354        }
1355        // Shouldn't be any other possible errors in concat
1356        err => err.unwrap(),
1357    }
1358}
1359
1360fn max_index_val(index_type: &DataType) -> u64 {
1361    match index_type {
1362        DataType::Int8 => i8::MAX as u64,
1363        DataType::Int16 => i16::MAX as u64,
1364        DataType::Int32 => i32::MAX as u64,
1365        DataType::Int64 => i64::MAX as u64,
1366        DataType::UInt8 => u8::MAX as u64,
1367        DataType::UInt16 => u16::MAX as u64,
1368        DataType::UInt32 => u32::MAX as u64,
1369        DataType::UInt64 => u64::MAX,
1370        _ => panic!("Invalid dictionary index type"),
1371    }
1372}
1373
// If we get multiple dictionary arrays and they don't all have the same dictionary
// then we need to normalize the indices.  Otherwise we might have something like:
//
// First chunk ["hello", "foo"], [0, 0, 1, 1, 1]
// Second chunk ["bar", "world"], [0, 1, 0, 1, 1]
//
// If we simply encode as ["hello", "foo", "bar", "world"], [0, 0, 1, 1, 1, 0, 1, 0, 1, 1]
// then we will get the wrong answer because the dictionaries were not merged and the indices
// were not remapped.
//
// A simple way to do this today is to just concatenate all the arrays.  This is because
// arrow's dictionary concatenation function already has the logic to merge dictionaries.
//
// TODO: We could be more efficient here by checking if the dictionaries are the same
//       Also, if they aren't, we can possibly do something cheaper than concatenating
//
// In addition, we want to normalize the representation of nulls.  The cheapest thing to
// do (space-wise) is to put the nulls in the dictionary.
//
// Parameters:
//   arrays   - the dictionary arrays to combine (concatenated via `concat_dict_arrays`)
//   validity - row-level validity of the combined arrays, or `None` when no rewriting
//              of the indices is needed
//
// Returns a `DataBlock::Dictionary` whose `indices` are a fixed-width block and whose
// `dictionary` is the data block built from the (possibly extended) dictionary values.
//
// Panics (via `unimplemented!`) if a null slot has to be appended to the dictionary and
// the index type can already represent `u32::MAX` or more.
fn arrow_dictionary_to_data_block(arrays: &[ArrayRef], validity: Option<NullBuffer>) -> DataBlock {
    let array = concat_dict_arrays(arrays);
    let array_dict = array.as_any_dictionary();
    let mut indices = array_dict.keys();
    let num_values = indices.len() as u64;
    let mut values = array_dict.values().clone();
    // Placeholder, if we need to upcast, we will initialize this and set `indices` to refer to it
    let mut upcast = None;

    // TODO: Should we just always normalize indices to u32?  That would make logic simpler
    // and we're going to bitpack them soon anyways

    let indices_block = if let Some(validity) = validity {
        // If there is validity then we find the first invalid index in the dictionary values, inserting
        // a new value if we need to.  Then we change all indices to point to that value.  This way we
        // never need to store nullability of the indices.
        let mut first_invalid_index = None;
        if let Some(values_validity) = values.nulls() {
            // Inverting the validity bitmap turns null slots into set bits, so the first
            // set bit is the first null entry of the dictionary
            first_invalid_index = (!values_validity.inner()).set_indices().next();
        }
        let first_invalid_index = first_invalid_index.unwrap_or_else(|| {
            // The dictionary has no null entry yet: append one at the end
            let null_arr = new_null_array(values.data_type(), 1);
            values = arrow_select::concat::concat(&[values.as_ref(), null_arr.as_ref()]).unwrap();
            let null_index = values.len() - 1;
            let max_index_val = max_index_val(indices.data_type());
            if null_index as u64 > max_index_val {
                // Widen the index type
                if max_index_val >= u32::MAX as u64 {
                    unimplemented!("Dictionary arrays with 2^32 unique value (or more) and a null")
                }
                // `upcast` owns the widened keys so that `indices` can keep borrowing them
                upcast = Some(arrow_cast::cast(indices, &DataType::UInt32).unwrap());
                indices = upcast.as_ref().unwrap();
            }
            null_index
        });
        // This can't fail since we already checked for fit
        let null_index_arr = arrow_cast::cast(
            &UInt64Array::from(vec![first_invalid_index as u64]),
            indices.data_type(),
        )
        .unwrap();

        let bytes_per_index = indices.data_type().byte_width();
        let bits_per_index = bytes_per_index as u64 * 8;

        // The raw bytes of the null index, in the same width as the (possibly widened) keys
        let null_index_arr = null_index_arr.into_data();
        let null_index_bytes = &null_index_arr.buffers()[0];
        // Need to make a copy here since indices isn't mutable, could be avoided in theory
        let mut indices_bytes = indices.to_data().buffers()[0].to_vec();
        // Overwrite the key of every null row with the index of the null dictionary entry
        for invalid_idx in (!validity.inner()).set_indices() {
            indices_bytes[invalid_idx * bytes_per_index..(invalid_idx + 1) * bytes_per_index]
                .copy_from_slice(null_index_bytes.as_slice());
        }
        FixedWidthDataBlock {
            data: LanceBuffer::Owned(indices_bytes),
            bits_per_value: bits_per_index,
            num_values,
            block_info: BlockInfo::new(),
        }
    } else {
        // No validity to fold in: the key buffer is used as-is
        FixedWidthDataBlock {
            data: LanceBuffer::Borrowed(indices.to_data().buffers()[0].clone()),
            bits_per_value: indices.data_type().byte_width() as u64 * 8,
            num_values,
            block_info: BlockInfo::new(),
        }
    };

    let items = DataBlock::from(values);
    DataBlock::Dictionary(DictionaryDataBlock {
        indices: indices_block,
        dictionary: Box::new(items),
    })
}
1466
1467enum Nullability {
1468    None,
1469    All,
1470    Some(NullBuffer),
1471}
1472
1473impl Nullability {
1474    fn to_option(&self) -> Option<NullBuffer> {
1475        match self {
1476            Self::Some(nulls) => Some(nulls.clone()),
1477            _ => None,
1478        }
1479    }
1480}
1481
1482fn extract_nulls(arrays: &[ArrayRef], num_values: u64) -> Nullability {
1483    let mut has_nulls = false;
1484    let nulls_and_lens = arrays
1485        .iter()
1486        .map(|arr| {
1487            let nulls = arr.logical_nulls();
1488            has_nulls |= nulls.is_some();
1489            (nulls, arr.len())
1490        })
1491        .collect::<Vec<_>>();
1492    if !has_nulls {
1493        return Nullability::None;
1494    }
1495    let mut builder = BooleanBufferBuilder::new(num_values as usize);
1496    let mut num_nulls = 0;
1497    for (null, len) in nulls_and_lens {
1498        if let Some(null) = null {
1499            num_nulls += null.null_count();
1500            builder.append_buffer(&null.into_inner());
1501        } else {
1502            builder.append_n(len, true);
1503        }
1504    }
1505    if num_nulls == num_values as usize {
1506        Nullability::All
1507    } else {
1508        Nullability::Some(NullBuffer::new(builder.finish()))
1509    }
1510}
1511
1512impl DataBlock {
1513    pub fn from_arrays(arrays: &[ArrayRef], num_values: u64) -> Self {
1514        if arrays.is_empty() || num_values == 0 {
1515            return Self::AllNull(AllNullDataBlock { num_values: 0 });
1516        }
1517
1518        let data_type = arrays[0].data_type();
1519        let nulls = extract_nulls(arrays, num_values);
1520
1521        if let Nullability::All = nulls {
1522            return Self::AllNull(AllNullDataBlock { num_values });
1523        }
1524
1525        let mut encoded = match data_type {
1526            DataType::Binary | DataType::Utf8 => arrow_binary_to_data_block(arrays, num_values, 32),
1527            DataType::BinaryView | DataType::Utf8View => {
1528                todo!()
1529            }
1530            DataType::LargeBinary | DataType::LargeUtf8 => {
1531                arrow_binary_to_data_block(arrays, num_values, 64)
1532            }
1533            DataType::Boolean => {
1534                let data = encode_bitmap_data(arrays, num_values);
1535                Self::FixedWidth(FixedWidthDataBlock {
1536                    data,
1537                    bits_per_value: 1,
1538                    num_values,
1539                    block_info: BlockInfo::new(),
1540                })
1541            }
1542            DataType::Date32
1543            | DataType::Date64
1544            | DataType::Decimal128(_, _)
1545            | DataType::Decimal256(_, _)
1546            | DataType::Duration(_)
1547            | DataType::FixedSizeBinary(_)
1548            | DataType::Float16
1549            | DataType::Float32
1550            | DataType::Float64
1551            | DataType::Int16
1552            | DataType::Int32
1553            | DataType::Int64
1554            | DataType::Int8
1555            | DataType::Interval(_)
1556            | DataType::Time32(_)
1557            | DataType::Time64(_)
1558            | DataType::Timestamp(_, _)
1559            | DataType::UInt16
1560            | DataType::UInt32
1561            | DataType::UInt64
1562            | DataType::UInt8 => {
1563                let data = encode_flat_data(arrays, num_values);
1564                Self::FixedWidth(FixedWidthDataBlock {
1565                    data,
1566                    bits_per_value: data_type.byte_width() as u64 * 8,
1567                    num_values,
1568                    block_info: BlockInfo::new(),
1569                })
1570            }
1571            DataType::Null => Self::AllNull(AllNullDataBlock { num_values }),
1572            DataType::Dictionary(_, _) => arrow_dictionary_to_data_block(arrays, nulls.to_option()),
1573            DataType::Struct(fields) => {
1574                let structs = arrays.iter().map(|arr| arr.as_struct()).collect::<Vec<_>>();
1575                let mut children = Vec::with_capacity(fields.len());
1576                for child_idx in 0..fields.len() {
1577                    let child_vec = structs
1578                        .iter()
1579                        .map(|s| s.column(child_idx).clone())
1580                        .collect::<Vec<_>>();
1581                    children.push(Self::from_arrays(&child_vec, num_values));
1582                }
1583                Self::Struct(StructDataBlock {
1584                    children,
1585                    block_info: BlockInfo::default(),
1586                })
1587            }
1588            DataType::FixedSizeList(_, dim) => {
1589                let children = arrays
1590                    .iter()
1591                    .map(|arr| arr.as_fixed_size_list().values().clone())
1592                    .collect::<Vec<_>>();
1593                let child_block = Self::from_arrays(&children, num_values * *dim as u64);
1594                Self::FixedSizeList(FixedSizeListBlock {
1595                    child: Box::new(child_block),
1596                    dimension: *dim as u64,
1597                })
1598            }
1599            DataType::LargeList(_)
1600            | DataType::List(_)
1601            | DataType::ListView(_)
1602            | DataType::LargeListView(_)
1603            | DataType::Map(_, _)
1604            | DataType::RunEndEncoded(_, _)
1605            | DataType::Union(_, _) => {
1606                panic!(
1607                    "Field with data type {} cannot be converted to data block",
1608                    data_type
1609                )
1610            }
1611        };
1612
1613        // compute statistics
1614        encoded.compute_stat();
1615
1616        if !matches!(data_type, DataType::Dictionary(_, _)) {
1617            match nulls {
1618                Nullability::None => encoded,
1619                Nullability::Some(nulls) => Self::Nullable(NullableDataBlock {
1620                    data: Box::new(encoded),
1621                    nulls: LanceBuffer::Borrowed(nulls.into_inner().into_inner()),
1622                    block_info: BlockInfo::new(),
1623                }),
1624                _ => unreachable!(),
1625            }
1626        } else {
1627            // Dictionaries already insert the nulls into the dictionary items
1628            encoded
1629        }
1630    }
1631
1632    pub fn from_array<T: Array + 'static>(array: T) -> Self {
1633        let num_values = array.len();
1634        Self::from_arrays(&[Arc::new(array)], num_values as u64)
1635    }
1636}
1637
1638impl From<ArrayRef> for DataBlock {
1639    fn from(array: ArrayRef) -> Self {
1640        let num_values = array.len() as u64;
1641        Self::from_arrays(&[array], num_values)
1642    }
1643}
1644
/// Backend used by [`DataBlockBuilder`] to accumulate data.
///
/// `DataBlockBuilder` obtains an implementation from `DataBlock::make_builder` and
/// forwards its `append` / `finish` calls to it.
pub trait DataBlockBuilderImpl: std::fmt::Debug {
    /// Appends part of `data_block` to the builder.
    ///
    /// NOTE(review): `selection` is presumably a range of value indices within
    /// `data_block` -- confirm against the implementations.
    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>);
    /// Consumes the (boxed) builder and returns the resulting [`DataBlock`].
    fn finish(self: Box<Self>) -> DataBlock;
}
1649
/// Accumulates selections from [`DataBlock`]s and produces a single block.
///
/// The inner builder is created lazily from the first block that is appended.
#[derive(Debug)]
pub struct DataBlockBuilder {
    // Size estimate passed to `DataBlock::make_builder` when the inner builder is created
    estimated_size_bytes: u64,
    // `None` until the first block has been appended
    builder: Option<Box<dyn DataBlockBuilderImpl>>,
}
1655
1656impl DataBlockBuilder {
1657    pub fn with_capacity_estimate(estimated_size_bytes: u64) -> Self {
1658        Self {
1659            estimated_size_bytes,
1660            builder: None,
1661        }
1662    }
1663
1664    fn get_builder(&mut self, block: &DataBlock) -> &mut dyn DataBlockBuilderImpl {
1665        if self.builder.is_none() {
1666            self.builder = Some(block.make_builder(self.estimated_size_bytes));
1667        }
1668        self.builder.as_mut().unwrap().as_mut()
1669    }
1670
1671    pub fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
1672        self.get_builder(data_block).append(data_block, selection);
1673    }
1674
1675    pub fn finish(self) -> DataBlock {
1676        let builder = self.builder.expect("DataBlockBuilder didn't see any data");
1677        builder.finish()
1678    }
1679}
1680
1681#[cfg(test)]
1682mod tests {
1683    use std::sync::Arc;
1684
1685    use arrow::datatypes::{Int32Type, Int8Type};
1686    use arrow_array::{
1687        make_array, new_null_array, ArrayRef, DictionaryArray, Int8Array, LargeBinaryArray,
1688        StringArray, UInt16Array, UInt8Array,
1689    };
1690    use arrow_buffer::{BooleanBuffer, NullBuffer};
1691
1692    use arrow_schema::{DataType, Field, Fields};
1693    use lance_datagen::{array, ArrayGeneratorExt, RowCount, DEFAULT_SEED};
1694    use rand::SeedableRng;
1695
1696    use crate::buffer::LanceBuffer;
1697
1698    use super::{AllNullDataBlock, DataBlock};
1699
1700    use arrow::compute::concat;
1701    use arrow_array::Array;
1702
1703    #[test]
1704    fn test_sliced_to_data_block() {
1705        let ints = UInt16Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);
1706        let ints = ints.slice(2, 4);
1707        let data = DataBlock::from_array(ints);
1708
1709        let fixed_data = data.as_fixed_width().unwrap();
1710        assert_eq!(fixed_data.num_values, 4);
1711        assert_eq!(fixed_data.data.len(), 8);
1712
1713        let nullable_ints =
1714            UInt16Array::from(vec![Some(0), None, Some(2), None, Some(4), None, Some(6)]);
1715        let nullable_ints = nullable_ints.slice(1, 3);
1716        let data = DataBlock::from_array(nullable_ints);
1717
1718        let nullable = data.as_nullable().unwrap();
1719        assert_eq!(nullable.nulls, LanceBuffer::Owned(vec![0b00000010]));
1720    }
1721
1722    #[test]
1723    fn test_string_to_data_block() {
1724        // Converting string arrays that contain nulls to DataBlock
1725        let strings1 = StringArray::from(vec![Some("hello"), None, Some("world")]);
1726        let strings2 = StringArray::from(vec![Some("a"), Some("b")]);
1727        let strings3 = StringArray::from(vec![Option::<&'static str>::None, None]);
1728
1729        let arrays = &[strings1, strings2, strings3]
1730            .iter()
1731            .map(|arr| Arc::new(arr.clone()) as ArrayRef)
1732            .collect::<Vec<_>>();
1733
1734        let block = DataBlock::from_arrays(arrays, 7);
1735
1736        assert_eq!(block.num_values(), 7);
1737        let block = block.as_nullable().unwrap();
1738
1739        assert_eq!(block.nulls, LanceBuffer::Owned(vec![0b00011101]));
1740
1741        let data = block.data.as_variable_width().unwrap();
1742        assert_eq!(
1743            data.offsets,
1744            LanceBuffer::reinterpret_vec(vec![0, 5, 5, 10, 11, 12, 12, 12])
1745        );
1746
1747        assert_eq!(data.data, LanceBuffer::copy_slice(b"helloworldab"));
1748
1749        // Converting string arrays that do not contain nulls to DataBlock
1750        let strings1 = StringArray::from(vec![Some("a"), Some("bc")]);
1751        let strings2 = StringArray::from(vec![Some("def")]);
1752
1753        let arrays = &[strings1, strings2]
1754            .iter()
1755            .map(|arr| Arc::new(arr.clone()) as ArrayRef)
1756            .collect::<Vec<_>>();
1757
1758        let block = DataBlock::from_arrays(arrays, 3);
1759
1760        assert_eq!(block.num_values(), 3);
1761        // Should be no nullable wrapper
1762        let data = block.as_variable_width().unwrap();
1763        assert_eq!(data.offsets, LanceBuffer::reinterpret_vec(vec![0, 1, 3, 6]));
1764        assert_eq!(data.data, LanceBuffer::copy_slice(b"abcdef"));
1765    }
1766
1767    #[test]
1768    fn test_string_sliced() {
1769        let check = |arr: Vec<StringArray>, expected_off: Vec<i32>, expected_data: &[u8]| {
1770            let arrs = arr
1771                .into_iter()
1772                .map(|a| Arc::new(a) as ArrayRef)
1773                .collect::<Vec<_>>();
1774            let num_rows = arrs.iter().map(|a| a.len()).sum::<usize>() as u64;
1775            let data = DataBlock::from_arrays(&arrs, num_rows);
1776
1777            assert_eq!(data.num_values(), num_rows);
1778
1779            let data = data.as_variable_width().unwrap();
1780            assert_eq!(data.offsets, LanceBuffer::reinterpret_vec(expected_off));
1781            assert_eq!(data.data, LanceBuffer::copy_slice(expected_data));
1782        };
1783
1784        let string = StringArray::from(vec![Some("hello"), Some("world")]);
1785        check(vec![string.slice(1, 1)], vec![0, 5], b"world");
1786        check(vec![string.slice(0, 1)], vec![0, 5], b"hello");
1787        check(
1788            vec![string.slice(0, 1), string.slice(1, 1)],
1789            vec![0, 5, 10],
1790            b"helloworld",
1791        );
1792
1793        let string2 = StringArray::from(vec![Some("foo"), Some("bar")]);
1794        check(
1795            vec![string.slice(0, 1), string2.slice(0, 1)],
1796            vec![0, 5, 8],
1797            b"hellofoo",
1798        );
1799    }
1800
1801    #[test]
1802    fn test_large() {
1803        let arr = LargeBinaryArray::from_vec(vec![b"hello", b"world"]);
1804        let data = DataBlock::from_array(arr);
1805
1806        assert_eq!(data.num_values(), 2);
1807        let data = data.as_variable_width().unwrap();
1808        assert_eq!(data.bits_per_offset, 64);
1809        assert_eq!(data.num_values, 2);
1810        assert_eq!(data.data, LanceBuffer::copy_slice(b"helloworld"));
1811        assert_eq!(
1812            data.offsets,
1813            LanceBuffer::reinterpret_vec(vec![0_u64, 5, 10])
1814        );
1815    }
1816
1817    #[test]
1818    fn test_dictionary_indices_normalized() {
1819        let arr1 = DictionaryArray::<Int8Type>::from_iter([Some("a"), Some("a"), Some("b")]);
1820        let arr2 = DictionaryArray::<Int8Type>::from_iter([Some("b"), Some("c")]);
1821
1822        let data = DataBlock::from_arrays(&[Arc::new(arr1), Arc::new(arr2)], 5);
1823
1824        assert_eq!(data.num_values(), 5);
1825        let data = data.as_dictionary().unwrap();
1826        let indices = data.indices;
1827        assert_eq!(indices.bits_per_value, 8);
1828        assert_eq!(indices.num_values, 5);
1829        assert_eq!(
1830            indices.data,
1831            // You might expect 0, 0, 1, 1, 2 but it seems that arrow's dictionary concat does
1832            // not actually collapse dictionaries.  This is an arrow problem however, and we don't
1833            // need to fix it here.
1834            LanceBuffer::reinterpret_vec::<i8>(vec![0, 0, 1, 2, 3])
1835        );
1836
1837        let items = data.dictionary.as_variable_width().unwrap();
1838        assert_eq!(items.bits_per_offset, 32);
1839        assert_eq!(items.num_values, 4);
1840        assert_eq!(items.data, LanceBuffer::copy_slice(b"abbc"));
1841        assert_eq!(
1842            items.offsets,
1843            LanceBuffer::reinterpret_vec(vec![0, 1, 2, 3, 4],)
1844        );
1845    }
1846
1847    #[test]
1848    fn test_dictionary_nulls() {
1849        // Test both ways of encoding nulls
1850
1851        // By default, nulls get encoded into the indices
1852        let arr1 = DictionaryArray::<Int8Type>::from_iter([None, Some("a"), Some("b")]);
1853        let arr2 = DictionaryArray::<Int8Type>::from_iter([Some("c"), None]);
1854
1855        let data = DataBlock::from_arrays(&[Arc::new(arr1), Arc::new(arr2)], 5);
1856
1857        let check_common = |data: DataBlock| {
1858            assert_eq!(data.num_values(), 5);
1859            let dict = data.as_dictionary().unwrap();
1860
1861            let nullable_items = dict.dictionary.as_nullable().unwrap();
1862            assert_eq!(nullable_items.nulls, LanceBuffer::Owned(vec![0b00000111]));
1863            assert_eq!(nullable_items.data.num_values(), 4);
1864
1865            let items = nullable_items.data.as_variable_width().unwrap();
1866            assert_eq!(items.bits_per_offset, 32);
1867            assert_eq!(items.num_values, 4);
1868            assert_eq!(items.data, LanceBuffer::copy_slice(b"abc"));
1869            assert_eq!(
1870                items.offsets,
1871                LanceBuffer::reinterpret_vec(vec![0, 1, 2, 3, 3],)
1872            );
1873
1874            let indices = dict.indices;
1875            assert_eq!(indices.bits_per_value, 8);
1876            assert_eq!(indices.num_values, 5);
1877            assert_eq!(
1878                indices.data,
1879                LanceBuffer::reinterpret_vec::<i8>(vec![3, 0, 1, 2, 3])
1880            );
1881        };
1882        check_common(data);
1883
1884        // However, we can manually create a dictionary where nulls are in the dictionary
1885        let items = StringArray::from(vec![Some("a"), Some("b"), Some("c"), None]);
1886        let indices = Int8Array::from(vec![Some(3), Some(0), Some(1), Some(2), Some(3)]);
1887        let dict = DictionaryArray::new(indices, Arc::new(items));
1888
1889        let data = DataBlock::from_array(dict);
1890
1891        check_common(data);
1892    }
1893
1894    #[test]
1895    fn test_dictionary_cannot_add_null() {
1896        // 256 unique strings
1897        let items = StringArray::from(
1898            (0..256)
1899                .map(|i| Some(String::from_utf8(vec![0; i]).unwrap()))
1900                .collect::<Vec<_>>(),
1901        );
1902        // 257 indices, covering the whole range, plus one null
1903        let indices = UInt8Array::from(
1904            (0..=256)
1905                .map(|i| if i == 256 { None } else { Some(i as u8) })
1906                .collect::<Vec<_>>(),
1907        );
1908        // We want to normalize this by pushing nulls into the dictionary, but we cannot because
1909        // the dictionary is too large for the index type
1910        let dict = DictionaryArray::new(indices, Arc::new(items));
1911        let data = DataBlock::from_array(dict);
1912
1913        assert_eq!(data.num_values(), 257);
1914
1915        let dict = data.as_dictionary().unwrap();
1916
1917        assert_eq!(dict.indices.bits_per_value, 32);
1918        assert_eq!(
1919            dict.indices.data,
1920            LanceBuffer::reinterpret_vec((0_u32..257).collect::<Vec<_>>())
1921        );
1922
1923        let nullable_items = dict.dictionary.as_nullable().unwrap();
1924        let null_buffer = NullBuffer::new(BooleanBuffer::new(
1925            nullable_items.nulls.into_buffer(),
1926            0,
1927            257,
1928        ));
1929        for i in 0..256 {
1930            assert!(!null_buffer.is_null(i));
1931        }
1932        assert!(null_buffer.is_null(256));
1933
1934        assert_eq!(
1935            nullable_items.data.as_variable_width().unwrap().data.len(),
1936            32640
1937        );
1938    }
1939
1940    #[test]
1941    fn test_all_null() {
1942        for data_type in [
1943            DataType::UInt32,
1944            DataType::FixedSizeBinary(2),
1945            DataType::List(Arc::new(Field::new("item", DataType::UInt32, true))),
1946            DataType::Struct(Fields::from(vec![Field::new("a", DataType::UInt32, true)])),
1947        ] {
1948            let block = DataBlock::AllNull(AllNullDataBlock { num_values: 10 });
1949            let arr = block.into_arrow(data_type.clone(), true).unwrap();
1950            let arr = make_array(arr);
1951            let expected = new_null_array(&data_type, 10);
1952            assert_eq!(&arr, &expected);
1953        }
1954    }
1955
1956    #[test]
1957    fn test_dictionary_cannot_concatenate() {
1958        // 256 unique strings
1959        let items = StringArray::from(
1960            (0..256)
1961                .map(|i| Some(String::from_utf8(vec![0; i]).unwrap()))
1962                .collect::<Vec<_>>(),
1963        );
1964        // 256 different unique strings
1965        let other_items = StringArray::from(
1966            (0..256)
1967                .map(|i| Some(String::from_utf8(vec![1; i + 1]).unwrap()))
1968                .collect::<Vec<_>>(),
1969        );
1970        let indices = UInt8Array::from_iter_values(0..=255);
1971        let dict1 = DictionaryArray::new(indices.clone(), Arc::new(items));
1972        let dict2 = DictionaryArray::new(indices, Arc::new(other_items));
1973        let data = DataBlock::from_arrays(&[Arc::new(dict1), Arc::new(dict2)], 512);
1974        assert_eq!(data.num_values(), 512);
1975
1976        let dict = data.as_dictionary().unwrap();
1977
1978        assert_eq!(dict.indices.bits_per_value, 32);
1979        assert_eq!(
1980            dict.indices.data,
1981            LanceBuffer::reinterpret_vec::<u32>((0..512).collect::<Vec<_>>())
1982        );
1983        // What fun: 0 + 1 + .. + 255 + 1 + 2 + .. + 256 = 2^16
1984        assert_eq!(
1985            dict.dictionary.as_variable_width().unwrap().data.len(),
1986            65536
1987        );
1988    }
1989
1990    #[test]
1991    fn test_data_size() {
1992        let mut rng = rand_xoshiro::Xoshiro256PlusPlus::seed_from_u64(DEFAULT_SEED.0);
1993        // test data_size() when input has no nulls
1994        let mut gen = array::rand::<Int32Type>().with_nulls(&[false, false, false]);
1995
1996        let arr = gen.generate(RowCount::from(3), &mut rng).unwrap();
1997        let block = DataBlock::from_array(arr.clone());
1998        assert!(block.data_size() == arr.get_buffer_memory_size() as u64);
1999
2000        let arr = gen.generate(RowCount::from(400), &mut rng).unwrap();
2001        let block = DataBlock::from_array(arr.clone());
2002        assert!(block.data_size() == arr.get_buffer_memory_size() as u64);
2003
2004        // test data_size() when input has nulls
2005        let mut gen = array::rand::<Int32Type>().with_nulls(&[false, true, false]);
2006        let arr = gen.generate(RowCount::from(3), &mut rng).unwrap();
2007        let block = DataBlock::from_array(arr.clone());
2008
2009        let array_data = arr.to_data();
2010        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
2011        // the NullBuffer.len() returns the length in bits so we divide_round_up by 8
2012        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
2013        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
2014
2015        let arr = gen.generate(RowCount::from(400), &mut rng).unwrap();
2016        let block = DataBlock::from_array(arr.clone());
2017
2018        let array_data = arr.to_data();
2019        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
2020        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
2021        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
2022
2023        let mut gen = array::rand::<Int32Type>().with_nulls(&[true, true, false]);
2024        let arr = gen.generate(RowCount::from(3), &mut rng).unwrap();
2025        let block = DataBlock::from_array(arr.clone());
2026
2027        let array_data = arr.to_data();
2028        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
2029        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
2030        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
2031
2032        let arr = gen.generate(RowCount::from(400), &mut rng).unwrap();
2033        let block = DataBlock::from_array(arr.clone());
2034
2035        let array_data = arr.to_data();
2036        let total_buffer_size: usize = array_data.buffers().iter().map(|buffer| buffer.len()).sum();
2037        let array_nulls_size_in_bytes = arr.nulls().unwrap().len().div_ceil(8);
2038        assert!(block.data_size() == (total_buffer_size + array_nulls_size_in_bytes) as u64);
2039
2040        let mut gen = array::rand::<Int32Type>().with_nulls(&[false, true, false]);
2041        let arr1 = gen.generate(RowCount::from(3), &mut rng).unwrap();
2042        let arr2 = gen.generate(RowCount::from(3), &mut rng).unwrap();
2043        let arr3 = gen.generate(RowCount::from(3), &mut rng).unwrap();
2044        let block = DataBlock::from_arrays(&[arr1.clone(), arr2.clone(), arr3.clone()], 9);
2045
2046        let concatenated_array = concat(&[
2047            &*Arc::new(arr1.clone()) as &dyn Array,
2048            &*Arc::new(arr2.clone()) as &dyn Array,
2049            &*Arc::new(arr3.clone()) as &dyn Array,
2050        ])
2051        .unwrap();
2052        let total_buffer_size: usize = concatenated_array
2053            .to_data()
2054            .buffers()
2055            .iter()
2056            .map(|buffer| buffer.len())
2057            .sum();
2058
2059        let total_nulls_size_in_bytes = concatenated_array.nulls().unwrap().len().div_ceil(8);
2060        assert!(block.data_size() == (total_buffer_size + total_nulls_size_in_bytes) as u64);
2061    }
2062}