Skip to main content

lance_encoding/
encoder.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! The top-level encoding module for Lance files.
5//!
//! Lance files are encoded using a [`FieldEncodingStrategy`] which chooses
7//! what encoder to use for each field.
8//!
9//! The current strategy is the [`StructuralEncodingStrategy`] which uses "structural"
10//! encoding.  A tree of encoders is built up for each field.  The struct & list encoders
11//! simply pull off the validity and offsets and collect them.  Then, in the primitive leaf
12//! encoder the validity, offsets, and values are accumulated in an accumulation buffer.  Once
13//! enough data has been collected the primitive encoder will either use a miniblock encoding
14//! or a full zip encoding to create a page of data from the accumulation buffer.
15
16use std::{collections::HashMap, sync::Arc};
17
18use arrow_array::{Array, ArrayRef, RecordBatch};
19use arrow_schema::DataType;
20use bytes::{Bytes, BytesMut};
21use futures::future::BoxFuture;
22use lance_core::datatypes::{Field, Schema};
23use lance_core::error::LanceOptionExt;
24use lance_core::utils::bit::{is_pwr_two, pad_bytes_to};
25use lance_core::{Error, Result};
26
27use crate::buffer::LanceBuffer;
28use crate::compression::{CompressionStrategy, DefaultCompressionStrategy};
29use crate::compression_config::CompressionParams;
30use crate::decoder::PageEncoding;
31use crate::encodings::logical::blob::{BlobStructuralEncoder, BlobV2StructuralEncoder};
32use crate::encodings::logical::fixed_size_list::FixedSizeListStructuralEncoder;
33use crate::encodings::logical::list::ListStructuralEncoder;
34use crate::encodings::logical::map::MapStructuralEncoder;
35use crate::encodings::logical::primitive::PrimitiveStructuralEncoder;
36use crate::encodings::logical::r#struct::StructStructuralEncoder;
37use crate::repdef::RepDefBuilder;
38use crate::version::LanceFileVersion;
39use crate::{
40    decoder::{ColumnInfo, PageInfo},
41    format::pb,
42};
43
/// The minimum alignment (in bytes) for a page buffer.  Writers must respect this.
pub const MIN_PAGE_BUFFER_ALIGNMENT: u64 = 8;
46
/// An encoded page of data
///
/// Maps to a top-level array
///
/// For example, `FixedSizeList<Int32>` will have two EncodedArray instances and one EncodedPage
#[derive(Debug)]
pub struct EncodedPage {
    /// The encoded page buffers
    pub data: Vec<LanceBuffer>,
    /// A description of the encoding used to encode the page
    pub description: PageEncoding,
    /// The number of rows in the encoded page
    pub num_rows: u64,
    /// The top-level row number of the first row in the page
    ///
    /// Generally the number of "top-level" rows and the number of rows are the same.  However,
    /// when there is repetition (list/fixed-size-list) there will be more or less items than rows.
    ///
    /// A top-level row can never be split across a page boundary.
    pub row_number: u64,
    /// The index of the column
    pub column_idx: u32,
}
70
/// The encoded output for a single column: column-level buffers, the column's
/// encoding description, and any pages produced at the end of encoding.
pub struct EncodedColumn {
    /// Buffers that belong to the column as a whole (not to a single page)
    pub column_buffers: Vec<LanceBuffer>,
    /// Protobuf description of the column encoding
    pub encoding: pb::ColumnEncoding,
    /// Pages emitted when the field encoder finishes (written after regular pages)
    pub final_pages: Vec<EncodedPage>,
}
76
77impl Default for EncodedColumn {
78    fn default() -> Self {
79        Self {
80            column_buffers: Default::default(),
81            encoding: pb::ColumnEncoding {
82                column_encoding: Some(pb::column_encoding::ColumnEncoding::Values(())),
83            },
84            final_pages: Default::default(),
85        }
86    }
87}
88
/// A tool to reserve space for buffers that are not in-line with the data
///
/// In most cases, buffers are stored in the page and referred to in the encoding
/// metadata by their index in the page.  This keeps all buffers within a page together.
/// As a result, most encoders should not need to use this structure.
///
/// In some cases (currently only the large binary encoding) there is a need to access
/// buffers that are not in the page (because storing the position / offset of every page
/// in the page metadata would be too expensive).
///
/// To do this you can add a buffer with `add_buffer` and then use the returned position
/// in some way (in the large binary encoding the returned position is stored in the page
/// data as a position / size array).
pub struct OutOfLineBuffers {
    // Position where the next added buffer will be placed
    position: u64,
    // Alignment (in bytes); padding is added after each buffer to maintain it
    buffer_alignment: u64,
    // Buffers added so far, in insertion order
    buffers: Vec<LanceBuffer>,
}
107
108impl OutOfLineBuffers {
109    pub fn new(base_position: u64, buffer_alignment: u64) -> Self {
110        Self {
111            position: base_position,
112            buffer_alignment,
113            buffers: Vec::new(),
114        }
115    }
116
117    pub fn add_buffer(&mut self, buffer: LanceBuffer) -> u64 {
118        let position = self.position;
119        self.position += buffer.len() as u64;
120        self.position += pad_bytes_to(buffer.len(), self.buffer_alignment as usize) as u64;
121        self.buffers.push(buffer);
122        position
123    }
124
125    pub fn take_buffers(self) -> Vec<LanceBuffer> {
126        self.buffers
127    }
128
129    pub fn reset_position(&mut self, position: u64) {
130        self.position = position;
131    }
132}
133
/// A task to create a page of data
///
/// Resolves to the [`EncodedPage`] once the encoding work completes.
pub type EncodeTask = BoxFuture<'static, Result<EncodedPage>>;
136
/// Top level encoding trait to code any Arrow array type into one or more pages.
///
/// The field encoder implements buffering and encoding of a single input column
/// but it may map to multiple output columns.  For example, a list array or struct
/// array will be encoded into multiple columns.
///
/// Also, fields may be encoded at different speeds.  For example, given a struct
/// column with three fields (a boolean field, an int32 field, and a 4096-dimension
/// tensor field) the tensor field is likely to emit encoded pages much more frequently
/// than the boolean field.
pub trait FieldEncoder: Send {
    /// Buffer the data and, if there is enough data in the buffer to form a page, return
    /// an encoding task to encode the data.
    ///
    /// This may return more than one task because a single column may be mapped to multiple
    /// output columns.  For example, if encoding a struct column with three children then
    /// up to three tasks may be returned from each call to maybe_encode.
    ///
    /// It may also return multiple tasks for a single column if the input array is larger
    /// than a single disk page.
    ///
    /// It could also return an empty Vec if there is not enough data yet to encode any pages.
    ///
    /// The `row_number` must be passed which is the top-level row number currently being encoded
    /// This is stored in any pages produced by this call so that we can know the priority of the
    /// page.
    ///
    /// The `num_rows` is the number of top level rows.  It is initially the same as `array.len()`
    /// however it is passed separately because array will become flattened over time (if there is
    /// repetition) and we need to know the original number of rows for various purposes.
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>>;
    /// Flush any remaining data from the buffers into encoding tasks
    ///
    /// Each encode task produces a single page.  The order of these pages will be maintained
    /// in the file (we do not worry about order between columns but all pages in the same
    /// column should maintain order)
    ///
    /// This may be called intermittently throughout encoding but will always be called
    /// once at the end of encoding just before calling finish
    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>>;
    /// Finish encoding and return column metadata
    ///
    /// This is called only once, after all encode tasks have completed
    ///
    /// This returns a Vec because a single field may have created multiple columns
    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>>;

