Skip to main content

lance_file/
writer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use core::panic;
5use std::collections::HashMap;
6use std::sync::Arc;
7use std::sync::atomic::AtomicBool;
8
9use arrow_array::RecordBatch;
10
11use arrow_data::ArrayData;
12use bytes::{Buf, BufMut, Bytes, BytesMut};
13use futures::StreamExt;
14use futures::stream::FuturesOrdered;
15use lance_core::datatypes::{Field, Schema as LanceSchema};
16use lance_core::utils::bit::pad_bytes;
17use lance_core::{Error, Result};
18use lance_encoding::decoder::PageEncoding;
19use lance_encoding::encoder::{
20    BatchEncoder, EncodeTask, EncodedBatch, EncodedPage, EncodingOptions, FieldEncoder,
21    FieldEncodingStrategy, OutOfLineBuffers, default_encoding_strategy,
22};
23use lance_encoding::repdef::RepDefBuilder;
24use lance_encoding::version::LanceFileVersion;
25use lance_io::object_store::ObjectStore;
26use lance_io::traits::Writer;
27use log::{debug, warn};
28use object_store::path::Path;
29use prost::Message;
30use prost_types::Any;
31use tokio::io::AsyncWrite;
32use tokio::io::AsyncWriteExt;
33use tracing::instrument;
34
35use crate::datatypes::FieldsWithMeta;
36use crate::format::MAGIC;
37use crate::format::pb;
38use crate::format::pbfile;
39use crate::format::pbfile::DirectEncoding;
40
/// Pages buffers are aligned to 64 bytes
pub(crate) const PAGE_BUFFER_ALIGNMENT: usize = 64;
// Fixed filler written after each buffer to reach the next alignment boundary
// (see `do_write_buffer`).  The byte value 72 appears arbitrary — padding is
// never read back.
const PAD_BUFFER: [u8; PAGE_BUFFER_ALIGNMENT] = [72; PAGE_BUFFER_ALIGNMENT];
// In 2.1+, we split large pages on read instead of write to avoid empty pages
// and small pages issues. However, we keep the write-time limit at 32MB to avoid
// potential regressions in 2.0 format readers.
//
// This limit is not applied in the 2.1 writer
const MAX_PAGE_BYTES: usize = 32 * 1024 * 1024;
// Environment variable that can override the default max page size when the
// `max_page_bytes` option is unset (consulted in `FileWriter::initialize`).
const ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES: &str = "LANCE_FILE_WRITER_MAX_PAGE_BYTES";
51
/// Options controlling how [`FileWriter`] buffers, encodes, and versions the
/// data it writes.
#[derive(Debug, Clone, Default)]
pub struct FileWriterOptions {
    /// How many bytes to use for buffering column data
    ///
    /// When data comes in small batches the writer will buffer column data so that
    /// larger pages can be created.  This value will be divided evenly across all of the
    /// columns.  Generally you want this to be at least large enough to match your
    /// filesystem's ideal read size per column.
    ///
    /// In some cases you might want this value to be even larger if you have highly
    /// compressible data.  However, if this is too large, then the writer could require
    /// a lot of memory and write performance may suffer if the CPU-expensive encoding
    /// falls behind and can't be interleaved with the I/O expensive flushing.
    ///
    /// The default will use 8MiB per column which should be reasonable for most cases.
    // TODO: Do we need to be able to set this on a per-column basis?
    pub data_cache_bytes: Option<u64>,
    /// A hint to indicate the max size of a page
    ///
    /// This hint can't always be respected.  A single value could be larger than this value
    /// and we never slice single values.  In addition, there are some cases where it can be
    /// difficult to know size up-front and so we might not be able to respect this value.
    pub max_page_bytes: Option<u64>,
    /// The file writer buffers columns until enough data has arrived to flush a page
    /// to disk.
    ///
    /// Some columns with small data types may not flush very often.  These arrays can
    /// stick around for a long time.  These arrays might also be keeping larger data
    /// structures alive.  By default, the writer will make a deep copy of this array
    /// to avoid any potential memory leaks.  However, this can be disabled for a
    /// (probably minor) performance boost if you are sure that arrays are not keeping
    /// any sibling structures alive (this typically means the array was allocated in
    /// the same language / runtime as the writer)
    ///
    /// Do not enable this if your data is arriving from the C data interface.
    /// Data typically arrives one "batch" at a time (encoded in the C data interface
    /// as a struct array).  Each array in that batch keeps the entire batch alive.
    /// This means a small boolean array (which we will buffer in memory for quite a
    /// while) might keep a much larger record batch around in memory (even though most
    /// of that batch's data has been written to disk)
    pub keep_original_array: Option<bool>,
    /// Custom strategy for choosing field encodings; defaults to
    /// `default_encoding_strategy` for the selected format version.
    pub encoding_strategy: Option<Arc<dyn FieldEncodingStrategy>>,
    /// The format version to use when writing the file
    ///
    /// This controls which encodings will be used when encoding the data.  Newer
    /// versions may have more efficient encodings.  However, newer format versions will
    /// require more up-to-date readers to read the data.
    pub format_version: Option<LanceFileVersion>,
}
101
// Total in-memory budget for buffering serialized page metadata before flushing
// to the spill file. Divided evenly across columns (with a floor of 64 bytes).
// See `PageMetadataSpill::new`.
const DEFAULT_SPILL_BUFFER_LIMIT: usize = 256 * 1024;
105
/// Spills serialized page metadata to a temporary file to bound memory usage.
///
/// The spill file is an unstructured sequence of "chunks". Each chunk is a
/// contiguous run of length-delimited protobuf `Page` messages belonging to a
/// single column. Chunks from different columns are interleaved in the order
/// they are flushed (i.e. whenever a column's in-memory buffer exceeds
/// `per_column_limit`). The `column_chunks` index records the (offset, length)
/// of every chunk so each column's pages can be read back and reassembled in
/// order.
struct PageMetadataSpill {
    /// Writer for the spill file.
    writer: Box<dyn Writer>,
    /// Store retained so the spill file can be reopened for reading when the
    /// metadata is reassembled (see `FileWriter::write_column_metadatas`).
    object_store: Arc<ObjectStore>,
    /// Location of the spill file within `object_store`.
    path: Path,
    /// Current write position in the spill file.
    position: u64,
    /// Per-column buffer of serialized (length-delimited protobuf) page metadata
    /// that has not yet been flushed to the spill file.
    column_buffers: Vec<Vec<u8>>,
    /// Per-column list of chunks that have been flushed to the spill file.
    /// Each entry is (offset, length) pointing into the spill file.
    column_chunks: Vec<Vec<(u64, u32)>>,
    /// Maximum bytes to buffer per column before flushing to the spill file.
    per_column_limit: usize,
}
130
131impl PageMetadataSpill {
132    async fn new(object_store: Arc<ObjectStore>, path: Path, num_columns: usize) -> Result<Self> {
133        let writer = object_store.create(&path).await?;
134        let per_column_limit = (DEFAULT_SPILL_BUFFER_LIMIT / num_columns.max(1)).max(64);
135        Ok(Self {
136            writer,
137            object_store,
138            path,
139            position: 0,
140            column_buffers: vec![Vec::new(); num_columns],
141            column_chunks: vec![Vec::new(); num_columns],
142            per_column_limit,
143        })
144    }
145
146    async fn append_page(
147        &mut self,
148        column_idx: usize,
149        page: &pbfile::column_metadata::Page,
150    ) -> Result<()> {
151        page.encode_length_delimited(&mut self.column_buffers[column_idx])
152            .map_err(|e| {
153                Error::io_source(Box::new(std::io::Error::new(
154                    std::io::ErrorKind::InvalidData,
155                    e,
156                )))
157            })?;
158        if self.column_buffers[column_idx].len() >= self.per_column_limit {
159            self.flush_column(column_idx).await?;
160        }
161        Ok(())
162    }
163
164    async fn flush_column(&mut self, column_idx: usize) -> Result<()> {
165        let buf = &self.column_buffers[column_idx];
166        if buf.is_empty() {
167            return Ok(());
168        }
169        let len = buf.len();
170        self.writer.write_all(buf).await?;
171        self.column_chunks[column_idx].push((self.position, len as u32));
172        self.position += len as u64;
173        self.column_buffers[column_idx].clear();
174        Ok(())
175    }
176
177    async fn shutdown_writer(&mut self) -> Result<()> {
178        for col_idx in 0..self.column_buffers.len() {
179            self.flush_column(col_idx).await?;
180        }
181        Writer::shutdown(self.writer.as_mut()).await?;
182        Ok(())
183    }
184}
185
186fn decode_spilled_chunk(data: &Bytes) -> Result<Vec<pbfile::column_metadata::Page>> {
187    let mut pages = Vec::new();
188    let mut cursor = data.clone();
189    while cursor.has_remaining() {
190        let page =
191            pbfile::column_metadata::Page::decode_length_delimited(&mut cursor).map_err(|e| {
192                Error::io_source(Box::new(std::io::Error::new(
193                    std::io::ErrorKind::InvalidData,
194                    e,
195                )))
196            })?;
197        pages.push(page);
198    }
199    Ok(pages)
200}
201
/// Lifecycle of the optional page-metadata spill.
enum PageSpillState {
    /// Spilling was requested but the sidecar file has not been created yet;
    /// holds the store/path needed to create it lazily on the first page write.
    Pending(Arc<ObjectStore>, Path),
    /// The spill file is open and receiving serialized page metadata.
    Active(PageMetadataSpill),
}
206
/// Writer for Lance files.
///
/// Batches are buffered per column and flushed as encoded pages; the column
/// metadata, global buffers, and schema are written out by `finish`.
pub struct FileWriter {
    /// Destination for all file bytes.
    writer: Box<dyn Writer>,
    /// Output schema; `None` until provided explicitly (`try_new`) or inferred
    /// from the first batch (`ensure_initialized`).
    schema: Option<LanceSchema>,
    /// One encoder per top-level field.
    column_writers: Vec<Box<dyn FieldEncoder>>,
    /// Accumulated per-column metadata (pages, buffer locations, encoding).
    column_metadata: Vec<pbfile::ColumnMetadata>,
    /// Mapping from field id to column index, produced by the `BatchEncoder`.
    field_id_to_column_indices: Vec<(u32, u32)>,
    /// Number of physical columns as reported by the `BatchEncoder`.
    num_columns: u32,
    /// Total rows scheduled for writing so far.
    rows_written: u64,
    /// (position, length) of each global buffer written so far.
    global_buffers: Vec<(u64, u64)>,
    /// Schema metadata to embed in the footer (see `add_schema_metadata`).
    schema_metadata: HashMap<String, String>,
    options: FileWriterOptions,
    /// Optional spill of page metadata to a sidecar file
    /// (see `with_page_metadata_spill`).
    page_spill: Option<PageSpillState>,
}
220
221fn initial_column_metadata() -> pbfile::ColumnMetadata {
222    pbfile::ColumnMetadata {
223        pages: Vec::new(),
224        buffer_offsets: Vec::new(),
225        buffer_sizes: Vec::new(),
226        encoding: None,
227    }
228}
229
230static WARNED_ON_UNSTABLE_API: AtomicBool = AtomicBool::new(false);
231
232impl FileWriter {
233    /// Create a new FileWriter with a desired output schema
234    pub fn try_new(
235        object_writer: Box<dyn Writer>,
236        schema: LanceSchema,
237        options: FileWriterOptions,
238    ) -> Result<Self> {
239        let mut writer = Self::new_lazy(object_writer, options);
240        writer.initialize(schema)?;
241        Ok(writer)
242    }
243
244    /// Create a new FileWriter without a desired output schema
245    ///
246    /// The output schema will be set based on the first batch of data to arrive.
247    /// If no data arrives and the writer is finished then the write will fail.
248    pub fn new_lazy(object_writer: Box<dyn Writer>, options: FileWriterOptions) -> Self {
249        if let Some(format_version) = options.format_version
250            && format_version.is_unstable()
251            && WARNED_ON_UNSTABLE_API
252                .compare_exchange(
253                    false,
254                    true,
255                    std::sync::atomic::Ordering::Relaxed,
256                    std::sync::atomic::Ordering::Relaxed,
257                )
258                .is_ok()
259        {
260            warn!(
261                "You have requested an unstable format version.  Files written with this format version may not be readable in the future!  This is a development feature and should only be used for experimentation and never for production data."
262            );
263        }
264        Self {
265            writer: object_writer,
266            schema: None,
267            column_writers: Vec::new(),
268            column_metadata: Vec::new(),
269            num_columns: 0,
270            rows_written: 0,
271            field_id_to_column_indices: Vec::new(),
272            global_buffers: Vec::new(),
273            schema_metadata: HashMap::new(),
274            page_spill: None,
275            options,
276        }
277    }
278
279    /// Spill page metadata to a sidecar file instead of accumulating in memory.
280    ///
281    /// This can dramatically reduce memory usage when many writers are open
282    /// concurrently (e.g. IVF shuffle with thousands of partition writers).
283    /// The sidecar file is created lazily on the first page write. The caller
284    /// is responsible for cleaning up `path` (e.g. by placing it in a temp
285    /// directory that is removed via RAII).
286    pub fn with_page_metadata_spill(mut self, object_store: Arc<ObjectStore>, path: Path) -> Self {
287        self.page_spill = Some(PageSpillState::Pending(object_store, path));
288        self
289    }
290
291    /// Write a series of record batches to a new file
292    ///
293    /// Returns the number of rows written
294    pub async fn create_file_with_batches(
295        store: &ObjectStore,
296        path: &Path,
297        schema: lance_core::datatypes::Schema,
298        batches: impl Iterator<Item = RecordBatch> + Send,
299        options: FileWriterOptions,
300    ) -> Result<usize> {
301        let writer = store.create(path).await?;
302        let mut writer = Self::try_new(writer, schema, options)?;
303        for batch in batches {
304            writer.write_batch(&batch).await?;
305        }
306        Ok(writer.finish().await? as usize)
307    }
308
309    async fn do_write_buffer(writer: &mut (impl AsyncWrite + Unpin), buf: &[u8]) -> Result<()> {
310        writer.write_all(buf).await?;
311        let pad_bytes = pad_bytes::<PAGE_BUFFER_ALIGNMENT>(buf.len());
312        writer.write_all(&PAD_BUFFER[..pad_bytes]).await?;
313        Ok(())
314    }
315
316    /// Returns the format version that will be used when writing the file
317    pub fn version(&self) -> LanceFileVersion {
318        self.options.format_version.unwrap_or_default()
319    }
320
321    async fn write_page(&mut self, encoded_page: EncodedPage) -> Result<()> {
322        let buffers = encoded_page.data;
323        let mut buffer_offsets = Vec::with_capacity(buffers.len());
324        let mut buffer_sizes = Vec::with_capacity(buffers.len());
325        for buffer in buffers {
326            buffer_offsets.push(self.writer.tell().await? as u64);
327            buffer_sizes.push(buffer.len() as u64);
328            Self::do_write_buffer(&mut self.writer, &buffer).await?;
329        }
330        let encoded_encoding = match encoded_page.description {
331            PageEncoding::Legacy(array_encoding) => Any::from_msg(&array_encoding)?.encode_to_vec(),
332            PageEncoding::Structural(page_layout) => Any::from_msg(&page_layout)?.encode_to_vec(),
333        };
334        let page = pbfile::column_metadata::Page {
335            buffer_offsets,
336            buffer_sizes,
337            encoding: Some(pbfile::Encoding {
338                location: Some(pbfile::encoding::Location::Direct(DirectEncoding {
339                    encoding: encoded_encoding,
340                })),
341            }),
342            length: encoded_page.num_rows,
343            priority: encoded_page.row_number,
344        };
345        let col_idx = encoded_page.column_idx as usize;
346        if matches!(&self.page_spill, Some(PageSpillState::Pending(..))) {
347            let Some(PageSpillState::Pending(store, path)) = self.page_spill.take() else {
348                unreachable!()
349            };
350            self.page_spill = Some(PageSpillState::Active(
351                PageMetadataSpill::new(store, path, self.num_columns as usize).await?,
352            ));
353        }
354        match &mut self.page_spill {
355            Some(PageSpillState::Active(spill)) => spill.append_page(col_idx, &page).await?,
356            None => self.column_metadata[col_idx].pages.push(page),
357            Some(PageSpillState::Pending(..)) => unreachable!(),
358        }
359        Ok(())
360    }
361
362    #[instrument(skip_all, level = "debug")]
363    async fn write_pages(&mut self, mut encoding_tasks: FuturesOrdered<EncodeTask>) -> Result<()> {
364        // As soon as an encoding task is done we write it.  There is no parallelism
365        // needed here because "writing" is really just submitting the buffer to the
366        // underlying write scheduler (either the OS or object_store's scheduler for
367        // cloud writes).  The only time we might truly await on write_page is if the
368        // scheduler's write queue is full.
369        //
370        // Also, there is no point in trying to make write_page parallel anyways
371        // because we wouldn't want buffers getting mixed up across pages.
372        while let Some(encoding_task) = encoding_tasks.next().await {
373            let encoded_page = encoding_task?;
374            self.write_page(encoded_page).await?;
375        }
376        // It's important to flush here, we don't know when the next batch will arrive
377        // and the underlying cloud store could have writes in progress that won't advance
378        // until we interact with the writer again.  These in-progress writes will time out
379        // if we don't flush.
380        self.writer.flush().await?;
381        Ok(())
382    }
383
384    /// Schedule batches of data to be written to the file
385    pub async fn write_batches(
386        &mut self,
387        batches: impl Iterator<Item = &RecordBatch>,
388    ) -> Result<()> {
389        for batch in batches {
390            self.write_batch(batch).await?;
391        }
392        Ok(())
393    }
394
395    fn verify_field_nullability(arr: &ArrayData, field: &Field) -> Result<()> {
396        if !field.nullable && arr.null_count() > 0 {
397            return Err(Error::invalid_input(format!(
398                "The field `{}` contained null values even though the field is marked non-null in the schema",
399                field.name
400            )));
401        }
402
403        for (child_field, child_arr) in field.children.iter().zip(arr.child_data()) {
404            Self::verify_field_nullability(child_arr, child_field)?;
405        }
406
407        Ok(())
408    }
409
410    fn verify_nullability_constraints(&self, batch: &RecordBatch) -> Result<()> {
411        for (col, field) in batch
412            .columns()
413            .iter()
414            .zip(self.schema.as_ref().unwrap().fields.iter())
415        {
416            Self::verify_field_nullability(&col.to_data(), field)?;
417        }
418        Ok(())
419    }
420
421    fn initialize(&mut self, mut schema: LanceSchema) -> Result<()> {
422        let cache_bytes_per_column = if let Some(data_cache_bytes) = self.options.data_cache_bytes {
423            data_cache_bytes / schema.fields.len() as u64
424        } else {
425            8 * 1024 * 1024
426        };
427
428        let max_page_bytes = self.options.max_page_bytes.unwrap_or_else(|| {
429            std::env::var(ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES)
430                .map(|s| {
431                    s.parse::<u64>().unwrap_or_else(|e| {
432                        warn!(
433                            "Failed to parse {}: {}, using default",
434                            ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES, e
435                        );
436                        MAX_PAGE_BYTES as u64
437                    })
438                })
439                .unwrap_or(MAX_PAGE_BYTES as u64)
440        });
441
442        schema.validate()?;
443
444        let keep_original_array = self.options.keep_original_array.unwrap_or(false);
445        let encoding_strategy = self.options.encoding_strategy.clone().unwrap_or_else(|| {
446            let version = self.version();
447            default_encoding_strategy(version).into()
448        });
449
450        let encoding_options = EncodingOptions {
451            cache_bytes_per_column,
452            max_page_bytes,
453            keep_original_array,
454            buffer_alignment: PAGE_BUFFER_ALIGNMENT as u64,
455            version: self.version(),
456        };
457        let encoder =
458            BatchEncoder::try_new(&schema, encoding_strategy.as_ref(), &encoding_options)?;
459        self.num_columns = encoder.num_columns();
460
461        self.column_writers = encoder.field_encoders;
462        self.column_metadata = vec![initial_column_metadata(); self.num_columns as usize];
463        self.field_id_to_column_indices = encoder.field_id_to_column_index;
464        self.schema_metadata
465            .extend(std::mem::take(&mut schema.metadata));
466        self.schema = Some(schema);
467        Ok(())
468    }
469
470    fn ensure_initialized(&mut self, batch: &RecordBatch) -> Result<&LanceSchema> {
471        if self.schema.is_none() {
472            let schema = LanceSchema::try_from(batch.schema().as_ref())?;
473            self.initialize(schema)?;
474        }
475        Ok(self.schema.as_ref().unwrap())
476    }
477
478    #[instrument(skip_all, level = "debug")]
479    fn encode_batch(
480        &mut self,
481        batch: &RecordBatch,
482        external_buffers: &mut OutOfLineBuffers,
483    ) -> Result<Vec<Vec<EncodeTask>>> {
484        self.schema
485            .as_ref()
486            .unwrap()
487            .fields
488            .iter()
489            .zip(self.column_writers.iter_mut())
490            .map(|(field, column_writer)| {
491                let array =
492                    batch
493                        .column_by_name(&field.name)
494                        .ok_or(Error::invalid_input_source(
495                            format!(
496                                "Cannot write batch.  The batch was missing the column `{}`",
497                                field.name
498                            )
499                            .into(),
500                        ))?;
501                let repdef = RepDefBuilder::default();
502                let num_rows = array.len() as u64;
503                column_writer.maybe_encode(
504                    array.clone(),
505                    external_buffers,
506                    repdef,
507                    self.rows_written,
508                    num_rows,
509                )
510            })
511            .collect::<Result<Vec<_>>>()
512    }
513
514    /// Schedule a batch of data to be written to the file
515    ///
516    /// Note: the future returned by this method may complete before the data has been fully
517    /// flushed to the file (some data may be in the data cache or the I/O cache)
518    pub async fn write_batch(&mut self, batch: &RecordBatch) -> Result<()> {
519        debug!(
520            "write_batch called with {} rows, {} columns, and {} bytes of data",
521            batch.num_rows(),
522            batch.num_columns(),
523            batch.get_array_memory_size()
524        );
525        self.ensure_initialized(batch)?;
526        self.verify_nullability_constraints(batch)?;
527        let num_rows = batch.num_rows() as u64;
528        if num_rows == 0 {
529            return Ok(());
530        }
531        if num_rows > u32::MAX as u64 {
532            return Err(Error::invalid_input_source(
533                "cannot write Lance files with more than 2^32 rows".into(),
534            ));
535        }
536        // First we push each array into its column writer.  This may or may not generate enough
537        // data to trigger an encoding task.  We collect any encoding tasks into a queue.
538        let mut external_buffers =
539            OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
540        let encoding_tasks = self.encode_batch(batch, &mut external_buffers)?;
541        // Next, write external buffers
542        for external_buffer in external_buffers.take_buffers() {
543            Self::do_write_buffer(&mut self.writer, &external_buffer).await?;
544        }
545
546        let encoding_tasks = encoding_tasks
547            .into_iter()
548            .flatten()
549            .collect::<FuturesOrdered<_>>();
550
551        self.rows_written = match self.rows_written.checked_add(batch.num_rows() as u64) {
552            Some(rows_written) => rows_written,
553            None => {
554                return Err(Error::invalid_input_source(format!("cannot write batch with {} rows because {} rows have already been written and Lance files cannot contain more than 2^64 rows", num_rows, self.rows_written).into()));
555            }
556        };
557
558        self.write_pages(encoding_tasks).await?;
559
560        Ok(())
561    }
562
563    async fn write_column_metadata(
564        &mut self,
565        metadata: pbfile::ColumnMetadata,
566    ) -> Result<(u64, u64)> {
567        let metadata_bytes = metadata.encode_to_vec();
568        let position = self.writer.tell().await? as u64;
569        let len = metadata_bytes.len() as u64;
570        self.writer.write_all(&metadata_bytes).await?;
571        Ok((position, len))
572    }
573
    /// Write each column's metadata block to the file, returning the
    /// (position, length) pairs needed for the footer.
    ///
    /// When page-metadata spilling was enabled, the spilled chunks are read
    /// back here and reassembled (in chunk order, which matches write order)
    /// into each column's `pages` list before serialization.
    async fn write_column_metadatas(&mut self) -> Result<Vec<(u64, u64)>> {
        let metadatas = std::mem::take(&mut self.column_metadata);

        // If spilling, finalize the spill writer and reopen for reading.
        // The spill file itself is cleaned up by the caller (it lives in a
        // temp directory managed by the caller's RAII guard).
        let spill_state = self.page_spill.take();
        let (spill_chunks, spill_reader) =
            if let Some(PageSpillState::Active(mut spill)) = spill_state {
                spill.shutdown_writer().await?;
                let reader = spill.object_store.open(&spill.path).await?;
                let chunks = std::mem::take(&mut spill.column_chunks);
                (chunks, Some(reader))
            } else {
                (Vec::new(), None)
            };

        let mut metadata_positions = Vec::with_capacity(metadatas.len());
        for (col_idx, mut metadata) in metadatas.into_iter().enumerate() {
            if let Some(reader) = &spill_reader {
                // Read this column's chunks back and decode the page metadata;
                // concatenating chunks in index order restores page order.
                let mut pages = Vec::new();
                for &(offset, len) in &spill_chunks[col_idx] {
                    let data = reader
                        .get_range(offset as usize..(offset as usize + len as usize))
                        .await
                        .map_err(|e| Error::io_source(Box::new(e)))?;
                    pages.extend(decode_spilled_chunk(&data)?);
                }
                metadata.pages = pages;
            }
            metadata_positions.push(self.write_column_metadata(metadata).await?);
        }

        Ok(metadata_positions)
    }
609
610    fn make_file_descriptor(
611        schema: &lance_core::datatypes::Schema,
612        num_rows: u64,
613    ) -> Result<pb::FileDescriptor> {
614        let fields_with_meta = FieldsWithMeta::from(schema);
615        Ok(pb::FileDescriptor {
616            schema: Some(pb::Schema {
617                fields: fields_with_meta.fields.0,
618                metadata: fields_with_meta.metadata,
619            }),
620            length: num_rows,
621        })
622    }
623
624    async fn write_global_buffers(&mut self) -> Result<Vec<(u64, u64)>> {
625        let schema = self.schema.as_mut().ok_or(Error::invalid_input("No schema provided on writer open and no data provided.  Schema is unknown and file cannot be created"))?;
626        schema.metadata = std::mem::take(&mut self.schema_metadata);
627        // Use descriptor layout for blob v2 in the footer to avoid exposing logical child fields.
628        //
629        // TODO(xuanwo): this doesn't work on nested struct, need better solution like fields_per_order_mut?
630        schema.fields.iter_mut().for_each(|f| {
631            if f.is_blob_v2() {
632                f.unloaded_mut();
633            }
634        });
635
636        let file_descriptor = Self::make_file_descriptor(schema, self.rows_written)?;
637        let file_descriptor_bytes = file_descriptor.encode_to_vec();
638        let file_descriptor_len = file_descriptor_bytes.len() as u64;
639        let file_descriptor_position = self.writer.tell().await? as u64;
640        self.writer.write_all(&file_descriptor_bytes).await?;
641        let mut gbo_table = Vec::with_capacity(1 + self.global_buffers.len());
642        gbo_table.push((file_descriptor_position, file_descriptor_len));
643        gbo_table.append(&mut self.global_buffers);
644        Ok(gbo_table)
645    }
646
647    /// Add a metadata entry to the schema
648    ///
649    /// This method is useful because sometimes the metadata is not known until after the
650    /// data has been written.  This method allows you to alter the schema metadata.  It
651    /// must be called before `finish` is called.
652    pub fn add_schema_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
653        self.schema_metadata.insert(key.into(), value.into());
654    }
655
656    /// Prepare the writer when column data and metadata were produced externally.
657    ///
658    /// This is useful for flows that copy already-encoded pages (e.g., binary copy
659    /// during compaction) where the column buffers have been written directly and we
660    /// only need to write the footer and schema metadata. The provided
661    /// `column_metadata` must describe the buffers already persisted by the
662    /// underlying `ObjectWriter`, and `rows_written` should reflect the total number
663    /// of rows in those buffers.
664    pub fn initialize_with_external_metadata(
665        &mut self,
666        schema: lance_core::datatypes::Schema,
667        column_metadata: Vec<pbfile::ColumnMetadata>,
668        rows_written: u64,
669    ) {
670        self.schema = Some(schema);
671        self.num_columns = column_metadata.len() as u32;
672        self.column_metadata = column_metadata;
673        self.rows_written = rows_written;
674    }
675
676    /// Adds a global buffer to the file
677    ///
678    /// The global buffer can contain any arbitrary bytes.  It will be written to the disk
679    /// immediately.  This method returns the index of the global buffer (this will always
680    /// start at 1 and increment by 1 each time this method is called)
681    pub async fn add_global_buffer(&mut self, buffer: Bytes) -> Result<u32> {
682        let position = self.writer.tell().await? as u64;
683        let len = buffer.len() as u64;
684        Self::do_write_buffer(&mut self.writer, &buffer).await?;
685        self.global_buffers.push((position, len));
686        Ok(self.global_buffers.len() as u32)
687    }
688
    /// Finishes each column writer and records the resulting column metadata.
    ///
    /// Consumes `self.column_writers` (via `std::mem::take`), writes any
    /// out-of-line buffers, final pages, and column buffers to the file, and
    /// fills in `self.column_metadata` with buffer offsets/sizes and the
    /// serialized column encodings.
    async fn finish_writers(&mut self) -> Result<()> {
        let mut col_idx = 0;
        for mut writer in std::mem::take(&mut self.column_writers) {
            // Out-of-line buffers produced during finish are positioned
            // starting at the current file offset.
            let mut external_buffers =
                OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
            let columns = writer.finish(&mut external_buffers).await?;
            // NOTE(review): `finish` writes external buffers via
            // `do_write_buffer` but here plain `write_all` is used (no
            // alignment padding) — confirm this difference is intentional.
            for buffer in external_buffers.take_buffers() {
                self.writer.write_all(&buffer).await?;
            }
            debug_assert_eq!(
                columns.len(),
                writer.num_columns() as usize,
                "Expected {} columns from column at index {} and got {}",
                writer.num_columns(),
                col_idx,
                columns.len()
            );
            for column in columns {
                for page in column.final_pages {
                    self.write_page(page).await?;
                }
                let column_metadata = &mut self.column_metadata[col_idx];
                let mut buffer_pos = self.writer.tell().await? as u64;
                for buffer in column.column_buffers {
                    column_metadata.buffer_offsets.push(buffer_pos);
                    let mut size = 0;
                    Self::do_write_buffer(&mut self.writer, &buffer).await?;
                    size += buffer.len() as u64;
                    buffer_pos += size;
                    column_metadata.buffer_sizes.push(size);
                }
                // The column encoding is stored inline ("direct") as a
                // serialized protobuf Any.
                let encoded_encoding = Any::from_msg(&column.encoding)?.encode_to_vec();
                column_metadata.encoding = Some(pbfile::Encoding {
                    location: Some(pbfile::encoding::Location::Direct(pbfile::DirectEncoding {
                        encoding: encoded_encoding,
                    })),
                });
                col_idx += 1;
            }
        }
        // Every pre-allocated column metadata slot must have been filled.
        if col_idx != self.column_metadata.len() {
            panic!(
                "Column writers finished with {} columns but we expected {}",
                col_idx,
                self.column_metadata.len()
            );
        }
        Ok(())
    }
