lance_encoding/
encoder.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3use std::{collections::HashMap, env, sync::Arc};
4
5use arrow::array::AsArray;
6use arrow::datatypes::UInt64Type;
7use arrow_array::{Array, ArrayRef, RecordBatch, UInt8Array};
8use arrow_schema::DataType;
9use bytes::{Bytes, BytesMut};
10use futures::future::BoxFuture;
11use lance_core::datatypes::{
12    Field, Schema, BLOB_DESC_FIELD, BLOB_META_KEY, COMPRESSION_LEVEL_META_KEY,
13    COMPRESSION_META_KEY, PACKED_STRUCT_LEGACY_META_KEY, PACKED_STRUCT_META_KEY,
14};
15use lance_core::utils::bit::{is_pwr_two, pad_bytes_to};
16use lance_core::{Error, Result};
17use snafu::location;
18
19use crate::buffer::LanceBuffer;
20use crate::data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock};
21use crate::decoder::PageEncoding;
22use crate::encodings::logical::blob::BlobFieldEncoder;
23use crate::encodings::logical::list::ListStructuralEncoder;
24use crate::encodings::logical::primitive::PrimitiveStructuralEncoder;
25use crate::encodings::logical::r#struct::StructFieldEncoder;
26use crate::encodings::logical::r#struct::StructStructuralEncoder;
27use crate::encodings::physical::binary::{BinaryMiniBlockEncoder, VariableEncoder};
28use crate::encodings::physical::bitpack_fastlanes::BitpackedForNonNegArrayEncoder;
29use crate::encodings::physical::bitpack_fastlanes::{
30    compute_compressed_bit_width_for_non_neg, InlineBitpacking,
31};
32use crate::encodings::physical::block_compress::{
33    CompressedBufferEncoder, CompressionConfig, CompressionScheme,
34};
35use crate::encodings::physical::dictionary::AlreadyDictionaryEncoder;
36use crate::encodings::physical::fsst::{
37    FsstArrayEncoder, FsstMiniBlockEncoder, FsstPerValueEncoder,
38};
39use crate::encodings::physical::packed_struct::PackedStructEncoder;
40use crate::encodings::physical::struct_encoding::PackedStructFixedWidthMiniBlockEncoder;
41use crate::format::ProtobufUtils;
42use crate::repdef::RepDefBuilder;
43use crate::statistics::{GetStat, Stat};
44use crate::version::LanceFileVersion;
45use crate::{
46    decoder::{ColumnInfo, PageInfo},
47    encodings::{
48        logical::{list::ListFieldEncoder, primitive::PrimitiveFieldEncoder},
49        physical::{
50            basic::BasicEncoder, binary::BinaryEncoder, dictionary::DictionaryEncoder,
51            fixed_size_binary::FixedSizeBinaryEncoder, fixed_size_list::FslEncoder,
52            value::ValueEncoder,
53        },
54    },
55    format::pb,
56};
57use fsst::fsst::{FSST_LEAST_INPUT_MAX_LENGTH, FSST_LEAST_INPUT_SIZE};
58
59use hyperloglogplus::{HyperLogLog, HyperLogLogPlus};
60use std::collections::hash_map::RandomState;
61
/// The minimum alignment for a page buffer.  Writers must respect this.
///
/// NOTE(review): presumably expressed in bytes -- confirm against the file writer.
pub const MIN_PAGE_BUFFER_ALIGNMENT: u64 = 8;
64
65/// An encoded array
66///
67/// Maps to a single Arrow array
68///
69/// This contains the encoded data as well as a description of the encoding that was applied which
70/// can be used to decode the data later.
71#[derive(Debug)]
72pub struct EncodedArray {
73    /// The encoded buffers
74    pub data: DataBlock,
75    /// A description of the encoding used to encode the array
76    pub encoding: pb::ArrayEncoding,
77}
78
79impl EncodedArray {
80    pub fn new(data: DataBlock, encoding: pb::ArrayEncoding) -> Self {
81        Self { data, encoding }
82    }
83
84    pub fn into_buffers(self) -> (Vec<LanceBuffer>, pb::ArrayEncoding) {
85        let buffers = self.data.into_buffers();
86        (buffers, self.encoding)
87    }
88}
89
/// An encoded page of data
///
/// Maps to a top-level array
///
/// For example, `FixedSizeList<Int32>` will have two `EncodedArray` instances and one
/// `EncodedPage`
#[derive(Debug)]
pub struct EncodedPage {
    /// The encoded page buffers
    pub data: Vec<LanceBuffer>,
    /// A description of the encoding used to encode the page
    pub description: PageEncoding,
    /// The number of rows in the encoded page
    pub num_rows: u64,
    /// The top-level row number of the first row in the page
    ///
    /// Generally the number of "top-level" rows and the number of rows are the same.  However,
    /// when there is repetition (list/fixed-size-list) there will be more or fewer items than rows.
    ///
    /// A top-level row can never be split across a page boundary.
    pub row_number: u64,
    /// The index of the column
    pub column_idx: u32,
}
113
/// Metadata describing an encoded buffer
///
/// NOTE(review): this type is not used in the visible part of the file; the field
/// descriptions below are read off the names and types and should be confirmed.
#[derive(Debug)]
pub struct EncodedBufferMeta {
    /// Presumably the number of bits used by each value in the buffer -- TODO confirm
    pub bits_per_value: u64,

    /// Bitpacking details, or `None` (presumably when the buffer is not bitpacked)
    pub bitpacking: Option<BitpackingBufferMeta>,

    /// The compression scheme, or `None` (presumably when the buffer is not compressed)
    pub compression_scheme: Option<CompressionScheme>,
}
122
/// Metadata describing a bitpacked buffer
///
/// NOTE(review): field descriptions are read off the names and should be confirmed.
#[derive(Debug)]
pub struct BitpackingBufferMeta {
    /// Presumably the number of bits each value occupies after bitpacking -- TODO confirm
    pub bits_per_value: u64,

    /// Presumably whether the bitpacked values are signed -- TODO confirm
    pub signed: bool,
}
129
/// Encodes data from one format to another (hopefully more compact or useful) format
///
/// The array encoder must be Send + Sync.  Encoding is always done on its own
/// thread task in the background and there could potentially be multiple encode
/// tasks running for a column at once.
pub trait ArrayEncoder: std::fmt::Debug + Send + Sync {
    /// Encode data
    ///
    /// The result should contain a description of the encoding that was chosen.
    /// This can be used to decode the data later.
    ///
    /// * `data` - the block of data to encode (consumed by the call)
    /// * `data_type` - the Arrow data type associated with `data`
    /// * `buffer_index` - NOTE(review): presumably a running counter of the buffers
    ///   assigned so far, advanced by the encoder -- confirm against implementations
    ///
    /// Returns the encoded data and its encoding description, or an error if
    /// encoding fails.
    fn encode(
        &self,
        data: DataBlock,
        data_type: &DataType,
        buffer_index: &mut u32,
    ) -> Result<EncodedArray>;
}
147
/// The maximum number of bytes of compressed data in a single mini-block chunk
/// (slightly less than 8KiB: 8 * 1024 - 6 = 8186)
pub const MAX_MINIBLOCK_BYTES: u64 = 8 * 1024 - 6;
/// The maximum number of values in a single mini-block chunk (4Ki)
pub const MAX_MINIBLOCK_VALUES: u64 = 4096;
150
/// Page data that has been compressed into a series of chunks
///
/// NOTE(review): the original description said the chunks are "put into a single
/// buffer" but `data` is a list of buffers (and each chunk records one size per
/// buffer) -- confirm the intended wording.
#[derive(Debug)]
pub struct MiniBlockCompressed {
    /// The buffers of compressed data
    pub data: Vec<LanceBuffer>,
    /// Describes the size of each chunk
    pub chunks: Vec<MiniBlockChunk>,
    /// The number of values in the entire page
    pub num_values: u64,
}
162
/// Describes the size of a mini-block chunk of data
///
/// Mini-block chunks are meant to be small (just a few disk sectors) and to hold
/// a power-of-two number of values (the last chunk of a page is the exception).
///
/// To enforce this a chunk is limited to 4Ki values and slightly less than
/// 8KiB of compressed data.  This means that even in the extreme case
/// where we have 4 bytes of rep/def then we will have at most 24KiB of
/// data (values, repetition, and definition) per mini-block.
#[derive(Debug)]
pub struct MiniBlockChunk {
    /// The size in bytes of each buffer in the chunk.
    ///
    /// The total size must be less than or equal to 8Ki - 6 (8186)
    pub buffer_sizes: Vec<u16>,
    /// The log (base 2) of the number of values in the chunk.
    ///
    /// For example, 1 means the chunk holds 2 values and 12 means it holds 4Ki values.
    /// This must be <= 12 (i.e. <= 4096 values).
    ///
    /// The final chunk of a page should store 0 here; its value count is derived by
    /// subtracting the sizes of all other chunks from the total size of the page.
    pub log_num_values: u8,
}