    /// The number of output columns this encoding will create
    fn num_columns(&self) -> u32;
}
197
/// Keeps track of the current column index and makes a mapping
/// from field id to column index
#[derive(Debug, Default)]
pub struct ColumnIndexSequence {
    // The next column index to hand out
    current_index: u32,
    // (field id, assigned column index) pairs, recorded in assignment order
    mapping: Vec<(u32, u32)>,
}
205
206impl ColumnIndexSequence {
207    pub fn next_column_index(&mut self, field_id: u32) -> u32 {
208        let idx = self.current_index;
209        self.current_index += 1;
210        self.mapping.push((field_id, idx));
211        idx
212    }
213
214    pub fn skip(&mut self) {
215        self.current_index += 1;
216    }
217}
218
/// Options that control the encoding process
pub struct EncodingOptions {
    /// How much data (in bytes) to cache in-memory before writing a page
    ///
    /// This cache is applied on a per-column basis
    pub cache_bytes_per_column: u64,
    /// The maximum size of a page in bytes, if a single array would create
    /// a page larger than this then it will be split into multiple pages
    pub max_page_bytes: u64,
    /// If false then arrays will be copied (deeply) before
    /// being cached.  This ensures any data kept alive by the array can
    /// be discarded safely and helps avoid writer accumulation.  However,
    /// there is an associated cost.
    ///
    /// NOTE(review): `EncodingOptions::default()` sets this to `true`, so the
    /// deep-copy behavior is *not* the default despite what earlier docs said.
    pub keep_original_array: bool,
    /// The alignment that the writer is applying to buffers
    ///
    /// The encoder needs to know this so it figures the position of out-of-line
    /// buffers correctly
    pub buffer_alignment: u64,

