lance_file/v2/writer.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use core::panic;
use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;

use arrow_array::RecordBatch;

use arrow_data::ArrayData;
use bytes::{BufMut, Bytes, BytesMut};
use futures::stream::FuturesOrdered;
use futures::StreamExt;
use lance_core::datatypes::{Field, Schema as LanceSchema};
use lance_core::utils::bit::pad_bytes;
use lance_core::{Error, Result};
use lance_encoding::decoder::PageEncoding;
use lance_encoding::encoder::{
    default_encoding_strategy, BatchEncoder, EncodeTask, EncodedBatch, EncodedPage,
    EncodingOptions, FieldEncoder, FieldEncodingStrategy, OutOfLineBuffers,
};
use lance_encoding::repdef::RepDefBuilder;
use lance_encoding::version::LanceFileVersion;
use lance_io::object_store::ObjectStore;
use lance_io::object_writer::ObjectWriter;
use lance_io::traits::Writer;
use log::{debug, warn};
use object_store::path::Path;
use prost::Message;
use prost_types::Any;
use snafu::location;
use tokio::io::AsyncWriteExt;
use tracing::instrument;

use crate::datatypes::FieldsWithMeta;
use crate::format::pb;
use crate::format::pbfile;
use crate::format::pbfile::DirectEncoding;
use crate::format::MAGIC;

/// Page buffers are aligned to 64 bytes
pub(crate) const PAGE_BUFFER_ALIGNMENT: usize = 64;
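// Filler bytes used when padding out to PAGE_BUFFER_ALIGNMENT. Readers locate data
// by offset and size, so the pad bytes themselves are never interpreted and the
// fill value is arbitrary.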
const PAD_BUFFER: [u8; PAGE_BUFFER_ALIGNMENT] = [72; PAGE_BUFFER_ALIGNMENT];
const MAX_PAGE_BYTES: usize = 32 * 1024 * 1024;
const ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES: &str = "LANCE_FILE_WRITER_MAX_PAGE_BYTES";

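/// Options controlling how the [`FileWriter`] buffers, encodes, and writes data.
///
/// A minimal construction sketch (every field is optional; unset fields fall back
/// to the defaults described on each field):
///
/// ```ignore
/// let options = FileWriterOptions {
///     data_cache_bytes: Some(64 * 1024 * 1024),
///     format_version: Some(LanceFileVersion::Stable),
///     ..Default::default()
/// };
/// ```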
#[derive(Debug, Clone, Default)]
pub struct FileWriterOptions {
    /// How many bytes to use for buffering column data
    ///
    /// When data comes in small batches the writer will buffer column data so that
    /// larger pages can be created.  This value will be divided evenly across all of the
    /// columns.  Generally you want this to be at least large enough to match your
    /// filesystem's ideal read size per column.
    ///
    /// In some cases you might want this value to be even larger if you have highly
    /// compressible data.  However, if this is too large, then the writer could require
    /// a lot of memory and write performance may suffer if the CPU-expensive encoding
    /// falls behind and can't be interleaved with the I/O-expensive flushing.
    ///
    /// The default will use 8MiB per column which should be reasonable for most cases.
    // TODO: Do we need to be able to set this on a per-column basis?
    pub data_cache_bytes: Option<u64>,
    /// A hint to indicate the max size of a page
    ///
    /// This hint can't always be respected.  A single value could be larger than this value
    /// and we never slice single values.  In addition, there are some cases where it can be
    /// difficult to know the size up-front and so we might not be able to respect this value.
    pub max_page_bytes: Option<u64>,
    /// The file writer buffers columns until enough data has arrived to flush a page
    /// to disk.
    ///
    /// Some columns with small data types may not flush very often.  These arrays can
    /// stick around for a long time.  These arrays might also be keeping larger data
    /// structures alive.  By default, the writer will make a deep copy of this array
    /// to avoid any potential memory leaks.  However, this can be disabled for a
    /// (probably minor) performance boost if you are sure that arrays are not keeping
    /// any sibling structures alive (this typically means the array was allocated in
    /// the same language / runtime as the writer).
    ///
    /// Do not enable this if your data is arriving from the C data interface.
    /// Data typically arrives one "batch" at a time (encoded in the C data interface
    /// as a struct array).  Each array in that batch keeps the entire batch alive.
    /// This means a small boolean array (which we will buffer in memory for quite a
    /// while) might keep a much larger record batch around in memory (even though most
    /// of that batch's data has been written to disk).
    pub keep_original_array: Option<bool>,
    /// An optional custom strategy used to pick encodings for each field
    pub encoding_strategy: Option<Arc<dyn FieldEncodingStrategy>>,
    /// The format version to use when writing the file
    ///
    /// This controls which encodings will be used when encoding the data.  Newer
    /// versions may have more efficient encodings.  However, newer format versions will
    /// require more up-to-date readers to read the data.
    pub format_version: Option<LanceFileVersion>,
}

pub struct FileWriter {
    writer: ObjectWriter,
    schema: Option<LanceSchema>,
    column_writers: Vec<Box<dyn FieldEncoder>>,
    column_metadata: Vec<pbfile::ColumnMetadata>,
    field_id_to_column_indices: Vec<(u32, u32)>,
    num_columns: u32,
    rows_written: u64,
    global_buffers: Vec<(u64, u64)>,
    schema_metadata: HashMap<String, String>,
    options: FileWriterOptions,
}

fn initial_column_metadata() -> pbfile::ColumnMetadata {
    pbfile::ColumnMetadata {
        pages: Vec::new(),
        buffer_offsets: Vec::new(),
        buffer_sizes: Vec::new(),
        encoding: None,
    }
}

static WARNED_ON_UNSTABLE_API: AtomicBool = AtomicBool::new(false);

impl FileWriter {
    /// Create a new FileWriter with a desired output schema
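    ///
    /// A minimal usage sketch (assumes a local `ObjectStore` and an Arrow schema in
    /// scope; this mirrors the flow used by the tests below):
    ///
    /// ```ignore
    /// let object_writer = object_store.create(&path).await?;
    /// let lance_schema = lance_core::datatypes::Schema::try_from(arrow_schema.as_ref())?;
    /// let mut writer =
    ///     FileWriter::try_new(object_writer, lance_schema, FileWriterOptions::default())?;
    /// writer.write_batch(&batch).await?;
    /// writer.finish().await?;
    /// ```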
    pub fn try_new(
        object_writer: ObjectWriter,
        schema: LanceSchema,
        options: FileWriterOptions,
    ) -> Result<Self> {
        let mut writer = Self::new_lazy(object_writer, options);
        writer.initialize(schema)?;
        Ok(writer)
    }

    /// Create a new FileWriter without a desired output schema
    ///
    /// The output schema will be set based on the first batch of data to arrive.
    /// If no data arrives and the writer is finished then the write will fail.
    pub fn new_lazy(object_writer: ObjectWriter, options: FileWriterOptions) -> Self {
        if let Some(format_version) = options.format_version {
            if format_version > LanceFileVersion::Stable
                && WARNED_ON_UNSTABLE_API
                    .compare_exchange(
                        false,
                        true,
                        std::sync::atomic::Ordering::Relaxed,
                        std::sync::atomic::Ordering::Relaxed,
                    )
                    .is_ok()
            {
                warn!("You have requested an unstable format version.  Files written with this format version may not be readable in the future!  This is a development feature and should only be used for experimentation and never for production data.");
            }
        }
        Self {
            writer: object_writer,
            schema: None,
            column_writers: Vec::new(),
            column_metadata: Vec::new(),
            num_columns: 0,
            rows_written: 0,
            field_id_to_column_indices: Vec::new(),
            global_buffers: Vec::new(),
            schema_metadata: HashMap::new(),
            options,
        }
    }

    /// Write a series of record batches to a new file
    ///
    /// Returns the number of rows written
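    ///
    /// A convenience sketch (assumes `store`, `path`, `schema`, and an iterator of
    /// owned batches are in scope):
    ///
    /// ```ignore
    /// let rows = FileWriter::create_file_with_batches(
    ///     &store,
    ///     &path,
    ///     schema,
    ///     batches.into_iter(),
    ///     FileWriterOptions::default(),
    /// )
    /// .await?;
    /// ```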
    pub async fn create_file_with_batches(
        store: &ObjectStore,
        path: &Path,
        schema: lance_core::datatypes::Schema,
        batches: impl Iterator<Item = RecordBatch> + Send,
        options: FileWriterOptions,
    ) -> Result<usize> {
        let writer = store.create(path).await?;
        let mut writer = Self::try_new(writer, schema, options)?;
        for batch in batches {
            writer.write_batch(&batch).await?;
        }
        Ok(writer.finish().await? as usize)
    }

    async fn do_write_buffer(writer: &mut ObjectWriter, buf: &[u8]) -> Result<()> {
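        // Write the buffer, then pad with filler bytes so that the next buffer
        // begins on a PAGE_BUFFER_ALIGNMENT (64-byte) boundary.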
        writer.write_all(buf).await?;
        let pad_bytes = pad_bytes::<PAGE_BUFFER_ALIGNMENT>(buf.len());
        writer.write_all(&PAD_BUFFER[..pad_bytes]).await?;
        Ok(())
    }

    /// Returns the format version that will be used when writing the file
    pub fn version(&self) -> LanceFileVersion {
        self.options.format_version.unwrap_or_default()
    }

    async fn write_page(&mut self, encoded_page: EncodedPage) -> Result<()> {
        let buffers = encoded_page.data;
        let mut buffer_offsets = Vec::with_capacity(buffers.len());
        let mut buffer_sizes = Vec::with_capacity(buffers.len());
        for buffer in buffers {
            buffer_offsets.push(self.writer.tell().await? as u64);
            buffer_sizes.push(buffer.len() as u64);
            Self::do_write_buffer(&mut self.writer, &buffer).await?;
        }
        let encoded_encoding = match encoded_page.description {
            PageEncoding::Legacy(array_encoding) => Any::from_msg(&array_encoding)?.encode_to_vec(),
            PageEncoding::Structural(page_layout) => Any::from_msg(&page_layout)?.encode_to_vec(),
        };
        let page = pbfile::column_metadata::Page {
            buffer_offsets,
            buffer_sizes,
            encoding: Some(pbfile::Encoding {
                location: Some(pbfile::encoding::Location::Direct(DirectEncoding {
                    encoding: encoded_encoding,
                })),
            }),
            length: encoded_page.num_rows,
            priority: encoded_page.row_number,
        };
        self.column_metadata[encoded_page.column_idx as usize]
            .pages
            .push(page);
        Ok(())
    }

    #[instrument(skip_all, level = "debug")]
    async fn write_pages(&mut self, mut encoding_tasks: FuturesOrdered<EncodeTask>) -> Result<()> {
        // As soon as an encoding task is done we write it.  There is no parallelism
        // needed here because "writing" is really just submitting the buffer to the
        // underlying write scheduler (either the OS or object_store's scheduler for
        // cloud writes).  The only time we might truly await on write_page is if the
        // scheduler's write queue is full.
        //
        // Also, there is no point in trying to make write_page parallel anyway
        // because we wouldn't want buffers getting mixed up across pages.
        while let Some(encoding_task) = encoding_tasks.next().await {
            let encoded_page = encoding_task?;
            self.write_page(encoded_page).await?;
        }
        // It's important to flush here; we don't know when the next batch will arrive
        // and the underlying cloud store could have writes in progress that won't advance
        // until we interact with the writer again.  These in-progress writes will time out
        // if we don't flush.
        self.writer.flush().await?;
        Ok(())
    }

    /// Schedule batches of data to be written to the file
    pub async fn write_batches(
        &mut self,
        batches: impl Iterator<Item = &RecordBatch>,
    ) -> Result<()> {
        for batch in batches {
            self.write_batch(batch).await?;
        }
        Ok(())
    }

    fn verify_field_nullability(arr: &ArrayData, field: &Field) -> Result<()> {
        if !field.nullable && arr.null_count() > 0 {
            return Err(Error::invalid_input(
                format!(
                    "The field `{}` contained null values even though the field is marked non-null in the schema",
                    field.name
                ),
                location!(),
            ));
        }

        for (child_field, child_arr) in field.children.iter().zip(arr.child_data()) {
            Self::verify_field_nullability(child_arr, child_field)?;
        }

        Ok(())
    }

    fn verify_nullability_constraints(&self, batch: &RecordBatch) -> Result<()> {
        for (col, field) in batch
            .columns()
            .iter()
            .zip(self.schema.as_ref().unwrap().fields.iter())
        {
            Self::verify_field_nullability(&col.to_data(), field)?;
        }
        Ok(())
    }

    fn initialize(&mut self, mut schema: LanceSchema) -> Result<()> {
        let cache_bytes_per_column = if let Some(data_cache_bytes) = self.options.data_cache_bytes {
            data_cache_bytes / schema.fields.len() as u64
        } else {
            8 * 1024 * 1024
        };

        let max_page_bytes = self.options.max_page_bytes.unwrap_or_else(|| {
            std::env::var(ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES)
                .map(|s| {
                    s.parse::<u64>().unwrap_or_else(|e| {
                        warn!(
                            "Failed to parse {}: {}, using default",
                            ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES, e
                        );
                        MAX_PAGE_BYTES as u64
                    })
                })
                .unwrap_or(MAX_PAGE_BYTES as u64)
        });

        schema.validate()?;

        let keep_original_array = self.options.keep_original_array.unwrap_or(false);
        let encoding_strategy = self.options.encoding_strategy.clone().unwrap_or_else(|| {
            let version = self.version();
            default_encoding_strategy(version).into()
        });

        let encoding_options = EncodingOptions {
            cache_bytes_per_column,
            max_page_bytes,
            keep_original_array,
            buffer_alignment: PAGE_BUFFER_ALIGNMENT as u64,
        };
        let encoder =
            BatchEncoder::try_new(&schema, encoding_strategy.as_ref(), &encoding_options)?;
        self.num_columns = encoder.num_columns();

        self.column_writers = encoder.field_encoders;
        self.column_metadata = vec![initial_column_metadata(); self.num_columns as usize];
        self.field_id_to_column_indices = encoder.field_id_to_column_index;
        self.schema_metadata
            .extend(std::mem::take(&mut schema.metadata));
        self.schema = Some(schema);
        Ok(())
    }

    fn ensure_initialized(&mut self, batch: &RecordBatch) -> Result<&LanceSchema> {
        if self.schema.is_none() {
            let schema = LanceSchema::try_from(batch.schema().as_ref())?;
            self.initialize(schema)?;
        }
        Ok(self.schema.as_ref().unwrap())
    }

    #[instrument(skip_all, level = "debug")]
    fn encode_batch(
        &mut self,
        batch: &RecordBatch,
        external_buffers: &mut OutOfLineBuffers,
    ) -> Result<Vec<Vec<EncodeTask>>> {
        self.schema
            .as_ref()
            .unwrap()
            .fields
            .iter()
            .zip(self.column_writers.iter_mut())
            .map(|(field, column_writer)| {
                let array = batch
                    .column_by_name(&field.name)
                    .ok_or(Error::InvalidInput {
                        source: format!(
                            "Cannot write batch.  The batch was missing the column `{}`",
                            field.name
                        )
                        .into(),
                        location: location!(),
                    })?;
                let repdef = RepDefBuilder::default();
                let num_rows = array.len() as u64;
                column_writer.maybe_encode(
                    array.clone(),
                    external_buffers,
                    repdef,
                    self.rows_written,
                    num_rows,
                )
            })
            .collect::<Result<Vec<_>>>()
    }

    /// Schedule a batch of data to be written to the file
    ///
    /// Note: the future returned by this method may complete before the data has been fully
    /// flushed to the file (some data may be in the data cache or the I/O cache)
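    ///
    /// A sketch of the typical write loop (writer created via [`FileWriter::try_new`]):
    ///
    /// ```ignore
    /// for batch in batches {
    ///     writer.write_batch(&batch).await?;
    /// }
    /// ```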
    pub async fn write_batch(&mut self, batch: &RecordBatch) -> Result<()> {
        debug!(
            "write_batch called with {} bytes of data",
            batch.get_array_memory_size()
        );
        self.ensure_initialized(batch)?;
        self.verify_nullability_constraints(batch)?;
        let num_rows = batch.num_rows() as u64;
        if num_rows == 0 {
            return Ok(());
        }
        if num_rows > u32::MAX as u64 {
            return Err(Error::InvalidInput {
                source: "cannot write Lance files with more than 2^32 rows".into(),
                location: location!(),
            });
        }
        // First we push each array into its column writer.  This may or may not generate enough
        // data to trigger an encoding task.  We collect any encoding tasks into a queue.
        let mut external_buffers =
            OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
        let encoding_tasks = self.encode_batch(batch, &mut external_buffers)?;
        // Next, write external buffers
        for external_buffer in external_buffers.take_buffers() {
            Self::do_write_buffer(&mut self.writer, &external_buffer).await?;
        }

        let encoding_tasks = encoding_tasks
            .into_iter()
            .flatten()
            .collect::<FuturesOrdered<_>>();

        self.rows_written = match self.rows_written.checked_add(batch.num_rows() as u64) {
            Some(rows_written) => rows_written,
            None => {
                return Err(Error::InvalidInput {
                    source: format!(
                        "cannot write batch with {} rows because {} rows have already been written and Lance files cannot contain more than 2^64 rows",
                        num_rows, self.rows_written
                    )
                    .into(),
                    location: location!(),
                });
            }
        };

        self.write_pages(encoding_tasks).await?;

        Ok(())
    }

    async fn write_column_metadata(
        &mut self,
        metadata: pbfile::ColumnMetadata,
    ) -> Result<(u64, u64)> {
        let metadata_bytes = metadata.encode_to_vec();
        let position = self.writer.tell().await? as u64;
        let len = metadata_bytes.len() as u64;
        self.writer.write_all(&metadata_bytes).await?;
        Ok((position, len))
    }

    async fn write_column_metadatas(&mut self) -> Result<Vec<(u64, u64)>> {
        let mut metadatas = Vec::new();
        std::mem::swap(&mut self.column_metadata, &mut metadatas);
        let mut metadata_positions = Vec::with_capacity(metadatas.len());
        for metadata in metadatas {
            metadata_positions.push(self.write_column_metadata(metadata).await?);
        }
        Ok(metadata_positions)
    }

    fn make_file_descriptor(
        schema: &lance_core::datatypes::Schema,
        num_rows: u64,
    ) -> Result<pb::FileDescriptor> {
        let fields_with_meta = FieldsWithMeta::from(schema);
        Ok(pb::FileDescriptor {
            schema: Some(pb::Schema {
                fields: fields_with_meta.fields.0,
                metadata: fields_with_meta.metadata,
            }),
            length: num_rows,
        })
    }

    async fn write_global_buffers(&mut self) -> Result<Vec<(u64, u64)>> {
        let schema = self.schema.as_mut().ok_or(Error::invalid_input(
            "No schema provided on writer open and no data provided.  Schema is unknown and file cannot be created",
            location!(),
        ))?;
        schema.metadata = std::mem::take(&mut self.schema_metadata);
        let file_descriptor = Self::make_file_descriptor(schema, self.rows_written)?;
        let file_descriptor_bytes = file_descriptor.encode_to_vec();
        let file_descriptor_len = file_descriptor_bytes.len() as u64;
        let file_descriptor_position = self.writer.tell().await? as u64;
        self.writer.write_all(&file_descriptor_bytes).await?;
        let mut gbo_table = Vec::with_capacity(1 + self.global_buffers.len());
        gbo_table.push((file_descriptor_position, file_descriptor_len));
        gbo_table.append(&mut self.global_buffers);
        Ok(gbo_table)
    }

    /// Add a metadata entry to the schema
    ///
    /// This method is useful because sometimes the metadata is not known until after the
    /// data has been written.  This method allows you to alter the schema metadata.  It
    /// must be called before `finish` is called.
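    ///
    /// For example (hypothetical key and value):
    ///
    /// ```ignore
    /// writer.add_schema_metadata("created_by", "my-ingest-job");
    /// let num_rows = writer.finish().await?;
    /// ```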
    pub fn add_schema_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
        self.schema_metadata.insert(key.into(), value.into());
    }

    /// Adds a global buffer to the file
    ///
    /// The global buffer can contain any arbitrary bytes.  It will be written to the disk
    /// immediately.  This method returns the index of the global buffer (this will always
    /// start at 1 and increment by 1 each time this method is called)
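    ///
    /// A sketch (arbitrary payload; storing the returned index in schema metadata
    /// is one way to let readers locate the buffer later):
    ///
    /// ```ignore
    /// let index = writer
    ///     .add_global_buffer(Bytes::from_static(b"some sidecar data"))
    ///     .await?;
    /// writer.add_schema_metadata("sidecar_buffer_index", index.to_string());
    /// ```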
    pub async fn add_global_buffer(&mut self, buffer: Bytes) -> Result<u32> {
        let position = self.writer.tell().await? as u64;
        let len = buffer.len() as u64;
        Self::do_write_buffer(&mut self.writer, &buffer).await?;
        self.global_buffers.push((position, len));
        Ok(self.global_buffers.len() as u32)
    }

    async fn finish_writers(&mut self) -> Result<()> {
        let mut col_idx = 0;
        for mut writer in std::mem::take(&mut self.column_writers) {
            let mut external_buffers =
                OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
            let columns = writer.finish(&mut external_buffers).await?;
            for buffer in external_buffers.take_buffers() {
                self.writer.write_all(&buffer).await?;
            }
            debug_assert_eq!(
                columns.len(),
                writer.num_columns() as usize,
                "Expected {} columns from column writer at index {} and got {}",
                writer.num_columns(),
                col_idx,
                columns.len()
            );
            for column in columns {
                for page in column.final_pages {
                    self.write_page(page).await?;
                }
                let column_metadata = &mut self.column_metadata[col_idx];
                let mut buffer_pos = self.writer.tell().await? as u64;
                for buffer in column.column_buffers {
                    column_metadata.buffer_offsets.push(buffer_pos);
                    let size = buffer.len() as u64;
                    Self::do_write_buffer(&mut self.writer, &buffer).await?;
                    buffer_pos += size;
                    column_metadata.buffer_sizes.push(size);
                }
                let encoded_encoding = Any::from_msg(&column.encoding)?.encode_to_vec();
                column_metadata.encoding = Some(pbfile::Encoding {
                    location: Some(pbfile::encoding::Location::Direct(pbfile::DirectEncoding {
                        encoding: encoded_encoding,
                    })),
                });
                col_idx += 1;
            }
        }
        if col_idx != self.column_metadata.len() {
            panic!(
                "Column writers finished with {} columns but we expected {}",
                col_idx,
                self.column_metadata.len()
            );
        }
        Ok(())
    }

    /// Converts self.version (which is a mix of "software version" and
    /// "format version") into a (major, minor) format version pair
    fn version_to_numbers(&self) -> (u16, u16) {
        let version = self.options.format_version.unwrap_or_default();
        match version.resolve() {
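            // Note: 2.0 files record "0.3" in the footer, a holdover from the
            // version numbering used while the format was still in development.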
            LanceFileVersion::V2_0 => (0, 3),
            LanceFileVersion::V2_1 => (2, 1),
            _ => panic!("Unsupported version: {}", version),
        }
    }

    /// Finishes writing the file
    ///
    /// This method will wait until all data has been flushed to the file.  Then it
    /// will write the file metadata and the footer.  It will not return until all
    /// data has been flushed and the file has been closed.
    ///
    /// Returns the total number of rows written
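    ///
    /// A sketch (no batches may be written after this call):
    ///
    /// ```ignore
    /// let rows_written = writer.finish().await?;
    /// ```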
    pub async fn finish(&mut self) -> Result<u64> {
        // 1. flush any remaining data and write out those pages
        let mut external_buffers =
            OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
        let encoding_tasks = self
            .column_writers
            .iter_mut()
            .map(|writer| writer.flush(&mut external_buffers))
            .collect::<Result<Vec<_>>>()?;
        for external_buffer in external_buffers.take_buffers() {
            Self::do_write_buffer(&mut self.writer, &external_buffer).await?;
        }
        let encoding_tasks = encoding_tasks
            .into_iter()
            .flatten()
            .collect::<FuturesOrdered<_>>();
        self.write_pages(encoding_tasks).await?;

        // 2. finish the column writers, writing their final pages and buffers
        self.finish_writers().await?;

        // 3. write global buffers (we write the schema here)
        let global_buffer_offsets = self.write_global_buffers().await?;
        let num_global_buffers = global_buffer_offsets.len() as u32;

        // 4. write the column metadatas
        let column_metadata_start = self.writer.tell().await? as u64;
        let metadata_positions = self.write_column_metadatas().await?;

        // 5. write the column metadata offset table
        let cmo_table_start = self.writer.tell().await? as u64;
        for (meta_pos, meta_len) in metadata_positions {
            self.writer.write_u64_le(meta_pos).await?;
            self.writer.write_u64_le(meta_len).await?;
        }

        // 6. write global buffers offset table
        let gbo_table_start = self.writer.tell().await? as u64;
        for (gbo_pos, gbo_len) in global_buffer_offsets {
            self.writer.write_u64_le(gbo_pos).await?;
            self.writer.write_u64_le(gbo_len).await?;
        }

        let (major, minor) = self.version_to_numbers();
        // 7. write the footer
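        // Fixed-size footer layout, as written below:
        //   u64: column_metadata_start
        //   u64: cmo_table_start
        //   u64: gbo_table_start
        //   u32: num_global_buffers
        //   u32: num_columns
        //   u16: major_version
        //   u16: minor_version
        //   MAGIC bytes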
        self.writer.write_u64_le(column_metadata_start).await?;
        self.writer.write_u64_le(cmo_table_start).await?;
        self.writer.write_u64_le(gbo_table_start).await?;
        self.writer.write_u32_le(num_global_buffers).await?;
        self.writer.write_u32_le(self.num_columns).await?;
        self.writer.write_u16_le(major).await?;
        self.writer.write_u16_le(minor).await?;
        self.writer.write_all(MAGIC).await?;

        // 8. close the writer
        self.writer.shutdown().await?;
        Ok(self.rows_written)
    }

    pub async fn tell(&mut self) -> Result<u64> {
        Ok(self.writer.tell().await? as u64)
    }

    pub fn field_id_to_column_indices(&self) -> &[(u32, u32)] {
        &self.field_id_to_column_indices
    }
}

/// Utility trait for converting EncodedBatch to Bytes using the
/// lance file format
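///
/// A sketch (assumes an `EncodedBatch` produced by this crate's encoding utilities):
///
/// ```ignore
/// let bytes = encoded_batch.try_to_self_described_lance(LanceFileVersion::Stable)?;
/// ```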
pub trait EncodedBatchWriteExt {
    /// Serializes into a lance file, including the schema
    fn try_to_self_described_lance(&self, version: LanceFileVersion) -> Result<Bytes>;
    /// Serializes into a lance file, without the schema.
    ///
    /// The schema must be provided to deserialize the buffer
    fn try_to_mini_lance(&self, version: LanceFileVersion) -> Result<Bytes>;
}

// Creates a lance footer and appends it to the encoded data
//
// The logic here is very similar to logic in the FileWriter except we
// are using BufMut (put_xyz) instead of AsyncWrite (write_xyz).
fn concat_lance_footer(
    batch: &EncodedBatch,
    write_schema: bool,
    version: LanceFileVersion,
) -> Result<Bytes> {
    // Estimating 1MiB for file footer
    let mut data = BytesMut::with_capacity(batch.data.len() + 1024 * 1024);
    data.put(batch.data.clone());
    // write global buffers (we write the schema here)
    let global_buffers = if write_schema {
        let schema_start = data.len() as u64;
        let lance_schema = lance_core::datatypes::Schema::try_from(batch.schema.as_ref())?;
        let descriptor = FileWriter::make_file_descriptor(&lance_schema, batch.num_rows)?;
        let descriptor_bytes = descriptor.encode_to_vec();
        let descriptor_len = descriptor_bytes.len() as u64;
        data.put(descriptor_bytes.as_slice());

        vec![(schema_start, descriptor_len)]
    } else {
        vec![]
    };
    let col_metadata_start = data.len() as u64;

    let mut col_metadata_positions = Vec::new();
    // Write column metadata
    for col in &batch.page_table {
        let position = data.len() as u64;
        let pages = col
            .page_infos
            .iter()
            .map(|page_info| {
                let encoded_encoding = match &page_info.encoding {
                    PageEncoding::Legacy(array_encoding) => {
                        Any::from_msg(array_encoding)?.encode_to_vec()
                    }
                    PageEncoding::Structural(page_layout) => {
                        Any::from_msg(page_layout)?.encode_to_vec()
                    }
                };
                let (buffer_offsets, buffer_sizes): (Vec<_>, Vec<_>) = page_info
                    .buffer_offsets_and_sizes
                    .as_ref()
                    .iter()
                    .cloned()
                    .unzip();
                Ok(pbfile::column_metadata::Page {
                    buffer_offsets,
                    buffer_sizes,
                    encoding: Some(pbfile::Encoding {
                        location: Some(pbfile::encoding::Location::Direct(DirectEncoding {
                            encoding: encoded_encoding,
                        })),
                    }),
                    length: page_info.num_rows,
                    priority: page_info.priority,
                })
            })
            .collect::<Result<Vec<_>>>()?;
        let (buffer_offsets, buffer_sizes): (Vec<_>, Vec<_>) =
            col.buffer_offsets_and_sizes.iter().cloned().unzip();
        let encoded_col_encoding = Any::from_msg(&col.encoding)?.encode_to_vec();
        let column = pbfile::ColumnMetadata {
            pages,
            buffer_offsets,
            buffer_sizes,
            encoding: Some(pbfile::Encoding {
                location: Some(pbfile::encoding::Location::Direct(pbfile::DirectEncoding {
                    encoding: encoded_col_encoding,
                })),
            }),
        };
        let column_bytes = column.encode_to_vec();
        col_metadata_positions.push((position, column_bytes.len() as u64));
        data.put(column_bytes.as_slice());
    }
    // Write column metadata offsets table
    let cmo_table_start = data.len() as u64;
    for (meta_pos, meta_len) in col_metadata_positions {
        data.put_u64_le(meta_pos);
        data.put_u64_le(meta_len);
    }
    // Write global buffers offsets table
    let gbo_table_start = data.len() as u64;
    let num_global_buffers = global_buffers.len() as u32;
    for (gbo_pos, gbo_len) in global_buffers {
        data.put_u64_le(gbo_pos);
        data.put_u64_le(gbo_len);
    }

    let (major, minor) = version.to_numbers();

    // write the footer
    data.put_u64_le(col_metadata_start);
    data.put_u64_le(cmo_table_start);
    data.put_u64_le(gbo_table_start);
    data.put_u32_le(num_global_buffers);
    data.put_u32_le(batch.page_table.len() as u32);
    data.put_u16_le(major as u16);
    data.put_u16_le(minor as u16);
    data.put(MAGIC.as_slice());

    Ok(data.freeze())
}

impl EncodedBatchWriteExt for EncodedBatch {
    fn try_to_self_described_lance(&self, version: LanceFileVersion) -> Result<Bytes> {
        concat_lance_footer(self, true, version)
    }

    fn try_to_mini_lance(&self, version: LanceFileVersion) -> Result<Bytes> {
        concat_lance_footer(self, false, version)
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crate::v2::reader::{FileReader, FileReaderOptions};
    use crate::v2::testing::FsFixture;
    use crate::v2::writer::{FileWriter, FileWriterOptions, ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES};
    use arrow_array::{types::Float64Type, RecordBatchReader};
    use arrow_array::{RecordBatch, UInt64Array};
    use arrow_schema::{DataType, Field, Schema};
    use lance_core::cache::LanceCache;
    use lance_core::datatypes::Schema as LanceSchema;
    use lance_datagen::{array, gen, BatchCount, RowCount};
    use lance_encoding::decoder::DecoderPlugins;
    use lance_io::object_store::ObjectStore;
    use lance_io::utils::CachedFileSize;
    use object_store::path::Path;
    use tempfile::tempdir;

    #[tokio::test]
    async fn test_basic_write() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path: String = tmp_dir.path().to_str().unwrap().to_owned();
        let tmp_path = Path::parse(tmp_path).unwrap();
        let tmp_path = tmp_path.child("some_file.lance");
        let obj_store = Arc::new(ObjectStore::local());

        let reader = gen()
            .col("score", array::rand::<Float64Type>())
            .into_reader_rows(RowCount::from(1000), BatchCount::from(10));

        let writer = obj_store.create(&tmp_path).await.unwrap();

        let lance_schema =
            lance_core::datatypes::Schema::try_from(reader.schema().as_ref()).unwrap();

        let mut file_writer =
            FileWriter::try_new(writer, lance_schema, FileWriterOptions::default()).unwrap();

        for batch in reader {
            file_writer.write_batch(&batch.unwrap()).await.unwrap();
        }
        file_writer.add_schema_metadata("foo", "bar");
        file_writer.finish().await.unwrap();
        // Tests asserting the contents of the written file are in reader.rs
    }

    #[tokio::test]
    async fn test_write_empty() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path: String = tmp_dir.path().to_str().unwrap().to_owned();
        let tmp_path = Path::parse(tmp_path).unwrap();
        let tmp_path = tmp_path.child("some_file.lance");
        let obj_store = Arc::new(ObjectStore::local());

        let reader = gen()
            .col("score", array::rand::<Float64Type>())
            .into_reader_rows(RowCount::from(0), BatchCount::from(0));

        let writer = obj_store.create(&tmp_path).await.unwrap();

        let lance_schema =
            lance_core::datatypes::Schema::try_from(reader.schema().as_ref()).unwrap();

        let mut file_writer =
            FileWriter::try_new(writer, lance_schema, FileWriterOptions::default()).unwrap();

        for batch in reader {
            file_writer.write_batch(&batch.unwrap()).await.unwrap();
        }
        file_writer.add_schema_metadata("foo", "bar");
        file_writer.finish().await.unwrap();
    }

    #[tokio::test]
    async fn test_max_page_bytes_enforced() {
        let arrow_field = Field::new("data", DataType::UInt64, false);
        let arrow_schema = Schema::new(vec![arrow_field]);
        let lance_schema = LanceSchema::try_from(&arrow_schema).unwrap();

        // 8MiB of data
        let data: Vec<u64> = (0..1_000_000).collect();
        let array = UInt64Array::from(data);
        let batch =
            RecordBatch::try_new(arrow_schema.clone().into(), vec![Arc::new(array)]).unwrap();

        let options = FileWriterOptions {
            max_page_bytes: Some(1024 * 1024), // 1MiB
            ..Default::default()
        };

        let tmp_dir = tempdir().unwrap();
        let path = tmp_dir.path().join("test.lance");
        let object_store = ObjectStore::local();
        let mut writer = FileWriter::try_new(
            object_store
                .create(&Path::from(path.to_str().unwrap()))
                .await
                .unwrap(),
            lance_schema,
            options,
        )
        .unwrap();

        writer.write_batch(&batch).await.unwrap();
        writer.finish().await.unwrap();

        let fs = FsFixture::default();
        let file_scheduler = fs
            .scheduler
            .open_file(
                &Path::from(path.to_str().unwrap()),
                &CachedFileSize::unknown(),
            )
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler,
            None,
            Arc::<DecoderPlugins>::default(),
            &LanceCache::no_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let column_meta = file_reader.metadata();

        let mut total_page_num: u32 = 0;
        for (col_idx, col_metadata) in column_meta.column_metadatas.iter().enumerate() {
            assert!(
                !col_metadata.pages.is_empty(),
                "Column {} has no pages",
                col_idx
            );

            for (page_idx, page) in col_metadata.pages.iter().enumerate() {
                total_page_num += 1;
                let total_size: u64 = page.buffer_sizes.iter().sum();
                assert!(
                    total_size <= 1024 * 1024,
                    "Column {} Page {} size {} exceeds the 1MiB limit",
                    col_idx,
                    page_idx,
                    total_size
                );
            }
        }

        // 8MiB of data with a 1MiB page limit should produce 8 pages
        assert_eq!(total_page_num, 8);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn test_max_page_bytes_env_var() {
        let arrow_field = Field::new("data", DataType::UInt64, false);
        let arrow_schema = Schema::new(vec![arrow_field]);
        let lance_schema = LanceSchema::try_from(&arrow_schema).unwrap();
        // 4MiB of data
        let data: Vec<u64> = (0..500_000).collect();
        let array = UInt64Array::from(data);
        let batch =
            RecordBatch::try_new(arrow_schema.clone().into(), vec![Arc::new(array)]).unwrap();

        // 2MiB
        std::env::set_var(ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES, "2097152");

        let options = FileWriterOptions {
            max_page_bytes: None, // fall back to the env var
            ..Default::default()
        };

        let tmp_dir = tempdir().unwrap();
        let path = tmp_dir.path().join("test_env_var.lance");
        let object_store = ObjectStore::local();
        let mut writer = FileWriter::try_new(
            object_store
                .create(&Path::from(path.to_str().unwrap()))
                .await
                .unwrap(),
            lance_schema.clone(),
            options,
        )
        .unwrap();

        writer.write_batch(&batch).await.unwrap();
        writer.finish().await.unwrap();

        let fs = FsFixture::default();
        let file_scheduler = fs
            .scheduler
            .open_file(
                &Path::from(path.to_str().unwrap()),
                &CachedFileSize::unknown(),
            )
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler,
            None,
            Arc::<DecoderPlugins>::default(),
            &LanceCache::no_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        for col_metadata in file_reader.metadata().column_metadatas.iter() {
            for page in col_metadata.pages.iter() {
                let total_size: u64 = page.buffer_sizes.iter().sum();
                assert!(
                    total_size <= 2 * 1024 * 1024,
                    "Page size {} exceeds the 2MiB limit",
                    total_size
                );
            }
        }

        std::env::remove_var(ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES);
    }
}
977}