738
739    /// Converts self.version (which is a mix of "software version" and
740    /// "format version" into a format version)
741    fn version_to_numbers(&self) -> (u16, u16) {
742        let version = self.options.format_version.unwrap_or_default();
743        match version.resolve() {
744            LanceFileVersion::V2_0 => (0, 3),
745            LanceFileVersion::V2_1 => (2, 1),
746            LanceFileVersion::V2_2 => (2, 2),
747            LanceFileVersion::V2_3 => (2, 3),
748            _ => panic!("Unsupported version: {}", version),
749        }
750    }
751
    /// Finishes writing the file
    ///
    /// This method will wait until all data has been flushed to the file.  Then it
    /// will write the file metadata and the footer.  It will not return until all
    /// data has been flushed and the file has been closed.
    ///
    /// Returns the total number of rows written
    pub async fn finish(&mut self) -> Result<u64> {
        // 1. flush any remaining data and write out those pages
        let mut external_buffers =
            OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
        let encoding_tasks = self
            .column_writers
            .iter_mut()
            .map(|writer| writer.flush(&mut external_buffers))
            .collect::<Result<Vec<_>>>()?;
        for external_buffer in external_buffers.take_buffers() {
            Self::do_write_buffer(&mut self.writer, &external_buffer).await?;
        }
        // FuturesOrdered preserves task order so pages come out in column order
        let encoding_tasks = encoding_tasks
            .into_iter()
            .flatten()
            .collect::<FuturesOrdered<_>>();
        self.write_pages(encoding_tasks).await?;

        // 2. finish the column writers (final pages, column buffers, metadata)
        if !self.column_writers.is_empty() {
            self.finish_writers().await?;
        }

        // 3. write global buffers (we write the schema here)
        let global_buffer_offsets = self.write_global_buffers().await?;
        let num_global_buffers = global_buffer_offsets.len() as u32;

        // 4. write the column metadatas
        let column_metadata_start = self.writer.tell().await? as u64;
        let metadata_positions = self.write_column_metadatas().await?;

        // 5. write the column metadata offset table
        let cmo_table_start = self.writer.tell().await? as u64;
        for (meta_pos, meta_len) in metadata_positions {
            self.writer.write_u64_le(meta_pos).await?;
            self.writer.write_u64_le(meta_len).await?;
        }

        // 6. write global buffers offset table
        let gbo_table_start = self.writer.tell().await? as u64;
        for (gbo_pos, gbo_len) in global_buffer_offsets {
            self.writer.write_u64_le(gbo_pos).await?;
            self.writer.write_u64_le(gbo_len).await?;
        }

        let (major, minor) = self.version_to_numbers();
        // 7. write the footer (fixed-size trailer ending with the magic bytes;
        //    this layout is mirrored by `concat_lance_footer`)
        self.writer.write_u64_le(column_metadata_start).await?;
        self.writer.write_u64_le(cmo_table_start).await?;
        self.writer.write_u64_le(gbo_table_start).await?;
        self.writer.write_u32_le(num_global_buffers).await?;
        self.writer.write_u32_le(self.num_columns).await?;
        self.writer.write_u16_le(major).await?;
        self.writer.write_u16_le(minor).await?;
        self.writer.write_all(MAGIC).await?;

        // 8. close the writer
        Writer::shutdown(self.writer.as_mut()).await?;

        Ok(self.rows_written)
    }
