lance_encoding/encoder.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! The top-level encoding module for Lance files.
//!
//! Lance files are encoded using a [`FieldEncodingStrategy`] which chooses
//! what encoder to use for each field.
//!
//! The current strategy is the [`StructuralEncodingStrategy`] which uses "structural"
//! encoding.  A tree of encoders is built up for each field.  The struct & list encoders
//! simply pull off the validity and offsets and collect them.  Then, in the primitive leaf
//! encoder the validity, offsets, and values are accumulated in an accumulation buffer.  Once
//! enough data has been collected the primitive encoder will either use a miniblock encoding
//! or a full zip encoding to create a page of data from the accumulation buffer.

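//! # Example (sketch)
//!
//! A minimal, hedged sketch of encoding a batch in memory with the default
//! strategy (see [`encode_batch`] at the bottom of this file); error handling
//! and the construction of `batch` and `schema` are elided:
//!
//! ```ignore
//! let strategy = default_encoding_strategy(LanceFileVersion::default());
//! let options = EncodingOptions::default();
//! let encoded = encode_batch(&batch, schema, strategy.as_ref(), &options).await?;
//! ```
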
use std::{collections::HashMap, sync::Arc};

use arrow_array::{Array, ArrayRef, RecordBatch};
use arrow_schema::DataType;
use bytes::{Bytes, BytesMut};
use futures::future::BoxFuture;
use lance_core::datatypes::{Field, Schema};
use lance_core::error::LanceOptionExt;
use lance_core::utils::bit::{is_pwr_two, pad_bytes_to};
use lance_core::{Error, Result};
use snafu::location;

use crate::buffer::LanceBuffer;
use crate::compression::{CompressionStrategy, DefaultCompressionStrategy};
use crate::compression_config::CompressionParams;
use crate::decoder::PageEncoding;
use crate::encodings::logical::blob::{BlobStructuralEncoder, BlobV2StructuralEncoder};
use crate::encodings::logical::fixed_size_list::FixedSizeListStructuralEncoder;
use crate::encodings::logical::list::ListStructuralEncoder;
use crate::encodings::logical::map::MapStructuralEncoder;
use crate::encodings::logical::primitive::PrimitiveStructuralEncoder;
use crate::encodings::logical::r#struct::StructStructuralEncoder;
use crate::repdef::RepDefBuilder;
use crate::version::LanceFileVersion;
use crate::{
    decoder::{ColumnInfo, PageInfo},
    format::pb,
};

/// The minimum alignment for a page buffer.  Writers must respect this.
pub const MIN_PAGE_BUFFER_ALIGNMENT: u64 = 8;

/// An encoded page of data
///
/// Maps to a top-level array
///
/// For example, `FixedSizeList<Int32>` will have two EncodedArray instances and one EncodedPage
#[derive(Debug)]
pub struct EncodedPage {
    /// The encoded page buffers
    pub data: Vec<LanceBuffer>,
    /// A description of the encoding used to encode the page
    pub description: PageEncoding,
    /// The number of rows in the encoded page
    pub num_rows: u64,
    /// The top-level row number of the first row in the page
    ///
    /// Generally the number of "top-level" rows and the number of rows are the same.  However,
    /// when there is repetition (list/fixed-size-list) there may be more or fewer items than rows.
    ///
    /// A top-level row can never be split across a page boundary.
    pub row_number: u64,
    /// The index of the column
    pub column_idx: u32,
}

pub struct EncodedColumn {
    pub column_buffers: Vec<LanceBuffer>,
    pub encoding: pb::ColumnEncoding,
    pub final_pages: Vec<EncodedPage>,
}

impl Default for EncodedColumn {
    fn default() -> Self {
        Self {
            column_buffers: Default::default(),
            encoding: pb::ColumnEncoding {
                column_encoding: Some(pb::column_encoding::ColumnEncoding::Values(())),
            },
            final_pages: Default::default(),
        }
    }
}

/// A tool to reserve space for buffers that are not in-line with the data
///
/// In most cases, buffers are stored in the page and referred to in the encoding
/// metadata by their index in the page.  This keeps all buffers within a page together.
/// As a result, most encoders should not need to use this structure.
///
/// In some cases (currently only the large binary encoding) there is a need to access
/// buffers that are not in the page (because storing the position / offset of every page
/// in the page metadata would be too expensive).
///
/// To do this you can add a buffer with `add_buffer` and then use the returned position
/// in some way (in the large binary encoding the returned position is stored in the page
/// data as a position / size array).
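///
/// # Example (sketch)
///
/// A hedged sketch of typical usage; `some_buffer` is a hypothetical
/// [`LanceBuffer`] and is not defined in this module:
///
/// ```ignore
/// let mut buffers = OutOfLineBuffers::new(/*base_position=*/ 0, /*buffer_alignment=*/ 8);
/// let position = buffers.add_buffer(some_buffer); // aligned file position for this buffer
/// // ... store `position` in the page data ...
/// let to_write = buffers.take_buffers(); // the writer flushes these to the file
/// ```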
pub struct OutOfLineBuffers {
    position: u64,
    buffer_alignment: u64,
    buffers: Vec<LanceBuffer>,
}

impl OutOfLineBuffers {
    pub fn new(base_position: u64, buffer_alignment: u64) -> Self {
        Self {
            position: base_position,
            buffer_alignment,
            buffers: Vec::new(),
        }
    }

    pub fn add_buffer(&mut self, buffer: LanceBuffer) -> u64 {
        let position = self.position;
        self.position += buffer.len() as u64;
        self.position += pad_bytes_to(buffer.len(), self.buffer_alignment as usize) as u64;
        self.buffers.push(buffer);
        position
    }

    pub fn take_buffers(self) -> Vec<LanceBuffer> {
        self.buffers
    }

    pub fn reset_position(&mut self, position: u64) {
        self.position = position;
    }
}

/// A task to create a page of data
pub type EncodeTask = BoxFuture<'static, Result<EncodedPage>>;

/// Top level encoding trait to encode any Arrow array type into one or more pages.
///
/// The field encoder implements buffering and encoding of a single input column
/// but it may map to multiple output columns.  For example, a list array or struct
/// array will be encoded into multiple columns.
///
/// Also, fields may be encoded at different speeds.  For example, given a struct
/// column with three fields (a boolean field, an int32 field, and a 4096-dimension
/// tensor field) the tensor field is likely to emit encoded pages much more frequently
/// than the boolean field.
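///
/// # Usage (sketch)
///
/// A hedged sketch of the expected call sequence (it mirrors what
/// [`encode_batch`] below does); `encoder`, `array`, `buffers`, and `repdef`
/// are assumed to exist and are not defined here:
///
/// ```ignore
/// let mut tasks = encoder.maybe_encode(array, &mut buffers, repdef, row_number, num_rows)?;
/// tasks.extend(encoder.flush(&mut buffers)?); // drain anything still buffered
/// // ... await the tasks to produce pages ...
/// let columns = encoder.finish(&mut buffers).await?; // called once, at the very end
/// ```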
pub trait FieldEncoder: Send {
    /// Buffer the data and, if there is enough data in the buffer to form a page, return
    /// an encoding task to encode the data.
    ///
    /// This may return more than one task because a single column may be mapped to multiple
    /// output columns.  For example, if encoding a struct column with three children then
    /// up to three tasks may be returned from each call to maybe_encode.
    ///
    /// It may also return multiple tasks for a single column if the input array is larger
    /// than a single disk page.
    ///
    /// It could also return an empty Vec if there is not enough data yet to encode any pages.
    ///
    /// The `row_number` must be passed; it is the top-level row number currently being encoded.
    /// It is stored in any pages produced by this call so that we can know the priority of the
    /// page.
    ///
    /// The `num_rows` is the number of top-level rows.  It is initially the same as `array.len()`
    /// however it is passed separately because the array will become flattened over time (if
    /// there is repetition) and we need to know the original number of rows for various purposes.
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>>;

    /// Flush any remaining data from the buffers into encoding tasks
    ///
    /// Each encode task produces a single page.  The order of these pages will be maintained
    /// in the file (we do not worry about order between columns but all pages in the same
    /// column should maintain order)
    ///
    /// This may be called intermittently throughout encoding but will always be called
    /// once at the end of encoding just before calling finish
    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>>;

    /// Finish encoding and return column metadata
    ///
    /// This is called only once, after all encode tasks have completed
    ///
    /// This returns a Vec because a single field may have created multiple columns
    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>>;

    /// The number of output columns this encoding will create
    fn num_columns(&self) -> u32;
}

/// Keeps track of the current column index and makes a mapping
/// from field id to column index
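///
/// A hedged sketch of assigning column indices while recording the
/// field-id → column-index mapping:
///
/// ```ignore
/// let mut columns = ColumnIndexSequence::default();
/// let a = columns.next_column_index(/*field_id=*/ 7); // 0, mapping records (7, 0)
/// columns.skip();                                     // reserve index 1
/// let b = columns.next_column_index(8);               // 2, mapping records (8, 2)
/// ```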
#[derive(Debug, Default)]
pub struct ColumnIndexSequence {
    current_index: u32,
    mapping: Vec<(u32, u32)>,
}

impl ColumnIndexSequence {
    pub fn next_column_index(&mut self, field_id: u32) -> u32 {
        let idx = self.current_index;
        self.current_index += 1;
        self.mapping.push((field_id, idx));
        idx
    }

    pub fn skip(&mut self) {
        self.current_index += 1;
    }
}

/// Options that control the encoding process
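///
/// A hedged sketch of overriding one option while keeping the rest of the
/// defaults:
///
/// ```ignore
/// let options = EncodingOptions {
///     cache_bytes_per_column: 16 * 1024 * 1024,
///     ..Default::default()
/// };
/// ```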
pub struct EncodingOptions {
    /// How much data (in bytes) to cache in-memory before writing a page
    ///
    /// This cache is applied on a per-column basis
    pub cache_bytes_per_column: u64,
    /// The maximum size of a page in bytes, if a single array would create
    /// a page larger than this then it will be split into multiple pages
    pub max_page_bytes: u64,
    /// If false then arrays will be copied (deeply) before
    /// being cached.  This ensures any data kept alive by the array can
    /// be discarded safely and helps avoid writer accumulation.  However,
    /// there is an associated cost.
    pub keep_original_array: bool,
    /// The alignment that the writer is applying to buffers
    ///
    /// The encoder needs to know this so that it computes the positions of
    /// out-of-line buffers correctly
    pub buffer_alignment: u64,

    /// The Lance file version being written
    pub version: LanceFileVersion,
}

impl Default for EncodingOptions {
    fn default() -> Self {
        Self {
            cache_bytes_per_column: 8 * 1024 * 1024,
            max_page_bytes: 32 * 1024 * 1024,
            keep_original_array: true,
            buffer_alignment: 64,
            version: LanceFileVersion::default(),
        }
    }
}

impl EncodingOptions {
    /// If true (for Lance file version 2.2+), miniblock chunk sizes are u32,
    /// to allow storing larger chunks and their sizes for better compression.
    /// For Lance file version 2.1, miniblock chunk sizes are u16.
    pub fn support_large_chunk(&self) -> bool {
        self.version >= LanceFileVersion::V2_2
    }
}

/// A trait to pick which kind of field encoding to use for a field
///
/// Unlike the ArrayEncodingStrategy, the field encoding strategy is
/// chosen before any data is generated and the same field encoder is
/// used for all data in the field.
pub trait FieldEncodingStrategy: Send + Sync + std::fmt::Debug {
    /// Choose and create an appropriate field encoder for the given
    /// field.
    ///
    /// The field encoder can be chosen on the data type as well as
    /// any metadata that is attached to the field.
    ///
    /// The `encoding_strategy_root` is the encoder that should be
    /// used to encode any inner data in struct / list / etc. fields.
    ///
    /// Initially it is the same as `self` and generally should be
    /// forwarded to any inner encoding strategy.
    fn create_field_encoder(
        &self,
        encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
    ) -> Result<Box<dyn FieldEncoder>>;
}

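/// Create the default [`FieldEncodingStrategy`] for the given file version.
///
/// A hedged usage sketch:
///
/// ```ignore
/// let strategy = default_encoding_strategy(LanceFileVersion::V2_1);
/// ```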
pub fn default_encoding_strategy(version: LanceFileVersion) -> Box<dyn FieldEncodingStrategy> {
    match version.resolve() {
        LanceFileVersion::Legacy => {
            panic!("cannot create an encoding strategy for the legacy file version")
        }
        LanceFileVersion::V2_0 => Box::new(
            crate::previous::encoder::CoreFieldEncodingStrategy::new(version),
        ),
        _ => Box::new(StructuralEncodingStrategy::with_version(version)),
    }
}

/// Create an encoding strategy with user-configured compression parameters
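///
/// Only supported for Lance file versions 2.1+.  A hedged sketch, assuming
/// `CompressionFieldParams` implements `Default` (see the test at the bottom
/// of this file for a fully spelled-out construction):
///
/// ```ignore
/// let mut params = CompressionParams::new();
/// params.columns.insert(
///     "*_id".to_string(),
///     CompressionFieldParams {
///         compression: Some("lz4".to_string()),
///         ..Default::default()
///     },
/// );
/// let strategy = default_encoding_strategy_with_params(LanceFileVersion::V2_1, params)?;
/// ```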
pub fn default_encoding_strategy_with_params(
    version: LanceFileVersion,
    params: CompressionParams,
) -> Result<Box<dyn FieldEncodingStrategy>> {
    match version.resolve() {
        LanceFileVersion::Legacy | LanceFileVersion::V2_0 => Err(Error::invalid_input(
            "Compression parameters are only supported in Lance file version 2.1 and later",
            location!(),
        )),
        _ => {
            let compression_strategy =
                Arc::new(DefaultCompressionStrategy::with_params(params).with_version(version));
            Ok(Box::new(StructuralEncodingStrategy {
                compression_strategy,
                version,
            }))
        }
    }
}

/// An encoding strategy used for 2.1+ files
#[derive(Debug)]
pub struct StructuralEncodingStrategy {
    pub compression_strategy: Arc<dyn CompressionStrategy>,
    pub version: LanceFileVersion,
}

// For some reason, clippy thinks we can add Default to the above derive but
// rustc doesn't agree (no default for Arc<dyn Trait>)
#[allow(clippy::derivable_impls)]
impl Default for StructuralEncodingStrategy {
    fn default() -> Self {
        Self {
            compression_strategy: Arc::new(DefaultCompressionStrategy::new()),
            version: LanceFileVersion::default(),
        }
    }
}

impl StructuralEncodingStrategy {
    pub fn with_version(version: LanceFileVersion) -> Self {
        Self {
            compression_strategy: Arc::new(DefaultCompressionStrategy::new().with_version(version)),
            version,
        }
    }

    fn is_primitive_type(data_type: &DataType) -> bool {
        match data_type {
            DataType::FixedSizeList(inner, _) => Self::is_primitive_type(inner.data_type()),
            _ => matches!(
                data_type,
                DataType::Boolean
                    | DataType::Date32
                    | DataType::Date64
                    | DataType::Decimal128(_, _)
                    | DataType::Decimal256(_, _)
                    | DataType::Duration(_)
                    | DataType::Float16
                    | DataType::Float32
                    | DataType::Float64
                    | DataType::Int16
                    | DataType::Int32
                    | DataType::Int64
                    | DataType::Int8
                    | DataType::Interval(_)
                    | DataType::Null
                    | DataType::Time32(_)
                    | DataType::Time64(_)
                    | DataType::Timestamp(_, _)
                    | DataType::UInt16
                    | DataType::UInt32
                    | DataType::UInt64
                    | DataType::UInt8
                    | DataType::FixedSizeBinary(_)
                    | DataType::Binary
                    | DataType::LargeBinary
                    | DataType::Utf8
                    | DataType::LargeUtf8,
            ),
        }
    }

    fn do_create_field_encoder(
        &self,
        _encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
        root_field_metadata: &HashMap<String, String>,
    ) -> Result<Box<dyn FieldEncoder>> {
        let data_type = field.data_type();

        // Check if the field is marked as blob
        if field.is_blob() {
            match data_type {
                DataType::Binary | DataType::LargeBinary => {
                    return Ok(Box::new(BlobStructuralEncoder::new(
                        field,
                        column_index.next_column_index(field.id as u32),
                        options,
                        self.compression_strategy.clone(),
                    )?));
                }
                DataType::Struct(_) if self.version >= LanceFileVersion::V2_2 => {
                    return Ok(Box::new(BlobV2StructuralEncoder::new(
                        field,
                        column_index.next_column_index(field.id as u32),
                        options,
                        self.compression_strategy.clone(),
                    )?));
                }
                DataType::Struct(_) => {
                    return Err(Error::InvalidInput {
                        source: "Blob v2 struct input requires file version >= 2.2".into(),
                        location: location!(),
                    });
                }
                _ => {
                    return Err(Error::InvalidInput {
                        source: format!(
                            "Blob encoding only supports Binary/LargeBinary or v2 Struct, got {}",
                            data_type
                        )
                        .into(),
                        location: location!(),
                    });
                }
            }
        }

        if Self::is_primitive_type(&data_type) {
            Ok(Box::new(PrimitiveStructuralEncoder::try_new(
                options,
                self.compression_strategy.clone(),
                column_index.next_column_index(field.id as u32),
                field.clone(),
                Arc::new(root_field_metadata.clone()),
            )?))
        } else {
            match data_type {
                DataType::List(_) | DataType::LargeList(_) => {
                    let child = field.children.first().expect_ok()?;
                    let child_encoder = self.do_create_field_encoder(
                        _encoding_strategy_root,
                        child,
                        column_index,
                        options,
                        root_field_metadata,
                    )?;
                    Ok(Box::new(ListStructuralEncoder::new(
                        options.keep_original_array,
                        child_encoder,
                    )))
                }
                DataType::FixedSizeList(inner, _)
                    if matches!(inner.data_type(), DataType::Struct(_)) =>
                {
                    if self.version < LanceFileVersion::V2_2 {
                        return Err(Error::NotSupported {
                            source: format!(
                                "FixedSizeList<Struct> is only supported in Lance file format 2.2+, current version: {}",
                                self.version
                            )
                            .into(),
                            location: location!(),
                        });
                    }
                    // Complex FixedSizeList needs structural encoding
                    let child = field.children.first().expect_ok()?;
                    let child_encoder = self.do_create_field_encoder(
                        _encoding_strategy_root,
                        child,
                        column_index,
                        options,
                        root_field_metadata,
                    )?;
                    Ok(Box::new(FixedSizeListStructuralEncoder::new(
                        options.keep_original_array,
                        child_encoder,
                    )))
                }
                DataType::Map(_, keys_sorted) => {
                    // TODO: We only support keys_sorted=false for now,
                    //  because converting a Rust arrow map field to the Python arrow field will
                    //  lose the keys_sorted property.
                    if keys_sorted {
                        return Err(Error::NotSupported {
                            source: "Map data type with keys_sorted=true is not yet supported"
                                .into(),
                            location: location!(),
                        });
                    }
                    if self.version < LanceFileVersion::V2_2 {
                        return Err(Error::NotSupported {
                            source: format!(
                                "Map data type is only supported in Lance file format 2.2+, current version: {}",
                                self.version
                            )
                            .into(),
                            location: location!(),
                        });
                    }
                    let entries_child = field.children.first().ok_or_else(|| Error::Schema {
                        message: "Map should have an entries child".to_string(),
                        location: location!(),
                    })?;
                    let DataType::Struct(struct_fields) = entries_child.data_type() else {
                        return Err(Error::Schema {
                            message: "Map entries field must be a Struct<key, value>".to_string(),
                            location: location!(),
                        });
                    };
                    if struct_fields.len() < 2 {
                        return Err(Error::Schema {
                            message: "Map entries struct must contain both key and value fields"
                                .to_string(),
                            location: location!(),
                        });
                    }
                    let key_field = &struct_fields[0];
                    if key_field.is_nullable() {
                        return Err(Error::Schema {
                            message: format!(
                                "Map key field '{}' must be non-nullable according to the Arrow Map specification",
                                key_field.name()
                            ),
                            location: location!(),
                        });
                    }
                    let child_encoder = self.do_create_field_encoder(
                        _encoding_strategy_root,
                        entries_child,
                        column_index,
                        options,
                        root_field_metadata,
                    )?;
                    Ok(Box::new(MapStructuralEncoder::new(
                        options.keep_original_array,
                        child_encoder,
                    )))
                }
                DataType::Struct(fields) => {
                    if field.is_packed_struct() || fields.is_empty() {
                        // Both packed structs and empty structs are encoded as primitive
                        Ok(Box::new(PrimitiveStructuralEncoder::try_new(
                            options,
                            self.compression_strategy.clone(),
                            column_index.next_column_index(field.id as u32),
                            field.clone(),
                            Arc::new(root_field_metadata.clone()),
                        )?))
                    } else {
                        let children_encoders = field
                            .children
                            .iter()
                            .map(|field| {
                                self.do_create_field_encoder(
                                    _encoding_strategy_root,
                                    field,
                                    column_index,
                                    options,
                                    root_field_metadata,
                                )
                            })
                            .collect::<Result<Vec<_>>>()?;
                        Ok(Box::new(StructStructuralEncoder::new(
                            options.keep_original_array,
                            children_encoders,
                        )))
                    }
                }
                DataType::Dictionary(_, value_type) => {
                    // A dictionary of primitive is, itself, primitive
                    if Self::is_primitive_type(&value_type) {
                        Ok(Box::new(PrimitiveStructuralEncoder::try_new(
                            options,
                            self.compression_strategy.clone(),
                            column_index.next_column_index(field.id as u32),
                            field.clone(),
                            Arc::new(root_field_metadata.clone()),
                        )?))
                    } else {
                        // A dictionary of a logical type is, itself, logical and we don't support that today.
                        // It could be possible (e.g. store indices in one column and values in remaining columns)
                        // but would be a significant amount of work
                        //
                        // An easier fallback implementation would be to decode-on-write and encode-on-read
                        Err(Error::NotSupported {
                            source: format!(
                                "cannot encode a dictionary column whose value type is a logical type ({})",
                                value_type
                            )
                            .into(),
                            location: location!(),
                        })
                    }
                }
                _ => todo!("Implement encoding for field {}", field),
            }
        }
    }
}

impl FieldEncodingStrategy for StructuralEncodingStrategy {
    fn create_field_encoder(
        &self,
        encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
    ) -> Result<Box<dyn FieldEncoder>> {
        self.do_create_field_encoder(
            encoding_strategy_root,
            field,
            column_index,
            options,
            &field.metadata,
        )
    }
}

/// A batch encoder that encodes RecordBatch objects by delegating
/// to field encoders for each top-level field in the batch.
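///
/// A hedged construction sketch; `lance_schema` is assumed to be a
/// [`lance_core::datatypes::Schema`] built elsewhere:
///
/// ```ignore
/// let strategy = default_encoding_strategy(LanceFileVersion::default());
/// let options = EncodingOptions::default();
/// let encoder = BatchEncoder::try_new(&lance_schema, strategy.as_ref(), &options)?;
/// println!("{} output columns", encoder.num_columns());
/// ```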
pub struct BatchEncoder {
    pub field_encoders: Vec<Box<dyn FieldEncoder>>,
    pub field_id_to_column_index: Vec<(u32, u32)>,
}

impl BatchEncoder {
    pub fn try_new(
        schema: &Schema,
        strategy: &dyn FieldEncodingStrategy,
        options: &EncodingOptions,
    ) -> Result<Self> {
        let mut col_idx_sequence = ColumnIndexSequence::default();
        let field_encoders = schema
            .fields
            .iter()
            .map(|field| {
                strategy.create_field_encoder(strategy, field, &mut col_idx_sequence, options)
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(Self {
            field_encoders,
            field_id_to_column_index: col_idx_sequence.mapping,
        })
    }

    pub fn num_columns(&self) -> u32 {
        self.field_encoders
            .iter()
            .map(|field_encoder| field_encoder.num_columns())
            .sum::<u32>()
    }
}

/// An encoded batch of data and a page table describing it
///
/// This is returned by [`crate::encoder::encode_batch`]
#[derive(Debug)]
pub struct EncodedBatch {
    pub data: Bytes,
    pub page_table: Vec<Arc<ColumnInfo>>,
    pub schema: Arc<Schema>,
    pub top_level_columns: Vec<u32>,
    pub num_rows: u64,
}

fn write_page_to_data_buffer(page: EncodedPage, data_buffer: &mut BytesMut) -> PageInfo {
    let buffers = page.data;
    let mut buffer_offsets_and_sizes = Vec::with_capacity(buffers.len());
    for buffer in buffers {
        let buffer_offset = data_buffer.len() as u64;
        data_buffer.extend_from_slice(&buffer);
        let size = data_buffer.len() as u64 - buffer_offset;
        buffer_offsets_and_sizes.push((buffer_offset, size));
    }

    PageInfo {
        buffer_offsets_and_sizes: Arc::from(buffer_offsets_and_sizes),
        encoding: page.description,
        num_rows: page.num_rows,
        priority: page.row_number,
    }
}

/// Helper method to encode a batch of data into memory
///
/// This is primarily for testing and benchmarking but could be useful in other
/// niche situations like IPC.
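///
/// A hedged usage sketch; `batch` is an arrow [`RecordBatch`] and `schema` the
/// corresponding Lance schema, both assumed to be built elsewhere:
///
/// ```ignore
/// let strategy = default_encoding_strategy(LanceFileVersion::default());
/// let encoded =
///     encode_batch(&batch, schema, strategy.as_ref(), &EncodingOptions::default()).await?;
/// assert_eq!(encoded.num_rows, batch.num_rows() as u64);
/// ```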
pub async fn encode_batch(
    batch: &RecordBatch,
    schema: Arc<Schema>,
    encoding_strategy: &dyn FieldEncodingStrategy,
    options: &EncodingOptions,
) -> Result<EncodedBatch> {
    if !is_pwr_two(options.buffer_alignment) || options.buffer_alignment < MIN_PAGE_BUFFER_ALIGNMENT
    {
        return Err(Error::InvalidInput {
            source: format!(
                "buffer_alignment must be a power of two and at least {}",
                MIN_PAGE_BUFFER_ALIGNMENT
            )
            .into(),
            location: location!(),
        });
    }

    let mut data_buffer = BytesMut::new();
    let lance_schema = Schema::try_from(batch.schema().as_ref())?;
    let options = EncodingOptions {
        keep_original_array: true,
        ..*options
    };
    let batch_encoder = BatchEncoder::try_new(&lance_schema, encoding_strategy, &options)?;
    let mut page_table = Vec::new();
    let mut col_idx_offset = 0;
    for (arr, mut encoder) in batch.columns().iter().zip(batch_encoder.field_encoders) {
        let mut external_buffers =
            OutOfLineBuffers::new(data_buffer.len() as u64, options.buffer_alignment);
        let repdef = RepDefBuilder::default();
        let encoder = encoder.as_mut();
        let num_rows = arr.len() as u64;
        let mut tasks =
            encoder.maybe_encode(arr.clone(), &mut external_buffers, repdef, 0, num_rows)?;
        tasks.extend(encoder.flush(&mut external_buffers)?);
        // Write external buffers first, before the pages that reference them
        for buffer in external_buffers.take_buffers() {
            data_buffer.extend_from_slice(&buffer);
        }
        let mut pages = HashMap::<u32, Vec<PageInfo>>::new();
        for task in tasks {
            let encoded_page = task.await?;
            pages
                .entry(encoded_page.column_idx)
                .or_default()
                .push(write_page_to_data_buffer(encoded_page, &mut data_buffer));
        }
        let mut external_buffers =
            OutOfLineBuffers::new(data_buffer.len() as u64, options.buffer_alignment);
        let encoded_columns = encoder.finish(&mut external_buffers).await?;
        for buffer in external_buffers.take_buffers() {
            data_buffer.extend_from_slice(&buffer);
        }
        let num_columns = encoded_columns.len();
        for (col_idx, encoded_column) in encoded_columns.into_iter().enumerate() {
            let col_idx = col_idx + col_idx_offset;
            let mut col_buffer_offsets_and_sizes = Vec::new();
            for buffer in encoded_column.column_buffers {
                let buffer_offset = data_buffer.len() as u64;
                data_buffer.extend_from_slice(&buffer);
                let size = data_buffer.len() as u64 - buffer_offset;
                col_buffer_offsets_and_sizes.push((buffer_offset, size));
            }
            for page in encoded_column.final_pages {
                pages
                    .entry(page.column_idx)
                    .or_default()
                    .push(write_page_to_data_buffer(page, &mut data_buffer));
            }
            let col_pages = std::mem::take(pages.entry(col_idx as u32).or_default());
            page_table.push(Arc::new(ColumnInfo {
                index: col_idx as u32,
                buffer_offsets_and_sizes: Arc::from(
                    col_buffer_offsets_and_sizes.into_boxed_slice(),
                ),
                page_infos: Arc::from(col_pages.into_boxed_slice()),
                encoding: encoded_column.encoding,
            }))
        }
        col_idx_offset += num_columns;
    }
    let top_level_columns = batch_encoder
        .field_id_to_column_index
        .iter()
        .map(|(_, idx)| *idx)
        .collect();
    Ok(EncodedBatch {
        data: data_buffer.freeze(),
        top_level_columns,
        page_table,
        schema,
        num_rows: batch.num_rows() as u64,
    })
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::compression_config::{CompressionFieldParams, CompressionParams};

    #[test]
    fn test_configured_encoding_strategy() {
        // Create test parameters
        let mut params = CompressionParams::new();
        params.columns.insert(
            "*_id".to_string(),
            CompressionFieldParams {
                rle_threshold: Some(0.5),
                compression: Some("lz4".to_string()),
                compression_level: None,
                bss: None,
                minichunk_size: None,
            },
        );

        // Test with V2.1 - should succeed
        let strategy =
            default_encoding_strategy_with_params(LanceFileVersion::V2_1, params.clone())
                .expect("Should succeed for V2.1");

        // Verify it's a StructuralEncodingStrategy
        assert!(format!("{:?}", strategy).contains("StructuralEncodingStrategy"));
        assert!(format!("{:?}", strategy).contains("DefaultCompressionStrategy"));

        // Test with V2.0 - should fail
        let err = default_encoding_strategy_with_params(LanceFileVersion::V2_0, params.clone())
            .expect_err("Should fail for V2.0");
        assert!(err
            .to_string()
            .contains("only supported in Lance file version 2.1"));

        // Test with Legacy - should fail
        let err = default_encoding_strategy_with_params(LanceFileVersion::Legacy, params)
            .expect_err("Should fail for Legacy");
        assert!(err
            .to_string()
            .contains("only supported in Lance file version 2.1"));
    }
}