use core::panic;
use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;

use arrow_array::RecordBatch;

use arrow_data::ArrayData;
use bytes::{BufMut, Bytes, BytesMut};
use futures::stream::FuturesOrdered;
use futures::StreamExt;
use lance_core::datatypes::{Field, Schema as LanceSchema};
use lance_core::utils::bit::pad_bytes;
use lance_core::{Error, Result};
use lance_encoding::decoder::PageEncoding;
use lance_encoding::encoder::{
    default_encoding_strategy, BatchEncoder, EncodeTask, EncodedBatch, EncodedPage,
    EncodingOptions, FieldEncoder, FieldEncodingStrategy, OutOfLineBuffers,
};
use lance_encoding::repdef::RepDefBuilder;
use lance_encoding::version::LanceFileVersion;
use lance_io::object_store::ObjectStore;
use lance_io::object_writer::ObjectWriter;
use lance_io::traits::Writer;
use log::{debug, warn};
use object_store::path::Path;
use prost::Message;
use prost_types::Any;
use snafu::location;
use tokio::io::AsyncWriteExt;
use tracing::instrument;

use crate::datatypes::FieldsWithMeta;
use crate::format::pb;
use crate::format::pbfile;
use crate::format::pbfile::DirectEncoding;
use crate::format::MAGIC;

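// Page buffers written to the file are padded out to this alignment;
// PAD_BUFFER provides the filler bytes used for that padding.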
pub(crate) const PAGE_BUFFER_ALIGNMENT: usize = 64;
const PAD_BUFFER: [u8; PAGE_BUFFER_ALIGNMENT] = [72; PAGE_BUFFER_ALIGNMENT];
const MAX_PAGE_BYTES: usize = 32 * 1024 * 1024;
const ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES: &str = "LANCE_FILE_WRITER_MAX_PAGE_BYTES";

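/// Options that control how a [`FileWriter`] encodes and writes data.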
#[derive(Debug, Clone, Default)]
pub struct FileWriterOptions {
    /// Number of bytes to buffer for column data before pages are flushed.
    /// The budget is divided evenly across the schema's fields; when unset,
    /// each column gets 8MiB.
    pub data_cache_bytes: Option<u64>,
    /// Maximum size, in bytes, of a single page. When unset, the value of the
    /// LANCE_FILE_WRITER_MAX_PAGE_BYTES environment variable is used, falling
    /// back to 32MiB.
    pub max_page_bytes: Option<u64>,
    /// Passed through to `EncodingOptions::keep_original_array` (defaults to false).
    pub keep_original_array: Option<bool>,
    /// Custom strategy for choosing field encodings; defaults to the strategy
    /// for the selected format version.
    pub encoding_strategy: Option<Arc<dyn FieldEncodingStrategy>>,
    /// File format version to write; defaults to the library's default version.
    pub format_version: Option<LanceFileVersion>,
}

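/// Writes a Lance v2 data file, encoding batches into pages column by column.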
pub struct FileWriter {
    writer: ObjectWriter,
    schema: Option<LanceSchema>,
    column_writers: Vec<Box<dyn FieldEncoder>>,
    column_metadata: Vec<pbfile::ColumnMetadata>,
    field_id_to_column_indices: Vec<(u32, u32)>,
    num_columns: u32,
    rows_written: u64,
    global_buffers: Vec<(u64, u64)>,
    schema_metadata: HashMap<String, String>,
    options: FileWriterOptions,
}

fn initial_column_metadata() -> pbfile::ColumnMetadata {
    pbfile::ColumnMetadata {
        pages: Vec::new(),
        buffer_offsets: Vec::new(),
        buffer_sizes: Vec::new(),
        encoding: None,
    }
}

static WARNED_ON_UNSTABLE_API: AtomicBool = AtomicBool::new(false);

impl FileWriter {
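    /// Create a new writer for the given schema.
    ///
    /// The schema is validated and the column encoders are created immediately.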
    pub fn try_new(
        object_writer: ObjectWriter,
        schema: LanceSchema,
        options: FileWriterOptions,
    ) -> Result<Self> {
        let mut writer = Self::new_lazy(object_writer, options);
        writer.initialize(schema)?;
        Ok(writer)
    }

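    /// Create a new writer without a schema.
    ///
    /// The schema is inferred from the first batch written via [`Self::write_batch`].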
    pub fn new_lazy(object_writer: ObjectWriter, options: FileWriterOptions) -> Self {
        if let Some(format_version) = options.format_version {
            if format_version > LanceFileVersion::Stable
                && WARNED_ON_UNSTABLE_API
                    .compare_exchange(
                        false,
                        true,
                        std::sync::atomic::Ordering::Relaxed,
                        std::sync::atomic::Ordering::Relaxed,
                    )
                    .is_ok()
            {
                warn!("You have requested an unstable format version. Files written with this format version may not be readable in the future! This is a development feature and should only be used for experimentation and never for production data.");
            }
        }
        Self {
            writer: object_writer,
            schema: None,
            column_writers: Vec::new(),
            column_metadata: Vec::new(),
            num_columns: 0,
            rows_written: 0,
            field_id_to_column_indices: Vec::new(),
            global_buffers: Vec::new(),
            schema_metadata: HashMap::new(),
            options,
        }
    }

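    /// Convenience helper: creates a file at `path`, writes all `batches`, and
    /// returns the number of rows written.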
    pub async fn create_file_with_batches(
        store: &ObjectStore,
        path: &Path,
        schema: lance_core::datatypes::Schema,
        batches: impl Iterator<Item = RecordBatch> + Send,
        options: FileWriterOptions,
    ) -> Result<usize> {
        let writer = store.create(path).await?;
        let mut writer = Self::try_new(writer, schema, options)?;
        for batch in batches {
            writer.write_batch(&batch).await?;
        }
        Ok(writer.finish().await? as usize)
    }

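    // Writes `buf` to the file and pads the write out to PAGE_BUFFER_ALIGNMENT.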
    async fn do_write_buffer(writer: &mut ObjectWriter, buf: &[u8]) -> Result<()> {
        writer.write_all(buf).await?;
        let pad_bytes = pad_bytes::<PAGE_BUFFER_ALIGNMENT>(buf.len());
        writer.write_all(&PAD_BUFFER[..pad_bytes]).await?;
        Ok(())
    }

    pub fn version(&self) -> LanceFileVersion {
        self.options.format_version.unwrap_or_default()
    }

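    // Writes a single encoded page: each data buffer is written (with padding) and the
    // page's offsets, sizes, and encoding description are recorded in the column metadata.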
    async fn write_page(&mut self, encoded_page: EncodedPage) -> Result<()> {
        let buffers = encoded_page.data;
        let mut buffer_offsets = Vec::with_capacity(buffers.len());
        let mut buffer_sizes = Vec::with_capacity(buffers.len());
        for buffer in buffers {
            buffer_offsets.push(self.writer.tell().await? as u64);
            buffer_sizes.push(buffer.len() as u64);
            Self::do_write_buffer(&mut self.writer, &buffer).await?;
        }
        let encoded_encoding = match encoded_page.description {
            PageEncoding::Legacy(array_encoding) => Any::from_msg(&array_encoding)?.encode_to_vec(),
            PageEncoding::Structural(page_layout) => Any::from_msg(&page_layout)?.encode_to_vec(),
        };
        let page = pbfile::column_metadata::Page {
            buffer_offsets,
            buffer_sizes,
            encoding: Some(pbfile::Encoding {
                location: Some(pbfile::encoding::Location::Direct(DirectEncoding {
                    encoding: encoded_encoding,
                })),
            }),
            length: encoded_page.num_rows,
            priority: encoded_page.row_number,
        };
        self.column_metadata[encoded_page.column_idx as usize]
            .pages
            .push(page);
        Ok(())
    }

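    // Drains the encoding tasks in order, writing each resulting page, then flushes the writer.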
    #[instrument(skip_all, level = "debug")]
    async fn write_pages(&mut self, mut encoding_tasks: FuturesOrdered<EncodeTask>) -> Result<()> {
        while let Some(encoding_task) = encoding_tasks.next().await {
            let encoded_page = encoding_task?;
            self.write_page(encoded_page).await?;
        }
        self.writer.flush().await?;
        Ok(())
    }

    pub async fn write_batches(
        &mut self,
        batches: impl Iterator<Item = &RecordBatch>,
    ) -> Result<()> {
        for batch in batches {
            self.write_batch(batch).await?;
        }
        Ok(())
    }

    fn verify_field_nullability(arr: &ArrayData, field: &Field) -> Result<()> {
        if !field.nullable && arr.null_count() > 0 {
            return Err(Error::invalid_input(format!("The field `{}` contained null values even though the field is marked non-null in the schema", field.name), location!()));
        }

        for (child_field, child_arr) in field.children.iter().zip(arr.child_data()) {
            Self::verify_field_nullability(child_arr, child_field)?;
        }

        Ok(())
    }

    fn verify_nullability_constraints(&self, batch: &RecordBatch) -> Result<()> {
        for (col, field) in batch
            .columns()
            .iter()
            .zip(self.schema.as_ref().unwrap().fields.iter())
        {
            Self::verify_field_nullability(&col.to_data(), field)?;
        }
        Ok(())
    }

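    // Sets up the column encoders: splits the data cache evenly across the schema's fields,
    // resolves `max_page_bytes` from the options or the LANCE_FILE_WRITER_MAX_PAGE_BYTES
    // environment variable, and validates the schema.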
    fn initialize(&mut self, mut schema: LanceSchema) -> Result<()> {
        let cache_bytes_per_column = if let Some(data_cache_bytes) = self.options.data_cache_bytes {
            data_cache_bytes / schema.fields.len() as u64
        } else {
            8 * 1024 * 1024
        };

        let max_page_bytes = self.options.max_page_bytes.unwrap_or_else(|| {
            std::env::var(ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES)
                .map(|s| {
                    s.parse::<u64>().unwrap_or_else(|e| {
                        warn!(
                            "Failed to parse {}: {}, using default",
                            ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES, e
                        );
                        MAX_PAGE_BYTES as u64
                    })
                })
                .unwrap_or(MAX_PAGE_BYTES as u64)
        });

        schema.validate()?;

        let keep_original_array = self.options.keep_original_array.unwrap_or(false);
        let encoding_strategy = self.options.encoding_strategy.clone().unwrap_or_else(|| {
            let version = self.version();
            default_encoding_strategy(version).into()
        });

        let encoding_options = EncodingOptions {
            cache_bytes_per_column,
            max_page_bytes,
            keep_original_array,
            buffer_alignment: PAGE_BUFFER_ALIGNMENT as u64,
        };
        let encoder =
            BatchEncoder::try_new(&schema, encoding_strategy.as_ref(), &encoding_options)?;
        self.num_columns = encoder.num_columns();

        self.column_writers = encoder.field_encoders;
        self.column_metadata = vec![initial_column_metadata(); self.num_columns as usize];
        self.field_id_to_column_indices = encoder.field_id_to_column_index;
        self.schema_metadata
            .extend(std::mem::take(&mut schema.metadata));
        self.schema = Some(schema);
        Ok(())
    }

    fn ensure_initialized(&mut self, batch: &RecordBatch) -> Result<&LanceSchema> {
        if self.schema.is_none() {
            let schema = LanceSchema::try_from(batch.schema().as_ref())?;
            self.initialize(schema)?;
        }
        Ok(self.schema.as_ref().unwrap())
    }

    #[instrument(skip_all, level = "debug")]
    fn encode_batch(
        &mut self,
        batch: &RecordBatch,
        external_buffers: &mut OutOfLineBuffers,
    ) -> Result<Vec<Vec<EncodeTask>>> {
        self.schema
            .as_ref()
            .unwrap()
            .fields
            .iter()
            .zip(self.column_writers.iter_mut())
            .map(|(field, column_writer)| {
                let array = batch
                    .column_by_name(&field.name)
                    .ok_or(Error::InvalidInput {
                        source: format!(
                            "Cannot write batch. The batch was missing the column `{}`",
                            field.name
                        )
                        .into(),
                        location: location!(),
                    })?;
                let repdef = RepDefBuilder::default();
                let num_rows = array.len() as u64;
                column_writer.maybe_encode(
                    array.clone(),
                    external_buffers,
                    repdef,
                    self.rows_written,
                    num_rows,
                )
            })
            .collect::<Result<Vec<_>>>()
    }

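    /// Schedule a batch of data to be written to the file.
    ///
    /// Note: encoders may buffer small batches internally, so pages are not necessarily
    /// flushed to storage until enough data has accumulated (or `finish` is called).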
    pub async fn write_batch(&mut self, batch: &RecordBatch) -> Result<()> {
        debug!(
            "write_batch called with {} bytes of data",
            batch.get_array_memory_size()
        );
        self.ensure_initialized(batch)?;
        self.verify_nullability_constraints(batch)?;
        let num_rows = batch.num_rows() as u64;
        if num_rows == 0 {
            return Ok(());
        }
        if num_rows > u32::MAX as u64 {
            return Err(Error::InvalidInput {
                source: "cannot write Lance files with more than 2^32 rows".into(),
                location: location!(),
            });
        }
        let mut external_buffers =
            OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
        let encoding_tasks = self.encode_batch(batch, &mut external_buffers)?;
        for external_buffer in external_buffers.take_buffers() {
            Self::do_write_buffer(&mut self.writer, &external_buffer).await?;
        }

        let encoding_tasks = encoding_tasks
            .into_iter()
            .flatten()
            .collect::<FuturesOrdered<_>>();

        self.rows_written = match self.rows_written.checked_add(batch.num_rows() as u64) {
            Some(rows_written) => rows_written,
            None => {
                return Err(Error::InvalidInput { source: format!("cannot write batch with {} rows because {} rows have already been written and Lance files cannot contain more than 2^64 rows", num_rows, self.rows_written).into(), location: location!() });
            }
        };

        self.write_pages(encoding_tasks).await?;

        Ok(())
    }

    async fn write_column_metadata(
        &mut self,
        metadata: pbfile::ColumnMetadata,
    ) -> Result<(u64, u64)> {
        let metadata_bytes = metadata.encode_to_vec();
        let position = self.writer.tell().await? as u64;
        let len = metadata_bytes.len() as u64;
        self.writer.write_all(&metadata_bytes).await?;
        Ok((position, len))
    }

    async fn write_column_metadatas(&mut self) -> Result<Vec<(u64, u64)>> {
        let mut metadatas = Vec::new();
        std::mem::swap(&mut self.column_metadata, &mut metadatas);
        let mut metadata_positions = Vec::with_capacity(metadatas.len());
        for metadata in metadatas {
            metadata_positions.push(self.write_column_metadata(metadata).await?);
        }
        Ok(metadata_positions)
    }

    fn make_file_descriptor(
        schema: &lance_core::datatypes::Schema,
        num_rows: u64,
    ) -> Result<pb::FileDescriptor> {
        let fields_with_meta = FieldsWithMeta::from(schema);
        Ok(pb::FileDescriptor {
            schema: Some(pb::Schema {
                fields: fields_with_meta.fields.0,
                metadata: fields_with_meta.metadata,
            }),
            length: num_rows,
        })
    }

    async fn write_global_buffers(&mut self) -> Result<Vec<(u64, u64)>> {
        let schema = self.schema.as_mut().ok_or(Error::invalid_input("No schema provided on writer open and no data provided. Schema is unknown and file cannot be created", location!()))?;
        schema.metadata = std::mem::take(&mut self.schema_metadata);
        let file_descriptor = Self::make_file_descriptor(schema, self.rows_written)?;
        let file_descriptor_bytes = file_descriptor.encode_to_vec();
        let file_descriptor_len = file_descriptor_bytes.len() as u64;
        let file_descriptor_position = self.writer.tell().await? as u64;
        self.writer.write_all(&file_descriptor_bytes).await?;
        let mut gbo_table = Vec::with_capacity(1 + self.global_buffers.len());
        gbo_table.push((file_descriptor_position, file_descriptor_len));
        gbo_table.append(&mut self.global_buffers);
        Ok(gbo_table)
    }

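    /// Add a metadata entry to the file's schema metadata.
    ///
    /// Only entries added before `finish` is called end up in the written file.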
    pub fn add_schema_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
        self.schema_metadata.insert(key.into(), value.into());
    }

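    /// Write a global buffer to the file immediately and return its index.
    ///
    /// Index 0 is reserved for the file descriptor, so the first buffer added here
    /// receives index 1.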
    pub async fn add_global_buffer(&mut self, buffer: Bytes) -> Result<u32> {
        let position = self.writer.tell().await? as u64;
        let len = buffer.len() as u64;
        Self::do_write_buffer(&mut self.writer, &buffer).await?;
        self.global_buffers.push((position, len));
        Ok(self.global_buffers.len() as u32)
    }

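    // Finishes each column writer: writes any remaining pages and column buffers and
    // records the column-level encoding in the column metadata.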
    async fn finish_writers(&mut self) -> Result<()> {
        let mut col_idx = 0;
        for mut writer in std::mem::take(&mut self.column_writers) {
            let mut external_buffers =
                OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
            let columns = writer.finish(&mut external_buffers).await?;
            for buffer in external_buffers.take_buffers() {
                self.writer.write_all(&buffer).await?;
            }
            debug_assert_eq!(
                columns.len(),
                writer.num_columns() as usize,
                "Expected {} columns from column at index {} and got {}",
                writer.num_columns(),
                col_idx,
                columns.len()
            );
            for column in columns {
                for page in column.final_pages {
                    self.write_page(page).await?;
                }
                let column_metadata = &mut self.column_metadata[col_idx];
                let mut buffer_pos = self.writer.tell().await? as u64;
                for buffer in column.column_buffers {
                    column_metadata.buffer_offsets.push(buffer_pos);
                    let mut size = 0;
                    Self::do_write_buffer(&mut self.writer, &buffer).await?;
                    size += buffer.len() as u64;
                    buffer_pos += size;
                    column_metadata.buffer_sizes.push(size);
                }
                let encoded_encoding = Any::from_msg(&column.encoding)?.encode_to_vec();
                column_metadata.encoding = Some(pbfile::Encoding {
                    location: Some(pbfile::encoding::Location::Direct(pbfile::DirectEncoding {
                        encoding: encoded_encoding,
                    })),
                });
                col_idx += 1;
            }
        }
        if col_idx != self.column_metadata.len() {
            panic!(
                "Column writers finished with {} columns but we expected {}",
                col_idx,
                self.column_metadata.len()
            );
        }
        Ok(())
    }

    fn version_to_numbers(&self) -> (u16, u16) {
        let version = self.options.format_version.unwrap_or_default();
        match version.resolve() {
            LanceFileVersion::V2_0 => (0, 3),
            LanceFileVersion::V2_1 => (2, 1),
            _ => panic!("Unsupported version: {}", version),
        }
    }

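    /// Finish writing the file: flushes all column writers, writes the column metadata,
    /// the offset tables, and the footer, then closes the writer. Returns the number of
    /// rows written.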
    pub async fn finish(&mut self) -> Result<u64> {
        let mut external_buffers =
            OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
        let encoding_tasks = self
            .column_writers
            .iter_mut()
            .map(|writer| writer.flush(&mut external_buffers))
            .collect::<Result<Vec<_>>>()?;
        for external_buffer in external_buffers.take_buffers() {
            Self::do_write_buffer(&mut self.writer, &external_buffer).await?;
        }
        let encoding_tasks = encoding_tasks
            .into_iter()
            .flatten()
            .collect::<FuturesOrdered<_>>();
        self.write_pages(encoding_tasks).await?;

        self.finish_writers().await?;

        let global_buffer_offsets = self.write_global_buffers().await?;
        let num_global_buffers = global_buffer_offsets.len() as u32;

        let column_metadata_start = self.writer.tell().await? as u64;
        let metadata_positions = self.write_column_metadatas().await?;

        let cmo_table_start = self.writer.tell().await? as u64;
        for (meta_pos, meta_len) in metadata_positions {
            self.writer.write_u64_le(meta_pos).await?;
            self.writer.write_u64_le(meta_len).await?;
        }

        let gbo_table_start = self.writer.tell().await? as u64;
        for (gbo_pos, gbo_len) in global_buffer_offsets {
            self.writer.write_u64_le(gbo_pos).await?;
            self.writer.write_u64_le(gbo_len).await?;
        }

        let (major, minor) = self.version_to_numbers();
        self.writer.write_u64_le(column_metadata_start).await?;
        self.writer.write_u64_le(cmo_table_start).await?;
        self.writer.write_u64_le(gbo_table_start).await?;
        self.writer.write_u32_le(num_global_buffers).await?;
        self.writer.write_u32_le(self.num_columns).await?;
        self.writer.write_u16_le(major).await?;
        self.writer.write_u16_le(minor).await?;
        self.writer.write_all(MAGIC).await?;

        self.writer.shutdown().await?;
        Ok(self.rows_written)
    }

    pub async fn abort(&mut self) {
        self.writer.abort().await;
    }

    pub async fn tell(&mut self) -> Result<u64> {
        Ok(self.writer.tell().await? as u64)
    }

    pub fn field_id_to_column_indices(&self) -> &[(u32, u32)] {
        &self.field_id_to_column_indices
    }
}

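/// Helpers for serializing an [`EncodedBatch`] directly to Lance file bytes:
/// `try_to_self_described_lance` embeds the schema as a global buffer, while
/// `try_to_mini_lance` omits it (the reader must already know the schema).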
pub trait EncodedBatchWriteExt {
    fn try_to_self_described_lance(&self, version: LanceFileVersion) -> Result<Bytes>;
    fn try_to_mini_lance(&self, version: LanceFileVersion) -> Result<Bytes>;
}

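// Appends the Lance footer (optional file descriptor, column metadata blocks, offset
// tables, version numbers, and magic bytes) to the already-encoded data buffer.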
fn concat_lance_footer(
    batch: &EncodedBatch,
    write_schema: bool,
    version: LanceFileVersion,
) -> Result<Bytes> {
    let mut data = BytesMut::with_capacity(batch.data.len() + 1024 * 1024);
    data.put(batch.data.clone());
    let global_buffers = if write_schema {
        let schema_start = data.len() as u64;
        let lance_schema = lance_core::datatypes::Schema::try_from(batch.schema.as_ref())?;
        let descriptor = FileWriter::make_file_descriptor(&lance_schema, batch.num_rows)?;
        let descriptor_bytes = descriptor.encode_to_vec();
        let descriptor_len = descriptor_bytes.len() as u64;
        data.put(descriptor_bytes.as_slice());

        vec![(schema_start, descriptor_len)]
    } else {
        vec![]
    };
    let col_metadata_start = data.len() as u64;

    let mut col_metadata_positions = Vec::new();
    for col in &batch.page_table {
        let position = data.len() as u64;
        let pages = col
            .page_infos
            .iter()
            .map(|page_info| {
                let encoded_encoding = match &page_info.encoding {
                    PageEncoding::Legacy(array_encoding) => {
                        Any::from_msg(array_encoding)?.encode_to_vec()
                    }
                    PageEncoding::Structural(page_layout) => {
                        Any::from_msg(page_layout)?.encode_to_vec()
                    }
                };
                let (buffer_offsets, buffer_sizes): (Vec<_>, Vec<_>) = page_info
                    .buffer_offsets_and_sizes
                    .as_ref()
                    .iter()
                    .cloned()
                    .unzip();
                Ok(pbfile::column_metadata::Page {
                    buffer_offsets,
                    buffer_sizes,
                    encoding: Some(pbfile::Encoding {
                        location: Some(pbfile::encoding::Location::Direct(DirectEncoding {
                            encoding: encoded_encoding,
                        })),
                    }),
                    length: page_info.num_rows,
                    priority: page_info.priority,
                })
            })
            .collect::<Result<Vec<_>>>()?;
        let (buffer_offsets, buffer_sizes): (Vec<_>, Vec<_>) =
            col.buffer_offsets_and_sizes.iter().cloned().unzip();
        let encoded_col_encoding = Any::from_msg(&col.encoding)?.encode_to_vec();
        let column = pbfile::ColumnMetadata {
            pages,
            buffer_offsets,
            buffer_sizes,
            encoding: Some(pbfile::Encoding {
                location: Some(pbfile::encoding::Location::Direct(pbfile::DirectEncoding {
                    encoding: encoded_col_encoding,
                })),
            }),
        };
        let column_bytes = column.encode_to_vec();
        col_metadata_positions.push((position, column_bytes.len() as u64));
        data.put(column_bytes.as_slice());
    }
    let cmo_table_start = data.len() as u64;
    for (meta_pos, meta_len) in col_metadata_positions {
        data.put_u64_le(meta_pos);
        data.put_u64_le(meta_len);
    }
    let gbo_table_start = data.len() as u64;
    let num_global_buffers = global_buffers.len() as u32;
    for (gbo_pos, gbo_len) in global_buffers {
        data.put_u64_le(gbo_pos);
        data.put_u64_le(gbo_len);
    }

    let (major, minor) = version.to_numbers();

    data.put_u64_le(col_metadata_start);
    data.put_u64_le(cmo_table_start);
    data.put_u64_le(gbo_table_start);
    data.put_u32_le(num_global_buffers);
    data.put_u32_le(batch.page_table.len() as u32);
    data.put_u16_le(major as u16);
    data.put_u16_le(minor as u16);
    data.put(MAGIC.as_slice());

    Ok(data.freeze())
}

impl EncodedBatchWriteExt for EncodedBatch {
    fn try_to_self_described_lance(&self, version: LanceFileVersion) -> Result<Bytes> {
        concat_lance_footer(self, true, version)
    }

    fn try_to_mini_lance(&self, version: LanceFileVersion) -> Result<Bytes> {
        concat_lance_footer(self, false, version)
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use crate::v2::reader::{describe_encoding, FileReader, FileReaderOptions};
    use crate::v2::testing::FsFixture;
    use crate::v2::writer::{FileWriter, FileWriterOptions, ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES};
    use arrow_array::builder::{Float32Builder, Int32Builder};
    use arrow_array::{types::Float64Type, RecordBatchReader, StringArray};
    use arrow_array::{Int32Array, RecordBatch, UInt64Array};
    use arrow_schema::{DataType, Field, Field as ArrowField, Schema, Schema as ArrowSchema};
    use lance_core::cache::LanceCache;
    use lance_core::datatypes::Schema as LanceSchema;
    use lance_datagen::{array, gen_batch, BatchCount, RowCount};
    use lance_encoding::compression_config::{CompressionFieldParams, CompressionParams};
    use lance_encoding::decoder::DecoderPlugins;
    use lance_encoding::version::LanceFileVersion;
    use lance_io::object_store::ObjectStore;
    use lance_io::utils::CachedFileSize;
    use object_store::path::Path;
    use tempfile::tempdir;

    #[tokio::test]
    async fn test_basic_write() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path: String = tmp_dir.path().to_str().unwrap().to_owned();
        let tmp_path = Path::parse(tmp_path).unwrap();
        let tmp_path = tmp_path.child("some_file.lance");
        let obj_store = Arc::new(ObjectStore::local());

        let reader = gen_batch()
            .col("score", array::rand::<Float64Type>())
            .into_reader_rows(RowCount::from(1000), BatchCount::from(10));

        let writer = obj_store.create(&tmp_path).await.unwrap();

        let lance_schema =
            lance_core::datatypes::Schema::try_from(reader.schema().as_ref()).unwrap();

        let mut file_writer =
            FileWriter::try_new(writer, lance_schema, FileWriterOptions::default()).unwrap();

        for batch in reader {
            file_writer.write_batch(&batch.unwrap()).await.unwrap();
        }
        file_writer.add_schema_metadata("foo", "bar");
        file_writer.finish().await.unwrap();
    }

    #[tokio::test]
    async fn test_write_empty() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path: String = tmp_dir.path().to_str().unwrap().to_owned();
        let tmp_path = Path::parse(tmp_path).unwrap();
        let tmp_path = tmp_path.child("some_file.lance");
        let obj_store = Arc::new(ObjectStore::local());

        let reader = gen_batch()
            .col("score", array::rand::<Float64Type>())
            .into_reader_rows(RowCount::from(0), BatchCount::from(0));

        let writer = obj_store.create(&tmp_path).await.unwrap();

        let lance_schema =
            lance_core::datatypes::Schema::try_from(reader.schema().as_ref()).unwrap();

        let mut file_writer =
            FileWriter::try_new(writer, lance_schema, FileWriterOptions::default()).unwrap();

        for batch in reader {
            file_writer.write_batch(&batch.unwrap()).await.unwrap();
        }
        file_writer.add_schema_metadata("foo", "bar");
        file_writer.finish().await.unwrap();
    }

    #[tokio::test]
    async fn test_max_page_bytes_enforced() {
        let arrow_field = Field::new("data", DataType::UInt64, false);
        let arrow_schema = Schema::new(vec![arrow_field]);
        let lance_schema = LanceSchema::try_from(&arrow_schema).unwrap();

        let data: Vec<u64> = (0..1_000_000).collect();
        let array = UInt64Array::from(data);
        let batch =
            RecordBatch::try_new(arrow_schema.clone().into(), vec![Arc::new(array)]).unwrap();

        let options = FileWriterOptions {
            max_page_bytes: Some(1024 * 1024),
            ..Default::default()
        };

        let tmp_dir = tempdir().unwrap();
        let path = tmp_dir.path().join("test.lance");
        let object_store = ObjectStore::local();
        let mut writer = FileWriter::try_new(
            object_store
                .create(&Path::from(path.to_str().unwrap()))
                .await
                .unwrap(),
            lance_schema,
            options,
        )
        .unwrap();

        writer.write_batch(&batch).await.unwrap();
        writer.finish().await.unwrap();

        let fs = FsFixture::default();
        let file_scheduler = fs
            .scheduler
            .open_file(
                &Path::from(path.to_str().unwrap()),
                &CachedFileSize::unknown(),
            )
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler,
            None,
            Arc::<DecoderPlugins>::default(),
            &LanceCache::no_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let column_meta = file_reader.metadata();

        let mut total_page_num: u32 = 0;
        for (col_idx, col_metadata) in column_meta.column_metadatas.iter().enumerate() {
            assert!(
                !col_metadata.pages.is_empty(),
                "Column {} has no pages",
                col_idx
            );

            for (page_idx, page) in col_metadata.pages.iter().enumerate() {
                total_page_num += 1;
                let total_size: u64 = page.buffer_sizes.iter().sum();
                assert!(
                    total_size <= 1024 * 1024,
                    "Column {} Page {} size {} exceeds 1MB limit",
                    col_idx,
                    page_idx,
                    total_size
                );
            }
        }

        assert_eq!(total_page_num, 8);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn test_max_page_bytes_env_var() {
        let arrow_field = Field::new("data", DataType::UInt64, false);
        let arrow_schema = Schema::new(vec![arrow_field]);
        let lance_schema = LanceSchema::try_from(&arrow_schema).unwrap();
        let data: Vec<u64> = (0..500_000).collect();
        let array = UInt64Array::from(data);
        let batch =
            RecordBatch::try_new(arrow_schema.clone().into(), vec![Arc::new(array)]).unwrap();

        std::env::set_var(ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES, "2097152");

        let options = FileWriterOptions {
            max_page_bytes: None,
            ..Default::default()
        };

        let tmp_dir = tempdir().unwrap();
        let path = tmp_dir.path().join("test_env_var.lance");
        let object_store = ObjectStore::local();
        let mut writer = FileWriter::try_new(
            object_store
                .create(&Path::from(path.to_str().unwrap()))
                .await
                .unwrap(),
            lance_schema.clone(),
            options,
        )
        .unwrap();

        writer.write_batch(&batch).await.unwrap();
        writer.finish().await.unwrap();

        let fs = FsFixture::default();
        let file_scheduler = fs
            .scheduler
            .open_file(
                &Path::from(path.to_str().unwrap()),
                &CachedFileSize::unknown(),
            )
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler,
            None,
            Arc::<DecoderPlugins>::default(),
            &LanceCache::no_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        for col_metadata in file_reader.metadata().column_metadatas.iter() {
            for page in col_metadata.pages.iter() {
                let total_size: u64 = page.buffer_sizes.iter().sum();
                assert!(
                    total_size <= 2 * 1024 * 1024,
                    "Page size {} exceeds 2MB limit",
                    total_size
                );
            }
        }

        std::env::set_var(ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES, "");
    }

    #[tokio::test]
    async fn test_compression_overrides_end_to_end() {
        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new("customer_id", DataType::Int32, false),
            ArrowField::new("product_id", DataType::Int32, false),
            ArrowField::new("quantity", DataType::Int32, false),
            ArrowField::new("price", DataType::Float32, false),
            ArrowField::new("description", DataType::Utf8, false),
        ]));

        let lance_schema = LanceSchema::try_from(arrow_schema.as_ref()).unwrap();

        let mut customer_ids = Int32Builder::new();
        let mut product_ids = Int32Builder::new();
        let mut quantities = Int32Builder::new();
        let mut prices = Float32Builder::new();
        let mut descriptions = Vec::new();

        for i in 0..10000 {
            customer_ids.append_value(i / 100);
            product_ids.append_value(i / 2000);
            quantities.append_value(if i % 10 == 0 { 5 } else { 1 });
            prices.append_value(match i % 3 {
                0 => 9.99,
                1 => 19.99,
                _ => 29.99,
            });
            descriptions.push(format!("Product {}", i / 2000));
        }

        let batch = RecordBatch::try_new(
            arrow_schema.clone(),
            vec![
                Arc::new(customer_ids.finish()),
                Arc::new(product_ids.finish()),
                Arc::new(quantities.finish()),
                Arc::new(prices.finish()),
                Arc::new(StringArray::from(descriptions)),
            ],
        )
        .unwrap();

        let mut params = CompressionParams::new();

        params.columns.insert(
            "*_id".to_string(),
            CompressionFieldParams {
                rle_threshold: Some(0.5),
                compression: None,
                compression_level: None,
                bss: Some(lance_encoding::compression_config::BssMode::Off),
            },
        );

        let encoding_strategy = lance_encoding::encoder::default_encoding_strategy_with_params(
            LanceFileVersion::V2_1,
            params,
        )
        .unwrap();

        let options = FileWriterOptions {
            encoding_strategy: Some(Arc::from(encoding_strategy)),
            format_version: Some(LanceFileVersion::V2_1),
            max_page_bytes: Some(64 * 1024),
            ..Default::default()
        };

        let tmp_dir = tempdir().unwrap();
        let path = tmp_dir.path().join("test_compression_overrides.lance");
        let object_store = ObjectStore::local();

        let mut writer = FileWriter::try_new(
            object_store
                .create(&Path::from(path.to_str().unwrap()))
                .await
                .unwrap(),
            lance_schema.clone(),
            options,
        )
        .unwrap();

        writer.write_batch(&batch).await.unwrap();
        writer.add_schema_metadata("compression_test", "configured_compression");
        writer.finish().await.unwrap();

        let path_no_compression = tmp_dir.path().join("test_no_compression.lance");
        let default_options = FileWriterOptions {
            format_version: Some(LanceFileVersion::V2_1),
            max_page_bytes: Some(64 * 1024),
            ..Default::default()
        };

        let mut writer_no_compression = FileWriter::try_new(
            object_store
                .create(&Path::from(path_no_compression.to_str().unwrap()))
                .await
                .unwrap(),
            lance_schema.clone(),
            default_options,
        )
        .unwrap();

        writer_no_compression.write_batch(&batch).await.unwrap();
        writer_no_compression.finish().await.unwrap();

        let fs = FsFixture::default();
        let file_scheduler = fs
            .scheduler
            .open_file(
                &Path::from(path.to_str().unwrap()),
                &CachedFileSize::unknown(),
            )
            .await
            .unwrap();

        let file_reader = FileReader::try_open(
            file_scheduler,
            None,
            Arc::<DecoderPlugins>::default(),
            &LanceCache::no_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let metadata = file_reader.metadata();
        assert_eq!(metadata.major_version, 2);
        assert_eq!(metadata.minor_version, 1);

        let schema = file_reader.schema();
        assert_eq!(
            schema.metadata.get("compression_test"),
            Some(&"configured_compression".to_string())
        );

        let column_metadatas = &metadata.column_metadatas;

        assert!(!column_metadatas[0].pages.is_empty());
        let customer_id_encoding = describe_encoding(&column_metadatas[0].pages[0]);
        assert!(
            customer_id_encoding.contains("RLE") || customer_id_encoding.contains("Rle"),
            "customer_id column should use RLE encoding due to '*_id' pattern match, but got: {}",
            customer_id_encoding
        );

        assert!(!column_metadatas[1].pages.is_empty());
        let product_id_encoding = describe_encoding(&column_metadatas[1].pages[0]);
        assert!(
            product_id_encoding.contains("RLE") || product_id_encoding.contains("Rle"),
            "product_id column should use RLE encoding due to '*_id' pattern match, but got: {}",
            product_id_encoding
        );
    }

    #[tokio::test]
    async fn test_field_metadata_compression() {
        let mut metadata = HashMap::new();
        metadata.insert(
            lance_encoding::constants::COMPRESSION_META_KEY.to_string(),
            "zstd".to_string(),
        );
        metadata.insert(
            lance_encoding::constants::COMPRESSION_LEVEL_META_KEY.to_string(),
            "6".to_string(),
        );

        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new("id", DataType::Int32, false),
            ArrowField::new("text", DataType::Utf8, false).with_metadata(metadata.clone()),
            ArrowField::new("data", DataType::Int32, false).with_metadata(HashMap::from([(
                lance_encoding::constants::COMPRESSION_META_KEY.to_string(),
                "none".to_string(),
            )])),
        ]));

        let lance_schema = LanceSchema::try_from(arrow_schema.as_ref()).unwrap();

        let id_array = Int32Array::from_iter_values(0..1000);
        let text_array = StringArray::from_iter_values(
            (0..1000).map(|i| format!("test string {} repeated text", i)),
        );
        let data_array = Int32Array::from_iter_values((0..1000).map(|i| i * 2));

        let batch = RecordBatch::try_new(
            arrow_schema.clone(),
            vec![
                Arc::new(id_array),
                Arc::new(text_array),
                Arc::new(data_array),
            ],
        )
        .unwrap();

        let tmp_dir = tempdir().unwrap();
        let path = tmp_dir.path().join("field_metadata_test.lance");
        let object_store = ObjectStore::local();

        let params = CompressionParams::new();
        let encoding_strategy = lance_encoding::encoder::default_encoding_strategy_with_params(
            LanceFileVersion::V2_1,
            params,
        )
        .unwrap();

        let options = FileWriterOptions {
            encoding_strategy: Some(Arc::from(encoding_strategy)),
            format_version: Some(LanceFileVersion::V2_1),
            ..Default::default()
        };
        let mut writer = FileWriter::try_new(
            object_store
                .create(&Path::from(path.to_str().unwrap()))
                .await
                .unwrap(),
            lance_schema.clone(),
            options,
        )
        .unwrap();

        writer.write_batch(&batch).await.unwrap();
        writer.finish().await.unwrap();

        let fs = FsFixture::default();
        let file_scheduler = fs
            .scheduler
            .open_file(
                &Path::from(path.to_str().unwrap()),
                &CachedFileSize::unknown(),
            )
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler,
            None,
            Arc::<DecoderPlugins>::default(),
            &LanceCache::no_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let column_metadatas = &file_reader.metadata().column_metadatas;

        let text_encoding = describe_encoding(&column_metadatas[1].pages[0]);
        assert!(
            text_encoding.contains("Zstd"),
            "text column should use zstd compression from field metadata, but got: {}",
            text_encoding
        );

        let data_encoding = describe_encoding(&column_metadatas[2].pages[0]);
        assert!(
            data_encoding.contains("Flat") && data_encoding.contains("compression: None"),
            "data column should use no compression from field metadata, but got: {}",
            data_encoding
        );
    }

    #[tokio::test]
    async fn test_field_metadata_rle_threshold() {
        let mut metadata = HashMap::new();
        metadata.insert(
            lance_encoding::constants::RLE_THRESHOLD_META_KEY.to_string(),
            "0.9".to_string(),
        );
        metadata.insert(
            lance_encoding::constants::COMPRESSION_META_KEY.to_string(),
            "lz4".to_string(),
        );
        metadata.insert(
            lance_encoding::constants::BSS_META_KEY.to_string(),
            "off".to_string(),
        );

        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "status",
            DataType::Int32,
            false,
        )
        .with_metadata(metadata)]));

        let lance_schema = LanceSchema::try_from(arrow_schema.as_ref()).unwrap();

        let status_array = Int32Array::from_iter_values(
            std::iter::repeat_n(200, 8000)
                .chain(std::iter::repeat_n(404, 1500))
                .chain(std::iter::repeat_n(500, 500)),
        );

        let batch =
            RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(status_array)]).unwrap();

        let tmp_dir = tempdir().unwrap();
        let path = tmp_dir.path().join("rle_threshold_test.lance");
        let object_store = ObjectStore::local();

        let params = CompressionParams::new();
        let encoding_strategy = lance_encoding::encoder::default_encoding_strategy_with_params(
            LanceFileVersion::V2_1,
            params,
        )
        .unwrap();

        let options = FileWriterOptions {
            encoding_strategy: Some(Arc::from(encoding_strategy)),
            format_version: Some(LanceFileVersion::V2_1),
            ..Default::default()
        };
        let mut writer = FileWriter::try_new(
            object_store
                .create(&Path::from(path.to_str().unwrap()))
                .await
                .unwrap(),
            lance_schema.clone(),
            options,
        )
        .unwrap();

        writer.write_batch(&batch).await.unwrap();
        writer.finish().await.unwrap();

        let fs = FsFixture::default();
        let file_scheduler = fs
            .scheduler
            .open_file(
                &Path::from(path.to_str().unwrap()),
                &CachedFileSize::unknown(),
            )
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler,
            None,
            Arc::<DecoderPlugins>::default(),
            &LanceCache::no_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let column_metadatas = &file_reader.metadata().column_metadatas;
        let status_encoding = describe_encoding(&column_metadatas[0].pages[0]);
        assert!(
            status_encoding.contains("RLE") || status_encoding.contains("Rle"),
            "status column should use RLE encoding due to metadata threshold, but got: {}",
            status_encoding
        );
    }
}