1use core::panic;
5use std::collections::HashMap;
6use std::sync::Arc;
7use std::sync::atomic::AtomicBool;
8
9use arrow_array::RecordBatch;
10
11use arrow_data::ArrayData;
12use bytes::{Buf, BufMut, Bytes, BytesMut};
13use futures::StreamExt;
14use futures::stream::FuturesOrdered;
15use lance_core::datatypes::{Field, Schema as LanceSchema};
16use lance_core::utils::bit::pad_bytes;
17use lance_core::{Error, Result};
18use lance_encoding::decoder::PageEncoding;
19use lance_encoding::encoder::{
20 BatchEncoder, EncodeTask, EncodedBatch, EncodedPage, EncodingOptions, FieldEncoder,
21 FieldEncodingStrategy, OutOfLineBuffers, default_encoding_strategy,
22};
23use lance_encoding::repdef::RepDefBuilder;
24use lance_encoding::version::LanceFileVersion;
25use lance_io::object_store::ObjectStore;
26use lance_io::traits::Writer;
27use log::{debug, warn};
28use object_store::path::Path;
29use prost::Message;
30use prost_types::Any;
31use tokio::io::AsyncWrite;
32use tokio::io::AsyncWriteExt;
33use tracing::instrument;
34
35use crate::datatypes::FieldsWithMeta;
36use crate::format::MAGIC;
37use crate::format::pb;
38use crate::format::pbfile;
39use crate::format::pbfile::DirectEncoding;
40
/// Alignment, in bytes, that buffers written through `FileWriter::do_write_buffer` are
/// padded to; it is also handed to the encoders as `EncodingOptions::buffer_alignment`.
pub(crate) const PAGE_BUFFER_ALIGNMENT: usize = 64;
/// Source of padding bytes; every pad byte has the value 72 (ASCII `'H'`).
const PAD_BUFFER: [u8; PAGE_BUFFER_ALIGNMENT] = [72; PAGE_BUFFER_ALIGNMENT];
/// Default maximum page size (32 MiB), used when neither
/// `FileWriterOptions::max_page_bytes` nor `LANCE_FILE_WRITER_MAX_PAGE_BYTES` supplies one.
const MAX_PAGE_BYTES: usize = 32 * 1024 * 1024;
/// Environment variable consulted for the maximum page size when
/// `FileWriterOptions::max_page_bytes` is `None`. A value that does not parse as `u64`
/// logs a warning and falls back to `MAX_PAGE_BYTES`.
const ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES: &str = "LANCE_FILE_WRITER_MAX_PAGE_BYTES";
51
/// Summary returned by `FileWriter::finish`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FileWriteSummary {
 /// Total number of rows written to the file.
 pub num_rows: u64,
 /// The `size` reported by the underlying writer when it was shut down
 /// (presumably the total file size in bytes — confirm against `Writer::shutdown`).
 pub size_bytes: u64,
}
60
/// Options controlling how a `FileWriter` encodes data. Every field is optional; `None`
/// selects the default described on that field.
#[derive(Debug, Clone, Default)]
pub struct FileWriterOptions {
 /// Total encoder cache budget in bytes, divided evenly across the schema's top-level
 /// fields. When `None`, each column gets 8 MiB.
 pub data_cache_bytes: Option<u64>,
 /// Maximum page size in bytes passed to the encoders. When `None`, the
 /// `LANCE_FILE_WRITER_MAX_PAGE_BYTES` environment variable is used if it is set and
 /// parses, otherwise 32 MiB.
 pub max_page_bytes: Option<u64>,
 /// Forwarded to `EncodingOptions::keep_original_array`; defaults to `false`.
 pub keep_original_array: Option<bool>,
 /// Strategy used to build the field encoders. When `None`,
 /// `default_encoding_strategy` for the writer's format version is used.
 pub encoding_strategy: Option<Arc<dyn FieldEncodingStrategy>>,
 /// File format version to write. When `None`, `LanceFileVersion::default()` is used.
 /// Requesting an unstable version logs a one-time warning.
 pub format_version: Option<LanceFileVersion>,
}
110
/// In-memory budget (256 KiB) for buffered page metadata. It is split evenly across
/// columns (at least 64 bytes each); a column's buffer is flushed to the spill file once
/// it reaches its share.
const DEFAULT_SPILL_BUFFER_LIMIT: usize = 256 * 1024;
114
/// Spills encoded page metadata to a side file so it does not accumulate in memory.
///
/// Pages are buffered per column as length-delimited `Page` messages and written out as
/// one contiguous chunk once a column's buffer reaches `per_column_limit`.
struct PageMetadataSpill {
 /// Writer for the spill file.
 writer: Box<dyn Writer>,
 /// Store holding the spill file; used to reopen it for reading in
 /// `FileWriter::write_column_metadatas`.
 object_store: Arc<ObjectStore>,
 /// Location of the spill file within `object_store`.
 path: Path,
 /// Number of bytes written to the spill file so far (the offset of the next chunk).
 position: u64,
 /// Per-column encoded page metadata that has not been flushed yet.
 column_buffers: Vec<Vec<u8>>,
 /// Per-column `(offset, length)` of every chunk flushed to the spill file, in order.
 column_chunks: Vec<Vec<(u64, u32)>>,
 /// Buffer size, in bytes, at which a column's pending metadata is flushed.
 per_column_limit: usize,
}
139
140impl PageMetadataSpill {
141 async fn new(object_store: Arc<ObjectStore>, path: Path, num_columns: usize) -> Result<Self> {
142 let writer = object_store.create(&path).await?;
143 let per_column_limit = (DEFAULT_SPILL_BUFFER_LIMIT / num_columns.max(1)).max(64);
144 Ok(Self {
145 writer,
146 object_store,
147 path,
148 position: 0,
149 column_buffers: vec![Vec::new(); num_columns],
150 column_chunks: vec![Vec::new(); num_columns],
151 per_column_limit,
152 })
153 }
154
155 async fn append_page(
156 &mut self,
157 column_idx: usize,
158 page: &pbfile::column_metadata::Page,
159 ) -> Result<()> {
160 page.encode_length_delimited(&mut self.column_buffers[column_idx])
161 .map_err(|e| {
162 Error::io_source(Box::new(std::io::Error::new(
163 std::io::ErrorKind::InvalidData,
164 e,
165 )))
166 })?;
167 if self.column_buffers[column_idx].len() >= self.per_column_limit {
168 self.flush_column(column_idx).await?;
169 }
170 Ok(())
171 }
172
173 async fn flush_column(&mut self, column_idx: usize) -> Result<()> {
174 let buf = &self.column_buffers[column_idx];
175 if buf.is_empty() {
176 return Ok(());
177 }
178 let len = buf.len();
179 self.writer.write_all(buf).await?;
180 self.column_chunks[column_idx].push((self.position, len as u32));
181 self.position += len as u64;
182 self.column_buffers[column_idx].clear();
183 Ok(())
184 }
185
186 async fn shutdown_writer(&mut self) -> Result<()> {
187 for col_idx in 0..self.column_buffers.len() {
188 self.flush_column(col_idx).await?;
189 }
190 Writer::shutdown(self.writer.as_mut()).await?;
191 Ok(())
192 }
193}
194
195fn decode_spilled_chunk(data: &Bytes) -> Result<Vec<pbfile::column_metadata::Page>> {
196 let mut pages = Vec::new();
197 let mut cursor = data.clone();
198 while cursor.has_remaining() {
199 let page =
200 pbfile::column_metadata::Page::decode_length_delimited(&mut cursor).map_err(|e| {
201 Error::io_source(Box::new(std::io::Error::new(
202 std::io::ErrorKind::InvalidData,
203 e,
204 )))
205 })?;
206 pages.push(page);
207 }
208 Ok(pages)
209}
210
/// Lifecycle of the optional page-metadata spill.
enum PageSpillState {
 /// Requested via `FileWriter::with_page_metadata_spill`; the spill file is created
 /// lazily on the first page write.
 Pending(Arc<ObjectStore>, Path),
 /// Spill file created and receiving page metadata.
 Active(PageMetadataSpill),
}
215
/// Writes Lance v2 files: encodes Arrow batches into pages and, on `finish`, appends the
/// global buffers, column metadata, offset tables and footer.
pub struct FileWriter {
 /// Destination for the file bytes.
 writer: Box<dyn Writer>,
 /// Schema; `None` until initialized (eagerly by `try_new`, lazily by the first batch).
 schema: Option<LanceSchema>,
 /// One encoder per top-level schema field.
 column_writers: Vec<Box<dyn FieldEncoder>>,
 /// Per-column metadata accumulated while writing (pages live here unless spilled).
 column_metadata: Vec<pbfile::ColumnMetadata>,
 /// Field-id to column-index mapping as produced by `BatchEncoder`.
 field_id_to_column_indices: Vec<(u32, u32)>,
 /// Number of physical columns written to the file.
 num_columns: u32,
 /// Rows written so far.
 rows_written: u64,
 /// `(position, length)` of buffers added via `add_global_buffer`.
 global_buffers: Vec<(u64, u64)>,
 /// Metadata merged into the schema written in the file descriptor.
 schema_metadata: HashMap<String, String>,
 /// Options supplied at construction.
 options: FileWriterOptions,
 /// Optional page-metadata spill; `None` keeps page metadata in memory.
 page_spill: Option<PageSpillState>,
}
229
230fn initial_column_metadata() -> pbfile::ColumnMetadata {
231 pbfile::ColumnMetadata {
232 pages: Vec::new(),
233 buffer_offsets: Vec::new(),
234 buffer_sizes: Vec::new(),
235 encoding: None,
236 }
237}
238
/// Set once the unstable-format-version warning has been logged, so `new_lazy` logs it at
/// most once per process.
static WARNED_ON_UNSTABLE_API: AtomicBool = AtomicBool::new(false);
240
241impl FileWriter {
242 pub fn try_new(
244 object_writer: Box<dyn Writer>,
245 schema: LanceSchema,
246 options: FileWriterOptions,
247 ) -> Result<Self> {
248 let mut writer = Self::new_lazy(object_writer, options);
249 writer.initialize(schema)?;
250 Ok(writer)
251 }
252
253 pub fn new_lazy(object_writer: Box<dyn Writer>, options: FileWriterOptions) -> Self {
258 if let Some(format_version) = options.format_version
259 && format_version.is_unstable()
260 && WARNED_ON_UNSTABLE_API
261 .compare_exchange(
262 false,
263 true,
264 std::sync::atomic::Ordering::Relaxed,
265 std::sync::atomic::Ordering::Relaxed,
266 )
267 .is_ok()
268 {
269 warn!(
270 "You have requested an unstable format version. Files written with this format version may not be readable in the future! This is a development feature and should only be used for experimentation and never for production data."
271 );
272 }
273 Self {
274 writer: object_writer,
275 schema: None,
276 column_writers: Vec::new(),
277 column_metadata: Vec::new(),
278 num_columns: 0,
279 rows_written: 0,
280 field_id_to_column_indices: Vec::new(),
281 global_buffers: Vec::new(),
282 schema_metadata: HashMap::new(),
283 page_spill: None,
284 options,
285 }
286 }
287
288 pub fn with_page_metadata_spill(mut self, object_store: Arc<ObjectStore>, path: Path) -> Self {
296 self.page_spill = Some(PageSpillState::Pending(object_store, path));
297 self
298 }
299
300 pub async fn create_file_with_batches(
304 store: &ObjectStore,
305 path: &Path,
306 schema: lance_core::datatypes::Schema,
307 batches: impl Iterator<Item = RecordBatch> + Send,
308 options: FileWriterOptions,
309 ) -> Result<usize> {
310 let writer = store.create(path).await?;
311 let mut writer = Self::try_new(writer, schema, options)?;
312 for batch in batches {
313 writer.write_batch(&batch).await?;
314 }
315 Ok(writer.finish().await?.num_rows as usize)
316 }
317
318 async fn do_write_buffer(writer: &mut (impl AsyncWrite + Unpin), buf: &[u8]) -> Result<()> {
319 writer.write_all(buf).await?;
320 let pad_bytes = pad_bytes::<PAGE_BUFFER_ALIGNMENT>(buf.len());
321 writer.write_all(&PAD_BUFFER[..pad_bytes]).await?;
322 Ok(())
323 }
324
325 pub fn version(&self) -> LanceFileVersion {
327 self.options.format_version.unwrap_or_default()
328 }
329
330 async fn write_page(&mut self, encoded_page: EncodedPage) -> Result<()> {
331 let buffers = encoded_page.data;
332 let mut buffer_offsets = Vec::with_capacity(buffers.len());
333 let mut buffer_sizes = Vec::with_capacity(buffers.len());
334 for buffer in buffers {
335 buffer_offsets.push(self.writer.tell().await? as u64);
336 buffer_sizes.push(buffer.len() as u64);
337 Self::do_write_buffer(&mut self.writer, &buffer).await?;
338 }
339 let encoded_encoding = match encoded_page.description {
340 PageEncoding::Legacy(array_encoding) => Any::from_msg(&array_encoding)?.encode_to_vec(),
341 PageEncoding::Structural(page_layout) => Any::from_msg(&page_layout)?.encode_to_vec(),
342 };
343 let page = pbfile::column_metadata::Page {
344 buffer_offsets,
345 buffer_sizes,
346 encoding: Some(pbfile::Encoding {
347 location: Some(pbfile::encoding::Location::Direct(DirectEncoding {
348 encoding: encoded_encoding,
349 })),
350 }),
351 length: encoded_page.num_rows,
352 priority: encoded_page.row_number,
353 };
354 let col_idx = encoded_page.column_idx as usize;
355 if matches!(&self.page_spill, Some(PageSpillState::Pending(..))) {
356 let Some(PageSpillState::Pending(store, path)) = self.page_spill.take() else {
357 unreachable!()
358 };
359 self.page_spill = Some(PageSpillState::Active(
360 PageMetadataSpill::new(store, path, self.num_columns as usize).await?,
361 ));
362 }
363 match &mut self.page_spill {
364 Some(PageSpillState::Active(spill)) => spill.append_page(col_idx, &page).await?,
365 None => self.column_metadata[col_idx].pages.push(page),
366 Some(PageSpillState::Pending(..)) => unreachable!(),
367 }
368 Ok(())
369 }
370
371 #[instrument(skip_all, level = "debug")]
372 async fn write_pages(&mut self, mut encoding_tasks: FuturesOrdered<EncodeTask>) -> Result<()> {
373 while let Some(encoding_task) = encoding_tasks.next().await {
382 let encoded_page = encoding_task?;
383 self.write_page(encoded_page).await?;
384 }
385 self.writer.flush().await?;
390 Ok(())
391 }
392
393 pub async fn write_batches(
395 &mut self,
396 batches: impl Iterator<Item = &RecordBatch>,
397 ) -> Result<()> {
398 for batch in batches {
399 self.write_batch(batch).await?;
400 }
401 Ok(())
402 }
403
404 fn verify_field_nullability(arr: &ArrayData, field: &Field) -> Result<()> {
405 if !field.nullable && arr.null_count() > 0 {
406 return Err(Error::invalid_input(format!(
407 "The field `{}` contained null values even though the field is marked non-null in the schema",
408 field.name
409 )));
410 }
411
412 for (child_field, child_arr) in field.children.iter().zip(arr.child_data()) {
413 Self::verify_field_nullability(child_arr, child_field)?;
414 }
415
416 Ok(())
417 }
418
419 fn verify_nullability_constraints(&self, batch: &RecordBatch) -> Result<()> {
420 for (col, field) in batch
421 .columns()
422 .iter()
423 .zip(self.schema.as_ref().unwrap().fields.iter())
424 {
425 Self::verify_field_nullability(&col.to_data(), field)?;
426 }
427 Ok(())
428 }
429
430 fn initialize(&mut self, mut schema: LanceSchema) -> Result<()> {
431 let cache_bytes_per_column = if let Some(data_cache_bytes) = self.options.data_cache_bytes {
432 data_cache_bytes / schema.fields.len() as u64
433 } else {
434 8 * 1024 * 1024
435 };
436
437 let max_page_bytes = self.options.max_page_bytes.unwrap_or_else(|| {
438 std::env::var(ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES)
439 .map(|s| {
440 s.parse::<u64>().unwrap_or_else(|e| {
441 warn!(
442 "Failed to parse {}: {}, using default",
443 ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES, e
444 );
445 MAX_PAGE_BYTES as u64
446 })
447 })
448 .unwrap_or(MAX_PAGE_BYTES as u64)
449 });
450
451 schema.validate()?;
452
453 let keep_original_array = self.options.keep_original_array.unwrap_or(false);
454 let encoding_strategy = self.options.encoding_strategy.clone().unwrap_or_else(|| {
455 let version = self.version();
456 default_encoding_strategy(version).into()
457 });
458
459 let encoding_options = EncodingOptions {
460 cache_bytes_per_column,
461 max_page_bytes,
462 keep_original_array,
463 buffer_alignment: PAGE_BUFFER_ALIGNMENT as u64,
464 version: self.version(),
465 };
466 let encoder =
467 BatchEncoder::try_new(&schema, encoding_strategy.as_ref(), &encoding_options)?;
468 self.num_columns = encoder.num_columns();
469
470 self.column_writers = encoder.field_encoders;
471 self.column_metadata = vec![initial_column_metadata(); self.num_columns as usize];
472 self.field_id_to_column_indices = encoder.field_id_to_column_index;
473 self.schema_metadata
474 .extend(std::mem::take(&mut schema.metadata));
475 self.schema = Some(schema);
476 Ok(())
477 }
478
479 fn ensure_initialized(&mut self, batch: &RecordBatch) -> Result<&LanceSchema> {
480 if self.schema.is_none() {
481 let schema = LanceSchema::try_from(batch.schema().as_ref())?;
482 self.initialize(schema)?;
483 }
484 Ok(self.schema.as_ref().unwrap())
485 }
486
487 #[instrument(skip_all, level = "debug")]
488 fn encode_batch(
489 &mut self,
490 batch: &RecordBatch,
491 external_buffers: &mut OutOfLineBuffers,
492 ) -> Result<Vec<Vec<EncodeTask>>> {
493 self.schema
494 .as_ref()
495 .unwrap()
496 .fields
497 .iter()
498 .zip(self.column_writers.iter_mut())
499 .map(|(field, column_writer)| {
500 let array =
501 batch
502 .column_by_name(&field.name)
503 .ok_or(Error::invalid_input_source(
504 format!(
505 "Cannot write batch. The batch was missing the column `{}`",
506 field.name
507 )
508 .into(),
509 ))?;
510 let repdef = RepDefBuilder::default();
511 let num_rows = array.len() as u64;
512 column_writer.maybe_encode(
513 array.clone(),
514 external_buffers,
515 repdef,
516 self.rows_written,
517 num_rows,
518 )
519 })
520 .collect::<Result<Vec<_>>>()
521 }
522
523 pub async fn write_batch(&mut self, batch: &RecordBatch) -> Result<()> {
528 debug!(
529 "write_batch called with {} rows, {} columns, and {} bytes of data",
530 batch.num_rows(),
531 batch.num_columns(),
532 batch.get_array_memory_size()
533 );
534 self.ensure_initialized(batch)?;
535 self.verify_nullability_constraints(batch)?;
536 let num_rows = batch.num_rows() as u64;
537 if num_rows == 0 {
538 return Ok(());
539 }
540 if num_rows > u32::MAX as u64 {
541 return Err(Error::invalid_input_source(
542 "cannot write Lance files with more than 2^32 rows".into(),
543 ));
544 }
545 let mut external_buffers =
548 OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
549 let encoding_tasks = self.encode_batch(batch, &mut external_buffers)?;
550 for external_buffer in external_buffers.take_buffers() {
552 Self::do_write_buffer(&mut self.writer, &external_buffer).await?;
553 }
554
555 let encoding_tasks = encoding_tasks
556 .into_iter()
557 .flatten()
558 .collect::<FuturesOrdered<_>>();
559
560 self.rows_written = match self.rows_written.checked_add(batch.num_rows() as u64) {
561 Some(rows_written) => rows_written,
562 None => {
563 return Err(Error::invalid_input_source(format!("cannot write batch with {} rows because {} rows have already been written and Lance files cannot contain more than 2^64 rows", num_rows, self.rows_written).into()));
564 }
565 };
566
567 self.write_pages(encoding_tasks).await?;
568
569 Ok(())
570 }
571
572 async fn write_column_metadata(
573 &mut self,
574 metadata: pbfile::ColumnMetadata,
575 ) -> Result<(u64, u64)> {
576 let metadata_bytes = metadata.encode_to_vec();
577 let position = self.writer.tell().await? as u64;
578 let len = metadata_bytes.len() as u64;
579 self.writer.write_all(&metadata_bytes).await?;
580 Ok((position, len))
581 }
582
583 async fn write_column_metadatas(&mut self) -> Result<Vec<(u64, u64)>> {
584 let metadatas = std::mem::take(&mut self.column_metadata);
585
586 let spill_state = self.page_spill.take();
590 let (spill_chunks, spill_reader) =
591 if let Some(PageSpillState::Active(mut spill)) = spill_state {
592 spill.shutdown_writer().await?;
593 let reader = spill.object_store.open(&spill.path).await?;
594 let chunks = std::mem::take(&mut spill.column_chunks);
595 (chunks, Some(reader))
596 } else {
597 (Vec::new(), None)
598 };
599
600 let mut metadata_positions = Vec::with_capacity(metadatas.len());
601 for (col_idx, mut metadata) in metadatas.into_iter().enumerate() {
602 if let Some(reader) = &spill_reader {
603 let mut pages = Vec::new();
604 for &(offset, len) in &spill_chunks[col_idx] {
605 let data = reader
606 .get_range(offset as usize..(offset as usize + len as usize))
607 .await
608 .map_err(|e| Error::io_source(Box::new(e)))?;
609 pages.extend(decode_spilled_chunk(&data)?);
610 }
611 metadata.pages = pages;
612 }
613 metadata_positions.push(self.write_column_metadata(metadata).await?);
614 }
615
616 Ok(metadata_positions)
617 }
618
619 fn make_file_descriptor(
620 schema: &lance_core::datatypes::Schema,
621 num_rows: u64,
622 ) -> Result<pb::FileDescriptor> {
623 let fields_with_meta = FieldsWithMeta::from(schema);
624 Ok(pb::FileDescriptor {
625 schema: Some(pb::Schema {
626 fields: fields_with_meta.fields.0,
627 metadata: fields_with_meta.metadata,
628 }),
629 length: num_rows,
630 })
631 }
632
633 async fn write_global_buffers(&mut self) -> Result<Vec<(u64, u64)>> {
634 let schema = self.schema.as_mut().ok_or(Error::invalid_input("No schema provided on writer open and no data provided. Schema is unknown and file cannot be created"))?;
635 schema.metadata = std::mem::take(&mut self.schema_metadata);
636 schema
638 .fields
639 .iter_mut()
640 .for_each(|f| f.unload_blobs_recursive());
641
642 let file_descriptor = Self::make_file_descriptor(schema, self.rows_written)?;
643 let file_descriptor_bytes = file_descriptor.encode_to_vec();
644 let file_descriptor_len = file_descriptor_bytes.len() as u64;
645 let file_descriptor_position = self.writer.tell().await? as u64;
646 self.writer.write_all(&file_descriptor_bytes).await?;
647 let mut gbo_table = Vec::with_capacity(1 + self.global_buffers.len());
648 gbo_table.push((file_descriptor_position, file_descriptor_len));
649 gbo_table.append(&mut self.global_buffers);
650 Ok(gbo_table)
651 }
652
653 pub fn add_schema_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
659 self.schema_metadata.insert(key.into(), value.into());
660 }
661
662 pub fn initialize_with_external_metadata(
671 &mut self,
672 schema: lance_core::datatypes::Schema,
673 column_metadata: Vec<pbfile::ColumnMetadata>,
674 rows_written: u64,
675 ) {
676 self.schema = Some(schema);
677 self.num_columns = column_metadata.len() as u32;
678 self.column_metadata = column_metadata;
679 self.rows_written = rows_written;
680 }
681
682 pub async fn add_global_buffer(&mut self, buffer: Bytes) -> Result<u32> {
688 let position = self.writer.tell().await? as u64;
689 let len = buffer.len() as u64;
690 Self::do_write_buffer(&mut self.writer, &buffer).await?;
691 self.global_buffers.push((position, len));
692 Ok(self.global_buffers.len() as u32)
693 }
694
695 async fn finish_writers(&mut self) -> Result<()> {
696 let mut col_idx = 0;
697 for mut writer in std::mem::take(&mut self.column_writers) {
698 let mut external_buffers =
699 OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
700 let columns = writer.finish(&mut external_buffers).await?;
701 for buffer in external_buffers.take_buffers() {
702 self.writer.write_all(&buffer).await?;
703 }
704 debug_assert_eq!(
705 columns.len(),
706 writer.num_columns() as usize,
707 "Expected {} columns from column at index {} and got {}",
708 writer.num_columns(),
709 col_idx,
710 columns.len()
711 );
712 for column in columns {
713 for page in column.final_pages {
714 self.write_page(page).await?;
715 }
716 let column_metadata = &mut self.column_metadata[col_idx];
717 let mut buffer_pos = self.writer.tell().await? as u64;
718 for buffer in column.column_buffers {
719 column_metadata.buffer_offsets.push(buffer_pos);
720 let mut size = 0;
721 Self::do_write_buffer(&mut self.writer, &buffer).await?;
722 size += buffer.len() as u64;
723 buffer_pos += size;
724 column_metadata.buffer_sizes.push(size);
725 }
726 let encoded_encoding = Any::from_msg(&column.encoding)?.encode_to_vec();
727 column_metadata.encoding = Some(pbfile::Encoding {
728 location: Some(pbfile::encoding::Location::Direct(pbfile::DirectEncoding {
729 encoding: encoded_encoding,
730 })),
731 });
732 col_idx += 1;
733 }
734 }
735 if col_idx != self.column_metadata.len() {
736 panic!(
737 "Column writers finished with {} columns but we expected {}",
738 col_idx,
739 self.column_metadata.len()
740 );
741 }
742 Ok(())
743 }
744
745 fn version_to_numbers(&self) -> (u16, u16) {
748 let version = self.options.format_version.unwrap_or_default();
749 match version.resolve() {
750 LanceFileVersion::V2_0 => (0, 3),
751 LanceFileVersion::V2_1 => (2, 1),
752 LanceFileVersion::V2_2 => (2, 2),
753 LanceFileVersion::V2_3 => (2, 3),
754 _ => panic!("Unsupported version: {}", version),
755 }
756 }
757
758 pub async fn finish(&mut self) -> Result<FileWriteSummary> {
766 let mut external_buffers =
768 OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
769 let encoding_tasks = self
770 .column_writers
771 .iter_mut()
772 .map(|writer| writer.flush(&mut external_buffers))
773 .collect::<Result<Vec<_>>>()?;
774 for external_buffer in external_buffers.take_buffers() {
775 Self::do_write_buffer(&mut self.writer, &external_buffer).await?;
776 }
777 let encoding_tasks = encoding_tasks
778 .into_iter()
779 .flatten()
780 .collect::<FuturesOrdered<_>>();
781 self.write_pages(encoding_tasks).await?;
782
783 if !self.column_writers.is_empty() {
784 self.finish_writers().await?;
785 }
786
787 let global_buffer_offsets = self.write_global_buffers().await?;
789 let num_global_buffers = global_buffer_offsets.len() as u32;
790
791 let column_metadata_start = self.writer.tell().await? as u64;
793 let metadata_positions = self.write_column_metadatas().await?;
794
795 let cmo_table_start = self.writer.tell().await? as u64;
797 for (meta_pos, meta_len) in metadata_positions {
798 self.writer.write_u64_le(meta_pos).await?;
799 self.writer.write_u64_le(meta_len).await?;
800 }
801
802 let gbo_table_start = self.writer.tell().await? as u64;
804 for (gbo_pos, gbo_len) in global_buffer_offsets {
805 self.writer.write_u64_le(gbo_pos).await?;
806 self.writer.write_u64_le(gbo_len).await?;
807 }
808
809 let (major, minor) = self.version_to_numbers();
810 self.writer.write_u64_le(column_metadata_start).await?;
812 self.writer.write_u64_le(cmo_table_start).await?;
813 self.writer.write_u64_le(gbo_table_start).await?;
814 self.writer.write_u32_le(num_global_buffers).await?;
815 self.writer.write_u32_le(self.num_columns).await?;
816 self.writer.write_u16_le(major).await?;
817 self.writer.write_u16_le(minor).await?;
818 self.writer.write_all(MAGIC).await?;
819
820 let write_result = Writer::shutdown(self.writer.as_mut()).await?;
822
823 Ok(FileWriteSummary {
824 num_rows: self.rows_written,
825 size_bytes: write_result.size as u64,
826 })
827 }
828
829 pub async fn abort(&mut self) {
830 }
833
834 pub async fn tell(&mut self) -> Result<u64> {
835 Ok(self.writer.tell().await? as u64)
836 }
837
838 pub fn field_id_to_column_indices(&self) -> &[(u32, u32)] {
839 &self.field_id_to_column_indices
840 }
841}
842
/// Serializes an already-encoded batch into the bytes of a complete Lance file.
pub trait EncodedBatchWriteExt {
 /// Builds a Lance file whose first global buffer holds the file descriptor (schema and
 /// row count), so the bytes carry their own schema.
 fn try_to_self_described_lance(&self, version: LanceFileVersion) -> Result<Bytes>;
 /// Builds a Lance file with no global buffers, i.e. without an embedded schema
 /// (presumably the reader supplies the schema separately — confirm with callers).
 fn try_to_mini_lance(&self, version: LanceFileVersion) -> Result<Bytes>;
}
853
854fn concat_lance_footer(
859 batch: &EncodedBatch,
860 write_schema: bool,
861 version: LanceFileVersion,
862) -> Result<Bytes> {
863 let mut data = BytesMut::with_capacity(batch.data.len() + 1024 * 1024);
865 data.put(batch.data.clone());
866 let global_buffers = if write_schema {
868 let schema_start = data.len() as u64;
869 let lance_schema = lance_core::datatypes::Schema::try_from(batch.schema.as_ref())?;
870 let descriptor = FileWriter::make_file_descriptor(&lance_schema, batch.num_rows)?;
871 let descriptor_bytes = descriptor.encode_to_vec();
872 let descriptor_len = descriptor_bytes.len() as u64;
873 data.put(descriptor_bytes.as_slice());
874
875 vec![(schema_start, descriptor_len)]
876 } else {
877 vec![]
878 };
879 let col_metadata_start = data.len() as u64;
880
881 let mut col_metadata_positions = Vec::new();
882 for col in &batch.page_table {
884 let position = data.len() as u64;
885 let pages = col
886 .page_infos
887 .iter()
888 .map(|page_info| {
889 let encoded_encoding = match &page_info.encoding {
890 PageEncoding::Legacy(array_encoding) => {
891 Any::from_msg(array_encoding)?.encode_to_vec()
892 }
893 PageEncoding::Structural(page_layout) => {
894 Any::from_msg(page_layout)?.encode_to_vec()
895 }
896 };
897 let (buffer_offsets, buffer_sizes): (Vec<_>, Vec<_>) = page_info
898 .buffer_offsets_and_sizes
899 .as_ref()
900 .iter()
901 .cloned()
902 .unzip();
903 Ok(pbfile::column_metadata::Page {
904 buffer_offsets,
905 buffer_sizes,
906 encoding: Some(pbfile::Encoding {
907 location: Some(pbfile::encoding::Location::Direct(DirectEncoding {
908 encoding: encoded_encoding,
909 })),
910 }),
911 length: page_info.num_rows,
912 priority: page_info.priority,
913 })
914 })
915 .collect::<Result<Vec<_>>>()?;
916 let (buffer_offsets, buffer_sizes): (Vec<_>, Vec<_>) =
917 col.buffer_offsets_and_sizes.iter().cloned().unzip();
918 let encoded_col_encoding = Any::from_msg(&col.encoding)?.encode_to_vec();
919 let column = pbfile::ColumnMetadata {
920 pages,
921 buffer_offsets,
922 buffer_sizes,
923 encoding: Some(pbfile::Encoding {
924 location: Some(pbfile::encoding::Location::Direct(pbfile::DirectEncoding {
925 encoding: encoded_col_encoding,
926 })),
927 }),
928 };
929 let column_bytes = column.encode_to_vec();
930 col_metadata_positions.push((position, column_bytes.len() as u64));
931 data.put(column_bytes.as_slice());
932 }
933 let cmo_table_start = data.len() as u64;
935 for (meta_pos, meta_len) in col_metadata_positions {
936 data.put_u64_le(meta_pos);
937 data.put_u64_le(meta_len);
938 }
939 let gbo_table_start = data.len() as u64;
941 let num_global_buffers = global_buffers.len() as u32;
942 for (gbo_pos, gbo_len) in global_buffers {
943 data.put_u64_le(gbo_pos);
944 data.put_u64_le(gbo_len);
945 }
946
947 let (major, minor) = version.to_numbers();
948
949 data.put_u64_le(col_metadata_start);
951 data.put_u64_le(cmo_table_start);
952 data.put_u64_le(gbo_table_start);
953 data.put_u32_le(num_global_buffers);
954 data.put_u32_le(batch.page_table.len() as u32);
955 data.put_u16_le(major as u16);
956 data.put_u16_le(minor as u16);
957 data.put(MAGIC.as_slice());
958
959 Ok(data.freeze())
960}
961
962impl EncodedBatchWriteExt for EncodedBatch {
963 fn try_to_self_described_lance(&self, version: LanceFileVersion) -> Result<Bytes> {
964 concat_lance_footer(self, true, version)
965 }
966
967 fn try_to_mini_lance(&self, version: LanceFileVersion) -> Result<Bytes> {
968 concat_lance_footer(self, false, version)
969 }
970}
971
972#[cfg(test)]
973mod tests {
974 use std::collections::HashMap;
975 use std::sync::Arc;
976
977 use crate::reader::{FileReader, FileReaderOptions, describe_encoding};
978 use crate::testing::FsFixture;
979 use crate::writer::{ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES, FileWriter, FileWriterOptions};
980 use arrow_array::builder::{Float32Builder, Int32Builder};
981 use arrow_array::{Int32Array, RecordBatch, UInt64Array};
982 use arrow_array::{RecordBatchReader, StringArray, types::Float64Type};
983 use arrow_schema::{DataType, Field, Field as ArrowField, Schema, Schema as ArrowSchema};
984 use lance_core::cache::LanceCache;
985 use lance_core::datatypes::Schema as LanceSchema;
986 use lance_core::utils::tempfile::TempObjFile;
987 use lance_datagen::{BatchCount, RowCount, array, gen_batch};
988 use lance_encoding::compression_config::{CompressionFieldParams, CompressionParams};
989 use lance_encoding::decoder::DecoderPlugins;
990 use lance_encoding::version::LanceFileVersion;
991 use lance_io::object_store::ObjectStore;
992 use lance_io::utils::CachedFileSize;
993
994 #[tokio::test]
995 async fn test_basic_write() {
996 let tmp_path = TempObjFile::default();
997 let obj_store = Arc::new(ObjectStore::local());
998
999 let reader = gen_batch()
1000 .col("score", array::rand::<Float64Type>())
1001 .into_reader_rows(RowCount::from(1000), BatchCount::from(10));
1002
1003 let writer = obj_store.create(&tmp_path).await.unwrap();
1004
1005 let lance_schema =
1006 lance_core::datatypes::Schema::try_from(reader.schema().as_ref()).unwrap();
1007
1008 let mut file_writer =
1009 FileWriter::try_new(writer, lance_schema, FileWriterOptions::default()).unwrap();
1010
1011 for batch in reader {
1012 file_writer.write_batch(&batch.unwrap()).await.unwrap();
1013 }
1014 file_writer.add_schema_metadata("foo", "bar");
1015 file_writer.finish().await.unwrap();
1016 }
1018
1019 #[tokio::test]
1020 async fn test_write_empty() {
1021 let tmp_path = TempObjFile::default();
1022 let obj_store = Arc::new(ObjectStore::local());
1023
1024 let reader = gen_batch()
1025 .col("score", array::rand::<Float64Type>())
1026 .into_reader_rows(RowCount::from(0), BatchCount::from(0));
1027
1028 let writer = obj_store.create(&tmp_path).await.unwrap();
1029
1030 let lance_schema =
1031 lance_core::datatypes::Schema::try_from(reader.schema().as_ref()).unwrap();
1032
1033 let mut file_writer =
1034 FileWriter::try_new(writer, lance_schema, FileWriterOptions::default()).unwrap();
1035
1036 for batch in reader {
1037 file_writer.write_batch(&batch.unwrap()).await.unwrap();
1038 }
1039 file_writer.add_schema_metadata("foo", "bar");
1040 file_writer.finish().await.unwrap();
1041 }
1042
1043 #[tokio::test]
1044 async fn test_max_page_bytes_enforced() {
1045 let arrow_field = Field::new("data", DataType::UInt64, false);
1046 let arrow_schema = Schema::new(vec![arrow_field]);
1047 let lance_schema = LanceSchema::try_from(&arrow_schema).unwrap();
1048
1049 let data: Vec<u64> = (0..1_000_000).collect();
1051 let array = UInt64Array::from(data);
1052 let batch =
1053 RecordBatch::try_new(arrow_schema.clone().into(), vec![Arc::new(array)]).unwrap();
1054
1055 let options = FileWriterOptions {
1056 max_page_bytes: Some(1024 * 1024), format_version: Some(LanceFileVersion::V2_0),
1059 ..Default::default()
1060 };
1061
1062 let path = TempObjFile::default();
1063 let object_store = ObjectStore::local();
1064 let mut writer = FileWriter::try_new(
1065 object_store.create(&path).await.unwrap(),
1066 lance_schema,
1067 options,
1068 )
1069 .unwrap();
1070
1071 writer.write_batch(&batch).await.unwrap();
1072 writer.finish().await.unwrap();
1073
1074 let fs = FsFixture::default();
1075 let file_scheduler = fs
1076 .scheduler
1077 .open_file(&path, &CachedFileSize::unknown())
1078 .await
1079 .unwrap();
1080 let file_reader = FileReader::try_open(
1081 file_scheduler,
1082 None,
1083 Arc::<DecoderPlugins>::default(),
1084 &LanceCache::no_cache(),
1085 FileReaderOptions::default(),
1086 )
1087 .await
1088 .unwrap();
1089
1090 let column_meta = file_reader.metadata();
1091
1092 let mut total_page_num: u32 = 0;
1093 for (col_idx, col_metadata) in column_meta.column_metadatas.iter().enumerate() {
1094 assert!(
1095 !col_metadata.pages.is_empty(),
1096 "Column {} has no pages",
1097 col_idx
1098 );
1099
1100 for (page_idx, page) in col_metadata.pages.iter().enumerate() {
1101 total_page_num += 1;
1102 let total_size: u64 = page.buffer_sizes.iter().sum();
1103 assert!(
1104 total_size <= 1024 * 1024,
1105 "Column {} Page {} size {} exceeds 1MB limit",
1106 col_idx,
1107 page_idx,
1108 total_size
1109 );
1110 }
1111 }
1112
1113 assert_eq!(total_page_num, 8)
1114 }
1115
1116 #[tokio::test(flavor = "current_thread")]
1117 async fn test_max_page_bytes_env_var() {
1118 let arrow_field = Field::new("data", DataType::UInt64, false);
1119 let arrow_schema = Schema::new(vec![arrow_field]);
1120 let lance_schema = LanceSchema::try_from(&arrow_schema).unwrap();
1121 let data: Vec<u64> = (0..500_000).collect();
1123 let array = UInt64Array::from(data);
1124 let batch =
1125 RecordBatch::try_new(arrow_schema.clone().into(), vec![Arc::new(array)]).unwrap();
1126
1127 unsafe {
1129 std::env::set_var(ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES, "2097152");
1130 }
1131
1132 let options = FileWriterOptions {
1133 max_page_bytes: None, ..Default::default()
1135 };
1136
1137 let path = TempObjFile::default();
1138 let object_store = ObjectStore::local();
1139 let mut writer = FileWriter::try_new(
1140 object_store.create(&path).await.unwrap(),
1141 lance_schema.clone(),
1142 options,
1143 )
1144 .unwrap();
1145
1146 writer.write_batch(&batch).await.unwrap();
1147 writer.finish().await.unwrap();
1148
1149 let fs = FsFixture::default();
1150 let file_scheduler = fs
1151 .scheduler
1152 .open_file(&path, &CachedFileSize::unknown())
1153 .await
1154 .unwrap();
1155 let file_reader = FileReader::try_open(
1156 file_scheduler,
1157 None,
1158 Arc::<DecoderPlugins>::default(),
1159 &LanceCache::no_cache(),
1160 FileReaderOptions::default(),
1161 )
1162 .await
1163 .unwrap();
1164
1165 for col_metadata in file_reader.metadata().column_metadatas.iter() {
1166 for page in col_metadata.pages.iter() {
1167 let total_size: u64 = page.buffer_sizes.iter().sum();
1168 assert!(
1169 total_size <= 2 * 1024 * 1024,
1170 "Page size {} exceeds 2MB limit",
1171 total_size
1172 );
1173 }
1174 }
1175
1176 unsafe {
1177 std::env::set_var(ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES, "");
1178 }
1179 }
1180
1181 #[tokio::test]
1182 async fn test_compression_overrides_end_to_end() {
1183 let arrow_schema = Arc::new(ArrowSchema::new(vec![
1185 ArrowField::new("customer_id", DataType::Int32, false),
1186 ArrowField::new("product_id", DataType::Int32, false),
1187 ArrowField::new("quantity", DataType::Int32, false),
1188 ArrowField::new("price", DataType::Float32, false),
1189 ArrowField::new("description", DataType::Utf8, false),
1190 ]));
1191
1192 let lance_schema = LanceSchema::try_from(arrow_schema.as_ref()).unwrap();
1193
1194 let mut customer_ids = Int32Builder::new();
1196 let mut product_ids = Int32Builder::new();
1197 let mut quantities = Int32Builder::new();
1198 let mut prices = Float32Builder::new();
1199 let mut descriptions = Vec::new();
1200
1201 for i in 0..10000 {
1208 customer_ids.append_value(i / 100);
1211
1212 product_ids.append_value(i / 2000);
1214
1215 quantities.append_value(if i % 10 == 0 { 5 } else { 1 });
1217
1218 prices.append_value(match i % 3 {
1220 0 => 9.99,
1221 1 => 19.99,
1222 _ => 29.99,
1223 });
1224
1225 descriptions.push(format!("Product {}", i / 2000));
1227 }
1228
1229 let batch = RecordBatch::try_new(
1230 arrow_schema.clone(),
1231 vec![
1232 Arc::new(customer_ids.finish()),
1233 Arc::new(product_ids.finish()),
1234 Arc::new(quantities.finish()),
1235 Arc::new(prices.finish()),
1236 Arc::new(StringArray::from(descriptions)),
1237 ],
1238 )
1239 .unwrap();
1240
1241 let mut params = CompressionParams::new();
1243
1244 params.columns.insert(
1246 "*_id".to_string(),
1247 CompressionFieldParams {
1248 rle_threshold: Some(0.5), compression: None, compression_level: None,
1251 bss: Some(lance_encoding::compression_config::BssMode::Off), minichunk_size: None,
1253 },
1254 );
1255
1256 let encoding_strategy = lance_encoding::encoder::default_encoding_strategy_with_params(
1261 LanceFileVersion::V2_1,
1262 params,
1263 )
1264 .unwrap();
1265
1266 let options = FileWriterOptions {
1268 encoding_strategy: Some(Arc::from(encoding_strategy)),
1269 format_version: Some(LanceFileVersion::V2_1),
1270 max_page_bytes: Some(64 * 1024), ..Default::default()
1272 };
1273
1274 let path = TempObjFile::default();
1276 let object_store = ObjectStore::local();
1277
1278 let mut writer = FileWriter::try_new(
1279 object_store.create(&path).await.unwrap(),
1280 lance_schema.clone(),
1281 options,
1282 )
1283 .unwrap();
1284
1285 writer.write_batch(&batch).await.unwrap();
1286 writer.add_schema_metadata("compression_test", "configured_compression");
1287 writer.finish().await.unwrap();
1288
1289 let path_no_compression = TempObjFile::default();
1291 let default_options = FileWriterOptions {
1292 format_version: Some(LanceFileVersion::V2_1),
1293 max_page_bytes: Some(64 * 1024),
1294 ..Default::default()
1295 };
1296
1297 let mut writer_no_compression = FileWriter::try_new(
1298 object_store.create(&path_no_compression).await.unwrap(),
1299 lance_schema.clone(),
1300 default_options,
1301 )
1302 .unwrap();
1303
1304 writer_no_compression.write_batch(&batch).await.unwrap();
1305 writer_no_compression.finish().await.unwrap();
1306
1307 let fs = FsFixture::default();
1313 let file_scheduler = fs
1314 .scheduler
1315 .open_file(&path, &CachedFileSize::unknown())
1316 .await
1317 .unwrap();
1318
1319 let file_reader = FileReader::try_open(
1320 file_scheduler,
1321 None,
1322 Arc::<DecoderPlugins>::default(),
1323 &LanceCache::no_cache(),
1324 FileReaderOptions::default(),
1325 )
1326 .await
1327 .unwrap();
1328
1329 let metadata = file_reader.metadata();
1331 assert_eq!(metadata.major_version, 2);
1332 assert_eq!(metadata.minor_version, 1);
1333
1334 let schema = file_reader.schema();
1335 assert_eq!(
1336 schema.metadata.get("compression_test"),
1337 Some(&"configured_compression".to_string())
1338 );
1339
1340 let column_metadatas = &metadata.column_metadatas;
1342
1343 assert!(!column_metadatas[0].pages.is_empty());
1345 let customer_id_encoding = describe_encoding(&column_metadatas[0].pages[0]);
1346 assert!(
1347 customer_id_encoding.contains("RLE") || customer_id_encoding.contains("Rle"),
1348 "customer_id column should use RLE encoding due to '*_id' pattern match, but got: {}",
1349 customer_id_encoding
1350 );
1351
1352 assert!(!column_metadatas[1].pages.is_empty());
1354 let product_id_encoding = describe_encoding(&column_metadatas[1].pages[0]);
1355 assert!(
1356 product_id_encoding.contains("RLE") || product_id_encoding.contains("Rle"),
1357 "product_id column should use RLE encoding due to '*_id' pattern match, but got: {}",
1358 product_id_encoding
1359 );
1360 }
1361
1362 #[tokio::test]
1363 async fn test_field_metadata_compression() {
1364 let mut metadata = HashMap::new();
1366 metadata.insert(
1367 lance_encoding::constants::COMPRESSION_META_KEY.to_string(),
1368 "zstd".to_string(),
1369 );
1370 metadata.insert(
1371 lance_encoding::constants::COMPRESSION_LEVEL_META_KEY.to_string(),
1372 "6".to_string(),
1373 );
1374
1375 let arrow_schema = Arc::new(ArrowSchema::new(vec![
1376 ArrowField::new("id", DataType::Int32, false),
1377 ArrowField::new("text", DataType::Utf8, false).with_metadata(metadata.clone()),
1378 ArrowField::new("data", DataType::Int32, false).with_metadata(HashMap::from([(
1379 lance_encoding::constants::COMPRESSION_META_KEY.to_string(),
1380 "none".to_string(),
1381 )])),
1382 ]));
1383
1384 let lance_schema = LanceSchema::try_from(arrow_schema.as_ref()).unwrap();
1385
1386 let id_array = Int32Array::from_iter_values(0..1000);
1388 let text_array = StringArray::from_iter_values(
1389 (0..1000).map(|i| format!("test string {} repeated text", i)),
1390 );
1391 let data_array = Int32Array::from_iter_values((0..1000).map(|i| i * 2));
1392
1393 let batch = RecordBatch::try_new(
1394 arrow_schema.clone(),
1395 vec![
1396 Arc::new(id_array),
1397 Arc::new(text_array),
1398 Arc::new(data_array),
1399 ],
1400 )
1401 .unwrap();
1402
1403 let path = TempObjFile::default();
1404 let object_store = ObjectStore::local();
1405
1406 let params = CompressionParams::new();
1408 let encoding_strategy = lance_encoding::encoder::default_encoding_strategy_with_params(
1409 LanceFileVersion::V2_1,
1410 params,
1411 )
1412 .unwrap();
1413
1414 let options = FileWriterOptions {
1415 encoding_strategy: Some(Arc::from(encoding_strategy)),
1416 format_version: Some(LanceFileVersion::V2_1),
1417 ..Default::default()
1418 };
1419 let mut writer = FileWriter::try_new(
1420 object_store.create(&path).await.unwrap(),
1421 lance_schema.clone(),
1422 options,
1423 )
1424 .unwrap();
1425
1426 writer.write_batch(&batch).await.unwrap();
1427 writer.finish().await.unwrap();
1428
1429 let fs = FsFixture::default();
1431 let file_scheduler = fs
1432 .scheduler
1433 .open_file(&path, &CachedFileSize::unknown())
1434 .await
1435 .unwrap();
1436 let file_reader = FileReader::try_open(
1437 file_scheduler,
1438 None,
1439 Arc::<DecoderPlugins>::default(),
1440 &LanceCache::no_cache(),
1441 FileReaderOptions::default(),
1442 )
1443 .await
1444 .unwrap();
1445
1446 let column_metadatas = &file_reader.metadata().column_metadatas;
1447
1448 let text_encoding = describe_encoding(&column_metadatas[1].pages[0]);
1450 assert!(
1452 text_encoding.contains("Zstd"),
1453 "text column should use zstd compression from field metadata, but got: {}",
1454 text_encoding
1455 );
1456
1457 let data_encoding = describe_encoding(&column_metadatas[2].pages[0]);
1459 assert!(
1461 data_encoding.contains("Flat") && data_encoding.contains("compression: None"),
1462 "data column should use no compression from field metadata, but got: {}",
1463 data_encoding
1464 );
1465 }
1466
1467 #[tokio::test]
1468 async fn test_field_metadata_rle_threshold() {
1469 let mut metadata = HashMap::new();
1471 metadata.insert(
1472 lance_encoding::constants::RLE_THRESHOLD_META_KEY.to_string(),
1473 "0.9".to_string(),
1474 );
1475 metadata.insert(
1477 lance_encoding::constants::COMPRESSION_META_KEY.to_string(),
1478 "lz4".to_string(),
1479 );
1480 metadata.insert(
1482 lance_encoding::constants::BSS_META_KEY.to_string(),
1483 "off".to_string(),
1484 );
1485
1486 let arrow_schema = Arc::new(ArrowSchema::new(vec![
1487 ArrowField::new("status", DataType::Int32, false).with_metadata(metadata),
1488 ]));
1489
1490 let lance_schema = LanceSchema::try_from(arrow_schema.as_ref()).unwrap();
1491
1492 let status_array = Int32Array::from_iter_values(
1494 std::iter::repeat_n(200, 8000)
1495 .chain(std::iter::repeat_n(404, 1500))
1496 .chain(std::iter::repeat_n(500, 500)),
1497 );
1498
1499 let batch =
1500 RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(status_array)]).unwrap();
1501
1502 let path = TempObjFile::default();
1503 let object_store = ObjectStore::local();
1504
1505 let params = CompressionParams::new();
1507 let encoding_strategy = lance_encoding::encoder::default_encoding_strategy_with_params(
1508 LanceFileVersion::V2_1,
1509 params,
1510 )
1511 .unwrap();
1512
1513 let options = FileWriterOptions {
1514 encoding_strategy: Some(Arc::from(encoding_strategy)),
1515 format_version: Some(LanceFileVersion::V2_1),
1516 ..Default::default()
1517 };
1518 let mut writer = FileWriter::try_new(
1519 object_store.create(&path).await.unwrap(),
1520 lance_schema.clone(),
1521 options,
1522 )
1523 .unwrap();
1524
1525 writer.write_batch(&batch).await.unwrap();
1526 writer.finish().await.unwrap();
1527
1528 let fs = FsFixture::default();
1530 let file_scheduler = fs
1531 .scheduler
1532 .open_file(&path, &CachedFileSize::unknown())
1533 .await
1534 .unwrap();
1535 let file_reader = FileReader::try_open(
1536 file_scheduler,
1537 None,
1538 Arc::<DecoderPlugins>::default(),
1539 &LanceCache::no_cache(),
1540 FileReaderOptions::default(),
1541 )
1542 .await
1543 .unwrap();
1544
1545 let column_metadatas = &file_reader.metadata().column_metadatas;
1546 let status_encoding = describe_encoding(&column_metadatas[0].pages[0]);
1547 assert!(
1548 status_encoding.contains("RLE") || status_encoding.contains("Rle"),
1549 "status column should use RLE encoding due to metadata threshold, but got: {}",
1550 status_encoding
1551 );
1552 }
1553
1554 #[tokio::test]
1555 async fn test_large_page_split_on_read() {
1556 use arrow_array::Array;
1557 use futures::TryStreamExt;
1558 use lance_encoding::decoder::FilterExpression;
1559 use lance_io::ReadBatchParams;
1560
1561 let arrow_field = ArrowField::new("data", DataType::Binary, false);
1564 let arrow_schema = ArrowSchema::new(vec![arrow_field]);
1565 let lance_schema = LanceSchema::try_from(&arrow_schema).unwrap();
1566
1567 let large_value = vec![42u8; 40 * 1024 * 1024];
1569 let array = arrow_array::BinaryArray::from(vec![
1570 Some(large_value.as_slice()),
1571 Some(b"small value"),
1572 ]);
1573 let batch = RecordBatch::try_new(Arc::new(arrow_schema), vec![Arc::new(array)]).unwrap();
1574
1575 let options = FileWriterOptions {
1577 max_page_bytes: Some(128 * 1024 * 1024),
1578 format_version: Some(LanceFileVersion::V2_1),
1579 ..Default::default()
1580 };
1581
1582 let fs = FsFixture::default();
1583 let path = fs.tmp_path;
1584
1585 let mut writer = FileWriter::try_new(
1586 fs.object_store.create(&path).await.unwrap(),
1587 lance_schema.clone(),
1588 options,
1589 )
1590 .unwrap();
1591
1592 writer.write_batch(&batch).await.unwrap();
1593 let write_summary = writer.finish().await.unwrap();
1594 assert_eq!(write_summary.num_rows, 2);
1595 assert_eq!(
1596 write_summary.size_bytes,
1597 fs.object_store.size(&path).await.unwrap()
1598 );
1599
1600 let file_scheduler = fs
1602 .scheduler
1603 .open_file(&path, &CachedFileSize::unknown())
1604 .await
1605 .unwrap();
1606
1607 let reader_options = FileReaderOptions {
1609 read_chunk_size: 10 * 1024 * 1024, ..Default::default()
1611 };
1612
1613 let file_reader = FileReader::try_open(
1614 file_scheduler,
1615 None,
1616 Arc::<DecoderPlugins>::default(),
1617 &LanceCache::no_cache(),
1618 reader_options,
1619 )
1620 .await
1621 .unwrap();
1622
1623 let stream = file_reader
1625 .read_stream(
1626 ReadBatchParams::RangeFull,
1627 1024,
1628 10, FilterExpression::no_filter(),
1630 )
1631 .await
1632 .unwrap();
1633
1634 let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
1635 assert_eq!(batches.len(), 1);
1636
1637 let read_array = batches[0].column(0);
1639 let read_binary = read_array
1640 .as_any()
1641 .downcast_ref::<arrow_array::BinaryArray>()
1642 .unwrap();
1643
1644 assert_eq!(read_binary.len(), 2);
1645 assert_eq!(read_binary.value(0).len(), 40 * 1024 * 1024);
1646 assert_eq!(read_binary.value(1), b"small value");
1647
1648 assert!(read_binary.value(0).iter().all(|&b| b == 42u8));
1650 }
1651
1652 fn spill_config() -> (TempObjFile, Arc<ObjectStore>) {
1653 let spill_path = TempObjFile::default();
1654 (spill_path, Arc::new(ObjectStore::local()))
1655 }
1656
1657 fn make_batches(num_batches: i32, num_cols: usize, rows_per_batch: i32) -> Vec<RecordBatch> {
1658 let fields: Vec<_> = (0..num_cols)
1659 .map(|c| ArrowField::new(format!("c{c}"), DataType::Int32, false))
1660 .collect();
1661 let schema = Arc::new(ArrowSchema::new(fields));
1662 (0..num_batches)
1663 .map(|i| {
1664 let cols: Vec<Arc<dyn arrow_array::Array>> = (0..num_cols)
1665 .map(|c| {
1666 let start = (i * rows_per_batch + c as i32) * 100;
1667 Arc::new(Int32Array::from_iter_values(start..start + rows_per_batch))
1668 as Arc<dyn arrow_array::Array>
1669 })
1670 .collect();
1671 RecordBatch::try_new(schema.clone(), cols).unwrap()
1672 })
1673 .collect()
1674 }
1675
1676 async fn write_and_read_batches(
1677 batches: &[RecordBatch],
1678 spill: Option<(Arc<ObjectStore>, object_store::path::Path)>,
1679 ) -> Vec<RecordBatch> {
1680 let fs = FsFixture::default();
1681 let lance_schema = LanceSchema::try_from(batches[0].schema().as_ref()).unwrap();
1682 let writer = fs.object_store.create(&fs.tmp_path).await.unwrap();
1683 let mut file_writer =
1684 FileWriter::try_new(writer, lance_schema, FileWriterOptions::default()).unwrap();
1685 if let Some((store, path)) = spill {
1686 file_writer = file_writer.with_page_metadata_spill(store, path);
1687 }
1688 for batch in batches {
1689 file_writer.write_batch(batch).await.unwrap();
1690 }
1691 file_writer.add_schema_metadata("foo", "bar");
1692 file_writer.finish().await.unwrap();
1693
1694 crate::testing::read_lance_file(
1695 &fs,
1696 Arc::<DecoderPlugins>::default(),
1697 lance_encoding::decoder::FilterExpression::no_filter(),
1698 )
1699 .await
1700 }
1701
1702 #[rstest::rstest]
1703 #[case::multi_col(20, 2, 100)]
1704 #[case::many_batches(50, 2, 100)]
1705 #[tokio::test]
1706 async fn test_page_metadata_spill_roundtrip(
1707 #[case] num_batches: i32,
1708 #[case] num_cols: usize,
1709 #[case] rows_per_batch: i32,
1710 ) {
1711 let batches = make_batches(num_batches, num_cols, rows_per_batch);
1712 let baseline = write_and_read_batches(&batches, None).await;
1713 let (spill_path, spill_store) = spill_config();
1714 let spilled =
1715 write_and_read_batches(&batches, Some((spill_store, spill_path.as_ref().clone())))
1716 .await;
1717 assert_eq!(baseline, spilled);
1718 }
1719
1720 #[tokio::test]
1721 async fn test_page_metadata_spill_many_columns() {
1722 let batches = make_batches(10, 500, 100);
1724 let baseline = write_and_read_batches(&batches, None).await;
1725 let (spill_path, spill_store) = spill_config();
1726 let spilled =
1727 write_and_read_batches(&batches, Some((spill_store, spill_path.as_ref().clone())))
1728 .await;
1729 assert_eq!(baseline, spilled);
1730 }
1731}