    /// The Lance file version being written
    pub version: LanceFileVersion,
}
242
243impl Default for EncodingOptions {
244    fn default() -> Self {
245        Self {
246            cache_bytes_per_column: 8 * 1024 * 1024,
247            max_page_bytes: 32 * 1024 * 1024,
248            keep_original_array: true,
249            buffer_alignment: 64,
250            version: LanceFileVersion::default(),
251        }
252    }
253}
254
impl EncodingOptions {
    /// Whether the target file version uses large (u32) miniblock chunk sizes.
    ///
    /// If true (for Lance file version 2.2+), miniblock chunk sizes are u32,
    /// to allow storing larger chunks and their sizes for better compression.
    /// For Lance file version 2.1, miniblock chunk sizes are u16.
    pub fn support_large_chunk(&self) -> bool {
        self.version >= LanceFileVersion::V2_2
    }
}
263
/// A trait to pick which kind of field encoding to use for a field
///
/// Unlike the ArrayEncodingStrategy, the field encoding strategy is
/// chosen before any data is generated and the same field encoder is
/// used for all data in the field.
pub trait FieldEncodingStrategy: Send + Sync + std::fmt::Debug {
    /// Choose and create an appropriate field encoder for the given
    /// field.
    ///
    /// The field encoder can be chosen on the data type as well as
    /// any metadata that is attached to the field.
    ///
    /// The `encoding_strategy_root` is the encoder that should be
    /// used to encode any inner data in struct / list / etc. fields.
    ///
    /// Initially it is the same as `self` and generally should be
    /// forwarded to any inner encoding strategy.
    ///
    /// `column_index` hands out output column indices; implementations call it
    /// once per output column they create.
    fn create_field_encoder(
        &self,
        encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
    ) -> Result<Box<dyn FieldEncoder>>;
}
289
290pub fn default_encoding_strategy(version: LanceFileVersion) -> Box<dyn FieldEncodingStrategy> {
291    match version.resolve() {
292        LanceFileVersion::Legacy => panic!(),
293        LanceFileVersion::V2_0 => Box::new(
294            crate::previous::encoder::CoreFieldEncodingStrategy::new(version),
295        ),
296        _ => Box::new(StructuralEncodingStrategy::with_version(version)),
297    }
298}
299
300/// Create an encoding strategy with user-configured compression parameters
301pub fn default_encoding_strategy_with_params(
302    version: LanceFileVersion,
303    params: CompressionParams,
304) -> Result<Box<dyn FieldEncodingStrategy>> {
305    match version.resolve() {
306        LanceFileVersion::Legacy | LanceFileVersion::V2_0 => Err(Error::invalid_input(
307            "Compression parameters are only supported in Lance file version 2.1 and later",
308        )),
309        _ => {
310            let compression_strategy =
311                Arc::new(DefaultCompressionStrategy::with_params(params).with_version(version));
312            Ok(Box::new(StructuralEncodingStrategy {
313                compression_strategy,
314                version,
315            }))
316        }
317    }
318}
319
/// An encoding strategy used for 2.1+ files
#[derive(Debug)]
pub struct StructuralEncodingStrategy {
    /// Chooses the block compression applied by the leaf encoders
    pub compression_strategy: Arc<dyn CompressionStrategy>,
    /// The Lance file version being written (gates version-specific encoders)
    pub version: LanceFileVersion,
}
326
327// For some reason, clippy thinks we can add Default to the above derive but
328// rustc doesn't agree (no default for Arc<dyn Trait>)
329#[allow(clippy::derivable_impls)]
330impl Default for StructuralEncodingStrategy {
331    fn default() -> Self {
332        Self {
333            compression_strategy: Arc::new(DefaultCompressionStrategy::new()),
334            version: LanceFileVersion::default(),
335        }
336    }
337}
338
impl StructuralEncodingStrategy {
    /// Creates a strategy whose compression choices are tuned for `version`.
    pub fn with_version(version: LanceFileVersion) -> Self {
        Self {
            compression_strategy: Arc::new(DefaultCompressionStrategy::new().with_version(version)),
            version,
        }
    }

