lance_file/v2/
writer.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use core::panic;
use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;

use arrow_array::RecordBatch;

use arrow_data::ArrayData;
use bytes::{BufMut, Bytes, BytesMut};
use futures::stream::FuturesOrdered;
use futures::StreamExt;
use lance_core::datatypes::{Field, Schema as LanceSchema};
use lance_core::utils::bit::pad_bytes;
use lance_core::{Error, Result};
use lance_encoding::decoder::PageEncoding;
use lance_encoding::encoder::{
    default_encoding_strategy, BatchEncoder, EncodeTask, EncodedBatch, EncodedPage,
    EncodingOptions, FieldEncoder, FieldEncodingStrategy, OutOfLineBuffers,
};
use lance_encoding::repdef::RepDefBuilder;
use lance_encoding::version::LanceFileVersion;
use lance_io::object_store::ObjectStore;
use lance_io::object_writer::ObjectWriter;
use lance_io::traits::Writer;
use log::{debug, warn};
use object_store::path::Path;
use prost::Message;
use prost_types::Any;
use snafu::location;
use tokio::io::AsyncWriteExt;
use tracing::instrument;

use crate::datatypes::FieldsWithMeta;
use crate::format::pb;
use crate::format::pbfile;
use crate::format::pbfile::DirectEncoding;
use crate::format::MAGIC;

/// Page buffers are aligned to 64 bytes
pub(crate) const PAGE_BUFFER_ALIGNMENT: usize = 64;
const PAD_BUFFER: [u8; PAGE_BUFFER_ALIGNMENT] = [72; PAGE_BUFFER_ALIGNMENT];
const MAX_PAGE_BYTES: usize = 32 * 1024 * 1024;
const ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES: &str = "LANCE_FILE_WRITER_MAX_PAGE_BYTES";

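/// Options that control how the [`FileWriter`] buffers, encodes, and versions data.
///
/// A minimal sketch of constructing options (the specific values below are
/// illustrative, not recommendations):
///
/// ```ignore
/// use lance_encoding::version::LanceFileVersion;
///
/// let options = FileWriterOptions {
///     // Buffer roughly 64 MiB of column data before flushing pages
///     data_cache_bytes: Some(64 * 1024 * 1024),
///     // Pin the format version so older readers can open the file
///     format_version: Some(LanceFileVersion::V2_0),
///     ..Default::default()
/// };
/// ```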
#[derive(Debug, Clone, Default)]
pub struct FileWriterOptions {
    /// How many bytes to use for buffering column data
    ///
    /// When data comes in small batches the writer will buffer column data so that
    /// larger pages can be created.  This value will be divided evenly across all of the
    /// columns.  Generally you want this to be at least large enough to match your
    /// filesystem's ideal read size per column.
    ///
    /// In some cases you might want this value to be even larger if you have highly
    /// compressible data.  However, if this is too large, then the writer could require
    /// a lot of memory and write performance may suffer if the CPU-expensive encoding
    /// falls behind and can't be interleaved with the I/O-expensive flushing.
    ///
    /// The default will use 8MiB per column which should be reasonable for most cases.
    // TODO: Do we need to be able to set this on a per-column basis?
    pub data_cache_bytes: Option<u64>,
    /// A hint to indicate the max size of a page
    ///
    /// This hint can't always be respected.  A single value could be larger than this value
    /// and we never slice single values.  In addition, there are some cases where it can be
    /// difficult to know the size up-front and so we might not be able to respect this value.
    pub max_page_bytes: Option<u64>,
    /// The file writer buffers columns until enough data has arrived to flush a page
    /// to disk.
    ///
    /// Some columns with small data types may not flush very often.  These arrays can
    /// stick around for a long time.  These arrays might also be keeping larger data
    /// structures alive.  By default, the writer will make a deep copy of this array
    /// to avoid any potential memory leaks.  However, this can be disabled for a
    /// (probably minor) performance boost if you are sure that arrays are not keeping
    /// any sibling structures alive (this typically means the array was allocated in
    /// the same language / runtime as the writer)
    ///
    /// Do not enable this if your data is arriving from the C data interface.
    /// Data typically arrives one "batch" at a time (encoded in the C data interface
    /// as a struct array).  Each array in that batch keeps the entire batch alive.
    /// This means a small boolean array (which we will buffer in memory for quite a
    /// while) might keep a much larger record batch around in memory (even though most
    /// of that batch's data has been written to disk)
    pub keep_original_array: Option<bool>,
    pub encoding_strategy: Option<Arc<dyn FieldEncodingStrategy>>,
    /// The format version to use when writing the file
    ///
    /// This controls which encodings will be used when encoding the data.  Newer
    /// versions may have more efficient encodings.  However, newer format versions will
    /// require more up-to-date readers to read the data.
    pub format_version: Option<LanceFileVersion>,
}

pub struct FileWriter {
    writer: ObjectWriter,
    schema: Option<LanceSchema>,
    column_writers: Vec<Box<dyn FieldEncoder>>,
    column_metadata: Vec<pbfile::ColumnMetadata>,
    field_id_to_column_indices: Vec<(u32, u32)>,
    num_columns: u32,
    rows_written: u64,
    global_buffers: Vec<(u64, u64)>,
    schema_metadata: HashMap<String, String>,
    options: FileWriterOptions,
}

fn initial_column_metadata() -> pbfile::ColumnMetadata {
    pbfile::ColumnMetadata {
        pages: Vec::new(),
        buffer_offsets: Vec::new(),
        buffer_sizes: Vec::new(),
        encoding: None,
    }
}

static WARNED_ON_UNSTABLE_API: AtomicBool = AtomicBool::new(false);

impl FileWriter {
    /// Create a new FileWriter with a desired output schema
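    ///
    /// A minimal sketch of the typical open/write/finish cycle (the `object_store`,
    /// `path`, `lance_schema`, and `record_batch` values are assumed to exist):
    ///
    /// ```ignore
    /// let object_writer = object_store.create(&path).await?;
    /// let mut file_writer =
    ///     FileWriter::try_new(object_writer, lance_schema, FileWriterOptions::default())?;
    /// file_writer.write_batch(&record_batch).await?;
    /// let rows_written = file_writer.finish().await?;
    /// ```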
    pub fn try_new(
        object_writer: ObjectWriter,
        schema: LanceSchema,
        options: FileWriterOptions,
    ) -> Result<Self> {
        let mut writer = Self::new_lazy(object_writer, options);
        writer.initialize(schema)?;
        Ok(writer)
    }

    /// Create a new FileWriter without a desired output schema
    ///
    /// The output schema will be set based on the first batch of data to arrive.
    /// If no data arrives before the writer is finished, the write will fail.
    pub fn new_lazy(object_writer: ObjectWriter, options: FileWriterOptions) -> Self {
        if let Some(format_version) = options.format_version {
            if format_version > LanceFileVersion::Stable
                && WARNED_ON_UNSTABLE_API
                    .compare_exchange(
                        false,
                        true,
                        std::sync::atomic::Ordering::Relaxed,
                        std::sync::atomic::Ordering::Relaxed,
                    )
                    .is_ok()
            {
                warn!("You have requested an unstable format version.  Files written with this format version may not be readable in the future!  This is a development feature and should only be used for experimentation and never for production data.");
            }
        }
        Self {
            writer: object_writer,
            schema: None,
            column_writers: Vec::new(),
            column_metadata: Vec::new(),
            num_columns: 0,
            rows_written: 0,
            field_id_to_column_indices: Vec::new(),
            global_buffers: Vec::new(),
            schema_metadata: HashMap::new(),
            options,
        }
    }

    /// Write a series of record batches to a new file
    ///
    /// Returns the number of rows written
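    ///
    /// A minimal sketch (assuming `store`, `path`, `schema`, and a `Vec<RecordBatch>`
    /// named `batches` are already in hand):
    ///
    /// ```ignore
    /// let num_rows = FileWriter::create_file_with_batches(
    ///     &store,
    ///     &path,
    ///     schema,
    ///     batches.into_iter(),
    ///     FileWriterOptions::default(),
    /// )
    /// .await?;
    /// ```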
    pub async fn create_file_with_batches(
        store: &ObjectStore,
        path: &Path,
        schema: lance_core::datatypes::Schema,
        batches: impl Iterator<Item = RecordBatch> + Send,
        options: FileWriterOptions,
    ) -> Result<usize> {
        let writer = store.create(path).await?;
        let mut writer = Self::try_new(writer, schema, options)?;
        for batch in batches {
            writer.write_batch(&batch).await?;
        }
        Ok(writer.finish().await? as usize)
    }

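    /// Writes `buf` followed by enough padding bytes to keep the stream aligned to
    /// `PAGE_BUFFER_ALIGNMENT`.  For example, a 100-byte buffer is followed by 28
    /// pad bytes so that the next buffer begins on a 64-byte boundary (offset 128).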
    async fn do_write_buffer(writer: &mut ObjectWriter, buf: &[u8]) -> Result<()> {
        writer.write_all(buf).await?;
        let pad_bytes = pad_bytes::<PAGE_BUFFER_ALIGNMENT>(buf.len());
        writer.write_all(&PAD_BUFFER[..pad_bytes]).await?;
        Ok(())
    }

    /// Returns the format version that will be used when writing the file
    pub fn version(&self) -> LanceFileVersion {
        self.options.format_version.unwrap_or_default()
    }

    async fn write_page(&mut self, encoded_page: EncodedPage) -> Result<()> {
        let buffers = encoded_page.data;
        let mut buffer_offsets = Vec::with_capacity(buffers.len());
        let mut buffer_sizes = Vec::with_capacity(buffers.len());
        for buffer in buffers {
            buffer_offsets.push(self.writer.tell().await? as u64);
            buffer_sizes.push(buffer.len() as u64);
            Self::do_write_buffer(&mut self.writer, &buffer).await?;
        }
        let encoded_encoding = match encoded_page.description {
            PageEncoding::Legacy(array_encoding) => Any::from_msg(&array_encoding)?.encode_to_vec(),
            PageEncoding::Structural(page_layout) => Any::from_msg(&page_layout)?.encode_to_vec(),
        };
        let page = pbfile::column_metadata::Page {
            buffer_offsets,
            buffer_sizes,
            encoding: Some(pbfile::Encoding {
                location: Some(pbfile::encoding::Location::Direct(DirectEncoding {
                    encoding: encoded_encoding,
                })),
            }),
            length: encoded_page.num_rows,
            priority: encoded_page.row_number,
        };
        self.column_metadata[encoded_page.column_idx as usize]
            .pages
            .push(page);
        Ok(())
    }

    #[instrument(skip_all, level = "debug")]
    async fn write_pages(&mut self, mut encoding_tasks: FuturesOrdered<EncodeTask>) -> Result<()> {
        // As soon as an encoding task is done we write it.  There is no parallelism
        // needed here because "writing" is really just submitting the buffer to the
        // underlying write scheduler (either the OS or object_store's scheduler for
        // cloud writes).  The only time we might truly await on write_page is if the
        // scheduler's write queue is full.
        //
        // Also, there is no point in trying to make write_page parallel anyways
        // because we wouldn't want buffers getting mixed up across pages.
        while let Some(encoding_task) = encoding_tasks.next().await {
            let encoded_page = encoding_task?;
            self.write_page(encoded_page).await?;
        }
        // It's important to flush here; we don't know when the next batch will arrive
        // and the underlying cloud store could have writes in progress that won't advance
        // until we interact with the writer again.  These in-progress writes will time out
        // if we don't flush.
        self.writer.flush().await?;
        Ok(())
    }

    /// Schedule batches of data to be written to the file
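    ///
    /// A minimal sketch (assuming `batches` is a `Vec<RecordBatch>`):
    ///
    /// ```ignore
    /// file_writer.write_batches(batches.iter()).await?;
    /// ```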
    pub async fn write_batches(
        &mut self,
        batches: impl Iterator<Item = &RecordBatch>,
    ) -> Result<()> {
        for batch in batches {
            self.write_batch(batch).await?;
        }
        Ok(())
    }

    fn verify_field_nullability(arr: &ArrayData, field: &Field) -> Result<()> {
        if !field.nullable && arr.null_count() > 0 {
            return Err(Error::invalid_input(format!("The field `{}` contained null values even though the field is marked non-null in the schema", field.name), location!()));
        }

        for (child_field, child_arr) in field.children.iter().zip(arr.child_data()) {
            Self::verify_field_nullability(child_arr, child_field)?;
        }

        Ok(())
    }

    fn verify_nullability_constraints(&self, batch: &RecordBatch) -> Result<()> {
        for (col, field) in batch
            .columns()
            .iter()
            .zip(self.schema.as_ref().unwrap().fields.iter())
        {
            Self::verify_field_nullability(&col.to_data(), field)?;
        }
        Ok(())
    }

    fn initialize(&mut self, mut schema: LanceSchema) -> Result<()> {
        let cache_bytes_per_column = if let Some(data_cache_bytes) = self.options.data_cache_bytes {
            data_cache_bytes / schema.fields.len() as u64
        } else {
            8 * 1024 * 1024
        };

        let max_page_bytes = self.options.max_page_bytes.unwrap_or_else(|| {
            std::env::var(ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES)
                .map(|s| {
                    s.parse::<u64>().unwrap_or_else(|e| {
                        warn!(
                            "Failed to parse {}: {}, using default",
                            ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES, e
                        );
                        MAX_PAGE_BYTES as u64
                    })
                })
                .unwrap_or(MAX_PAGE_BYTES as u64)
        });

        schema.validate()?;

        let keep_original_array = self.options.keep_original_array.unwrap_or(false);
        let encoding_strategy = self.options.encoding_strategy.clone().unwrap_or_else(|| {
            let version = self.version();
            default_encoding_strategy(version).into()
        });

        let encoding_options = EncodingOptions {
            cache_bytes_per_column,
            max_page_bytes,
            keep_original_array,
            buffer_alignment: PAGE_BUFFER_ALIGNMENT as u64,
        };
        let encoder =
            BatchEncoder::try_new(&schema, encoding_strategy.as_ref(), &encoding_options)?;
        self.num_columns = encoder.num_columns();

        self.column_writers = encoder.field_encoders;
        self.column_metadata = vec![initial_column_metadata(); self.num_columns as usize];
        self.field_id_to_column_indices = encoder.field_id_to_column_index;
        self.schema_metadata
            .extend(std::mem::take(&mut schema.metadata));
        self.schema = Some(schema);
        Ok(())
    }

    fn ensure_initialized(&mut self, batch: &RecordBatch) -> Result<&LanceSchema> {
        if self.schema.is_none() {
            let schema = LanceSchema::try_from(batch.schema().as_ref())?;
            self.initialize(schema)?;
        }
        Ok(self.schema.as_ref().unwrap())
    }

    #[instrument(skip_all, level = "debug")]
    fn encode_batch(
        &mut self,
        batch: &RecordBatch,
        external_buffers: &mut OutOfLineBuffers,
    ) -> Result<Vec<Vec<EncodeTask>>> {
        self.schema
            .as_ref()
            .unwrap()
            .fields
            .iter()
            .zip(self.column_writers.iter_mut())
            .map(|(field, column_writer)| {
                let array = batch
                    .column_by_name(&field.name)
                    .ok_or(Error::InvalidInput {
                        source: format!(
                            "Cannot write batch.  The batch was missing the column `{}`",
                            field.name
                        )
                        .into(),
                        location: location!(),
                    })?;
                let repdef = RepDefBuilder::default();
                let num_rows = array.len() as u64;
                column_writer.maybe_encode(
                    array.clone(),
                    external_buffers,
                    repdef,
                    self.rows_written,
                    num_rows,
                )
            })
            .collect::<Result<Vec<_>>>()
    }

    /// Schedule a batch of data to be written to the file
    ///
    /// Note: the future returned by this method may complete before the data has been fully
    /// flushed to the file (some data may be in the data cache or the I/O cache)
    pub async fn write_batch(&mut self, batch: &RecordBatch) -> Result<()> {
        debug!(
            "write_batch called with {} bytes of data",
            batch.get_array_memory_size()
        );
        self.ensure_initialized(batch)?;
        self.verify_nullability_constraints(batch)?;
        let num_rows = batch.num_rows() as u64;
        if num_rows == 0 {
            return Ok(());
        }
        if num_rows > u32::MAX as u64 {
            return Err(Error::InvalidInput {
                source: "cannot write Lance files with more than 2^32 rows".into(),
                location: location!(),
            });
        }
        // First we push each array into its column writer.  This may or may not generate enough
        // data to trigger an encoding task.  We collect any encoding tasks into a queue.
        let mut external_buffers =
            OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
        let encoding_tasks = self.encode_batch(batch, &mut external_buffers)?;
        // Next, write external buffers
        for external_buffer in external_buffers.take_buffers() {
            Self::do_write_buffer(&mut self.writer, &external_buffer).await?;
        }

        let encoding_tasks = encoding_tasks
            .into_iter()
            .flatten()
            .collect::<FuturesOrdered<_>>();

        self.rows_written = match self.rows_written.checked_add(batch.num_rows() as u64) {
            Some(rows_written) => rows_written,
            None => {
                return Err(Error::InvalidInput { source: format!("cannot write batch with {} rows because {} rows have already been written and Lance files cannot contain more than 2^64 rows", num_rows, self.rows_written).into(), location: location!() });
            }
        };

        self.write_pages(encoding_tasks).await?;

        Ok(())
    }

    async fn write_column_metadata(
        &mut self,
        metadata: pbfile::ColumnMetadata,
    ) -> Result<(u64, u64)> {
        let metadata_bytes = metadata.encode_to_vec();
        let position = self.writer.tell().await? as u64;
        let len = metadata_bytes.len() as u64;
        self.writer.write_all(&metadata_bytes).await?;
        Ok((position, len))
    }

    async fn write_column_metadatas(&mut self) -> Result<Vec<(u64, u64)>> {
        let mut metadatas = Vec::new();
        std::mem::swap(&mut self.column_metadata, &mut metadatas);
        let mut metadata_positions = Vec::with_capacity(metadatas.len());
        for metadata in metadatas {
            metadata_positions.push(self.write_column_metadata(metadata).await?);
        }
        Ok(metadata_positions)
    }

    fn make_file_descriptor(
        schema: &lance_core::datatypes::Schema,
        num_rows: u64,
    ) -> Result<pb::FileDescriptor> {
        let fields_with_meta = FieldsWithMeta::from(schema);
        Ok(pb::FileDescriptor {
            schema: Some(pb::Schema {
                fields: fields_with_meta.fields.0,
                metadata: fields_with_meta.metadata,
            }),
            length: num_rows,
        })
    }

    async fn write_global_buffers(&mut self) -> Result<Vec<(u64, u64)>> {
        let schema = self.schema.as_mut().ok_or(Error::invalid_input("No schema provided on writer open and no data provided.  Schema is unknown and file cannot be created", location!()))?;
        schema.metadata = std::mem::take(&mut self.schema_metadata);
        let file_descriptor = Self::make_file_descriptor(schema, self.rows_written)?;
        let file_descriptor_bytes = file_descriptor.encode_to_vec();
        let file_descriptor_len = file_descriptor_bytes.len() as u64;
        let file_descriptor_position = self.writer.tell().await? as u64;
        self.writer.write_all(&file_descriptor_bytes).await?;
        let mut gbo_table = Vec::with_capacity(1 + self.global_buffers.len());
        gbo_table.push((file_descriptor_position, file_descriptor_len));
        gbo_table.append(&mut self.global_buffers);
        Ok(gbo_table)
    }

    /// Add a metadata entry to the schema
    ///
    /// This method is useful because sometimes the metadata is not known until after the
    /// data has been written.  This method allows you to alter the schema metadata.  It
    /// must be called before `finish` is called.
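    ///
    /// A minimal sketch (the key and value are illustrative):
    ///
    /// ```ignore
    /// file_writer.add_schema_metadata("created_by", "my-ingest-job");
    /// let rows_written = file_writer.finish().await?;
    /// ```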
    pub fn add_schema_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
        self.schema_metadata.insert(key.into(), value.into());
    }

    /// Adds a global buffer to the file
    ///
    /// The global buffer can contain arbitrary bytes.  It will be written to the disk
    /// immediately.  This method returns the index of the global buffer (this will always
    /// start at 1 and increment by 1 each time this method is called)
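    ///
    /// A minimal sketch (the buffer contents and the metadata key are illustrative):
    ///
    /// ```ignore
    /// let index = file_writer.add_global_buffer(Bytes::from(my_bytes)).await?;
    /// // Record where the buffer lives so a reader can find it later
    /// file_writer.add_schema_metadata("my_buffer_index", index.to_string());
    /// ```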
    pub async fn add_global_buffer(&mut self, buffer: Bytes) -> Result<u32> {
        let position = self.writer.tell().await? as u64;
        let len = buffer.len() as u64;
        Self::do_write_buffer(&mut self.writer, &buffer).await?;
        self.global_buffers.push((position, len));
        Ok(self.global_buffers.len() as u32)
    }

    async fn finish_writers(&mut self) -> Result<()> {
        let mut col_idx = 0;
        for mut writer in std::mem::take(&mut self.column_writers) {
            let mut external_buffers =
                OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
            let columns = writer.finish(&mut external_buffers).await?;
            for buffer in external_buffers.take_buffers() {
                self.writer.write_all(&buffer).await?;
            }
            debug_assert_eq!(
                columns.len(),
                writer.num_columns() as usize,
                "Expected {} columns from column at index {} and got {}",
                writer.num_columns(),
                col_idx,
                columns.len()
            );
            for column in columns {
                for page in column.final_pages {
                    self.write_page(page).await?;
                }
                let column_metadata = &mut self.column_metadata[col_idx];
                let mut buffer_pos = self.writer.tell().await? as u64;
                for buffer in column.column_buffers {
                    column_metadata.buffer_offsets.push(buffer_pos);
                    let mut size = 0;
                    Self::do_write_buffer(&mut self.writer, &buffer).await?;
                    size += buffer.len() as u64;
                    buffer_pos += size;
                    column_metadata.buffer_sizes.push(size);
                }
                let encoded_encoding = Any::from_msg(&column.encoding)?.encode_to_vec();
                column_metadata.encoding = Some(pbfile::Encoding {
                    location: Some(pbfile::encoding::Location::Direct(pbfile::DirectEncoding {
                        encoding: encoded_encoding,
                    })),
                });
                col_idx += 1;
            }
        }
        if col_idx != self.column_metadata.len() {
            panic!(
                "Column writers finished with {} columns but we expected {}",
                col_idx,
                self.column_metadata.len()
            );
        }
        Ok(())
    }

    /// Converts self.version (which is a mix of "software version" and
    /// "format version") into a (major, minor) format version pair
    fn version_to_numbers(&self) -> (u16, u16) {
        let version = self.options.format_version.unwrap_or_default();
        match version.resolve() {
            LanceFileVersion::V2_0 => (0, 3),
            LanceFileVersion::V2_1 => (2, 1),
            _ => panic!("Unsupported version: {}", version),
        }
    }

    /// Finishes writing the file
    ///
    /// This method will wait until all data has been flushed to the file.  Then it
    /// will write the file metadata and the footer.  It will not return until all
    /// data has been flushed and the file has been closed.
    ///
    /// Returns the total number of rows written
    pub async fn finish(&mut self) -> Result<u64> {
        // 1. flush any remaining data and write out those pages
        let mut external_buffers =
            OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
        let encoding_tasks = self
            .column_writers
            .iter_mut()
            .map(|writer| writer.flush(&mut external_buffers))
            .collect::<Result<Vec<_>>>()?;
        for external_buffer in external_buffers.take_buffers() {
            Self::do_write_buffer(&mut self.writer, &external_buffer).await?;
        }
        let encoding_tasks = encoding_tasks
            .into_iter()
            .flatten()
            .collect::<FuturesOrdered<_>>();
        self.write_pages(encoding_tasks).await?;

        // 2. finish the column writers; this writes out any final pages and
        //    column buffers
        self.finish_writers().await?;

        // 3. write global buffers (we write the schema here)
        let global_buffer_offsets = self.write_global_buffers().await?;
        let num_global_buffers = global_buffer_offsets.len() as u32;

        // 4. write the column metadatas
        let column_metadata_start = self.writer.tell().await? as u64;
        let metadata_positions = self.write_column_metadatas().await?;

        // 5. write the column metadata offset table
        let cmo_table_start = self.writer.tell().await? as u64;
        for (meta_pos, meta_len) in metadata_positions {
            self.writer.write_u64_le(meta_pos).await?;
            self.writer.write_u64_le(meta_len).await?;
        }

        // 6. write global buffers offset table
        let gbo_table_start = self.writer.tell().await? as u64;
        for (gbo_pos, gbo_len) in global_buffer_offsets {
            self.writer.write_u64_le(gbo_pos).await?;
            self.writer.write_u64_le(gbo_len).await?;
        }

        let (major, minor) = self.version_to_numbers();
        // 7. write the footer
        self.writer.write_u64_le(column_metadata_start).await?;
        self.writer.write_u64_le(cmo_table_start).await?;
        self.writer.write_u64_le(gbo_table_start).await?;
        self.writer.write_u32_le(num_global_buffers).await?;
        self.writer.write_u32_le(self.num_columns).await?;
        self.writer.write_u16_le(major).await?;
        self.writer.write_u16_le(minor).await?;
        self.writer.write_all(MAGIC).await?;

        // 8. close the writer
        self.writer.shutdown().await?;
        Ok(self.rows_written)
    }

    pub async fn tell(&mut self) -> Result<u64> {
        Ok(self.writer.tell().await? as u64)
    }

    pub fn field_id_to_column_indices(&self) -> &[(u32, u32)] {
        &self.field_id_to_column_indices
    }
}

/// Utility trait for converting EncodedBatch to Bytes using the
/// lance file format
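///
/// A minimal sketch (assuming `encoded` is an [`EncodedBatch`] that has already
/// been produced by the encoders in `lance_encoding`):
///
/// ```ignore
/// let bytes = encoded.try_to_self_described_lance()?;
/// // or, when the reader will already know the schema:
/// let bytes = encoded.try_to_mini_lance()?;
/// ```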
pub trait EncodedBatchWriteExt {
    /// Serializes into a lance file, including the schema
    fn try_to_self_described_lance(&self) -> Result<Bytes>;
    /// Serializes into a lance file, without the schema.
    ///
    /// The schema must be provided to deserialize the buffer
    fn try_to_mini_lance(&self) -> Result<Bytes>;
}

// Creates a lance footer and appends it to the encoded data
//
// The logic here is very similar to the logic in the FileWriter except we
// are using BufMut (put_xyz) instead of AsyncWrite (write_xyz).
fn concat_lance_footer(batch: &EncodedBatch, write_schema: bool) -> Result<Bytes> {
    // Estimating 1MiB for file footer
    let mut data = BytesMut::with_capacity(batch.data.len() + 1024 * 1024);
    data.put(batch.data.clone());
    // write global buffers (we write the schema here)
    let global_buffers = if write_schema {
        let schema_start = data.len() as u64;
        let lance_schema = lance_core::datatypes::Schema::try_from(batch.schema.as_ref())?;
        let descriptor = FileWriter::make_file_descriptor(&lance_schema, batch.num_rows)?;
        let descriptor_bytes = descriptor.encode_to_vec();
        let descriptor_len = descriptor_bytes.len() as u64;
        data.put(descriptor_bytes.as_slice());

        vec![(schema_start, descriptor_len)]
    } else {
        vec![]
    };
    let col_metadata_start = data.len() as u64;

    let mut col_metadata_positions = Vec::new();
    // Write column metadata
    for col in &batch.page_table {
        let position = data.len() as u64;
        let pages = col
            .page_infos
            .iter()
            .map(|page_info| {
                let encoded_encoding = match &page_info.encoding {
                    PageEncoding::Legacy(array_encoding) => {
                        Any::from_msg(array_encoding)?.encode_to_vec()
                    }
                    PageEncoding::Structural(page_layout) => {
                        Any::from_msg(page_layout)?.encode_to_vec()
                    }
                };
                let (buffer_offsets, buffer_sizes): (Vec<_>, Vec<_>) = page_info
                    .buffer_offsets_and_sizes
                    .as_ref()
                    .iter()
                    .cloned()
                    .unzip();
                Ok(pbfile::column_metadata::Page {
                    buffer_offsets,
                    buffer_sizes,
                    encoding: Some(pbfile::Encoding {
                        location: Some(pbfile::encoding::Location::Direct(DirectEncoding {
                            encoding: encoded_encoding,
                        })),
                    }),
                    length: page_info.num_rows,
                    priority: page_info.priority,
                })
            })
            .collect::<Result<Vec<_>>>()?;
        let (buffer_offsets, buffer_sizes): (Vec<_>, Vec<_>) =
            col.buffer_offsets_and_sizes.iter().cloned().unzip();
        let encoded_col_encoding = Any::from_msg(&col.encoding)?.encode_to_vec();
        let column = pbfile::ColumnMetadata {
            pages,
            buffer_offsets,
            buffer_sizes,
            encoding: Some(pbfile::Encoding {
                location: Some(pbfile::encoding::Location::Direct(pbfile::DirectEncoding {
                    encoding: encoded_col_encoding,
                })),
            }),
        };
        let column_bytes = column.encode_to_vec();
        col_metadata_positions.push((position, column_bytes.len() as u64));
        data.put(column_bytes.as_slice());
    }
    // Write column metadata offsets table
    let cmo_table_start = data.len() as u64;
    for (meta_pos, meta_len) in col_metadata_positions {
        data.put_u64_le(meta_pos);
        data.put_u64_le(meta_len);
    }
    // Write global buffers offsets table
    let gbo_table_start = data.len() as u64;
    let num_global_buffers = global_buffers.len() as u32;
    for (gbo_pos, gbo_len) in global_buffers {
        data.put_u64_le(gbo_pos);
        data.put_u64_le(gbo_len);
    }

    let (major, minor) = LanceFileVersion::default().to_numbers();

    // write the footer
    data.put_u64_le(col_metadata_start);
    data.put_u64_le(cmo_table_start);
    data.put_u64_le(gbo_table_start);
    data.put_u32_le(num_global_buffers);
    data.put_u32_le(batch.page_table.len() as u32);
    data.put_u16_le(major as u16);
    data.put_u16_le(minor as u16);
    data.put(MAGIC.as_slice());

    Ok(data.freeze())
}

impl EncodedBatchWriteExt for EncodedBatch {
    fn try_to_self_described_lance(&self) -> Result<Bytes> {
        concat_lance_footer(self, true)
    }

    fn try_to_mini_lance(&self) -> Result<Bytes> {
        concat_lance_footer(self, false)
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crate::v2::reader::{FileReader, FileReaderOptions};
    use crate::v2::testing::FsFixture;
    use crate::v2::writer::{FileWriter, FileWriterOptions, ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES};
    use arrow_array::{types::Float64Type, RecordBatchReader};
    use arrow_array::{RecordBatch, UInt64Array};
    use arrow_schema::{DataType, Field, Schema};
    use lance_core::cache::LanceCache;
    use lance_core::datatypes::Schema as LanceSchema;
    use lance_datagen::{array, gen, BatchCount, RowCount};
    use lance_encoding::decoder::DecoderPlugins;
    use lance_io::object_store::ObjectStore;
    use lance_io::utils::CachedFileSize;
    use object_store::path::Path;
    use tempfile::tempdir;

    #[tokio::test]
    async fn test_basic_write() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path: String = tmp_dir.path().to_str().unwrap().to_owned();
        let tmp_path = Path::parse(tmp_path).unwrap();
        let tmp_path = tmp_path.child("some_file.lance");
        let obj_store = Arc::new(ObjectStore::local());

        let reader = gen()
            .col("score", array::rand::<Float64Type>())
            .into_reader_rows(RowCount::from(1000), BatchCount::from(10));

        let writer = obj_store.create(&tmp_path).await.unwrap();

        let lance_schema =
            lance_core::datatypes::Schema::try_from(reader.schema().as_ref()).unwrap();

        let mut file_writer =
            FileWriter::try_new(writer, lance_schema, FileWriterOptions::default()).unwrap();

        for batch in reader {
            file_writer.write_batch(&batch.unwrap()).await.unwrap();
        }
        file_writer.add_schema_metadata("foo", "bar");
        file_writer.finish().await.unwrap();
        // Tests asserting the contents of the written file are in reader.rs
    }

    #[tokio::test]
    async fn test_write_empty() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path: String = tmp_dir.path().to_str().unwrap().to_owned();
        let tmp_path = Path::parse(tmp_path).unwrap();
        let tmp_path = tmp_path.child("some_file.lance");
        let obj_store = Arc::new(ObjectStore::local());

        let reader = gen()
            .col("score", array::rand::<Float64Type>())
            .into_reader_rows(RowCount::from(0), BatchCount::from(0));

        let writer = obj_store.create(&tmp_path).await.unwrap();

        let lance_schema =
            lance_core::datatypes::Schema::try_from(reader.schema().as_ref()).unwrap();

        let mut file_writer =
            FileWriter::try_new(writer, lance_schema, FileWriterOptions::default()).unwrap();

        for batch in reader {
            file_writer.write_batch(&batch.unwrap()).await.unwrap();
        }
        file_writer.add_schema_metadata("foo", "bar");
        file_writer.finish().await.unwrap();
    }

    #[tokio::test]
    async fn test_max_page_bytes_enforced() {
        let arrow_field = Field::new("data", DataType::UInt64, false);
        let arrow_schema = Schema::new(vec![arrow_field]);
        let lance_schema = LanceSchema::try_from(&arrow_schema).unwrap();

        // 8MiB
        let data: Vec<u64> = (0..1_000_000).collect();
        let array = UInt64Array::from(data);
        let batch =
            RecordBatch::try_new(arrow_schema.clone().into(), vec![Arc::new(array)]).unwrap();

        let options = FileWriterOptions {
            max_page_bytes: Some(1024 * 1024), // 1MB
            ..Default::default()
        };

        let tmp_dir = tempdir().unwrap();
        let path = tmp_dir.path().join("test.lance");
        let object_store = ObjectStore::local();
        let mut writer = FileWriter::try_new(
            object_store
                .create(&Path::from(path.to_str().unwrap()))
                .await
                .unwrap(),
            lance_schema,
            options,
        )
        .unwrap();

        writer.write_batch(&batch).await.unwrap();
        writer.finish().await.unwrap();

        let fs = FsFixture::default();
        let file_scheduler = fs
            .scheduler
            .open_file(
                &Path::from(path.to_str().unwrap()),
                &CachedFileSize::unknown(),
            )
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler,
            None,
            Arc::<DecoderPlugins>::default(),
            &LanceCache::no_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let column_meta = file_reader.metadata();

        let mut total_page_num: u32 = 0;
        for (col_idx, col_metadata) in column_meta.column_metadatas.iter().enumerate() {
            assert!(
                !col_metadata.pages.is_empty(),
                "Column {} has no pages",
                col_idx
            );

            for (page_idx, page) in col_metadata.pages.iter().enumerate() {
                total_page_num += 1;
                let total_size: u64 = page.buffer_sizes.iter().sum();
                assert!(
                    total_size <= 1024 * 1024,
                    "Column {} Page {} size {} exceeds 1MB limit",
                    col_idx,
                    page_idx,
                    total_size
                );
            }
        }

        assert_eq!(total_page_num, 8)
    }

    #[tokio::test(flavor = "current_thread")]
    async fn test_max_page_bytes_env_var() {
        let arrow_field = Field::new("data", DataType::UInt64, false);
        let arrow_schema = Schema::new(vec![arrow_field]);
        let lance_schema = LanceSchema::try_from(&arrow_schema).unwrap();
        // 4MiB
        let data: Vec<u64> = (0..500_000).collect();
        let array = UInt64Array::from(data);
        let batch =
            RecordBatch::try_new(arrow_schema.clone().into(), vec![Arc::new(array)]).unwrap();

        // 2MiB
        std::env::set_var(ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES, "2097152");

        let options = FileWriterOptions {
            max_page_bytes: None, // enforce env
            ..Default::default()
        };

        let tmp_dir = tempdir().unwrap();
        let path = tmp_dir.path().join("test_env_var.lance");
        let object_store = ObjectStore::local();
        let mut writer = FileWriter::try_new(
            object_store
                .create(&Path::from(path.to_str().unwrap()))
                .await
                .unwrap(),
            lance_schema.clone(),
            options,
        )
        .unwrap();

        writer.write_batch(&batch).await.unwrap();
        writer.finish().await.unwrap();

        let fs = FsFixture::default();
        let file_scheduler = fs
            .scheduler
            .open_file(
                &Path::from(path.to_str().unwrap()),
                &CachedFileSize::unknown(),
            )
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler,
            None,
            Arc::<DecoderPlugins>::default(),
            &LanceCache::no_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        for col_metadata in file_reader.metadata().column_metadatas.iter() {
            for page in col_metadata.pages.iter() {
                let total_size: u64 = page.buffer_sizes.iter().sum();
                assert!(
                    total_size <= 2 * 1024 * 1024,
                    "Page size {} exceeds 2MB limit",
                    total_size
                );
            }
        }

        std::env::set_var(ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES, "");
    }
973}