lance_file/v2/
writer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use core::panic;
5use std::collections::HashMap;
6use std::sync::atomic::AtomicBool;
7use std::sync::Arc;
8
9use arrow_array::RecordBatch;
10
11use arrow_data::ArrayData;
12use bytes::{BufMut, Bytes, BytesMut};
13use futures::stream::FuturesOrdered;
14use futures::StreamExt;
15use lance_core::datatypes::{Field, Schema as LanceSchema};
16use lance_core::utils::bit::pad_bytes;
17use lance_core::{Error, Result};
18use lance_encoding::decoder::PageEncoding;
19use lance_encoding::encoder::{
20    default_encoding_strategy, BatchEncoder, EncodeTask, EncodedBatch, EncodedPage,
21    EncodingOptions, FieldEncoder, FieldEncodingStrategy, OutOfLineBuffers,
22};
23use lance_encoding::repdef::RepDefBuilder;
24use lance_encoding::version::LanceFileVersion;
25use lance_io::object_store::ObjectStore;
26use lance_io::object_writer::ObjectWriter;
27use lance_io::traits::Writer;
28use log::{debug, warn};
29use object_store::path::Path;
30use prost::Message;
31use prost_types::Any;
32use snafu::location;
33use tokio::io::AsyncWriteExt;
34use tracing::instrument;
35
36use crate::datatypes::FieldsWithMeta;
37use crate::format::pb;
38use crate::format::pbfile;
39use crate::format::pbfile::DirectEncoding;
40use crate::format::MAGIC;
41
/// Page buffers are aligned to 64 bytes
///
/// After a buffer is written it is padded up to the next multiple of this value.
pub(crate) const PAGE_BUFFER_ALIGNMENT: usize = 64;
/// Source of padding bytes: one full alignment unit filled with the byte value 72.
/// A prefix of this array is written after each buffer to reach the alignment.
const PAD_BUFFER: [u8; PAGE_BUFFER_ALIGNMENT] = [72; PAGE_BUFFER_ALIGNMENT];
45
/// Options controlling how a [`FileWriter`] buffers, encodes and versions its output
///
/// Every field is optional; a `None` value means the writer picks its own default
/// when it is initialized.
#[derive(Debug, Clone, Default)]
pub struct FileWriterOptions {
    /// How many bytes to use for buffering column data
    ///
    /// When data comes in small batches the writer will buffer column data so that
    /// larger pages can be created.  This value will be divided evenly across all of the
    /// columns.  Generally you want this to be at least large enough to match your
    /// filesystem's ideal read size per column.
    ///
    /// In some cases you might want this value to be even larger if you have highly
    /// compressible data.  However, if this is too large, then the writer could require
    /// a lot of memory and write performance may suffer if the CPU-expensive encoding
    /// falls behind and can't be interleaved with the I/O expensive flushing.
    ///
    /// The default will use 8MiB per column which should be reasonable for most cases.
    // TODO: Do we need to be able to set this on a per-column basis?
    pub data_cache_bytes: Option<u64>,
    /// A hint to indicate the max size of a page
    ///
    /// This hint can't always be respected.  A single value could be larger than this value
    /// and we never slice single values.  In addition, there are some cases where it can be
    /// difficult to know size up-front and so we might not be able to respect this value.
    ///
    /// When unset the writer uses 32MiB.
    pub max_page_bytes: Option<u64>,
    /// The file writer buffers columns until enough data has arrived to flush a page
    /// to disk.
    ///
    /// Some columns with small data types may not flush very often.  These arrays can
    /// stick around for a long time.  These arrays might also be keeping larger data
    /// structures alive.  By default, the writer will make a deep copy of this array
    /// to avoid any potential memory leaks.  However, this can be disabled for a
    /// (probably minor) performance boost if you are sure that arrays are not keeping
    /// any sibling structures alive (this typically means the array was allocated in
    /// the same language / runtime as the writer)
    ///
    /// Do not enable this if your data is arriving from the C data interface.
    /// Data typically arrives one "batch" at a time (encoded in the C data interface
    /// as a struct array).  Each array in that batch keeps the entire batch alive.
    /// This means a small boolean array (which we will buffer in memory for quite a
    /// while) might keep a much larger record batch around in memory (even though most
    /// of that batch's data has been written to disk)
    ///
    /// When unset this is treated as `false`.
    pub keep_original_array: Option<bool>,
    /// The strategy used to pick an encoder for each field
    ///
    /// When unset the writer uses the default encoding strategy for the format
    /// version being written.
    pub encoding_strategy: Option<Arc<dyn FieldEncodingStrategy>>,
    /// The format version to use when writing the file
    ///
    /// This controls which encodings will be used when encoding the data.  Newer
    /// versions may have more efficient encodings.  However, newer format versions will
    /// require more up-to-date readers to read the data.
    ///
    /// When unset the default `LanceFileVersion` is used.
    pub format_version: Option<LanceFileVersion>,
}
95
/// Writes record batches into a single Lance v2 file
///
/// Batches are encoded column by column, pages are written as they become available,
/// and `finish` writes the metadata and footer.
pub struct FileWriter {
    /// The destination that all bytes are written to
    writer: ObjectWriter,
    /// The output schema; `None` until the writer has been initialized (either up
    /// front or from the first batch)
    schema: Option<LanceSchema>,
    /// The field encoders produced by the batch encoder, paired positionally with the
    /// top-level fields of the schema when encoding a batch
    column_writers: Vec<Box<dyn FieldEncoder>>,
    /// Metadata accumulated for each column (pages, column buffers, encoding)
    column_metadata: Vec<pbfile::ColumnMetadata>,
    /// Mapping reported by the batch encoder; presumably (field id, column index)
    /// pairs -- confirm against `BatchEncoder`
    field_id_to_column_indices: Vec<(u32, u32)>,
    /// Number of columns reported by the batch encoder; written into the footer
    num_columns: u32,
    /// Running total of rows accepted by `write_batch`
    rows_written: u64,
    /// (position, length) of each buffer added through `add_global_buffer`
    global_buffers: Vec<(u64, u64)>,
    /// Schema metadata collected so far; copied into the schema when the file
    /// descriptor is written
    schema_metadata: HashMap<String, String>,
    /// The options this writer was created with
    options: FileWriterOptions,
}
108
109fn initial_column_metadata() -> pbfile::ColumnMetadata {
110    pbfile::ColumnMetadata {
111        pages: Vec::new(),
112        buffer_offsets: Vec::new(),
113        buffer_sizes: Vec::new(),
114        encoding: None,
115    }
116}
117
/// Set the first time a writer is created with an unstable format version so that the
/// corresponding warning is only logged once
static WARNED_ON_UNSTABLE_API: AtomicBool = AtomicBool::new(false);
119
120impl FileWriter {
121    /// Create a new FileWriter with a desired output schema
122    pub fn try_new(
123        object_writer: ObjectWriter,
124        schema: LanceSchema,
125        options: FileWriterOptions,
126    ) -> Result<Self> {
127        let mut writer = Self::new_lazy(object_writer, options);
128        writer.initialize(schema)?;
129        Ok(writer)
130    }
131
132    /// Create a new FileWriter without a desired output schema
133    ///
134    /// The output schema will be set based on the first batch of data to arrive.
135    /// If no data arrives and the writer is finished then the write will fail.
136    pub fn new_lazy(object_writer: ObjectWriter, options: FileWriterOptions) -> Self {
137        if let Some(format_version) = options.format_version {
138            if format_version > LanceFileVersion::Stable
139                && WARNED_ON_UNSTABLE_API
140                    .compare_exchange(
141                        false,
142                        true,
143                        std::sync::atomic::Ordering::Relaxed,
144                        std::sync::atomic::Ordering::Relaxed,
145                    )
146                    .is_ok()
147            {
148                warn!("You have requested an unstable format version.  Files written with this format version may not be readable in the future!  This is a development feature and should only be used for experimentation and never for production data.");
149            }
150        }
151        Self {
152            writer: object_writer,
153            schema: None,
154            column_writers: Vec::new(),
155            column_metadata: Vec::new(),
156            num_columns: 0,
157            rows_written: 0,
158            field_id_to_column_indices: Vec::new(),
159            global_buffers: Vec::new(),
160            schema_metadata: HashMap::new(),
161            options,
162        }
163    }
164
165    /// Write a series of record batches to a new file
166    ///
167    /// Returns the number of rows written
168    pub async fn create_file_with_batches(
169        store: &ObjectStore,
170        path: &Path,
171        schema: lance_core::datatypes::Schema,
172        batches: impl Iterator<Item = RecordBatch> + Send,
173        options: FileWriterOptions,
174    ) -> Result<usize> {
175        let writer = store.create(path).await?;
176        let mut writer = Self::try_new(writer, schema, options)?;
177        for batch in batches {
178            writer.write_batch(&batch).await?;
179        }
180        Ok(writer.finish().await? as usize)
181    }
182
183    async fn do_write_buffer(writer: &mut ObjectWriter, buf: &[u8]) -> Result<()> {
184        writer.write_all(buf).await?;
185        let pad_bytes = pad_bytes::<PAGE_BUFFER_ALIGNMENT>(buf.len());
186        writer.write_all(&PAD_BUFFER[..pad_bytes]).await?;
187        Ok(())
188    }
189
190    /// Returns the format version that will be used when writing the file
191    pub fn version(&self) -> LanceFileVersion {
192        self.options.format_version.unwrap_or_default()
193    }
194
195    async fn write_page(&mut self, encoded_page: EncodedPage) -> Result<()> {
196        let buffers = encoded_page.data;
197        let mut buffer_offsets = Vec::with_capacity(buffers.len());
198        let mut buffer_sizes = Vec::with_capacity(buffers.len());
199        for buffer in buffers {
200            buffer_offsets.push(self.writer.tell().await? as u64);
201            buffer_sizes.push(buffer.len() as u64);
202            Self::do_write_buffer(&mut self.writer, &buffer).await?;
203        }
204        let encoded_encoding = match encoded_page.description {
205            PageEncoding::Legacy(array_encoding) => Any::from_msg(&array_encoding)?.encode_to_vec(),
206            PageEncoding::Structural(page_layout) => Any::from_msg(&page_layout)?.encode_to_vec(),
207        };
208        let page = pbfile::column_metadata::Page {
209            buffer_offsets,
210            buffer_sizes,
211            encoding: Some(pbfile::Encoding {
212                location: Some(pbfile::encoding::Location::Direct(DirectEncoding {
213                    encoding: encoded_encoding,
214                })),
215            }),
216            length: encoded_page.num_rows,
217            priority: encoded_page.row_number,
218        };
219        self.column_metadata[encoded_page.column_idx as usize]
220            .pages
221            .push(page);
222        Ok(())
223    }
224
225    #[instrument(skip_all, level = "debug")]
226    async fn write_pages(&mut self, mut encoding_tasks: FuturesOrdered<EncodeTask>) -> Result<()> {
227        // As soon as an encoding task is done we write it.  There is no parallelism
228        // needed here because "writing" is really just submitting the buffer to the
229        // underlying write scheduler (either the OS or object_store's scheduler for
230        // cloud writes).  The only time we might truly await on write_page is if the
231        // scheduler's write queue is full.
232        //
233        // Also, there is no point in trying to make write_page parallel anyways
234        // because we wouldn't want buffers getting mixed up across pages.
235        while let Some(encoding_task) = encoding_tasks.next().await {
236            let encoded_page = encoding_task?;
237            self.write_page(encoded_page).await?;
238        }
239        // It's important to flush here, we don't know when the next batch will arrive
240        // and the underlying cloud store could have writes in progress that won't advance
241        // until we interact with the writer again.  These in-progress writes will time out
242        // if we don't flush.
243        self.writer.flush().await?;
244        Ok(())
245    }
246
247    /// Schedule batches of data to be written to the file
248    pub async fn write_batches(
249        &mut self,
250        batches: impl Iterator<Item = &RecordBatch>,
251    ) -> Result<()> {
252        for batch in batches {
253            self.write_batch(batch).await?;
254        }
255        Ok(())
256    }
257
258    fn verify_field_nullability(arr: &ArrayData, field: &Field) -> Result<()> {
259        if !field.nullable && arr.null_count() > 0 {
260            return Err(Error::invalid_input(format!("The field `{}` contained null values even though the field is marked non-null in the schema", field.name), location!()));
261        }
262
263        for (child_field, child_arr) in field.children.iter().zip(arr.child_data()) {
264            Self::verify_field_nullability(child_arr, child_field)?;
265        }
266
267        Ok(())
268    }
269
270    fn verify_nullability_constraints(&self, batch: &RecordBatch) -> Result<()> {
271        for (col, field) in batch
272            .columns()
273            .iter()
274            .zip(self.schema.as_ref().unwrap().fields.iter())
275        {
276            Self::verify_field_nullability(&col.to_data(), field)?;
277        }
278        Ok(())
279    }
280
281    fn initialize(&mut self, mut schema: LanceSchema) -> Result<()> {
282        let cache_bytes_per_column = if let Some(data_cache_bytes) = self.options.data_cache_bytes {
283            data_cache_bytes / schema.fields.len() as u64
284        } else {
285            8 * 1024 * 1024
286        };
287
288        let max_page_bytes = self.options.max_page_bytes.unwrap_or(32 * 1024 * 1024);
289
290        schema.validate()?;
291
292        let keep_original_array = self.options.keep_original_array.unwrap_or(false);
293        let encoding_strategy = self.options.encoding_strategy.clone().unwrap_or_else(|| {
294            let version = self.version();
295            default_encoding_strategy(version).into()
296        });
297
298        let encoding_options = EncodingOptions {
299            cache_bytes_per_column,
300            max_page_bytes,
301            keep_original_array,
302            buffer_alignment: PAGE_BUFFER_ALIGNMENT as u64,
303        };
304        let encoder =
305            BatchEncoder::try_new(&schema, encoding_strategy.as_ref(), &encoding_options)?;
306        self.num_columns = encoder.num_columns();
307
308        self.column_writers = encoder.field_encoders;
309        self.column_metadata = vec![initial_column_metadata(); self.num_columns as usize];
310        self.field_id_to_column_indices = encoder.field_id_to_column_index;
311        self.schema_metadata
312            .extend(std::mem::take(&mut schema.metadata));
313        self.schema = Some(schema);
314        Ok(())
315    }
316
317    fn ensure_initialized(&mut self, batch: &RecordBatch) -> Result<&LanceSchema> {
318        if self.schema.is_none() {
319            let schema = LanceSchema::try_from(batch.schema().as_ref())?;
320            self.initialize(schema)?;
321        }
322        Ok(self.schema.as_ref().unwrap())
323    }
324
325    #[instrument(skip_all, level = "debug")]
326    fn encode_batch(
327        &mut self,
328        batch: &RecordBatch,
329        external_buffers: &mut OutOfLineBuffers,
330    ) -> Result<Vec<Vec<EncodeTask>>> {
331        self.schema
332            .as_ref()
333            .unwrap()
334            .fields
335            .iter()
336            .zip(self.column_writers.iter_mut())
337            .map(|(field, column_writer)| {
338                let array = batch
339                    .column_by_name(&field.name)
340                    .ok_or(Error::InvalidInput {
341                        source: format!(
342                            "Cannot write batch.  The batch was missing the column `{}`",
343                            field.name
344                        )
345                        .into(),
346                        location: location!(),
347                    })?;
348                let repdef = RepDefBuilder::default();
349                let num_rows = array.len() as u64;
350                column_writer.maybe_encode(
351                    array.clone(),
352                    external_buffers,
353                    repdef,
354                    self.rows_written,
355                    num_rows,
356                )
357            })
358            .collect::<Result<Vec<_>>>()
359    }
360
361    /// Schedule a batch of data to be written to the file
362    ///
363    /// Note: the future returned by this method may complete before the data has been fully
364    /// flushed to the file (some data may be in the data cache or the I/O cache)
365    pub async fn write_batch(&mut self, batch: &RecordBatch) -> Result<()> {
366        debug!(
367            "write_batch called with {} bytes of data",
368            batch.get_array_memory_size()
369        );
370        self.ensure_initialized(batch)?;
371        self.verify_nullability_constraints(batch)?;
372        let num_rows = batch.num_rows() as u64;
373        if num_rows == 0 {
374            return Ok(());
375        }
376        if num_rows > u32::MAX as u64 {
377            return Err(Error::InvalidInput {
378                source: "cannot write Lance files with more than 2^32 rows".into(),
379                location: location!(),
380            });
381        }
382        // First we push each array into its column writer.  This may or may not generate enough
383        // data to trigger an encoding task.  We collect any encoding tasks into a queue.
384        let mut external_buffers =
385            OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
386        let encoding_tasks = self.encode_batch(batch, &mut external_buffers)?;
387        // Next, write external buffers
388        for external_buffer in external_buffers.take_buffers() {
389            Self::do_write_buffer(&mut self.writer, &external_buffer).await?;
390        }
391
392        let encoding_tasks = encoding_tasks
393            .into_iter()
394            .flatten()
395            .collect::<FuturesOrdered<_>>();
396
397        self.rows_written = match self.rows_written.checked_add(batch.num_rows() as u64) {
398            Some(rows_written) => rows_written,
399            None => {
400                return Err(Error::InvalidInput { source: format!("cannot write batch with {} rows because {} rows have already been written and Lance files cannot contain more than 2^64 rows", num_rows, self.rows_written).into(), location: location!() });
401            }
402        };
403
404        self.write_pages(encoding_tasks).await?;
405
406        Ok(())
407    }
408
409    async fn write_column_metadata(
410        &mut self,
411        metadata: pbfile::ColumnMetadata,
412    ) -> Result<(u64, u64)> {
413        let metadata_bytes = metadata.encode_to_vec();
414        let position = self.writer.tell().await? as u64;
415        let len = metadata_bytes.len() as u64;
416        self.writer.write_all(&metadata_bytes).await?;
417        Ok((position, len))
418    }
419
420    async fn write_column_metadatas(&mut self) -> Result<Vec<(u64, u64)>> {
421        let mut metadatas = Vec::new();
422        std::mem::swap(&mut self.column_metadata, &mut metadatas);
423        let mut metadata_positions = Vec::with_capacity(metadatas.len());
424        for metadata in metadatas {
425            metadata_positions.push(self.write_column_metadata(metadata).await?);
426        }
427        Ok(metadata_positions)
428    }
429
430    fn make_file_descriptor(
431        schema: &lance_core::datatypes::Schema,
432        num_rows: u64,
433    ) -> Result<pb::FileDescriptor> {
434        let fields_with_meta = FieldsWithMeta::from(schema);
435        Ok(pb::FileDescriptor {
436            schema: Some(pb::Schema {
437                fields: fields_with_meta.fields.0,
438                metadata: fields_with_meta.metadata,
439            }),
440            length: num_rows,
441        })
442    }
443
444    async fn write_global_buffers(&mut self) -> Result<Vec<(u64, u64)>> {
445        let schema = self.schema.as_mut().ok_or(Error::invalid_input("No schema provided on writer open and no data provided.  Schema is unknown and file cannot be created", location!()))?;
446        schema.metadata = std::mem::take(&mut self.schema_metadata);
447        let file_descriptor = Self::make_file_descriptor(schema, self.rows_written)?;
448        let file_descriptor_bytes = file_descriptor.encode_to_vec();
449        let file_descriptor_len = file_descriptor_bytes.len() as u64;
450        let file_descriptor_position = self.writer.tell().await? as u64;
451        self.writer.write_all(&file_descriptor_bytes).await?;
452        let mut gbo_table = Vec::with_capacity(1 + self.global_buffers.len());
453        gbo_table.push((file_descriptor_position, file_descriptor_len));
454        gbo_table.append(&mut self.global_buffers);
455        Ok(gbo_table)
456    }
457
458    /// Add a metadata entry to the schema
459    ///
460    /// This method is useful because sometimes the metadata is not known until after the
461    /// data has been written.  This method allows you to alter the schema metadata.  It
462    /// must be called before `finish` is called.
463    pub fn add_schema_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
464        self.schema_metadata.insert(key.into(), value.into());
465    }
466
467    /// Adds a global buffer to the file
468    ///
469    /// The global buffer can contain any arbitrary bytes.  It will be written to the disk
470    /// immediately.  This method returns the index of the global buffer (this will always
471    /// start at 1 and increment by 1 each time this method is called)
472    pub async fn add_global_buffer(&mut self, buffer: Bytes) -> Result<u32> {
473        let position = self.writer.tell().await? as u64;
474        let len = buffer.len() as u64;
475        Self::do_write_buffer(&mut self.writer, &buffer).await?;
476        self.global_buffers.push((position, len));
477        Ok(self.global_buffers.len() as u32)
478    }
479
480    async fn finish_writers(&mut self) -> Result<()> {
481        let mut col_idx = 0;
482        for mut writer in std::mem::take(&mut self.column_writers) {
483            let mut external_buffers =
484                OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
485            let columns = writer.finish(&mut external_buffers).await?;
486            for buffer in external_buffers.take_buffers() {
487                self.writer.write_all(&buffer).await?;
488            }
489            debug_assert_eq!(
490                columns.len(),
491                writer.num_columns() as usize,
492                "Expected {} columns from column at index {} and got {}",
493                writer.num_columns(),
494                col_idx,
495                columns.len()
496            );
497            for column in columns {
498                for page in column.final_pages {
499                    self.write_page(page).await?;
500                }
501                let column_metadata = &mut self.column_metadata[col_idx];
502                let mut buffer_pos = self.writer.tell().await? as u64;
503                for buffer in column.column_buffers {
504                    column_metadata.buffer_offsets.push(buffer_pos);
505                    let mut size = 0;
506                    Self::do_write_buffer(&mut self.writer, &buffer).await?;
507                    size += buffer.len() as u64;
508                    buffer_pos += size;
509                    column_metadata.buffer_sizes.push(size);
510                }
511                let encoded_encoding = Any::from_msg(&column.encoding)?.encode_to_vec();
512                column_metadata.encoding = Some(pbfile::Encoding {
513                    location: Some(pbfile::encoding::Location::Direct(pbfile::DirectEncoding {
514                        encoding: encoded_encoding,
515                    })),
516                });
517                col_idx += 1;
518            }
519        }
520        if col_idx != self.column_metadata.len() {
521            panic!(
522                "Column writers finished with {} columns but we expected {}",
523                col_idx,
524                self.column_metadata.len()
525            );
526        }
527        Ok(())
528    }
529
530    /// Converts self.version (which is a mix of "software version" and
531    /// "format version" into a format version)
532    fn version_to_numbers(&self) -> (u16, u16) {
533        let version = self.options.format_version.unwrap_or_default();
534        match version.resolve() {
535            LanceFileVersion::V2_0 => (0, 3),
536            LanceFileVersion::V2_1 => (2, 1),
537            _ => panic!("Unsupported version: {}", version),
538        }
539    }
540
    /// Finishes writing the file
    ///
    /// This method will wait until all data has been flushed to the file.  Then it
    /// will write the file metadata and the footer.  It will not return until all
    /// data has been flushed and the file has been closed.
    ///
    /// The statements below define the layout of the tail of the file, so their
    /// order must not change: remaining pages, column buffers, global buffers,
    /// column metadata, the two offset tables and finally the footer.
    ///
    /// Returns the total number of rows written
    pub async fn finish(&mut self) -> Result<u64> {
        // 1. flush any remaining data and write out those pages
        let mut external_buffers =
            OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
        let encoding_tasks = self
            .column_writers
            .iter_mut()
            .map(|writer| writer.flush(&mut external_buffers))
            .collect::<Result<Vec<_>>>()?;
        for external_buffer in external_buffers.take_buffers() {
            Self::do_write_buffer(&mut self.writer, &external_buffer).await?;
        }
        let encoding_tasks = encoding_tasks
            .into_iter()
            .flatten()
            .collect::<FuturesOrdered<_>>();
        self.write_pages(encoding_tasks).await?;

        // 2. finish the column writers (final pages, column buffers and encodings)
        self.finish_writers().await?;

        // 3. write global buffers (we write the schema here)
        let global_buffer_offsets = self.write_global_buffers().await?;
        let num_global_buffers = global_buffer_offsets.len() as u32;

        // 4. write the column metadatas
        let column_metadata_start = self.writer.tell().await? as u64;
        let metadata_positions = self.write_column_metadatas().await?;

        // 5. write the column metadata offset table
        let cmo_table_start = self.writer.tell().await? as u64;
        for (meta_pos, meta_len) in metadata_positions {
            self.writer.write_u64_le(meta_pos).await?;
            self.writer.write_u64_le(meta_len).await?;
        }

        // 6. write global buffers offset table
        let gbo_table_start = self.writer.tell().await? as u64;
        for (gbo_pos, gbo_len) in global_buffer_offsets {
            self.writer.write_u64_le(gbo_pos).await?;
            self.writer.write_u64_le(gbo_len).await?;
        }

        let (major, minor) = self.version_to_numbers();
        // 7. write the footer
        self.writer.write_u64_le(column_metadata_start).await?;
        self.writer.write_u64_le(cmo_table_start).await?;
        self.writer.write_u64_le(gbo_table_start).await?;
        self.writer.write_u32_le(num_global_buffers).await?;
        self.writer.write_u32_le(self.num_columns).await?;
        self.writer.write_u16_le(major).await?;
        self.writer.write_u16_le(minor).await?;
        self.writer.write_all(MAGIC).await?;

        // 8. close the writer
        self.writer.shutdown().await?;
        Ok(self.rows_written)
    }