819
    /// Aborts the write without finalizing the file.
    ///
    /// Intentionally a no-op: for multipart uploads, ObjectWriter's Drop impl
    /// will abort the upload when the writer is dropped.
    pub async fn abort(&mut self) {
        // For multipart uploads, ObjectWriter's Drop impl will abort
        // the upload when the writer is dropped.
    }
824
825    pub async fn tell(&mut self) -> Result<u64> {
826        Ok(self.writer.tell().await? as u64)
827    }
828
829    pub fn field_id_to_column_indices(&self) -> &[(u32, u32)] {
830        &self.field_id_to_column_indices
831    }
832}
833
/// Utility trait for converting an [`EncodedBatch`] to [`Bytes`] using the
/// lance file format
pub trait EncodedBatchWriteExt {
    /// Serializes into a lance file, including the schema (written as a
    /// global buffer so the file is self-describing)
    fn try_to_self_described_lance(&self, version: LanceFileVersion) -> Result<Bytes>;
    /// Serializes into a lance file, without the schema.
    ///
    /// The schema must be provided separately to deserialize the buffer
    fn try_to_mini_lance(&self, version: LanceFileVersion) -> Result<Bytes>;
}
844
// Creates a lance footer and appends it to the encoded data
//
// The logic here is very similar to logic in the FileWriter except we
// are using BufMut (put_xyz) instead of AsyncWrite (write_xyz).
fn concat_lance_footer(
    batch: &EncodedBatch,
    write_schema: bool,
    version: LanceFileVersion,
) -> Result<Bytes> {
    // Estimating 1MiB for file footer
    let mut data = BytesMut::with_capacity(batch.data.len() + 1024 * 1024);
    data.put(batch.data.clone());
    // write global buffers (we write the schema here)
    let global_buffers = if write_schema {
        let schema_start = data.len() as u64;
        let lance_schema = lance_core::datatypes::Schema::try_from(batch.schema.as_ref())?;
        let descriptor = FileWriter::make_file_descriptor(&lance_schema, batch.num_rows)?;
        let descriptor_bytes = descriptor.encode_to_vec();
        let descriptor_len = descriptor_bytes.len() as u64;
        data.put(descriptor_bytes.as_slice());

        vec![(schema_start, descriptor_len)]
    } else {
        vec![]
    };
    let col_metadata_start = data.len() as u64;

    let mut col_metadata_positions = Vec::new();
    // Write column metadata
    for col in &batch.page_table {
        let position = data.len() as u64;
        let pages = col
            .page_infos
            .iter()
            .map(|page_info| {
                // Both legacy array encodings and 2.1+ structural layouts are
                // serialized the same way: as a protobuf Any.
                let encoded_encoding = match &page_info.encoding {
                    PageEncoding::Legacy(array_encoding) => {
                        Any::from_msg(array_encoding)?.encode_to_vec()
                    }
                    PageEncoding::Structural(page_layout) => {
                        Any::from_msg(page_layout)?.encode_to_vec()
                    }
                };
                let (buffer_offsets, buffer_sizes): (Vec<_>, Vec<_>) = page_info
                    .buffer_offsets_and_sizes
                    .as_ref()
                    .iter()
                    .cloned()
                    .unzip();
                Ok(pbfile::column_metadata::Page {
                    buffer_offsets,
                    buffer_sizes,
                    encoding: Some(pbfile::Encoding {
                        location: Some(pbfile::encoding::Location::Direct(DirectEncoding {
                            encoding: encoded_encoding,
                        })),
                    }),
                    length: page_info.num_rows,
                    priority: page_info.priority,
                })
            })
            .collect::<Result<Vec<_>>>()?;
        let (buffer_offsets, buffer_sizes): (Vec<_>, Vec<_>) =
            col.buffer_offsets_and_sizes.iter().cloned().unzip();
        let encoded_col_encoding = Any::from_msg(&col.encoding)?.encode_to_vec();
        let column = pbfile::ColumnMetadata {
            pages,
            buffer_offsets,
            buffer_sizes,
            encoding: Some(pbfile::Encoding {
                location: Some(pbfile::encoding::Location::Direct(pbfile::DirectEncoding {
                    encoding: encoded_col_encoding,
                })),
            }),
        };
        let column_bytes = column.encode_to_vec();
        col_metadata_positions.push((position, column_bytes.len() as u64));
        data.put(column_bytes.as_slice());
    }
    // Write column metadata offsets table
    let cmo_table_start = data.len() as u64;
    for (meta_pos, meta_len) in col_metadata_positions {
        data.put_u64_le(meta_pos);
        data.put_u64_le(meta_len);
    }
    // Write global buffers offsets table
    let gbo_table_start = data.len() as u64;
    let num_global_buffers = global_buffers.len() as u32;
    for (gbo_pos, gbo_len) in global_buffers {
        data.put_u64_le(gbo_pos);
        data.put_u64_le(gbo_len);
    }

    let (major, minor) = version.to_numbers();

    // write the footer (must mirror the trailer layout in FileWriter::finish)
    data.put_u64_le(col_metadata_start);
    data.put_u64_le(cmo_table_start);
    data.put_u64_le(gbo_table_start);
    data.put_u32_le(num_global_buffers);
    data.put_u32_le(batch.page_table.len() as u32);
    data.put_u16_le(major as u16);
    data.put_u16_le(minor as u16);
    data.put(MAGIC.as_slice());

    Ok(data.freeze())
}
952
953impl EncodedBatchWriteExt for EncodedBatch {
954    fn try_to_self_described_lance(&self, version: LanceFileVersion) -> Result<Bytes> {
955        concat_lance_footer(self, true, version)
956    }
957
958    fn try_to_mini_lance(&self, version: LanceFileVersion) -> Result<Bytes> {
959        concat_lance_footer(self, false, version)
960    }
961}
962
963#[cfg(test)]
964mod tests {
965    use std::collections::HashMap;
966    use std::sync::Arc;
967
968    use crate::reader::{FileReader, FileReaderOptions, describe_encoding};
969    use crate::testing::FsFixture;
970    use crate::writer::{ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES, FileWriter, FileWriterOptions};
971    use arrow_array::builder::{Float32Builder, Int32Builder};
972    use arrow_array::{Int32Array, RecordBatch, UInt64Array};
973    use arrow_array::{RecordBatchReader, StringArray, types::Float64Type};
974    use arrow_schema::{DataType, Field, Field as ArrowField, Schema, Schema as ArrowSchema};
975    use lance_core::cache::LanceCache;
976    use lance_core::datatypes::Schema as LanceSchema;
977    use lance_core::utils::tempfile::TempObjFile;
978    use lance_datagen::{BatchCount, RowCount, array, gen_batch};
979    use lance_encoding::compression_config::{CompressionFieldParams, CompressionParams};
980    use lance_encoding::decoder::DecoderPlugins;
981    use lance_encoding::version::LanceFileVersion;
982    use lance_io::object_store::ObjectStore;
983    use lance_io::utils::CachedFileSize;
984
985    #[tokio::test]
986    async fn test_basic_write() {
987        let tmp_path = TempObjFile::default();
988        let obj_store = Arc::new(ObjectStore::local());
989
990        let reader = gen_batch()
991            .col("score", array::rand::<Float64Type>())
992            .into_reader_rows(RowCount::from(1000), BatchCount::from(10));
993
994        let writer = obj_store.create(&tmp_path).await.unwrap();
995
996        let lance_schema =
997            lance_core::datatypes::Schema::try_from(reader.schema().as_ref()).unwrap();
998
999        let mut file_writer =
1000            FileWriter::try_new(writer, lance_schema, FileWriterOptions::default()).unwrap();
1001
1002        for batch in reader {
1003            file_writer.write_batch(&batch.unwrap()).await.unwrap();
1004        }
1005        file_writer.add_schema_metadata("foo", "bar");
1006        file_writer.finish().await.unwrap();
1007        // Tests asserting the contents of the written file are in reader.rs
1008    }
1009
1010    #[tokio::test]
1011    async fn test_write_empty() {
1012        let tmp_path = TempObjFile::default();
1013        let obj_store = Arc::new(ObjectStore::local());
1014
1015        let reader = gen_batch()
1016            .col("score", array::rand::<Float64Type>())
1017            .into_reader_rows(RowCount::from(0), BatchCount::from(0));
1018
1019        let writer = obj_store.create(&tmp_path).await.unwrap();
1020
1021        let lance_schema =
1022            lance_core::datatypes::Schema::try_from(reader.schema().as_ref()).unwrap();
1023
1024        let mut file_writer =
1025            FileWriter::try_new(writer, lance_schema, FileWriterOptions::default()).unwrap();
1026
1027        for batch in reader {
1028            file_writer.write_batch(&batch.unwrap()).await.unwrap();
1029        }
1030        file_writer.add_schema_metadata("foo", "bar");
1031        file_writer.finish().await.unwrap();
1032    }
1033
    #[tokio::test]
    async fn test_max_page_bytes_enforced() {
        // 8MiB of u64 data written with a 1MiB page cap should be split into
        // multiple pages, none exceeding the cap (2.0 format only; 2.1+ splits
        // large pages on read instead of write).
        let arrow_field = Field::new("data", DataType::UInt64, false);
        let arrow_schema = Schema::new(vec![arrow_field]);
        let lance_schema = LanceSchema::try_from(&arrow_schema).unwrap();

        // 8MiB
        let data: Vec<u64> = (0..1_000_000).collect();
        let array = UInt64Array::from(data);
        let batch =
            RecordBatch::try_new(arrow_schema.clone().into(), vec![Arc::new(array)]).unwrap();

        let options = FileWriterOptions {
            max_page_bytes: Some(1024 * 1024), // 1MB
            // This is a 2.0 only test because 2.1+ splits large pages on read instead of write
            format_version: Some(LanceFileVersion::V2_0),
            ..Default::default()
        };

        let path = TempObjFile::default();
        let object_store = ObjectStore::local();
        let mut writer = FileWriter::try_new(
            object_store.create(&path).await.unwrap(),
            lance_schema,
            options,
        )
        .unwrap();

        writer.write_batch(&batch).await.unwrap();
        writer.finish().await.unwrap();

        // Read the file back and inspect the page sizes recorded in metadata.
        let fs = FsFixture::default();
        let file_scheduler = fs
            .scheduler
            .open_file(&path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler,
            None,
            Arc::<DecoderPlugins>::default(),
            &LanceCache::no_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let column_meta = file_reader.metadata();

        let mut total_page_num: u32 = 0;
        for (col_idx, col_metadata) in column_meta.column_metadatas.iter().enumerate() {
            assert!(
                !col_metadata.pages.is_empty(),
                "Column {} has no pages",
                col_idx
            );

            for (page_idx, page) in col_metadata.pages.iter().enumerate() {
                total_page_num += 1;
                let total_size: u64 = page.buffer_sizes.iter().sum();
                assert!(
                    total_size <= 1024 * 1024,
                    "Column {} Page {} size {} exceeds 1MB limit",
                    col_idx,
                    page_idx,
                    total_size
                );
            }
        }

        // 8MiB of data / 1MiB page cap => exactly 8 pages
        assert_eq!(total_page_num, 8)
    }
1106
1107    #[tokio::test(flavor = "current_thread")]
1108    async fn test_max_page_bytes_env_var() {
1109        let arrow_field = Field::new("data", DataType::UInt64, false);
1110        let arrow_schema = Schema::new(vec![arrow_field]);
1111        let lance_schema = LanceSchema::try_from(&arrow_schema).unwrap();
1112        // 4MiB
1113        let data: Vec<u64> = (0..500_000).collect();
1114        let array = UInt64Array::from(data);
1115        let batch =
1116            RecordBatch::try_new(arrow_schema.clone().into(), vec![Arc::new(array)]).unwrap();
1117
1118        // 2MiB
1119        unsafe {
1120            std::env::set_var(ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES, "2097152");
1121        }
1122
1123        let options = FileWriterOptions {
1124            max_page_bytes: None, // enforce env
1125            ..Default::default()
1126        };
1127
1128        let path = TempObjFile::default();
1129        let object_store = ObjectStore::local();
1130        let mut writer = FileWriter::try_new(
1131            object_store.create(&path).await.unwrap(),
1132            lance_schema.clone(),
1133            options,
1134        )
1135        .unwrap();
1136
1137        writer.write_batch(&batch).await.unwrap();
1138        writer.finish().await.unwrap();
1139
1140        let fs = FsFixture::default();
1141        let file_scheduler = fs
1142            .scheduler
1143            .open_file(&path, &CachedFileSize::unknown())
1144            .await
1145            .unwrap();
1146        let file_reader = FileReader::try_open(
1147            file_scheduler,
1148            None,
1149            Arc::<DecoderPlugins>::default(),
1150            &LanceCache::no_cache(),
1151            FileReaderOptions::default(),
1152        )
1153        .await
1154        .unwrap();
1155
1156        for col_metadata in file_reader.metadata().column_metadatas.iter() {
1157            for page in col_metadata.pages.iter() {
1158                let total_size: u64 = page.buffer_sizes.iter().sum();
1159                assert!(
1160                    total_size <= 2 * 1024 * 1024,
1161                    "Page size {} exceeds 2MB limit",
1162                    total_size
1163                );
1164            }
1165        }
1166
1167        unsafe {
1168            std::env::set_var(ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES, "");
1169        }
1170    }
1171
    #[tokio::test]
    async fn test_compression_overrides_end_to_end() {
        // End-to-end check that glob-pattern compression overrides ("*_id" =>
        // RLE-friendly params) flow from CompressionParams through the writer
        // and show up in the encodings recorded in the file metadata.
        // Create test schema with different column types
        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new("customer_id", DataType::Int32, false),
            ArrowField::new("product_id", DataType::Int32, false),
            ArrowField::new("quantity", DataType::Int32, false),
            ArrowField::new("price", DataType::Float32, false),
            ArrowField::new("description", DataType::Utf8, false),
        ]));

        let lance_schema = LanceSchema::try_from(arrow_schema.as_ref()).unwrap();

        // Create test data with patterns suitable for different compression
        let mut customer_ids = Int32Builder::new();
        let mut product_ids = Int32Builder::new();
        let mut quantities = Int32Builder::new();
        let mut prices = Float32Builder::new();
        let mut descriptions = Vec::new();

        // Generate data with specific patterns:
        // - customer_id: highly repetitive (good for RLE)
        // - product_id: moderately repetitive (good for RLE)
        // - quantity: random values (not good for RLE)
        // - price: some repetition
        // - description: long strings (good for Zstd)
        for i in 0..10000 {
            // Customer ID repeats every 100 rows (100 unique customers)
            // This creates runs of 100 identical values
            customer_ids.append_value(i / 100);

            // Product ID has only 5 unique values with long runs
            product_ids.append_value(i / 2000);

            // Quantity is mostly 1 with occasional other values
            quantities.append_value(if i % 10 == 0 { 5 } else { 1 });

            // Price has only 3 unique values
            prices.append_value(match i % 3 {
                0 => 9.99,
                1 => 19.99,
                _ => 29.99,
            });

            // Descriptions are repetitive but we'll keep them simple
            descriptions.push(format!("Product {}", i / 2000));
        }

        let batch = RecordBatch::try_new(
            arrow_schema.clone(),
            vec![
                Arc::new(customer_ids.finish()),
                Arc::new(product_ids.finish()),
                Arc::new(quantities.finish()),
                Arc::new(prices.finish()),
                Arc::new(StringArray::from(descriptions)),
            ],
        )
        .unwrap();

        // Configure compression parameters
        let mut params = CompressionParams::new();

        // RLE for ID columns (ends with _id)
        params.columns.insert(
            "*_id".to_string(),
            CompressionFieldParams {
                rle_threshold: Some(0.5), // Lower threshold to trigger RLE more easily
                compression: None,        // Will use default compression if any
                compression_level: None,
                bss: Some(lance_encoding::compression_config::BssMode::Off), // Explicitly disable BSS to ensure RLE is used
                minichunk_size: None,
            },
        );

        // For now, we'll skip Zstd compression since it's not imported
        // In a real implementation, you could add other compression types here

        // Build encoding strategy with compression parameters
        let encoding_strategy = lance_encoding::encoder::default_encoding_strategy_with_params(
            LanceFileVersion::V2_1,
            params,
        )
        .unwrap();

        // Configure file writer options
        let options = FileWriterOptions {
            encoding_strategy: Some(Arc::from(encoding_strategy)),
            format_version: Some(LanceFileVersion::V2_1),
            max_page_bytes: Some(64 * 1024), // 64KB pages
            ..Default::default()
        };

        // Write the file
        let path = TempObjFile::default();
        let object_store = ObjectStore::local();

        let mut writer = FileWriter::try_new(
            object_store.create(&path).await.unwrap(),
            lance_schema.clone(),
            options,
        )
        .unwrap();

        writer.write_batch(&batch).await.unwrap();
        writer.add_schema_metadata("compression_test", "configured_compression");
        writer.finish().await.unwrap();

        // Now write the same data without compression overrides for comparison
        let path_no_compression = TempObjFile::default();
        let default_options = FileWriterOptions {
            format_version: Some(LanceFileVersion::V2_1),
            max_page_bytes: Some(64 * 1024),
            ..Default::default()
        };

        let mut writer_no_compression = FileWriter::try_new(
            object_store.create(&path_no_compression).await.unwrap(),
            lance_schema.clone(),
            default_options,
        )
        .unwrap();

        writer_no_compression.write_batch(&batch).await.unwrap();
        writer_no_compression.finish().await.unwrap();

        // Note: With our current data patterns and RLE compression, the compressed file
        // might actually be slightly larger due to compression metadata overhead.
        // This is expected and the test is mainly to verify the system works end-to-end.

        // Read back the compressed file and verify data integrity
        let fs = FsFixture::default();
        let file_scheduler = fs
            .scheduler
            .open_file(&path, &CachedFileSize::unknown())
            .await
            .unwrap();

        let file_reader = FileReader::try_open(
            file_scheduler,
            None,
            Arc::<DecoderPlugins>::default(),
            &LanceCache::no_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        // Verify metadata
        let metadata = file_reader.metadata();
        assert_eq!(metadata.major_version, 2);
        assert_eq!(metadata.minor_version, 1);

        let schema = file_reader.schema();
        assert_eq!(
            schema.metadata.get("compression_test"),
            Some(&"configured_compression".to_string())
        );

        // Verify the actual encodings used
        let column_metadatas = &metadata.column_metadatas;

        // Check customer_id column (index 0) - should use RLE due to our configuration
        assert!(!column_metadatas[0].pages.is_empty());
        let customer_id_encoding = describe_encoding(&column_metadatas[0].pages[0]);
        assert!(
            customer_id_encoding.contains("RLE") || customer_id_encoding.contains("Rle"),
            "customer_id column should use RLE encoding due to '*_id' pattern match, but got: {}",
            customer_id_encoding
        );

        // Check product_id column (index 1) - should use RLE due to our configuration
        assert!(!column_metadatas[1].pages.is_empty());
        let product_id_encoding = describe_encoding(&column_metadatas[1].pages[0]);
        assert!(
            product_id_encoding.contains("RLE") || product_id_encoding.contains("Rle"),
            "product_id column should use RLE encoding due to '*_id' pattern match, but got: {}",
            product_id_encoding
        );
    }
