lance_file/v2/
writer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use core::panic;
5use std::collections::HashMap;
6use std::sync::atomic::AtomicBool;
7use std::sync::Arc;
8
9use arrow_array::RecordBatch;
10
11use arrow_data::ArrayData;
12use bytes::{BufMut, Bytes, BytesMut};
13use futures::stream::FuturesOrdered;
14use futures::StreamExt;
15use lance_core::datatypes::{Field, Schema as LanceSchema};
16use lance_core::utils::bit::pad_bytes;
17use lance_core::{Error, Result};
18use lance_encoding::decoder::PageEncoding;
19use lance_encoding::encoder::{
20    default_encoding_strategy, BatchEncoder, EncodeTask, EncodedBatch, EncodedPage,
21    EncodingOptions, FieldEncoder, FieldEncodingStrategy, OutOfLineBuffers,
22};
23use lance_encoding::repdef::RepDefBuilder;
24use lance_encoding::version::LanceFileVersion;
25use lance_io::object_store::ObjectStore;
26use lance_io::object_writer::ObjectWriter;
27use lance_io::traits::Writer;
28use log::{debug, warn};
29use object_store::path::Path;
30use prost::Message;
31use prost_types::Any;
32use snafu::location;
33use tokio::io::AsyncWriteExt;
34use tracing::instrument;
35
36use crate::datatypes::FieldsWithMeta;
37use crate::format::pb;
38use crate::format::pbfile;
39use crate::format::pbfile::DirectEncoding;
40use crate::format::MAGIC;
41
42/// Pages buffers are aligned to 64 bytes
43pub(crate) const PAGE_BUFFER_ALIGNMENT: usize = 64;
44const PAD_BUFFER: [u8; PAGE_BUFFER_ALIGNMENT] = [72; PAGE_BUFFER_ALIGNMENT];
45// In 2.1+, we split large pages on read instead of write to avoid empty pages
46// and small pages issues. However, we keep the write-time limit at 32MB to avoid
47// potential regressions in 2.0 format readers.
48//
49// This limit is not applied in the 2.1 writer
50const MAX_PAGE_BYTES: usize = 32 * 1024 * 1024;
51const ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES: &str = "LANCE_FILE_WRITER_MAX_PAGE_BYTES";
52
53#[derive(Debug, Clone, Default)]
54pub struct FileWriterOptions {
55    /// How many bytes to use for buffering column data
56    ///
57    /// When data comes in small batches the writer will buffer column data so that
58    /// larger pages can be created.  This value will be divided evenly across all of the
59    /// columns.  Generally you want this to be at least large enough to match your
60    /// filesystem's ideal read size per column.
61    ///
62    /// In some cases you might want this value to be even larger if you have highly
63    /// compressible data.  However, if this is too large, then the writer could require
64    /// a lot of memory and write performance may suffer if the CPU-expensive encoding
65    /// falls behind and can't be interleaved with the I/O expensive flushing.
66    ///
67    /// The default will use 8MiB per column which should be reasonable for most cases.
68    // TODO: Do we need to be able to set this on a per-column basis?
69    pub data_cache_bytes: Option<u64>,
70    /// A hint to indicate the max size of a page
71    ///
72    /// This hint can't always be respected.  A single value could be larger than this value
73    /// and we never slice single values.  In addition, there are some cases where it can be
74    /// difficult to know size up-front and so we might not be able to respect this value.
75    pub max_page_bytes: Option<u64>,
76    /// The file writer buffers columns until enough data has arrived to flush a page
77    /// to disk.
78    ///
79    /// Some columns with small data types may not flush very often.  These arrays can
80    /// stick around for a long time.  These arrays might also be keeping larger data
81    /// structures alive.  By default, the writer will make a deep copy of this array
82    /// to avoid any potential memory leaks.  However, this can be disabled for a
83    /// (probably minor) performance boost if you are sure that arrays are not keeping
84    /// any sibling structures alive (this typically means the array was allocated in
85    /// the same language / runtime as the writer)
86    ///
87    /// Do not enable this if your data is arriving from the C data interface.
88    /// Data typically arrives one "batch" at a time (encoded in the C data interface
89    /// as a struct array).  Each array in that batch keeps the entire batch alive.
90    /// This means a small boolean array (which we will buffer in memory for quite a
91    /// while) might keep a much larger record batch around in memory (even though most
92    /// of that batch's data has been written to disk)
93    pub keep_original_array: Option<bool>,
94    pub encoding_strategy: Option<Arc<dyn FieldEncodingStrategy>>,
95    /// The format version to use when writing the file
96    ///
97    /// This controls which encodings will be used when encoding the data.  Newer
98    /// versions may have more efficient encodings.  However, newer format versions will
99    /// require more up-to-date readers to read the data.
100    pub format_version: Option<LanceFileVersion>,
101}
102
103pub struct FileWriter {
104    writer: ObjectWriter,
105    schema: Option<LanceSchema>,
106    column_writers: Vec<Box<dyn FieldEncoder>>,
107    column_metadata: Vec<pbfile::ColumnMetadata>,
108    field_id_to_column_indices: Vec<(u32, u32)>,
109    num_columns: u32,
110    rows_written: u64,
111    global_buffers: Vec<(u64, u64)>,
112    schema_metadata: HashMap<String, String>,
113    options: FileWriterOptions,
114}
115
116fn initial_column_metadata() -> pbfile::ColumnMetadata {
117    pbfile::ColumnMetadata {
118        pages: Vec::new(),
119        buffer_offsets: Vec::new(),
120        buffer_sizes: Vec::new(),
121        encoding: None,
122    }
123}
124
125static WARNED_ON_UNSTABLE_API: AtomicBool = AtomicBool::new(false);
126
127impl FileWriter {
128    /// Create a new FileWriter with a desired output schema
129    pub fn try_new(
130        object_writer: ObjectWriter,
131        schema: LanceSchema,
132        options: FileWriterOptions,
133    ) -> Result<Self> {
134        let mut writer = Self::new_lazy(object_writer, options);
135        writer.initialize(schema)?;
136        Ok(writer)
137    }
138
139    /// Create a new FileWriter without a desired output schema
140    ///
141    /// The output schema will be set based on the first batch of data to arrive.
142    /// If no data arrives and the writer is finished then the write will fail.
143    pub fn new_lazy(object_writer: ObjectWriter, options: FileWriterOptions) -> Self {
144        if let Some(format_version) = options.format_version {
145            if format_version.is_unstable()
146                && WARNED_ON_UNSTABLE_API
147                    .compare_exchange(
148                        false,
149                        true,
150                        std::sync::atomic::Ordering::Relaxed,
151                        std::sync::atomic::Ordering::Relaxed,
152                    )
153                    .is_ok()
154            {
155                warn!("You have requested an unstable format version.  Files written with this format version may not be readable in the future!  This is a development feature and should only be used for experimentation and never for production data.");
156            }
157        }
158        Self {
159            writer: object_writer,
160            schema: None,
161            column_writers: Vec::new(),
162            column_metadata: Vec::new(),
163            num_columns: 0,
164            rows_written: 0,
165            field_id_to_column_indices: Vec::new(),
166            global_buffers: Vec::new(),
167            schema_metadata: HashMap::new(),
168            options,
169        }
170    }
171
172    /// Write a series of record batches to a new file
173    ///
174    /// Returns the number of rows written
175    pub async fn create_file_with_batches(
176        store: &ObjectStore,
177        path: &Path,
178        schema: lance_core::datatypes::Schema,
179        batches: impl Iterator<Item = RecordBatch> + Send,
180        options: FileWriterOptions,
181    ) -> Result<usize> {
182        let writer = store.create(path).await?;
183        let mut writer = Self::try_new(writer, schema, options)?;
184        for batch in batches {
185            writer.write_batch(&batch).await?;
186        }
187        Ok(writer.finish().await? as usize)
188    }
189
190    async fn do_write_buffer(writer: &mut ObjectWriter, buf: &[u8]) -> Result<()> {
191        writer.write_all(buf).await?;
192        let pad_bytes = pad_bytes::<PAGE_BUFFER_ALIGNMENT>(buf.len());
193        writer.write_all(&PAD_BUFFER[..pad_bytes]).await?;
194        Ok(())
195    }
196
197    /// Returns the format version that will be used when writing the file
198    pub fn version(&self) -> LanceFileVersion {
199        self.options.format_version.unwrap_or_default()
200    }
201
202    async fn write_page(&mut self, encoded_page: EncodedPage) -> Result<()> {
203        let buffers = encoded_page.data;
204        let mut buffer_offsets = Vec::with_capacity(buffers.len());
205        let mut buffer_sizes = Vec::with_capacity(buffers.len());
206        for buffer in buffers {
207            buffer_offsets.push(self.writer.tell().await? as u64);
208            buffer_sizes.push(buffer.len() as u64);
209            Self::do_write_buffer(&mut self.writer, &buffer).await?;
210        }
211        let encoded_encoding = match encoded_page.description {
212            PageEncoding::Legacy(array_encoding) => Any::from_msg(&array_encoding)?.encode_to_vec(),
213            PageEncoding::Structural(page_layout) => Any::from_msg(&page_layout)?.encode_to_vec(),
214        };
215        let page = pbfile::column_metadata::Page {
216            buffer_offsets,
217            buffer_sizes,
218            encoding: Some(pbfile::Encoding {
219                location: Some(pbfile::encoding::Location::Direct(DirectEncoding {
220                    encoding: encoded_encoding,
221                })),
222            }),
223            length: encoded_page.num_rows,
224            priority: encoded_page.row_number,
225        };
226        self.column_metadata[encoded_page.column_idx as usize]
227            .pages
228            .push(page);
229        Ok(())
230    }
231
232    #[instrument(skip_all, level = "debug")]
233    async fn write_pages(&mut self, mut encoding_tasks: FuturesOrdered<EncodeTask>) -> Result<()> {
234        // As soon as an encoding task is done we write it.  There is no parallelism
235        // needed here because "writing" is really just submitting the buffer to the
236        // underlying write scheduler (either the OS or object_store's scheduler for
237        // cloud writes).  The only time we might truly await on write_page is if the
238        // scheduler's write queue is full.
239        //
240        // Also, there is no point in trying to make write_page parallel anyways
241        // because we wouldn't want buffers getting mixed up across pages.
242        while let Some(encoding_task) = encoding_tasks.next().await {
243            let encoded_page = encoding_task?;
244            self.write_page(encoded_page).await?;
245        }
246        // It's important to flush here, we don't know when the next batch will arrive
247        // and the underlying cloud store could have writes in progress that won't advance
248        // until we interact with the writer again.  These in-progress writes will time out
249        // if we don't flush.
250        self.writer.flush().await?;
251        Ok(())
252    }
253
254    /// Schedule batches of data to be written to the file
255    pub async fn write_batches(
256        &mut self,
257        batches: impl Iterator<Item = &RecordBatch>,
258    ) -> Result<()> {
259        for batch in batches {
260            self.write_batch(batch).await?;
261        }
262        Ok(())
263    }
264
265    fn verify_field_nullability(arr: &ArrayData, field: &Field) -> Result<()> {
266        if !field.nullable && arr.null_count() > 0 {
267            return Err(Error::invalid_input(format!("The field `{}` contained null values even though the field is marked non-null in the schema", field.name), location!()));
268        }
269
270        for (child_field, child_arr) in field.children.iter().zip(arr.child_data()) {
271            Self::verify_field_nullability(child_arr, child_field)?;
272        }
273
274        Ok(())
275    }
276
277    fn verify_nullability_constraints(&self, batch: &RecordBatch) -> Result<()> {
278        for (col, field) in batch
279            .columns()
280            .iter()
281            .zip(self.schema.as_ref().unwrap().fields.iter())
282        {
283            Self::verify_field_nullability(&col.to_data(), field)?;
284        }
285        Ok(())
286    }
287
288    fn initialize(&mut self, mut schema: LanceSchema) -> Result<()> {
289        let cache_bytes_per_column = if let Some(data_cache_bytes) = self.options.data_cache_bytes {
290            data_cache_bytes / schema.fields.len() as u64
291        } else {
292            8 * 1024 * 1024
293        };
294
295        let max_page_bytes = self.options.max_page_bytes.unwrap_or_else(|| {
296            std::env::var(ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES)
297                .map(|s| {
298                    s.parse::<u64>().unwrap_or_else(|e| {
299                        warn!(
300                            "Failed to parse {}: {}, using default",
301                            ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES, e
302                        );
303                        MAX_PAGE_BYTES as u64
304                    })
305                })
306                .unwrap_or(MAX_PAGE_BYTES as u64)
307        });
308
309        schema.validate()?;
310
311        let keep_original_array = self.options.keep_original_array.unwrap_or(false);
312        let encoding_strategy = self.options.encoding_strategy.clone().unwrap_or_else(|| {
313            let version = self.version();
314            default_encoding_strategy(version).into()
315        });
316
317        let encoding_options = EncodingOptions {
318            cache_bytes_per_column,
319            max_page_bytes,
320            keep_original_array,
321            buffer_alignment: PAGE_BUFFER_ALIGNMENT as u64,
322        };
323        let encoder =
324            BatchEncoder::try_new(&schema, encoding_strategy.as_ref(), &encoding_options)?;
325        self.num_columns = encoder.num_columns();
326
327        self.column_writers = encoder.field_encoders;
328        self.column_metadata = vec![initial_column_metadata(); self.num_columns as usize];
329        self.field_id_to_column_indices = encoder.field_id_to_column_index;
330        self.schema_metadata
331            .extend(std::mem::take(&mut schema.metadata));
332        self.schema = Some(schema);
333        Ok(())
334    }
335
336    fn ensure_initialized(&mut self, batch: &RecordBatch) -> Result<&LanceSchema> {
337        if self.schema.is_none() {
338            let schema = LanceSchema::try_from(batch.schema().as_ref())?;
339            self.initialize(schema)?;
340        }
341        Ok(self.schema.as_ref().unwrap())
342    }
343
344    #[instrument(skip_all, level = "debug")]
345    fn encode_batch(
346        &mut self,
347        batch: &RecordBatch,
348        external_buffers: &mut OutOfLineBuffers,
349    ) -> Result<Vec<Vec<EncodeTask>>> {
350        self.schema
351            .as_ref()
352            .unwrap()
353            .fields
354            .iter()
355            .zip(self.column_writers.iter_mut())
356            .map(|(field, column_writer)| {
357                let array = batch
358                    .column_by_name(&field.name)
359                    .ok_or(Error::InvalidInput {
360                        source: format!(
361                            "Cannot write batch.  The batch was missing the column `{}`",
362                            field.name
363                        )
364                        .into(),
365                        location: location!(),
366                    })?;
367                let repdef = RepDefBuilder::default();
368                let num_rows = array.len() as u64;
369                column_writer.maybe_encode(
370                    array.clone(),
371                    external_buffers,
372                    repdef,
373                    self.rows_written,
374                    num_rows,
375                )
376            })
377            .collect::<Result<Vec<_>>>()
378    }
379
380    /// Schedule a batch of data to be written to the file
381    ///
382    /// Note: the future returned by this method may complete before the data has been fully
383    /// flushed to the file (some data may be in the data cache or the I/O cache)
384    pub async fn write_batch(&mut self, batch: &RecordBatch) -> Result<()> {
385        debug!(
386            "write_batch called with {} rows, {} columns, and {} bytes of data",
387            batch.num_rows(),
388            batch.num_columns(),
389            batch.get_array_memory_size()
390        );
391        self.ensure_initialized(batch)?;
392        self.verify_nullability_constraints(batch)?;
393        let num_rows = batch.num_rows() as u64;
394        if num_rows == 0 {
395            return Ok(());
396        }
397        if num_rows > u32::MAX as u64 {
398            return Err(Error::InvalidInput {
399                source: "cannot write Lance files with more than 2^32 rows".into(),
400                location: location!(),
401            });
402        }
403        // First we push each array into its column writer.  This may or may not generate enough
404        // data to trigger an encoding task.  We collect any encoding tasks into a queue.
405        let mut external_buffers =
406            OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
407        let encoding_tasks = self.encode_batch(batch, &mut external_buffers)?;
408        // Next, write external buffers
409        for external_buffer in external_buffers.take_buffers() {
410            Self::do_write_buffer(&mut self.writer, &external_buffer).await?;
411        }
412
413        let encoding_tasks = encoding_tasks
414            .into_iter()
415            .flatten()
416            .collect::<FuturesOrdered<_>>();
417
418        self.rows_written = match self.rows_written.checked_add(batch.num_rows() as u64) {
419            Some(rows_written) => rows_written,
420            None => {
421                return Err(Error::InvalidInput { source: format!("cannot write batch with {} rows because {} rows have already been written and Lance files cannot contain more than 2^64 rows", num_rows, self.rows_written).into(), location: location!() });
422            }
423        };
424
425        self.write_pages(encoding_tasks).await?;
426
427        Ok(())
428    }
429
430    async fn write_column_metadata(
431        &mut self,
432        metadata: pbfile::ColumnMetadata,
433    ) -> Result<(u64, u64)> {
434        let metadata_bytes = metadata.encode_to_vec();
435        let position = self.writer.tell().await? as u64;
436        let len = metadata_bytes.len() as u64;
437        self.writer.write_all(&metadata_bytes).await?;
438        Ok((position, len))
439    }
440
441    async fn write_column_metadatas(&mut self) -> Result<Vec<(u64, u64)>> {
442        let mut metadatas = Vec::new();
443        std::mem::swap(&mut self.column_metadata, &mut metadatas);
444        let mut metadata_positions = Vec::with_capacity(metadatas.len());
445        for metadata in metadatas {
446            metadata_positions.push(self.write_column_metadata(metadata).await?);
447        }
448        Ok(metadata_positions)
449    }
450
451    fn make_file_descriptor(
452        schema: &lance_core::datatypes::Schema,
453        num_rows: u64,
454    ) -> Result<pb::FileDescriptor> {
455        let fields_with_meta = FieldsWithMeta::from(schema);
456        Ok(pb::FileDescriptor {
457            schema: Some(pb::Schema {
458                fields: fields_with_meta.fields.0,
459                metadata: fields_with_meta.metadata,
460            }),
461            length: num_rows,
462        })
463    }
464
465    async fn write_global_buffers(&mut self) -> Result<Vec<(u64, u64)>> {
466        let schema = self.schema.as_mut().ok_or(Error::invalid_input("No schema provided on writer open and no data provided.  Schema is unknown and file cannot be created", location!()))?;
467        schema.metadata = std::mem::take(&mut self.schema_metadata);
468        let file_descriptor = Self::make_file_descriptor(schema, self.rows_written)?;
469        let file_descriptor_bytes = file_descriptor.encode_to_vec();
470        let file_descriptor_len = file_descriptor_bytes.len() as u64;
471        let file_descriptor_position = self.writer.tell().await? as u64;
472        self.writer.write_all(&file_descriptor_bytes).await?;
473        let mut gbo_table = Vec::with_capacity(1 + self.global_buffers.len());
474        gbo_table.push((file_descriptor_position, file_descriptor_len));
475        gbo_table.append(&mut self.global_buffers);
476        Ok(gbo_table)
477    }
478
479    /// Add a metadata entry to the schema
480    ///
481    /// This method is useful because sometimes the metadata is not known until after the
482    /// data has been written.  This method allows you to alter the schema metadata.  It
483    /// must be called before `finish` is called.
484    pub fn add_schema_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
485        self.schema_metadata.insert(key.into(), value.into());
486    }
487
488    /// Adds a global buffer to the file
489    ///
490    /// The global buffer can contain any arbitrary bytes.  It will be written to the disk
491    /// immediately.  This method returns the index of the global buffer (this will always
492    /// start at 1 and increment by 1 each time this method is called)
493    pub async fn add_global_buffer(&mut self, buffer: Bytes) -> Result<u32> {
494        let position = self.writer.tell().await? as u64;
495        let len = buffer.len() as u64;
496        Self::do_write_buffer(&mut self.writer, &buffer).await?;
497        self.global_buffers.push((position, len));
498        Ok(self.global_buffers.len() as u32)
499    }
500
501    async fn finish_writers(&mut self) -> Result<()> {
502        let mut col_idx = 0;
503        for mut writer in std::mem::take(&mut self.column_writers) {
504            let mut external_buffers =
505                OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
506            let columns = writer.finish(&mut external_buffers).await?;
507            for buffer in external_buffers.take_buffers() {
508                self.writer.write_all(&buffer).await?;
509            }
510            debug_assert_eq!(
511                columns.len(),
512                writer.num_columns() as usize,
513                "Expected {} columns from column at index {} and got {}",
514                writer.num_columns(),
515                col_idx,
516                columns.len()
517            );
518            for column in columns {
519                for page in column.final_pages {
520                    self.write_page(page).await?;
521                }
522                let column_metadata = &mut self.column_metadata[col_idx];
523                let mut buffer_pos = self.writer.tell().await? as u64;
524                for buffer in column.column_buffers {
525                    column_metadata.buffer_offsets.push(buffer_pos);
526                    let mut size = 0;
527                    Self::do_write_buffer(&mut self.writer, &buffer).await?;
528                    size += buffer.len() as u64;
529                    buffer_pos += size;
530                    column_metadata.buffer_sizes.push(size);
531                }
532                let encoded_encoding = Any::from_msg(&column.encoding)?.encode_to_vec();
533                column_metadata.encoding = Some(pbfile::Encoding {
534                    location: Some(pbfile::encoding::Location::Direct(pbfile::DirectEncoding {
535                        encoding: encoded_encoding,
536                    })),
537                });
538                col_idx += 1;
539            }
540        }
541        if col_idx != self.column_metadata.len() {
542            panic!(
543                "Column writers finished with {} columns but we expected {}",
544                col_idx,
545                self.column_metadata.len()
546            );
547        }
548        Ok(())
549    }
550
551    /// Converts self.version (which is a mix of "software version" and
552    /// "format version" into a format version)
553    fn version_to_numbers(&self) -> (u16, u16) {
554        let version = self.options.format_version.unwrap_or_default();
555        match version.resolve() {
556            LanceFileVersion::V2_0 => (0, 3),
557            LanceFileVersion::V2_1 => (2, 1),
558            LanceFileVersion::V2_2 => (2, 2),
559            _ => panic!("Unsupported version: {}", version),
560        }
561    }
562
563    /// Finishes writing the file
564    ///
565    /// This method will wait until all data has been flushed to the file.  Then it
566    /// will write the file metadata and the footer.  It will not return until all
567    /// data has been flushed and the file has been closed.
568    ///
569    /// Returns the total number of rows written
570    pub async fn finish(&mut self) -> Result<u64> {
571        // 1. flush any remaining data and write out those pages
572        let mut external_buffers =
573            OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
574        let encoding_tasks = self
575            .column_writers
576            .iter_mut()
577            .map(|writer| writer.flush(&mut external_buffers))
578            .collect::<Result<Vec<_>>>()?;
579        for external_buffer in external_buffers.take_buffers() {
580            Self::do_write_buffer(&mut self.writer, &external_buffer).await?;
581        }
582        let encoding_tasks = encoding_tasks
583            .into_iter()
584            .flatten()
585            .collect::<FuturesOrdered<_>>();
586        self.write_pages(encoding_tasks).await?;
587
588        self.finish_writers().await?;
589
590        // 3. write global buffers (we write the schema here)
591        let global_buffer_offsets = self.write_global_buffers().await?;
592        let num_global_buffers = global_buffer_offsets.len() as u32;
593
594        // 4. write the column metadatas
595        let column_metadata_start = self.writer.tell().await? as u64;
596        let metadata_positions = self.write_column_metadatas().await?;
597
598        // 5. write the column metadata offset table
599        let cmo_table_start = self.writer.tell().await? as u64;
600        for (meta_pos, meta_len) in metadata_positions {
601            self.writer.write_u64_le(meta_pos).await?;
602            self.writer.write_u64_le(meta_len).await?;
603        }
604
605        // 6. write global buffers offset table
606        let gbo_table_start = self.writer.tell().await? as u64;
607        for (gbo_pos, gbo_len) in global_buffer_offsets {
608            self.writer.write_u64_le(gbo_pos).await?;
609            self.writer.write_u64_le(gbo_len).await?;
610        }
611
612        let (major, minor) = self.version_to_numbers();
613        // 7. write the footer
614        self.writer.write_u64_le(column_metadata_start).await?;
615        self.writer.write_u64_le(cmo_table_start).await?;
616        self.writer.write_u64_le(gbo_table_start).await?;
617        self.writer.write_u32_le(num_global_buffers).await?;
618        self.writer.write_u32_le(self.num_columns).await?;
619        self.writer.write_u16_le(major).await?;
620        self.writer.write_u16_le(minor).await?;
621        self.writer.write_all(MAGIC).await?;
622
623        // 7. close the writer
624        self.writer.shutdown().await?;
625        Ok(self.rows_written)
626    }
627
628    pub async fn abort(&mut self) {
629        self.writer.abort().await;
630    }
631
632    pub async fn tell(&mut self) -> Result<u64> {
633        Ok(self.writer.tell().await? as u64)
634    }
635
636    pub fn field_id_to_column_indices(&self) -> &[(u32, u32)] {
637        &self.field_id_to_column_indices
638    }
639}
640
641/// Utility trait for converting EncodedBatch to Bytes using the
642/// lance file format
643pub trait EncodedBatchWriteExt {
644    /// Serializes into a lance file, including the schema
645    fn try_to_self_described_lance(&self, version: LanceFileVersion) -> Result<Bytes>;
646    /// Serializes into a lance file, without the schema.
647    ///
648    /// The schema must be provided to deserialize the buffer
649    fn try_to_mini_lance(&self, version: LanceFileVersion) -> Result<Bytes>;
650}
651
652// Creates a lance footer and appends it to the encoded data
653//
654// The logic here is very similar to logic in the FileWriter except we
655// are using BufMut (put_xyz) instead of AsyncWrite (write_xyz).
656fn concat_lance_footer(
657    batch: &EncodedBatch,
658    write_schema: bool,
659    version: LanceFileVersion,
660) -> Result<Bytes> {
661    // Estimating 1MiB for file footer
662    let mut data = BytesMut::with_capacity(batch.data.len() + 1024 * 1024);
663    data.put(batch.data.clone());
664    // write global buffers (we write the schema here)
665    let global_buffers = if write_schema {
666        let schema_start = data.len() as u64;
667        let lance_schema = lance_core::datatypes::Schema::try_from(batch.schema.as_ref())?;
668        let descriptor = FileWriter::make_file_descriptor(&lance_schema, batch.num_rows)?;
669        let descriptor_bytes = descriptor.encode_to_vec();
670        let descriptor_len = descriptor_bytes.len() as u64;
671        data.put(descriptor_bytes.as_slice());
672
673        vec![(schema_start, descriptor_len)]
674    } else {
675        vec![]
676    };
677    let col_metadata_start = data.len() as u64;
678
679    let mut col_metadata_positions = Vec::new();
680    // Write column metadata
681    for col in &batch.page_table {
682        let position = data.len() as u64;
683        let pages = col
684            .page_infos
685            .iter()
686            .map(|page_info| {
687                let encoded_encoding = match &page_info.encoding {
688                    PageEncoding::Legacy(array_encoding) => {
689                        Any::from_msg(array_encoding)?.encode_to_vec()
690                    }
691                    PageEncoding::Structural(page_layout) => {
692                        Any::from_msg(page_layout)?.encode_to_vec()
693                    }
694                };
695                let (buffer_offsets, buffer_sizes): (Vec<_>, Vec<_>) = page_info
696                    .buffer_offsets_and_sizes
697                    .as_ref()
698                    .iter()
699                    .cloned()
700                    .unzip();
701                Ok(pbfile::column_metadata::Page {
702                    buffer_offsets,
703                    buffer_sizes,
704                    encoding: Some(pbfile::Encoding {
705                        location: Some(pbfile::encoding::Location::Direct(DirectEncoding {
706                            encoding: encoded_encoding,
707                        })),
708                    }),
709                    length: page_info.num_rows,
710                    priority: page_info.priority,
711                })
712            })
713            .collect::<Result<Vec<_>>>()?;
714        let (buffer_offsets, buffer_sizes): (Vec<_>, Vec<_>) =
715            col.buffer_offsets_and_sizes.iter().cloned().unzip();
716        let encoded_col_encoding = Any::from_msg(&col.encoding)?.encode_to_vec();
717        let column = pbfile::ColumnMetadata {
718            pages,
719            buffer_offsets,
720            buffer_sizes,
721            encoding: Some(pbfile::Encoding {
722                location: Some(pbfile::encoding::Location::Direct(pbfile::DirectEncoding {
723                    encoding: encoded_col_encoding,
724                })),
725            }),
726        };
727        let column_bytes = column.encode_to_vec();
728        col_metadata_positions.push((position, column_bytes.len() as u64));
729        data.put(column_bytes.as_slice());
730    }
731    // Write column metadata offsets table
732    let cmo_table_start = data.len() as u64;
733    for (meta_pos, meta_len) in col_metadata_positions {
734        data.put_u64_le(meta_pos);
735        data.put_u64_le(meta_len);
736    }
737    // Write global buffers offsets table
738    let gbo_table_start = data.len() as u64;
739    let num_global_buffers = global_buffers.len() as u32;
740    for (gbo_pos, gbo_len) in global_buffers {
741        data.put_u64_le(gbo_pos);
742        data.put_u64_le(gbo_len);
743    }
744
745    let (major, minor) = version.to_numbers();
746
747    // write the footer
748    data.put_u64_le(col_metadata_start);
749    data.put_u64_le(cmo_table_start);
750    data.put_u64_le(gbo_table_start);
751    data.put_u32_le(num_global_buffers);
752    data.put_u32_le(batch.page_table.len() as u32);
753    data.put_u16_le(major as u16);
754    data.put_u16_le(minor as u16);
755    data.put(MAGIC.as_slice());
756
757    Ok(data.freeze())
758}
759
760impl EncodedBatchWriteExt for EncodedBatch {
761    fn try_to_self_described_lance(&self, version: LanceFileVersion) -> Result<Bytes> {
762        concat_lance_footer(self, true, version)
763    }
764
765    fn try_to_mini_lance(&self, version: LanceFileVersion) -> Result<Bytes> {
766        concat_lance_footer(self, false, version)
767    }
768}
769
770#[cfg(test)]
771mod tests {
772    use std::collections::HashMap;
773    use std::sync::Arc;
774
775    use crate::v2::reader::{describe_encoding, FileReader, FileReaderOptions};
776    use crate::v2::testing::FsFixture;
777    use crate::v2::writer::{FileWriter, FileWriterOptions, ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES};
778    use arrow_array::builder::{Float32Builder, Int32Builder};
779    use arrow_array::{types::Float64Type, RecordBatchReader, StringArray};
780    use arrow_array::{Int32Array, RecordBatch, UInt64Array};
781    use arrow_schema::{DataType, Field, Field as ArrowField, Schema, Schema as ArrowSchema};
782    use lance_core::cache::LanceCache;
783    use lance_core::datatypes::Schema as LanceSchema;
784    use lance_core::utils::tempfile::TempObjFile;
785    use lance_datagen::{array, gen_batch, BatchCount, RowCount};
786    use lance_encoding::compression_config::{CompressionFieldParams, CompressionParams};
787    use lance_encoding::decoder::DecoderPlugins;
788    use lance_encoding::version::LanceFileVersion;
789    use lance_io::object_store::ObjectStore;
790    use lance_io::utils::CachedFileSize;
791
792    #[tokio::test]
793    async fn test_basic_write() {
794        let tmp_path = TempObjFile::default();
795        let obj_store = Arc::new(ObjectStore::local());
796
797        let reader = gen_batch()
798            .col("score", array::rand::<Float64Type>())
799            .into_reader_rows(RowCount::from(1000), BatchCount::from(10));
800
801        let writer = obj_store.create(&tmp_path).await.unwrap();
802
803        let lance_schema =
804            lance_core::datatypes::Schema::try_from(reader.schema().as_ref()).unwrap();
805
806        let mut file_writer =
807            FileWriter::try_new(writer, lance_schema, FileWriterOptions::default()).unwrap();
808
809        for batch in reader {
810            file_writer.write_batch(&batch.unwrap()).await.unwrap();
811        }
812        file_writer.add_schema_metadata("foo", "bar");
813        file_writer.finish().await.unwrap();
814        // Tests asserting the contents of the written file are in reader.rs
815    }
816
817    #[tokio::test]
818    async fn test_write_empty() {
819        let tmp_path = TempObjFile::default();
820        let obj_store = Arc::new(ObjectStore::local());
821
822        let reader = gen_batch()
823            .col("score", array::rand::<Float64Type>())
824            .into_reader_rows(RowCount::from(0), BatchCount::from(0));
825
826        let writer = obj_store.create(&tmp_path).await.unwrap();
827
828        let lance_schema =
829            lance_core::datatypes::Schema::try_from(reader.schema().as_ref()).unwrap();
830
831        let mut file_writer =
832            FileWriter::try_new(writer, lance_schema, FileWriterOptions::default()).unwrap();
833
834        for batch in reader {
835            file_writer.write_batch(&batch.unwrap()).await.unwrap();
836        }
837        file_writer.add_schema_metadata("foo", "bar");
838        file_writer.finish().await.unwrap();
839    }
840
841    #[tokio::test]
842    async fn test_max_page_bytes_enforced() {
843        let arrow_field = Field::new("data", DataType::UInt64, false);
844        let arrow_schema = Schema::new(vec![arrow_field]);
845        let lance_schema = LanceSchema::try_from(&arrow_schema).unwrap();
846
847        // 8MiB
848        let data: Vec<u64> = (0..1_000_000).collect();
849        let array = UInt64Array::from(data);
850        let batch =
851            RecordBatch::try_new(arrow_schema.clone().into(), vec![Arc::new(array)]).unwrap();
852
853        let options = FileWriterOptions {
854            max_page_bytes: Some(1024 * 1024), // 1MB
855            // This is a 2.0 only test because 2.1+ splits large pages on read instead of write
856            format_version: Some(LanceFileVersion::V2_0),
857            ..Default::default()
858        };
859
860        let path = TempObjFile::default();
861        let object_store = ObjectStore::local();
862        let mut writer = FileWriter::try_new(
863            object_store.create(&path).await.unwrap(),
864            lance_schema,
865            options,
866        )
867        .unwrap();
868
869        writer.write_batch(&batch).await.unwrap();
870        writer.finish().await.unwrap();
871
872        let fs = FsFixture::default();
873        let file_scheduler = fs
874            .scheduler
875            .open_file(&path, &CachedFileSize::unknown())
876            .await
877            .unwrap();
878        let file_reader = FileReader::try_open(
879            file_scheduler,
880            None,
881            Arc::<DecoderPlugins>::default(),
882            &LanceCache::no_cache(),
883            FileReaderOptions::default(),
884        )
885        .await
886        .unwrap();
887
888        let column_meta = file_reader.metadata();
889
890        let mut total_page_num: u32 = 0;
891        for (col_idx, col_metadata) in column_meta.column_metadatas.iter().enumerate() {
892            assert!(
893                !col_metadata.pages.is_empty(),
894                "Column {} has no pages",
895                col_idx
896            );
897
898            for (page_idx, page) in col_metadata.pages.iter().enumerate() {
899                total_page_num += 1;
900                let total_size: u64 = page.buffer_sizes.iter().sum();
901                assert!(
902                    total_size <= 1024 * 1024,
903                    "Column {} Page {} size {} exceeds 1MB limit",
904                    col_idx,
905                    page_idx,
906                    total_size
907                );
908            }
909        }
910
911        assert_eq!(total_page_num, 8)
912    }
913
914    #[tokio::test(flavor = "current_thread")]
915    async fn test_max_page_bytes_env_var() {
916        let arrow_field = Field::new("data", DataType::UInt64, false);
917        let arrow_schema = Schema::new(vec![arrow_field]);
918        let lance_schema = LanceSchema::try_from(&arrow_schema).unwrap();
919        // 4MiB
920        let data: Vec<u64> = (0..500_000).collect();
921        let array = UInt64Array::from(data);
922        let batch =
923            RecordBatch::try_new(arrow_schema.clone().into(), vec![Arc::new(array)]).unwrap();
924
925        // 2MiB
926        std::env::set_var(ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES, "2097152");
927
928        let options = FileWriterOptions {
929            max_page_bytes: None, // enforce env
930            ..Default::default()
931        };
932
933        let path = TempObjFile::default();
934        let object_store = ObjectStore::local();
935        let mut writer = FileWriter::try_new(
936            object_store.create(&path).await.unwrap(),
937            lance_schema.clone(),
938            options,
939        )
940        .unwrap();
941
942        writer.write_batch(&batch).await.unwrap();
943        writer.finish().await.unwrap();
944
945        let fs = FsFixture::default();
946        let file_scheduler = fs
947            .scheduler
948            .open_file(&path, &CachedFileSize::unknown())
949            .await
950            .unwrap();
951        let file_reader = FileReader::try_open(
952            file_scheduler,
953            None,
954            Arc::<DecoderPlugins>::default(),
955            &LanceCache::no_cache(),
956            FileReaderOptions::default(),
957        )
958        .await
959        .unwrap();
960
961        for col_metadata in file_reader.metadata().column_metadatas.iter() {
962            for page in col_metadata.pages.iter() {
963                let total_size: u64 = page.buffer_sizes.iter().sum();
964                assert!(
965                    total_size <= 2 * 1024 * 1024,
966                    "Page size {} exceeds 2MB limit",
967                    total_size
968                );
969            }
970        }
971
972        std::env::set_var(ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES, "");
973    }
974
975    #[tokio::test]
976    async fn test_compression_overrides_end_to_end() {
977        // Create test schema with different column types
978        let arrow_schema = Arc::new(ArrowSchema::new(vec![
979            ArrowField::new("customer_id", DataType::Int32, false),
980            ArrowField::new("product_id", DataType::Int32, false),
981            ArrowField::new("quantity", DataType::Int32, false),
982            ArrowField::new("price", DataType::Float32, false),
983            ArrowField::new("description", DataType::Utf8, false),
984        ]));
985
986        let lance_schema = LanceSchema::try_from(arrow_schema.as_ref()).unwrap();
987
988        // Create test data with patterns suitable for different compression
989        let mut customer_ids = Int32Builder::new();
990        let mut product_ids = Int32Builder::new();
991        let mut quantities = Int32Builder::new();
992        let mut prices = Float32Builder::new();
993        let mut descriptions = Vec::new();
994
995        // Generate data with specific patterns:
996        // - customer_id: highly repetitive (good for RLE)
997        // - product_id: moderately repetitive (good for RLE)
998        // - quantity: random values (not good for RLE)
999        // - price: some repetition
1000        // - description: long strings (good for Zstd)
1001        for i in 0..10000 {
1002            // Customer ID repeats every 100 rows (100 unique customers)
1003            // This creates runs of 100 identical values
1004            customer_ids.append_value(i / 100);
1005
1006            // Product ID has only 5 unique values with long runs
1007            product_ids.append_value(i / 2000);
1008
1009            // Quantity is mostly 1 with occasional other values
1010            quantities.append_value(if i % 10 == 0 { 5 } else { 1 });
1011
1012            // Price has only 3 unique values
1013            prices.append_value(match i % 3 {
1014                0 => 9.99,
1015                1 => 19.99,
1016                _ => 29.99,
1017            });
1018
1019            // Descriptions are repetitive but we'll keep them simple
1020            descriptions.push(format!("Product {}", i / 2000));
1021        }
1022
1023        let batch = RecordBatch::try_new(
1024            arrow_schema.clone(),
1025            vec![
1026                Arc::new(customer_ids.finish()),
1027                Arc::new(product_ids.finish()),
1028                Arc::new(quantities.finish()),
1029                Arc::new(prices.finish()),
1030                Arc::new(StringArray::from(descriptions)),
1031            ],
1032        )
1033        .unwrap();
1034
1035        // Configure compression parameters
1036        let mut params = CompressionParams::new();
1037
1038        // RLE for ID columns (ends with _id)
1039        params.columns.insert(
1040            "*_id".to_string(),
1041            CompressionFieldParams {
1042                rle_threshold: Some(0.5), // Lower threshold to trigger RLE more easily
1043                compression: None,        // Will use default compression if any
1044                compression_level: None,
1045                bss: Some(lance_encoding::compression_config::BssMode::Off), // Explicitly disable BSS to ensure RLE is used
1046            },
1047        );
1048
1049        // For now, we'll skip Zstd compression since it's not imported
1050        // In a real implementation, you could add other compression types here
1051
1052        // Build encoding strategy with compression parameters
1053        let encoding_strategy = lance_encoding::encoder::default_encoding_strategy_with_params(
1054            LanceFileVersion::V2_1,
1055            params,
1056        )
1057        .unwrap();
1058
1059        // Configure file writer options
1060        let options = FileWriterOptions {
1061            encoding_strategy: Some(Arc::from(encoding_strategy)),
1062            format_version: Some(LanceFileVersion::V2_1),
1063            max_page_bytes: Some(64 * 1024), // 64KB pages
1064            ..Default::default()
1065        };
1066
1067        // Write the file
1068        let path = TempObjFile::default();
1069        let object_store = ObjectStore::local();
1070
1071        let mut writer = FileWriter::try_new(
1072            object_store.create(&path).await.unwrap(),
1073            lance_schema.clone(),
1074            options,
1075        )
1076        .unwrap();
1077
1078        writer.write_batch(&batch).await.unwrap();
1079        writer.add_schema_metadata("compression_test", "configured_compression");
1080        writer.finish().await.unwrap();
1081
1082        // Now write the same data without compression overrides for comparison
1083        let path_no_compression = TempObjFile::default();
1084        let default_options = FileWriterOptions {
1085            format_version: Some(LanceFileVersion::V2_1),
1086            max_page_bytes: Some(64 * 1024),
1087            ..Default::default()
1088        };
1089
1090        let mut writer_no_compression = FileWriter::try_new(
1091            object_store.create(&path_no_compression).await.unwrap(),
1092            lance_schema.clone(),
1093            default_options,
1094        )
1095        .unwrap();
1096
1097        writer_no_compression.write_batch(&batch).await.unwrap();
1098        writer_no_compression.finish().await.unwrap();
1099
1100        // Note: With our current data patterns and RLE compression, the compressed file
1101        // might actually be slightly larger due to compression metadata overhead.
1102        // This is expected and the test is mainly to verify the system works end-to-end.
1103
1104        // Read back the compressed file and verify data integrity
1105        let fs = FsFixture::default();
1106        let file_scheduler = fs
1107            .scheduler
1108            .open_file(&path, &CachedFileSize::unknown())
1109            .await
1110            .unwrap();
1111
1112        let file_reader = FileReader::try_open(
1113            file_scheduler,
1114            None,
1115            Arc::<DecoderPlugins>::default(),
1116            &LanceCache::no_cache(),
1117            FileReaderOptions::default(),
1118        )
1119        .await
1120        .unwrap();
1121
1122        // Verify metadata
1123        let metadata = file_reader.metadata();
1124        assert_eq!(metadata.major_version, 2);
1125        assert_eq!(metadata.minor_version, 1);
1126
1127        let schema = file_reader.schema();
1128        assert_eq!(
1129            schema.metadata.get("compression_test"),
1130            Some(&"configured_compression".to_string())
1131        );
1132
1133        // Verify the actual encodings used
1134        let column_metadatas = &metadata.column_metadatas;
1135
1136        // Check customer_id column (index 0) - should use RLE due to our configuration
1137        assert!(!column_metadatas[0].pages.is_empty());
1138        let customer_id_encoding = describe_encoding(&column_metadatas[0].pages[0]);
1139        assert!(
1140            customer_id_encoding.contains("RLE") || customer_id_encoding.contains("Rle"),
1141            "customer_id column should use RLE encoding due to '*_id' pattern match, but got: {}",
1142            customer_id_encoding
1143        );
1144
1145        // Check product_id column (index 1) - should use RLE due to our configuration
1146        assert!(!column_metadatas[1].pages.is_empty());
1147        let product_id_encoding = describe_encoding(&column_metadatas[1].pages[0]);
1148        assert!(
1149            product_id_encoding.contains("RLE") || product_id_encoding.contains("Rle"),
1150            "product_id column should use RLE encoding due to '*_id' pattern match, but got: {}",
1151            product_id_encoding
1152        );
1153    }
1154
1155    #[tokio::test]
1156    async fn test_field_metadata_compression() {
1157        // Test that field metadata compression settings are respected
1158        let mut metadata = HashMap::new();
1159        metadata.insert(
1160            lance_encoding::constants::COMPRESSION_META_KEY.to_string(),
1161            "zstd".to_string(),
1162        );
1163        metadata.insert(
1164            lance_encoding::constants::COMPRESSION_LEVEL_META_KEY.to_string(),
1165            "6".to_string(),
1166        );
1167
1168        let arrow_schema = Arc::new(ArrowSchema::new(vec![
1169            ArrowField::new("id", DataType::Int32, false),
1170            ArrowField::new("text", DataType::Utf8, false).with_metadata(metadata.clone()),
1171            ArrowField::new("data", DataType::Int32, false).with_metadata(HashMap::from([(
1172                lance_encoding::constants::COMPRESSION_META_KEY.to_string(),
1173                "none".to_string(),
1174            )])),
1175        ]));
1176
1177        let lance_schema = LanceSchema::try_from(arrow_schema.as_ref()).unwrap();
1178
1179        // Create test data
1180        let id_array = Int32Array::from_iter_values(0..1000);
1181        let text_array = StringArray::from_iter_values(
1182            (0..1000).map(|i| format!("test string {} repeated text", i)),
1183        );
1184        let data_array = Int32Array::from_iter_values((0..1000).map(|i| i * 2));
1185
1186        let batch = RecordBatch::try_new(
1187            arrow_schema.clone(),
1188            vec![
1189                Arc::new(id_array),
1190                Arc::new(text_array),
1191                Arc::new(data_array),
1192            ],
1193        )
1194        .unwrap();
1195
1196        let path = TempObjFile::default();
1197        let object_store = ObjectStore::local();
1198
1199        // Create encoding strategy that will read from field metadata
1200        let params = CompressionParams::new();
1201        let encoding_strategy = lance_encoding::encoder::default_encoding_strategy_with_params(
1202            LanceFileVersion::V2_1,
1203            params,
1204        )
1205        .unwrap();
1206
1207        let options = FileWriterOptions {
1208            encoding_strategy: Some(Arc::from(encoding_strategy)),
1209            format_version: Some(LanceFileVersion::V2_1),
1210            ..Default::default()
1211        };
1212        let mut writer = FileWriter::try_new(
1213            object_store.create(&path).await.unwrap(),
1214            lance_schema.clone(),
1215            options,
1216        )
1217        .unwrap();
1218
1219        writer.write_batch(&batch).await.unwrap();
1220        writer.finish().await.unwrap();
1221
1222        // Read back metadata
1223        let fs = FsFixture::default();
1224        let file_scheduler = fs
1225            .scheduler
1226            .open_file(&path, &CachedFileSize::unknown())
1227            .await
1228            .unwrap();
1229        let file_reader = FileReader::try_open(
1230            file_scheduler,
1231            None,
1232            Arc::<DecoderPlugins>::default(),
1233            &LanceCache::no_cache(),
1234            FileReaderOptions::default(),
1235        )
1236        .await
1237        .unwrap();
1238
1239        let column_metadatas = &file_reader.metadata().column_metadatas;
1240
1241        // The text column (index 1) should use zstd compression based on metadata
1242        let text_encoding = describe_encoding(&column_metadatas[1].pages[0]);
1243        // For string columns, we expect Binary encoding with zstd compression
1244        assert!(
1245            text_encoding.contains("Zstd"),
1246            "text column should use zstd compression from field metadata, but got: {}",
1247            text_encoding
1248        );
1249
1250        // The data column (index 2) should use no compression based on metadata
1251        let data_encoding = describe_encoding(&column_metadatas[2].pages[0]);
1252        // For Int32 columns with "none" compression, we expect Flat encoding without compression
1253        assert!(
1254            data_encoding.contains("Flat") && data_encoding.contains("compression: None"),
1255            "data column should use no compression from field metadata, but got: {}",
1256            data_encoding
1257        );
1258    }
1259
1260    #[tokio::test]
1261    async fn test_field_metadata_rle_threshold() {
1262        // Test that RLE threshold from field metadata is respected
1263        let mut metadata = HashMap::new();
1264        metadata.insert(
1265            lance_encoding::constants::RLE_THRESHOLD_META_KEY.to_string(),
1266            "0.9".to_string(),
1267        );
1268        // Also set compression to ensure RLE is used
1269        metadata.insert(
1270            lance_encoding::constants::COMPRESSION_META_KEY.to_string(),
1271            "lz4".to_string(),
1272        );
1273        // Explicitly disable BSS to ensure RLE is tested
1274        metadata.insert(
1275            lance_encoding::constants::BSS_META_KEY.to_string(),
1276            "off".to_string(),
1277        );
1278
1279        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
1280            "status",
1281            DataType::Int32,
1282            false,
1283        )
1284        .with_metadata(metadata)]));
1285
1286        let lance_schema = LanceSchema::try_from(arrow_schema.as_ref()).unwrap();
1287
1288        // Create data with very high repetition (3 runs for 10000 values = 0.0003 ratio)
1289        let status_array = Int32Array::from_iter_values(
1290            std::iter::repeat_n(200, 8000)
1291                .chain(std::iter::repeat_n(404, 1500))
1292                .chain(std::iter::repeat_n(500, 500)),
1293        );
1294
1295        let batch =
1296            RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(status_array)]).unwrap();
1297
1298        let path = TempObjFile::default();
1299        let object_store = ObjectStore::local();
1300
1301        // Create encoding strategy that will read from field metadata
1302        let params = CompressionParams::new();
1303        let encoding_strategy = lance_encoding::encoder::default_encoding_strategy_with_params(
1304            LanceFileVersion::V2_1,
1305            params,
1306        )
1307        .unwrap();
1308
1309        let options = FileWriterOptions {
1310            encoding_strategy: Some(Arc::from(encoding_strategy)),
1311            format_version: Some(LanceFileVersion::V2_1),
1312            ..Default::default()
1313        };
1314        let mut writer = FileWriter::try_new(
1315            object_store.create(&path).await.unwrap(),
1316            lance_schema.clone(),
1317            options,
1318        )
1319        .unwrap();
1320
1321        writer.write_batch(&batch).await.unwrap();
1322        writer.finish().await.unwrap();
1323
1324        // Read back and check encoding
1325        let fs = FsFixture::default();
1326        let file_scheduler = fs
1327            .scheduler
1328            .open_file(&path, &CachedFileSize::unknown())
1329            .await
1330            .unwrap();
1331        let file_reader = FileReader::try_open(
1332            file_scheduler,
1333            None,
1334            Arc::<DecoderPlugins>::default(),
1335            &LanceCache::no_cache(),
1336            FileReaderOptions::default(),
1337        )
1338        .await
1339        .unwrap();
1340
1341        let column_metadatas = &file_reader.metadata().column_metadatas;
1342        let status_encoding = describe_encoding(&column_metadatas[0].pages[0]);
1343        assert!(
1344            status_encoding.contains("RLE") || status_encoding.contains("Rle"),
1345            "status column should use RLE encoding due to metadata threshold, but got: {}",
1346            status_encoding
1347        );
1348    }
1349
1350    #[tokio::test]
1351    async fn test_large_page_split_on_read() {
1352        use arrow_array::Array;
1353        use futures::TryStreamExt;
1354        use lance_encoding::decoder::FilterExpression;
1355        use lance_io::ReadBatchParams;
1356
1357        // Test that large pages written with relaxed limits can be split during read
1358
1359        let arrow_field = ArrowField::new("data", DataType::Binary, false);
1360        let arrow_schema = ArrowSchema::new(vec![arrow_field]);
1361        let lance_schema = LanceSchema::try_from(&arrow_schema).unwrap();
1362
1363        // Create a large binary value (40MB) to trigger large page creation
1364        let large_value = vec![42u8; 40 * 1024 * 1024];
1365        let array = arrow_array::BinaryArray::from(vec![
1366            Some(large_value.as_slice()),
1367            Some(b"small value"),
1368        ]);
1369        let batch = RecordBatch::try_new(Arc::new(arrow_schema), vec![Arc::new(array)]).unwrap();
1370
1371        // Write with relaxed page size limit (128MB)
1372        let options = FileWriterOptions {
1373            max_page_bytes: Some(128 * 1024 * 1024),
1374            format_version: Some(LanceFileVersion::V2_1),
1375            ..Default::default()
1376        };
1377
1378        let fs = FsFixture::default();
1379        let path = fs.tmp_path;
1380
1381        let mut writer = FileWriter::try_new(
1382            fs.object_store.create(&path).await.unwrap(),
1383            lance_schema.clone(),
1384            options,
1385        )
1386        .unwrap();
1387
1388        writer.write_batch(&batch).await.unwrap();
1389        let num_rows = writer.finish().await.unwrap();
1390        assert_eq!(num_rows, 2);
1391
1392        // Read back with split configuration
1393        let file_scheduler = fs
1394            .scheduler
1395            .open_file(&path, &CachedFileSize::unknown())
1396            .await
1397            .unwrap();
1398
1399        // Configure reader to split pages larger than 10MB into chunks
1400        let reader_options = FileReaderOptions {
1401            read_chunk_size: 10 * 1024 * 1024, // 10MB chunks
1402            ..Default::default()
1403        };
1404
1405        let file_reader = FileReader::try_open(
1406            file_scheduler,
1407            None,
1408            Arc::<DecoderPlugins>::default(),
1409            &LanceCache::no_cache(),
1410            reader_options,
1411        )
1412        .await
1413        .unwrap();
1414
1415        // Read the data back
1416        let stream = file_reader
1417            .read_stream(
1418                ReadBatchParams::RangeFull,
1419                1024,
1420                10, // batch_readahead
1421                FilterExpression::no_filter(),
1422            )
1423            .unwrap();
1424
1425        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
1426        assert_eq!(batches.len(), 1);
1427
1428        // Verify the data is correctly read despite splitting
1429        let read_array = batches[0].column(0);
1430        let read_binary = read_array
1431            .as_any()
1432            .downcast_ref::<arrow_array::BinaryArray>()
1433            .unwrap();
1434
1435        assert_eq!(read_binary.len(), 2);
1436        assert_eq!(read_binary.value(0).len(), 40 * 1024 * 1024);
1437        assert_eq!(read_binary.value(1), b"small value");
1438
1439        // Verify first value matches what we wrote
1440        assert!(read_binary.value(0).iter().all(|&b| b == 42u8));
1441    }
1442}