Skip to main content

lance_file/
writer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use core::panic;
5use std::collections::HashMap;
6use std::sync::Arc;
7use std::sync::atomic::AtomicBool;
8
9use arrow_array::RecordBatch;
10
11use arrow_data::ArrayData;
12use bytes::{Buf, BufMut, Bytes, BytesMut};
13use futures::StreamExt;
14use futures::stream::FuturesOrdered;
15use lance_core::datatypes::{Field, Schema as LanceSchema};
16use lance_core::utils::bit::pad_bytes;
17use lance_core::{Error, Result};
18use lance_encoding::decoder::PageEncoding;
19use lance_encoding::encoder::{
20    BatchEncoder, EncodeTask, EncodedBatch, EncodedPage, EncodingOptions, FieldEncoder,
21    FieldEncodingStrategy, OutOfLineBuffers, default_encoding_strategy,
22};
23use lance_encoding::repdef::RepDefBuilder;
24use lance_encoding::version::LanceFileVersion;
25use lance_io::object_store::ObjectStore;
26use lance_io::traits::Writer;
27use log::{debug, warn};
28use object_store::path::Path;
29use prost::Message;
30use prost_types::Any;
31use tokio::io::AsyncWrite;
32use tokio::io::AsyncWriteExt;
33use tracing::instrument;
34
35use crate::datatypes::FieldsWithMeta;
36use crate::format::MAGIC;
37use crate::format::pb;
38use crate::format::pbfile;
39use crate::format::pbfile::DirectEncoding;
40
/// Byte alignment applied to the page buffers written by this module.
pub(crate) const PAGE_BUFFER_ALIGNMENT: usize = 64;
/// Filler bytes (each with the value 72) used to pad a buffer up to the next
/// `PAGE_BUFFER_ALIGNMENT` boundary.
const PAD_BUFFER: [u8; PAGE_BUFFER_ALIGNMENT] = [72_u8; PAGE_BUFFER_ALIGNMENT];
/// Default page size limit: 32 MiB.
///
/// From format 2.1 onwards large pages are split at read time instead of at
/// write time, which avoids empty-page and small-page problems.  The write-time
/// limit nevertheless stays at 32 MiB to avoid potential regressions in 2.0
/// format readers.
///
/// The 2.1 writer does not apply this limit.
const MAX_PAGE_BYTES: usize = 32 << 20;
/// Environment variable consulted for the page size limit when
/// `FileWriterOptions::max_page_bytes` is not set.
const ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES: &str = "LANCE_FILE_WRITER_MAX_PAGE_BYTES";
51
/// Outcome of a finished Lance file write.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FileWriteSummary {
    /// Total count of rows that were written to the file.
    pub num_rows: u64,
    /// Size, in bytes, of the finished file.
    pub size_bytes: u64,
}
60
61#[derive(Debug, Clone, Default)]
62pub struct FileWriterOptions {
63    /// How many bytes to use for buffering column data
64    ///
65    /// When data comes in small batches the writer will buffer column data so that
66    /// larger pages can be created.  This value will be divided evenly across all of the
67    /// columns.  Generally you want this to be at least large enough to match your
68    /// filesystem's ideal read size per column.
69    ///
70    /// In some cases you might want this value to be even larger if you have highly
71    /// compressible data.  However, if this is too large, then the writer could require
72    /// a lot of memory and write performance may suffer if the CPU-expensive encoding
73    /// falls behind and can't be interleaved with the I/O expensive flushing.
74    ///
75    /// The default will use 8MiB per column which should be reasonable for most cases.
76    // TODO: Do we need to be able to set this on a per-column basis?
77    pub data_cache_bytes: Option<u64>,
78    /// A hint to indicate the max size of a page
79    ///
80    /// This hint can't always be respected.  A single value could be larger than this value
81    /// and we never slice single values.  In addition, there are some cases where it can be
82    /// difficult to know size up-front and so we might not be able to respect this value.
83    pub max_page_bytes: Option<u64>,
84    /// The file writer buffers columns until enough data has arrived to flush a page
85    /// to disk.
86    ///
87    /// Some columns with small data types may not flush very often.  These arrays can
88    /// stick around for a long time.  These arrays might also be keeping larger data
89    /// structures alive.  By default, the writer will make a deep copy of this array
90    /// to avoid any potential memory leaks.  However, this can be disabled for a
91    /// (probably minor) performance boost if you are sure that arrays are not keeping
92    /// any sibling structures alive (this typically means the array was allocated in
93    /// the same language / runtime as the writer)
94    ///
95    /// Do not enable this if your data is arriving from the C data interface.
96    /// Data typically arrives one "batch" at a time (encoded in the C data interface
97    /// as a struct array).  Each array in that batch keeps the entire batch alive.
98    /// This means a small boolean array (which we will buffer in memory for quite a
99    /// while) might keep a much larger record batch around in memory (even though most
100    /// of that batch's data has been written to disk)
101    pub keep_original_array: Option<bool>,
102    pub encoding_strategy: Option<Arc<dyn FieldEncodingStrategy>>,
103    /// The format version to use when writing the file
104    ///
105    /// This controls which encodings will be used when encoding the data.  Newer
106    /// versions may have more efficient encodings.  However, newer format versions will
107    /// require more up-to-date readers to read the data.
108    pub format_version: Option<LanceFileVersion>,
109}
110
/// Overall in-memory budget (256 KiB) for serialized page metadata that has not
/// yet been flushed to the spill file.  The budget is divided evenly across
/// columns, with a floor of 64 bytes per column.
const DEFAULT_SPILL_BUFFER_LIMIT: usize = 256 << 10;
114
115/// Spills serialized page metadata to a temporary file to bound memory usage.
116///
117/// The spill file is an unstructured sequence of "chunks". Each chunk is a
118/// contiguous run of length-delimited protobuf `Page` messages belonging to a
119/// single column. Chunks from different columns are interleaved in the order
120/// they are flushed (i.e. whenever a column's in-memory buffer exceeds
121/// `per_column_limit`). The `column_chunks` index records the (offset, length)
122/// of every chunk so each column's pages can be read back and reassembled in
123/// order.
124struct PageMetadataSpill {
125    writer: Box<dyn Writer>,
126    object_store: Arc<ObjectStore>,
127    path: Path,
128    /// Current write position in the spill file.
129    position: u64,
130    /// Per-column buffer of serialized (length-delimited protobuf) page metadata
131    /// that has not yet been flushed to the spill file.
132    column_buffers: Vec<Vec<u8>>,
133    /// Per-column list of chunks that have been flushed to the spill file.
134    /// Each entry is (offset, length) pointing into the spill file.
135    column_chunks: Vec<Vec<(u64, u32)>>,
136    /// Maximum bytes to buffer per column before flushing to the spill file.
137    per_column_limit: usize,
138}
139
140impl PageMetadataSpill {
141    async fn new(object_store: Arc<ObjectStore>, path: Path, num_columns: usize) -> Result<Self> {
142        let writer = object_store.create(&path).await?;
143        let per_column_limit = (DEFAULT_SPILL_BUFFER_LIMIT / num_columns.max(1)).max(64);
144        Ok(Self {
145            writer,
146            object_store,
147            path,
148            position: 0,
149            column_buffers: vec![Vec::new(); num_columns],
150            column_chunks: vec![Vec::new(); num_columns],
151            per_column_limit,
152        })
153    }
154
155    async fn append_page(
156        &mut self,
157        column_idx: usize,
158        page: &pbfile::column_metadata::Page,
159    ) -> Result<()> {
160        page.encode_length_delimited(&mut self.column_buffers[column_idx])
161            .map_err(|e| {
162                Error::io_source(Box::new(std::io::Error::new(
163                    std::io::ErrorKind::InvalidData,
164                    e,
165                )))
166            })?;
167        if self.column_buffers[column_idx].len() >= self.per_column_limit {
168            self.flush_column(column_idx).await?;
169        }
170        Ok(())
171    }
172
173    async fn flush_column(&mut self, column_idx: usize) -> Result<()> {
174        let buf = &self.column_buffers[column_idx];
175        if buf.is_empty() {
176            return Ok(());
177        }
178        let len = buf.len();
179        self.writer.write_all(buf).await?;
180        self.column_chunks[column_idx].push((self.position, len as u32));
181        self.position += len as u64;
182        self.column_buffers[column_idx].clear();
183        Ok(())
184    }
185
186    async fn shutdown_writer(&mut self) -> Result<()> {
187        for col_idx in 0..self.column_buffers.len() {
188            self.flush_column(col_idx).await?;
189        }
190        Writer::shutdown(self.writer.as_mut()).await?;
191        Ok(())
192    }
193}
194
195fn decode_spilled_chunk(data: &Bytes) -> Result<Vec<pbfile::column_metadata::Page>> {
196    let mut pages = Vec::new();
197    let mut cursor = data.clone();
198    while cursor.has_remaining() {
199        let page =
200            pbfile::column_metadata::Page::decode_length_delimited(&mut cursor).map_err(|e| {
201                Error::io_source(Box::new(std::io::Error::new(
202                    std::io::ErrorKind::InvalidData,
203                    e,
204                )))
205            })?;
206        pages.push(page);
207    }
208    Ok(pages)
209}
210
211enum PageSpillState {
212    Pending(Arc<ObjectStore>, Path),
213    Active(PageMetadataSpill),
214}
215
216pub struct FileWriter {
217    writer: Box<dyn Writer>,
218    schema: Option<LanceSchema>,
219    column_writers: Vec<Box<dyn FieldEncoder>>,
220    column_metadata: Vec<pbfile::ColumnMetadata>,
221    field_id_to_column_indices: Vec<(u32, u32)>,
222    num_columns: u32,
223    rows_written: u64,
224    global_buffers: Vec<(u64, u64)>,
225    schema_metadata: HashMap<String, String>,
226    options: FileWriterOptions,
227    page_spill: Option<PageSpillState>,
228}
229
230fn initial_column_metadata() -> pbfile::ColumnMetadata {
231    pbfile::ColumnMetadata {
232        pages: Vec::new(),
233        buffer_offsets: Vec::new(),
234        buffer_sizes: Vec::new(),
235        encoding: None,
236    }
237}
238
/// Becomes `true` once the unstable-format-version warning has been logged, so
/// that the warning is emitted only once.
static WARNED_ON_UNSTABLE_API: AtomicBool = AtomicBool::new(false);
240
241impl FileWriter {
242    /// Create a new FileWriter with a desired output schema
243    pub fn try_new(
244        object_writer: Box<dyn Writer>,
245        schema: LanceSchema,
246        options: FileWriterOptions,
247    ) -> Result<Self> {
248        let mut writer = Self::new_lazy(object_writer, options);
249        writer.initialize(schema)?;
250        Ok(writer)
251    }
252
253    /// Create a new FileWriter without a desired output schema
254    ///
255    /// The output schema will be set based on the first batch of data to arrive.
256    /// If no data arrives and the writer is finished then the write will fail.
257    pub fn new_lazy(object_writer: Box<dyn Writer>, options: FileWriterOptions) -> Self {
258        if let Some(format_version) = options.format_version
259            && format_version.is_unstable()
260            && WARNED_ON_UNSTABLE_API
261                .compare_exchange(
262                    false,
263                    true,
264                    std::sync::atomic::Ordering::Relaxed,
265                    std::sync::atomic::Ordering::Relaxed,
266                )
267                .is_ok()
268        {
269            warn!(
270                "You have requested an unstable format version.  Files written with this format version may not be readable in the future!  This is a development feature and should only be used for experimentation and never for production data."
271            );
272        }
273        Self {
274            writer: object_writer,
275            schema: None,
276            column_writers: Vec::new(),
277            column_metadata: Vec::new(),
278            num_columns: 0,
279            rows_written: 0,
280            field_id_to_column_indices: Vec::new(),
281            global_buffers: Vec::new(),
282            schema_metadata: HashMap::new(),
283            page_spill: None,
284            options,
285        }
286    }
287
288    /// Spill page metadata to a sidecar file instead of accumulating in memory.
289    ///
290    /// This can dramatically reduce memory usage when many writers are open
291    /// concurrently (e.g. IVF shuffle with thousands of partition writers).
292    /// The sidecar file is created lazily on the first page write. The caller
293    /// is responsible for cleaning up `path` (e.g. by placing it in a temp
294    /// directory that is removed via RAII).
295    pub fn with_page_metadata_spill(mut self, object_store: Arc<ObjectStore>, path: Path) -> Self {
296        self.page_spill = Some(PageSpillState::Pending(object_store, path));
297        self
298    }
299
300    /// Write a series of record batches to a new file
301    ///
302    /// Returns the number of rows written
303    pub async fn create_file_with_batches(
304        store: &ObjectStore,
305        path: &Path,
306        schema: lance_core::datatypes::Schema,
307        batches: impl Iterator<Item = RecordBatch> + Send,
308        options: FileWriterOptions,
309    ) -> Result<usize> {
310        let writer = store.create(path).await?;
311        let mut writer = Self::try_new(writer, schema, options)?;
312        for batch in batches {
313            writer.write_batch(&batch).await?;
314        }
315        Ok(writer.finish().await?.num_rows as usize)
316    }
317
318    async fn do_write_buffer(writer: &mut (impl AsyncWrite + Unpin), buf: &[u8]) -> Result<()> {
319        writer.write_all(buf).await?;
320        let pad_bytes = pad_bytes::<PAGE_BUFFER_ALIGNMENT>(buf.len());
321        writer.write_all(&PAD_BUFFER[..pad_bytes]).await?;
322        Ok(())
323    }
324
325    /// Returns the format version that will be used when writing the file
326    pub fn version(&self) -> LanceFileVersion {
327        self.options.format_version.unwrap_or_default()
328    }
329
330    async fn write_page(&mut self, encoded_page: EncodedPage) -> Result<()> {
331        let buffers = encoded_page.data;
332        let mut buffer_offsets = Vec::with_capacity(buffers.len());
333        let mut buffer_sizes = Vec::with_capacity(buffers.len());
334        for buffer in buffers {
335            buffer_offsets.push(self.writer.tell().await? as u64);
336            buffer_sizes.push(buffer.len() as u64);
337            Self::do_write_buffer(&mut self.writer, &buffer).await?;
338        }
339        let encoded_encoding = match encoded_page.description {
340            PageEncoding::Legacy(array_encoding) => Any::from_msg(&array_encoding)?.encode_to_vec(),
341            PageEncoding::Structural(page_layout) => Any::from_msg(&page_layout)?.encode_to_vec(),
342        };
343        let page = pbfile::column_metadata::Page {
344            buffer_offsets,
345            buffer_sizes,
346            encoding: Some(pbfile::Encoding {
347                location: Some(pbfile::encoding::Location::Direct(DirectEncoding {
348                    encoding: encoded_encoding,
349                })),
350            }),
351            length: encoded_page.num_rows,
352            priority: encoded_page.row_number,
353        };
354        let col_idx = encoded_page.column_idx as usize;
355        if matches!(&self.page_spill, Some(PageSpillState::Pending(..))) {
356            let Some(PageSpillState::Pending(store, path)) = self.page_spill.take() else {
357                unreachable!()
358            };
359            self.page_spill = Some(PageSpillState::Active(
360                PageMetadataSpill::new(store, path, self.num_columns as usize).await?,
361            ));
362        }
363        match &mut self.page_spill {
364            Some(PageSpillState::Active(spill)) => spill.append_page(col_idx, &page).await?,
365            None => self.column_metadata[col_idx].pages.push(page),
366            Some(PageSpillState::Pending(..)) => unreachable!(),
367        }
368        Ok(())
369    }
370
371    #[instrument(skip_all, level = "debug")]
372    async fn write_pages(&mut self, mut encoding_tasks: FuturesOrdered<EncodeTask>) -> Result<()> {
373        // As soon as an encoding task is done we write it.  There is no parallelism
374        // needed here because "writing" is really just submitting the buffer to the
375        // underlying write scheduler (either the OS or object_store's scheduler for
376        // cloud writes).  The only time we might truly await on write_page is if the
377        // scheduler's write queue is full.
378        //
379        // Also, there is no point in trying to make write_page parallel anyways
380        // because we wouldn't want buffers getting mixed up across pages.
381        while let Some(encoding_task) = encoding_tasks.next().await {
382            let encoded_page = encoding_task?;
383            self.write_page(encoded_page).await?;
384        }
385        // It's important to flush here, we don't know when the next batch will arrive
386        // and the underlying cloud store could have writes in progress that won't advance
387        // until we interact with the writer again.  These in-progress writes will time out
388        // if we don't flush.
389        self.writer.flush().await?;
390        Ok(())
391    }
392
393    /// Schedule batches of data to be written to the file
394    pub async fn write_batches(
395        &mut self,
396        batches: impl Iterator<Item = &RecordBatch>,
397    ) -> Result<()> {
398        for batch in batches {
399            self.write_batch(batch).await?;
400        }
401        Ok(())
402    }
403
404    fn verify_field_nullability(arr: &ArrayData, field: &Field) -> Result<()> {
405        if !field.nullable && arr.null_count() > 0 {
406            return Err(Error::invalid_input(format!(
407                "The field `{}` contained null values even though the field is marked non-null in the schema",
408                field.name
409            )));
410        }
411
412        for (child_field, child_arr) in field.children.iter().zip(arr.child_data()) {
413            Self::verify_field_nullability(child_arr, child_field)?;
414        }
415
416        Ok(())
417    }
418
419    fn verify_nullability_constraints(&self, batch: &RecordBatch) -> Result<()> {
420        for (col, field) in batch
421            .columns()
422            .iter()
423            .zip(self.schema.as_ref().unwrap().fields.iter())
424        {
425            Self::verify_field_nullability(&col.to_data(), field)?;
426        }
427        Ok(())
428    }
429
430    fn initialize(&mut self, mut schema: LanceSchema) -> Result<()> {
431        let cache_bytes_per_column = if let Some(data_cache_bytes) = self.options.data_cache_bytes {
432            data_cache_bytes / schema.fields.len() as u64
433        } else {
434            8 * 1024 * 1024
435        };
436
437        let max_page_bytes = self.options.max_page_bytes.unwrap_or_else(|| {
438            std::env::var(ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES)
439                .map(|s| {
440                    s.parse::<u64>().unwrap_or_else(|e| {
441                        warn!(
442                            "Failed to parse {}: {}, using default",
443                            ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES, e
444                        );
445                        MAX_PAGE_BYTES as u64
446                    })
447                })
448                .unwrap_or(MAX_PAGE_BYTES as u64)
449        });
450
451        schema.validate()?;
452
453        let keep_original_array = self.options.keep_original_array.unwrap_or(false);
454        let encoding_strategy = self.options.encoding_strategy.clone().unwrap_or_else(|| {
455            let version = self.version();
456            default_encoding_strategy(version).into()
457        });
458
459        let encoding_options = EncodingOptions {
460            cache_bytes_per_column,
461            max_page_bytes,
462            keep_original_array,
463            buffer_alignment: PAGE_BUFFER_ALIGNMENT as u64,
464            version: self.version(),
465        };
466        let encoder =
467            BatchEncoder::try_new(&schema, encoding_strategy.as_ref(), &encoding_options)?;
468        self.num_columns = encoder.num_columns();
469
470        self.column_writers = encoder.field_encoders;
471        self.column_metadata = vec![initial_column_metadata(); self.num_columns as usize];
472        self.field_id_to_column_indices = encoder.field_id_to_column_index;
473        self.schema_metadata
474            .extend(std::mem::take(&mut schema.metadata));
475        self.schema = Some(schema);
476        Ok(())
477    }
478
479    fn ensure_initialized(&mut self, batch: &RecordBatch) -> Result<&LanceSchema> {
480        if self.schema.is_none() {
481            let schema = LanceSchema::try_from(batch.schema().as_ref())?;
482            self.initialize(schema)?;
483        }
484        Ok(self.schema.as_ref().unwrap())
485    }
486
487    #[instrument(skip_all, level = "debug")]
488    fn encode_batch(
489        &mut self,
490        batch: &RecordBatch,
491        external_buffers: &mut OutOfLineBuffers,
492    ) -> Result<Vec<Vec<EncodeTask>>> {
493        self.schema
494            .as_ref()
495            .unwrap()
496            .fields
497            .iter()
498            .zip(self.column_writers.iter_mut())
499            .map(|(field, column_writer)| {
500                let array =
501                    batch
502                        .column_by_name(&field.name)
503                        .ok_or(Error::invalid_input_source(
504                            format!(
505                                "Cannot write batch.  The batch was missing the column `{}`",
506                                field.name
507                            )
508                            .into(),
509                        ))?;
510                let repdef = RepDefBuilder::default();
511                let num_rows = array.len() as u64;
512                column_writer.maybe_encode(
513                    array.clone(),
514                    external_buffers,
515                    repdef,
516                    self.rows_written,
517                    num_rows,
518                )
519            })
520            .collect::<Result<Vec<_>>>()
521    }
522
523    /// Schedule a batch of data to be written to the file
524    ///
525    /// Note: the future returned by this method may complete before the data has been fully
526    /// flushed to the file (some data may be in the data cache or the I/O cache)
527    pub async fn write_batch(&mut self, batch: &RecordBatch) -> Result<()> {
528        debug!(
529            "write_batch called with {} rows, {} columns, and {} bytes of data",
530            batch.num_rows(),
531            batch.num_columns(),
532            batch.get_array_memory_size()
533        );
534        self.ensure_initialized(batch)?;
535        self.verify_nullability_constraints(batch)?;
536        let num_rows = batch.num_rows() as u64;
537        if num_rows == 0 {
538            return Ok(());
539        }
540        if num_rows > u32::MAX as u64 {
541            return Err(Error::invalid_input_source(
542                "cannot write Lance files with more than 2^32 rows".into(),
543            ));
544        }
545        // First we push each array into its column writer.  This may or may not generate enough
546        // data to trigger an encoding task.  We collect any encoding tasks into a queue.
547        let mut external_buffers =
548            OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
549        let encoding_tasks = self.encode_batch(batch, &mut external_buffers)?;
550        // Next, write external buffers
551        for external_buffer in external_buffers.take_buffers() {
552            Self::do_write_buffer(&mut self.writer, &external_buffer).await?;
553        }
554
555        let encoding_tasks = encoding_tasks
556            .into_iter()
557            .flatten()
558            .collect::<FuturesOrdered<_>>();
559
560        self.rows_written = match self.rows_written.checked_add(batch.num_rows() as u64) {
561            Some(rows_written) => rows_written,
562            None => {
563                return Err(Error::invalid_input_source(format!("cannot write batch with {} rows because {} rows have already been written and Lance files cannot contain more than 2^64 rows", num_rows, self.rows_written).into()));
564            }
565        };
566
567        self.write_pages(encoding_tasks).await?;
568
569        Ok(())
570    }
571
572    async fn write_column_metadata(
573        &mut self,
574        metadata: pbfile::ColumnMetadata,
575    ) -> Result<(u64, u64)> {
576        let metadata_bytes = metadata.encode_to_vec();
577        let position = self.writer.tell().await? as u64;
578        let len = metadata_bytes.len() as u64;
579        self.writer.write_all(&metadata_bytes).await?;
580        Ok((position, len))
581    }
582
583    async fn write_column_metadatas(&mut self) -> Result<Vec<(u64, u64)>> {
584        let metadatas = std::mem::take(&mut self.column_metadata);
585
586        // If spilling, finalize the spill writer and reopen for reading.
587        // The spill file itself is cleaned up by the caller (it lives in a
588        // temp directory managed by the caller's RAII guard).
589        let spill_state = self.page_spill.take();
590        let (spill_chunks, spill_reader) =
591            if let Some(PageSpillState::Active(mut spill)) = spill_state {
592                spill.shutdown_writer().await?;
593                let reader = spill.object_store.open(&spill.path).await?;
594                let chunks = std::mem::take(&mut spill.column_chunks);
595                (chunks, Some(reader))
596            } else {
597                (Vec::new(), None)
598            };
599
600        let mut metadata_positions = Vec::with_capacity(metadatas.len());
601        for (col_idx, mut metadata) in metadatas.into_iter().enumerate() {
602            if let Some(reader) = &spill_reader {
603                let mut pages = Vec::new();
604                for &(offset, len) in &spill_chunks[col_idx] {
605                    let data = reader
606                        .get_range(offset as usize..(offset as usize + len as usize))
607                        .await
608                        .map_err(|e| Error::io_source(Box::new(e)))?;
609                    pages.extend(decode_spilled_chunk(&data)?);
610                }
611                metadata.pages = pages;
612            }
613            metadata_positions.push(self.write_column_metadata(metadata).await?);
614        }
615
616        Ok(metadata_positions)
617    }
618
619    fn make_file_descriptor(
620        schema: &lance_core::datatypes::Schema,
621        num_rows: u64,
622    ) -> Result<pb::FileDescriptor> {
623        let fields_with_meta = FieldsWithMeta::from(schema);
624        Ok(pb::FileDescriptor {
625            schema: Some(pb::Schema {
626                fields: fields_with_meta.fields.0,
627                metadata: fields_with_meta.metadata,
628            }),
629            length: num_rows,
630        })
631    }
632
633    async fn write_global_buffers(&mut self) -> Result<Vec<(u64, u64)>> {
634        let schema = self.schema.as_mut().ok_or(Error::invalid_input("No schema provided on writer open and no data provided.  Schema is unknown and file cannot be created"))?;
635        schema.metadata = std::mem::take(&mut self.schema_metadata);
636        // Use descriptor layout for blob v2 fields in the footer to avoid exposing logical child fields.
637        schema
638            .fields
639            .iter_mut()
640            .for_each(|f| f.unload_blobs_recursive());
641
642        let file_descriptor = Self::make_file_descriptor(schema, self.rows_written)?;
643        let file_descriptor_bytes = file_descriptor.encode_to_vec();
644        let file_descriptor_len = file_descriptor_bytes.len() as u64;
645        let file_descriptor_position = self.writer.tell().await? as u64;
646        self.writer.write_all(&file_descriptor_bytes).await?;
647        let mut gbo_table = Vec::with_capacity(1 + self.global_buffers.len());
648        gbo_table.push((file_descriptor_position, file_descriptor_len));
649        gbo_table.append(&mut self.global_buffers);
650        Ok(gbo_table)
651    }
652
653    /// Add a metadata entry to the schema
654    ///
655    /// This method is useful because sometimes the metadata is not known until after the
656    /// data has been written.  This method allows you to alter the schema metadata.  It
657    /// must be called before `finish` is called.
658    pub fn add_schema_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
659        self.schema_metadata.insert(key.into(), value.into());
660    }
661
662    /// Prepare the writer when column data and metadata were produced externally.
663    ///
664    /// This is useful for flows that copy already-encoded pages (e.g., binary copy
665    /// during compaction) where the column buffers have been written directly and we
666    /// only need to write the footer and schema metadata. The provided
667    /// `column_metadata` must describe the buffers already persisted by the
668    /// underlying `ObjectWriter`, and `rows_written` should reflect the total number
669    /// of rows in those buffers.
670    pub fn initialize_with_external_metadata(
671        &mut self,
672        schema: lance_core::datatypes::Schema,
673        column_metadata: Vec<pbfile::ColumnMetadata>,
674        rows_written: u64,
675    ) {
676        self.schema = Some(schema);
677        self.num_columns = column_metadata.len() as u32;
678        self.column_metadata = column_metadata;
679        self.rows_written = rows_written;
680    }
681
682    /// Adds a global buffer to the file
683    ///
684    /// The global buffer can contain any arbitrary bytes.  It will be written to the disk
685    /// immediately.  This method returns the index of the global buffer (this will always
686    /// start at 1 and increment by 1 each time this method is called)
687    pub async fn add_global_buffer(&mut self, buffer: Bytes) -> Result<u32> {
688        let position = self.writer.tell().await? as u64;
689        let len = buffer.len() as u64;
690        Self::do_write_buffer(&mut self.writer, &buffer).await?;
691        self.global_buffers.push((position, len));
692        Ok(self.global_buffers.len() as u32)
693    }
694
695    async fn finish_writers(&mut self) -> Result<()> {
696        let mut col_idx = 0;
697        for mut writer in std::mem::take(&mut self.column_writers) {
698            let mut external_buffers =
699                OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
700            let columns = writer.finish(&mut external_buffers).await?;
701            for buffer in external_buffers.take_buffers() {
702                self.writer.write_all(&buffer).await?;
703            }
704            debug_assert_eq!(
705                columns.len(),
706                writer.num_columns() as usize,
707                "Expected {} columns from column at index {} and got {}",
708                writer.num_columns(),
709                col_idx,
710                columns.len()
711            );
712            for column in columns {
713                for page in column.final_pages {
714                    self.write_page(page).await?;
715                }
716                let column_metadata = &mut self.column_metadata[col_idx];
717                let mut buffer_pos = self.writer.tell().await? as u64;
718                for buffer in column.column_buffers {
719                    column_metadata.buffer_offsets.push(buffer_pos);
720                    let mut size = 0;
721                    Self::do_write_buffer(&mut self.writer, &buffer).await?;
722                    size += buffer.len() as u64;
723                    buffer_pos += size;
724                    column_metadata.buffer_sizes.push(size);
725                }
726                let encoded_encoding = Any::from_msg(&column.encoding)?.encode_to_vec();
727                column_metadata.encoding = Some(pbfile::Encoding {
728                    location: Some(pbfile::encoding::Location::Direct(pbfile::DirectEncoding {
729                        encoding: encoded_encoding,
730                    })),
731                });
732                col_idx += 1;
733            }
734        }
735        if col_idx != self.column_metadata.len() {
736            panic!(
737                "Column writers finished with {} columns but we expected {}",
738                col_idx,
739                self.column_metadata.len()
740            );
741        }
742        Ok(())
743    }
744
745    /// Converts self.version (which is a mix of "software version" and
746    /// "format version" into a format version)
747    fn version_to_numbers(&self) -> (u16, u16) {
748        let version = self.options.format_version.unwrap_or_default();
749        match version.resolve() {
750            LanceFileVersion::V2_0 => (0, 3),
751            LanceFileVersion::V2_1 => (2, 1),
752            LanceFileVersion::V2_2 => (2, 2),
753            LanceFileVersion::V2_3 => (2, 3),
754            _ => panic!("Unsupported version: {}", version),
755        }
756    }
757
758    /// Finishes writing the file
759    ///
760    /// This method will wait until all data has been flushed to the file.  Then it
761    /// will write the file metadata and the footer.  It will not return until all
762    /// data has been flushed and the file has been closed.
763    ///
764    /// Returns a summary of the completed file write.
765    pub async fn finish(&mut self) -> Result<FileWriteSummary> {
766        // 1. flush any remaining data and write out those pages
767        let mut external_buffers =
768            OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
769        let encoding_tasks = self
770            .column_writers
771            .iter_mut()
772            .map(|writer| writer.flush(&mut external_buffers))
773            .collect::<Result<Vec<_>>>()?;
774        for external_buffer in external_buffers.take_buffers() {
775            Self::do_write_buffer(&mut self.writer, &external_buffer).await?;
776        }
777        let encoding_tasks = encoding_tasks
778            .into_iter()
779            .flatten()
780            .collect::<FuturesOrdered<_>>();
781        self.write_pages(encoding_tasks).await?;
782
783        if !self.column_writers.is_empty() {
784            self.finish_writers().await?;
785        }
786
787        // 3. write global buffers (we write the schema here)
788        let global_buffer_offsets = self.write_global_buffers().await?;
789        let num_global_buffers = global_buffer_offsets.len() as u32;
790
791        // 4. write the column metadatas
792        let column_metadata_start = self.writer.tell().await? as u64;
793        let metadata_positions = self.write_column_metadatas().await?;
794
795        // 5. write the column metadata offset table
796        let cmo_table_start = self.writer.tell().await? as u64;
797        for (meta_pos, meta_len) in metadata_positions {
798            self.writer.write_u64_le(meta_pos).await?;
799            self.writer.write_u64_le(meta_len).await?;
800        }
801
802        // 6. write global buffers offset table
803        let gbo_table_start = self.writer.tell().await? as u64;
804        for (gbo_pos, gbo_len) in global_buffer_offsets {
805            self.writer.write_u64_le(gbo_pos).await?;
806            self.writer.write_u64_le(gbo_len).await?;
807        }
808
809        let (major, minor) = self.version_to_numbers();
810        // 7. write the footer
811        self.writer.write_u64_le(column_metadata_start).await?;
812        self.writer.write_u64_le(cmo_table_start).await?;
813        self.writer.write_u64_le(gbo_table_start).await?;
814        self.writer.write_u32_le(num_global_buffers).await?;
815        self.writer.write_u32_le(self.num_columns).await?;
816        self.writer.write_u16_le(major).await?;
817        self.writer.write_u16_le(minor).await?;
818        self.writer.write_all(MAGIC).await?;
819
820        // 7. close the writer
821        let write_result = Writer::shutdown(self.writer.as_mut()).await?;
822
823        Ok(FileWriteSummary {
824            num_rows: self.rows_written,
825            size_bytes: write_result.size as u64,
826        })
827    }
828
    /// Abort hook for the writer.
    ///
    /// The body is intentionally empty: this method performs no work itself.
    pub async fn abort(&mut self) {
        // For multipart uploads, ObjectWriter's Drop impl will abort
        // the upload when the writer is dropped.
    }