1352
    #[tokio::test]
    async fn test_field_metadata_compression() {
        // Test that field metadata compression settings are respected:
        // "text" requests zstd level 6, "data" explicitly requests no
        // compression, and the file's recorded encodings should reflect both.
        let mut metadata = HashMap::new();
        metadata.insert(
            lance_encoding::constants::COMPRESSION_META_KEY.to_string(),
            "zstd".to_string(),
        );
        metadata.insert(
            lance_encoding::constants::COMPRESSION_LEVEL_META_KEY.to_string(),
            "6".to_string(),
        );

        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new("id", DataType::Int32, false),
            ArrowField::new("text", DataType::Utf8, false).with_metadata(metadata.clone()),
            ArrowField::new("data", DataType::Int32, false).with_metadata(HashMap::from([(
                lance_encoding::constants::COMPRESSION_META_KEY.to_string(),
                "none".to_string(),
            )])),
        ]));

        let lance_schema = LanceSchema::try_from(arrow_schema.as_ref()).unwrap();

        // Create test data
        let id_array = Int32Array::from_iter_values(0..1000);
        let text_array = StringArray::from_iter_values(
            (0..1000).map(|i| format!("test string {} repeated text", i)),
        );
        let data_array = Int32Array::from_iter_values((0..1000).map(|i| i * 2));

        let batch = RecordBatch::try_new(
            arrow_schema.clone(),
            vec![
                Arc::new(id_array),
                Arc::new(text_array),
                Arc::new(data_array),
            ],
        )
        .unwrap();

        let path = TempObjFile::default();
        let object_store = ObjectStore::local();

        // Create encoding strategy that will read from field metadata
        let params = CompressionParams::new();
        let encoding_strategy = lance_encoding::encoder::default_encoding_strategy_with_params(
            LanceFileVersion::V2_1,
            params,
        )
        .unwrap();

        let options = FileWriterOptions {
            encoding_strategy: Some(Arc::from(encoding_strategy)),
            format_version: Some(LanceFileVersion::V2_1),
            ..Default::default()
        };
        let mut writer = FileWriter::try_new(
            object_store.create(&path).await.unwrap(),
            lance_schema.clone(),
            options,
        )
        .unwrap();

        writer.write_batch(&batch).await.unwrap();
        writer.finish().await.unwrap();

        // Read back metadata
        let fs = FsFixture::default();
        let file_scheduler = fs
            .scheduler
            .open_file(&path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler,
            None,
            Arc::<DecoderPlugins>::default(),
            &LanceCache::no_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let column_metadatas = &file_reader.metadata().column_metadatas;

        // The text column (index 1) should use zstd compression based on metadata
        let text_encoding = describe_encoding(&column_metadatas[1].pages[0]);
        // For string columns, we expect Binary encoding with zstd compression
        assert!(
            text_encoding.contains("Zstd"),
            "text column should use zstd compression from field metadata, but got: {}",
            text_encoding
        );

        // The data column (index 2) should use no compression based on metadata
        let data_encoding = describe_encoding(&column_metadatas[2].pages[0]);
        // For Int32 columns with "none" compression, we expect Flat encoding without compression
        assert!(
            data_encoding.contains("Flat") && data_encoding.contains("compression: None"),
            "data column should use no compression from field metadata, but got: {}",
            data_encoding
        );
    }
