lance_encoding/encoder.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! The top-level encoding module for Lance files.
//!
//! Lance files are encoded using a [`FieldEncodingStrategy`] which chooses
//! what encoder to use for each field.
//!
//! The current strategy is the [`StructuralEncodingStrategy`] which uses "structural"
//! encoding.  A tree of encoders is built up for each field.  The struct & list encoders
//! simply pull off the validity and offsets and collect them.  Then, in the primitive leaf
//! encoder, the validity, offsets, and values are accumulated in an accumulation buffer.  Once
//! enough data has been collected the primitive encoder will use either a miniblock encoding
//! or a full-zip encoding to create a page of data from the accumulation buffer.
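//!
//! A minimal sketch of the driving flow (illustrative only; [`encode_batch`]
//! below is a complete implementation of this loop, and `schema` is assumed
//! to be a `lance_core::datatypes::Schema`):
//!
//! ```ignore
//! // Pick a strategy for the target file version (2.1+ resolves to the
//! // structural strategy described above) and build one encoder per field.
//! let strategy = default_encoding_strategy(LanceFileVersion::V2_1);
//! let options = EncodingOptions::default();
//! let batch_encoder = BatchEncoder::try_new(&schema, strategy.as_ref(), &options)?;
//! // Each field encoder buffers arrays via `maybe_encode` and emits
//! // `EncodeTask`s that resolve to `EncodedPage`s.
//! ```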

use std::{collections::HashMap, sync::Arc};

use arrow_array::{Array, ArrayRef, RecordBatch};
use arrow_schema::DataType;
use bytes::{Bytes, BytesMut};
use futures::future::BoxFuture;
use lance_core::datatypes::{Field, Schema};
use lance_core::utils::bit::{is_pwr_two, pad_bytes_to};
use lance_core::{Error, Result};
use snafu::location;

use crate::buffer::LanceBuffer;
use crate::compression::{CompressionStrategy, DefaultCompressionStrategy};
use crate::compression_config::CompressionParams;
use crate::decoder::PageEncoding;
use crate::encodings::logical::list::ListStructuralEncoder;
use crate::encodings::logical::primitive::PrimitiveStructuralEncoder;
use crate::encodings::logical::r#struct::StructStructuralEncoder;
use crate::repdef::RepDefBuilder;
use crate::version::LanceFileVersion;
use crate::{
    decoder::{ColumnInfo, PageInfo},
    format::pb,
};

/// The minimum alignment for a page buffer.  Writers must respect this.
pub const MIN_PAGE_BUFFER_ALIGNMENT: u64 = 8;

/// An encoded page of data
///
/// Maps to a top-level array
///
/// For example, `FixedSizeList<Int32>` will have two `EncodedArray` instances and one `EncodedPage`
#[derive(Debug)]
pub struct EncodedPage {
    /// The encoded page buffers
    pub data: Vec<LanceBuffer>,
    /// A description of the encoding used to encode the page
    pub description: PageEncoding,
    /// The number of rows in the encoded page
    pub num_rows: u64,
    /// The top-level row number of the first row in the page
    ///
    /// Generally the number of "top-level" rows and the number of rows are the same.  However,
    /// when there is repetition (list/fixed-size-list) there may be more or fewer items than rows.
    ///
    /// A top-level row can never be split across a page boundary.
    pub row_number: u64,
    /// The index of the column
    pub column_idx: u32,
}

pub struct EncodedColumn {
    pub column_buffers: Vec<LanceBuffer>,
    pub encoding: pb::ColumnEncoding,
    pub final_pages: Vec<EncodedPage>,
}

impl Default for EncodedColumn {
    fn default() -> Self {
        Self {
            column_buffers: Default::default(),
            encoding: pb::ColumnEncoding {
                column_encoding: Some(pb::column_encoding::ColumnEncoding::Values(())),
            },
            final_pages: Default::default(),
        }
    }
}

/// A tool to reserve space for buffers that are not in-line with the data
///
/// In most cases, buffers are stored in the page and referred to in the encoding
/// metadata by their index in the page.  This keeps all buffers within a page together.
/// As a result, most encoders should not need to use this structure.
///
/// In some cases (currently only the large binary encoding) there is a need to access
/// buffers that are not in the page (because storing the position / offset of every page
/// in the page metadata would be too expensive).
///
/// To do this you can add a buffer with `add_buffer` and then use the returned position
/// in some way (in the large binary encoding the returned position is stored in the page
/// data as a position / size array).
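///
/// A minimal sketch of that protocol (assuming the `LanceBuffer::Owned`
/// variant for construction; positions advance by buffer length plus
/// alignment padding):
///
/// ```ignore
/// let mut oob = OutOfLineBuffers::new(/*base_position=*/ 0, /*buffer_alignment=*/ 8);
/// let pos = oob.add_buffer(LanceBuffer::Owned(vec![0u8; 10]));
/// assert_eq!(pos, 0); // a second buffer would start at 16 (10 rounded up to a multiple of 8)
/// let buffers = oob.take_buffers(); // the writer appends these to the file
/// ```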
pub struct OutOfLineBuffers {
    position: u64,
    buffer_alignment: u64,
    buffers: Vec<LanceBuffer>,
}

impl OutOfLineBuffers {
    pub fn new(base_position: u64, buffer_alignment: u64) -> Self {
        Self {
            position: base_position,
            buffer_alignment,
            buffers: Vec::new(),
        }
    }

    pub fn add_buffer(&mut self, buffer: LanceBuffer) -> u64 {
        let position = self.position;
        self.position += buffer.len() as u64;
        self.position += pad_bytes_to(buffer.len(), self.buffer_alignment as usize) as u64;
        self.buffers.push(buffer);
        position
    }

    pub fn take_buffers(self) -> Vec<LanceBuffer> {
        self.buffers
    }

    pub fn reset_position(&mut self, position: u64) {
        self.position = position;
    }
}

/// A task to create a page of data
pub type EncodeTask = BoxFuture<'static, Result<EncodedPage>>;

/// Top-level encoding trait to encode any Arrow array type into one or more pages.
///
/// The field encoder implements buffering and encoding of a single input column
/// but it may map to multiple output columns.  For example, a list array or struct
/// array will be encoded into multiple columns.
///
/// Also, fields may be encoded at different speeds.  For example, given a struct
/// column with three fields (a boolean field, an int32 field, and a 4096-dimension
/// tensor field) the tensor field is likely to emit encoded pages much more frequently
/// than the boolean field.
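///
/// A rough sketch of the calling protocol (see [`encode_batch`] below for the
/// real driver; `buffers` is an [`OutOfLineBuffers`]):
///
/// ```ignore
/// // Feed arrays in; each call may or may not yield encode tasks.
/// let mut tasks = encoder.maybe_encode(array, &mut buffers, repdef, row_number, num_rows)?;
/// // Once the data is exhausted, flush whatever is still buffered.
/// tasks.extend(encoder.flush(&mut buffers)?);
/// // Each task resolves to a single page.
/// let pages = futures::future::try_join_all(tasks).await?;
/// // Finally, collect the column metadata (one entry per output column).
/// let columns = encoder.finish(&mut buffers).await?;
/// ```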
pub trait FieldEncoder: Send {
    /// Buffer the data and, if there is enough data in the buffer to form a page, return
    /// an encoding task to encode the data.
    ///
    /// This may return more than one task because a single column may be mapped to multiple
    /// output columns.  For example, if encoding a struct column with three children then
    /// up to three tasks may be returned from each call to maybe_encode.
    ///
    /// It may also return multiple tasks for a single column if the input array is larger
    /// than a single disk page.
    ///
    /// It could also return an empty Vec if there is not enough data yet to encode any pages.
    ///
    /// The `row_number` is the top-level row number currently being encoded.  It is stored
    /// in any pages produced by this call so that we can know the priority of the page.
    ///
    /// The `num_rows` is the number of top-level rows.  It is initially the same as `array.len()`
    /// however it is passed separately because the array will become flattened over time (if there
    /// is repetition) and we need to know the original number of rows for various purposes.
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>>;
    /// Flush any remaining data from the buffers into encoding tasks
    ///
    /// Each encode task produces a single page.  The order of these pages will be maintained
    /// in the file (we do not worry about order between columns but all pages in the same
    /// column should maintain order)
    ///
    /// This may be called intermittently throughout encoding but will always be called
    /// once at the end of encoding, just before calling finish
    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>>;
    /// Finish encoding and return column metadata
    ///
    /// This is called only once, after all encode tasks have completed
    ///
    /// This returns a Vec because a single field may have created multiple columns
    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>>;

    /// The number of output columns this encoding will create
    fn num_columns(&self) -> u32;
}

/// Keeps track of the current column index and makes a mapping
/// from field id to column index
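///
/// A small sketch of the bookkeeping:
///
/// ```ignore
/// let mut seq = ColumnIndexSequence::default();
/// assert_eq!(seq.next_column_index(7), 0); // field 7 -> column 0
/// assert_eq!(seq.next_column_index(9), 1); // field 9 -> column 1
/// // the mapping now holds [(7, 0), (9, 1)]
/// ```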
#[derive(Debug, Default)]
pub struct ColumnIndexSequence {
    current_index: u32,
    mapping: Vec<(u32, u32)>,
}

impl ColumnIndexSequence {
    pub fn next_column_index(&mut self, field_id: u32) -> u32 {
        let idx = self.current_index;
        self.current_index += 1;
        self.mapping.push((field_id, idx));
        idx
    }

    pub fn skip(&mut self) {
        self.current_index += 1;
    }
}

/// Options that control the encoding process
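///
/// A sketch of overriding a single option (struct-update syntax keeps the
/// remaining defaults):
///
/// ```ignore
/// let options = EncodingOptions {
///     max_page_bytes: 64 * 1024 * 1024,
///     ..Default::default()
/// };
/// ```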
pub struct EncodingOptions {
    /// How much data (in bytes) to cache in-memory before writing a page
    ///
    /// This cache is applied on a per-column basis
    pub cache_bytes_per_column: u64,
    /// The maximum size of a page in bytes.  If a single array would create
    /// a page larger than this then it will be split into multiple pages.
    pub max_page_bytes: u64,
    /// If false, arrays will be copied (deeply) before being cached.  This
    /// ensures any data kept alive by the array can be discarded safely and
    /// prevents the writer from accumulating references to input arrays.
    /// However, there is an associated copying cost.  Note that
    /// [`EncodingOptions::default`] sets this to `true`.
    pub keep_original_array: bool,
    /// The alignment that the writer is applying to buffers
    ///
    /// The encoder needs to know this so it can compute the positions of
    /// out-of-line buffers correctly
    pub buffer_alignment: u64,
}

impl Default for EncodingOptions {
    fn default() -> Self {
        Self {
            cache_bytes_per_column: 8 * 1024 * 1024,
            max_page_bytes: 32 * 1024 * 1024,
            keep_original_array: true,
            buffer_alignment: 64,
        }
    }
}

/// A trait to pick which kind of field encoding to use for a field
///
/// Unlike the ArrayEncodingStrategy, the field encoding strategy is
/// chosen before any data is generated and the same field encoder is
/// used for all data in the field.
pub trait FieldEncodingStrategy: Send + Sync + std::fmt::Debug {
    /// Choose and create an appropriate field encoder for the given
    /// field.
    ///
    /// The field encoder can be chosen on the data type as well as
    /// any metadata that is attached to the field.
    ///
    /// The `encoding_strategy_root` is the encoder that should be
    /// used to encode any inner data in struct / list / etc. fields.
    ///
    /// Initially it is the same as `self` and generally should be
    /// forwarded to any inner encoding strategy.
    fn create_field_encoder(
        &self,
        encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
    ) -> Result<Box<dyn FieldEncoder>>;
}

pub fn default_encoding_strategy(version: LanceFileVersion) -> Box<dyn FieldEncodingStrategy> {
    match version.resolve() {
        LanceFileVersion::Legacy => panic!("the legacy file version has no field encoding strategy"),
        LanceFileVersion::V2_0 => {
            Box::new(crate::previous::encoder::CoreFieldEncodingStrategy::default())
        }
        _ => Box::new(StructuralEncodingStrategy::default()),
    }
}

/// Create an encoding strategy with user-configured compression parameters
pub fn default_encoding_strategy_with_params(
    version: LanceFileVersion,
    params: CompressionParams,
) -> Result<Box<dyn FieldEncodingStrategy>> {
    match version.resolve() {
        LanceFileVersion::Legacy | LanceFileVersion::V2_0 => Err(Error::invalid_input(
            "Compression parameters are only supported in Lance file version 2.1 and later",
            location!(),
        )),
        _ => {
            let compression_strategy = Arc::new(DefaultCompressionStrategy::with_params(params));
            Ok(Box::new(StructuralEncodingStrategy {
                compression_strategy,
                version,
            }))
        }
    }
}

/// An encoding strategy used for 2.1+ files
#[derive(Debug)]
pub struct StructuralEncodingStrategy {
    pub compression_strategy: Arc<dyn CompressionStrategy>,
    pub version: LanceFileVersion,
}

// For some reason, clippy thinks we can add Default to the above derive but
// rustc doesn't agree (no default for Arc<dyn Trait>)
#[allow(clippy::derivable_impls)]
impl Default for StructuralEncodingStrategy {
    fn default() -> Self {
        Self {
            compression_strategy: Arc::new(DefaultCompressionStrategy::new()),
            version: LanceFileVersion::default(),
        }
    }
}

impl StructuralEncodingStrategy {
    fn is_primitive_type(data_type: &DataType) -> bool {
        matches!(
            data_type,
            DataType::Boolean
                | DataType::Date32
                | DataType::Date64
                | DataType::Decimal128(_, _)
                | DataType::Decimal256(_, _)
                | DataType::Duration(_)
                | DataType::Float16
                | DataType::Float32
                | DataType::Float64
                | DataType::Int16
                | DataType::Int32
                | DataType::Int64
                | DataType::Int8
                | DataType::Interval(_)
                | DataType::Null
                | DataType::Time32(_)
                | DataType::Time64(_)
                | DataType::Timestamp(_, _)
                | DataType::UInt16
                | DataType::UInt32
                | DataType::UInt64
                | DataType::UInt8
                | DataType::FixedSizeBinary(_)
                | DataType::FixedSizeList(_, _)
                | DataType::Binary
                | DataType::LargeBinary
                | DataType::Utf8
                | DataType::LargeUtf8,
        )
    }

    fn do_create_field_encoder(
        &self,
        _encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
        root_field_metadata: &HashMap<String, String>,
    ) -> Result<Box<dyn FieldEncoder>> {
        let data_type = field.data_type();
        if Self::is_primitive_type(&data_type) {
            Ok(Box::new(PrimitiveStructuralEncoder::try_new(
                options,
                self.compression_strategy.clone(),
                column_index.next_column_index(field.id as u32),
                field.clone(),
                Arc::new(root_field_metadata.clone()),
            )?))
        } else {
            match data_type {
                DataType::List(_) | DataType::LargeList(_) => {
                    let child = field.children.first().expect("List should have a child");
                    let child_encoder = self.do_create_field_encoder(
                        _encoding_strategy_root,
                        child,
                        column_index,
                        options,
                        root_field_metadata,
                    )?;
                    Ok(Box::new(ListStructuralEncoder::new(
                        options.keep_original_array,
                        child_encoder,
                    )))
                }
                DataType::Struct(_) => {
                    if field.is_packed_struct() {
                        Ok(Box::new(PrimitiveStructuralEncoder::try_new(
                            options,
                            self.compression_strategy.clone(),
                            column_index.next_column_index(field.id as u32),
                            field.clone(),
                            Arc::new(root_field_metadata.clone()),
                        )?))
                    } else {
                        let children_encoders = field
                            .children
                            .iter()
                            .map(|field| {
                                self.do_create_field_encoder(
                                    _encoding_strategy_root,
                                    field,
                                    column_index,
                                    options,
                                    root_field_metadata,
                                )
                            })
                            .collect::<Result<Vec<_>>>()?;
                        Ok(Box::new(StructStructuralEncoder::new(
                            options.keep_original_array,
                            children_encoders,
                        )))
                    }
                }
                DataType::Dictionary(_, value_type) => {
                    // A dictionary of primitive values is, itself, primitive
                    if Self::is_primitive_type(&value_type) {
                        Ok(Box::new(PrimitiveStructuralEncoder::try_new(
                            options,
                            self.compression_strategy.clone(),
                            column_index.next_column_index(field.id as u32),
                            field.clone(),
                            Arc::new(root_field_metadata.clone()),
                        )?))
                    } else {
                        // A dictionary of logical values is, itself, logical and we don't support
                        // that today.  It could be possible (e.g. store indices in one column and
                        // values in the remaining columns) but would be a significant amount of work.
                        //
                        // An easier fallback implementation would be to decode-on-write and
                        // encode-on-read.
                        Err(Error::NotSupported {
                            source: format!(
                                "cannot encode a dictionary column whose value type is a logical type ({})",
                                value_type
                            )
                            .into(),
                            location: location!(),
                        })
                    }
                }
                _ => todo!("Implement encoding for field {}", field),
            }
        }
    }
}

impl FieldEncodingStrategy for StructuralEncodingStrategy {
    fn create_field_encoder(
        &self,
        encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
    ) -> Result<Box<dyn FieldEncoder>> {
        self.do_create_field_encoder(
            encoding_strategy_root,
            field,
            column_index,
            options,
            &field.metadata,
        )
    }
}

/// A batch encoder that encodes RecordBatch objects by delegating
/// to field encoders for each top-level field in the batch.
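///
/// A minimal usage sketch (`schema` is assumed to be a
/// `lance_core::datatypes::Schema`):
///
/// ```ignore
/// let strategy = default_encoding_strategy(LanceFileVersion::V2_1);
/// let encoder = BatchEncoder::try_new(&schema, strategy.as_ref(), &EncodingOptions::default())?;
/// // One field encoder per top-level field; output columns may outnumber fields.
/// assert_eq!(encoder.field_encoders.len(), schema.fields.len());
/// let total_columns = encoder.num_columns();
/// ```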
pub struct BatchEncoder {
    pub field_encoders: Vec<Box<dyn FieldEncoder>>,
    pub field_id_to_column_index: Vec<(u32, u32)>,
}

impl BatchEncoder {
    pub fn try_new(
        schema: &Schema,
        strategy: &dyn FieldEncodingStrategy,
        options: &EncodingOptions,
    ) -> Result<Self> {
        let mut col_idx = 0;
        let mut col_idx_sequence = ColumnIndexSequence::default();
        let field_encoders = schema
            .fields
            .iter()
            .map(|field| {
                let encoder = strategy.create_field_encoder(
                    strategy,
                    field,
                    &mut col_idx_sequence,
                    options,
                )?;
                col_idx += encoder.as_ref().num_columns();
                Ok(encoder)
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(Self {
            field_encoders,
            field_id_to_column_index: col_idx_sequence.mapping,
        })
    }

    pub fn num_columns(&self) -> u32 {
        self.field_encoders
            .iter()
            .map(|field_encoder| field_encoder.num_columns())
            .sum::<u32>()
    }
}

/// An encoded batch of data and a page table describing it
///
/// This is returned by [`crate::encoder::encode_batch`]
#[derive(Debug)]
pub struct EncodedBatch {
    pub data: Bytes,
    pub page_table: Vec<Arc<ColumnInfo>>,
    pub schema: Arc<Schema>,
    pub top_level_columns: Vec<u32>,
    pub num_rows: u64,
}

fn write_page_to_data_buffer(page: EncodedPage, data_buffer: &mut BytesMut) -> PageInfo {
    let buffers = page.data;
    let mut buffer_offsets_and_sizes = Vec::with_capacity(buffers.len());
    for buffer in buffers {
        let buffer_offset = data_buffer.len() as u64;
        data_buffer.extend_from_slice(&buffer);
        let size = data_buffer.len() as u64 - buffer_offset;
        buffer_offsets_and_sizes.push((buffer_offset, size));
    }

    PageInfo {
        buffer_offsets_and_sizes: Arc::from(buffer_offsets_and_sizes),
        encoding: page.description,
        num_rows: page.num_rows,
        priority: page.row_number,
    }
}

/// Helper method to encode a batch of data into memory
///
/// This is primarily for testing and benchmarking but could be useful in other
/// niche situations like IPC.
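///
/// A sketch of calling it (an async context and an Arrow `RecordBatch` in
/// `batch` are assumed):
///
/// ```ignore
/// let lance_schema = Arc::new(Schema::try_from(batch.schema().as_ref())?);
/// let strategy = default_encoding_strategy(LanceFileVersion::V2_1);
/// let encoded =
///     encode_batch(&batch, lance_schema, strategy.as_ref(), &EncodingOptions::default()).await?;
/// assert_eq!(encoded.num_rows, batch.num_rows() as u64);
/// ```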
pub async fn encode_batch(
    batch: &RecordBatch,
    schema: Arc<Schema>,
    encoding_strategy: &dyn FieldEncodingStrategy,
    options: &EncodingOptions,
) -> Result<EncodedBatch> {
    if !is_pwr_two(options.buffer_alignment) || options.buffer_alignment < MIN_PAGE_BUFFER_ALIGNMENT
    {
        return Err(Error::InvalidInput {
            source: format!(
                "buffer_alignment must be a power of two and at least {}",
                MIN_PAGE_BUFFER_ALIGNMENT
            )
            .into(),
            location: location!(),
        });
    }

    let mut data_buffer = BytesMut::new();
    let lance_schema = Schema::try_from(batch.schema().as_ref())?;
    let options = EncodingOptions {
        keep_original_array: true,
        ..*options
    };
    let batch_encoder = BatchEncoder::try_new(&lance_schema, encoding_strategy, &options)?;
    let mut page_table = Vec::new();
    let mut col_idx_offset = 0;
    for (arr, mut encoder) in batch.columns().iter().zip(batch_encoder.field_encoders) {
        let mut external_buffers =
            OutOfLineBuffers::new(data_buffer.len() as u64, options.buffer_alignment);
        let repdef = RepDefBuilder::default();
        let encoder = encoder.as_mut();
        let num_rows = arr.len() as u64;
        let mut tasks =
            encoder.maybe_encode(arr.clone(), &mut external_buffers, repdef, 0, num_rows)?;
        tasks.extend(encoder.flush(&mut external_buffers)?);
        for buffer in external_buffers.take_buffers() {
            data_buffer.extend_from_slice(&buffer);
        }
        let mut pages = HashMap::<u32, Vec<PageInfo>>::new();
        for task in tasks {
            let encoded_page = task.await?;
            // External buffers were already written above, so page buffers land after them
            pages
                .entry(encoded_page.column_idx)
                .or_default()
                .push(write_page_to_data_buffer(encoded_page, &mut data_buffer));
        }
        let mut external_buffers =
            OutOfLineBuffers::new(data_buffer.len() as u64, options.buffer_alignment);
        let encoded_columns = encoder.finish(&mut external_buffers).await?;
        for buffer in external_buffers.take_buffers() {
            data_buffer.extend_from_slice(&buffer);
        }
        let num_columns = encoded_columns.len();
        for (col_idx, encoded_column) in encoded_columns.into_iter().enumerate() {
            let col_idx = col_idx + col_idx_offset;
            let mut col_buffer_offsets_and_sizes = Vec::new();
            for buffer in encoded_column.column_buffers {
                let buffer_offset = data_buffer.len() as u64;
                data_buffer.extend_from_slice(&buffer);
                let size = data_buffer.len() as u64 - buffer_offset;
                col_buffer_offsets_and_sizes.push((buffer_offset, size));
            }
            for page in encoded_column.final_pages {
                pages
                    .entry(page.column_idx)
                    .or_default()
                    .push(write_page_to_data_buffer(page, &mut data_buffer));
            }
            let col_pages = std::mem::take(pages.entry(col_idx as u32).or_default());
            page_table.push(Arc::new(ColumnInfo {
                index: col_idx as u32,
                buffer_offsets_and_sizes: Arc::from(
                    col_buffer_offsets_and_sizes.into_boxed_slice(),
                ),
                page_infos: Arc::from(col_pages.into_boxed_slice()),
                encoding: encoded_column.encoding,
            }))
        }
        col_idx_offset += num_columns;
    }
    let top_level_columns = batch_encoder
        .field_id_to_column_index
        .iter()
        .map(|(_, idx)| *idx)
        .collect();
    Ok(EncodedBatch {
        data: data_buffer.freeze(),
        top_level_columns,
        page_table,
        schema,
        num_rows: batch.num_rows() as u64,
    })
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::compression_config::{CompressionFieldParams, CompressionParams};

    #[test]
    fn test_configured_encoding_strategy() {
        // Create test parameters
        let mut params = CompressionParams::new();
        params.columns.insert(
            "*_id".to_string(),
            CompressionFieldParams {
                rle_threshold: Some(0.5),
                compression: Some("lz4".to_string()),
                compression_level: None,
            },
        );

        // Test with V2.1 - should succeed
        let strategy =
            default_encoding_strategy_with_params(LanceFileVersion::V2_1, params.clone())
                .expect("Should succeed for V2.1");

        // Verify it's a StructuralEncodingStrategy
        assert!(format!("{:?}", strategy).contains("StructuralEncodingStrategy"));
        assert!(format!("{:?}", strategy).contains("DefaultCompressionStrategy"));

        // Test with V2.0 - should fail
        let err = default_encoding_strategy_with_params(LanceFileVersion::V2_0, params.clone())
            .expect_err("Should fail for V2.0");
        assert!(err
            .to_string()
            .contains("only supported in Lance file version 2.1"));

        // Test with Legacy - should fail
        let err = default_encoding_strategy_with_params(LanceFileVersion::Legacy, params)
            .expect_err("Should fail for Legacy");
        assert!(err
            .to_string()
            .contains("only supported in Lance file version 2.1"));
    }
}
677}