use core::panic;

use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;

use arrow_array::RecordBatch;

use arrow_data::ArrayData;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use futures::StreamExt;
use futures::stream::FuturesOrdered;
use lance_core::datatypes::{Field, Schema as LanceSchema};
use lance_core::utils::bit::pad_bytes;
use lance_core::{Error, Result};
use lance_encoding::decoder::PageEncoding;
use lance_encoding::encoder::{
    BatchEncoder, EncodeTask, EncodedBatch, EncodedPage, EncodingOptions, FieldEncoder,
    FieldEncodingStrategy, OutOfLineBuffers, default_encoding_strategy,
};
use lance_encoding::repdef::RepDefBuilder;
use lance_encoding::version::LanceFileVersion;
use lance_io::object_store::ObjectStore;
use lance_io::traits::Writer;
use log::{debug, warn};
use object_store::path::Path;
use prost::Message;
use prost_types::Any;
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
use tracing::instrument;

use crate::datatypes::FieldsWithMeta;
use crate::format::MAGIC;
use crate::format::pb;
use crate::format::pbfile;
use crate::format::pbfile::DirectEncoding;

/// Every page buffer in the file is aligned to this boundary, in bytes.
pub(crate) const PAGE_BUFFER_ALIGNMENT: usize = 64;
/// Padding bytes written after a buffer to reach the next aligned offset.
const PAD_BUFFER: [u8; PAGE_BUFFER_ALIGNMENT] = [72; PAGE_BUFFER_ALIGNMENT];
/// Default maximum size of an encoded page, in bytes.
const MAX_PAGE_BYTES: usize = 32 * 1024 * 1024;
/// Environment variable that can override the default maximum page size.
const ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES: &str = "LANCE_FILE_WRITER_MAX_PAGE_BYTES";

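/// Options for a [`FileWriter`].
///
/// A sketch of typical construction (the values here are illustrative, not
/// recommendations):
///
/// ```ignore
/// let options = FileWriterOptions {
///     format_version: Some(LanceFileVersion::V2_1),
///     max_page_bytes: Some(1024 * 1024),
///     ..Default::default()
/// };
/// ```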
#[derive(Debug, Clone, Default)]
pub struct FileWriterOptions {
    /// Caps the total number of bytes the writer buffers in memory before
    /// flushing pages.  The budget is divided evenly among the top-level
    /// fields of the schema; when unset, each column may buffer up to 8MiB.
    pub data_cache_bytes: Option<u64>,
    /// Maximum size, in bytes, of an encoded page.  Defaults to
    /// `MAX_PAGE_BYTES` unless overridden by the
    /// `LANCE_FILE_WRITER_MAX_PAGE_BYTES` environment variable.
    pub max_page_bytes: Option<u64>,
    /// If true, the encoders hold on to the caller's arrays instead of
    /// copying the data they need, trading memory for fewer copies.
    pub keep_original_array: Option<bool>,
    /// Custom strategy for choosing an encoding for each field.  Defaults to
    /// the standard strategy for the chosen format version.
    pub encoding_strategy: Option<Arc<dyn FieldEncodingStrategy>>,
    /// The Lance file format version to write.
    pub format_version: Option<LanceFileVersion>,
}

/// Total in-memory budget, across all columns, for buffered spill metadata.
const DEFAULT_SPILL_BUFFER_LIMIT: usize = 256 * 1024;

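/// Spills page metadata to a temporary file instead of accumulating it in
/// memory.
///
/// Pages are buffered per column as length-delimited protobufs; when a
/// column's buffer exceeds its share of `DEFAULT_SPILL_BUFFER_LIMIT` it is
/// flushed to the spill file and the chunk's `(offset, length)` is recorded
/// so the pages can be read back when the column metadata is finally written.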
struct PageMetadataSpill {
    writer: Box<dyn Writer>,
    object_store: Arc<ObjectStore>,
    path: Path,
    position: u64,
    column_buffers: Vec<Vec<u8>>,
    column_chunks: Vec<Vec<(u64, u32)>>,
    per_column_limit: usize,
}

impl PageMetadataSpill {
    async fn new(object_store: Arc<ObjectStore>, path: Path, num_columns: usize) -> Result<Self> {
        let writer = object_store.create(&path).await?;
        let per_column_limit = (DEFAULT_SPILL_BUFFER_LIMIT / num_columns.max(1)).max(64);
        Ok(Self {
            writer,
            object_store,
            path,
            position: 0,
            column_buffers: vec![Vec::new(); num_columns],
            column_chunks: vec![Vec::new(); num_columns],
            per_column_limit,
        })
    }

    async fn append_page(
        &mut self,
        column_idx: usize,
        page: &pbfile::column_metadata::Page,
    ) -> Result<()> {
        page.encode_length_delimited(&mut self.column_buffers[column_idx])
            .map_err(|e| {
                Error::io_source(Box::new(std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    e,
                )))
            })?;
        if self.column_buffers[column_idx].len() >= self.per_column_limit {
            self.flush_column(column_idx).await?;
        }
        Ok(())
    }

    async fn flush_column(&mut self, column_idx: usize) -> Result<()> {
        let buf = &self.column_buffers[column_idx];
        if buf.is_empty() {
            return Ok(());
        }
        let len = buf.len();
        self.writer.write_all(buf).await?;
        self.column_chunks[column_idx].push((self.position, len as u32));
        self.position += len as u64;
        self.column_buffers[column_idx].clear();
        Ok(())
    }

    async fn shutdown_writer(&mut self) -> Result<()> {
        for col_idx in 0..self.column_buffers.len() {
            self.flush_column(col_idx).await?;
        }
        Writer::shutdown(self.writer.as_mut()).await?;
        Ok(())
    }
}

/// Decodes one spilled chunk back into its length-delimited pages.
fn decode_spilled_chunk(data: &Bytes) -> Result<Vec<pbfile::column_metadata::Page>> {
    let mut pages = Vec::new();
    let mut cursor = data.clone();
    while cursor.has_remaining() {
        let page =
            pbfile::column_metadata::Page::decode_length_delimited(&mut cursor).map_err(|e| {
                Error::io_source(Box::new(std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    e,
                )))
            })?;
        pages.push(page);
    }
    Ok(pages)
}

enum PageSpillState {
    Pending(Arc<ObjectStore>, Path),
    Active(PageMetadataSpill),
}

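/// A writer for Lance files.
///
/// A minimal usage sketch, assuming `object_store` is an [`ObjectStore`] and
/// `lance_schema` matches the batches being written:
///
/// ```ignore
/// let object_writer = object_store.create(&path).await?;
/// let mut writer =
///     FileWriter::try_new(object_writer, lance_schema, FileWriterOptions::default())?;
/// for batch in batches {
///     writer.write_batch(&batch).await?;
/// }
/// let rows_written = writer.finish().await?;
/// ```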
pub struct FileWriter {
    writer: Box<dyn Writer>,
    schema: Option<LanceSchema>,
    column_writers: Vec<Box<dyn FieldEncoder>>,
    column_metadata: Vec<pbfile::ColumnMetadata>,
    field_id_to_column_indices: Vec<(u32, u32)>,
    num_columns: u32,
    rows_written: u64,
    global_buffers: Vec<(u64, u64)>,
    schema_metadata: HashMap<String, String>,
    options: FileWriterOptions,
    page_spill: Option<PageSpillState>,
}

fn initial_column_metadata() -> pbfile::ColumnMetadata {
    pbfile::ColumnMetadata {
        pages: Vec::new(),
        buffer_offsets: Vec::new(),
        buffer_sizes: Vec::new(),
        encoding: None,
    }
}

static WARNED_ON_UNSTABLE_API: AtomicBool = AtomicBool::new(false);

impl FileWriter {
    /// Creates a new file writer with the given schema.
    pub fn try_new(
        object_writer: Box<dyn Writer>,
        schema: LanceSchema,
        options: FileWriterOptions,
    ) -> Result<Self> {
        let mut writer = Self::new_lazy(object_writer, options);
        writer.initialize(schema)?;
        Ok(writer)
    }

    /// Creates a new file writer without a schema; the schema is inferred
    /// from the first batch written.
    pub fn new_lazy(object_writer: Box<dyn Writer>, options: FileWriterOptions) -> Self {
        if let Some(format_version) = options.format_version
            && format_version.is_unstable()
            && WARNED_ON_UNSTABLE_API
                .compare_exchange(
                    false,
                    true,
                    std::sync::atomic::Ordering::Relaxed,
                    std::sync::atomic::Ordering::Relaxed,
                )
                .is_ok()
        {
            warn!(
                "You have requested an unstable format version. Files written with this format version may not be readable in the future! This is a development feature and should only be used for experimentation and never for production data."
            );
        }
        Self {
            writer: object_writer,
            schema: None,
            column_writers: Vec::new(),
            column_metadata: Vec::new(),
            num_columns: 0,
            rows_written: 0,
            field_id_to_column_indices: Vec::new(),
            global_buffers: Vec::new(),
            schema_metadata: HashMap::new(),
            page_spill: None,
            options,
        }
    }

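    /// Configures the writer to spill page metadata to a temporary file at
    /// `path` instead of accumulating it in memory.  The spill file is
    /// created lazily, when the first page is written.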
    pub fn with_page_metadata_spill(mut self, object_store: Arc<ObjectStore>, path: Path) -> Self {
        self.page_spill = Some(PageSpillState::Pending(object_store, path));
        self
    }

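    /// Convenience method that creates a file at `path`, writes all
    /// `batches` to it, and returns the number of rows written.
    ///
    /// A sketch of a call site (the variable names are illustrative):
    ///
    /// ```ignore
    /// let rows = FileWriter::create_file_with_batches(
    ///     &store,
    ///     &path,
    ///     schema,
    ///     batches.into_iter(),
    ///     FileWriterOptions::default(),
    /// )
    /// .await?;
    /// ```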
    pub async fn create_file_with_batches(
        store: &ObjectStore,
        path: &Path,
        schema: lance_core::datatypes::Schema,
        batches: impl Iterator<Item = RecordBatch> + Send,
        options: FileWriterOptions,
    ) -> Result<usize> {
        let writer = store.create(path).await?;
        let mut writer = Self::try_new(writer, schema, options)?;
        for batch in batches {
            writer.write_batch(&batch).await?;
        }
        Ok(writer.finish().await? as usize)
    }

    async fn do_write_buffer(writer: &mut (impl AsyncWrite + Unpin), buf: &[u8]) -> Result<()> {
        writer.write_all(buf).await?;
        // Pad to the next PAGE_BUFFER_ALIGNMENT boundary; e.g. a 100-byte
        // buffer is followed by 28 padding bytes so the next buffer starts at
        // offset 128.
        let pad_bytes = pad_bytes::<PAGE_BUFFER_ALIGNMENT>(buf.len());
        writer.write_all(&PAD_BUFFER[..pad_bytes]).await?;
        Ok(())
    }

    pub fn version(&self) -> LanceFileVersion {
        self.options.format_version.unwrap_or_default()
    }

    async fn write_page(&mut self, encoded_page: EncodedPage) -> Result<()> {
        let buffers = encoded_page.data;
        let mut buffer_offsets = Vec::with_capacity(buffers.len());
        let mut buffer_sizes = Vec::with_capacity(buffers.len());
        for buffer in buffers {
            buffer_offsets.push(self.writer.tell().await? as u64);
            buffer_sizes.push(buffer.len() as u64);
            Self::do_write_buffer(&mut self.writer, &buffer).await?;
        }
        let encoded_encoding = match encoded_page.description {
            PageEncoding::Legacy(array_encoding) => Any::from_msg(&array_encoding)?.encode_to_vec(),
            PageEncoding::Structural(page_layout) => Any::from_msg(&page_layout)?.encode_to_vec(),
        };
        let page = pbfile::column_metadata::Page {
            buffer_offsets,
            buffer_sizes,
            encoding: Some(pbfile::Encoding {
                location: Some(pbfile::encoding::Location::Direct(DirectEncoding {
                    encoding: encoded_encoding,
                })),
            }),
            length: encoded_page.num_rows,
            priority: encoded_page.row_number,
        };
        let col_idx = encoded_page.column_idx as usize;
        // Lazily create the spill file the first time a page is written.
        if matches!(&self.page_spill, Some(PageSpillState::Pending(..))) {
            let Some(PageSpillState::Pending(store, path)) = self.page_spill.take() else {
                unreachable!()
            };
            self.page_spill = Some(PageSpillState::Active(
                PageMetadataSpill::new(store, path, self.num_columns as usize).await?,
            ));
        }
        match &mut self.page_spill {
            Some(PageSpillState::Active(spill)) => spill.append_page(col_idx, &page).await?,
            None => self.column_metadata[col_idx].pages.push(page),
            Some(PageSpillState::Pending(..)) => unreachable!(),
        }
        Ok(())
    }

    #[instrument(skip_all, level = "debug")]
    async fn write_pages(&mut self, mut encoding_tasks: FuturesOrdered<EncodeTask>) -> Result<()> {
        // Pages are written in order as their encoding tasks complete.
        while let Some(encoding_task) = encoding_tasks.next().await {
            let encoded_page = encoding_task?;
            self.write_page(encoded_page).await?;
        }
        self.writer.flush().await?;
        Ok(())
    }

    /// Writes each batch in `batches` to the file, in order.
    pub async fn write_batches(
        &mut self,
        batches: impl Iterator<Item = &RecordBatch>,
    ) -> Result<()> {
        for batch in batches {
            self.write_batch(batch).await?;
        }
        Ok(())
    }

    fn verify_field_nullability(arr: &ArrayData, field: &Field) -> Result<()> {
        if !field.nullable && arr.null_count() > 0 {
            return Err(Error::invalid_input(format!(
                "The field `{}` contained null values even though the field is marked non-null in the schema",
                field.name
            )));
        }

        for (child_field, child_arr) in field.children.iter().zip(arr.child_data()) {
            Self::verify_field_nullability(child_arr, child_field)?;
        }

        Ok(())
    }

    fn verify_nullability_constraints(&self, batch: &RecordBatch) -> Result<()> {
        for (col, field) in batch
            .columns()
            .iter()
            .zip(self.schema.as_ref().unwrap().fields.iter())
        {
            Self::verify_field_nullability(&col.to_data(), field)?;
        }
        Ok(())
    }

    fn initialize(&mut self, mut schema: LanceSchema) -> Result<()> {
        let cache_bytes_per_column = if let Some(data_cache_bytes) = self.options.data_cache_bytes {
            data_cache_bytes / schema.fields.len() as u64
        } else {
            8 * 1024 * 1024
        };

        let max_page_bytes = self.options.max_page_bytes.unwrap_or_else(|| {
            std::env::var(ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES)
                .map(|s| {
                    s.parse::<u64>().unwrap_or_else(|e| {
                        warn!(
                            "Failed to parse {}: {}, using default",
                            ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES, e
                        );
                        MAX_PAGE_BYTES as u64
                    })
                })
                .unwrap_or(MAX_PAGE_BYTES as u64)
        });

        schema.validate()?;

        let keep_original_array = self.options.keep_original_array.unwrap_or(false);
        let encoding_strategy = self.options.encoding_strategy.clone().unwrap_or_else(|| {
            let version = self.version();
            default_encoding_strategy(version).into()
        });

        let encoding_options = EncodingOptions {
            cache_bytes_per_column,
            max_page_bytes,
            keep_original_array,
            buffer_alignment: PAGE_BUFFER_ALIGNMENT as u64,
            version: self.version(),
        };
        let encoder =
            BatchEncoder::try_new(&schema, encoding_strategy.as_ref(), &encoding_options)?;
        self.num_columns = encoder.num_columns();

        self.column_writers = encoder.field_encoders;
        self.column_metadata = vec![initial_column_metadata(); self.num_columns as usize];
        self.field_id_to_column_indices = encoder.field_id_to_column_index;
        self.schema_metadata
            .extend(std::mem::take(&mut schema.metadata));
        self.schema = Some(schema);
        Ok(())
    }

    fn ensure_initialized(&mut self, batch: &RecordBatch) -> Result<&LanceSchema> {
        if self.schema.is_none() {
            let schema = LanceSchema::try_from(batch.schema().as_ref())?;
            self.initialize(schema)?;
        }
        Ok(self.schema.as_ref().unwrap())
    }

    #[instrument(skip_all, level = "debug")]
    fn encode_batch(
        &mut self,
        batch: &RecordBatch,
        external_buffers: &mut OutOfLineBuffers,
    ) -> Result<Vec<Vec<EncodeTask>>> {
        self.schema
            .as_ref()
            .unwrap()
            .fields
            .iter()
            .zip(self.column_writers.iter_mut())
            .map(|(field, column_writer)| {
                let array = batch
                    .column_by_name(&field.name)
                    .ok_or(Error::invalid_input_source(
                        format!(
                            "Cannot write batch. The batch was missing the column `{}`",
                            field.name
                        )
                        .into(),
                    ))?;
                let repdef = RepDefBuilder::default();
                let num_rows = array.len() as u64;
                column_writer.maybe_encode(
                    array.clone(),
                    external_buffers,
                    repdef,
                    self.rows_written,
                    num_rows,
                )
            })
            .collect::<Result<Vec<_>>>()
    }

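    /// Encodes and writes a single batch to the file.
    ///
    /// If the writer was created with [`FileWriter::new_lazy`], the schema is
    /// inferred from the first batch written.  Empty batches are ignored and
    /// a batch may not contain more than `u32::MAX` rows.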
    pub async fn write_batch(&mut self, batch: &RecordBatch) -> Result<()> {
        debug!(
            "write_batch called with {} rows, {} columns, and {} bytes of data",
            batch.num_rows(),
            batch.num_columns(),
            batch.get_array_memory_size()
        );
        self.ensure_initialized(batch)?;
        self.verify_nullability_constraints(batch)?;
        let num_rows = batch.num_rows() as u64;
        if num_rows == 0 {
            return Ok(());
        }
        if num_rows > u32::MAX as u64 {
            return Err(Error::invalid_input_source(
                "cannot write Lance files with more than 2^32 rows".into(),
            ));
        }
        let mut external_buffers =
            OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
        let encoding_tasks = self.encode_batch(batch, &mut external_buffers)?;
        for external_buffer in external_buffers.take_buffers() {
            Self::do_write_buffer(&mut self.writer, &external_buffer).await?;
        }

        let encoding_tasks = encoding_tasks
            .into_iter()
            .flatten()
            .collect::<FuturesOrdered<_>>();

        self.rows_written = match self.rows_written.checked_add(batch.num_rows() as u64) {
            Some(rows_written) => rows_written,
            None => {
                return Err(Error::invalid_input_source(format!("cannot write batch with {} rows because {} rows have already been written and Lance files cannot contain more than 2^64 rows", num_rows, self.rows_written).into()));
            }
        };

        self.write_pages(encoding_tasks).await?;

        Ok(())
    }

    async fn write_column_metadata(
        &mut self,
        metadata: pbfile::ColumnMetadata,
    ) -> Result<(u64, u64)> {
        let metadata_bytes = metadata.encode_to_vec();
        let position = self.writer.tell().await? as u64;
        let len = metadata_bytes.len() as u64;
        self.writer.write_all(&metadata_bytes).await?;
        Ok((position, len))
    }

    async fn write_column_metadatas(&mut self) -> Result<Vec<(u64, u64)>> {
        let metadatas = std::mem::take(&mut self.column_metadata);

        // If page metadata was spilled, flush the spill file and read the
        // spilled pages back as each column's metadata is written.
        let spill_state = self.page_spill.take();
        let (spill_chunks, spill_reader) =
            if let Some(PageSpillState::Active(mut spill)) = spill_state {
                spill.shutdown_writer().await?;
                let reader = spill.object_store.open(&spill.path).await?;
                let chunks = std::mem::take(&mut spill.column_chunks);
                (chunks, Some(reader))
            } else {
                (Vec::new(), None)
            };

        let mut metadata_positions = Vec::with_capacity(metadatas.len());
        for (col_idx, mut metadata) in metadatas.into_iter().enumerate() {
            if let Some(reader) = &spill_reader {
                let mut pages = Vec::new();
                for &(offset, len) in &spill_chunks[col_idx] {
                    let data = reader
                        .get_range(offset as usize..(offset as usize + len as usize))
                        .await
                        .map_err(|e| Error::io_source(Box::new(e)))?;
                    pages.extend(decode_spilled_chunk(&data)?);
                }
                metadata.pages = pages;
            }
            metadata_positions.push(self.write_column_metadata(metadata).await?);
        }

        Ok(metadata_positions)
    }

    /// Builds the file descriptor (schema plus row count) that is stored as
    /// the first global buffer.
    fn make_file_descriptor(
        schema: &lance_core::datatypes::Schema,
        num_rows: u64,
    ) -> Result<pb::FileDescriptor> {
        let fields_with_meta = FieldsWithMeta::from(schema);
        Ok(pb::FileDescriptor {
            schema: Some(pb::Schema {
                fields: fields_with_meta.fields.0,
                metadata: fields_with_meta.metadata,
            }),
            length: num_rows,
        })
    }

    async fn write_global_buffers(&mut self) -> Result<Vec<(u64, u64)>> {
        let schema = self.schema.as_mut().ok_or(Error::invalid_input(
            "No schema provided on writer open and no data provided. Schema is unknown and file cannot be created",
        ))?;
        schema.metadata = std::mem::take(&mut self.schema_metadata);
        schema.fields.iter_mut().for_each(|f| {
            if f.is_blob_v2() {
                f.unloaded_mut();
            }
        });

        let file_descriptor = Self::make_file_descriptor(schema, self.rows_written)?;
        let file_descriptor_bytes = file_descriptor.encode_to_vec();
        let file_descriptor_len = file_descriptor_bytes.len() as u64;
        let file_descriptor_position = self.writer.tell().await? as u64;
        self.writer.write_all(&file_descriptor_bytes).await?;
        let mut gbo_table = Vec::with_capacity(1 + self.global_buffers.len());
        gbo_table.push((file_descriptor_position, file_descriptor_len));
        gbo_table.append(&mut self.global_buffers);
        Ok(gbo_table)
    }

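    /// Adds an entry to the file's schema metadata, e.g.
    /// `writer.add_schema_metadata("foo", "bar")`.  Must be called before
    /// [`FileWriter::finish`].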
    pub fn add_schema_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
        self.schema_metadata.insert(key.into(), value.into());
    }

    /// Initializes the writer from externally supplied schema, column
    /// metadata, and row count instead of deriving them from written batches.
    pub fn initialize_with_external_metadata(
        &mut self,
        schema: lance_core::datatypes::Schema,
        column_metadata: Vec<pbfile::ColumnMetadata>,
        rows_written: u64,
    ) {
        self.schema = Some(schema);
        self.num_columns = column_metadata.len() as u32;
        self.column_metadata = column_metadata;
        self.rows_written = rows_written;
    }

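    /// Writes `buffer` to the file as a global buffer and returns its index
    /// in the global buffer table.  Indices start at 1 because index 0 is
    /// taken by the file descriptor written during `finish`.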
    pub async fn add_global_buffer(&mut self, buffer: Bytes) -> Result<u32> {
        let position = self.writer.tell().await? as u64;
        let len = buffer.len() as u64;
        Self::do_write_buffer(&mut self.writer, &buffer).await?;
        self.global_buffers.push((position, len));
        Ok(self.global_buffers.len() as u32)
    }

    async fn finish_writers(&mut self) -> Result<()> {
        let mut col_idx = 0;
        for mut writer in std::mem::take(&mut self.column_writers) {
            let mut external_buffers =
                OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
            let columns = writer.finish(&mut external_buffers).await?;
            for buffer in external_buffers.take_buffers() {
                self.writer.write_all(&buffer).await?;
            }
            debug_assert_eq!(
                columns.len(),
                writer.num_columns() as usize,
                "Expected {} columns from column at index {} and got {}",
                writer.num_columns(),
                col_idx,
                columns.len()
            );
            for column in columns {
                for page in column.final_pages {
                    self.write_page(page).await?;
                }
                let column_metadata = &mut self.column_metadata[col_idx];
                let mut buffer_pos = self.writer.tell().await? as u64;
                for buffer in column.column_buffers {
                    column_metadata.buffer_offsets.push(buffer_pos);
                    Self::do_write_buffer(&mut self.writer, &buffer).await?;
                    let size = buffer.len() as u64;
                    buffer_pos += size;
                    column_metadata.buffer_sizes.push(size);
                }
                let encoded_encoding = Any::from_msg(&column.encoding)?.encode_to_vec();
                column_metadata.encoding = Some(pbfile::Encoding {
                    location: Some(pbfile::encoding::Location::Direct(pbfile::DirectEncoding {
                        encoding: encoded_encoding,
                    })),
                });
                col_idx += 1;
            }
        }
        if col_idx != self.column_metadata.len() {
            panic!(
                "Column writers finished with {} columns but we expected {}",
                col_idx,
                self.column_metadata.len()
            );
        }
        Ok(())
    }

    fn version_to_numbers(&self) -> (u16, u16) {
        let version = self.options.format_version.unwrap_or_default();
        match version.resolve() {
            // 2.0 files record footer version 0.3 for compatibility with
            // older readers.
            LanceFileVersion::V2_0 => (0, 3),
            LanceFileVersion::V2_1 => (2, 1),
            LanceFileVersion::V2_2 => (2, 2),
            LanceFileVersion::V2_3 => (2, 3),
            _ => panic!("Unsupported version: {}", version),
        }
    }

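    /// Finishes writing the file and returns the total number of rows
    /// written.
    ///
    /// This flushes any remaining pages, then writes the global buffers
    /// (including the file descriptor), the column metadata blocks, the
    /// column metadata offset (CMO) and global buffer offset (GBO) tables,
    /// and finally the fixed-size footer ending in the magic bytes.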
    pub async fn finish(&mut self) -> Result<u64> {
        // Flush any data still held by the column writers.
        let mut external_buffers =
            OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
        let encoding_tasks = self
            .column_writers
            .iter_mut()
            .map(|writer| writer.flush(&mut external_buffers))
            .collect::<Result<Vec<_>>>()?;
        for external_buffer in external_buffers.take_buffers() {
            Self::do_write_buffer(&mut self.writer, &external_buffer).await?;
        }
        let encoding_tasks = encoding_tasks
            .into_iter()
            .flatten()
            .collect::<FuturesOrdered<_>>();
        self.write_pages(encoding_tasks).await?;

        if !self.column_writers.is_empty() {
            self.finish_writers().await?;
        }

        let global_buffer_offsets = self.write_global_buffers().await?;
        let num_global_buffers = global_buffer_offsets.len() as u32;

        let column_metadata_start = self.writer.tell().await? as u64;
        let metadata_positions = self.write_column_metadatas().await?;

        let cmo_table_start = self.writer.tell().await? as u64;
        for (meta_pos, meta_len) in metadata_positions {
            self.writer.write_u64_le(meta_pos).await?;
            self.writer.write_u64_le(meta_len).await?;
        }

        let gbo_table_start = self.writer.tell().await? as u64;
        for (gbo_pos, gbo_len) in global_buffer_offsets {
            self.writer.write_u64_le(gbo_pos).await?;
            self.writer.write_u64_le(gbo_len).await?;
        }

        let (major, minor) = self.version_to_numbers();
        self.writer.write_u64_le(column_metadata_start).await?;
        self.writer.write_u64_le(cmo_table_start).await?;
        self.writer.write_u64_le(gbo_table_start).await?;
        self.writer.write_u32_le(num_global_buffers).await?;
        self.writer.write_u32_le(self.num_columns).await?;
        self.writer.write_u16_le(major).await?;
        self.writer.write_u16_le(minor).await?;
        self.writer.write_all(MAGIC).await?;

        Writer::shutdown(self.writer.as_mut()).await?;

        Ok(self.rows_written)
    }

    /// Currently a no-op.
    pub async fn abort(&mut self) {}

    pub async fn tell(&mut self) -> Result<u64> {
        Ok(self.writer.tell().await? as u64)
    }

    pub fn field_id_to_column_indices(&self) -> &[(u32, u32)] {
        &self.field_id_to_column_indices
    }
}

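/// Utility trait for serializing an [`EncodedBatch`] into bytes laid out as a
/// Lance file.
///
/// A sketch of typical usage (assuming `batch` is an `EncodedBatch`):
///
/// ```ignore
/// let bytes = batch.try_to_self_described_lance(LanceFileVersion::V2_1)?;
/// ```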
pub trait EncodedBatchWriteExt {
    /// Serializes the batch into a self-describing Lance file that embeds
    /// the schema as a global buffer.
    fn try_to_self_described_lance(&self, version: LanceFileVersion) -> Result<Bytes>;
    /// Serializes the batch into a Lance file without the schema; the schema
    /// must be supplied from elsewhere to read the data back.
    fn try_to_mini_lance(&self, version: LanceFileVersion) -> Result<Bytes>;
}

/// Appends the Lance metadata and footer to the batch's already-encoded data,
/// optionally including the schema as a global buffer.
fn concat_lance_footer(
    batch: &EncodedBatch,
    write_schema: bool,
    version: LanceFileVersion,
) -> Result<Bytes> {
    // Reserve some extra capacity for the metadata and footer.
    let mut data = BytesMut::with_capacity(batch.data.len() + 1024 * 1024);
    data.put(batch.data.clone());
    let global_buffers = if write_schema {
        let schema_start = data.len() as u64;
        let lance_schema = lance_core::datatypes::Schema::try_from(batch.schema.as_ref())?;
        let descriptor = FileWriter::make_file_descriptor(&lance_schema, batch.num_rows)?;
        let descriptor_bytes = descriptor.encode_to_vec();
        let descriptor_len = descriptor_bytes.len() as u64;
        data.put(descriptor_bytes.as_slice());

        vec![(schema_start, descriptor_len)]
    } else {
        vec![]
    };
    let col_metadata_start = data.len() as u64;

    let mut col_metadata_positions = Vec::new();
    for col in &batch.page_table {
        let position = data.len() as u64;
        let pages = col
            .page_infos
            .iter()
            .map(|page_info| {
                let encoded_encoding = match &page_info.encoding {
                    PageEncoding::Legacy(array_encoding) => {
                        Any::from_msg(array_encoding)?.encode_to_vec()
                    }
                    PageEncoding::Structural(page_layout) => {
                        Any::from_msg(page_layout)?.encode_to_vec()
                    }
                };
                let (buffer_offsets, buffer_sizes): (Vec<_>, Vec<_>) = page_info
                    .buffer_offsets_and_sizes
                    .as_ref()
                    .iter()
                    .cloned()
                    .unzip();
                Ok(pbfile::column_metadata::Page {
                    buffer_offsets,
                    buffer_sizes,
                    encoding: Some(pbfile::Encoding {
                        location: Some(pbfile::encoding::Location::Direct(DirectEncoding {
                            encoding: encoded_encoding,
                        })),
                    }),
                    length: page_info.num_rows,
                    priority: page_info.priority,
                })
            })
            .collect::<Result<Vec<_>>>()?;
        let (buffer_offsets, buffer_sizes): (Vec<_>, Vec<_>) =
            col.buffer_offsets_and_sizes.iter().cloned().unzip();
        let encoded_col_encoding = Any::from_msg(&col.encoding)?.encode_to_vec();
        let column = pbfile::ColumnMetadata {
            pages,
            buffer_offsets,
            buffer_sizes,
            encoding: Some(pbfile::Encoding {
                location: Some(pbfile::encoding::Location::Direct(pbfile::DirectEncoding {
                    encoding: encoded_col_encoding,
                })),
            }),
        };
        let column_bytes = column.encode_to_vec();
        col_metadata_positions.push((position, column_bytes.len() as u64));
        data.put(column_bytes.as_slice());
    }
    let cmo_table_start = data.len() as u64;
    for (meta_pos, meta_len) in col_metadata_positions {
        data.put_u64_le(meta_pos);
        data.put_u64_le(meta_len);
    }
    let gbo_table_start = data.len() as u64;
    let num_global_buffers = global_buffers.len() as u32;
    for (gbo_pos, gbo_len) in global_buffers {
        data.put_u64_le(gbo_pos);
        data.put_u64_le(gbo_len);
    }

    let (major, minor) = version.to_numbers();

    data.put_u64_le(col_metadata_start);
    data.put_u64_le(cmo_table_start);
    data.put_u64_le(gbo_table_start);
    data.put_u32_le(num_global_buffers);
    data.put_u32_le(batch.page_table.len() as u32);
    data.put_u16_le(major as u16);
    data.put_u16_le(minor as u16);
    data.put(MAGIC.as_slice());

    Ok(data.freeze())
}

impl EncodedBatchWriteExt for EncodedBatch {
    fn try_to_self_described_lance(&self, version: LanceFileVersion) -> Result<Bytes> {
        concat_lance_footer(self, true, version)
    }

    fn try_to_mini_lance(&self, version: LanceFileVersion) -> Result<Bytes> {
        concat_lance_footer(self, false, version)
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use crate::reader::{FileReader, FileReaderOptions, describe_encoding};
    use crate::testing::FsFixture;
    use crate::writer::{ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES, FileWriter, FileWriterOptions};
    use arrow_array::builder::{Float32Builder, Int32Builder};
    use arrow_array::{Int32Array, RecordBatch, UInt64Array};
    use arrow_array::{RecordBatchReader, StringArray, types::Float64Type};
    use arrow_schema::{DataType, Field, Field as ArrowField, Schema, Schema as ArrowSchema};
    use lance_core::cache::LanceCache;
    use lance_core::datatypes::Schema as LanceSchema;
    use lance_core::utils::tempfile::TempObjFile;
    use lance_datagen::{BatchCount, RowCount, array, gen_batch};
    use lance_encoding::compression_config::{CompressionFieldParams, CompressionParams};
    use lance_encoding::decoder::DecoderPlugins;
    use lance_encoding::version::LanceFileVersion;
    use lance_io::object_store::ObjectStore;
    use lance_io::utils::CachedFileSize;

    #[tokio::test]
    async fn test_basic_write() {
        let tmp_path = TempObjFile::default();
        let obj_store = Arc::new(ObjectStore::local());

        let reader = gen_batch()
            .col("score", array::rand::<Float64Type>())
            .into_reader_rows(RowCount::from(1000), BatchCount::from(10));

        let writer = obj_store.create(&tmp_path).await.unwrap();

        let lance_schema =
            lance_core::datatypes::Schema::try_from(reader.schema().as_ref()).unwrap();

        let mut file_writer =
            FileWriter::try_new(writer, lance_schema, FileWriterOptions::default()).unwrap();

        for batch in reader {
            file_writer.write_batch(&batch.unwrap()).await.unwrap();
        }
        file_writer.add_schema_metadata("foo", "bar");
        file_writer.finish().await.unwrap();
    }

    #[tokio::test]
    async fn test_write_empty() {
        let tmp_path = TempObjFile::default();
        let obj_store = Arc::new(ObjectStore::local());

        let reader = gen_batch()
            .col("score", array::rand::<Float64Type>())
            .into_reader_rows(RowCount::from(0), BatchCount::from(0));

        let writer = obj_store.create(&tmp_path).await.unwrap();

        let lance_schema =
            lance_core::datatypes::Schema::try_from(reader.schema().as_ref()).unwrap();

        let mut file_writer =
            FileWriter::try_new(writer, lance_schema, FileWriterOptions::default()).unwrap();

        for batch in reader {
            file_writer.write_batch(&batch.unwrap()).await.unwrap();
        }
        file_writer.add_schema_metadata("foo", "bar");
        file_writer.finish().await.unwrap();
    }

    #[tokio::test]
    async fn test_max_page_bytes_enforced() {
        let arrow_field = Field::new("data", DataType::UInt64, false);
        let arrow_schema = Schema::new(vec![arrow_field]);
        let lance_schema = LanceSchema::try_from(&arrow_schema).unwrap();

        // 8MB of u64 values.
        let data: Vec<u64> = (0..1_000_000).collect();
        let array = UInt64Array::from(data);
        let batch =
            RecordBatch::try_new(arrow_schema.clone().into(), vec![Arc::new(array)]).unwrap();

        let options = FileWriterOptions {
            // 1MiB page limit.
            max_page_bytes: Some(1024 * 1024),
            format_version: Some(LanceFileVersion::V2_0),
            ..Default::default()
        };

        let path = TempObjFile::default();
        let object_store = ObjectStore::local();
        let mut writer = FileWriter::try_new(
            object_store.create(&path).await.unwrap(),
            lance_schema,
            options,
        )
        .unwrap();

        writer.write_batch(&batch).await.unwrap();
        writer.finish().await.unwrap();

        let fs = FsFixture::default();
        let file_scheduler = fs
            .scheduler
            .open_file(&path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler,
            None,
            Arc::<DecoderPlugins>::default(),
            &LanceCache::no_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let column_meta = file_reader.metadata();

        let mut total_page_num: u32 = 0;
        for (col_idx, col_metadata) in column_meta.column_metadatas.iter().enumerate() {
            assert!(
                !col_metadata.pages.is_empty(),
                "Column {} has no pages",
                col_idx
            );

            for (page_idx, page) in col_metadata.pages.iter().enumerate() {
                total_page_num += 1;
                let total_size: u64 = page.buffer_sizes.iter().sum();
                assert!(
                    total_size <= 1024 * 1024,
                    "Column {} Page {} size {} exceeds 1MB limit",
                    col_idx,
                    page_idx,
                    total_size
                );
            }
        }

        // 8MB of data under a 1MiB page limit should produce 8 pages.
        assert_eq!(total_page_num, 8)
    }

    #[tokio::test(flavor = "current_thread")]
    async fn test_max_page_bytes_env_var() {
        let arrow_field = Field::new("data", DataType::UInt64, false);
        let arrow_schema = Schema::new(vec![arrow_field]);
        let lance_schema = LanceSchema::try_from(&arrow_schema).unwrap();
        // 4MB of u64 values.
        let data: Vec<u64> = (0..500_000).collect();
        let array = UInt64Array::from(data);
        let batch =
            RecordBatch::try_new(arrow_schema.clone().into(), vec![Arc::new(array)]).unwrap();

        // Set a 2MiB page limit via the environment variable.
        unsafe {
            std::env::set_var(ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES, "2097152");
        }

        let options = FileWriterOptions {
            // Leave max_page_bytes unset so the env var takes effect.
            max_page_bytes: None,
            ..Default::default()
        };

        let path = TempObjFile::default();
        let object_store = ObjectStore::local();
        let mut writer = FileWriter::try_new(
            object_store.create(&path).await.unwrap(),
            lance_schema.clone(),
            options,
        )
        .unwrap();

        writer.write_batch(&batch).await.unwrap();
        writer.finish().await.unwrap();

        let fs = FsFixture::default();
        let file_scheduler = fs
            .scheduler
            .open_file(&path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler,
            None,
            Arc::<DecoderPlugins>::default(),
            &LanceCache::no_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        for col_metadata in file_reader.metadata().column_metadatas.iter() {
            for page in col_metadata.pages.iter() {
                let total_size: u64 = page.buffer_sizes.iter().sum();
                assert!(
                    total_size <= 2 * 1024 * 1024,
                    "Page size {} exceeds 2MB limit",
                    total_size
                );
            }
        }

        // Reset the environment variable so other tests are not affected.
        unsafe {
            std::env::set_var(ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES, "");
        }
    }

    #[tokio::test]
    async fn test_compression_overrides_end_to_end() {
        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new("customer_id", DataType::Int32, false),
            ArrowField::new("product_id", DataType::Int32, false),
            ArrowField::new("quantity", DataType::Int32, false),
            ArrowField::new("price", DataType::Float32, false),
            ArrowField::new("description", DataType::Utf8, false),
        ]));

        let lance_schema = LanceSchema::try_from(arrow_schema.as_ref()).unwrap();

        let mut customer_ids = Int32Builder::new();
        let mut product_ids = Int32Builder::new();
        let mut quantities = Int32Builder::new();
        let mut prices = Float32Builder::new();
        let mut descriptions = Vec::new();

        // Build highly repetitive data so run-length encoding is attractive.
        for i in 0..10000 {
            // Runs of 100 identical customer ids.
            customer_ids.append_value(i / 100);

            // Runs of 2000 identical product ids.
            product_ids.append_value(i / 2000);

            quantities.append_value(if i % 10 == 0 { 5 } else { 1 });

            prices.append_value(match i % 3 {
                0 => 9.99,
                1 => 19.99,
                _ => 29.99,
            });

            descriptions.push(format!("Product {}", i / 2000));
        }

        let batch = RecordBatch::try_new(
            arrow_schema.clone(),
            vec![
                Arc::new(customer_ids.finish()),
                Arc::new(product_ids.finish()),
                Arc::new(quantities.finish()),
                Arc::new(prices.finish()),
                Arc::new(StringArray::from(descriptions)),
            ],
        )
        .unwrap();

        let mut params = CompressionParams::new();

        // Columns matching *_id should prefer RLE.
        params.columns.insert(
            "*_id".to_string(),
            CompressionFieldParams {
                rle_threshold: Some(0.5),
                compression: None,
                compression_level: None,
                bss: Some(lance_encoding::compression_config::BssMode::Off),
                minichunk_size: None,
            },
        );

        let encoding_strategy = lance_encoding::encoder::default_encoding_strategy_with_params(
            LanceFileVersion::V2_1,
            params,
        )
        .unwrap();

        let options = FileWriterOptions {
            encoding_strategy: Some(Arc::from(encoding_strategy)),
            format_version: Some(LanceFileVersion::V2_1),
            // Small pages so multiple pages are written.
            max_page_bytes: Some(64 * 1024),
            ..Default::default()
        };

        let path = TempObjFile::default();
        let object_store = ObjectStore::local();

        let mut writer = FileWriter::try_new(
            object_store.create(&path).await.unwrap(),
            lance_schema.clone(),
            options,
        )
        .unwrap();

        writer.write_batch(&batch).await.unwrap();
        writer.add_schema_metadata("compression_test", "configured_compression");
        writer.finish().await.unwrap();

        // Also write the same data with default options as a baseline.
        let path_no_compression = TempObjFile::default();
        let default_options = FileWriterOptions {
            format_version: Some(LanceFileVersion::V2_1),
            max_page_bytes: Some(64 * 1024),
            ..Default::default()
        };

        let mut writer_no_compression = FileWriter::try_new(
            object_store.create(&path_no_compression).await.unwrap(),
            lance_schema.clone(),
            default_options,
        )
        .unwrap();

        writer_no_compression.write_batch(&batch).await.unwrap();
        writer_no_compression.finish().await.unwrap();

        let fs = FsFixture::default();
        let file_scheduler = fs
            .scheduler
            .open_file(&path, &CachedFileSize::unknown())
            .await
            .unwrap();

        let file_reader = FileReader::try_open(
            file_scheduler,
            None,
            Arc::<DecoderPlugins>::default(),
            &LanceCache::no_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let metadata = file_reader.metadata();
        assert_eq!(metadata.major_version, 2);
        assert_eq!(metadata.minor_version, 1);

        let schema = file_reader.schema();
        assert_eq!(
            schema.metadata.get("compression_test"),
            Some(&"configured_compression".to_string())
        );

        let column_metadatas = &metadata.column_metadatas;

        assert!(!column_metadatas[0].pages.is_empty());
        let customer_id_encoding = describe_encoding(&column_metadatas[0].pages[0]);
        assert!(
            customer_id_encoding.contains("RLE") || customer_id_encoding.contains("Rle"),
            "customer_id column should use RLE encoding due to '*_id' pattern match, but got: {}",
            customer_id_encoding
        );

        assert!(!column_metadatas[1].pages.is_empty());
        let product_id_encoding = describe_encoding(&column_metadatas[1].pages[0]);
        assert!(
            product_id_encoding.contains("RLE") || product_id_encoding.contains("Rle"),
            "product_id column should use RLE encoding due to '*_id' pattern match, but got: {}",
            product_id_encoding
        );
    }

    #[tokio::test]
    async fn test_field_metadata_compression() {
        // Request zstd level 6 for the text column via field metadata.
        let mut metadata = HashMap::new();
        metadata.insert(
            lance_encoding::constants::COMPRESSION_META_KEY.to_string(),
            "zstd".to_string(),
        );
        metadata.insert(
            lance_encoding::constants::COMPRESSION_LEVEL_META_KEY.to_string(),
            "6".to_string(),
        );

        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new("id", DataType::Int32, false),
            ArrowField::new("text", DataType::Utf8, false).with_metadata(metadata.clone()),
            ArrowField::new("data", DataType::Int32, false).with_metadata(HashMap::from([(
                lance_encoding::constants::COMPRESSION_META_KEY.to_string(),
                "none".to_string(),
            )])),
        ]));

        let lance_schema = LanceSchema::try_from(arrow_schema.as_ref()).unwrap();

        let id_array = Int32Array::from_iter_values(0..1000);
        let text_array = StringArray::from_iter_values(
            (0..1000).map(|i| format!("test string {} repeated text", i)),
        );
        let data_array = Int32Array::from_iter_values((0..1000).map(|i| i * 2));

        let batch = RecordBatch::try_new(
            arrow_schema.clone(),
            vec![
                Arc::new(id_array),
                Arc::new(text_array),
                Arc::new(data_array),
            ],
        )
        .unwrap();

        let path = TempObjFile::default();
        let object_store = ObjectStore::local();

        // No global compression params; only the field metadata applies.
        let params = CompressionParams::new();
        let encoding_strategy = lance_encoding::encoder::default_encoding_strategy_with_params(
            LanceFileVersion::V2_1,
            params,
        )
        .unwrap();

        let options = FileWriterOptions {
            encoding_strategy: Some(Arc::from(encoding_strategy)),
            format_version: Some(LanceFileVersion::V2_1),
            ..Default::default()
        };
        let mut writer = FileWriter::try_new(
            object_store.create(&path).await.unwrap(),
            lance_schema.clone(),
            options,
        )
        .unwrap();

        writer.write_batch(&batch).await.unwrap();
        writer.finish().await.unwrap();

        let fs = FsFixture::default();
        let file_scheduler = fs
            .scheduler
            .open_file(&path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler,
            None,
            Arc::<DecoderPlugins>::default(),
            &LanceCache::no_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let column_metadatas = &file_reader.metadata().column_metadatas;

        let text_encoding = describe_encoding(&column_metadatas[1].pages[0]);
        assert!(
            text_encoding.contains("Zstd"),
            "text column should use zstd compression from field metadata, but got: {}",
            text_encoding
        );

        let data_encoding = describe_encoding(&column_metadatas[2].pages[0]);
        assert!(
            data_encoding.contains("Flat") && data_encoding.contains("compression: None"),
            "data column should use no compression from field metadata, but got: {}",
            data_encoding
        );
    }

    #[tokio::test]
    async fn test_field_metadata_rle_threshold() {
        // Raise the RLE threshold and pick lz4 with BSS off via field
        // metadata.
        let mut metadata = HashMap::new();
        metadata.insert(
            lance_encoding::constants::RLE_THRESHOLD_META_KEY.to_string(),
            "0.9".to_string(),
        );
        metadata.insert(
            lance_encoding::constants::COMPRESSION_META_KEY.to_string(),
            "lz4".to_string(),
        );
        metadata.insert(
            lance_encoding::constants::BSS_META_KEY.to_string(),
            "off".to_string(),
        );

        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new("status", DataType::Int32, false).with_metadata(metadata),
        ]));

        let lance_schema = LanceSchema::try_from(arrow_schema.as_ref()).unwrap();

        // Three long runs of repeated values, ideal for RLE.
        let status_array = Int32Array::from_iter_values(
            std::iter::repeat_n(200, 8000)
                .chain(std::iter::repeat_n(404, 1500))
                .chain(std::iter::repeat_n(500, 500)),
        );

        let batch =
            RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(status_array)]).unwrap();

        let path = TempObjFile::default();
        let object_store = ObjectStore::local();

        let params = CompressionParams::new();
        let encoding_strategy = lance_encoding::encoder::default_encoding_strategy_with_params(
            LanceFileVersion::V2_1,
            params,
        )
        .unwrap();

        let options = FileWriterOptions {
            encoding_strategy: Some(Arc::from(encoding_strategy)),
            format_version: Some(LanceFileVersion::V2_1),
            ..Default::default()
        };
        let mut writer = FileWriter::try_new(
            object_store.create(&path).await.unwrap(),
            lance_schema.clone(),
            options,
        )
        .unwrap();

        writer.write_batch(&batch).await.unwrap();
        writer.finish().await.unwrap();

        let fs = FsFixture::default();
        let file_scheduler = fs
            .scheduler
            .open_file(&path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler,
            None,
            Arc::<DecoderPlugins>::default(),
            &LanceCache::no_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let column_metadatas = &file_reader.metadata().column_metadatas;
        let status_encoding = describe_encoding(&column_metadatas[0].pages[0]);
        assert!(
            status_encoding.contains("RLE") || status_encoding.contains("Rle"),
            "status column should use RLE encoding due to metadata threshold, but got: {}",
            status_encoding
        );
    }

    #[tokio::test]
    async fn test_large_page_split_on_read() {
        use arrow_array::Array;
        use futures::TryStreamExt;
        use lance_encoding::decoder::FilterExpression;
        use lance_io::ReadBatchParams;

        let arrow_field = ArrowField::new("data", DataType::Binary, false);
        let arrow_schema = ArrowSchema::new(vec![arrow_field]);
        let lance_schema = LanceSchema::try_from(&arrow_schema).unwrap();

        // One 40MiB value followed by a small one.
        let large_value = vec![42u8; 40 * 1024 * 1024];
        let array = arrow_array::BinaryArray::from(vec![
            Some(large_value.as_slice()),
            Some(b"small value"),
        ]);
        let batch = RecordBatch::try_new(Arc::new(arrow_schema), vec![Arc::new(array)]).unwrap();

        let options = FileWriterOptions {
            // A page limit large enough that everything lands in one page.
            max_page_bytes: Some(128 * 1024 * 1024),
            format_version: Some(LanceFileVersion::V2_1),
            ..Default::default()
        };

        let fs = FsFixture::default();
        let path = fs.tmp_path;

        let mut writer = FileWriter::try_new(
            fs.object_store.create(&path).await.unwrap(),
            lance_schema.clone(),
            options,
        )
        .unwrap();

        writer.write_batch(&batch).await.unwrap();
        let num_rows = writer.finish().await.unwrap();
        assert_eq!(num_rows, 2);

        let file_scheduler = fs
            .scheduler
            .open_file(&path, &CachedFileSize::unknown())
            .await
            .unwrap();

        let reader_options = FileReaderOptions {
            // A read chunk much smaller than the page forces the reader to
            // split the large page into multiple reads.
            read_chunk_size: 10 * 1024 * 1024,
            ..Default::default()
        };

        let file_reader = FileReader::try_open(
            file_scheduler,
            None,
            Arc::<DecoderPlugins>::default(),
            &LanceCache::no_cache(),
            reader_options,
        )
        .await
        .unwrap();

        let stream = file_reader
            .read_stream(
                ReadBatchParams::RangeFull,
                1024,
                10,
                FilterExpression::no_filter(),
            )
            .unwrap();

        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
        assert_eq!(batches.len(), 1);

        let read_array = batches[0].column(0);
        let read_binary = read_array
            .as_any()
            .downcast_ref::<arrow_array::BinaryArray>()
            .unwrap();

        assert_eq!(read_binary.len(), 2);
        assert_eq!(read_binary.value(0).len(), 40 * 1024 * 1024);
        assert_eq!(read_binary.value(1), b"small value");

        // The large value should round-trip byte-for-byte.
        assert!(read_binary.value(0).iter().all(|&b| b == 42u8));
    }

    fn spill_config() -> (TempObjFile, Arc<ObjectStore>) {
        let spill_path = TempObjFile::default();
        (spill_path, Arc::new(ObjectStore::local()))
    }

    fn make_batches(num_batches: i32, num_cols: usize, rows_per_batch: i32) -> Vec<RecordBatch> {
        let fields: Vec<_> = (0..num_cols)
            .map(|c| ArrowField::new(format!("c{c}"), DataType::Int32, false))
            .collect();
        let schema = Arc::new(ArrowSchema::new(fields));
        (0..num_batches)
            .map(|i| {
                let cols: Vec<Arc<dyn arrow_array::Array>> = (0..num_cols)
                    .map(|c| {
                        let start = (i * rows_per_batch + c as i32) * 100;
                        Arc::new(Int32Array::from_iter_values(start..start + rows_per_batch))
                            as Arc<dyn arrow_array::Array>
                    })
                    .collect();
                RecordBatch::try_new(schema.clone(), cols).unwrap()
            })
            .collect()
    }

    async fn write_and_read_batches(
        batches: &[RecordBatch],
        spill: Option<(Arc<ObjectStore>, object_store::path::Path)>,
    ) -> Vec<RecordBatch> {
        let fs = FsFixture::default();
        let lance_schema = LanceSchema::try_from(batches[0].schema().as_ref()).unwrap();
        let writer = fs.object_store.create(&fs.tmp_path).await.unwrap();
        let mut file_writer =
            FileWriter::try_new(writer, lance_schema, FileWriterOptions::default()).unwrap();
        if let Some((store, path)) = spill {
            file_writer = file_writer.with_page_metadata_spill(store, path);
        }
        for batch in batches {
            file_writer.write_batch(batch).await.unwrap();
        }
        file_writer.add_schema_metadata("foo", "bar");
        file_writer.finish().await.unwrap();

        crate::testing::read_lance_file(
            &fs,
            Arc::<DecoderPlugins>::default(),
            lance_encoding::decoder::FilterExpression::no_filter(),
        )
        .await
    }

    #[rstest::rstest]
    #[case::multi_col(20, 2, 100)]
    #[case::many_batches(50, 2, 100)]
    #[tokio::test]
    async fn test_page_metadata_spill_roundtrip(
        #[case] num_batches: i32,
        #[case] num_cols: usize,
        #[case] rows_per_batch: i32,
    ) {
        let batches = make_batches(num_batches, num_cols, rows_per_batch);
        let baseline = write_and_read_batches(&batches, None).await;
        let (spill_path, spill_store) = spill_config();
        let spilled =
            write_and_read_batches(&batches, Some((spill_store, spill_path.as_ref().clone())))
                .await;
        assert_eq!(baseline, spilled);
    }

    #[tokio::test]
    async fn test_page_metadata_spill_many_columns() {
        // 500 columns forces small per-column spill buffers.
        let batches = make_batches(10, 500, 100);
        let baseline = write_and_read_batches(&batches, None).await;
        let (spill_path, spill_store) = spill_config();
        let spilled =
            write_and_read_batches(&batches, Some((spill_store, spill_path.as_ref().clone())))
                .await;
        assert_eq!(baseline, spilled);
    }
}