    /// Returns true if `data_type` can be handled by the primitive leaf encoder.
    ///
    /// A `FixedSizeList` counts as primitive when its innermost element type is
    /// primitive (checked recursively).
    fn is_primitive_type(data_type: &DataType) -> bool {
        match data_type {
            DataType::FixedSizeList(inner, _) => Self::is_primitive_type(inner.data_type()),
            _ => matches!(
                data_type,
                DataType::Boolean
                    | DataType::Date32
                    | DataType::Date64
                    | DataType::Decimal128(_, _)
                    | DataType::Decimal256(_, _)
                    | DataType::Duration(_)
                    | DataType::Float16
                    | DataType::Float32
                    | DataType::Float64
                    | DataType::Int16
                    | DataType::Int32
                    | DataType::Int64
                    | DataType::Int8
                    | DataType::Interval(_)
                    | DataType::Null
                    | DataType::Time32(_)
                    | DataType::Time64(_)
                    | DataType::Timestamp(_, _)
                    | DataType::UInt16
                    | DataType::UInt32
                    | DataType::UInt64
                    | DataType::UInt8
                    | DataType::FixedSizeBinary(_)
                    | DataType::Binary
                    | DataType::LargeBinary
                    | DataType::Utf8
                    | DataType::LargeUtf8,
            ),
        }
    }