833
834    pub async fn tell(&mut self) -> Result<u64> {
835        Ok(self.writer.tell().await? as u64)
836    }
837
838    pub fn field_id_to_column_indices(&self) -> &[(u32, u32)] {
839        &self.field_id_to_column_indices
840    }
841}
842
/// Utility trait for converting EncodedBatch to Bytes using the
/// lance file format
pub trait EncodedBatchWriteExt {
    /// Serializes into a lance file, including the schema
    ///
    /// `version` is the file format version to serialize as.
    fn try_to_self_described_lance(&self, version: LanceFileVersion) -> Result<Bytes>;
    /// Serializes into a lance file, without the schema.
    ///
    /// The schema must be provided to deserialize the buffer
    ///
    /// `version` is the file format version to serialize as.
    fn try_to_mini_lance(&self, version: LanceFileVersion) -> Result<Bytes>;
}
853
854// Creates a lance footer and appends it to the encoded data
855//
856// The logic here is very similar to logic in the FileWriter except we
857// are using BufMut (put_xyz) instead of AsyncWrite (write_xyz).
858fn concat_lance_footer(
859    batch: &EncodedBatch,
860    write_schema: bool,
861    version: LanceFileVersion,
862) -> Result<Bytes> {
863    // Estimating 1MiB for file footer
864    let mut data = BytesMut::with_capacity(batch.data.len() + 1024 * 1024);
865    data.put(batch.data.clone());
866    // write global buffers (we write the schema here)
867    let global_buffers = if write_schema {
868        let schema_start = data.len() as u64;
869        let lance_schema = lance_core::datatypes::Schema::try_from(batch.schema.as_ref())?;
870        let descriptor = FileWriter::make_file_descriptor(&lance_schema, batch.num_rows)?;
871        let descriptor_bytes = descriptor.encode_to_vec();
872        let descriptor_len = descriptor_bytes.len() as u64;
873        data.put(descriptor_bytes.as_slice());
874
875        vec![(schema_start, descriptor_len)]
876    } else {
877        vec![]
878    };
879    let col_metadata_start = data.len() as u64;
880
881    let mut col_metadata_positions = Vec::new();
882    // Write column metadata
883    for col in &batch.page_table {
884        let position = data.len() as u64;
885        let pages = col
886            .page_infos
887            .iter()
888            .map(|page_info| {
889                let encoded_encoding = match &page_info.encoding {
890                    PageEncoding::Legacy(array_encoding) => {
891                        Any::from_msg(array_encoding)?.encode_to_vec()
892                    }
893                    PageEncoding::Structural(page_layout) => {
894                        Any::from_msg(page_layout)?.encode_to_vec()
895                    }
896                };
897                let (buffer_offsets, buffer_sizes): (Vec<_>, Vec<_>) = page_info
898                    .buffer_offsets_and_sizes
899                    .as_ref()
900                    .iter()
901                    .cloned()
902                    .unzip();
903                Ok(pbfile::column_metadata::Page {
904                    buffer_offsets,
905                    buffer_sizes,
906                    encoding: Some(pbfile::Encoding {
907                        location: Some(pbfile::encoding::Location::Direct(DirectEncoding {
908                            encoding: encoded_encoding,
909                        })),
910                    }),
911                    length: page_info.num_rows,
912                    priority: page_info.priority,
913                })
914            })
915            .collect::<Result<Vec<_>>>()?;
916        let (buffer_offsets, buffer_sizes): (Vec<_>, Vec<_>) =
917            col.buffer_offsets_and_sizes.iter().cloned().unzip();
918        let encoded_col_encoding = Any::from_msg(&col.encoding)?.encode_to_vec();
919        let column = pbfile::ColumnMetadata {
920            pages,
921            buffer_offsets,
922            buffer_sizes,
923            encoding: Some(pbfile::Encoding {
924                location: Some(pbfile::encoding::Location::Direct(pbfile::DirectEncoding {
925                    encoding: encoded_col_encoding,
926                })),
927            }),
928        };
929        let column_bytes = column.encode_to_vec();
930        col_metadata_positions.push((position, column_bytes.len() as u64));
931        data.put(column_bytes.as_slice());
932    }
933    // Write column metadata offsets table
934    let cmo_table_start = data.len() as u64;
935    for (meta_pos, meta_len) in col_metadata_positions {
936        data.put_u64_le(meta_pos);
937        data.put_u64_le(meta_len);
938    }
939    // Write global buffers offsets table
940    let gbo_table_start = data.len() as u64;
941    let num_global_buffers = global_buffers.len() as u32;
942    for (gbo_pos, gbo_len) in global_buffers {
943        data.put_u64_le(gbo_pos);
944        data.put_u64_le(gbo_len);
945    }
946
947    let (major, minor) = version.to_numbers();
948
949    // write the footer
950    data.put_u64_le(col_metadata_start);
951    data.put_u64_le(cmo_table_start);
952    data.put_u64_le(gbo_table_start);
953    data.put_u32_le(num_global_buffers);
954    data.put_u32_le(batch.page_table.len() as u32);
955    data.put_u16_le(major as u16);
956    data.put_u16_le(minor as u16);
957    data.put(MAGIC.as_slice());
958
959    Ok(data.freeze())
960}
961
962impl EncodedBatchWriteExt for EncodedBatch {
963    fn try_to_self_described_lance(&self, version: LanceFileVersion) -> Result<Bytes> {
964        concat_lance_footer(self, true, version)
965    }
966
967    fn try_to_mini_lance(&self, version: LanceFileVersion) -> Result<Bytes> {
968        concat_lance_footer(self, false, version)
969    }
970}
971
972#[cfg(test)]
973mod tests {
974    use std::collections::HashMap;
975    use std::sync::Arc;
976
977    use crate::reader::{FileReader, FileReaderOptions, describe_encoding};
978    use crate::testing::FsFixture;
979    use crate::writer::{ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES, FileWriter, FileWriterOptions};
980    use arrow_array::builder::{Float32Builder, Int32Builder};
981    use arrow_array::{Int32Array, RecordBatch, UInt64Array};
982    use arrow_array::{RecordBatchReader, StringArray, types::Float64Type};
983    use arrow_schema::{DataType, Field, Field as ArrowField, Schema, Schema as ArrowSchema};
984    use lance_core::cache::LanceCache;
985    use lance_core::datatypes::Schema as LanceSchema;
986    use lance_core::utils::tempfile::TempObjFile;
987    use lance_datagen::{BatchCount, RowCount, array, gen_batch};
988    use lance_encoding::compression_config::{CompressionFieldParams, CompressionParams};
989    use lance_encoding::decoder::DecoderPlugins;
990    use lance_encoding::version::LanceFileVersion;
991    use lance_io::object_store::ObjectStore;
992    use lance_io::utils::CachedFileSize;
993
994    #[tokio::test]
995    async fn test_basic_write() {
996        let tmp_path = TempObjFile::default();
997        let obj_store = Arc::new(ObjectStore::local());
998
999        let reader = gen_batch()
1000            .col("score", array::rand::<Float64Type>())
1001            .into_reader_rows(RowCount::from(1000), BatchCount::from(10));
1002
1003        let writer = obj_store.create(&tmp_path).await.unwrap();
1004
1005        let lance_schema =
1006            lance_core::datatypes::Schema::try_from(reader.schema().as_ref()).unwrap();
1007
1008        let mut file_writer =
1009            FileWriter::try_new(writer, lance_schema, FileWriterOptions::default()).unwrap();
1010
1011        for batch in reader {
1012            file_writer.write_batch(&batch.unwrap()).await.unwrap();
1013        }
1014        file_writer.add_schema_metadata("foo", "bar");
1015        file_writer.finish().await.unwrap();
1016        // Tests asserting the contents of the written file are in reader.rs
1017    }
1018
1019    #[tokio::test]
1020    async fn test_write_empty() {
1021        let tmp_path = TempObjFile::default();
1022        let obj_store = Arc::new(ObjectStore::local());
1023
1024        let reader = gen_batch()
1025            .col("score", array::rand::<Float64Type>())
1026            .into_reader_rows(RowCount::from(0), BatchCount::from(0));
1027
1028        let writer = obj_store.create(&tmp_path).await.unwrap();
1029
1030        let lance_schema =
1031            lance_core::datatypes::Schema::try_from(reader.schema().as_ref()).unwrap();
1032
1033        let mut file_writer =
1034            FileWriter::try_new(writer, lance_schema, FileWriterOptions::default()).unwrap();
1035
1036        for batch in reader {
1037            file_writer.write_batch(&batch.unwrap()).await.unwrap();
1038        }
1039        file_writer.add_schema_metadata("foo", "bar");
1040        file_writer.finish().await.unwrap();
1041    }
1042
1043    #[tokio::test]
1044    async fn test_max_page_bytes_enforced() {
1045        let arrow_field = Field::new("data", DataType::UInt64, false);
1046        let arrow_schema = Schema::new(vec![arrow_field]);
1047        let lance_schema = LanceSchema::try_from(&arrow_schema).unwrap();
1048
1049        // 8MiB
1050        let data: Vec<u64> = (0..1_000_000).collect();
1051        let array = UInt64Array::from(data);
1052        let batch =
1053            RecordBatch::try_new(arrow_schema.clone().into(), vec![Arc::new(array)]).unwrap();
1054
1055        let options = FileWriterOptions {
1056            max_page_bytes: Some(1024 * 1024), // 1MB
1057            // This is a 2.0 only test because 2.1+ splits large pages on read instead of write
1058            format_version: Some(LanceFileVersion::V2_0),
1059            ..Default::default()
1060        };
1061
1062        let path = TempObjFile::default();
1063        let object_store = ObjectStore::local();
1064        let mut writer = FileWriter::try_new(
1065            object_store.create(&path).await.unwrap(),
1066            lance_schema,
1067            options,
1068        )
1069        .unwrap();
1070
1071        writer.write_batch(&batch).await.unwrap();
1072        writer.finish().await.unwrap();
1073
1074        let fs = FsFixture::default();
1075        let file_scheduler = fs
1076            .scheduler
1077            .open_file(&path, &CachedFileSize::unknown())
1078            .await
1079            .unwrap();
1080        let file_reader = FileReader::try_open(
1081            file_scheduler,
1082            None,
1083            Arc::<DecoderPlugins>::default(),
1084            &LanceCache::no_cache(),
1085            FileReaderOptions::default(),
1086        )
1087        .await
1088        .unwrap();
1089
1090        let column_meta = file_reader.metadata();
1091
1092        let mut total_page_num: u32 = 0;
1093        for (col_idx, col_metadata) in column_meta.column_metadatas.iter().enumerate() {
1094            assert!(
1095                !col_metadata.pages.is_empty(),
1096                "Column {} has no pages",
1097                col_idx
1098            );
1099
1100            for (page_idx, page) in col_metadata.pages.iter().enumerate() {
1101                total_page_num += 1;
1102                let total_size: u64 = page.buffer_sizes.iter().sum();
1103                assert!(
1104                    total_size <= 1024 * 1024,
1105                    "Column {} Page {} size {} exceeds 1MB limit",
1106                    col_idx,
1107                    page_idx,
1108                    total_size
1109                );
1110            }
1111        }
1112
1113        assert_eq!(total_page_num, 8)
1114    }
1115
1116    #[tokio::test(flavor = "current_thread")]
1117    async fn test_max_page_bytes_env_var() {
1118        let arrow_field = Field::new("data", DataType::UInt64, false);
1119        let arrow_schema = Schema::new(vec![arrow_field]);
1120        let lance_schema = LanceSchema::try_from(&arrow_schema).unwrap();
1121        // 4MiB
1122        let data: Vec<u64> = (0..500_000).collect();
1123        let array = UInt64Array::from(data);
1124        let batch =
1125            RecordBatch::try_new(arrow_schema.clone().into(), vec![Arc::new(array)]).unwrap();
1126
1127        // 2MiB
1128        unsafe {
1129            std::env::set_var(ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES, "2097152");
1130        }
1131
1132        let options = FileWriterOptions {
1133            max_page_bytes: None, // enforce env
1134            ..Default::default()
1135        };
1136
1137        let path = TempObjFile::default();
1138        let object_store = ObjectStore::local();
1139        let mut writer = FileWriter::try_new(
1140            object_store.create(&path).await.unwrap(),
1141            lance_schema.clone(),
1142            options,
1143        )
1144        .unwrap();
1145
1146        writer.write_batch(&batch).await.unwrap();
1147        writer.finish().await.unwrap();
1148
1149        let fs = FsFixture::default();
1150        let file_scheduler = fs
1151            .scheduler
1152            .open_file(&path, &CachedFileSize::unknown())
1153            .await
1154            .unwrap();
1155        let file_reader = FileReader::try_open(
1156            file_scheduler,
1157            None,
1158            Arc::<DecoderPlugins>::default(),
1159            &LanceCache::no_cache(),
1160            FileReaderOptions::default(),
1161        )
1162        .await
1163        .unwrap();
1164
1165        for col_metadata in file_reader.metadata().column_metadatas.iter() {
1166            for page in col_metadata.pages.iter() {
1167                let total_size: u64 = page.buffer_sizes.iter().sum();
1168                assert!(
1169                    total_size <= 2 * 1024 * 1024,
1170                    "Page size {} exceeds 2MB limit",
1171                    total_size
1172                );
1173            }
1174        }
1175
1176        unsafe {
1177            std::env::set_var(ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES, "");
1178        }
1179    }
1180
1181    #[tokio::test]
1182    async fn test_compression_overrides_end_to_end() {
1183        // Create test schema with different column types
1184        let arrow_schema = Arc::new(ArrowSchema::new(vec![
1185            ArrowField::new("customer_id", DataType::Int32, false),
1186            ArrowField::new("product_id", DataType::Int32, false),
1187            ArrowField::new("quantity", DataType::Int32, false),
1188            ArrowField::new("price", DataType::Float32, false),
1189            ArrowField::new("description", DataType::Utf8, false),
1190        ]));
1191
1192        let lance_schema = LanceSchema::try_from(arrow_schema.as_ref()).unwrap();
1193
1194        // Create test data with patterns suitable for different compression
1195        let mut customer_ids = Int32Builder::new();
1196        let mut product_ids = Int32Builder::new();
1197        let mut quantities = Int32Builder::new();
1198        let mut prices = Float32Builder::new();
1199        let mut descriptions = Vec::new();
1200
1201        // Generate data with specific patterns:
1202        // - customer_id: highly repetitive (good for RLE)
1203        // - product_id: moderately repetitive (good for RLE)
1204        // - quantity: random values (not good for RLE)
1205        // - price: some repetition
1206        // - description: long strings (good for Zstd)
1207        for i in 0..10000 {
1208            // Customer ID repeats every 100 rows (100 unique customers)
1209            // This creates runs of 100 identical values
1210            customer_ids.append_value(i / 100);
1211
1212            // Product ID has only 5 unique values with long runs
1213            product_ids.append_value(i / 2000);
1214
1215            // Quantity is mostly 1 with occasional other values
1216            quantities.append_value(if i % 10 == 0 { 5 } else { 1 });
1217
1218            // Price has only 3 unique values
1219            prices.append_value(match i % 3 {
1220                0 => 9.99,
1221                1 => 19.99,
1222                _ => 29.99,
1223            });
1224
1225            // Descriptions are repetitive but we'll keep them simple
1226            descriptions.push(format!("Product {}", i / 2000));
1227        }
1228
1229        let batch = RecordBatch::try_new(
1230            arrow_schema.clone(),
1231            vec![
1232                Arc::new(customer_ids.finish()),
1233                Arc::new(product_ids.finish()),
1234                Arc::new(quantities.finish()),
1235                Arc::new(prices.finish()),
1236                Arc::new(StringArray::from(descriptions)),
1237            ],
1238        )
1239        .unwrap();
1240
1241        // Configure compression parameters
1242        let mut params = CompressionParams::new();
1243
1244        // RLE for ID columns (ends with _id)
1245        params.columns.insert(
1246            "*_id".to_string(),
1247            CompressionFieldParams {
1248                rle_threshold: Some(0.5), // Lower threshold to trigger RLE more easily
1249                compression: None,        // Will use default compression if any
1250                compression_level: None,
1251                bss: Some(lance_encoding::compression_config::BssMode::Off), // Explicitly disable BSS to ensure RLE is used
1252                minichunk_size: None,
1253            },
1254        );
1255
1256        // For now, we'll skip Zstd compression since it's not imported
1257        // In a real implementation, you could add other compression types here
1258
1259        // Build encoding strategy with compression parameters
1260        let encoding_strategy = lance_encoding::encoder::default_encoding_strategy_with_params(
1261            LanceFileVersion::V2_1,
1262            params,
1263        )
1264        .unwrap();
1265
1266        // Configure file writer options
1267        let options = FileWriterOptions {
1268            encoding_strategy: Some(Arc::from(encoding_strategy)),
1269            format_version: Some(LanceFileVersion::V2_1),
1270            max_page_bytes: Some(64 * 1024), // 64KB pages
1271            ..Default::default()
1272        };
1273
1274        // Write the file
1275        let path = TempObjFile::default();
1276        let object_store = ObjectStore::local();
1277
1278        let mut writer = FileWriter::try_new(
1279            object_store.create(&path).await.unwrap(),
1280            lance_schema.clone(),
1281            options,
1282        )
1283        .unwrap();
1284
1285        writer.write_batch(&batch).await.unwrap();
1286        writer.add_schema_metadata("compression_test", "configured_compression");
1287        writer.finish().await.unwrap();
1288
1289        // Now write the same data without compression overrides for comparison
1290        let path_no_compression = TempObjFile::default();
1291        let default_options = FileWriterOptions {
1292            format_version: Some(LanceFileVersion::V2_1),
1293            max_page_bytes: Some(64 * 1024),
1294            ..Default::default()
1295        };
1296
1297        let mut writer_no_compression = FileWriter::try_new(
1298            object_store.create(&path_no_compression).await.unwrap(),
1299            lance_schema.clone(),
1300            default_options,
1301        )
1302        .unwrap();
1303
1304        writer_no_compression.write_batch(&batch).await.unwrap();
1305        writer_no_compression.finish().await.unwrap();
1306
1307        // Note: With our current data patterns and RLE compression, the compressed file
1308        // might actually be slightly larger due to compression metadata overhead.
1309        // This is expected and the test is mainly to verify the system works end-to-end.
1310
1311        // Read back the compressed file and verify data integrity
1312        let fs = FsFixture::default();
1313        let file_scheduler = fs
1314            .scheduler
1315            .open_file(&path, &CachedFileSize::unknown())
1316            .await
1317            .unwrap();
1318
1319        let file_reader = FileReader::try_open(
1320            file_scheduler,
1321            None,
1322            Arc::<DecoderPlugins>::default(),
1323            &LanceCache::no_cache(),
1324            FileReaderOptions::default(),
1325        )
1326        .await
1327        .unwrap();
1328
1329        // Verify metadata
1330        let metadata = file_reader.metadata();
1331        assert_eq!(metadata.major_version, 2);
1332        assert_eq!(metadata.minor_version, 1);
1333
1334        let schema = file_reader.schema();
1335        assert_eq!(
1336            schema.metadata.get("compression_test"),
1337            Some(&"configured_compression".to_string())
1338        );
1339
1340        // Verify the actual encodings used
1341        let column_metadatas = &metadata.column_metadatas;
1342
1343        // Check customer_id column (index 0) - should use RLE due to our configuration
1344        assert!(!column_metadatas[0].pages.is_empty());
1345        let customer_id_encoding = describe_encoding(&column_metadatas[0].pages[0]);
1346        assert!(
1347            customer_id_encoding.contains("RLE") || customer_id_encoding.contains("Rle"),
1348            "customer_id column should use RLE encoding due to '*_id' pattern match, but got: {}",
1349            customer_id_encoding
1350        );
1351
1352        // Check product_id column (index 1) - should use RLE due to our configuration
1353        assert!(!column_metadatas[1].pages.is_empty());
1354        let product_id_encoding = describe_encoding(&column_metadatas[1].pages[0]);
1355        assert!(
1356            product_id_encoding.contains("RLE") || product_id_encoding.contains("Rle"),
1357            "product_id column should use RLE encoding due to '*_id' pattern match, but got: {}",
1358            product_id_encoding
1359        );
1360    }
1361
1362    #[tokio::test]
1363    async fn test_field_metadata_compression() {
1364        // Test that field metadata compression settings are respected
1365        let mut metadata = HashMap::new();
1366        metadata.insert(
1367            lance_encoding::constants::COMPRESSION_META_KEY.to_string(),
1368            "zstd".to_string(),
1369        );
1370        metadata.insert(
1371            lance_encoding::constants::COMPRESSION_LEVEL_META_KEY.to_string(),
1372            "6".to_string(),
1373        );
1374
1375        let arrow_schema = Arc::new(ArrowSchema::new(vec![
1376            ArrowField::new("id", DataType::Int32, false),
1377            ArrowField::new("text", DataType::Utf8, false).with_metadata(metadata.clone()),
1378            ArrowField::new("data", DataType::Int32, false).with_metadata(HashMap::from([(
1379                lance_encoding::constants::COMPRESSION_META_KEY.to_string(),
1380                "none".to_string(),
1381            )])),
1382        ]));
1383
1384        let lance_schema = LanceSchema::try_from(arrow_schema.as_ref()).unwrap();
1385
1386        // Create test data
1387        let id_array = Int32Array::from_iter_values(0..1000);
1388        let text_array = StringArray::from_iter_values(
1389            (0..1000).map(|i| format!("test string {} repeated text", i)),
1390        );
1391        let data_array = Int32Array::from_iter_values((0..1000).map(|i| i * 2));
1392
1393        let batch = RecordBatch::try_new(
1394            arrow_schema.clone(),
1395            vec![
1396                Arc::new(id_array),
1397                Arc::new(text_array),
1398                Arc::new(data_array),
1399            ],
1400        )
1401        .unwrap();
1402
1403        let path = TempObjFile::default();
1404        let object_store = ObjectStore::local();
1405
1406        // Create encoding strategy that will read from field metadata
1407        let params = CompressionParams::new();
1408        let encoding_strategy = lance_encoding::encoder::default_encoding_strategy_with_params(
1409            LanceFileVersion::V2_1,
1410            params,
1411        )
1412        .unwrap();
1413
1414        let options = FileWriterOptions {
1415            encoding_strategy: Some(Arc::from(encoding_strategy)),
1416            format_version: Some(LanceFileVersion::V2_1),
1417            ..Default::default()
1418        };
1419        let mut writer = FileWriter::try_new(
1420            object_store.create(&path).await.unwrap(),
1421            lance_schema.clone(),
1422            options,
1423        )
1424        .unwrap();
1425
1426        writer.write_batch(&batch).await.unwrap();
1427        writer.finish().await.unwrap();
1428
1429        // Read back metadata
1430        let fs = FsFixture::default();
1431        let file_scheduler = fs
1432            .scheduler
1433            .open_file(&path, &CachedFileSize::unknown())
1434            .await
1435            .unwrap();
1436        let file_reader = FileReader::try_open(
1437            file_scheduler,
1438            None,
1439            Arc::<DecoderPlugins>::default(),
1440            &LanceCache::no_cache(),
1441            FileReaderOptions::default(),
1442        )
1443        .await
1444        .unwrap();
1445
1446        let column_metadatas = &file_reader.metadata().column_metadatas;
1447
1448        // The text column (index 1) should use zstd compression based on metadata
1449        let text_encoding = describe_encoding(&column_metadatas[1].pages[0]);
1450        // For string columns, we expect Binary encoding with zstd compression
1451        assert!(
1452            text_encoding.contains("Zstd"),
1453            "text column should use zstd compression from field metadata, but got: {}",
1454            text_encoding
1455        );
1456
1457        // The data column (index 2) should use no compression based on metadata
1458        let data_encoding = describe_encoding(&column_metadatas[2].pages[0]);
1459        // For Int32 columns with "none" compression, we expect Flat encoding without compression
1460        assert!(
1461            data_encoding.contains("Flat") && data_encoding.contains("compression: None"),
1462            "data column should use no compression from field metadata, but got: {}",
1463            data_encoding
1464        );
1465    }
1466
1467    #[tokio::test]
1468    async fn test_field_metadata_rle_threshold() {
1469        // Test that RLE threshold from field metadata is respected
1470        let mut metadata = HashMap::new();
1471        metadata.insert(
1472            lance_encoding::constants::RLE_THRESHOLD_META_KEY.to_string(),
1473            "0.9".to_string(),
1474        );
1475        // Also set compression to ensure RLE is used
1476        metadata.insert(
1477            lance_encoding::constants::COMPRESSION_META_KEY.to_string(),
1478            "lz4".to_string(),
1479        );
1480        // Explicitly disable BSS to ensure RLE is tested
1481        metadata.insert(
1482            lance_encoding::constants::BSS_META_KEY.to_string(),
1483            "off".to_string(),
1484        );
1485
1486        let arrow_schema = Arc::new(ArrowSchema::new(vec![
1487            ArrowField::new("status", DataType::Int32, false).with_metadata(metadata),
1488        ]));
1489
1490        let lance_schema = LanceSchema::try_from(arrow_schema.as_ref()).unwrap();
1491
1492        // Create data with very high repetition (3 runs for 10000 values = 0.0003 ratio)
1493        let status_array = Int32Array::from_iter_values(
1494            std::iter::repeat_n(200, 8000)
1495                .chain(std::iter::repeat_n(404, 1500))
1496                .chain(std::iter::repeat_n(500, 500)),
1497        );
1498
1499        let batch =
1500            RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(status_array)]).unwrap();
1501
1502        let path = TempObjFile::default();
1503        let object_store = ObjectStore::local();
1504
1505        // Create encoding strategy that will read from field metadata
1506        let params = CompressionParams::new();
1507        let encoding_strategy = lance_encoding::encoder::default_encoding_strategy_with_params(
1508            LanceFileVersion::V2_1,
1509            params,
1510        )
1511        .unwrap();
1512
1513        let options = FileWriterOptions {
1514            encoding_strategy: Some(Arc::from(encoding_strategy)),
1515            format_version: Some(LanceFileVersion::V2_1),
1516            ..Default::default()
1517        };
1518        let mut writer = FileWriter::try_new(
1519            object_store.create(&path).await.unwrap(),
1520            lance_schema.clone(),
1521            options,
1522        )
1523        .unwrap();
1524
1525        writer.write_batch(&batch).await.unwrap();
1526        writer.finish().await.unwrap();
1527
1528        // Read back and check encoding
1529        let fs = FsFixture::default();
1530        let file_scheduler = fs
1531            .scheduler
1532            .open_file(&path, &CachedFileSize::unknown())
1533            .await
1534            .unwrap();
1535        let file_reader = FileReader::try_open(
1536            file_scheduler,
1537            None,
1538            Arc::<DecoderPlugins>::default(),
1539            &LanceCache::no_cache(),
1540            FileReaderOptions::default(),
1541        )
1542        .await
1543        .unwrap();
1544
1545        let column_metadatas = &file_reader.metadata().column_metadatas;
1546        let status_encoding = describe_encoding(&column_metadatas[0].pages[0]);
1547        assert!(
1548            status_encoding.contains("RLE") || status_encoding.contains("Rle"),
1549            "status column should use RLE encoding due to metadata threshold, but got: {}",
1550            status_encoding
1551        );
1552    }
1553
1554    #[tokio::test]
1555    async fn test_large_page_split_on_read() {
1556        use arrow_array::Array;
1557        use futures::TryStreamExt;
1558        use lance_encoding::decoder::FilterExpression;
1559        use lance_io::ReadBatchParams;
1560
1561        // Test that large pages written with relaxed limits can be split during read
1562
1563        let arrow_field = ArrowField::new("data", DataType::Binary, false);
1564        let arrow_schema = ArrowSchema::new(vec![arrow_field]);
1565        let lance_schema = LanceSchema::try_from(&arrow_schema).unwrap();
1566
1567        // Create a large binary value (40MB) to trigger large page creation
1568        let large_value = vec![42u8; 40 * 1024 * 1024];
1569        let array = arrow_array::BinaryArray::from(vec![
1570            Some(large_value.as_slice()),
1571            Some(b"small value"),
1572        ]);
1573        let batch = RecordBatch::try_new(Arc::new(arrow_schema), vec![Arc::new(array)]).unwrap();
1574
1575        // Write with relaxed page size limit (128MB)
1576        let options = FileWriterOptions {
1577            max_page_bytes: Some(128 * 1024 * 1024),
1578            format_version: Some(LanceFileVersion::V2_1),
1579            ..Default::default()
1580        };
1581
1582        let fs = FsFixture::default();
1583        let path = fs.tmp_path;
1584
1585        let mut writer = FileWriter::try_new(
1586            fs.object_store.create(&path).await.unwrap(),
1587            lance_schema.clone(),
1588            options,
1589        )
1590        .unwrap();
1591
1592        writer.write_batch(&batch).await.unwrap();
1593        let write_summary = writer.finish().await.unwrap();
1594        assert_eq!(write_summary.num_rows, 2);
1595        assert_eq!(
1596            write_summary.size_bytes,
1597            fs.object_store.size(&path).await.unwrap()
1598        );
1599
1600        // Read back with split configuration
1601        let file_scheduler = fs
1602            .scheduler
1603            .open_file(&path, &CachedFileSize::unknown())
1604            .await
1605            .unwrap();
1606
1607        // Configure reader to split pages larger than 10MB into chunks
1608        let reader_options = FileReaderOptions {
1609            read_chunk_size: 10 * 1024 * 1024, // 10MB chunks
1610            ..Default::default()
1611        };
1612
1613        let file_reader = FileReader::try_open(
1614            file_scheduler,
1615            None,
1616            Arc::<DecoderPlugins>::default(),
1617            &LanceCache::no_cache(),
1618            reader_options,
1619        )
1620        .await
1621        .unwrap();
1622
1623        // Read the data back
1624        let stream = file_reader
1625            .read_stream(
1626                ReadBatchParams::RangeFull,
1627                1024,
1628                10, // batch_readahead
1629                FilterExpression::no_filter(),
1630            )
1631            .await
1632            .unwrap();
1633
1634        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
1635        assert_eq!(batches.len(), 1);
1636
1637        // Verify the data is correctly read despite splitting
1638        let read_array = batches[0].column(0);
1639        let read_binary = read_array
1640            .as_any()
1641            .downcast_ref::<arrow_array::BinaryArray>()
1642            .unwrap();
1643
1644        assert_eq!(read_binary.len(), 2);
1645        assert_eq!(read_binary.value(0).len(), 40 * 1024 * 1024);
1646        assert_eq!(read_binary.value(1), b"small value");
1647
1648        // Verify first value matches what we wrote
1649        assert!(read_binary.value(0).iter().all(|&b| b == 42u8));
1650    }
1651
1652    fn spill_config() -> (TempObjFile, Arc<ObjectStore>) {
1653        let spill_path = TempObjFile::default();
1654        (spill_path, Arc::new(ObjectStore::local()))
1655    }
1656
1657    fn make_batches(num_batches: i32, num_cols: usize, rows_per_batch: i32) -> Vec<RecordBatch> {
1658        let fields: Vec<_> = (0..num_cols)
1659            .map(|c| ArrowField::new(format!("c{c}"), DataType::Int32, false))
1660            .collect();
1661        let schema = Arc::new(ArrowSchema::new(fields));
1662        (0..num_batches)
1663            .map(|i| {
1664                let cols: Vec<Arc<dyn arrow_array::Array>> = (0..num_cols)
1665                    .map(|c| {
1666                        let start = (i * rows_per_batch + c as i32) * 100;
1667                        Arc::new(Int32Array::from_iter_values(start..start + rows_per_batch))
1668                            as Arc<dyn arrow_array::Array>
1669                    })
1670                    .collect();
1671                RecordBatch::try_new(schema.clone(), cols).unwrap()
1672            })
1673            .collect()
1674    }
1675
1676    async fn write_and_read_batches(
1677        batches: &[RecordBatch],
1678        spill: Option<(Arc<ObjectStore>, object_store::path::Path)>,
1679    ) -> Vec<RecordBatch> {
1680        let fs = FsFixture::default();
1681        let lance_schema = LanceSchema::try_from(batches[0].schema().as_ref()).unwrap();
1682        let writer = fs.object_store.create(&fs.tmp_path).await.unwrap();
1683        let mut file_writer =
1684            FileWriter::try_new(writer, lance_schema, FileWriterOptions::default()).unwrap();
1685        if let Some((store, path)) = spill {
1686            file_writer = file_writer.with_page_metadata_spill(store, path);
1687        }
1688        for batch in batches {
1689            file_writer.write_batch(batch).await.unwrap();
1690        }
1691        file_writer.add_schema_metadata("foo", "bar");
1692        file_writer.finish().await.unwrap();
1693
1694        crate::testing::read_lance_file(
1695            &fs,
1696            Arc::<DecoderPlugins>::default(),
1697            lance_encoding::decoder::FilterExpression::no_filter(),
1698        )
1699        .await
1700    }
1701
1702    #[rstest::rstest]
1703    #[case::multi_col(20, 2, 100)]
1704    #[case::many_batches(50, 2, 100)]
1705    #[tokio::test]
1706    async fn test_page_metadata_spill_roundtrip(
1707        #[case] num_batches: i32,
1708        #[case] num_cols: usize,
1709        #[case] rows_per_batch: i32,
1710    ) {
1711        let batches = make_batches(num_batches, num_cols, rows_per_batch);
1712        let baseline = write_and_read_batches(&batches, None).await;
1713        let (spill_path, spill_store) = spill_config();
1714        let spilled =
1715            write_and_read_batches(&batches, Some((spill_store, spill_path.as_ref().clone())))
1716                .await;
1717        assert_eq!(baseline, spilled);
1718    }
1719
1720    #[tokio::test]
1721    async fn test_page_metadata_spill_many_columns() {
1722        // Many columns forces small per-column buffer limits, exercising mid-write flushing.
1723        let batches = make_batches(10, 500, 100);
1724        let baseline = write_and_read_batches(&batches, None).await;
1725        let (spill_path, spill_store) = spill_config();
1726        let spilled =
1727            write_and_read_batches(&batches, Some((spill_store, spill_path.as_ref().clone())))
1728                .await;
1729        assert_eq!(baseline, spilled);
1730    }
1731}