lance_file/v2/writer.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use core::panic;
use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;

use arrow_array::RecordBatch;

use arrow_data::ArrayData;
use bytes::{BufMut, Bytes, BytesMut};
use futures::stream::FuturesOrdered;
use futures::StreamExt;
use lance_core::datatypes::{Field, Schema as LanceSchema};
use lance_core::utils::bit::pad_bytes;
use lance_core::{Error, Result};
use lance_encoding::decoder::PageEncoding;
use lance_encoding::encoder::{
    default_encoding_strategy, BatchEncoder, EncodeTask, EncodedBatch, EncodedPage,
    EncodingOptions, FieldEncoder, FieldEncodingStrategy, OutOfLineBuffers,
};
use lance_encoding::repdef::RepDefBuilder;
use lance_encoding::version::LanceFileVersion;
use lance_io::object_store::ObjectStore;
use lance_io::object_writer::ObjectWriter;
use lance_io::traits::Writer;
use log::{debug, warn};
use object_store::path::Path;
use prost::Message;
use prost_types::Any;
use snafu::location;
use tokio::io::AsyncWriteExt;
use tracing::instrument;

use crate::datatypes::FieldsWithMeta;
use crate::format::pb;
use crate::format::pbfile;
use crate::format::pbfile::DirectEncoding;
use crate::format::MAGIC;

/// Page buffers are aligned to 64 bytes
pub(crate) const PAGE_BUFFER_ALIGNMENT: usize = 64;
const PAD_BUFFER: [u8; PAGE_BUFFER_ALIGNMENT] = [72; PAGE_BUFFER_ALIGNMENT];
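// For example, a 100-byte buffer is followed by 28 bytes of padding so the next write
// starts at an aligned offset (128).  The padding content is arbitrary and never read back.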
const MAX_PAGE_BYTES: usize = 32 * 1024 * 1024;
const ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES: &str = "LANCE_FILE_WRITER_MAX_PAGE_BYTES";
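// For example, setting `LANCE_FILE_WRITER_MAX_PAGE_BYTES=2097152` sets the page-size hint to
// 2 MiB when `FileWriterOptions::max_page_bytes` is not set (see the env-var test below).

/// Options controlling how the writer buffers, encodes, and pages out data.
///
/// A minimal construction sketch (the values shown are illustrative, not recommendations):
///
/// ```ignore
/// let options = FileWriterOptions {
///     data_cache_bytes: Some(16 * 1024 * 1024), // split evenly across all columns
///     max_page_bytes: Some(8 * 1024 * 1024),    // a hint; single values are never sliced
///     ..Default::default()
/// };
/// ```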
#[derive(Debug, Clone, Default)]
pub struct FileWriterOptions {
    /// How many bytes to use for buffering column data
    ///
    /// When data comes in small batches the writer will buffer column data so that
    /// larger pages can be created.  This value will be divided evenly across all of the
    /// columns.  Generally you want this to be at least large enough to match your
    /// filesystem's ideal read size per column.
    ///
    /// In some cases you might want this value to be even larger if you have highly
    /// compressible data.  However, if this is too large, then the writer could require
    /// a lot of memory and write performance may suffer if the CPU-expensive encoding
    /// falls behind and can't be interleaved with the I/O-expensive flushing.
    ///
    /// The default will use 8MiB per column which should be reasonable for most cases.
    // TODO: Do we need to be able to set this on a per-column basis?
    pub data_cache_bytes: Option<u64>,
    /// A hint to indicate the max size of a page
    ///
    /// This hint can't always be respected.  A single value could be larger than this value
    /// and we never slice single values.  In addition, there are some cases where it can be
    /// difficult to know the size up-front and so we might not be able to respect this value.
    pub max_page_bytes: Option<u64>,
    /// The file writer buffers columns until enough data has arrived to flush a page
    /// to disk.
    ///
    /// Some columns with small data types may not flush very often.  These arrays can
    /// stick around for a long time.  These arrays might also be keeping larger data
    /// structures alive.  By default, the writer will make a deep copy of this array
    /// to avoid any potential memory leaks.  However, this can be disabled for a
    /// (probably minor) performance boost if you are sure that arrays are not keeping
    /// any sibling structures alive (this typically means the array was allocated in
    /// the same language / runtime as the writer).
    ///
    /// Do not enable this if your data is arriving from the C data interface.
    /// Data typically arrives one "batch" at a time (encoded in the C data interface
    /// as a struct array).  Each array in that batch keeps the entire batch alive.
    /// This means a small boolean array (which we will buffer in memory for quite a
    /// while) might keep a much larger record batch around in memory (even though most
    /// of that batch's data has been written to disk).
    pub keep_original_array: Option<bool>,
    pub encoding_strategy: Option<Arc<dyn FieldEncodingStrategy>>,
    /// The format version to use when writing the file
    ///
    /// This controls which encodings will be used when encoding the data.  Newer
    /// versions may have more efficient encodings.  However, newer format versions will
    /// require more up-to-date readers to read the data.
    pub format_version: Option<LanceFileVersion>,
}

pub struct FileWriter {
    writer: ObjectWriter,
    schema: Option<LanceSchema>,
    column_writers: Vec<Box<dyn FieldEncoder>>,
    column_metadata: Vec<pbfile::ColumnMetadata>,
    field_id_to_column_indices: Vec<(u32, u32)>,
    num_columns: u32,
    rows_written: u64,
    global_buffers: Vec<(u64, u64)>,
    schema_metadata: HashMap<String, String>,
    options: FileWriterOptions,
}

fn initial_column_metadata() -> pbfile::ColumnMetadata {
    pbfile::ColumnMetadata {
        pages: Vec::new(),
        buffer_offsets: Vec::new(),
        buffer_sizes: Vec::new(),
        encoding: None,
    }
}

static WARNED_ON_UNSTABLE_API: AtomicBool = AtomicBool::new(false);

impl FileWriter {
    /// Create a new FileWriter with a desired output schema
    pub fn try_new(
        object_writer: ObjectWriter,
        schema: LanceSchema,
        options: FileWriterOptions,
    ) -> Result<Self> {
        let mut writer = Self::new_lazy(object_writer, options);
        writer.initialize(schema)?;
        Ok(writer)
    }

    /// Create a new FileWriter without a desired output schema
    ///
    /// The output schema will be set based on the first batch of data to arrive.
    /// If no data arrives before the writer is finished, then finishing the file will fail.
    pub fn new_lazy(object_writer: ObjectWriter, options: FileWriterOptions) -> Self {
        if let Some(format_version) = options.format_version {
            if format_version > LanceFileVersion::Stable
                && WARNED_ON_UNSTABLE_API
                    .compare_exchange(
                        false,
                        true,
                        std::sync::atomic::Ordering::Relaxed,
                        std::sync::atomic::Ordering::Relaxed,
                    )
                    .is_ok()
            {
                warn!("You have requested an unstable format version.  Files written with this format version may not be readable in the future!  This is a development feature and should only be used for experimentation and never for production data.");
            }
        }
        Self {
            writer: object_writer,
            schema: None,
            column_writers: Vec::new(),
            column_metadata: Vec::new(),
            num_columns: 0,
            rows_written: 0,
            field_id_to_column_indices: Vec::new(),
            global_buffers: Vec::new(),
            schema_metadata: HashMap::new(),
            options,
        }
    }

    /// Write a series of record batches to a new file
    ///
    /// Returns the number of rows written
    pub async fn create_file_with_batches(
        store: &ObjectStore,
        path: &Path,
        schema: lance_core::datatypes::Schema,
        batches: impl Iterator<Item = RecordBatch> + Send,
        options: FileWriterOptions,
    ) -> Result<usize> {
        let writer = store.create(path).await?;
        let mut writer = Self::try_new(writer, schema, options)?;
        for batch in batches {
            writer.write_batch(&batch).await?;
        }
        Ok(writer.finish().await? as usize)
    }

    async fn do_write_buffer(writer: &mut ObjectWriter, buf: &[u8]) -> Result<()> {
        writer.write_all(buf).await?;
        let pad_bytes = pad_bytes::<PAGE_BUFFER_ALIGNMENT>(buf.len());
        writer.write_all(&PAD_BUFFER[..pad_bytes]).await?;
        Ok(())
    }

    /// Returns the format version that will be used when writing the file
    pub fn version(&self) -> LanceFileVersion {
        self.options.format_version.unwrap_or_default()
    }

    async fn write_page(&mut self, encoded_page: EncodedPage) -> Result<()> {
        let buffers = encoded_page.data;
        let mut buffer_offsets = Vec::with_capacity(buffers.len());
        let mut buffer_sizes = Vec::with_capacity(buffers.len());
        for buffer in buffers {
            buffer_offsets.push(self.writer.tell().await? as u64);
            buffer_sizes.push(buffer.len() as u64);
            Self::do_write_buffer(&mut self.writer, &buffer).await?;
        }
        let encoded_encoding = match encoded_page.description {
            PageEncoding::Legacy(array_encoding) => Any::from_msg(&array_encoding)?.encode_to_vec(),
            PageEncoding::Structural(page_layout) => Any::from_msg(&page_layout)?.encode_to_vec(),
        };
        let page = pbfile::column_metadata::Page {
            buffer_offsets,
            buffer_sizes,
            encoding: Some(pbfile::Encoding {
                location: Some(pbfile::encoding::Location::Direct(DirectEncoding {
                    encoding: encoded_encoding,
                })),
            }),
            length: encoded_page.num_rows,
            priority: encoded_page.row_number,
        };
        self.column_metadata[encoded_page.column_idx as usize]
            .pages
            .push(page);
        Ok(())
    }

    #[instrument(skip_all, level = "debug")]
    async fn write_pages(&mut self, mut encoding_tasks: FuturesOrdered<EncodeTask>) -> Result<()> {
        // As soon as an encoding task is done we write it.  There is no parallelism
        // needed here because "writing" is really just submitting the buffer to the
        // underlying write scheduler (either the OS or object_store's scheduler for
        // cloud writes).  The only time we might truly await on write_page is if the
        // scheduler's write queue is full.
        //
        // Also, there is no point in trying to make write_page parallel anyway
        // because we wouldn't want buffers getting mixed up across pages.
        while let Some(encoding_task) = encoding_tasks.next().await {
            let encoded_page = encoding_task?;
            self.write_page(encoded_page).await?;
        }
        // It's important to flush here: we don't know when the next batch will arrive,
        // and the underlying cloud store could have writes in progress that won't advance
        // until we interact with the writer again.  These in-progress writes will time out
        // if we don't flush.
        self.writer.flush().await?;
        Ok(())
    }

    /// Schedule batches of data to be written to the file
    pub async fn write_batches(
        &mut self,
        batches: impl Iterator<Item = &RecordBatch>,
    ) -> Result<()> {
        for batch in batches {
            self.write_batch(batch).await?;
        }
        Ok(())
    }

    fn verify_field_nullability(arr: &ArrayData, field: &Field) -> Result<()> {
        if !field.nullable && arr.null_count() > 0 {
            return Err(Error::invalid_input(format!("The field `{}` contained null values even though the field is marked non-null in the schema", field.name), location!()));
        }

        for (child_field, child_arr) in field.children.iter().zip(arr.child_data()) {
            Self::verify_field_nullability(child_arr, child_field)?;
        }

        Ok(())
    }

    fn verify_nullability_constraints(&self, batch: &RecordBatch) -> Result<()> {
        for (col, field) in batch
            .columns()
            .iter()
            .zip(self.schema.as_ref().unwrap().fields.iter())
        {
            Self::verify_field_nullability(&col.to_data(), field)?;
        }
        Ok(())
    }

    fn initialize(&mut self, mut schema: LanceSchema) -> Result<()> {
        let cache_bytes_per_column = if let Some(data_cache_bytes) = self.options.data_cache_bytes {
            data_cache_bytes / schema.fields.len() as u64
        } else {
            8 * 1024 * 1024
        };

        let max_page_bytes = self.options.max_page_bytes.unwrap_or_else(|| {
            std::env::var(ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES)
                .map(|s| {
                    s.parse::<u64>().unwrap_or_else(|e| {
                        warn!(
                            "Failed to parse {}: {}, using default",
                            ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES, e
                        );
                        MAX_PAGE_BYTES as u64
                    })
                })
                .unwrap_or(MAX_PAGE_BYTES as u64)
        });

        schema.validate()?;

        let keep_original_array = self.options.keep_original_array.unwrap_or(false);
        let encoding_strategy = self.options.encoding_strategy.clone().unwrap_or_else(|| {
            let version = self.version();
            default_encoding_strategy(version).into()
        });

        let encoding_options = EncodingOptions {
            cache_bytes_per_column,
            max_page_bytes,
            keep_original_array,
            buffer_alignment: PAGE_BUFFER_ALIGNMENT as u64,
        };
        let encoder =
            BatchEncoder::try_new(&schema, encoding_strategy.as_ref(), &encoding_options)?;
        self.num_columns = encoder.num_columns();

        self.column_writers = encoder.field_encoders;
        self.column_metadata = vec![initial_column_metadata(); self.num_columns as usize];
        self.field_id_to_column_indices = encoder.field_id_to_column_index;
        self.schema_metadata
            .extend(std::mem::take(&mut schema.metadata));
        self.schema = Some(schema);
        Ok(())
    }

    fn ensure_initialized(&mut self, batch: &RecordBatch) -> Result<&LanceSchema> {
        if self.schema.is_none() {
            let schema = LanceSchema::try_from(batch.schema().as_ref())?;
            self.initialize(schema)?;
        }
        Ok(self.schema.as_ref().unwrap())
    }

    #[instrument(skip_all, level = "debug")]
    fn encode_batch(
        &mut self,
        batch: &RecordBatch,
        external_buffers: &mut OutOfLineBuffers,
    ) -> Result<Vec<Vec<EncodeTask>>> {
        self.schema
            .as_ref()
            .unwrap()
            .fields
            .iter()
            .zip(self.column_writers.iter_mut())
            .map(|(field, column_writer)| {
                let array = batch
                    .column_by_name(&field.name)
                    .ok_or(Error::InvalidInput {
                        source: format!(
                            "Cannot write batch.  The batch was missing the column `{}`",
                            field.name
                        )
                        .into(),
                        location: location!(),
                    })?;
                let repdef = RepDefBuilder::default();
                let num_rows = array.len() as u64;
                column_writer.maybe_encode(
                    array.clone(),
                    external_buffers,
                    repdef,
                    self.rows_written,
                    num_rows,
                )
            })
            .collect::<Result<Vec<_>>>()
    }

    /// Schedule a batch of data to be written to the file
    ///
    /// Note: the future returned by this method may complete before the data has been fully
    /// flushed to the file (some data may be in the data cache or the I/O cache)
    pub async fn write_batch(&mut self, batch: &RecordBatch) -> Result<()> {
        debug!(
            "write_batch called with {} bytes of data",
            batch.get_array_memory_size()
        );
        self.ensure_initialized(batch)?;
        self.verify_nullability_constraints(batch)?;
        let num_rows = batch.num_rows() as u64;
        if num_rows == 0 {
            return Ok(());
        }
        if num_rows > u32::MAX as u64 {
            return Err(Error::InvalidInput {
                source: "cannot write Lance files with more than 2^32 rows".into(),
                location: location!(),
            });
        }
        // First we push each array into its column writer.  This may or may not generate enough
        // data to trigger an encoding task.  We collect any encoding tasks into a queue.
        let mut external_buffers =
            OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
        let encoding_tasks = self.encode_batch(batch, &mut external_buffers)?;
        // Next, write external buffers
        for external_buffer in external_buffers.take_buffers() {
            Self::do_write_buffer(&mut self.writer, &external_buffer).await?;
        }

        let encoding_tasks = encoding_tasks
            .into_iter()
            .flatten()
            .collect::<FuturesOrdered<_>>();

        self.rows_written = match self.rows_written.checked_add(batch.num_rows() as u64) {
            Some(rows_written) => rows_written,
            None => {
                return Err(Error::InvalidInput { source: format!("cannot write batch with {} rows because {} rows have already been written and Lance files cannot contain more than 2^64 rows", num_rows, self.rows_written).into(), location: location!() });
            }
        };

        self.write_pages(encoding_tasks).await?;

        Ok(())
    }

    async fn write_column_metadata(
        &mut self,
        metadata: pbfile::ColumnMetadata,
    ) -> Result<(u64, u64)> {
        let metadata_bytes = metadata.encode_to_vec();
        let position = self.writer.tell().await? as u64;
        let len = metadata_bytes.len() as u64;
        self.writer.write_all(&metadata_bytes).await?;
        Ok((position, len))
    }

    async fn write_column_metadatas(&mut self) -> Result<Vec<(u64, u64)>> {
        let mut metadatas = Vec::new();
        std::mem::swap(&mut self.column_metadata, &mut metadatas);
        let mut metadata_positions = Vec::with_capacity(metadatas.len());
        for metadata in metadatas {
            metadata_positions.push(self.write_column_metadata(metadata).await?);
        }
        Ok(metadata_positions)
    }

    fn make_file_descriptor(
        schema: &lance_core::datatypes::Schema,
        num_rows: u64,
    ) -> Result<pb::FileDescriptor> {
        let fields_with_meta = FieldsWithMeta::from(schema);
        Ok(pb::FileDescriptor {
            schema: Some(pb::Schema {
                fields: fields_with_meta.fields.0,
                metadata: fields_with_meta.metadata,
            }),
            length: num_rows,
        })
    }

    async fn write_global_buffers(&mut self) -> Result<Vec<(u64, u64)>> {
        let schema = self.schema.as_mut().ok_or(Error::invalid_input("No schema provided on writer open and no data provided.  Schema is unknown and file cannot be created", location!()))?;
        schema.metadata = std::mem::take(&mut self.schema_metadata);
        let file_descriptor = Self::make_file_descriptor(schema, self.rows_written)?;
        let file_descriptor_bytes = file_descriptor.encode_to_vec();
        let file_descriptor_len = file_descriptor_bytes.len() as u64;
        let file_descriptor_position = self.writer.tell().await? as u64;
        self.writer.write_all(&file_descriptor_bytes).await?;
        let mut gbo_table = Vec::with_capacity(1 + self.global_buffers.len());
        gbo_table.push((file_descriptor_position, file_descriptor_len));
        gbo_table.append(&mut self.global_buffers);
        Ok(gbo_table)
    }

    /// Add a metadata entry to the schema
    ///
    /// This method is useful because sometimes the metadata is not known until after the
    /// data has been written.  It allows you to alter the schema metadata and must be
    /// called before `finish` is called.
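    ///
    /// A minimal sketch (mirroring the usage in the tests below):
    ///
    /// ```ignore
    /// file_writer.add_schema_metadata("foo", "bar");
    /// ```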
    pub fn add_schema_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
        self.schema_metadata.insert(key.into(), value.into());
    }

    /// Adds a global buffer to the file
    ///
    /// The global buffer can contain any arbitrary bytes.  It will be written to the disk
    /// immediately.  This method returns the index of the global buffer (this will always
    /// start at 1 and increment by 1 each time this method is called)
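    ///
    /// A sketch (the buffer contents here are purely illustrative):
    ///
    /// ```ignore
    /// let buffer_index = file_writer.add_global_buffer(Bytes::from_static(b"stats")).await?;
    /// assert_eq!(buffer_index, 1); // the first added buffer gets index 1
    /// ```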
    pub async fn add_global_buffer(&mut self, buffer: Bytes) -> Result<u32> {
        let position = self.writer.tell().await? as u64;
        let len = buffer.len() as u64;
        Self::do_write_buffer(&mut self.writer, &buffer).await?;
        self.global_buffers.push((position, len));
        Ok(self.global_buffers.len() as u32)
    }

    async fn finish_writers(&mut self) -> Result<()> {
        let mut col_idx = 0;
        for mut writer in std::mem::take(&mut self.column_writers) {
            let mut external_buffers =
                OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
            let columns = writer.finish(&mut external_buffers).await?;
            for buffer in external_buffers.take_buffers() {
                self.writer.write_all(&buffer).await?;
            }
            debug_assert_eq!(
                columns.len(),
                writer.num_columns() as usize,
                "Expected {} columns from column at index {} and got {}",
                writer.num_columns(),
                col_idx,
                columns.len()
            );
            for column in columns {
                for page in column.final_pages {
                    self.write_page(page).await?;
                }
                let column_metadata = &mut self.column_metadata[col_idx];
                let mut buffer_pos = self.writer.tell().await? as u64;
                for buffer in column.column_buffers {
                    column_metadata.buffer_offsets.push(buffer_pos);
                    let mut size = 0;
                    Self::do_write_buffer(&mut self.writer, &buffer).await?;
                    size += buffer.len() as u64;
                    buffer_pos += size;
                    column_metadata.buffer_sizes.push(size);
                }
                let encoded_encoding = Any::from_msg(&column.encoding)?.encode_to_vec();
                column_metadata.encoding = Some(pbfile::Encoding {
                    location: Some(pbfile::encoding::Location::Direct(pbfile::DirectEncoding {
                        encoding: encoded_encoding,
                    })),
                });
                col_idx += 1;
            }
        }
        if col_idx != self.column_metadata.len() {
            panic!(
                "Column writers finished with {} columns but we expected {}",
                col_idx,
                self.column_metadata.len()
            );
        }
        Ok(())
    }

    /// Converts self.version (which is a mix of "software version" and
    /// "format version") into a (major, minor) format version pair
    fn version_to_numbers(&self) -> (u16, u16) {
        let version = self.options.format_version.unwrap_or_default();
        match version.resolve() {
            LanceFileVersion::V2_0 => (0, 3),
            LanceFileVersion::V2_1 => (2, 1),
            _ => panic!("Unsupported version: {}", version),
        }
    }

    /// Finishes writing the file
    ///
    /// This method will wait until all data has been flushed to the file.  Then it
    /// will write the file metadata and the footer.  It will not return until all
    /// data has been flushed and the file has been closed.
    ///
    /// Returns the total number of rows written
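    ///
    /// A sketch of the typical write loop (see the tests below for a complete setup):
    ///
    /// ```ignore
    /// let mut writer = FileWriter::try_new(object_writer, schema, FileWriterOptions::default())?;
    /// for batch in batches {
    ///     writer.write_batch(&batch).await?;
    /// }
    /// let rows_written = writer.finish().await?;
    /// ```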
    pub async fn finish(&mut self) -> Result<u64> {
        // 1. flush any remaining data and write out those pages
        let mut external_buffers =
            OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
        let encoding_tasks = self
            .column_writers
            .iter_mut()
            .map(|writer| writer.flush(&mut external_buffers))
            .collect::<Result<Vec<_>>>()?;
        for external_buffer in external_buffers.take_buffers() {
            Self::do_write_buffer(&mut self.writer, &external_buffer).await?;
        }
        let encoding_tasks = encoding_tasks
            .into_iter()
            .flatten()
            .collect::<FuturesOrdered<_>>();
        self.write_pages(encoding_tasks).await?;

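        // 2. finish the column writers, writing out any final pages and column buffers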
        self.finish_writers().await?;

        // 3. write global buffers (we write the schema here)
        let global_buffer_offsets = self.write_global_buffers().await?;
        let num_global_buffers = global_buffer_offsets.len() as u32;

        // 4. write the column metadatas
        let column_metadata_start = self.writer.tell().await? as u64;
        let metadata_positions = self.write_column_metadatas().await?;

        // 5. write the column metadata offset table
        let cmo_table_start = self.writer.tell().await? as u64;
        for (meta_pos, meta_len) in metadata_positions {
            self.writer.write_u64_le(meta_pos).await?;
            self.writer.write_u64_le(meta_len).await?;
        }

        // 6. write global buffers offset table
        let gbo_table_start = self.writer.tell().await? as u64;
        for (gbo_pos, gbo_len) in global_buffer_offsets {
            self.writer.write_u64_le(gbo_pos).await?;
            self.writer.write_u64_le(gbo_len).await?;
        }

        let (major, minor) = self.version_to_numbers();
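        // The footer is a fixed-size trailer: three u64 offsets (column metadata start,
        // column metadata offset table start, global buffer offset table start), two u32
        // counts, two u16 version numbers, and the MAGIC bytes, in that order.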
        // 7. write the footer
        self.writer.write_u64_le(column_metadata_start).await?;
        self.writer.write_u64_le(cmo_table_start).await?;
        self.writer.write_u64_le(gbo_table_start).await?;
        self.writer.write_u32_le(num_global_buffers).await?;
        self.writer.write_u32_le(self.num_columns).await?;
        self.writer.write_u16_le(major).await?;
        self.writer.write_u16_le(minor).await?;
        self.writer.write_all(MAGIC).await?;

        // 8. close the writer
        self.writer.shutdown().await?;
        Ok(self.rows_written)
    }

    pub async fn abort(&mut self) {
        self.writer.abort().await;
    }

    pub async fn tell(&mut self) -> Result<u64> {
        Ok(self.writer.tell().await? as u64)
    }

    pub fn field_id_to_column_indices(&self) -> &[(u32, u32)] {
        &self.field_id_to_column_indices
    }
}

/// Utility trait for converting EncodedBatch to Bytes using the
/// lance file format
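///
/// A minimal usage sketch (assuming an `EncodedBatch` is already in hand):
///
/// ```ignore
/// use lance_encoding::version::LanceFileVersion;
///
/// let bytes = encoded_batch.try_to_self_described_lance(LanceFileVersion::V2_0)?;
/// ```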
pub trait EncodedBatchWriteExt {
    /// Serializes into a lance file, including the schema
    fn try_to_self_described_lance(&self, version: LanceFileVersion) -> Result<Bytes>;
    /// Serializes into a lance file, without the schema.
    ///
    /// The schema must be provided to deserialize the buffer
    fn try_to_mini_lance(&self, version: LanceFileVersion) -> Result<Bytes>;
}

// Creates a lance footer and appends it to the encoded data
//
// The logic here is very similar to the logic in FileWriter except we
// are using BufMut (put_xyz) instead of AsyncWrite (write_xyz).
fn concat_lance_footer(
    batch: &EncodedBatch,
    write_schema: bool,
    version: LanceFileVersion,
) -> Result<Bytes> {
    // Estimating 1MiB for file footer
    let mut data = BytesMut::with_capacity(batch.data.len() + 1024 * 1024);
    data.put(batch.data.clone());
    // write global buffers (we write the schema here)
    let global_buffers = if write_schema {
        let schema_start = data.len() as u64;
        let lance_schema = lance_core::datatypes::Schema::try_from(batch.schema.as_ref())?;
        let descriptor = FileWriter::make_file_descriptor(&lance_schema, batch.num_rows)?;
        let descriptor_bytes = descriptor.encode_to_vec();
        let descriptor_len = descriptor_bytes.len() as u64;
        data.put(descriptor_bytes.as_slice());

        vec![(schema_start, descriptor_len)]
    } else {
        vec![]
    };
    let col_metadata_start = data.len() as u64;

    let mut col_metadata_positions = Vec::new();
    // Write column metadata
    for col in &batch.page_table {
        let position = data.len() as u64;
        let pages = col
            .page_infos
            .iter()
            .map(|page_info| {
                let encoded_encoding = match &page_info.encoding {
                    PageEncoding::Legacy(array_encoding) => {
                        Any::from_msg(array_encoding)?.encode_to_vec()
                    }
                    PageEncoding::Structural(page_layout) => {
                        Any::from_msg(page_layout)?.encode_to_vec()
                    }
                };
                let (buffer_offsets, buffer_sizes): (Vec<_>, Vec<_>) = page_info
                    .buffer_offsets_and_sizes
                    .as_ref()
                    .iter()
                    .cloned()
                    .unzip();
                Ok(pbfile::column_metadata::Page {
                    buffer_offsets,
                    buffer_sizes,
                    encoding: Some(pbfile::Encoding {
                        location: Some(pbfile::encoding::Location::Direct(DirectEncoding {
                            encoding: encoded_encoding,
                        })),
                    }),
                    length: page_info.num_rows,
                    priority: page_info.priority,
                })
            })
            .collect::<Result<Vec<_>>>()?;
        let (buffer_offsets, buffer_sizes): (Vec<_>, Vec<_>) =
            col.buffer_offsets_and_sizes.iter().cloned().unzip();
        let encoded_col_encoding = Any::from_msg(&col.encoding)?.encode_to_vec();
        let column = pbfile::ColumnMetadata {
            pages,
            buffer_offsets,
            buffer_sizes,
            encoding: Some(pbfile::Encoding {
                location: Some(pbfile::encoding::Location::Direct(pbfile::DirectEncoding {
                    encoding: encoded_col_encoding,
                })),
            }),
        };
        let column_bytes = column.encode_to_vec();
        col_metadata_positions.push((position, column_bytes.len() as u64));
        data.put(column_bytes.as_slice());
    }
    // Write column metadata offsets table
    let cmo_table_start = data.len() as u64;
    for (meta_pos, meta_len) in col_metadata_positions {
        data.put_u64_le(meta_pos);
        data.put_u64_le(meta_len);
    }
    // Write global buffers offsets table
    let gbo_table_start = data.len() as u64;
    let num_global_buffers = global_buffers.len() as u32;
    for (gbo_pos, gbo_len) in global_buffers {
        data.put_u64_le(gbo_pos);
        data.put_u64_le(gbo_len);
    }

    let (major, minor) = version.to_numbers();

    // write the footer
    data.put_u64_le(col_metadata_start);
    data.put_u64_le(cmo_table_start);
    data.put_u64_le(gbo_table_start);
    data.put_u32_le(num_global_buffers);
    data.put_u32_le(batch.page_table.len() as u32);
    data.put_u16_le(major as u16);
    data.put_u16_le(minor as u16);
    data.put(MAGIC.as_slice());

    Ok(data.freeze())
}

impl EncodedBatchWriteExt for EncodedBatch {
    fn try_to_self_described_lance(&self, version: LanceFileVersion) -> Result<Bytes> {
        concat_lance_footer(self, true, version)
    }

    fn try_to_mini_lance(&self, version: LanceFileVersion) -> Result<Bytes> {
        concat_lance_footer(self, false, version)
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use crate::v2::reader::{describe_encoding, FileReader, FileReaderOptions};
    use crate::v2::testing::FsFixture;
    use crate::v2::writer::{FileWriter, FileWriterOptions, ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES};
    use arrow_array::builder::{Float32Builder, Int32Builder};
    use arrow_array::{types::Float64Type, RecordBatchReader, StringArray};
    use arrow_array::{Int32Array, RecordBatch, UInt64Array};
    use arrow_schema::{DataType, Field, Field as ArrowField, Schema, Schema as ArrowSchema};
    use lance_core::cache::LanceCache;
    use lance_core::datatypes::Schema as LanceSchema;
    use lance_datagen::{array, gen_batch, BatchCount, RowCount};
    use lance_encoding::compression_config::{CompressionFieldParams, CompressionParams};
    use lance_encoding::decoder::DecoderPlugins;
    use lance_encoding::version::LanceFileVersion;
    use lance_io::object_store::ObjectStore;
    use lance_io::utils::CachedFileSize;
    use object_store::path::Path;
    use tempfile::tempdir;

    #[tokio::test]
    async fn test_basic_write() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path: String = tmp_dir.path().to_str().unwrap().to_owned();
        let tmp_path = Path::parse(tmp_path).unwrap();
        let tmp_path = tmp_path.child("some_file.lance");
        let obj_store = Arc::new(ObjectStore::local());

        let reader = gen_batch()
            .col("score", array::rand::<Float64Type>())
            .into_reader_rows(RowCount::from(1000), BatchCount::from(10));

        let writer = obj_store.create(&tmp_path).await.unwrap();

        let lance_schema =
            lance_core::datatypes::Schema::try_from(reader.schema().as_ref()).unwrap();

        let mut file_writer =
            FileWriter::try_new(writer, lance_schema, FileWriterOptions::default()).unwrap();

        for batch in reader {
            file_writer.write_batch(&batch.unwrap()).await.unwrap();
        }
        file_writer.add_schema_metadata("foo", "bar");
        file_writer.finish().await.unwrap();
        // Tests asserting the contents of the written file are in reader.rs
    }

    #[tokio::test]
    async fn test_write_empty() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path: String = tmp_dir.path().to_str().unwrap().to_owned();
        let tmp_path = Path::parse(tmp_path).unwrap();
        let tmp_path = tmp_path.child("some_file.lance");
        let obj_store = Arc::new(ObjectStore::local());

        let reader = gen_batch()
            .col("score", array::rand::<Float64Type>())
            .into_reader_rows(RowCount::from(0), BatchCount::from(0));

        let writer = obj_store.create(&tmp_path).await.unwrap();

        let lance_schema =
            lance_core::datatypes::Schema::try_from(reader.schema().as_ref()).unwrap();

        let mut file_writer =
            FileWriter::try_new(writer, lance_schema, FileWriterOptions::default()).unwrap();

        for batch in reader {
            file_writer.write_batch(&batch.unwrap()).await.unwrap();
        }
        file_writer.add_schema_metadata("foo", "bar");
        file_writer.finish().await.unwrap();
    }

    #[tokio::test]
    async fn test_max_page_bytes_enforced() {
        let arrow_field = Field::new("data", DataType::UInt64, false);
        let arrow_schema = Schema::new(vec![arrow_field]);
        let lance_schema = LanceSchema::try_from(&arrow_schema).unwrap();

        // ~8 MB of u64 values
        let data: Vec<u64> = (0..1_000_000).collect();
        let array = UInt64Array::from(data);
        let batch =
            RecordBatch::try_new(arrow_schema.clone().into(), vec![Arc::new(array)]).unwrap();

        let options = FileWriterOptions {
            max_page_bytes: Some(1024 * 1024), // 1 MiB
            ..Default::default()
        };

        let tmp_dir = tempdir().unwrap();
        let path = tmp_dir.path().join("test.lance");
        let object_store = ObjectStore::local();
        let mut writer = FileWriter::try_new(
            object_store
                .create(&Path::from(path.to_str().unwrap()))
                .await
                .unwrap(),
            lance_schema,
            options,
        )
        .unwrap();

        writer.write_batch(&batch).await.unwrap();
        writer.finish().await.unwrap();

        let fs = FsFixture::default();
        let file_scheduler = fs
            .scheduler
            .open_file(
                &Path::from(path.to_str().unwrap()),
                &CachedFileSize::unknown(),
            )
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler,
            None,
            Arc::<DecoderPlugins>::default(),
            &LanceCache::no_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let column_meta = file_reader.metadata();

        let mut total_page_num: u32 = 0;
        for (col_idx, col_metadata) in column_meta.column_metadatas.iter().enumerate() {
            assert!(
                !col_metadata.pages.is_empty(),
                "Column {} has no pages",
                col_idx
            );

            for (page_idx, page) in col_metadata.pages.iter().enumerate() {
                total_page_num += 1;
                let total_size: u64 = page.buffer_sizes.iter().sum();
                assert!(
                    total_size <= 1024 * 1024,
                    "Column {} Page {} size {} exceeds 1MB limit",
                    col_idx,
                    page_idx,
                    total_size
                );
            }
        }

        assert_eq!(total_page_num, 8);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn test_max_page_bytes_env_var() {
        let arrow_field = Field::new("data", DataType::UInt64, false);
        let arrow_schema = Schema::new(vec![arrow_field]);
        let lance_schema = LanceSchema::try_from(&arrow_schema).unwrap();
        // ~4 MB of u64 values
        let data: Vec<u64> = (0..500_000).collect();
        let array = UInt64Array::from(data);
        let batch =
            RecordBatch::try_new(arrow_schema.clone().into(), vec![Arc::new(array)]).unwrap();

        // 2 MiB
        std::env::set_var(ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES, "2097152");

        let options = FileWriterOptions {
            max_page_bytes: None, // fall back to the env var
            ..Default::default()
        };

        let tmp_dir = tempdir().unwrap();
        let path = tmp_dir.path().join("test_env_var.lance");
        let object_store = ObjectStore::local();
        let mut writer = FileWriter::try_new(
            object_store
                .create(&Path::from(path.to_str().unwrap()))
                .await
                .unwrap(),
            lance_schema.clone(),
            options,
        )
        .unwrap();

        writer.write_batch(&batch).await.unwrap();
        writer.finish().await.unwrap();

        let fs = FsFixture::default();
        let file_scheduler = fs
            .scheduler
            .open_file(
                &Path::from(path.to_str().unwrap()),
                &CachedFileSize::unknown(),
            )
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler,
            None,
            Arc::<DecoderPlugins>::default(),
            &LanceCache::no_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        for col_metadata in file_reader.metadata().column_metadatas.iter() {
            for page in col_metadata.pages.iter() {
                let total_size: u64 = page.buffer_sizes.iter().sum();
                assert!(
                    total_size <= 2 * 1024 * 1024,
                    "Page size {} exceeds 2MB limit",
                    total_size
                );
            }
        }

        std::env::set_var(ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES, "");
    }

    #[tokio::test]
    async fn test_compression_overrides_end_to_end() {
        // Create test schema with different column types
        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new("customer_id", DataType::Int32, false),
            ArrowField::new("product_id", DataType::Int32, false),
            ArrowField::new("quantity", DataType::Int32, false),
            ArrowField::new("price", DataType::Float32, false),
            ArrowField::new("description", DataType::Utf8, false),
        ]));

        let lance_schema = LanceSchema::try_from(arrow_schema.as_ref()).unwrap();

        // Create test data with patterns suitable for different compression
        let mut customer_ids = Int32Builder::new();
        let mut product_ids = Int32Builder::new();
        let mut quantities = Int32Builder::new();
        let mut prices = Float32Builder::new();
        let mut descriptions = Vec::new();

        // Generate data with specific patterns:
        // - customer_id: highly repetitive (good for RLE)
        // - product_id: moderately repetitive (good for RLE)
        // - quantity: mostly constant with occasional other values
        // - price: some repetition
        // - description: long strings (good for Zstd)
        for i in 0..10000 {
            // Customer ID repeats every 100 rows (100 unique customers)
            // This creates runs of 100 identical values
            customer_ids.append_value(i / 100);

            // Product ID has only 5 unique values with long runs
            product_ids.append_value(i / 2000);

            // Quantity is mostly 1 with occasional other values
            quantities.append_value(if i % 10 == 0 { 5 } else { 1 });

            // Price has only 3 unique values
            prices.append_value(match i % 3 {
                0 => 9.99,
                1 => 19.99,
                _ => 29.99,
            });

            // Descriptions are repetitive but we'll keep them simple
            descriptions.push(format!("Product {}", i / 2000));
        }

        let batch = RecordBatch::try_new(
            arrow_schema.clone(),
            vec![
                Arc::new(customer_ids.finish()),
                Arc::new(product_ids.finish()),
                Arc::new(quantities.finish()),
                Arc::new(prices.finish()),
                Arc::new(StringArray::from(descriptions)),
            ],
        )
        .unwrap();

        // Configure compression parameters
        let mut params = CompressionParams::new();

        // RLE for ID columns (ends with _id)
        params.columns.insert(
            "*_id".to_string(),
            CompressionFieldParams {
                rle_threshold: Some(0.5), // Lower threshold to trigger RLE more easily
                compression: None,        // Will use default compression if any
                compression_level: None,
                bss: Some(lance_encoding::compression_config::BssMode::Off), // Explicitly disable BSS to ensure RLE is used
            },
        );

        // For now, we'll skip Zstd compression since it's not imported
        // In a real implementation, you could add other compression types here

        // Build encoding strategy with compression parameters
        let encoding_strategy = lance_encoding::encoder::default_encoding_strategy_with_params(
            LanceFileVersion::V2_1,
            params,
        )
        .unwrap();

        // Configure file writer options
        let options = FileWriterOptions {
            encoding_strategy: Some(Arc::from(encoding_strategy)),
            format_version: Some(LanceFileVersion::V2_1),
            max_page_bytes: Some(64 * 1024), // 64 KiB pages
            ..Default::default()
        };

        // Write the file
        let tmp_dir = tempdir().unwrap();
        let path = tmp_dir.path().join("test_compression_overrides.lance");
        let object_store = ObjectStore::local();

        let mut writer = FileWriter::try_new(
            object_store
                .create(&Path::from(path.to_str().unwrap()))
                .await
                .unwrap(),
            lance_schema.clone(),
            options,
        )
        .unwrap();

        writer.write_batch(&batch).await.unwrap();
        writer.add_schema_metadata("compression_test", "configured_compression");
        writer.finish().await.unwrap();

        // Now write the same data without compression overrides for comparison
        let path_no_compression = tmp_dir.path().join("test_no_compression.lance");
        let default_options = FileWriterOptions {
            format_version: Some(LanceFileVersion::V2_1),
            max_page_bytes: Some(64 * 1024),
            ..Default::default()
        };

        let mut writer_no_compression = FileWriter::try_new(
            object_store
                .create(&Path::from(path_no_compression.to_str().unwrap()))
                .await
                .unwrap(),
            lance_schema.clone(),
            default_options,
        )
        .unwrap();

        writer_no_compression.write_batch(&batch).await.unwrap();
        writer_no_compression.finish().await.unwrap();

        // Note: With our current data patterns and RLE compression, the compressed file
        // might actually be slightly larger due to compression metadata overhead.
        // This is expected and the test is mainly to verify the system works end-to-end.

        // Read back the compressed file and verify data integrity
        let fs = FsFixture::default();
        let file_scheduler = fs
            .scheduler
            .open_file(
                &Path::from(path.to_str().unwrap()),
                &CachedFileSize::unknown(),
            )
            .await
            .unwrap();

        let file_reader = FileReader::try_open(
            file_scheduler,
            None,
            Arc::<DecoderPlugins>::default(),
            &LanceCache::no_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        // Verify metadata
        let metadata = file_reader.metadata();
        assert_eq!(metadata.major_version, 2);
        assert_eq!(metadata.minor_version, 1);

        let schema = file_reader.schema();
        assert_eq!(
            schema.metadata.get("compression_test"),
            Some(&"configured_compression".to_string())
        );

        // Verify the actual encodings used
        let column_metadatas = &metadata.column_metadatas;

        // Check customer_id column (index 0) - should use RLE due to our configuration
        assert!(!column_metadatas[0].pages.is_empty());
        let customer_id_encoding = describe_encoding(&column_metadatas[0].pages[0]);
        assert!(
            customer_id_encoding.contains("RLE") || customer_id_encoding.contains("Rle"),
            "customer_id column should use RLE encoding due to '*_id' pattern match, but got: {}",
            customer_id_encoding
        );

        // Check product_id column (index 1) - should use RLE due to our configuration
        assert!(!column_metadatas[1].pages.is_empty());
        let product_id_encoding = describe_encoding(&column_metadatas[1].pages[0]);
        assert!(
            product_id_encoding.contains("RLE") || product_id_encoding.contains("Rle"),
            "product_id column should use RLE encoding due to '*_id' pattern match, but got: {}",
            product_id_encoding
        );
    }

    #[tokio::test]
    async fn test_field_metadata_compression() {
        // Test that field metadata compression settings are respected
        let mut metadata = HashMap::new();
        metadata.insert(
            lance_encoding::constants::COMPRESSION_META_KEY.to_string(),
            "zstd".to_string(),
        );
        metadata.insert(
            lance_encoding::constants::COMPRESSION_LEVEL_META_KEY.to_string(),
            "6".to_string(),
        );

        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new("id", DataType::Int32, false),
            ArrowField::new("text", DataType::Utf8, false).with_metadata(metadata.clone()),
            ArrowField::new("data", DataType::Int32, false).with_metadata(HashMap::from([(
                lance_encoding::constants::COMPRESSION_META_KEY.to_string(),
                "none".to_string(),
            )])),
        ]));

        let lance_schema = LanceSchema::try_from(arrow_schema.as_ref()).unwrap();

        // Create test data
        let id_array = Int32Array::from_iter_values(0..1000);
        let text_array = StringArray::from_iter_values(
            (0..1000).map(|i| format!("test string {} repeated text", i)),
        );
        let data_array = Int32Array::from_iter_values((0..1000).map(|i| i * 2));

        let batch = RecordBatch::try_new(
            arrow_schema.clone(),
            vec![
                Arc::new(id_array),
                Arc::new(text_array),
                Arc::new(data_array),
            ],
        )
        .unwrap();

        let tmp_dir = tempdir().unwrap();
        let path = tmp_dir.path().join("field_metadata_test.lance");
        let object_store = ObjectStore::local();

        // Create encoding strategy that will read from field metadata
        let params = CompressionParams::new();
        let encoding_strategy = lance_encoding::encoder::default_encoding_strategy_with_params(
            LanceFileVersion::V2_1,
            params,
        )
        .unwrap();

        let options = FileWriterOptions {
            encoding_strategy: Some(Arc::from(encoding_strategy)),
            format_version: Some(LanceFileVersion::V2_1),
            ..Default::default()
        };
        let mut writer = FileWriter::try_new(
            object_store
                .create(&Path::from(path.to_str().unwrap()))
                .await
                .unwrap(),
            lance_schema.clone(),
            options,
        )
        .unwrap();

        writer.write_batch(&batch).await.unwrap();
        writer.finish().await.unwrap();

        // Read back metadata
        let fs = FsFixture::default();
        let file_scheduler = fs
            .scheduler
            .open_file(
                &Path::from(path.to_str().unwrap()),
                &CachedFileSize::unknown(),
            )
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler,
            None,
            Arc::<DecoderPlugins>::default(),
            &LanceCache::no_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let column_metadatas = &file_reader.metadata().column_metadatas;

        // The text column (index 1) should use zstd compression based on metadata
        let text_encoding = describe_encoding(&column_metadatas[1].pages[0]);
        // For string columns, we expect Binary encoding with zstd compression
        assert!(
            text_encoding.contains("Zstd"),
            "text column should use zstd compression from field metadata, but got: {}",
            text_encoding
        );

        // The data column (index 2) should use no compression based on metadata
        let data_encoding = describe_encoding(&column_metadatas[2].pages[0]);
        // For Int32 columns with "none" compression, we expect Flat encoding without compression
        assert!(
            data_encoding.contains("Flat") && data_encoding.contains("compression: None"),
            "data column should use no compression from field metadata, but got: {}",
            data_encoding
        );
    }

    #[tokio::test]
    async fn test_field_metadata_rle_threshold() {
        // Test that RLE threshold from field metadata is respected
        let mut metadata = HashMap::new();
        metadata.insert(
            lance_encoding::constants::RLE_THRESHOLD_META_KEY.to_string(),
            "0.9".to_string(),
        );
        // Also set compression to ensure RLE is used
        metadata.insert(
            lance_encoding::constants::COMPRESSION_META_KEY.to_string(),
            "lz4".to_string(),
        );
        // Explicitly disable BSS to ensure RLE is tested
        metadata.insert(
            lance_encoding::constants::BSS_META_KEY.to_string(),
            "off".to_string(),
        );

        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "status",
            DataType::Int32,
            false,
        )
        .with_metadata(metadata)]));

        let lance_schema = LanceSchema::try_from(arrow_schema.as_ref()).unwrap();

        // Create data with very high repetition (3 runs for 10000 values = 0.0003 ratio)
        let status_array = Int32Array::from_iter_values(
            std::iter::repeat_n(200, 8000)
                .chain(std::iter::repeat_n(404, 1500))
                .chain(std::iter::repeat_n(500, 500)),
        );

        let batch =
            RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(status_array)]).unwrap();

        let tmp_dir = tempdir().unwrap();
        let path = tmp_dir.path().join("rle_threshold_test.lance");
        let object_store = ObjectStore::local();

        // Create encoding strategy that will read from field metadata
        let params = CompressionParams::new();
        let encoding_strategy = lance_encoding::encoder::default_encoding_strategy_with_params(
            LanceFileVersion::V2_1,
            params,
        )
        .unwrap();

        let options = FileWriterOptions {
            encoding_strategy: Some(Arc::from(encoding_strategy)),
            format_version: Some(LanceFileVersion::V2_1),
            ..Default::default()
        };
        let mut writer = FileWriter::try_new(
            object_store
                .create(&Path::from(path.to_str().unwrap()))
                .await
                .unwrap(),
            lance_schema.clone(),
            options,
        )
        .unwrap();

        writer.write_batch(&batch).await.unwrap();
        writer.finish().await.unwrap();

        // Read back and check encoding
        let fs = FsFixture::default();
        let file_scheduler = fs
            .scheduler
            .open_file(
                &Path::from(path.to_str().unwrap()),
                &CachedFileSize::unknown(),
            )
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler,
            None,
            Arc::<DecoderPlugins>::default(),
            &LanceCache::no_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let column_metadatas = &file_reader.metadata().column_metadatas;
        let status_encoding = describe_encoding(&column_metadatas[0].pages[0]);
        assert!(
            status_encoding.contains("RLE") || status_encoding.contains("Rle"),
            "status column should use RLE encoding due to metadata threshold, but got: {}",
            status_encoding
        );
    }
}