605
606    pub async fn tell(&mut self) -> Result<u64> {
607        Ok(self.writer.tell().await? as u64)
608    }
609
610    pub fn field_id_to_column_indices(&self) -> &[(u32, u32)] {
611        &self.field_id_to_column_indices
612    }
613}
614
/// Utility trait for converting EncodedBatch to Bytes using the
/// lance file format
///
/// Both methods return the serialized file as a single in-memory buffer, or an
/// error if serialization fails.
pub trait EncodedBatchWriteExt {
    /// Serializes into a lance file, including the schema
    fn try_to_self_described_lance(&self) -> Result<Bytes>;
    /// Serializes into a lance file, without the schema.
    ///
    /// The schema must be provided to deserialize the buffer
    fn try_to_mini_lance(&self) -> Result<Bytes>;
}
625
626// Creates a lance footer and appends it to the encoded data
627//
628// The logic here is very similar to logic in the FileWriter except we
629// are using BufMut (put_xyz) instead of AsyncWrite (write_xyz).
630fn concat_lance_footer(batch: &EncodedBatch, write_schema: bool) -> Result<Bytes> {
631    // Estimating 1MiB for file footer
632    let mut data = BytesMut::with_capacity(batch.data.len() + 1024 * 1024);
633    data.put(batch.data.clone());
634    // write global buffers (we write the schema here)
635    let global_buffers = if write_schema {
636        let schema_start = data.len() as u64;
637        let lance_schema = lance_core::datatypes::Schema::try_from(batch.schema.as_ref())?;
638        let descriptor = FileWriter::make_file_descriptor(&lance_schema, batch.num_rows)?;
639        let descriptor_bytes = descriptor.encode_to_vec();
640        let descriptor_len = descriptor_bytes.len() as u64;
641        data.put(descriptor_bytes.as_slice());
642
643        vec![(schema_start, descriptor_len)]
644    } else {
645        vec![]
646    };
647    let col_metadata_start = data.len() as u64;
648
649    let mut col_metadata_positions = Vec::new();
650    // Write column metadata
651    for col in &batch.page_table {
652        let position = data.len() as u64;
653        let pages = col
654            .page_infos
655            .iter()
656            .map(|page_info| {
657                let encoded_encoding = match &page_info.encoding {
658                    PageEncoding::Legacy(array_encoding) => {
659                        Any::from_msg(array_encoding)?.encode_to_vec()
660                    }
661                    PageEncoding::Structural(page_layout) => {
662                        Any::from_msg(page_layout)?.encode_to_vec()
663                    }
664                };
665                let (buffer_offsets, buffer_sizes): (Vec<_>, Vec<_>) = page_info
666                    .buffer_offsets_and_sizes
667                    .as_ref()
668                    .iter()
669                    .cloned()
670                    .unzip();
671                Ok(pbfile::column_metadata::Page {
672                    buffer_offsets,
673                    buffer_sizes,
674                    encoding: Some(pbfile::Encoding {
675                        location: Some(pbfile::encoding::Location::Direct(DirectEncoding {
676                            encoding: encoded_encoding,
677                        })),
678                    }),
679                    length: page_info.num_rows,
680                    priority: page_info.priority,
681                })
682            })
683            .collect::<Result<Vec<_>>>()?;
684        let (buffer_offsets, buffer_sizes): (Vec<_>, Vec<_>) =
685            col.buffer_offsets_and_sizes.iter().cloned().unzip();
686        let encoded_col_encoding = Any::from_msg(&col.encoding)?.encode_to_vec();
687        let column = pbfile::ColumnMetadata {
688            pages,
689            buffer_offsets,
690            buffer_sizes,
691            encoding: Some(pbfile::Encoding {
692                location: Some(pbfile::encoding::Location::Direct(pbfile::DirectEncoding {
693                    encoding: encoded_col_encoding,
694                })),
695            }),
696        };
697        let column_bytes = column.encode_to_vec();
698        col_metadata_positions.push((position, column_bytes.len() as u64));
699        data.put(column_bytes.as_slice());
700    }
701    // Write column metadata offsets table
702    let cmo_table_start = data.len() as u64;
703    for (meta_pos, meta_len) in col_metadata_positions {
704        data.put_u64_le(meta_pos);
705        data.put_u64_le(meta_len);
706    }
707    // Write global buffers offsets table
708    let gbo_table_start = data.len() as u64;
709    let num_global_buffers = global_buffers.len() as u32;
710    for (gbo_pos, gbo_len) in global_buffers {
711        data.put_u64_le(gbo_pos);
712        data.put_u64_le(gbo_len);
713    }
714
715    let (major, minor) = LanceFileVersion::default().to_numbers();
716
717    // write the footer
718    data.put_u64_le(col_metadata_start);
719    data.put_u64_le(cmo_table_start);
720    data.put_u64_le(gbo_table_start);
721    data.put_u32_le(num_global_buffers);
722    data.put_u32_le(batch.page_table.len() as u32);
723    data.put_u16_le(major as u16);
724    data.put_u16_le(minor as u16);
725    data.put(MAGIC.as_slice());
726
727    Ok(data.freeze())
728}
729
730impl EncodedBatchWriteExt for EncodedBatch {
731    fn try_to_self_described_lance(&self) -> Result<Bytes> {
732        concat_lance_footer(self, true)
733    }
734
735    fn try_to_mini_lance(&self) -> Result<Bytes> {
736        concat_lance_footer(self, false)
737    }
738}
739
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::{types::Float64Type, RecordBatchReader};
    use lance_datagen::{array, gen, BatchCount, RowCount};
    use lance_io::object_store::ObjectStore;
    use object_store::path::Path;

    use crate::v2::writer::{FileWriter, FileWriterOptions};

    /// Writes a generated single-column (`score`) dataset of the requested shape to a
    /// file in a fresh temporary directory, adds one schema metadata entry and
    /// finishes the file.  Any failure panics.
    async fn write_generated_file(rows_per_batch: RowCount, num_batches: BatchCount) {
        let tmp_dir = tempfile::tempdir().unwrap();
        let dir_path: String = tmp_dir.path().to_str().unwrap().to_owned();
        let file_path = Path::parse(dir_path).unwrap().child("some_file.lance");
        let obj_store = Arc::new(ObjectStore::local());

        let reader = gen()
            .col("score", array::rand::<Float64Type>())
            .into_reader_rows(rows_per_batch, num_batches);

        let object_writer = obj_store.create(&file_path).await.unwrap();

        let lance_schema =
            lance_core::datatypes::Schema::try_from(reader.schema().as_ref()).unwrap();

        let mut file_writer =
            FileWriter::try_new(object_writer, lance_schema, FileWriterOptions::default())
                .unwrap();

        for batch in reader {
            file_writer.write_batch(&batch.unwrap()).await.unwrap();
        }
        file_writer.add_schema_metadata("foo", "bar");
        file_writer.finish().await.unwrap();
    }

    #[tokio::test]
    async fn test_basic_write() {
        write_generated_file(RowCount::from(1000), BatchCount::from(10)).await;
        // Tests asserting the contents of the written file are in reader.rs
    }

    #[tokio::test]
    async fn test_write_empty() {
        write_generated_file(RowCount::from(0), BatchCount::from(0)).await;
    }
}
805}