1457
    #[tokio::test]
    async fn test_field_metadata_rle_threshold() {
        // Test that RLE threshold from field metadata is respected: the
        // "status" column has only 3 long runs, well under the 0.9 threshold,
        // so the writer should pick RLE for it.
        let mut metadata = HashMap::new();
        metadata.insert(
            lance_encoding::constants::RLE_THRESHOLD_META_KEY.to_string(),
            "0.9".to_string(),
        );
        // Also set compression to ensure RLE is used
        metadata.insert(
            lance_encoding::constants::COMPRESSION_META_KEY.to_string(),
            "lz4".to_string(),
        );
        // Explicitly disable BSS to ensure RLE is tested
        metadata.insert(
            lance_encoding::constants::BSS_META_KEY.to_string(),
            "off".to_string(),
        );

        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new("status", DataType::Int32, false).with_metadata(metadata),
        ]));

        let lance_schema = LanceSchema::try_from(arrow_schema.as_ref()).unwrap();

        // Create data with very high repetition (3 runs for 10000 values = 0.0003 ratio)
        let status_array = Int32Array::from_iter_values(
            std::iter::repeat_n(200, 8000)
                .chain(std::iter::repeat_n(404, 1500))
                .chain(std::iter::repeat_n(500, 500)),
        );

        let batch =
            RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(status_array)]).unwrap();

        let path = TempObjFile::default();
        let object_store = ObjectStore::local();

        // Create encoding strategy that will read from field metadata
        let params = CompressionParams::new();
        let encoding_strategy = lance_encoding::encoder::default_encoding_strategy_with_params(
            LanceFileVersion::V2_1,
            params,
        )
        .unwrap();

        let options = FileWriterOptions {
            encoding_strategy: Some(Arc::from(encoding_strategy)),
            format_version: Some(LanceFileVersion::V2_1),
            ..Default::default()
        };
        let mut writer = FileWriter::try_new(
            object_store.create(&path).await.unwrap(),
            lance_schema.clone(),
            options,
        )
        .unwrap();

        writer.write_batch(&batch).await.unwrap();
        writer.finish().await.unwrap();

        // Read back and check encoding
        let fs = FsFixture::default();
        let file_scheduler = fs
            .scheduler
            .open_file(&path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler,
            None,
            Arc::<DecoderPlugins>::default(),
            &LanceCache::no_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let column_metadatas = &file_reader.metadata().column_metadatas;
        let status_encoding = describe_encoding(&column_metadatas[0].pages[0]);
        assert!(
            status_encoding.contains("RLE") || status_encoding.contains("Rle"),
            "status column should use RLE encoding due to metadata threshold, but got: {}",
            status_encoding
        );
    }