    /// Recursively builds the encoder tree for `field`.
    ///
    /// Dispatch order: blob-marked fields first, then primitive leaves, then the
    /// nested types (list, fixed-size-list of struct, map, struct, dictionary).
    /// `root_field_metadata` is the metadata of the *top-level* field and is
    /// threaded unchanged through the recursion so leaf encoders can consult it.
    ///
    /// Returns an error for blob/map/FSL<Struct> inputs not supported by
    /// `self.version`, or a schema error for malformed map fields.
    fn do_create_field_encoder(
        &self,
        _encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
        root_field_metadata: &HashMap<String, String>,
    ) -> Result<Box<dyn FieldEncoder>> {
        let data_type = field.data_type();

        // Check if field is marked as blob
        if field.is_blob() {
            match data_type {
                DataType::Binary | DataType::LargeBinary => {
                    return Ok(Box::new(BlobStructuralEncoder::new(
                        field,
                        column_index.next_column_index(field.id as u32),
                        options,
                        self.compression_strategy.clone(),
                    )?));
                }
                // Struct-shaped blob input (blob v2) requires file version 2.2+
                DataType::Struct(_) if self.version >= LanceFileVersion::V2_2 => {
                    return Ok(Box::new(BlobV2StructuralEncoder::new(
                        field,
                        column_index.next_column_index(field.id as u32),
                        options,
                        self.compression_strategy.clone(),
                    )?));
                }
                DataType::Struct(_) => {
                    return Err(Error::invalid_input_source(
                        "Blob v2 struct input requires file version >= 2.2".into(),
                    ));
                }
                _ => {
                    return Err(Error::invalid_input_source(
                        format!(
                            "Blob encoding only supports Binary/LargeBinary or v2 Struct, got {}",
                            data_type
                        )
                        .into(),
                    ));
                }
            }
        }

        if Self::is_primitive_type(&data_type) {
            Ok(Box::new(PrimitiveStructuralEncoder::try_new(
                options,
                self.compression_strategy.clone(),
                column_index.next_column_index(field.id as u32),
                field.clone(),
                Arc::new(root_field_metadata.clone()),
            )?))
        } else {
            match data_type {
                DataType::List(_) | DataType::LargeList(_) => {
                    // A list field's single child is its item field
                    let child = field.children.first().expect_ok()?;
                    let child_encoder = self.do_create_field_encoder(
                        _encoding_strategy_root,
                        child,
                        column_index,
                        options,
                        root_field_metadata,
                    )?;
                    Ok(Box::new(ListStructuralEncoder::new(
                        options.keep_original_array,
                        child_encoder,
                    )))
                }
                // FSL of primitive was already handled by is_primitive_type above;
                // this arm only sees FSL<Struct>
                DataType::FixedSizeList(inner, _)
                    if matches!(inner.data_type(), DataType::Struct(_)) =>
                {
                    if self.version < LanceFileVersion::V2_2 {
                        return Err(Error::not_supported_source(format!(
                            "FixedSizeList<Struct> is only supported in Lance file format 2.2+, current version: {}",
                            self.version
                        )
                        .into()));
                    }
                    // Complex FixedSizeList needs structural encoding
                    let child = field.children.first().expect_ok()?;
                    let child_encoder = self.do_create_field_encoder(
                        _encoding_strategy_root,
                        child,
                        column_index,
                        options,
                        root_field_metadata,
                    )?;
                    Ok(Box::new(FixedSizeListStructuralEncoder::new(
                        options.keep_original_array,
                        child_encoder,
                    )))
                }
                DataType::Map(_, keys_sorted) => {
                    // TODO: We only support keys_sorted=false for now,
                    //  because converting a rust arrow map field to the python arrow field will
                    //  lose the keys_sorted property.
                    if keys_sorted {
                        return Err(Error::not_supported_source(format!("Map data type is not supported with keys_sorted=true now, current value is {}", keys_sorted).into()));
                    }
                    if self.version < LanceFileVersion::V2_2 {
                        return Err(Error::not_supported_source(format!(
                            "Map data type is only supported in Lance file format 2.2+, current version: {}",
                            self.version
                        )
                        .into()));
                    }
                    // Validate the Arrow Map shape: Map -> entries Struct<key, value>,
                    // with a non-nullable key field
                    let entries_child = field.children.first().ok_or_else(|| {
                        Error::schema("Map should have an entries child".to_string())
                    })?;
                    let DataType::Struct(struct_fields) = entries_child.data_type() else {
                        return Err(Error::schema(
                            "Map entries field must be a Struct<key, value>".to_string(),
                        ));
                    };
                    if struct_fields.len() < 2 {
                        return Err(Error::schema(
                            "Map entries struct must contain both key and value fields".to_string(),
                        ));
                    }
                    let key_field = &struct_fields[0];
                    if key_field.is_nullable() {
                        return Err(Error::schema(format!(
                            "Map key field '{}' must be non-nullable according to Arrow Map specification",
                            key_field.name()
                        )));
                    }
                    let child_encoder = self.do_create_field_encoder(
                        _encoding_strategy_root,
                        entries_child,
                        column_index,
                        options,
                        root_field_metadata,
                    )?;
                    Ok(Box::new(MapStructuralEncoder::new(
                        options.keep_original_array,
                        child_encoder,
                    )))
                }
                DataType::Struct(fields) => {
                    if field.is_packed_struct() || fields.is_empty() {
                        // Both packed structs and empty structs are encoded as primitive
                        Ok(Box::new(PrimitiveStructuralEncoder::try_new(
                            options,
                            self.compression_strategy.clone(),
                            column_index.next_column_index(field.id as u32),
                            field.clone(),
                            Arc::new(root_field_metadata.clone()),
                        )?))
                    } else {
                        // Ordinary struct: one child encoder per child field
                        let children_encoders = field
                            .children
                            .iter()
                            .map(|field| {
                                self.do_create_field_encoder(
                                    _encoding_strategy_root,
                                    field,
                                    column_index,
                                    options,
                                    root_field_metadata,
                                )
                            })
                            .collect::<Result<Vec<_>>>()?;
                        Ok(Box::new(StructStructuralEncoder::new(
                            options.keep_original_array,
                            children_encoders,
                        )))
                    }
                }
                DataType::Dictionary(_, value_type) => {
                    // A dictionary of primitive is, itself, primitive
                    if Self::is_primitive_type(&value_type) {
                        Ok(Box::new(PrimitiveStructuralEncoder::try_new(
                            options,
                            self.compression_strategy.clone(),
                            column_index.next_column_index(field.id as u32),
                            field.clone(),
                            Arc::new(root_field_metadata.clone()),
                        )?))
                    } else {
                        // A dictionary of logical is, itself, logical and we don't support that today
                        // It could be possible (e.g. store indices in one column and values in remaining columns)
                        // but would be a significant amount of work
                        //
                        // An easier fallback implementation would be to decode-on-write and encode-on-read
                        Err(Error::not_supported_source(format!("cannot encode a dictionary column whose value type is a logical type ({})", value_type).into()))
                    }
                }
                _ => todo!("Implement encoding for field {}", field),
            }
        }
    }
}
577
impl FieldEncodingStrategy for StructuralEncodingStrategy {
    /// Builds the encoder tree for a top-level `field` by delegating to
    /// `do_create_field_encoder`, seeding the recursion with this field's
    /// metadata as the root metadata.
    fn create_field_encoder(
        &self,
        encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
    ) -> Result<Box<dyn FieldEncoder>> {
        self.do_create_field_encoder(
            encoding_strategy_root,
            field,
            column_index,
            options,
            &field.metadata,
        )
    }
}
595
/// A batch encoder that encodes RecordBatch objects by delegating
/// to field encoders for each top-level field in the batch.
pub struct BatchEncoder {
    /// One field encoder per top-level field, in schema order
    pub field_encoders: Vec<Box<dyn FieldEncoder>>,
    /// (field id, column index) pairs recorded each time a column index was assigned
    pub field_id_to_column_index: Vec<(u32, u32)>,
}
602
603impl BatchEncoder {
604    pub fn try_new(
605        schema: &Schema,
606        strategy: &dyn FieldEncodingStrategy,
607        options: &EncodingOptions,
608    ) -> Result<Self> {
609        let mut col_idx = 0;
610        let mut col_idx_sequence = ColumnIndexSequence::default();
611        let field_encoders = schema
612            .fields
613            .iter()
614            .map(|field| {
615                let encoder = strategy.create_field_encoder(
616                    strategy,
617                    field,
618                    &mut col_idx_sequence,
619                    options,
620                )?;
621                col_idx += encoder.as_ref().num_columns();
622                Ok(encoder)
623            })
624            .collect::<Result<Vec<_>>>()?;
625        Ok(Self {
626            field_encoders,
627            field_id_to_column_index: col_idx_sequence.mapping,
628        })
629    }
630
631    pub fn num_columns(&self) -> u32 {
632        self.field_encoders
633            .iter()
634            .map(|field_encoder| field_encoder.num_columns())
635            .sum::<u32>()
636    }
637}
638
/// An encoded batch of data and a page table describing it
///
/// This is returned by [`crate::encoder::encode_batch`]
#[derive(Debug)]
pub struct EncodedBatch {
    /// All encoded buffers concatenated into a single contiguous region
    pub data: Bytes,
    /// Per-column description of where pages and column buffers live in `data`
    pub page_table: Vec<Arc<ColumnInfo>>,
    /// The schema of the encoded batch
    pub schema: Arc<Schema>,
    /// Column indices assigned to the top-level fields
    pub top_level_columns: Vec<u32>,
    /// The number of rows in the batch
    pub num_rows: u64,
}
650
651fn write_page_to_data_buffer(page: EncodedPage, data_buffer: &mut BytesMut) -> PageInfo {
652    let buffers = page.data;
653    let mut buffer_offsets_and_sizes = Vec::with_capacity(buffers.len());
654    for buffer in buffers {
655        let buffer_offset = data_buffer.len() as u64;
656        data_buffer.extend_from_slice(&buffer);
657        let size = data_buffer.len() as u64 - buffer_offset;
658        buffer_offsets_and_sizes.push((buffer_offset, size));
659    }
660
661    PageInfo {
662        buffer_offsets_and_sizes: Arc::from(buffer_offsets_and_sizes),
663        encoding: page.description,
664        num_rows: page.num_rows,
665        priority: page.row_number,
666    }
667}
668
/// Helper method to encode a batch of data into memory
///
/// This is primarily for testing and benchmarking but could be useful in other
/// niche situations like IPC.
pub async fn encode_batch(
    batch: &RecordBatch,
    schema: Arc<Schema>,
    encoding_strategy: &dyn FieldEncodingStrategy,
    options: &EncodingOptions,
) -> Result<EncodedBatch> {
    // Out-of-line buffer positions depend on the alignment, so reject values
    // the writer could never honor
    if !is_pwr_two(options.buffer_alignment) || options.buffer_alignment < MIN_PAGE_BUFFER_ALIGNMENT
    {
        return Err(Error::invalid_input_source(
            format!(
                "buffer_alignment must be a power of two and at least {}",
                MIN_PAGE_BUFFER_ALIGNMENT
            )
            .into(),
        ));
    }

    let mut data_buffer = BytesMut::new();
    let lance_schema = Schema::try_from(batch.schema().as_ref())?;
    // Everything is encoded in a single pass here, so keep the original arrays
    // rather than paying for a deep copy into the cache
    let options = EncodingOptions {
        keep_original_array: true,
        ..*options
    };
    let batch_encoder = BatchEncoder::try_new(&lance_schema, encoding_strategy, &options)?;
    let mut page_table = Vec::new();
    let mut col_idx_offset = 0;
    for (arr, mut encoder) in batch.columns().iter().zip(batch_encoder.field_encoders) {
        let mut external_buffers =
            OutOfLineBuffers::new(data_buffer.len() as u64, options.buffer_alignment);
        let repdef = RepDefBuilder::default();
        let encoder = encoder.as_mut();
        let num_rows = arr.len() as u64;
        // Feed the entire array at once, then flush so all page tasks are produced
        let mut tasks =
            encoder.maybe_encode(arr.clone(), &mut external_buffers, repdef, 0, num_rows)?;
        tasks.extend(encoder.flush(&mut external_buffers)?);
        // Out-of-line (external) buffers are written before any page buffers
        for buffer in external_buffers.take_buffers() {
            data_buffer.extend_from_slice(&buffer);
        }
        let mut pages = HashMap::<u32, Vec<PageInfo>>::new();
        for task in tasks {
            let encoded_page = task.await?;
            pages
                .entry(encoded_page.column_idx)
                .or_default()
                .push(write_page_to_data_buffer(encoded_page, &mut data_buffer));
        }
        // `finish` may add more out-of-line buffers; they start at the current offset
        let mut external_buffers =
            OutOfLineBuffers::new(data_buffer.len() as u64, options.buffer_alignment);
        let encoded_columns = encoder.finish(&mut external_buffers).await?;
        for buffer in external_buffers.take_buffers() {
            data_buffer.extend_from_slice(&buffer);
        }
        let num_columns = encoded_columns.len();
        for (col_idx, encoded_column) in encoded_columns.into_iter().enumerate() {
            // Shift by the columns produced by previous fields
            let col_idx = col_idx + col_idx_offset;
            // Write the column-level buffers and record where they landed
            let mut col_buffer_offsets_and_sizes = Vec::new();
            for buffer in encoded_column.column_buffers {
                let buffer_offset = data_buffer.len() as u64;
                data_buffer.extend_from_slice(&buffer);
                let size = data_buffer.len() as u64 - buffer_offset;
                col_buffer_offsets_and_sizes.push((buffer_offset, size));
            }
            // Pages emitted by `finish` are appended after the regular pages
            for page in encoded_column.final_pages {
                pages
                    .entry(page.column_idx)
                    .or_default()
                    .push(write_page_to_data_buffer(page, &mut data_buffer));
            }
            let col_pages = std::mem::take(pages.entry(col_idx as u32).or_default());
            page_table.push(Arc::new(ColumnInfo {
                index: col_idx as u32,
                buffer_offsets_and_sizes: Arc::from(
                    col_buffer_offsets_and_sizes.into_boxed_slice(),
                ),
                page_infos: Arc::from(col_pages.into_boxed_slice()),
                encoding: encoded_column.encoding,
            }))
        }
        col_idx_offset += num_columns;
    }
    // Column indices assigned during encoding, in assignment order
    let top_level_columns = batch_encoder
        .field_id_to_column_index
        .iter()
        .map(|(_, idx)| *idx)
        .collect();
    Ok(EncodedBatch {
        data: data_buffer.freeze(),
        top_level_columns,
        page_table,
        schema,
        num_rows: batch.num_rows() as u64,
    })
}
767
#[cfg(test)]
mod tests {
    use super::*;
    use crate::compression_config::{CompressionFieldParams, CompressionParams};
    use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields};

    /// User-supplied compression parameters are accepted when targeting file
    /// version 2.1, and rejected with a descriptive error for older versions.
    #[test]
    fn test_configured_encoding_strategy() {
        // Configure a column-pattern override so the strategy has a non-empty
        // parameter set to carry.
        let mut params = CompressionParams::new();
        params.columns.insert(
            "*_id".to_string(),
            CompressionFieldParams {
                rle_threshold: Some(0.5),
                compression: Some("lz4".to_string()),
                compression_level: None,
                bss: None,
                minichunk_size: None,
            },
        );

        // Test with V2.1 - should succeed
        let strategy =
            default_encoding_strategy_with_params(LanceFileVersion::V2_1, params.clone())
                .expect("Should succeed for V2.1");

        // Verify it's a StructuralEncodingStrategy wrapping the default
        // compression strategy.  Render the Debug output once instead of
        // re-formatting the strategy for each assertion.
        let strategy_debug = format!("{:?}", strategy);
        assert!(strategy_debug.contains("StructuralEncodingStrategy"));
        assert!(strategy_debug.contains("DefaultCompressionStrategy"));

        // Test with V2.0 - should fail
        let err = default_encoding_strategy_with_params(LanceFileVersion::V2_0, params.clone())
            .expect_err("Should fail for V2.0");
        assert!(
            err.to_string()
                .contains("only supported in Lance file version 2.1")
        );

        // Test with Legacy - should fail
        let err = default_encoding_strategy_with_params(LanceFileVersion::Legacy, params)
            .expect_err("Should fail for Legacy");
        assert!(
            err.to_string()
                .contains("only supported in Lance file version 2.1")
        );
    }

    /// FixedSizeList with a Struct element type is only supported starting
    /// with file format 2.2; building a field encoder for it under 2.1 must
    /// fail with a clear error message.
    #[test]
    fn test_fixed_size_list_struct_requires_v2_2() {
        // Build an Arrow field of type FixedSizeList<Struct<x: Int32>, 2>.
        let list_item = ArrowField::new(
            "item",
            ArrowDataType::Struct(ArrowFields::from(vec![ArrowField::new(
                "x",
                ArrowDataType::Int32,
                true,
            )])),
            true,
        );
        let arrow_field = ArrowField::new(
            "list_struct",
            ArrowDataType::FixedSizeList(Arc::new(list_item), 2),
            true,
        );
        let field = Field::try_from(&arrow_field).unwrap();

        let strategy = StructuralEncodingStrategy::with_version(LanceFileVersion::V2_1);
        let mut column_index = ColumnIndexSequence::default();
        let options = EncodingOptions::default();

        let result = strategy.create_field_encoder(&strategy, &field, &mut column_index, &options);
        // Extract the error in one step.  A `match` (rather than
        // `expect_err`) avoids requiring the Ok type to implement Debug.
        let err = match result {
            Err(err) => err,
            Ok(_) => panic!("FixedSizeList<Struct> should be rejected for file version 2.1"),
        };
        assert!(
            err.to_string()
                .contains("FixedSizeList<Struct> is only supported in Lance file format 2.2+")
        );
    }
}