lance_file/v2/writer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use core::panic;
5use std::collections::HashMap;
6use std::sync::atomic::AtomicBool;
7use std::sync::Arc;
8
9use arrow_array::RecordBatch;
10
11use arrow_data::ArrayData;
12use bytes::{BufMut, Bytes, BytesMut};
13use futures::stream::FuturesOrdered;
14use futures::StreamExt;
15use lance_core::datatypes::{Field, Schema as LanceSchema};
16use lance_core::utils::bit::pad_bytes;
17use lance_core::{Error, Result};
18use lance_encoding::decoder::PageEncoding;
19use lance_encoding::encoder::{
20    default_encoding_strategy, BatchEncoder, EncodeTask, EncodedBatch, EncodedPage,
21    EncodingOptions, FieldEncoder, FieldEncodingStrategy, OutOfLineBuffers,
22};
23use lance_encoding::repdef::RepDefBuilder;
24use lance_encoding::version::LanceFileVersion;
25use lance_io::object_store::ObjectStore;
26use lance_io::object_writer::ObjectWriter;
27use lance_io::traits::Writer;
28use log::{debug, warn};
29use object_store::path::Path;
30use prost::Message;
31use prost_types::Any;
32use snafu::location;
33use tokio::io::AsyncWriteExt;
34use tracing::instrument;
35
36use crate::datatypes::FieldsWithMeta;
37use crate::format::pb;
38use crate::format::pbfile;
39use crate::format::pbfile::DirectEncoding;
40use crate::format::MAGIC;
41
/// Page buffers are aligned to 64 bytes
43pub(crate) const PAGE_BUFFER_ALIGNMENT: usize = 64;
44const PAD_BUFFER: [u8; PAGE_BUFFER_ALIGNMENT] = [72; PAGE_BUFFER_ALIGNMENT];
// In 2.1+, we split large pages on read instead of on write, which avoids issues with
// empty pages and undersized pages. However, we keep the write-time limit at 32MiB to
// avoid potential regressions in 2.0 format readers.
//
// This limit is not applied by the 2.1 writer.
50const MAX_PAGE_BYTES: usize = 32 * 1024 * 1024;
51const ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES: &str = "LANCE_FILE_WRITER_MAX_PAGE_BYTES";
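// A minimal sketch of overriding the default page-size limit through the environment
// (the value is illustrative; `initialize` below reads and parses this variable when
// `max_page_bytes` is not set explicitly):
//
//     std::env::set_var(ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES, "8388608"); // 8MiB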
52
53#[derive(Debug, Clone, Default)]
54pub struct FileWriterOptions {
55    /// How many bytes to use for buffering column data
56    ///
    /// When data comes in small batches, the writer will buffer column data so that
    /// larger pages can be created.  This value is divided evenly across all of the
    /// columns.  Generally you want this to be at least large enough to match your
    /// filesystem's ideal read size per column.
    ///
    /// In some cases you might want this value to be even larger if you have highly
    /// compressible data.  However, if it is too large, the writer could require
    /// a lot of memory, and write performance may suffer if the CPU-expensive encoding
    /// falls behind and can't be interleaved with the I/O-expensive flushing.
    ///
    /// The default is 8MiB per column, which should be reasonable for most cases.
68    // TODO: Do we need to be able to set this on a per-column basis?
69    pub data_cache_bytes: Option<u64>,
70    /// A hint to indicate the max size of a page
71    ///
    /// This hint can't always be respected.  A single value could be larger than this value,
    /// and we never slice single values.  In addition, there are some cases where it can be
    /// difficult to know the size up front, and so we might not be able to respect this value.
75    pub max_page_bytes: Option<u64>,
76    /// The file writer buffers columns until enough data has arrived to flush a page
77    /// to disk.
78    ///
    /// Some columns with small data types may not flush very often.  These arrays can
    /// stick around for a long time, and they might also be keeping larger data
    /// structures alive.  By default, the writer will make a deep copy of the array
    /// to avoid any potential memory leaks.  However, this can be disabled for a
    /// (probably minor) performance boost if you are sure that arrays are not keeping
    /// any sibling structures alive (this typically means the array was allocated in
    /// the same language / runtime as the writer).
86    ///
87    /// Do not enable this if your data is arriving from the C data interface.
88    /// Data typically arrives one "batch" at a time (encoded in the C data interface
89    /// as a struct array).  Each array in that batch keeps the entire batch alive.
90    /// This means a small boolean array (which we will buffer in memory for quite a
91    /// while) might keep a much larger record batch around in memory (even though most
    /// of that batch's data has been written to disk).
93    pub keep_original_array: Option<bool>,
94    pub encoding_strategy: Option<Arc<dyn FieldEncodingStrategy>>,
95    /// The format version to use when writing the file
96    ///
97    /// This controls which encodings will be used when encoding the data.  Newer
98    /// versions may have more efficient encodings.  However, newer format versions will
99    /// require more up-to-date readers to read the data.
100    pub format_version: Option<LanceFileVersion>,
101}
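// A minimal sketch of configuring the options above (the values are illustrative, not
// recommendations):
//
//     let options = FileWriterOptions {
//         data_cache_bytes: Some(16 * 1024 * 1024),     // 16MiB, split across all columns
//         keep_original_array: Some(true),              // only if arrays don't pin larger batches
//         format_version: Some(LanceFileVersion::V2_0),
//         ..Default::default()
//     };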
102
103pub struct FileWriter {
104    writer: ObjectWriter,
105    schema: Option<LanceSchema>,
106    column_writers: Vec<Box<dyn FieldEncoder>>,
107    column_metadata: Vec<pbfile::ColumnMetadata>,
108    field_id_to_column_indices: Vec<(u32, u32)>,
109    num_columns: u32,
110    rows_written: u64,
111    global_buffers: Vec<(u64, u64)>,
112    schema_metadata: HashMap<String, String>,
113    options: FileWriterOptions,
114}
115
116fn initial_column_metadata() -> pbfile::ColumnMetadata {
117    pbfile::ColumnMetadata {
118        pages: Vec::new(),
119        buffer_offsets: Vec::new(),
120        buffer_sizes: Vec::new(),
121        encoding: None,
122    }
123}
124
125static WARNED_ON_UNSTABLE_API: AtomicBool = AtomicBool::new(false);
126
127impl FileWriter {
128    /// Create a new FileWriter with a desired output schema
129    pub fn try_new(
130        object_writer: ObjectWriter,
131        schema: LanceSchema,
132        options: FileWriterOptions,
133    ) -> Result<Self> {
134        let mut writer = Self::new_lazy(object_writer, options);
135        writer.initialize(schema)?;
136        Ok(writer)
137    }
138
139    /// Create a new FileWriter without a desired output schema
140    ///
141    /// The output schema will be set based on the first batch of data to arrive.
    /// If no data arrives before the writer is finished, then finishing the file will fail.
143    pub fn new_lazy(object_writer: ObjectWriter, options: FileWriterOptions) -> Self {
144        if let Some(format_version) = options.format_version {
145            if format_version > LanceFileVersion::Stable
146                && WARNED_ON_UNSTABLE_API
147                    .compare_exchange(
148                        false,
149                        true,
150                        std::sync::atomic::Ordering::Relaxed,
151                        std::sync::atomic::Ordering::Relaxed,
152                    )
153                    .is_ok()
154            {
155                warn!("You have requested an unstable format version.  Files written with this format version may not be readable in the future!  This is a development feature and should only be used for experimentation and never for production data.");
156            }
157        }
158        Self {
159            writer: object_writer,
160            schema: None,
161            column_writers: Vec::new(),
162            column_metadata: Vec::new(),
163            num_columns: 0,
164            rows_written: 0,
165            field_id_to_column_indices: Vec::new(),
166            global_buffers: Vec::new(),
167            schema_metadata: HashMap::new(),
168            options,
169        }
170    }
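    // A minimal sketch of using `new_lazy` above (assuming `object_writer` is an
    // already-opened `ObjectWriter`); the schema is inferred from the first batch written:
    //
    //     let mut writer = FileWriter::new_lazy(object_writer, FileWriterOptions::default());
    //     writer.write_batch(&first_batch).await?;   // schema is captured here
    //     writer.finish().await?;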
171
172    /// Write a series of record batches to a new file
173    ///
174    /// Returns the number of rows written
175    pub async fn create_file_with_batches(
176        store: &ObjectStore,
177        path: &Path,
178        schema: lance_core::datatypes::Schema,
179        batches: impl Iterator<Item = RecordBatch> + Send,
180        options: FileWriterOptions,
181    ) -> Result<usize> {
182        let writer = store.create(path).await?;
183        let mut writer = Self::try_new(writer, schema, options)?;
184        for batch in batches {
185            writer.write_batch(&batch).await?;
186        }
187        Ok(writer.finish().await? as usize)
188    }
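    // A minimal sketch of the one-shot helper above (assuming `store`, `path`, `schema`, and
    // an iterator of `RecordBatch` values are already in hand):
    //
    //     let rows_written = FileWriter::create_file_with_batches(
    //         &store,
    //         &path,
    //         schema,
    //         batches.into_iter(),
    //         FileWriterOptions::default(),
    //     )
    //     .await?;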
189
190    async fn do_write_buffer(writer: &mut ObjectWriter, buf: &[u8]) -> Result<()> {
191        writer.write_all(buf).await?;
192        let pad_bytes = pad_bytes::<PAGE_BUFFER_ALIGNMENT>(buf.len());
193        writer.write_all(&PAD_BUFFER[..pad_bytes]).await?;
194        Ok(())
195    }
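    // For example, in `do_write_buffer` above, a 100-byte buffer is followed by 28 bytes of
    // padding so that the next buffer starts on a 64-byte boundary (100 + 28 = 128); a buffer
    // whose length is already a multiple of 64 gets no padding.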
196
197    /// Returns the format version that will be used when writing the file
198    pub fn version(&self) -> LanceFileVersion {
199        self.options.format_version.unwrap_or_default()
200    }
201
202    async fn write_page(&mut self, encoded_page: EncodedPage) -> Result<()> {
203        let buffers = encoded_page.data;
204        let mut buffer_offsets = Vec::with_capacity(buffers.len());
205        let mut buffer_sizes = Vec::with_capacity(buffers.len());
206        for buffer in buffers {
207            buffer_offsets.push(self.writer.tell().await? as u64);
208            buffer_sizes.push(buffer.len() as u64);
209            Self::do_write_buffer(&mut self.writer, &buffer).await?;
210        }
211        let encoded_encoding = match encoded_page.description {
212            PageEncoding::Legacy(array_encoding) => Any::from_msg(&array_encoding)?.encode_to_vec(),
213            PageEncoding::Structural(page_layout) => Any::from_msg(&page_layout)?.encode_to_vec(),
214        };
215        let page = pbfile::column_metadata::Page {
216            buffer_offsets,
217            buffer_sizes,
218            encoding: Some(pbfile::Encoding {
219                location: Some(pbfile::encoding::Location::Direct(DirectEncoding {
220                    encoding: encoded_encoding,
221                })),
222            }),
223            length: encoded_page.num_rows,
224            priority: encoded_page.row_number,
225        };
226        self.column_metadata[encoded_page.column_idx as usize]
227            .pages
228            .push(page);
229        Ok(())
230    }
231
232    #[instrument(skip_all, level = "debug")]
233    async fn write_pages(&mut self, mut encoding_tasks: FuturesOrdered<EncodeTask>) -> Result<()> {
        // As soon as an encoding task is done, we write it.  There is no parallelism
235        // needed here because "writing" is really just submitting the buffer to the
236        // underlying write scheduler (either the OS or object_store's scheduler for
237        // cloud writes).  The only time we might truly await on write_page is if the
238        // scheduler's write queue is full.
239        //
        // Also, there is no point in trying to make write_page parallel anyway
241        // because we wouldn't want buffers getting mixed up across pages.
242        while let Some(encoding_task) = encoding_tasks.next().await {
243            let encoded_page = encoding_task?;
244            self.write_page(encoded_page).await?;
245        }
        // It's important to flush here; we don't know when the next batch will arrive
247        // and the underlying cloud store could have writes in progress that won't advance
248        // until we interact with the writer again.  These in-progress writes will time out
249        // if we don't flush.
250        self.writer.flush().await?;
251        Ok(())
252    }
253
254    /// Schedule batches of data to be written to the file
255    pub async fn write_batches(
256        &mut self,
257        batches: impl Iterator<Item = &RecordBatch>,
258    ) -> Result<()> {
259        for batch in batches {
260            self.write_batch(batch).await?;
261        }
262        Ok(())
263    }
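    // A minimal usage sketch of `write_batches` (assuming `batches` is a `Vec<RecordBatch>`
    // already in memory):
    //
    //     writer.write_batches(batches.iter()).await?;
    //     writer.finish().await?;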
264
265    fn verify_field_nullability(arr: &ArrayData, field: &Field) -> Result<()> {
266        if !field.nullable && arr.null_count() > 0 {
267            return Err(Error::invalid_input(format!("The field `{}` contained null values even though the field is marked non-null in the schema", field.name), location!()));
268        }
269
270        for (child_field, child_arr) in field.children.iter().zip(arr.child_data()) {
271            Self::verify_field_nullability(child_arr, child_field)?;
272        }
273
274        Ok(())
275    }
276
277    fn verify_nullability_constraints(&self, batch: &RecordBatch) -> Result<()> {
278        for (col, field) in batch
279            .columns()
280            .iter()
281            .zip(self.schema.as_ref().unwrap().fields.iter())
282        {
283            Self::verify_field_nullability(&col.to_data(), field)?;
284        }
285        Ok(())
286    }
287
288    fn initialize(&mut self, mut schema: LanceSchema) -> Result<()> {
289        let cache_bytes_per_column = if let Some(data_cache_bytes) = self.options.data_cache_bytes {
290            data_cache_bytes / schema.fields.len() as u64
291        } else {
292            8 * 1024 * 1024
293        };
294
295        let max_page_bytes = self.options.max_page_bytes.unwrap_or_else(|| {
296            std::env::var(ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES)
297                .map(|s| {
298                    s.parse::<u64>().unwrap_or_else(|e| {
299                        warn!(
300                            "Failed to parse {}: {}, using default",
301                            ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES, e
302                        );
303                        MAX_PAGE_BYTES as u64
304                    })
305                })
306                .unwrap_or(MAX_PAGE_BYTES as u64)
307        });
308
309        schema.validate()?;
310
311        let keep_original_array = self.options.keep_original_array.unwrap_or(false);
312        let encoding_strategy = self.options.encoding_strategy.clone().unwrap_or_else(|| {
313            let version = self.version();
314            default_encoding_strategy(version).into()
315        });
316
317        let encoding_options = EncodingOptions {
318            cache_bytes_per_column,
319            max_page_bytes,
320            keep_original_array,
321            buffer_alignment: PAGE_BUFFER_ALIGNMENT as u64,
322        };
323        let encoder =
324            BatchEncoder::try_new(&schema, encoding_strategy.as_ref(), &encoding_options)?;
325        self.num_columns = encoder.num_columns();
326
327        self.column_writers = encoder.field_encoders;
328        self.column_metadata = vec![initial_column_metadata(); self.num_columns as usize];
329        self.field_id_to_column_indices = encoder.field_id_to_column_index;
330        self.schema_metadata
331            .extend(std::mem::take(&mut schema.metadata));
332        self.schema = Some(schema);
333        Ok(())
334    }
335
336    fn ensure_initialized(&mut self, batch: &RecordBatch) -> Result<&LanceSchema> {
337        if self.schema.is_none() {
338            let schema = LanceSchema::try_from(batch.schema().as_ref())?;
339            self.initialize(schema)?;
340        }
341        Ok(self.schema.as_ref().unwrap())
342    }
343
344    #[instrument(skip_all, level = "debug")]
345    fn encode_batch(
346        &mut self,
347        batch: &RecordBatch,
348        external_buffers: &mut OutOfLineBuffers,
349    ) -> Result<Vec<Vec<EncodeTask>>> {
350        self.schema
351            .as_ref()
352            .unwrap()
353            .fields
354            .iter()
355            .zip(self.column_writers.iter_mut())
356            .map(|(field, column_writer)| {
357                let array = batch
358                    .column_by_name(&field.name)
359                    .ok_or(Error::InvalidInput {
360                        source: format!(
361                            "Cannot write batch.  The batch was missing the column `{}`",
362                            field.name
363                        )
364                        .into(),
365                        location: location!(),
366                    })?;
367                let repdef = RepDefBuilder::default();
368                let num_rows = array.len() as u64;
369                column_writer.maybe_encode(
370                    array.clone(),
371                    external_buffers,
372                    repdef,
373                    self.rows_written,
374                    num_rows,
375                )
376            })
377            .collect::<Result<Vec<_>>>()
378    }
379
380    /// Schedule a batch of data to be written to the file
381    ///
382    /// Note: the future returned by this method may complete before the data has been fully
383    /// flushed to the file (some data may be in the data cache or the I/O cache)
384    pub async fn write_batch(&mut self, batch: &RecordBatch) -> Result<()> {
385        debug!(
386            "write_batch called with {} bytes of data",
387            batch.get_array_memory_size()
388        );
389        self.ensure_initialized(batch)?;
390        self.verify_nullability_constraints(batch)?;
391        let num_rows = batch.num_rows() as u64;
392        if num_rows == 0 {
393            return Ok(());
394        }
395        if num_rows > u32::MAX as u64 {
396            return Err(Error::InvalidInput {
397                source: "cannot write Lance files with more than 2^32 rows".into(),
398                location: location!(),
399            });
400        }
401        // First we push each array into its column writer.  This may or may not generate enough
402        // data to trigger an encoding task.  We collect any encoding tasks into a queue.
403        let mut external_buffers =
404            OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
405        let encoding_tasks = self.encode_batch(batch, &mut external_buffers)?;
406        // Next, write external buffers
407        for external_buffer in external_buffers.take_buffers() {
408            Self::do_write_buffer(&mut self.writer, &external_buffer).await?;
409        }
410
411        let encoding_tasks = encoding_tasks
412            .into_iter()
413            .flatten()
414            .collect::<FuturesOrdered<_>>();
415
416        self.rows_written = match self.rows_written.checked_add(batch.num_rows() as u64) {
417            Some(rows_written) => rows_written,
418            None => {
419                return Err(Error::InvalidInput { source: format!("cannot write batch with {} rows because {} rows have already been written and Lance files cannot contain more than 2^64 rows", num_rows, self.rows_written).into(), location: location!() });
420            }
421        };
422
423        self.write_pages(encoding_tasks).await?;
424
425        Ok(())
426    }
427
428    async fn write_column_metadata(
429        &mut self,
430        metadata: pbfile::ColumnMetadata,
431    ) -> Result<(u64, u64)> {
432        let metadata_bytes = metadata.encode_to_vec();
433        let position = self.writer.tell().await? as u64;
434        let len = metadata_bytes.len() as u64;
435        self.writer.write_all(&metadata_bytes).await?;
436        Ok((position, len))
437    }
438
439    async fn write_column_metadatas(&mut self) -> Result<Vec<(u64, u64)>> {
440        let mut metadatas = Vec::new();
441        std::mem::swap(&mut self.column_metadata, &mut metadatas);
442        let mut metadata_positions = Vec::with_capacity(metadatas.len());
443        for metadata in metadatas {
444            metadata_positions.push(self.write_column_metadata(metadata).await?);
445        }
446        Ok(metadata_positions)
447    }
448
449    fn make_file_descriptor(
450        schema: &lance_core::datatypes::Schema,
451        num_rows: u64,
452    ) -> Result<pb::FileDescriptor> {
453        let fields_with_meta = FieldsWithMeta::from(schema);
454        Ok(pb::FileDescriptor {
455            schema: Some(pb::Schema {
456                fields: fields_with_meta.fields.0,
457                metadata: fields_with_meta.metadata,
458            }),
459            length: num_rows,
460        })
461    }
462
463    async fn write_global_buffers(&mut self) -> Result<Vec<(u64, u64)>> {
464        let schema = self.schema.as_mut().ok_or(Error::invalid_input("No schema provided on writer open and no data provided.  Schema is unknown and file cannot be created", location!()))?;
465        schema.metadata = std::mem::take(&mut self.schema_metadata);
466        let file_descriptor = Self::make_file_descriptor(schema, self.rows_written)?;
467        let file_descriptor_bytes = file_descriptor.encode_to_vec();
468        let file_descriptor_len = file_descriptor_bytes.len() as u64;
469        let file_descriptor_position = self.writer.tell().await? as u64;
470        self.writer.write_all(&file_descriptor_bytes).await?;
471        let mut gbo_table = Vec::with_capacity(1 + self.global_buffers.len());
472        gbo_table.push((file_descriptor_position, file_descriptor_len));
473        gbo_table.append(&mut self.global_buffers);
474        Ok(gbo_table)
475    }
476
477    /// Add a metadata entry to the schema
478    ///
479    /// This method is useful because sometimes the metadata is not known until after the
480    /// data has been written.  This method allows you to alter the schema metadata.  It
481    /// must be called before `finish` is called.
482    pub fn add_schema_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
483        self.schema_metadata.insert(key.into(), value.into());
484    }
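    // A minimal sketch of `add_schema_metadata` (the key and value are illustrative); metadata
    // added this way is folded into the schema when `finish` writes the file descriptor:
    //
    //     writer.add_schema_metadata("created_by", "my-ingest-job");
    //     let rows = writer.finish().await?;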
485
486    /// Adds a global buffer to the file
487    ///
    /// The global buffer can contain arbitrary bytes.  It will be written to disk
    /// immediately.  This method returns the index of the global buffer (indices always
    /// start at 1 and increment by 1 each time this method is called).
491    pub async fn add_global_buffer(&mut self, buffer: Bytes) -> Result<u32> {
492        let position = self.writer.tell().await? as u64;
493        let len = buffer.len() as u64;
494        Self::do_write_buffer(&mut self.writer, &buffer).await?;
495        self.global_buffers.push((position, len));
496        Ok(self.global_buffers.len() as u32)
497    }
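    // A minimal sketch of `add_global_buffer` (the buffer contents are illustrative).  The
    // returned index starts at 1 because slot 0 of the global buffer table is reserved for the
    // file descriptor written during `finish`:
    //
    //     let idx = writer.add_global_buffer(Bytes::from_static(b"my sidecar data")).await?;
    //     assert_eq!(idx, 1); // first user-supplied global buffer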
498
499    async fn finish_writers(&mut self) -> Result<()> {
500        let mut col_idx = 0;
501        for mut writer in std::mem::take(&mut self.column_writers) {
502            let mut external_buffers =
503                OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
504            let columns = writer.finish(&mut external_buffers).await?;
505            for buffer in external_buffers.take_buffers() {
506                self.writer.write_all(&buffer).await?;
507            }
508            debug_assert_eq!(
509                columns.len(),
510                writer.num_columns() as usize,
                "Expected {} columns from the column writer at index {} and got {}",
512                writer.num_columns(),
513                col_idx,
514                columns.len()
515            );
516            for column in columns {
517                for page in column.final_pages {
518                    self.write_page(page).await?;
519                }
520                let column_metadata = &mut self.column_metadata[col_idx];
521                let mut buffer_pos = self.writer.tell().await? as u64;
522                for buffer in column.column_buffers {
523                    column_metadata.buffer_offsets.push(buffer_pos);
524                    let mut size = 0;
525                    Self::do_write_buffer(&mut self.writer, &buffer).await?;
526                    size += buffer.len() as u64;
527                    buffer_pos += size;
528                    column_metadata.buffer_sizes.push(size);
529                }
530                let encoded_encoding = Any::from_msg(&column.encoding)?.encode_to_vec();
531                column_metadata.encoding = Some(pbfile::Encoding {
532                    location: Some(pbfile::encoding::Location::Direct(pbfile::DirectEncoding {
533                        encoding: encoded_encoding,
534                    })),
535                });
536                col_idx += 1;
537            }
538        }
539        if col_idx != self.column_metadata.len() {
540            panic!(
541                "Column writers finished with {} columns but we expected {}",
542                col_idx,
543                self.column_metadata.len()
544            );
545        }
546        Ok(())
547    }
548
    /// Converts self.version (which is a mix of "software version" and
    /// "format version") into the (major, minor) format version numbers
551    fn version_to_numbers(&self) -> (u16, u16) {
552        let version = self.options.format_version.unwrap_or_default();
553        match version.resolve() {
554            LanceFileVersion::V2_0 => (0, 3),
555            LanceFileVersion::V2_1 => (2, 1),
556            LanceFileVersion::V2_2 => (2, 2),
557            _ => panic!("Unsupported version: {}", version),
558        }
559    }
560
561    /// Finishes writing the file
562    ///
563    /// This method will wait until all data has been flushed to the file.  Then it
564    /// will write the file metadata and the footer.  It will not return until all
565    /// data has been flushed and the file has been closed.
566    ///
567    /// Returns the total number of rows written
568    pub async fn finish(&mut self) -> Result<u64> {
569        // 1. flush any remaining data and write out those pages
570        let mut external_buffers =
571            OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
572        let encoding_tasks = self
573            .column_writers
574            .iter_mut()
575            .map(|writer| writer.flush(&mut external_buffers))
576            .collect::<Result<Vec<_>>>()?;
577        for external_buffer in external_buffers.take_buffers() {
578            Self::do_write_buffer(&mut self.writer, &external_buffer).await?;
579        }
580        let encoding_tasks = encoding_tasks
581            .into_iter()
582            .flatten()
583            .collect::<FuturesOrdered<_>>();
584        self.write_pages(encoding_tasks).await?;
585
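        // 2. finish the column writers, writing out any final pages and column buffers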
586        self.finish_writers().await?;
587
588        // 3. write global buffers (we write the schema here)
589        let global_buffer_offsets = self.write_global_buffers().await?;
590        let num_global_buffers = global_buffer_offsets.len() as u32;
591
592        // 4. write the column metadatas
593        let column_metadata_start = self.writer.tell().await? as u64;
594        let metadata_positions = self.write_column_metadatas().await?;
595
596        // 5. write the column metadata offset table
597        let cmo_table_start = self.writer.tell().await? as u64;
598        for (meta_pos, meta_len) in metadata_positions {
599            self.writer.write_u64_le(meta_pos).await?;
600            self.writer.write_u64_le(meta_len).await?;
601        }
602
603        // 6. write global buffers offset table
604        let gbo_table_start = self.writer.tell().await? as u64;
605        for (gbo_pos, gbo_len) in global_buffer_offsets {
606            self.writer.write_u64_le(gbo_pos).await?;
607            self.writer.write_u64_le(gbo_len).await?;
608        }
609
610        let (major, minor) = self.version_to_numbers();
611        // 7. write the footer
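        // The footer layout, written below in order, is:
        //   u64  column_metadata_start
        //   u64  cmo_table_start (column metadata offset table)
        //   u64  gbo_table_start (global buffer offset table)
        //   u32  number of global buffers
        //   u32  number of columns
        //   u16  major version
        //   u16  minor version
        //   magic bytes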
612        self.writer.write_u64_le(column_metadata_start).await?;
613        self.writer.write_u64_le(cmo_table_start).await?;
614        self.writer.write_u64_le(gbo_table_start).await?;
615        self.writer.write_u32_le(num_global_buffers).await?;
616        self.writer.write_u32_le(self.num_columns).await?;
617        self.writer.write_u16_le(major).await?;
618        self.writer.write_u16_le(minor).await?;
619        self.writer.write_all(MAGIC).await?;
620
        // 8. close the writer
622        self.writer.shutdown().await?;
623        Ok(self.rows_written)
624    }
625
626    pub async fn abort(&mut self) {
627        self.writer.abort().await;
628    }
629
630    pub async fn tell(&mut self) -> Result<u64> {
631        Ok(self.writer.tell().await? as u64)
632    }
633
634    pub fn field_id_to_column_indices(&self) -> &[(u32, u32)] {
635        &self.field_id_to_column_indices
636    }
637}
638
639/// Utility trait for converting EncodedBatch to Bytes using the
640/// lance file format
641pub trait EncodedBatchWriteExt {
642    /// Serializes into a lance file, including the schema
643    fn try_to_self_described_lance(&self, version: LanceFileVersion) -> Result<Bytes>;
644    /// Serializes into a lance file, without the schema.
645    ///
646    /// The schema must be provided to deserialize the buffer
647    fn try_to_mini_lance(&self, version: LanceFileVersion) -> Result<Bytes>;
648}
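// A minimal usage sketch (assuming `encoded_batch` is an `EncodedBatch` produced by the
// lance_encoding batch encoder):
//
//     let bytes = encoded_batch.try_to_self_described_lance(LanceFileVersion::V2_0)?;
//     // `try_to_mini_lance` writes the same layout but omits the schema global buffer.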
649
650// Creates a lance footer and appends it to the encoded data
651//
// The logic here is very similar to the logic in the FileWriter, except we
653// are using BufMut (put_xyz) instead of AsyncWrite (write_xyz).
654fn concat_lance_footer(
655    batch: &EncodedBatch,
656    write_schema: bool,
657    version: LanceFileVersion,
658) -> Result<Bytes> {
659    // Estimating 1MiB for file footer
660    let mut data = BytesMut::with_capacity(batch.data.len() + 1024 * 1024);
661    data.put(batch.data.clone());
662    // write global buffers (we write the schema here)
663    let global_buffers = if write_schema {
664        let schema_start = data.len() as u64;
665        let lance_schema = lance_core::datatypes::Schema::try_from(batch.schema.as_ref())?;
666        let descriptor = FileWriter::make_file_descriptor(&lance_schema, batch.num_rows)?;
667        let descriptor_bytes = descriptor.encode_to_vec();
668        let descriptor_len = descriptor_bytes.len() as u64;
669        data.put(descriptor_bytes.as_slice());
670
671        vec![(schema_start, descriptor_len)]
672    } else {
673        vec![]
674    };
675    let col_metadata_start = data.len() as u64;
676
677    let mut col_metadata_positions = Vec::new();
678    // Write column metadata
679    for col in &batch.page_table {
680        let position = data.len() as u64;
681        let pages = col
682            .page_infos
683            .iter()
684            .map(|page_info| {
685                let encoded_encoding = match &page_info.encoding {
686                    PageEncoding::Legacy(array_encoding) => {
687                        Any::from_msg(array_encoding)?.encode_to_vec()
688                    }
689                    PageEncoding::Structural(page_layout) => {
690                        Any::from_msg(page_layout)?.encode_to_vec()
691                    }
692                };
693                let (buffer_offsets, buffer_sizes): (Vec<_>, Vec<_>) = page_info
694                    .buffer_offsets_and_sizes
695                    .as_ref()
696                    .iter()
697                    .cloned()
698                    .unzip();
699                Ok(pbfile::column_metadata::Page {
700                    buffer_offsets,
701                    buffer_sizes,
702                    encoding: Some(pbfile::Encoding {
703                        location: Some(pbfile::encoding::Location::Direct(DirectEncoding {
704                            encoding: encoded_encoding,
705                        })),
706                    }),
707                    length: page_info.num_rows,
708                    priority: page_info.priority,
709                })
710            })
711            .collect::<Result<Vec<_>>>()?;
712        let (buffer_offsets, buffer_sizes): (Vec<_>, Vec<_>) =
713            col.buffer_offsets_and_sizes.iter().cloned().unzip();
714        let encoded_col_encoding = Any::from_msg(&col.encoding)?.encode_to_vec();
715        let column = pbfile::ColumnMetadata {
716            pages,
717            buffer_offsets,
718            buffer_sizes,
719            encoding: Some(pbfile::Encoding {
720                location: Some(pbfile::encoding::Location::Direct(pbfile::DirectEncoding {
721                    encoding: encoded_col_encoding,
722                })),
723            }),
724        };
725        let column_bytes = column.encode_to_vec();
726        col_metadata_positions.push((position, column_bytes.len() as u64));
727        data.put(column_bytes.as_slice());
728    }
729    // Write column metadata offsets table
730    let cmo_table_start = data.len() as u64;
731    for (meta_pos, meta_len) in col_metadata_positions {
732        data.put_u64_le(meta_pos);
733        data.put_u64_le(meta_len);
734    }
735    // Write global buffers offsets table
736    let gbo_table_start = data.len() as u64;
737    let num_global_buffers = global_buffers.len() as u32;
738    for (gbo_pos, gbo_len) in global_buffers {
739        data.put_u64_le(gbo_pos);
740        data.put_u64_le(gbo_len);
741    }
742
743    let (major, minor) = version.to_numbers();
744
745    // write the footer
746    data.put_u64_le(col_metadata_start);
747    data.put_u64_le(cmo_table_start);
748    data.put_u64_le(gbo_table_start);
749    data.put_u32_le(num_global_buffers);
750    data.put_u32_le(batch.page_table.len() as u32);
751    data.put_u16_le(major as u16);
752    data.put_u16_le(minor as u16);
753    data.put(MAGIC.as_slice());
754
755    Ok(data.freeze())
756}
757
758impl EncodedBatchWriteExt for EncodedBatch {
759    fn try_to_self_described_lance(&self, version: LanceFileVersion) -> Result<Bytes> {
760        concat_lance_footer(self, true, version)
761    }
762
763    fn try_to_mini_lance(&self, version: LanceFileVersion) -> Result<Bytes> {
764        concat_lance_footer(self, false, version)
765    }
766}
767
768#[cfg(test)]
769mod tests {
770    use std::collections::HashMap;
771    use std::sync::Arc;
772
773    use crate::v2::reader::{describe_encoding, FileReader, FileReaderOptions};
774    use crate::v2::testing::FsFixture;
775    use crate::v2::writer::{FileWriter, FileWriterOptions, ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES};
776    use arrow_array::builder::{Float32Builder, Int32Builder};
777    use arrow_array::{types::Float64Type, RecordBatchReader, StringArray};
778    use arrow_array::{Int32Array, RecordBatch, UInt64Array};
779    use arrow_schema::{DataType, Field, Field as ArrowField, Schema, Schema as ArrowSchema};
780    use lance_core::cache::LanceCache;
781    use lance_core::datatypes::Schema as LanceSchema;
782    use lance_datagen::{array, gen_batch, BatchCount, RowCount};
783    use lance_encoding::compression_config::{CompressionFieldParams, CompressionParams};
784    use lance_encoding::decoder::DecoderPlugins;
785    use lance_encoding::version::LanceFileVersion;
786    use lance_io::object_store::ObjectStore;
787    use lance_io::utils::CachedFileSize;
788    use object_store::path::Path;
789    use tempfile::tempdir;
790
791    #[tokio::test]
792    async fn test_basic_write() {
793        let tmp_dir = tempfile::tempdir().unwrap();
794        let tmp_path: String = tmp_dir.path().to_str().unwrap().to_owned();
795        let tmp_path = Path::parse(tmp_path).unwrap();
796        let tmp_path = tmp_path.child("some_file.lance");
797        let obj_store = Arc::new(ObjectStore::local());
798
799        let reader = gen_batch()
800            .col("score", array::rand::<Float64Type>())
801            .into_reader_rows(RowCount::from(1000), BatchCount::from(10));
802
803        let writer = obj_store.create(&tmp_path).await.unwrap();
804
805        let lance_schema =
806            lance_core::datatypes::Schema::try_from(reader.schema().as_ref()).unwrap();
807
808        let mut file_writer =
809            FileWriter::try_new(writer, lance_schema, FileWriterOptions::default()).unwrap();
810
811        for batch in reader {
812            file_writer.write_batch(&batch.unwrap()).await.unwrap();
813        }
814        file_writer.add_schema_metadata("foo", "bar");
815        file_writer.finish().await.unwrap();
816        // Tests asserting the contents of the written file are in reader.rs
817    }
818
819    #[tokio::test]
820    async fn test_write_empty() {
821        let tmp_dir = tempfile::tempdir().unwrap();
822        let tmp_path: String = tmp_dir.path().to_str().unwrap().to_owned();
823        let tmp_path = Path::parse(tmp_path).unwrap();
824        let tmp_path = tmp_path.child("some_file.lance");
825        let obj_store = Arc::new(ObjectStore::local());
826
827        let reader = gen_batch()
828            .col("score", array::rand::<Float64Type>())
829            .into_reader_rows(RowCount::from(0), BatchCount::from(0));
830
831        let writer = obj_store.create(&tmp_path).await.unwrap();
832
833        let lance_schema =
834            lance_core::datatypes::Schema::try_from(reader.schema().as_ref()).unwrap();
835
836        let mut file_writer =
837            FileWriter::try_new(writer, lance_schema, FileWriterOptions::default()).unwrap();
838
839        for batch in reader {
840            file_writer.write_batch(&batch.unwrap()).await.unwrap();
841        }
842        file_writer.add_schema_metadata("foo", "bar");
843        file_writer.finish().await.unwrap();
844    }
845
846    #[tokio::test]
847    async fn test_max_page_bytes_enforced() {
848        let arrow_field = Field::new("data", DataType::UInt64, false);
849        let arrow_schema = Schema::new(vec![arrow_field]);
850        let lance_schema = LanceSchema::try_from(&arrow_schema).unwrap();
851
        // 8MB of u64 values (1,000,000 * 8 bytes)
853        let data: Vec<u64> = (0..1_000_000).collect();
854        let array = UInt64Array::from(data);
855        let batch =
856            RecordBatch::try_new(arrow_schema.clone().into(), vec![Arc::new(array)]).unwrap();
857
858        let options = FileWriterOptions {
859            max_page_bytes: Some(1024 * 1024), // 1MB
860            ..Default::default()
861        };
862
863        let tmp_dir = tempdir().unwrap();
864        let path = tmp_dir.path().join("test.lance");
865        let object_store = ObjectStore::local();
866        let mut writer = FileWriter::try_new(
867            object_store
868                .create(&Path::from(path.to_str().unwrap()))
869                .await
870                .unwrap(),
871            lance_schema,
872            options,
873        )
874        .unwrap();
875
876        writer.write_batch(&batch).await.unwrap();
877        writer.finish().await.unwrap();
878
879        let fs = FsFixture::default();
880        let file_scheduler = fs
881            .scheduler
882            .open_file(
883                &Path::from(path.to_str().unwrap()),
884                &CachedFileSize::unknown(),
885            )
886            .await
887            .unwrap();
888        let file_reader = FileReader::try_open(
889            file_scheduler,
890            None,
891            Arc::<DecoderPlugins>::default(),
892            &LanceCache::no_cache(),
893            FileReaderOptions::default(),
894        )
895        .await
896        .unwrap();
897
898        let column_meta = file_reader.metadata();
899
900        let mut total_page_num: u32 = 0;
901        for (col_idx, col_metadata) in column_meta.column_metadatas.iter().enumerate() {
902            assert!(
903                !col_metadata.pages.is_empty(),
904                "Column {} has no pages",
905                col_idx
906            );
907
908            for (page_idx, page) in col_metadata.pages.iter().enumerate() {
909                total_page_num += 1;
910                let total_size: u64 = page.buffer_sizes.iter().sum();
911                assert!(
912                    total_size <= 1024 * 1024,
913                    "Column {} Page {} size {} exceeds 1MB limit",
914                    col_idx,
915                    page_idx,
916                    total_size
917                );
918            }
919        }
920
921        assert_eq!(total_page_num, 8)
922    }
923
924    #[tokio::test(flavor = "current_thread")]
925    async fn test_max_page_bytes_env_var() {
926        let arrow_field = Field::new("data", DataType::UInt64, false);
927        let arrow_schema = Schema::new(vec![arrow_field]);
928        let lance_schema = LanceSchema::try_from(&arrow_schema).unwrap();
        // 4MB of u64 values (500,000 * 8 bytes)
930        let data: Vec<u64> = (0..500_000).collect();
931        let array = UInt64Array::from(data);
932        let batch =
933            RecordBatch::try_new(arrow_schema.clone().into(), vec![Arc::new(array)]).unwrap();
934
935        // 2MiB
936        std::env::set_var(ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES, "2097152");
937
938        let options = FileWriterOptions {
939            max_page_bytes: None, // enforce env
940            ..Default::default()
941        };
942
943        let tmp_dir = tempdir().unwrap();
944        let path = tmp_dir.path().join("test_env_var.lance");
945        let object_store = ObjectStore::local();
946        let mut writer = FileWriter::try_new(
947            object_store
948                .create(&Path::from(path.to_str().unwrap()))
949                .await
950                .unwrap(),
951            lance_schema.clone(),
952            options,
953        )
954        .unwrap();
955
956        writer.write_batch(&batch).await.unwrap();
957        writer.finish().await.unwrap();
958
959        let fs = FsFixture::default();
960        let file_scheduler = fs
961            .scheduler
962            .open_file(
963                &Path::from(path.to_str().unwrap()),
964                &CachedFileSize::unknown(),
965            )
966            .await
967            .unwrap();
968        let file_reader = FileReader::try_open(
969            file_scheduler,
970            None,
971            Arc::<DecoderPlugins>::default(),
972            &LanceCache::no_cache(),
973            FileReaderOptions::default(),
974        )
975        .await
976        .unwrap();
977
978        for col_metadata in file_reader.metadata().column_metadatas.iter() {
979            for page in col_metadata.pages.iter() {
980                let total_size: u64 = page.buffer_sizes.iter().sum();
981                assert!(
982                    total_size <= 2 * 1024 * 1024,
983                    "Page size {} exceeds 2MB limit",
984                    total_size
985                );
986            }
987        }
988
989        std::env::set_var(ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES, "");
990    }
991
992    #[tokio::test]
993    async fn test_compression_overrides_end_to_end() {
994        // Create test schema with different column types
995        let arrow_schema = Arc::new(ArrowSchema::new(vec![
996            ArrowField::new("customer_id", DataType::Int32, false),
997            ArrowField::new("product_id", DataType::Int32, false),
998            ArrowField::new("quantity", DataType::Int32, false),
999            ArrowField::new("price", DataType::Float32, false),
1000            ArrowField::new("description", DataType::Utf8, false),
1001        ]));
1002
1003        let lance_schema = LanceSchema::try_from(arrow_schema.as_ref()).unwrap();
1004
1005        // Create test data with patterns suitable for different compression
1006        let mut customer_ids = Int32Builder::new();
1007        let mut product_ids = Int32Builder::new();
1008        let mut quantities = Int32Builder::new();
1009        let mut prices = Float32Builder::new();
1010        let mut descriptions = Vec::new();
1011
1012        // Generate data with specific patterns:
1013        // - customer_id: highly repetitive (good for RLE)
1014        // - product_id: moderately repetitive (good for RLE)
1015        // - quantity: random values (not good for RLE)
1016        // - price: some repetition
1017        // - description: long strings (good for Zstd)
1018        for i in 0..10000 {
1019            // Customer ID repeats every 100 rows (100 unique customers)
1020            // This creates runs of 100 identical values
1021            customer_ids.append_value(i / 100);
1022
1023            // Product ID has only 5 unique values with long runs
1024            product_ids.append_value(i / 2000);
1025
1026            // Quantity is mostly 1 with occasional other values
1027            quantities.append_value(if i % 10 == 0 { 5 } else { 1 });
1028
1029            // Price has only 3 unique values
1030            prices.append_value(match i % 3 {
1031                0 => 9.99,
1032                1 => 19.99,
1033                _ => 29.99,
1034            });
1035
1036            // Descriptions are repetitive but we'll keep them simple
1037            descriptions.push(format!("Product {}", i / 2000));
1038        }
1039
1040        let batch = RecordBatch::try_new(
1041            arrow_schema.clone(),
1042            vec![
1043                Arc::new(customer_ids.finish()),
1044                Arc::new(product_ids.finish()),
1045                Arc::new(quantities.finish()),
1046                Arc::new(prices.finish()),
1047                Arc::new(StringArray::from(descriptions)),
1048            ],
1049        )
1050        .unwrap();
1051
1052        // Configure compression parameters
1053        let mut params = CompressionParams::new();
1054
1055        // RLE for ID columns (ends with _id)
1056        params.columns.insert(
1057            "*_id".to_string(),
1058            CompressionFieldParams {
1059                rle_threshold: Some(0.5), // Lower threshold to trigger RLE more easily
1060                compression: None,        // Will use default compression if any
1061                compression_level: None,
1062                bss: Some(lance_encoding::compression_config::BssMode::Off), // Explicitly disable BSS to ensure RLE is used
1063            },
1064        );
1065
1066        // For now, we'll skip Zstd compression since it's not imported
1067        // In a real implementation, you could add other compression types here
1068
1069        // Build encoding strategy with compression parameters
1070        let encoding_strategy = lance_encoding::encoder::default_encoding_strategy_with_params(
1071            LanceFileVersion::V2_1,
1072            params,
1073        )
1074        .unwrap();
1075
1076        // Configure file writer options
1077        let options = FileWriterOptions {
1078            encoding_strategy: Some(Arc::from(encoding_strategy)),
1079            format_version: Some(LanceFileVersion::V2_1),
1080            max_page_bytes: Some(64 * 1024), // 64KB pages
1081            ..Default::default()
1082        };
1083
1084        // Write the file
1085        let tmp_dir = tempdir().unwrap();
1086        let path = tmp_dir.path().join("test_compression_overrides.lance");
1087        let object_store = ObjectStore::local();
1088
1089        let mut writer = FileWriter::try_new(
1090            object_store
1091                .create(&Path::from(path.to_str().unwrap()))
1092                .await
1093                .unwrap(),
1094            lance_schema.clone(),
1095            options,
1096        )
1097        .unwrap();
1098
1099        writer.write_batch(&batch).await.unwrap();
1100        writer.add_schema_metadata("compression_test", "configured_compression");
1101        writer.finish().await.unwrap();
1102
1103        // Now write the same data without compression overrides for comparison
1104        let path_no_compression = tmp_dir.path().join("test_no_compression.lance");
1105        let default_options = FileWriterOptions {
1106            format_version: Some(LanceFileVersion::V2_1),
1107            max_page_bytes: Some(64 * 1024),
1108            ..Default::default()
1109        };
1110
1111        let mut writer_no_compression = FileWriter::try_new(
1112            object_store
1113                .create(&Path::from(path_no_compression.to_str().unwrap()))
1114                .await
1115                .unwrap(),
1116            lance_schema.clone(),
1117            default_options,
1118        )
1119        .unwrap();
1120
1121        writer_no_compression.write_batch(&batch).await.unwrap();
1122        writer_no_compression.finish().await.unwrap();
1123
1124        // Note: With our current data patterns and RLE compression, the compressed file
1125        // might actually be slightly larger due to compression metadata overhead.
1126        // This is expected and the test is mainly to verify the system works end-to-end.
1127
1128        // Read back the compressed file and verify data integrity
1129        let fs = FsFixture::default();
1130        let file_scheduler = fs
1131            .scheduler
1132            .open_file(
1133                &Path::from(path.to_str().unwrap()),
1134                &CachedFileSize::unknown(),
1135            )
1136            .await
1137            .unwrap();
1138
1139        let file_reader = FileReader::try_open(
1140            file_scheduler,
1141            None,
1142            Arc::<DecoderPlugins>::default(),
1143            &LanceCache::no_cache(),
1144            FileReaderOptions::default(),
1145        )
1146        .await
1147        .unwrap();
1148
1149        // Verify metadata
1150        let metadata = file_reader.metadata();
1151        assert_eq!(metadata.major_version, 2);
1152        assert_eq!(metadata.minor_version, 1);
1153
1154        let schema = file_reader.schema();
1155        assert_eq!(
1156            schema.metadata.get("compression_test"),
1157            Some(&"configured_compression".to_string())
1158        );
1159
1160        // Verify the actual encodings used
1161        let column_metadatas = &metadata.column_metadatas;
1162
1163        // Check customer_id column (index 0) - should use RLE due to our configuration
1164        assert!(!column_metadatas[0].pages.is_empty());
1165        let customer_id_encoding = describe_encoding(&column_metadatas[0].pages[0]);
1166        assert!(
1167            customer_id_encoding.contains("RLE") || customer_id_encoding.contains("Rle"),
1168            "customer_id column should use RLE encoding due to '*_id' pattern match, but got: {}",
1169            customer_id_encoding
1170        );
1171
1172        // Check product_id column (index 1) - should use RLE due to our configuration
1173        assert!(!column_metadatas[1].pages.is_empty());
1174        let product_id_encoding = describe_encoding(&column_metadatas[1].pages[0]);
1175        assert!(
1176            product_id_encoding.contains("RLE") || product_id_encoding.contains("Rle"),
1177            "product_id column should use RLE encoding due to '*_id' pattern match, but got: {}",
1178            product_id_encoding
1179        );
1180    }
1181
1182    #[tokio::test]
1183    async fn test_field_metadata_compression() {
1184        // Test that field metadata compression settings are respected
1185        let mut metadata = HashMap::new();
1186        metadata.insert(
1187            lance_encoding::constants::COMPRESSION_META_KEY.to_string(),
1188            "zstd".to_string(),
1189        );
1190        metadata.insert(
1191            lance_encoding::constants::COMPRESSION_LEVEL_META_KEY.to_string(),
1192            "6".to_string(),
1193        );
1194
1195        let arrow_schema = Arc::new(ArrowSchema::new(vec![
1196            ArrowField::new("id", DataType::Int32, false),
1197            ArrowField::new("text", DataType::Utf8, false).with_metadata(metadata.clone()),
1198            ArrowField::new("data", DataType::Int32, false).with_metadata(HashMap::from([(
1199                lance_encoding::constants::COMPRESSION_META_KEY.to_string(),
1200                "none".to_string(),
1201            )])),
1202        ]));
1203
1204        let lance_schema = LanceSchema::try_from(arrow_schema.as_ref()).unwrap();
1205
1206        // Create test data
1207        let id_array = Int32Array::from_iter_values(0..1000);
1208        let text_array = StringArray::from_iter_values(
1209            (0..1000).map(|i| format!("test string {} repeated text", i)),
1210        );
1211        let data_array = Int32Array::from_iter_values((0..1000).map(|i| i * 2));
1212
1213        let batch = RecordBatch::try_new(
1214            arrow_schema.clone(),
1215            vec![
1216                Arc::new(id_array),
1217                Arc::new(text_array),
1218                Arc::new(data_array),
1219            ],
1220        )
1221        .unwrap();
1222
1223        let tmp_dir = tempdir().unwrap();
1224        let path = tmp_dir.path().join("field_metadata_test.lance");
1225        let object_store = ObjectStore::local();
1226
1227        // Create encoding strategy that will read from field metadata
1228        let params = CompressionParams::new();
1229        let encoding_strategy = lance_encoding::encoder::default_encoding_strategy_with_params(
1230            LanceFileVersion::V2_1,
1231            params,
1232        )
1233        .unwrap();
1234
1235        let options = FileWriterOptions {
1236            encoding_strategy: Some(Arc::from(encoding_strategy)),
1237            format_version: Some(LanceFileVersion::V2_1),
1238            ..Default::default()
1239        };
1240        let mut writer = FileWriter::try_new(
1241            object_store
1242                .create(&Path::from(path.to_str().unwrap()))
1243                .await
1244                .unwrap(),
1245            lance_schema.clone(),
1246            options,
1247        )
1248        .unwrap();
1249
1250        writer.write_batch(&batch).await.unwrap();
1251        writer.finish().await.unwrap();
1252
1253        // Read back metadata
1254        let fs = FsFixture::default();
1255        let file_scheduler = fs
1256            .scheduler
1257            .open_file(
1258                &Path::from(path.to_str().unwrap()),
1259                &CachedFileSize::unknown(),
1260            )
1261            .await
1262            .unwrap();
1263        let file_reader = FileReader::try_open(
1264            file_scheduler,
1265            None,
1266            Arc::<DecoderPlugins>::default(),
1267            &LanceCache::no_cache(),
1268            FileReaderOptions::default(),
1269        )
1270        .await
1271        .unwrap();
1272
1273        let column_metadatas = &file_reader.metadata().column_metadatas;
1274
1275        // The text column (index 1) should use zstd compression based on metadata
1276        let text_encoding = describe_encoding(&column_metadatas[1].pages[0]);
1277        // For string columns, we expect Binary encoding with zstd compression
1278        assert!(
1279            text_encoding.contains("Zstd"),
1280            "text column should use zstd compression from field metadata, but got: {}",
1281            text_encoding
1282        );
1283
1284        // The data column (index 2) should use no compression based on metadata
1285        let data_encoding = describe_encoding(&column_metadatas[2].pages[0]);
1286        // For Int32 columns with "none" compression, we expect Flat encoding without compression
1287        assert!(
1288            data_encoding.contains("Flat") && data_encoding.contains("compression: None"),
1289            "data column should use no compression from field metadata, but got: {}",
1290            data_encoding
1291        );
1292    }
1293
1294    #[tokio::test]
1295    async fn test_field_metadata_rle_threshold() {
1296        // Test that RLE threshold from field metadata is respected
1297        let mut metadata = HashMap::new();
1298        metadata.insert(
1299            lance_encoding::constants::RLE_THRESHOLD_META_KEY.to_string(),
1300            "0.9".to_string(),
1301        );
1302        // Also set compression to ensure RLE is used
1303        metadata.insert(
1304            lance_encoding::constants::COMPRESSION_META_KEY.to_string(),
1305            "lz4".to_string(),
1306        );
1307        // Explicitly disable BSS to ensure RLE is tested
1308        metadata.insert(
1309            lance_encoding::constants::BSS_META_KEY.to_string(),
1310            "off".to_string(),
1311        );
1312
1313        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
1314            "status",
1315            DataType::Int32,
1316            false,
1317        )
1318        .with_metadata(metadata)]));
1319
1320        let lance_schema = LanceSchema::try_from(arrow_schema.as_ref()).unwrap();
1321
1322        // Create data with very high repetition (3 runs for 10000 values = 0.0003 ratio)
1323        let status_array = Int32Array::from_iter_values(
1324            std::iter::repeat_n(200, 8000)
1325                .chain(std::iter::repeat_n(404, 1500))
1326                .chain(std::iter::repeat_n(500, 500)),
1327        );
1328
1329        let batch =
1330            RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(status_array)]).unwrap();
1331
1332        let tmp_dir = tempdir().unwrap();
1333        let path = tmp_dir.path().join("rle_threshold_test.lance");
1334        let object_store = ObjectStore::local();
1335
1336        // Create encoding strategy that will read from field metadata
1337        let params = CompressionParams::new();
1338        let encoding_strategy = lance_encoding::encoder::default_encoding_strategy_with_params(
1339            LanceFileVersion::V2_1,
1340            params,
1341        )
1342        .unwrap();
1343
1344        let options = FileWriterOptions {
1345            encoding_strategy: Some(Arc::from(encoding_strategy)),
1346            format_version: Some(LanceFileVersion::V2_1),
1347            ..Default::default()
1348        };
1349        let mut writer = FileWriter::try_new(
1350            object_store
1351                .create(&Path::from(path.to_str().unwrap()))
1352                .await
1353                .unwrap(),
1354            lance_schema.clone(),
1355            options,
1356        )
1357        .unwrap();
1358
1359        writer.write_batch(&batch).await.unwrap();
1360        writer.finish().await.unwrap();
1361
1362        // Read back and check encoding
1363        let fs = FsFixture::default();
1364        let file_scheduler = fs
1365            .scheduler
1366            .open_file(
1367                &Path::from(path.to_str().unwrap()),
1368                &CachedFileSize::unknown(),
1369            )
1370            .await
1371            .unwrap();
1372        let file_reader = FileReader::try_open(
1373            file_scheduler,
1374            None,
1375            Arc::<DecoderPlugins>::default(),
1376            &LanceCache::no_cache(),
1377            FileReaderOptions::default(),
1378        )
1379        .await
1380        .unwrap();
1381
1382        let column_metadatas = &file_reader.metadata().column_metadatas;
1383        let status_encoding = describe_encoding(&column_metadatas[0].pages[0]);
1384        assert!(
1385            status_encoding.contains("RLE") || status_encoding.contains("Rle"),
1386            "status column should use RLE encoding due to metadata threshold, but got: {}",
1387            status_encoding
1388        );
1389    }
1390
1391    #[tokio::test]
1392    async fn test_large_page_split_on_read() {
1393        use arrow_array::Array;
1394        use futures::TryStreamExt;
1395        use lance_encoding::decoder::FilterExpression;
1396        use lance_io::ReadBatchParams;
1397
1398        // Test that large pages written with relaxed limits can be split during read
1399
1400        let arrow_field = ArrowField::new("data", DataType::Binary, false);
1401        let arrow_schema = ArrowSchema::new(vec![arrow_field]);
1402        let lance_schema = LanceSchema::try_from(&arrow_schema).unwrap();
1403
1404        // Create a large binary value (40MB) to trigger large page creation
1405        let large_value = vec![42u8; 40 * 1024 * 1024];
1406        let array = arrow_array::BinaryArray::from(vec![
1407            Some(large_value.as_slice()),
1408            Some(b"small value"),
1409        ]);
1410        let batch = RecordBatch::try_new(Arc::new(arrow_schema), vec![Arc::new(array)]).unwrap();
1411
1412        // Write with relaxed page size limit (128MB)
1413        let options = FileWriterOptions {
1414            max_page_bytes: Some(128 * 1024 * 1024),
1415            format_version: Some(LanceFileVersion::V2_1),
1416            ..Default::default()
1417        };
1418
1419        let fs = FsFixture::default();
1420        let path = fs.tmp_path.child("large_page_test.lance");
1421
1422        let mut writer = FileWriter::try_new(
1423            fs.object_store.create(&path).await.unwrap(),
1424            lance_schema.clone(),
1425            options,
1426        )
1427        .unwrap();
1428
1429        writer.write_batch(&batch).await.unwrap();
1430        let num_rows = writer.finish().await.unwrap();
1431        assert_eq!(num_rows, 2);
1432
1433        // Read back with split configuration
1434        let file_scheduler = fs
1435            .scheduler
1436            .open_file(&path, &CachedFileSize::unknown())
1437            .await
1438            .unwrap();
1439
1440        // Configure reader to split pages larger than 10MB into chunks
1441        let reader_options = FileReaderOptions {
1442            read_chunk_size: 10 * 1024 * 1024, // 10MB chunks
1443            ..Default::default()
1444        };
1445
1446        let file_reader = FileReader::try_open(
1447            file_scheduler,
1448            None,
1449            Arc::<DecoderPlugins>::default(),
1450            &LanceCache::no_cache(),
1451            reader_options,
1452        )
1453        .await
1454        .unwrap();
1455
1456        // Read the data back
1457        let stream = file_reader
1458            .read_stream(
1459                ReadBatchParams::RangeFull,
1460                1024,
1461                10, // batch_readahead
1462                FilterExpression::no_filter(),
1463            )
1464            .unwrap();
1465
1466        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
1467        assert_eq!(batches.len(), 1);
1468
1469        // Verify the data is correctly read despite splitting
1470        let read_array = batches[0].column(0);
1471        let read_binary = read_array
1472            .as_any()
1473            .downcast_ref::<arrow_array::BinaryArray>()
1474            .unwrap();
1475
1476        assert_eq!(read_binary.len(), 2);
1477        assert_eq!(read_binary.value(0).len(), 40 * 1024 * 1024);
1478        assert_eq!(read_binary.value(1), b"small value");
1479
1480        // Verify first value matches what we wrote
1481        assert!(read_binary.value(0).iter().all(|&b| b == 42u8));
1482    }
1483}