impl MiniBlockChunk {
    /// Gets the number of values in this block
    ///
    /// `vals_in_prev_blocks` and `total_num_values` are only consulted for the
    /// last block of a page, which stores 0 for `log_num_values`.  In that case
    /// the count is `total_num_values - vals_in_prev_blocks`; otherwise it is
    /// `2^log_num_values`.
    pub fn num_values(&self, vals_in_prev_blocks: u64, total_num_values: u64) -> u64 {
        match self.log_num_values {
            // Final chunk: whatever is left over in the page
            0 => total_num_values - vals_in_prev_blocks,
            log_n => 1_u64 << log_n,
        }
    }
}
204
/// Trait for compression algorithms that are suitable for use in the miniblock structural encoding
///
/// These compression algorithms should be capable of encoding the data into small chunks
/// where each chunk (except the last) has 2^N values (N can vary between chunks)
pub trait MiniBlockCompressor: std::fmt::Debug + Send + Sync {
    /// Compress a `page` of data into multiple chunks
    ///
    /// See [`MiniBlockCompressed`] for details on how chunks should be sized.
    ///
    /// On success this returns the compressed chunks together with a description
    /// of the encoding applied, which will be used at decode time to read the data.
    fn compress(&self, page: DataBlock) -> Result<(MiniBlockCompressed, pb::ArrayEncoding)>;
}
218
219/// Per-value compression must either:
220///
221/// A single buffer of fixed-width values
222/// A single buffer of value data and a buffer of offsets
223///
224/// TODO: In the future we may allow metadata buffers
225#[derive(Debug)]
226pub enum PerValueDataBlock {
227    Fixed(FixedWidthDataBlock),
228    Variable(VariableWidthBlock),
229}
230
231impl PerValueDataBlock {
232    pub fn data_size(&self) -> u64 {
233        match self {
234            Self::Fixed(fixed) => fixed.data_size(),
235            Self::Variable(variable) => variable.data_size(),
236        }
237    }
238}
239
/// Trait for compression algorithms that are suitable for use in the zipped structural encoding
///
/// This compression must return either a FixedWidthDataBlock or a VariableWidthBlock.  This is because
/// we need to zip the data and those are the only two blocks we know how to zip today.
///
/// In addition, the compressed data must be able to be decompressed in a random-access fashion.
/// This means that the decompression algorithm must be able to decompress any value without
/// decompressing all values before it.
pub trait PerValueCompressor: std::fmt::Debug + Send + Sync {
    /// Compress the data into a [`PerValueDataBlock`]
    ///
    /// Also returns a description of the compression that can be used to decompress when reading the data back
    fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, pb::ArrayEncoding)>;
}
254
/// Trait for compression algorithms that compress an entire block of data into one opaque
/// and self-described chunk.
///
/// This is the most general type of compression.  There are no constraints on the method
/// of compression; it is assumed that the entire block of data will be present at decompression.
///
/// This is the least appropriate strategy for random access because we must load the entire
/// block to access any single value.  This should only be used for cases where random access is never
/// required (e.g. when encoding metadata buffers like a dictionary or for encoding rep/def
/// mini-block chunks)
pub trait BlockCompressor: std::fmt::Debug + Send + Sync {
    /// Compress the data into a single buffer
    ///
    /// Only the compressed buffer is returned.  The description of the compression
    /// is not produced here; `CompressionStrategy::create_block_compressor` returns
    /// it alongside the compressor.
    fn compress(&self, data: DataBlock) -> Result<LanceBuffer>;
}
272
273pub fn values_column_encoding() -> pb::ColumnEncoding {
274    pb::ColumnEncoding {
275        column_encoding: Some(pb::column_encoding::ColumnEncoding::Values(())),
276    }
277}
278
279pub struct EncodedColumn {
280    pub column_buffers: Vec<LanceBuffer>,
281    pub encoding: pb::ColumnEncoding,
282    pub final_pages: Vec<EncodedPage>,
283}
284
285impl Default for EncodedColumn {
286    fn default() -> Self {
287        Self {
288            column_buffers: Default::default(),
289            encoding: pb::ColumnEncoding {
290                column_encoding: Some(pb::column_encoding::ColumnEncoding::Values(())),
291            },
292            final_pages: Default::default(),
293        }
294    }
295}
296
297/// A tool to reserve space for buffers that are not in-line with the data
298///
299/// In most cases, buffers are stored in the page and referred to in the encoding
300/// metadata by their index in the page.  This keeps all buffers within a page together.
301/// As a result, most encoders should not need to use this structure.
302///
303/// In some cases (currently only the large binary encoding) there is a need to access
304/// buffers that are not in the page (because storing the position / offset of every page
305/// in the page metadata would be too expensive).
306///
307/// To do this you can add a buffer with `add_buffer` and then use the returned position
308/// in some way (in the large binary encoding the returned position is stored in the page
309/// data as a position / size array).
310pub struct OutOfLineBuffers {
311    position: u64,
312    buffer_alignment: u64,
313    buffers: Vec<LanceBuffer>,
314}
315
316impl OutOfLineBuffers {
317    pub fn new(base_position: u64, buffer_alignment: u64) -> Self {
318        Self {
319            position: base_position,
320            buffer_alignment,
321            buffers: Vec::new(),
322        }
323    }
324
325    pub fn add_buffer(&mut self, buffer: LanceBuffer) -> u64 {
326        let position = self.position;
327        self.position += buffer.len() as u64;
328        self.position += pad_bytes_to(buffer.len(), self.buffer_alignment as usize) as u64;
329        self.buffers.push(buffer);
330        position
331    }
332
333    pub fn take_buffers(self) -> Vec<LanceBuffer> {
334        self.buffers
335    }
336
337    pub fn reset_position(&mut self, position: u64) {
338        self.position = position;
339    }
340}
341
/// A task to create a page of data
///
/// This is a boxed, `'static` future that resolves to the [`EncodedPage`] or to
/// the error encountered while encoding it.
pub type EncodeTask = BoxFuture<'static, Result<EncodedPage>>;
344
/// Top level encoding trait to code any Arrow array type into one or more pages.
///
/// The field encoder implements buffering and encoding of a single input column
/// but it may map to multiple output columns.  For example, a list array or struct
/// array will be encoded into multiple columns.
///
/// Also, fields may be encoded at different speeds.  For example, given a struct
/// column with three fields (a boolean field, an int32 field, and a 4096-dimension
/// tensor field) the tensor field is likely to emit encoded pages much more frequently
/// than the boolean field.
pub trait FieldEncoder: Send {
    /// Buffer the data and, if there is enough data in the buffer to form a page, return
    /// an encoding task to encode the data.
    ///
    /// This may return more than one task because a single column may be mapped to multiple
    /// output columns.  For example, if encoding a struct column with three children then
    /// up to three tasks may be returned from each call to maybe_encode.
    ///
    /// It may also return multiple tasks for a single column if the input array is larger
    /// than a single disk page.
    ///
    /// It could also return an empty Vec if there is not enough data yet to encode any pages.
    ///
    /// The `row_number` must be passed which is the top-level row number currently being encoded.
    /// This is stored in any pages produced by this call so that we can know the priority of the
    /// page.
    ///
    /// The `num_rows` is the number of top level rows.  It is initially the same as `array.len()`
    /// however it is passed separately because array will become flattened over time (if there is
    /// repetition) and we need to know the original number of rows for various purposes.
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>>;
    /// Flush any remaining data from the buffers into encoding tasks
    ///
    /// Each encode task produces a single page.  The order of these pages will be maintained
    /// in the file (we do not worry about order between columns but all pages in the same
    /// column should maintain order)
    ///
    /// This may be called intermittently throughout encoding but will always be called
    /// once at the end of encoding just before calling finish
    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>>;
    /// Finish encoding and return column metadata
    ///
    /// This is called only once, after all encode tasks have completed
    ///
    /// This returns a Vec because a single field may have created multiple columns
    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>>;