1544
1545    #[tokio::test]
1546    async fn test_large_page_split_on_read() {
1547        use arrow_array::Array;
1548        use futures::TryStreamExt;
1549        use lance_encoding::decoder::FilterExpression;
1550        use lance_io::ReadBatchParams;
1551
1552        // Test that large pages written with relaxed limits can be split during read
1553
1554        let arrow_field = ArrowField::new("data", DataType::Binary, false);
1555        let arrow_schema = ArrowSchema::new(vec![arrow_field]);
1556        let lance_schema = LanceSchema::try_from(&arrow_schema).unwrap();
1557
1558        // Create a large binary value (40MB) to trigger large page creation
1559        let large_value = vec![42u8; 40 * 1024 * 1024];
1560        let array = arrow_array::BinaryArray::from(vec![
1561            Some(large_value.as_slice()),
1562            Some(b"small value"),
1563        ]);
1564        let batch = RecordBatch::try_new(Arc::new(arrow_schema), vec![Arc::new(array)]).unwrap();
1565
1566        // Write with relaxed page size limit (128MB)
1567        let options = FileWriterOptions {
1568            max_page_bytes: Some(128 * 1024 * 1024),
1569            format_version: Some(LanceFileVersion::V2_1),
1570            ..Default::default()
1571        };
1572
1573        let fs = FsFixture::default();
1574        let path = fs.tmp_path;
1575
1576        let mut writer = FileWriter::try_new(
1577            fs.object_store.create(&path).await.unwrap(),
1578            lance_schema.clone(),
1579            options,
1580        )
1581        .unwrap();
1582
1583        writer.write_batch(&batch).await.unwrap();
1584        let num_rows = writer.finish().await.unwrap();
1585        assert_eq!(num_rows, 2);
1586
1587        // Read back with split configuration
1588        let file_scheduler = fs
1589            .scheduler
1590            .open_file(&path, &CachedFileSize::unknown())
1591            .await
1592            .unwrap();
1593
1594        // Configure reader to split pages larger than 10MB into chunks
1595        let reader_options = FileReaderOptions {
1596            read_chunk_size: 10 * 1024 * 1024, // 10MB chunks
1597            ..Default::default()
1598        };
1599
1600        let file_reader = FileReader::try_open(
1601            file_scheduler,
1602            None,
1603            Arc::<DecoderPlugins>::default(),
1604            &LanceCache::no_cache(),
1605            reader_options,
1606        )
1607        .await
1608        .unwrap();
1609
1610        // Read the data back
1611        let stream = file_reader
1612            .read_stream(
1613                ReadBatchParams::RangeFull,
1614                1024,
1615                10, // batch_readahead
1616                FilterExpression::no_filter(),
1617            )
1618            .unwrap();
1619
1620        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
1621        assert_eq!(batches.len(), 1);
1622
1623        // Verify the data is correctly read despite splitting
1624        let read_array = batches[0].column(0);
1625        let read_binary = read_array
1626            .as_any()
1627            .downcast_ref::<arrow_array::BinaryArray>()
1628            .unwrap();
1629
1630        assert_eq!(read_binary.len(), 2);
1631        assert_eq!(read_binary.value(0).len(), 40 * 1024 * 1024);
1632        assert_eq!(read_binary.value(1), b"small value");
1633
1634        // Verify first value matches what we wrote
1635        assert!(read_binary.value(0).iter().all(|&b| b == 42u8));
1636    }
1637
1638    fn spill_config() -> (TempObjFile, Arc<ObjectStore>) {
1639        let spill_path = TempObjFile::default();
1640        (spill_path, Arc::new(ObjectStore::local()))
1641    }
1642
1643    fn make_batches(num_batches: i32, num_cols: usize, rows_per_batch: i32) -> Vec<RecordBatch> {
1644        let fields: Vec<_> = (0..num_cols)
1645            .map(|c| ArrowField::new(format!("c{c}"), DataType::Int32, false))
1646            .collect();
1647        let schema = Arc::new(ArrowSchema::new(fields));
1648        (0..num_batches)
1649            .map(|i| {
1650                let cols: Vec<Arc<dyn arrow_array::Array>> = (0..num_cols)
1651                    .map(|c| {
1652                        let start = (i * rows_per_batch + c as i32) * 100;
1653                        Arc::new(Int32Array::from_iter_values(start..start + rows_per_batch))
1654                            as Arc<dyn arrow_array::Array>
1655                    })
1656                    .collect();
1657                RecordBatch::try_new(schema.clone(), cols).unwrap()
1658            })
1659            .collect()
1660    }
1661
1662    async fn write_and_read_batches(
1663        batches: &[RecordBatch],
1664        spill: Option<(Arc<ObjectStore>, object_store::path::Path)>,
1665    ) -> Vec<RecordBatch> {
1666        let fs = FsFixture::default();
1667        let lance_schema = LanceSchema::try_from(batches[0].schema().as_ref()).unwrap();
1668        let writer = fs.object_store.create(&fs.tmp_path).await.unwrap();
1669        let mut file_writer =
1670            FileWriter::try_new(writer, lance_schema, FileWriterOptions::default()).unwrap();
1671        if let Some((store, path)) = spill {
1672            file_writer = file_writer.with_page_metadata_spill(store, path);
1673        }
1674        for batch in batches {
1675            file_writer.write_batch(batch).await.unwrap();
1676        }
1677        file_writer.add_schema_metadata("foo", "bar");
1678        file_writer.finish().await.unwrap();
1679
1680        crate::testing::read_lance_file(
1681            &fs,
1682            Arc::<DecoderPlugins>::default(),
1683            lance_encoding::decoder::FilterExpression::no_filter(),
1684        )
1685        .await
1686    }
1687
1688    #[rstest::rstest]
1689    #[case::multi_col(20, 2, 100)]
1690    #[case::many_batches(50, 2, 100)]
1691    #[tokio::test]
1692    async fn test_page_metadata_spill_roundtrip(
1693        #[case] num_batches: i32,
1694        #[case] num_cols: usize,
1695        #[case] rows_per_batch: i32,
1696    ) {
1697        let batches = make_batches(num_batches, num_cols, rows_per_batch);
1698        let baseline = write_and_read_batches(&batches, None).await;
1699        let (spill_path, spill_store) = spill_config();
1700        let spilled =
1701            write_and_read_batches(&batches, Some((spill_store, spill_path.as_ref().clone())))
1702                .await;
1703        assert_eq!(baseline, spilled);
1704    }
1705
1706    #[tokio::test]
1707    async fn test_page_metadata_spill_many_columns() {
1708        // Many columns forces small per-column buffer limits, exercising mid-write flushing.
1709        let batches = make_batches(10, 500, 100);
1710        let baseline = write_and_read_batches(&batches, None).await;
1711        let (spill_path, spill_store) = spill_config();
1712        let spilled =
1713            write_and_read_batches(&batches, Some((spill_store, spill_path.as_ref().clone())))
1714                .await;
1715        assert_eq!(baseline, spilled);
1716    }
1717}