lance_encoding/
encoder.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! The top-level encoding module for Lance files.
//!
//! Lance files are encoded using a [`FieldEncodingStrategy`] which chooses
//! which encoder to use for each field.
//!
//! The current strategy is the [`StructuralEncodingStrategy`] which uses "structural"
//! encoding.  A tree of encoders is built up for each field.  The struct & list encoders
//! simply pull off the validity and offsets and collect them.  Then, in the primitive leaf
//! encoder the validity, offsets, and values are accumulated in an accumulation buffer.  Once
//! enough data has been collected the primitive encoder will use either a miniblock encoding
//! or a full-zip encoding to create a page of data from the accumulation buffer.
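//!
//! As a rough sketch of the tree (the exact encoder types live in
//! [`crate::encodings::logical`]), a field of type `List<Struct<a: Int32, b: Utf8>>`
//! produces:
//!
//! ```text
//! ListStructuralEncoder           (collects list offsets & validity)
//! └── StructStructuralEncoder    (collects struct validity)
//!     ├── PrimitiveStructuralEncoder (a: Int32)
//!     └── PrimitiveStructuralEncoder (b: Utf8)
//! ```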

use std::{collections::HashMap, sync::Arc};

use arrow_array::{Array, ArrayRef, RecordBatch};
use arrow_schema::DataType;
use bytes::{Bytes, BytesMut};
use futures::future::BoxFuture;
use lance_core::datatypes::{Field, Schema};
use lance_core::utils::bit::{is_pwr_two, pad_bytes_to};
use lance_core::{Error, Result};
use snafu::location;

use crate::buffer::LanceBuffer;
use crate::compression::{CompressionStrategy, DefaultCompressionStrategy};
use crate::decoder::PageEncoding;
use crate::encodings::logical::list::ListStructuralEncoder;
use crate::encodings::logical::primitive::PrimitiveStructuralEncoder;
use crate::encodings::logical::r#struct::StructStructuralEncoder;
use crate::repdef::RepDefBuilder;
use crate::version::LanceFileVersion;
use crate::{
    decoder::{ColumnInfo, PageInfo},
    format::pb,
};

/// The minimum alignment for a page buffer.  Writers must respect this.
pub const MIN_PAGE_BUFFER_ALIGNMENT: u64 = 8;

/// An encoded page of data
///
/// Maps to a top-level array
///
/// For example, FixedSizeList<Int32> will have two EncodedArray instances and one EncodedPage
#[derive(Debug)]
pub struct EncodedPage {
    // The encoded page buffers
    pub data: Vec<LanceBuffer>,
    // A description of the encoding used to encode the page
    pub description: PageEncoding,
    /// The number of rows in the encoded page
    pub num_rows: u64,
    /// The top-level row number of the first row in the page
    ///
    /// Generally the number of "top-level" rows and the number of rows are the same.  However,
    /// when there is repetition (list/fixed-size-list) there may be more or fewer items than rows.
    ///
    /// A top-level row can never be split across a page boundary.
    pub row_number: u64,
    /// The index of the column
    pub column_idx: u32,
}

pub struct EncodedColumn {
    pub column_buffers: Vec<LanceBuffer>,
    pub encoding: pb::ColumnEncoding,
    pub final_pages: Vec<EncodedPage>,
}

impl Default for EncodedColumn {
    fn default() -> Self {
        Self {
            column_buffers: Default::default(),
            encoding: pb::ColumnEncoding {
                column_encoding: Some(pb::column_encoding::ColumnEncoding::Values(())),
            },
            final_pages: Default::default(),
        }
    }
}

/// A tool to reserve space for buffers that are not in-line with the data
///
/// In most cases, buffers are stored in the page and referred to in the encoding
/// metadata by their index in the page.  This keeps all buffers within a page together.
/// As a result, most encoders should not need to use this structure.
///
/// In some cases (currently only the large binary encoding) there is a need to access
/// buffers that are not in the page (because storing the position / offset of every page
/// in the page metadata would be too expensive).
///
/// To do this you can add a buffer with `add_buffer` and then use the returned position
/// in some way (in the large binary encoding the returned position is stored in the page
/// data as a position / size array).
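///
/// A minimal sketch of the intended usage (the buffer contents here are illustrative):
///
/// ```ignore
/// // Start reserving positions at the current end of the file, using the
/// // same buffer alignment the writer will apply.
/// let mut oolb = OutOfLineBuffers::new(file_len, 64);
/// let pos = oolb.add_buffer(some_lance_buffer);
/// // `pos` can now be stored inside the page data; later the writer takes
/// // ownership of the buffers and writes them at the reserved positions.
/// let buffers = oolb.take_buffers();
/// ```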
pub struct OutOfLineBuffers {
    position: u64,
    buffer_alignment: u64,
    buffers: Vec<LanceBuffer>,
}

impl OutOfLineBuffers {
    pub fn new(base_position: u64, buffer_alignment: u64) -> Self {
        Self {
            position: base_position,
            buffer_alignment,
            buffers: Vec::new(),
        }
    }

    pub fn add_buffer(&mut self, buffer: LanceBuffer) -> u64 {
        let position = self.position;
        self.position += buffer.len() as u64;
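        // Account for any padding the writer will add after the buffer to
        // keep the next buffer aligned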
        self.position += pad_bytes_to(buffer.len(), self.buffer_alignment as usize) as u64;
        self.buffers.push(buffer);
        position
    }

    pub fn take_buffers(self) -> Vec<LanceBuffer> {
        self.buffers
    }

    pub fn reset_position(&mut self, position: u64) {
        self.position = position;
    }
}

/// A task to create a page of data
pub type EncodeTask = BoxFuture<'static, Result<EncodedPage>>;

/// Top level encoding trait to encode any Arrow array type into one or more pages.
///
/// The field encoder implements buffering and encoding of a single input column
/// but it may map to multiple output columns.  For example, a list array or struct
/// array will be encoded into multiple columns.
///
/// Also, fields may be encoded at different speeds.  For example, given a struct
/// column with three fields (a boolean field, an int32 field, and a 4096-dimension
/// tensor field) the tensor field is likely to emit encoded pages much more frequently
/// than the boolean field.
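///
/// A minimal sketch of the driver loop a writer runs against this trait (the local
/// variable names are illustrative):
///
/// ```ignore
/// // Feed arrays in as they arrive; each call may or may not emit encode tasks.
/// for array in arrays {
///     tasks.extend(encoder.maybe_encode(array, &mut buffers, repdef, row_number, num_rows)?);
/// }
/// // Flush whatever is still buffered into final page tasks...
/// tasks.extend(encoder.flush(&mut buffers)?);
/// // ...await the page tasks, then collect per-column metadata exactly once.
/// let columns = encoder.finish(&mut buffers).await?;
/// ```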
pub trait FieldEncoder: Send {
    /// Buffer the data and, if there is enough data in the buffer to form a page, return
    /// an encoding task to encode the data.
    ///
    /// This may return more than one task because a single column may be mapped to multiple
    /// output columns.  For example, if encoding a struct column with three children then
    /// up to three tasks may be returned from each call to maybe_encode.
    ///
    /// It may also return multiple tasks for a single column if the input array is larger
    /// than a single disk page.
    ///
    /// It could also return an empty Vec if there is not enough data yet to encode any pages.
    ///
    /// The `row_number` is the top-level row number currently being encoded.  It is stored
    /// in any pages produced by this call so that we can know the priority of the page.
    ///
    /// The `num_rows` is the number of top level rows.  It is initially the same as `array.len()`
    /// however it is passed separately because the array will become flattened over time (if there
    /// is repetition) and we need to know the original number of rows for various purposes.
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>>;

    /// Flush any remaining data from the buffers into encoding tasks
    ///
    /// Each encode task produces a single page.  The order of these pages will be maintained
    /// in the file (we do not worry about order between columns but all pages in the same
    /// column should maintain order)
    ///
    /// This may be called intermittently throughout encoding but will always be called
    /// once at the end of encoding just before calling finish
    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>>;

    /// Finish encoding and return column metadata
    ///
    /// This is called only once, after all encode tasks have completed
    ///
    /// This returns a Vec because a single field may have created multiple columns
    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>>;

    /// The number of output columns this encoding will create
    fn num_columns(&self) -> u32;
}

/// Keeps track of the current column index and makes a mapping
/// from field id to column index
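///
/// A small sketch of the bookkeeping (the return values shown are what the code below
/// produces):
///
/// ```ignore
/// let mut seq = ColumnIndexSequence::default();
/// assert_eq!(seq.next_column_index(7), 0); // field 7 -> column 0
/// seq.skip();                              // reserve column 1 without mapping it
/// assert_eq!(seq.next_column_index(9), 2); // field 9 -> column 2
/// ```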
#[derive(Debug, Default)]
pub struct ColumnIndexSequence {
    current_index: u32,
    mapping: Vec<(u32, u32)>,
}

impl ColumnIndexSequence {
    pub fn next_column_index(&mut self, field_id: u32) -> u32 {
        let idx = self.current_index;
        self.current_index += 1;
        self.mapping.push((field_id, idx));
        idx
    }

    pub fn skip(&mut self) {
        self.current_index += 1;
    }
}

/// Options that control the encoding process
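///
/// A sketch of overriding a single option while keeping the rest of the defaults:
///
/// ```ignore
/// let options = EncodingOptions {
///     cache_bytes_per_column: 16 * 1024 * 1024,
///     ..Default::default()
/// };
/// ```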
pub struct EncodingOptions {
    /// How much data (in bytes) to cache in-memory before writing a page
    ///
    /// This cache is applied on a per-column basis
    pub cache_bytes_per_column: u64,
    /// The maximum size of a page in bytes, if a single array would create
    /// a page larger than this then it will be split into multiple pages
    pub max_page_bytes: u64,
    /// If false then arrays will be copied (deeply) before being cached.  This
    /// ensures any data kept alive by the array can be discarded safely and helps
    /// avoid the writer accumulating references to large input arrays.  However,
    /// the deep copy has an associated cost.
    pub keep_original_array: bool,
    /// The alignment that the writer is applying to buffers
    ///
    /// The encoder needs to know this so that it can correctly calculate the
    /// positions of out-of-line buffers
    pub buffer_alignment: u64,
}

impl Default for EncodingOptions {
    fn default() -> Self {
        Self {
            cache_bytes_per_column: 8 * 1024 * 1024,
            max_page_bytes: 32 * 1024 * 1024,
            keep_original_array: true,
            buffer_alignment: 64,
        }
    }
}

/// A trait to pick which kind of field encoding to use for a field
///
/// Unlike the ArrayEncodingStrategy, the field encoding strategy is
/// chosen before any data is generated and the same field encoder is
/// used for all data in the field.
pub trait FieldEncodingStrategy: Send + Sync + std::fmt::Debug {
    /// Choose and create an appropriate field encoder for the given
    /// field.
    ///
    /// The field encoder can be chosen based on the data type as well as
    /// any metadata that is attached to the field.
    ///
    /// The `encoding_strategy_root` is the encoder that should be
    /// used to encode any inner data in struct / list / etc. fields.
    ///
    /// Initially it is the same as `self` and generally should be
    /// forwarded to any inner encoding strategy.
    fn create_field_encoder(
        &self,
        encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
    ) -> Result<Box<dyn FieldEncoder>>;
}
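
/// Returns the default [`FieldEncodingStrategy`] for the given file version
///
/// A minimal usage sketch:
///
/// ```ignore
/// let strategy = default_encoding_strategy(LanceFileVersion::default());
/// ```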
pub fn default_encoding_strategy(version: LanceFileVersion) -> Box<dyn FieldEncodingStrategy> {
    match version.resolve() {
        LanceFileVersion::Legacy => panic!("There is no encoding strategy for legacy files"),
        LanceFileVersion::V2_0 => {
            Box::new(crate::v2::encoder::CoreFieldEncodingStrategy::default())
        }
        _ => Box::new(StructuralEncodingStrategy::default()),
    }
}

/// An encoding strategy used for 2.1+ files
#[derive(Debug)]
pub struct StructuralEncodingStrategy {
    pub compression_strategy: Arc<dyn CompressionStrategy>,
    pub version: LanceFileVersion,
}

// For some reason, clippy thinks we can add Default to the above derive but
// rustc doesn't agree (no default for Arc<dyn Trait>)
#[allow(clippy::derivable_impls)]
impl Default for StructuralEncodingStrategy {
    fn default() -> Self {
        Self {
            compression_strategy: Arc::<DefaultCompressionStrategy>::default(),
            version: LanceFileVersion::default(),
        }
    }
}

impl StructuralEncodingStrategy {
    fn is_primitive_type(data_type: &DataType) -> bool {
        matches!(
            data_type,
            DataType::Boolean
                | DataType::Date32
                | DataType::Date64
                | DataType::Decimal128(_, _)
                | DataType::Decimal256(_, _)
                | DataType::Duration(_)
                | DataType::Float16
                | DataType::Float32
                | DataType::Float64
                | DataType::Int16
                | DataType::Int32
                | DataType::Int64
                | DataType::Int8
                | DataType::Interval(_)
                | DataType::Null
                | DataType::Time32(_)
                | DataType::Time64(_)
                | DataType::Timestamp(_, _)
                | DataType::UInt16
                | DataType::UInt32
                | DataType::UInt64
                | DataType::UInt8
                | DataType::FixedSizeBinary(_)
                | DataType::FixedSizeList(_, _)
                | DataType::Binary
                | DataType::LargeBinary
                | DataType::Utf8
                | DataType::LargeUtf8,
        )
    }

    fn do_create_field_encoder(
        &self,
        _encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
        root_field_metadata: &HashMap<String, String>,
    ) -> Result<Box<dyn FieldEncoder>> {
        let data_type = field.data_type();
        if Self::is_primitive_type(&data_type) {
            Ok(Box::new(PrimitiveStructuralEncoder::try_new(
                options,
                self.compression_strategy.clone(),
                column_index.next_column_index(field.id as u32),
                field.clone(),
                Arc::new(root_field_metadata.clone()),
            )?))
        } else {
            match data_type {
                DataType::List(_) | DataType::LargeList(_) => {
                    let child = field.children.first().expect("List should have a child");
                    let child_encoder = self.do_create_field_encoder(
                        _encoding_strategy_root,
                        child,
                        column_index,
                        options,
                        root_field_metadata,
                    )?;
                    Ok(Box::new(ListStructuralEncoder::new(
                        options.keep_original_array,
                        child_encoder,
                    )))
                }
                DataType::Struct(_) => {
                    if field.is_packed_struct() {
                        Ok(Box::new(PrimitiveStructuralEncoder::try_new(
                            options,
                            self.compression_strategy.clone(),
                            column_index.next_column_index(field.id as u32),
                            field.clone(),
                            Arc::new(root_field_metadata.clone()),
                        )?))
                    } else {
                        let children_encoders = field
                            .children
                            .iter()
                            .map(|field| {
                                self.do_create_field_encoder(
                                    _encoding_strategy_root,
                                    field,
                                    column_index,
                                    options,
                                    root_field_metadata,
                                )
                            })
                            .collect::<Result<Vec<_>>>()?;
                        Ok(Box::new(StructStructuralEncoder::new(
                            options.keep_original_array,
                            children_encoders,
                        )))
                    }
                }
                DataType::Dictionary(_, value_type) => {
                    // A dictionary of a primitive type is, itself, primitive
                    if Self::is_primitive_type(&value_type) {
                        Ok(Box::new(PrimitiveStructuralEncoder::try_new(
                            options,
                            self.compression_strategy.clone(),
                            column_index.next_column_index(field.id as u32),
                            field.clone(),
                            Arc::new(root_field_metadata.clone()),
                        )?))
                    } else {
                        // A dictionary of a logical type is, itself, logical and we don't
                        // support that today.  It could be possible (e.g. store indices in
                        // one column and values in the remaining columns) but would be a
                        // significant amount of work.
                        //
                        // An easier fallback implementation would be to decode-on-write
                        // and encode-on-read.
                        Err(Error::NotSupported {
                            source: format!(
                                "cannot encode a dictionary column whose value type is a logical type ({})",
                                value_type
                            )
                            .into(),
                            location: location!(),
                        })
                    }
                }
                _ => todo!("Implement encoding for field {}", field),
            }
        }
    }
}

impl FieldEncodingStrategy for StructuralEncodingStrategy {
    fn create_field_encoder(
        &self,
        encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
    ) -> Result<Box<dyn FieldEncoder>> {
        self.do_create_field_encoder(
            encoding_strategy_root,
            field,
            column_index,
            options,
            &field.metadata,
        )
    }
}

/// A batch encoder that encodes RecordBatch objects by delegating
/// to field encoders for each top-level field in the batch.
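///
/// A sketch of constructing one (the schema construction is elided):
///
/// ```ignore
/// let strategy = default_encoding_strategy(LanceFileVersion::default());
/// let encoder = BatchEncoder::try_new(&schema, strategy.as_ref(), &EncodingOptions::default())?;
/// let total_columns = encoder.num_columns();
/// ```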
pub struct BatchEncoder {
    pub field_encoders: Vec<Box<dyn FieldEncoder>>,
    pub field_id_to_column_index: Vec<(u32, u32)>,
}

impl BatchEncoder {
    pub fn try_new(
        schema: &Schema,
        strategy: &dyn FieldEncodingStrategy,
        options: &EncodingOptions,
    ) -> Result<Self> {
        let mut col_idx_sequence = ColumnIndexSequence::default();
        let field_encoders = schema
            .fields
            .iter()
            .map(|field| {
                strategy.create_field_encoder(strategy, field, &mut col_idx_sequence, options)
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(Self {
            field_encoders,
            field_id_to_column_index: col_idx_sequence.mapping,
        })
    }

    pub fn num_columns(&self) -> u32 {
        self.field_encoders
            .iter()
            .map(|field_encoder| field_encoder.num_columns())
            .sum::<u32>()
    }
}

/// An encoded batch of data and a page table describing it
///
/// This is returned by [`crate::encoder::encode_batch`]
#[derive(Debug)]
pub struct EncodedBatch {
    pub data: Bytes,
    pub page_table: Vec<Arc<ColumnInfo>>,
    pub schema: Arc<Schema>,
    pub top_level_columns: Vec<u32>,
    pub num_rows: u64,
}

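/// Appends each of the page's buffers to `data_buffer`, recording the (offset, size)
/// of each, and returns the resulting [`PageInfo`]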
fn write_page_to_data_buffer(page: EncodedPage, data_buffer: &mut BytesMut) -> PageInfo {
    let buffers = page.data;
    let mut buffer_offsets_and_sizes = Vec::with_capacity(buffers.len());
    for buffer in buffers {
        let buffer_offset = data_buffer.len() as u64;
        data_buffer.extend_from_slice(&buffer);
        let size = data_buffer.len() as u64 - buffer_offset;
        buffer_offsets_and_sizes.push((buffer_offset, size));
    }

    PageInfo {
        buffer_offsets_and_sizes: Arc::from(buffer_offsets_and_sizes),
        encoding: page.description,
        num_rows: page.num_rows,
        priority: page.row_number,
    }
}

/// Helper method to encode a batch of data into memory
///
/// This is primarily for testing and benchmarking but could be useful in other
/// niche situations like IPC.
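///
/// A sketch of calling it (the batch and schema construction are elided):
///
/// ```ignore
/// let strategy = default_encoding_strategy(LanceFileVersion::default());
/// let encoded =
///     encode_batch(&batch, schema, strategy.as_ref(), &EncodingOptions::default()).await?;
/// assert_eq!(encoded.num_rows, batch.num_rows() as u64);
/// ```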
pub async fn encode_batch(
    batch: &RecordBatch,
    schema: Arc<Schema>,
    encoding_strategy: &dyn FieldEncodingStrategy,
    options: &EncodingOptions,
) -> Result<EncodedBatch> {
    if !is_pwr_two(options.buffer_alignment) || options.buffer_alignment < MIN_PAGE_BUFFER_ALIGNMENT
    {
        return Err(Error::InvalidInput {
            source: format!(
                "buffer_alignment must be a power of two and at least {}",
                MIN_PAGE_BUFFER_ALIGNMENT
            )
            .into(),
            location: location!(),
        });
    }

    let mut data_buffer = BytesMut::new();
    let lance_schema = Schema::try_from(batch.schema().as_ref())?;
    let options = EncodingOptions {
        keep_original_array: true,
        ..*options
    };
    let batch_encoder = BatchEncoder::try_new(&lance_schema, encoding_strategy, &options)?;
    let mut page_table = Vec::new();
    let mut col_idx_offset = 0;
    for (arr, mut encoder) in batch.columns().iter().zip(batch_encoder.field_encoders) {
        let mut external_buffers =
            OutOfLineBuffers::new(data_buffer.len() as u64, options.buffer_alignment);
        let repdef = RepDefBuilder::default();
        let encoder = encoder.as_mut();
        let num_rows = arr.len() as u64;
        let mut tasks =
            encoder.maybe_encode(arr.clone(), &mut external_buffers, repdef, 0, num_rows)?;
        tasks.extend(encoder.flush(&mut external_buffers)?);
        // Write external buffers first so that pages can reference their positions
        for buffer in external_buffers.take_buffers() {
            data_buffer.extend_from_slice(&buffer);
        }
        let mut pages = HashMap::<u32, Vec<PageInfo>>::new();
        for task in tasks {
            let encoded_page = task.await?;
            pages
                .entry(encoded_page.column_idx)
                .or_default()
                .push(write_page_to_data_buffer(encoded_page, &mut data_buffer));
        }
        let mut external_buffers =
            OutOfLineBuffers::new(data_buffer.len() as u64, options.buffer_alignment);
        let encoded_columns = encoder.finish(&mut external_buffers).await?;
        for buffer in external_buffers.take_buffers() {
            data_buffer.extend_from_slice(&buffer);
        }
        let num_columns = encoded_columns.len();
        for (col_idx, encoded_column) in encoded_columns.into_iter().enumerate() {
            let col_idx = col_idx + col_idx_offset;
            let mut col_buffer_offsets_and_sizes = Vec::new();
            for buffer in encoded_column.column_buffers {
                let buffer_offset = data_buffer.len() as u64;
                data_buffer.extend_from_slice(&buffer);
                let size = data_buffer.len() as u64 - buffer_offset;
                col_buffer_offsets_and_sizes.push((buffer_offset, size));
            }
            for page in encoded_column.final_pages {
                pages
                    .entry(page.column_idx)
                    .or_default()
                    .push(write_page_to_data_buffer(page, &mut data_buffer));
            }
            let col_pages = std::mem::take(pages.entry(col_idx as u32).or_default());
            page_table.push(Arc::new(ColumnInfo {
                index: col_idx as u32,
                buffer_offsets_and_sizes: Arc::from(
                    col_buffer_offsets_and_sizes.into_boxed_slice(),
                ),
                page_infos: Arc::from(col_pages.into_boxed_slice()),
                encoding: encoded_column.encoding,
            }))
        }
        col_idx_offset += num_columns;
    }
    let top_level_columns = batch_encoder
        .field_id_to_column_index
        .iter()
        .map(|(_, idx)| *idx)
        .collect();
    Ok(EncodedBatch {
        data: data_buffer.freeze(),
        top_level_columns,
        page_table,
        schema,
        num_rows: batch.num_rows() as u64,
    })
}