    /// The number of output columns this encoding will create
    fn num_columns(&self) -> u32;
}
405
/// A trait to pick which encoding strategy to use for a single page
/// of data
///
/// Presumably, implementations will make encoding decisions based on
/// array statistics.
pub trait ArrayEncodingStrategy: Send + Sync + std::fmt::Debug {
    /// Create an encoder suited to `arrays`, which hold the data of `field`
    ///
    /// Returns an error if no encoder can be created.
    fn create_array_encoder(
        &self,
        arrays: &[ArrayRef],
        field: &Field,
    ) -> Result<Box<dyn ArrayEncoder>>;
}
418
/// A trait to pick which compression to use for given data
///
/// There are several different kinds of compression.
///
/// - Block compression is the most generic, but most difficult to use efficiently
/// - Per-value compression results in either a fixed width data block or a variable
///   width data block.  In other words, there is some number of bits per value.
///   In addition, each value should be independently decompressible.
/// - Mini-block compression results in a small block of opaque data for chunks
///   of rows.  Each block is somewhere between 0 and 16KiB in size.  This is
///   used for narrow data types (both fixed and variable length) where we can
///   fit many values into a 16KiB block.
///
/// NOTE(review): `MAX_MINIBLOCK_BYTES` limits the compressed data of a chunk to
/// slightly less than 8KiB and the `MiniBlockChunk` docs mention up to 24KiB once
/// rep/def is included -- confirm the 16KiB figure above.
pub trait CompressionStrategy: Send + Sync + std::fmt::Debug {
    /// Create a block compressor for the given data
    ///
    /// Also returns the description of the compression, since
    /// [`BlockCompressor::compress`] itself only returns the compressed buffer.
    fn create_block_compressor(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<(Box<dyn BlockCompressor>, pb::ArrayEncoding)>;

    /// Create a per-value compressor for the given data
    fn create_per_value(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn PerValueCompressor>>;

    /// Create a mini-block compressor for the given data
    fn create_miniblock_compressor(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn MiniBlockCompressor>>;
}
453
/// The core array encoding strategy is a set of basic encodings that
/// are generally applicable in most scenarios.
#[derive(Debug, Default)]
pub struct CoreArrayEncodingStrategy {
    /// The file version being targeted.  Several encodings (FSST, bitpacking and
    /// the fixed-size binary encoding) are only chosen for version 2.1 and later.
    pub version: LanceFileVersion,
}
460
/// The binary and string data types.  An array must have one of these types
/// before the fixed-size binary encoding is considered for it.
const BINARY_DATATYPES: [DataType; 4] = [
    DataType::Binary,
    DataType::LargeBinary,
    DataType::Utf8,
    DataType::LargeUtf8,
];
467
468impl CoreArrayEncodingStrategy {
469    fn can_use_fsst(data_type: &DataType, data_size: u64, version: LanceFileVersion) -> bool {
470        version >= LanceFileVersion::V2_1
471            && matches!(data_type, DataType::Utf8 | DataType::Binary)
472            && data_size > 4 * 1024 * 1024
473    }
474
475    fn get_field_compression(field_meta: &HashMap<String, String>) -> Option<CompressionConfig> {
476        let compression = field_meta.get(COMPRESSION_META_KEY)?;
477        let compression_scheme = compression.parse::<CompressionScheme>();
478        match compression_scheme {
479            Ok(compression_scheme) => Some(CompressionConfig::new(
480                compression_scheme,
481                field_meta
482                    .get(COMPRESSION_LEVEL_META_KEY)
483                    .and_then(|level| level.parse().ok()),
484            )),
485            Err(_) => None,
486        }
487    }
488
489    fn default_binary_encoder(
490        arrays: &[ArrayRef],
491        data_type: &DataType,
492        field_meta: Option<&HashMap<String, String>>,
493        data_size: u64,
494        version: LanceFileVersion,
495    ) -> Result<Box<dyn ArrayEncoder>> {
496        let bin_indices_encoder =
497            Self::choose_array_encoder(arrays, &DataType::UInt64, data_size, false, version, None)?;
498
499        if let Some(compression) = field_meta.and_then(Self::get_field_compression) {
500            if compression.scheme == CompressionScheme::Fsst {
501                // User requested FSST
502                let raw_encoder = Box::new(BinaryEncoder::new(bin_indices_encoder, None));
503                Ok(Box::new(FsstArrayEncoder::new(raw_encoder)))
504            } else {
505                // Generic compression
506                Ok(Box::new(BinaryEncoder::new(
507                    bin_indices_encoder,
508                    Some(compression),
509                )))
510            }
511        } else {
512            // No user-specified compression, use FSST if we can
513            let bin_encoder = Box::new(BinaryEncoder::new(bin_indices_encoder, None));
514            if Self::can_use_fsst(data_type, data_size, version) {
515                Ok(Box::new(FsstArrayEncoder::new(bin_encoder)))
516            } else {
517                Ok(bin_encoder)
518            }
519        }
520    }
521
522    fn choose_array_encoder(
523        arrays: &[ArrayRef],
524        data_type: &DataType,
525        data_size: u64,
526        use_dict_encoding: bool,
527        version: LanceFileVersion,
528        field_meta: Option<&HashMap<String, String>>,
529    ) -> Result<Box<dyn ArrayEncoder>> {
530        match data_type {
531            DataType::FixedSizeList(inner, dimension) => {
532                Ok(Box::new(BasicEncoder::new(Box::new(FslEncoder::new(
533                    Self::choose_array_encoder(
534                        arrays,
535                        inner.data_type(),
536                        data_size,
537                        use_dict_encoding,
538                        version,
539                        None,
540                    )?,
541                    *dimension as u32,
542                )))))
543            }
544            DataType::Dictionary(key_type, value_type) => {
545                let key_encoder =
546                    Self::choose_array_encoder(arrays, key_type, data_size, false, version, None)?;
547                let value_encoder = Self::choose_array_encoder(
548                    arrays, value_type, data_size, false, version, None,
549                )?;
550
551                Ok(Box::new(AlreadyDictionaryEncoder::new(
552                    key_encoder,
553                    value_encoder,
554                )))
555            }
556            DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
557                if use_dict_encoding {
558                    let dict_indices_encoder = Self::choose_array_encoder(
559                        // We need to pass arrays to this method to figure out what kind of compression to
560                        // use but we haven't actually calculated the indices yet.  For now, we just assume
561                        // worst case and use the full range.  In the future maybe we can pass in statistics
562                        // instead of the actual data
563                        &[Arc::new(UInt8Array::from_iter_values(0_u8..255_u8))],
564                        &DataType::UInt8,
565                        data_size,
566                        false,
567                        version,
568                        None,
569                    )?;
570                    let dict_items_encoder = Self::choose_array_encoder(
571                        arrays,
572                        &DataType::Utf8,
573                        data_size,
574                        false,
575                        version,
576                        None,
577                    )?;
578
579                    Ok(Box::new(DictionaryEncoder::new(
580                        dict_indices_encoder,
581                        dict_items_encoder,
582                    )))
583                }
584                // The parent datatype should be binary or utf8 to use the fixed size encoding
585                // The variable 'data_type' is passed through recursion so comparing with it would be incorrect
586                else if BINARY_DATATYPES.contains(arrays[0].data_type()) {
587                    if let Some(byte_width) = check_fixed_size_encoding(arrays, version) {
588                        // use FixedSizeBinaryEncoder
589                        let bytes_encoder = Self::choose_array_encoder(
590                            arrays,
591                            &DataType::UInt8,
592                            data_size,
593                            false,
594                            version,
595                            None,
596                        )?;
597
598                        Ok(Box::new(BasicEncoder::new(Box::new(
599                            FixedSizeBinaryEncoder::new(bytes_encoder, byte_width as usize),
600                        ))))
601                    } else {
602                        Self::default_binary_encoder(
603                            arrays, data_type, field_meta, data_size, version,
604                        )
605                    }
606                } else {
607                    Self::default_binary_encoder(arrays, data_type, field_meta, data_size, version)
608                }
609            }
610            DataType::Struct(fields) => {
611                let num_fields = fields.len();
612                let mut inner_encoders = Vec::new();
613
614                for i in 0..num_fields {
615                    let inner_datatype = fields[i].data_type();
616                    let inner_encoder = Self::choose_array_encoder(
617                        arrays,
618                        inner_datatype,
619                        data_size,
620                        use_dict_encoding,
621                        version,
622                        None,
623                    )?;
624                    inner_encoders.push(inner_encoder);
625                }
626
627                Ok(Box::new(PackedStructEncoder::new(inner_encoders)))
628            }
629            DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => {
630                if version >= LanceFileVersion::V2_1 && arrays[0].data_type() == data_type {
631                    let compressed_bit_width = compute_compressed_bit_width_for_non_neg(arrays);
632                    Ok(Box::new(BitpackedForNonNegArrayEncoder::new(
633                        compressed_bit_width as usize,
634                        data_type.clone(),
635                    )))
636                } else {
637                    Ok(Box::new(BasicEncoder::new(Box::new(
638                        ValueEncoder::default(),
639                    ))))
640                }
641            }
642
643            // TODO: for signed integers, I intend to make it a cascaded encoding, a sparse array for the negative values and very wide(bit-width) values,
644            // then a bitpacked array for the narrow(bit-width) values, I need `BitpackedForNeg` to be merged first, I am
645            // thinking about putting this sparse array in the metadata so bitpacking remain using one page buffer only.
646            DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => {
647                if version >= LanceFileVersion::V2_1 && arrays[0].data_type() == data_type {
648                    let compressed_bit_width = compute_compressed_bit_width_for_non_neg(arrays);
649                    Ok(Box::new(BitpackedForNonNegArrayEncoder::new(
650                        compressed_bit_width as usize,
651                        data_type.clone(),
652                    )))
653                } else {
654                    Ok(Box::new(BasicEncoder::new(Box::new(
655                        ValueEncoder::default(),
656                    ))))
657                }
658            }
659            _ => Ok(Box::new(BasicEncoder::new(Box::new(
660                ValueEncoder::default(),
661            )))),
662        }
663    }
664}
665
/// Reads the dictionary encoding threshold from the `LANCE_DICT_ENCODING_THRESHOLD`
/// environment variable
///
/// Falls back to 100 when the variable is unset, is not valid unicode, or does
/// not parse as a `u64`.
fn get_dict_encoding_threshold() -> u64 {
    const DEFAULT_THRESHOLD: u64 = 100;

    match env::var("LANCE_DICT_ENCODING_THRESHOLD") {
        Ok(raw) => raw.parse().unwrap_or(DEFAULT_THRESHOLD),
        Err(_) => DEFAULT_THRESHOLD,
    }
}
672
673// check whether we want to use dictionary encoding or not
674// by applying a threshold on cardinality
675// returns true if cardinality < threshold but false if the total number of rows is less than the threshold
676// The choice to use 100 is just a heuristic for now
677// hyperloglog is used for cardinality estimation
678// error rate = 1.04 / sqrt(2^p), where p is the precision
679// and error rate is 1.04 / sqrt(2^12) = 1.56%
680fn check_dict_encoding(arrays: &[ArrayRef], threshold: u64) -> bool {
681    let num_total_rows = arrays.iter().map(|arr| arr.len()).sum::<usize>();
682    if num_total_rows < threshold as usize {
683        return false;
684    }
685    const PRECISION: u8 = 12;
686
687    let mut hll: HyperLogLogPlus<String, RandomState> =
688        HyperLogLogPlus::new(PRECISION, RandomState::new()).unwrap();
689
690    for arr in arrays {
691        let string_array = arrow_array::cast::as_string_array(arr);
692        for value in string_array.iter().flatten() {
693            hll.insert(value);
694            let estimated_cardinality = hll.count() as u64;
695            if estimated_cardinality >= threshold {
696                return false;
697            }
698        }
699    }
700
701    true
702}
703
704fn check_fixed_size_encoding(arrays: &[ArrayRef], version: LanceFileVersion) -> Option<u64> {
705    if version < LanceFileVersion::V2_1 || arrays.is_empty() {
706        return None;
707    }
708
709    // make sure no array has an empty string
710    if !arrays.iter().all(|arr| {
711        if let Some(arr) = arr.as_string_opt::<i32>() {
712            arr.iter().flatten().all(|s| !s.is_empty())
713        } else if let Some(arr) = arr.as_binary_opt::<i32>() {
714            arr.iter().flatten().all(|s| !s.is_empty())
715        } else if let Some(arr) = arr.as_string_opt::<i64>() {
716            arr.iter().flatten().all(|s| !s.is_empty())
717        } else if let Some(arr) = arr.as_binary_opt::<i64>() {
718            arr.iter().flatten().all(|s| !s.is_empty())
719        } else {
720            panic!("wrong dtype");
721        }
722    }) {
723        return None;
724    }
725
726    let lengths = arrays
727        .iter()
728        .flat_map(|arr| {
729            if let Some(arr) = arr.as_string_opt::<i32>() {
730                let offsets = arr.offsets().inner();
731                offsets
732                    .windows(2)
733                    .map(|w| (w[1] - w[0]) as u64)
734                    .collect::<Vec<_>>()
735            } else if let Some(arr) = arr.as_binary_opt::<i32>() {
736                let offsets = arr.offsets().inner();
737                offsets
738                    .windows(2)
739                    .map(|w| (w[1] - w[0]) as u64)
740                    .collect::<Vec<_>>()
741            } else if let Some(arr) = arr.as_string_opt::<i64>() {
742                let offsets = arr.offsets().inner();
743                offsets
744                    .windows(2)
745                    .map(|w| (w[1] - w[0]) as u64)
746                    .collect::<Vec<_>>()
747            } else if let Some(arr) = arr.as_binary_opt::<i64>() {
748                let offsets = arr.offsets().inner();
749                offsets
750                    .windows(2)
751                    .map(|w| (w[1] - w[0]) as u64)
752                    .collect::<Vec<_>>()
753            } else {
754                panic!("wrong dtype");
755            }
756        })
757        .collect::<Vec<_>>();
758
759    // find first non-zero value in lengths
760    let first_non_zero = lengths.iter().position(|&x| x != 0);
761    if let Some(first_non_zero) = first_non_zero {
762        // make sure all lengths are equal to first_non_zero length or zero
763        if !lengths
764            .iter()
765            .all(|&x| x == 0 || x == lengths[first_non_zero])
766        {
767            return None;
768        }
769
770        // set the byte width
771        Some(lengths[first_non_zero])
772    } else {
773        None
774    }
775}
776
777impl ArrayEncodingStrategy for CoreArrayEncodingStrategy {
778    fn create_array_encoder(
779        &self,
780        arrays: &[ArrayRef],
781        field: &Field,
782    ) -> Result<Box<dyn ArrayEncoder>> {
783        let data_size = arrays
784            .iter()
785            .map(|arr| arr.get_buffer_memory_size() as u64)
786            .sum::<u64>();
787        let data_type = arrays[0].data_type();
788
789        let use_dict_encoding = data_type == &DataType::Utf8
790            && check_dict_encoding(arrays, get_dict_encoding_threshold());
791
792        Self::choose_array_encoder(
793            arrays,
794            data_type,
795            data_size,
796            use_dict_encoding,
797            self.version,
798            Some(&field.metadata),
799        )
800    }
801}
802
803impl CompressionStrategy for CoreArrayEncodingStrategy {
804    fn create_miniblock_compressor(
805        &self,
806        field: &Field,
807        data: &DataBlock,
808    ) -> Result<Box<dyn MiniBlockCompressor>> {
809        match data {
810            DataBlock::FixedWidth(fixed_width_data) => {
811                if let Some(compression) = field.metadata.get(COMPRESSION_META_KEY) {
812                    if compression == "none" {
813                        return Ok(Box::new(ValueEncoder::default()));
814                    }
815                }
816
817                let bit_widths = data.expect_stat(Stat::BitWidth);
818                let bit_widths = bit_widths.as_primitive::<UInt64Type>();
819                // Temporary hack to work around https://github.com/lancedb/lance/issues/3102
820                // Ideally we should still be able to bit-pack here (either to 0 or 1 bit per value)
821                let has_all_zeros = bit_widths.values().iter().any(|v| *v == 0);
822                // The minimum bit packing size is a block of 1024 values.  For very small pages the uncompressed
823                // size might be smaller than the compressed size.
824                let too_small = bit_widths.len() == 1
825                    && InlineBitpacking::min_size_bytes(bit_widths.value(0)) >= data.data_size();
826                if !has_all_zeros
827                    && !too_small
828                    && (fixed_width_data.bits_per_value == 8
829                        || fixed_width_data.bits_per_value == 16
830                        || fixed_width_data.bits_per_value == 32
831                        || fixed_width_data.bits_per_value == 64)
832                {
833                    Ok(Box::new(InlineBitpacking::new(
834                        fixed_width_data.bits_per_value,
835                    )))
836                } else {
837                    Ok(Box::new(ValueEncoder::default()))
838                }
839            }
840            DataBlock::VariableWidth(variable_width_data) => {
841                if variable_width_data.bits_per_offset == 32 {
842                    let data_size =
843                        variable_width_data.expect_single_stat::<UInt64Type>(Stat::DataSize);
844                    let max_len =
845                        variable_width_data.expect_single_stat::<UInt64Type>(Stat::MaxLength);
846
847                    if max_len >= FSST_LEAST_INPUT_MAX_LENGTH
848                        && data_size >= FSST_LEAST_INPUT_SIZE as u64
849                    {
850                        Ok(Box::new(FsstMiniBlockEncoder::default()))
851                    } else {
852                        Ok(Box::new(BinaryMiniBlockEncoder::default()))
853                    }
854                } else {
855                    todo!("Implement MiniBlockCompression for VariableWidth DataBlock with 64 bits offsets.")
856                }
857            }
858            DataBlock::Struct(struct_data_block) => {
859                // this condition is actually checked at `PrimitiveStructuralEncoder::do_flush`,
860                // just being cautious here.
861                if struct_data_block
862                    .children
863                    .iter()
864                    .any(|child| !matches!(child, DataBlock::FixedWidth(_)))
865                {
866                    panic!("packed struct encoding currently only supports fixed-width fields.")
867                }
868                Ok(Box::new(PackedStructFixedWidthMiniBlockEncoder::default()))
869            }
870            DataBlock::FixedSizeList(_) => {
871                // Ideally we would compress the list items but this creates something of a challenge.
872                // We don't want to break lists across chunks and we need to worry about inner validity
873                // layers.  If we try and use a compression scheme then it is unlikely to respect these
874                // constraints.
875                //
876                // For now, we just don't compress.  In the future, we might want to consider a more
877                // sophisticated approach.
878                Ok(Box::new(ValueEncoder::default()))
879            }
880            _ => Err(Error::NotSupported {
881                source: format!(
882                    "Mini-block compression not yet supported for block type {}",
883                    data.name()
884                )
885                .into(),
886                location: location!(),
887            }),
888        }
889    }
890
891    fn create_per_value(
892        &self,
893        _field: &Field,
894        data: &DataBlock,
895    ) -> Result<Box<dyn PerValueCompressor>> {
896        match data {
897            DataBlock::FixedWidth(_) => Ok(Box::new(ValueEncoder::default())),
898            DataBlock::FixedSizeList(_) => Ok(Box::new(ValueEncoder::default())),
899            DataBlock::VariableWidth(variable_width) => {
900                let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
901                let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
902
903                // If values are very large then use zstd-per-value
904                //
905                // TODO: Could maybe use median here
906                if max_len > 32 * 1024 && data_size >= FSST_LEAST_INPUT_SIZE as u64 {
907                    return Ok(Box::new(CompressedBufferEncoder::default()));
908                }
909
910                if variable_width.bits_per_offset == 32 {
911                    let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
912                    let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
913
914                    let variable_compression = Box::new(VariableEncoder::default());
915
916                    if max_len >= FSST_LEAST_INPUT_MAX_LENGTH
917                        && data_size >= FSST_LEAST_INPUT_SIZE as u64
918                    {
919                        Ok(Box::new(FsstPerValueEncoder::new(variable_compression)))
920                    } else {
921                        Ok(variable_compression)
922                    }
923                } else {
924                    todo!("Implement MiniBlockCompression for VariableWidth DataBlock with 64 bits offsets.")
925                }
926            }
927            _ => unreachable!(
928                "Per-value compression not yet supported for block type: {}",
929                data.name()
930            ),
931        }
932    }
933
934    fn create_block_compressor(
935        &self,
936        _field: &Field,
937        data: &DataBlock,
938    ) -> Result<(Box<dyn BlockCompressor>, pb::ArrayEncoding)> {
939        match data {
940            // Right now we only need block compressors for rep/def which is u16.  Will need to expand
941            // this if we need block compression of other types.
942            DataBlock::FixedWidth(fixed_width) => {
943                let encoder = Box::new(ValueEncoder::default());
944                let encoding = ProtobufUtils::flat_encoding(fixed_width.bits_per_value, 0, None);
945                Ok((encoder, encoding))
946            }
947            DataBlock::VariableWidth(variable_width) => {
948                let encoder = Box::new(VariableEncoder::default());
949                let encoding = ProtobufUtils::variable(variable_width.bits_per_offset);
950                Ok((encoder, encoding))
951            }
952            _ => unreachable!(),
953        }
954    }
955}
/// Hands out consecutive column indices and remembers which field id
/// each assigned index belongs to
#[derive(Debug, Default)]
pub struct ColumnIndexSequence {
    /// The index that will be assigned (or skipped) next
    current_index: u32,
    /// `(field_id, column_index)` pairs in the order they were assigned
    mapping: Vec<(u32, u32)>,
}

impl ColumnIndexSequence {
    /// Assigns the next column index to `field_id`, records the pair in the
    /// mapping and returns the assigned index
    pub fn next_column_index(&mut self, field_id: u32) -> u32 {
        let assigned = self.current_index;
        self.skip();
        self.mapping.push((field_id, assigned));
        assigned
    }

    /// Advances past one column index without associating it with a field id
    pub fn skip(&mut self) {
        self.current_index += 1;
    }
}
976
/// Options that control the encoding process
pub struct EncodingOptions {
    /// How much data (in bytes) to cache in-memory before writing a page
    ///
    /// This cache is applied on a per-column basis
    pub cache_bytes_per_column: u64,
    /// The maximum size of a page in bytes, if a single array would create
    /// a page larger than this then it will be split into multiple pages
    pub max_page_bytes: u64,
    /// If false then arrays will be copied (deeply) before being cached.
    /// This ensures any data kept alive by the array can be discarded
    /// safely and helps avoid writer accumulation.  However, there is an
    /// associated cost.
    ///
    /// Note that [`EncodingOptions::default`] sets this to `true`, i.e. the
    /// original arrays are kept and no copy is made.
    pub keep_original_array: bool,
    /// The alignment that the writer is applying to buffers
    ///
    /// The encoder needs to know this so it figures the position of out-of-line
    /// buffers correctly
    pub buffer_alignment: u64,
}

impl Default for EncodingOptions {
    /// Returns the default options: an 8 MiB per-column cache, 32 MiB maximum
    /// page size, original arrays kept (not copied) and 64-byte buffer alignment.
    fn default() -> Self {
        Self {
            // 8 MiB
            cache_bytes_per_column: 8 * 1024 * 1024,
            // 32 MiB
            max_page_bytes: 32 * 1024 * 1024,
            keep_original_array: true,
            buffer_alignment: 64,
        }
    }
}
1008
/// A trait to pick which kind of field encoding to use for a field
///
/// Unlike the ArrayEncodingStrategy, the field encoding strategy is
/// chosen before any data is generated and the same field encoder is
/// used for all data in the field.
///
/// Implementations must be `Send + Sync` and implement `Debug`.
pub trait FieldEncodingStrategy: Send + Sync + std::fmt::Debug {
    /// Choose and create an appropriate field encoder for the given
    /// field.
    ///
    /// The field encoder can be chosen based on the data type as well as
    /// any metadata that is attached to the field.
    ///
    /// The `encoding_strategy_root` is the strategy that should be
    /// used to encode any inner data in struct / list / etc. fields.
    ///
    /// Initially it is the same as `self` and generally should be
    /// forwarded to any inner encoding strategy.
    ///
    /// The `column_index` sequence is used to assign column indices to
    /// the encoders that are created, and `options` carries the encoding
    /// options that are handed to those encoders.
    ///
    /// Returns the created field encoder, or an error if one could not
    /// be created for the field.
    fn create_field_encoder(
        &self,
        encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
    ) -> Result<Box<dyn FieldEncoder>>;
}
1034
1035pub fn default_encoding_strategy(version: LanceFileVersion) -> Box<dyn FieldEncodingStrategy> {
1036    match version.resolve() {
1037        LanceFileVersion::Legacy => panic!(),
1038        LanceFileVersion::V2_0 => Box::new(CoreFieldEncodingStrategy::default()),
1039        _ => Box::new(StructuralEncodingStrategy::default()),
1040    }
1041}
1042
1043/// The core field encoding strategy is a set of basic encodings that
1044/// are generally applicable in most scenarios.
1045#[derive(Debug)]
1046pub struct CoreFieldEncodingStrategy {
1047    pub array_encoding_strategy: Arc<dyn ArrayEncodingStrategy>,
1048    pub version: LanceFileVersion,
1049}
1050
1051// For some reason clippy has a false negative and thinks this can be derived but
1052// it can't because ArrayEncodingStrategy has no default implementation
1053#[allow(clippy::derivable_impls)]
1054impl Default for CoreFieldEncodingStrategy {
1055    fn default() -> Self {
1056        Self {
1057            array_encoding_strategy: Arc::<CoreArrayEncodingStrategy>::default(),
1058            version: LanceFileVersion::default(),
1059        }
1060    }
1061}
1062
1063impl CoreFieldEncodingStrategy {
1064    fn is_primitive_type(data_type: &DataType) -> bool {
1065        matches!(
1066            data_type,
1067            DataType::Boolean
1068                | DataType::Date32
1069                | DataType::Date64
1070                | DataType::Decimal128(_, _)
1071                | DataType::Decimal256(_, _)
1072                | DataType::Duration(_)
1073                | DataType::Float16
1074                | DataType::Float32
1075                | DataType::Float64
1076                | DataType::Int16
1077                | DataType::Int32
1078                | DataType::Int64
1079                | DataType::Int8
1080                | DataType::Interval(_)
1081                | DataType::Null
1082                | DataType::Time32(_)
1083                | DataType::Time64(_)
1084                | DataType::Timestamp(_, _)
1085                | DataType::UInt16
1086                | DataType::UInt32
1087                | DataType::UInt64
1088                | DataType::UInt8
1089                | DataType::FixedSizeBinary(_)
1090                | DataType::FixedSizeList(_, _)
1091                | DataType::Binary
1092                | DataType::LargeBinary
1093                | DataType::Utf8
1094                | DataType::LargeUtf8,
1095        )
1096    }
1097}
1098
1099impl FieldEncodingStrategy for CoreFieldEncodingStrategy {
1100    fn create_field_encoder(
1101        &self,
1102        encoding_strategy_root: &dyn FieldEncodingStrategy,
1103        field: &Field,
1104        column_index: &mut ColumnIndexSequence,
1105        options: &EncodingOptions,
1106    ) -> Result<Box<dyn FieldEncoder>> {
1107        let data_type = field.data_type();
1108        if Self::is_primitive_type(&data_type) {
1109            let column_index = column_index.next_column_index(field.id as u32);
1110            if field.metadata.contains_key(BLOB_META_KEY) {
1111                let mut packed_meta = HashMap::new();
1112                packed_meta.insert(PACKED_STRUCT_META_KEY.to_string(), "true".to_string());
1113                let desc_field =
1114                    Field::try_from(BLOB_DESC_FIELD.clone().with_metadata(packed_meta)).unwrap();
1115                let desc_encoder = Box::new(PrimitiveFieldEncoder::try_new(
1116                    options,
1117                    self.array_encoding_strategy.clone(),
1118                    column_index,
1119                    desc_field,
1120                )?);
1121                Ok(Box::new(BlobFieldEncoder::new(desc_encoder)))
1122            } else {
1123                Ok(Box::new(PrimitiveFieldEncoder::try_new(
1124                    options,
1125                    self.array_encoding_strategy.clone(),
1126                    column_index,
1127                    field.clone(),
1128                )?))
1129            }
1130        } else {
1131            match data_type {
1132                DataType::List(_child) | DataType::LargeList(_child) => {
1133                    let list_idx = column_index.next_column_index(field.id as u32);
1134                    let inner_encoding = encoding_strategy_root.create_field_encoder(
1135                        encoding_strategy_root,
1136                        &field.children[0],
1137                        column_index,
1138                        options,
1139                    )?;
1140                    let offsets_encoder =
1141                        Arc::new(BasicEncoder::new(Box::new(ValueEncoder::default())));
1142                    Ok(Box::new(ListFieldEncoder::new(
1143                        inner_encoding,
1144                        offsets_encoder,
1145                        options.cache_bytes_per_column,
1146                        options.keep_original_array,
1147                        list_idx,
1148                    )))
1149                }
1150                DataType::Struct(_) => {
1151                    let field_metadata = &field.metadata;
1152                    if field_metadata
1153                        .get(PACKED_STRUCT_LEGACY_META_KEY)
1154                        .map(|v| v == "true")
1155                        .unwrap_or(field_metadata.contains_key(PACKED_STRUCT_META_KEY))
1156                    {
1157                        Ok(Box::new(PrimitiveFieldEncoder::try_new(
1158                            options,
1159                            self.array_encoding_strategy.clone(),
1160                            column_index.next_column_index(field.id as u32),
1161                            field.clone(),
1162                        )?))
1163                    } else {
1164                        let header_idx = column_index.next_column_index(field.id as u32);
1165                        let children_encoders = field
1166                            .children
1167                            .iter()
1168                            .map(|field| {
1169                                self.create_field_encoder(
1170                                    encoding_strategy_root,
1171                                    field,
1172                                    column_index,
1173                                    options,
1174                                )
1175                            })
1176                            .collect::<Result<Vec<_>>>()?;
1177                        Ok(Box::new(StructFieldEncoder::new(
1178                            children_encoders,
1179                            header_idx,
1180                        )))
1181                    }
1182                }
1183                DataType::Dictionary(_, value_type) => {
1184                    // A dictionary of primitive is, itself, primitive
1185                    if Self::is_primitive_type(&value_type) {
1186                        Ok(Box::new(PrimitiveFieldEncoder::try_new(
1187                            options,
1188                            self.array_encoding_strategy.clone(),
1189                            column_index.next_column_index(field.id as u32),
1190                            field.clone(),
1191                        )?))
1192                    } else {
1193                        // A dictionary of logical is, itself, logical and we don't support that today
1194                        // It could be possible (e.g. store indices in one column and values in remaining columns)
1195                        // but would be a significant amount of work
1196                        //
1197                        // An easier fallback implementation would be to decode-on-write and encode-on-read
1198                        Err(Error::NotSupported { source: format!("cannot encode a dictionary column whose value type is a logical type ({})", value_type).into(), location: location!() })
1199                    }
1200                }
1201                _ => todo!("Implement encoding for field {}", field),
1202            }
1203        }
1204    }
1205}
1206
1207/// An encoding strategy used for 2.1+ files
1208#[derive(Debug)]
1209pub struct StructuralEncodingStrategy {
1210    pub compression_strategy: Arc<dyn CompressionStrategy>,
1211    pub version: LanceFileVersion,
1212}
1213
1214// For some reason, clippy thinks we can add Default to the above derive but
1215// rustc doesn't agree (no default for Arc<dyn Trait>)
1216#[allow(clippy::derivable_impls)]
1217impl Default for StructuralEncodingStrategy {
1218    fn default() -> Self {
1219        Self {
1220            compression_strategy: Arc::<CoreArrayEncodingStrategy>::default(),
1221            version: LanceFileVersion::default(),
1222        }
1223    }
1224}
1225
1226impl StructuralEncodingStrategy {
1227    fn is_primitive_type(data_type: &DataType) -> bool {
1228        matches!(
1229            data_type,
1230            DataType::Boolean
1231                | DataType::Date32
1232                | DataType::Date64
1233                | DataType::Decimal128(_, _)
1234                | DataType::Decimal256(_, _)
1235                | DataType::Duration(_)
1236                | DataType::Float16
1237                | DataType::Float32
1238                | DataType::Float64
1239                | DataType::Int16
1240                | DataType::Int32
1241                | DataType::Int64
1242                | DataType::Int8
1243                | DataType::Interval(_)
1244                | DataType::Null
1245                | DataType::Time32(_)
1246                | DataType::Time64(_)
1247                | DataType::Timestamp(_, _)
1248                | DataType::UInt16
1249                | DataType::UInt32
1250                | DataType::UInt64
1251                | DataType::UInt8
1252                | DataType::FixedSizeBinary(_)
1253                | DataType::FixedSizeList(_, _)
1254                | DataType::Binary
1255                | DataType::LargeBinary
1256                | DataType::Utf8
1257                | DataType::LargeUtf8,
1258        )
1259    }
1260
1261    fn do_create_field_encoder(
1262        &self,
1263        _encoding_strategy_root: &dyn FieldEncodingStrategy,
1264        field: &Field,
1265        column_index: &mut ColumnIndexSequence,
1266        options: &EncodingOptions,
1267        root_field_metadata: &HashMap<String, String>,
1268    ) -> Result<Box<dyn FieldEncoder>> {
1269        let data_type = field.data_type();
1270        if Self::is_primitive_type(&data_type) {
1271            Ok(Box::new(PrimitiveStructuralEncoder::try_new(
1272                options,
1273                self.compression_strategy.clone(),
1274                column_index.next_column_index(field.id as u32),
1275                field.clone(),
1276                Arc::new(root_field_metadata.clone()),
1277            )?))
1278        } else {
1279            match data_type {
1280                DataType::List(_) | DataType::LargeList(_) => {
1281                    let child = field.children.first().expect("List should have a child");
1282                    let child_encoder = self.do_create_field_encoder(
1283                        _encoding_strategy_root,
1284                        child,
1285                        column_index,
1286                        options,
1287                        root_field_metadata,
1288                    )?;
1289                    Ok(Box::new(ListStructuralEncoder::new(child_encoder)))
1290                }
1291                DataType::Struct(_) => {
1292                    if field.is_packed_struct() {
1293                        Ok(Box::new(PrimitiveStructuralEncoder::try_new(
1294                            options,
1295                            self.compression_strategy.clone(),
1296                            column_index.next_column_index(field.id as u32),
1297                            field.clone(),
1298                            Arc::new(root_field_metadata.clone()),
1299                        )?))
1300                    } else {
1301                        let children_encoders = field
1302                            .children
1303                            .iter()
1304                            .map(|field| {
1305                                self.do_create_field_encoder(
1306                                    _encoding_strategy_root,
1307                                    field,
1308                                    column_index,
1309                                    options,
1310                                    root_field_metadata,
1311                                )
1312                            })
1313                            .collect::<Result<Vec<_>>>()?;
1314                        Ok(Box::new(StructStructuralEncoder::new(children_encoders)))
1315                    }
1316                }
1317                DataType::Dictionary(_, value_type) => {
1318                    // A dictionary of primitive is, itself, primitive
1319                    if Self::is_primitive_type(&value_type) {
1320                        Ok(Box::new(PrimitiveStructuralEncoder::try_new(
1321                            options,
1322                            self.compression_strategy.clone(),
1323                            column_index.next_column_index(field.id as u32),
1324                            field.clone(),
1325                            Arc::new(root_field_metadata.clone()),
1326                        )?))
1327                    } else {
1328                        // A dictionary of logical is, itself, logical and we don't support that today
1329                        // It could be possible (e.g. store indices in one column and values in remaining columns)
1330                        // but would be a significant amount of work
1331                        //
1332                        // An easier fallback implementation would be to decode-on-write and encode-on-read
1333                        Err(Error::NotSupported { source: format!("cannot encode a dictionary column whose value type is a logical type ({})", value_type).into(), location: location!() })
1334                    }
1335                }
1336                _ => todo!("Implement encoding for field {}", field),
1337            }
1338        }
1339    }
1340}
1341
1342impl FieldEncodingStrategy for StructuralEncodingStrategy {
1343    fn create_field_encoder(
1344        &self,
1345        encoding_strategy_root: &dyn FieldEncodingStrategy,
1346        field: &Field,
1347        column_index: &mut ColumnIndexSequence,
1348        options: &EncodingOptions,
1349    ) -> Result<Box<dyn FieldEncoder>> {
1350        self.do_create_field_encoder(
1351            encoding_strategy_root,
1352            field,
1353            column_index,
1354            options,
1355            &field.metadata,
1356        )
1357    }
1358}
1359
1360/// A batch encoder that encodes RecordBatch objects by delegating
1361/// to field encoders for each top-level field in the batch.
1362pub struct BatchEncoder {
1363    pub field_encoders: Vec<Box<dyn FieldEncoder>>,
1364    pub field_id_to_column_index: Vec<(u32, u32)>,
1365}
1366
1367impl BatchEncoder {
1368    pub fn try_new(
1369        schema: &Schema,
1370        strategy: &dyn FieldEncodingStrategy,
1371        options: &EncodingOptions,
1372    ) -> Result<Self> {
1373        let mut col_idx = 0;
1374        let mut col_idx_sequence = ColumnIndexSequence::default();
1375        let field_encoders = schema
1376            .fields
1377            .iter()
1378            .map(|field| {
1379                let encoder = strategy.create_field_encoder(
1380                    strategy,
1381                    field,
1382                    &mut col_idx_sequence,
1383                    options,
1384                )?;
1385                col_idx += encoder.as_ref().num_columns();
1386                Ok(encoder)
1387            })
1388            .collect::<Result<Vec<_>>>()?;
1389        Ok(Self {
1390            field_encoders,
1391            field_id_to_column_index: col_idx_sequence.mapping,
1392        })
1393    }
1394
1395    pub fn num_columns(&self) -> u32 {
1396        self.field_encoders
1397            .iter()
1398            .map(|field_encoder| field_encoder.num_columns())
1399            .sum::<u32>()
1400    }
1401}
1402
/// An encoded batch of data and a page table describing it
///
/// This is returned by [`crate::encoder::encode_batch`]
#[derive(Debug)]
pub struct EncodedBatch {
    /// The encoded bytes of the batch
    pub data: Bytes,
    /// Column descriptions for the encoded data
    // NOTE(review): presumably one entry per encoded column, in column index order -- confirm
    pub page_table: Vec<Arc<ColumnInfo>>,
    /// The schema associated with the encoded data
    pub schema: Arc<Schema>,
    /// Column indices of interest in `page_table`
    // NOTE(review): the name suggests only top-level fields are listed -- confirm against
    // how `encode_batch` fills this in
    pub top_level_columns: Vec<u32>,
    /// Number of rows in the batch
    // NOTE(review): presumably the row count of the encoded RecordBatch -- confirm
    pub num_rows: u64,
}
1414
1415fn write_page_to_data_buffer(page: EncodedPage, data_buffer: &mut BytesMut) -> PageInfo {
1416    let buffers = page.data;
1417    let mut buffer_offsets_and_sizes = Vec::with_capacity(buffers.len());
1418    for buffer in buffers {
1419        let buffer_offset = data_buffer.len() as u64;
1420        data_buffer.extend_from_slice(&buffer);
1421        let size = data_buffer.len() as u64 - buffer_offset;
1422        buffer_offsets_and_sizes.push((buffer_offset, size));
1423    }
1424
1425    PageInfo {
1426        buffer_offsets_and_sizes: Arc::from(buffer_offsets_and_sizes),
1427        encoding: page.description,
1428        num_rows: page.num_rows,
1429        priority: page.row_number,
1430    }
1431}
1432
1433/// Helper method to encode a batch of data into memory
1434///
1435/// This is primarily for testing and benchmarking but could be useful in other
1436/// niche situations like IPC.
1437pub async fn encode_batch(
1438    batch: &RecordBatch,
1439    schema: Arc<Schema>,
1440    encoding_strategy: &dyn FieldEncodingStrategy,
1441    options: &EncodingOptions,
1442) -> Result<EncodedBatch> {
1443    if !is_pwr_two(options.buffer_alignment) || options.buffer_alignment < MIN_PAGE_BUFFER_ALIGNMENT
1444    {
1445        return Err(Error::InvalidInput {
1446            source: format!(
1447                "buffer_alignment must be a power of two and at least {}",
1448                MIN_PAGE_BUFFER_ALIGNMENT
1449            )
1450            .into(),
1451            location: location!(),
1452        });
1453    }
1454
1455    let mut data_buffer = BytesMut::new();
1456    let lance_schema = Schema::try_from(batch.schema().as_ref())?;
1457    let options = EncodingOptions {
1458        keep_original_array: true,
1459        ..*options
1460    };
1461    let batch_encoder = BatchEncoder::try_new(&lance_schema, encoding_strategy, &options)?;
1462    let mut page_table = Vec::new();
1463    let mut col_idx_offset = 0;
1464    for (arr, mut encoder) in batch.columns().iter().zip(batch_encoder.field_encoders) {
1465        let mut external_buffers =
1466            OutOfLineBuffers::new(data_buffer.len() as u64, options.buffer_alignment);
1467        let repdef = RepDefBuilder::default();
1468        let encoder = encoder.as_mut();
1469        let num_rows = arr.len() as u64;
1470        let mut tasks =
1471            encoder.maybe_encode(arr.clone(), &mut external_buffers, repdef, 0, num_rows)?;
1472        tasks.extend(encoder.flush(&mut external_buffers)?);
1473        for buffer in external_buffers.take_buffers() {
1474            data_buffer.extend_from_slice(&buffer);
1475        }
1476        let mut pages = HashMap::<u32, Vec<PageInfo>>::new();
1477        for task in tasks {
1478            let encoded_page = task.await?;
1479            // Write external buffers first
1480            pages
1481                .entry(encoded_page.column_idx)
1482                .or_default()
1483                .push(write_page_to_data_buffer(encoded_page, &mut data_buffer));
1484        }
1485        let mut external_buffers =
1486            OutOfLineBuffers::new(data_buffer.len() as u64, options.buffer_alignment);
1487        let encoded_columns = encoder.finish(&mut external_buffers).await?;
1488        for buffer in external_buffers.take_buffers() {
1489            data_buffer.extend_from_slice(&buffer);
1490        }
1491        let num_columns = encoded_columns.len();
1492        for (col_idx, encoded_column) in encoded_columns.into_iter().enumerate() {
1493            let col_idx = col_idx + col_idx_offset;
1494            let mut col_buffer_offsets_and_sizes = Vec::new();
1495            for buffer in encoded_column.column_buffers {
1496                let buffer_offset = data_buffer.len() as u64;
1497                data_buffer.extend_from_slice(&buffer);
1498                let size = data_buffer.len() as u64 - buffer_offset;
1499                col_buffer_offsets_and_sizes.push((buffer_offset, size));
1500            }
1501            for page in encoded_column.final_pages {
1502                pages
1503                    .entry(page.column_idx)
1504                    .or_default()
1505                    .push(write_page_to_data_buffer(page, &mut data_buffer));
1506            }
1507            let col_pages = std::mem::take(pages.entry(col_idx as u32).or_default());
1508            page_table.push(Arc::new(ColumnInfo {
1509                index: col_idx as u32,
1510                buffer_offsets_and_sizes: Arc::from(
1511                    col_buffer_offsets_and_sizes.into_boxed_slice(),
1512                ),
1513                page_infos: Arc::from(col_pages.into_boxed_slice()),
1514                encoding: encoded_column.encoding,
1515            }))
1516        }
1517        col_idx_offset += num_columns;
1518    }
1519    let top_level_columns = batch_encoder
1520        .field_id_to_column_index
1521        .iter()
1522        .map(|(_, idx)| *idx)
1523        .collect();
1524    Ok(EncodedBatch {
1525        data: data_buffer.freeze(),
1526        top_level_columns,
1527        page_table,
1528        schema,
1529        num_rows: batch.num_rows() as u64,
1530    })
1531}
1532
#[cfg(test)]
pub mod tests {
    //! Unit tests for the encoding-selection helpers in the parent module:
    //! `check_dict_encoding`, `check_fixed_size_encoding`, and
    //! `CoreArrayEncodingStrategy::create_array_encoder`.

    use crate::version::LanceFileVersion;
    use arrow_array::{ArrayRef, StringArray};
    use arrow_schema::Field;
    use lance_core::datatypes::{COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY};
    use std::collections::HashMap;
    use std::sync::Arc;

    use super::check_fixed_size_encoding;
    use super::{check_dict_encoding, ArrayEncodingStrategy, CoreArrayEncodingStrategy};

    /// Wraps `arr` in a single `StringArray` and returns what `check_dict_encoding`
    /// reports for that one array with the given `threshold`.
    fn is_dict_encoding_applicable(arr: Vec<Option<&str>>, threshold: u64) -> bool {
        let arr = Arc::new(StringArray::from(arr)) as ArrayRef;
        check_dict_encoding(&[arr], threshold)
    }

    #[test]
    fn test_dict_encoding_should_be_applied_if_cardinality_less_than_threshold() {
        // Two distinct values, threshold of three.
        assert!(is_dict_encoding_applicable(
            vec![Some("a"), Some("b"), Some("a"), Some("b")],
            3,
        ));
    }

    #[test]
    fn test_dict_encoding_should_not_be_applied_if_cardinality_larger_than_threshold() {
        // Four distinct values, threshold of three.
        assert!(!is_dict_encoding_applicable(
            vec![Some("a"), Some("b"), Some("c"), Some("d")],
            3,
        ));
    }

    #[test]
    fn test_dict_encoding_should_not_be_applied_if_cardinality_equal_to_threshold() {
        // Three distinct values, threshold of three.
        assert!(!is_dict_encoding_applicable(
            vec![Some("a"), Some("b"), Some("c"), Some("a")],
            3,
        ));
    }

    #[test]
    fn test_dict_encoding_should_not_be_applied_for_empty_arrays() {
        assert!(!is_dict_encoding_applicable(vec![], 3));
    }

    #[test]
    fn test_dict_encoding_should_not_be_applied_for_smaller_than_threshold_arrays() {
        // Only two rows in total, threshold of three.
        assert!(!is_dict_encoding_applicable(vec![Some("a"), Some("a")], 3));
    }

    /// Converts each inner vector into its own `StringArray` and returns whether
    /// `check_fixed_size_encoding` yields `Some(_)` for the resulting arrays at the
    /// given file `version`.
    fn is_fixed_size_encoding_applicable(
        arrays: Vec<Vec<Option<&str>>>,
        version: LanceFileVersion,
    ) -> bool {
        let final_arrays: Vec<ArrayRef> = arrays
            .into_iter()
            .map(|arr| Arc::new(StringArray::from(arr)) as ArrayRef)
            .collect();

        // Borrow the vector directly; cloning it only to take a reference is wasted work.
        check_fixed_size_encoding(&final_arrays, version).is_some()
    }

    #[test]
    fn test_fixed_size_binary_encoding_applicable() {
        // A single empty array is rejected.
        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![]],
            LanceFileVersion::V2_1
        ));

        // Every value has the same length.
        assert!(is_fixed_size_encoding_applicable(
            vec![vec![Some("a"), Some("b")]],
            LanceFileVersion::V2_1
        ));

        // Values of differing lengths.
        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![Some("abc"), Some("de")]],
            LanceFileVersion::V2_1
        ));

        // A null next to a non-empty value is still accepted.
        assert!(is_fixed_size_encoding_applicable(
            vec![vec![Some("pqr"), None]],
            LanceFileVersion::V2_1
        ));

        // An explicit empty string next to a non-empty value is rejected.
        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![Some("pqr"), Some("")]],
            LanceFileVersion::V2_1
        ));

        // Only empty strings.
        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![Some(""), Some("")]],
            LanceFileVersion::V2_1
        ));
    }

    #[test]
    fn test_fixed_size_binary_encoding_applicable_multiple_arrays() {
        // Same value length across both arrays.
        assert!(is_fixed_size_encoding_applicable(
            vec![vec![Some("a"), Some("b")], vec![Some("c"), Some("d")]],
            LanceFileVersion::V2_1
        ));

        // Value length differs between the two arrays.
        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![Some("ab"), Some("bc")], vec![Some("c"), Some("d")]],
            LanceFileVersion::V2_1
        ));

        // Value length differs between the two arrays, with nulls mixed in.
        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![Some("ab"), None], vec![None, Some("d")]],
            LanceFileVersion::V2_1
        ));

        // Same value length across both arrays, with nulls mixed in.
        assert!(is_fixed_size_encoding_applicable(
            vec![vec![Some("a"), None], vec![None, Some("d")]],
            LanceFileVersion::V2_1
        ));

        // Only empty strings and nulls.
        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![Some(""), None], vec![None, Some("")]],
            LanceFileVersion::V2_1
        ));

        // Only nulls.
        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![None, None], vec![None, None]],
            LanceFileVersion::V2_1
        ));
    }

    /// Asks `CoreArrayEncodingStrategy` (configured with `version`) for an encoder
    /// for `array` and asserts that the encoder's `Debug` output equals
    /// `expected_encoder`.
    ///
    /// The field handed to the strategy is a nullable field named `test_field` with
    /// the array's data type; `field_meta`, when present, becomes its metadata.
    fn verify_array_encoder(
        array: ArrayRef,
        field_meta: Option<HashMap<String, String>>,
        version: LanceFileVersion,
        expected_encoder: &str,
    ) {
        let encoding_strategy = CoreArrayEncodingStrategy { version };
        let mut field = Field::new("test_field", array.data_type().clone(), true);
        if let Some(field_meta) = field_meta {
            field.set_metadata(field_meta);
        }
        let lance_field = lance_core::datatypes::Field::try_from(field).unwrap();
        // `expect` prints the underlying error when encoder creation fails, which a
        // preceding `assert!(is_ok())` would hide.
        let encoder = encoding_strategy
            .create_array_encoder(&[array], &lance_field)
            .expect("create_array_encoder should succeed for the test field");
        assert_eq!(format!("{:?}", encoder), expected_encoder);
    }

    #[test]
    fn test_choose_encoder_for_zstd_compressed_string_field() {
        let field_meta = HashMap::from([(COMPRESSION_META_KEY.to_string(), "zstd".to_string())]);
        verify_array_encoder(
            Arc::new(StringArray::from(vec!["a", "bb", "ccc"])),
            Some(field_meta),
            LanceFileVersion::V2_1,
            "BinaryEncoder { indices_encoder: BasicEncoder { values_encoder: ValueEncoder }, compression_config: Some(CompressionConfig { scheme: Zstd, level: None }), buffer_compressor: Some(ZstdBufferCompressor { compression_level: 0 }) }",
        );
    }

    #[test]
    fn test_choose_encoder_for_zstd_compression_level() {
        let field_meta = HashMap::from([
            (COMPRESSION_META_KEY.to_string(), "zstd".to_string()),
            (COMPRESSION_LEVEL_META_KEY.to_string(), "22".to_string()),
        ]);
        verify_array_encoder(
            Arc::new(StringArray::from(vec!["a", "bb", "ccc"])),
            Some(field_meta),
            LanceFileVersion::V2_1,
            "BinaryEncoder { indices_encoder: BasicEncoder { values_encoder: ValueEncoder }, compression_config: Some(CompressionConfig { scheme: Zstd, level: Some(22) }), buffer_compressor: Some(ZstdBufferCompressor { compression_level: 22 }) }",
        );
    }
}