use core::panic;
use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;

use arrow_array::RecordBatch;

use arrow_data::ArrayData;
use bytes::{BufMut, Bytes, BytesMut};
use futures::stream::FuturesOrdered;
use futures::StreamExt;
use lance_core::datatypes::{Field, Schema as LanceSchema};
use lance_core::utils::bit::pad_bytes;
use lance_core::{Error, Result};
use lance_encoding::decoder::PageEncoding;
use lance_encoding::encoder::{
    default_encoding_strategy, BatchEncoder, EncodeTask, EncodedBatch, EncodedPage,
    EncodingOptions, FieldEncoder, FieldEncodingStrategy, OutOfLineBuffers,
};
use lance_encoding::repdef::RepDefBuilder;
use lance_encoding::version::LanceFileVersion;
use lance_io::object_store::ObjectStore;
use lance_io::object_writer::ObjectWriter;
use lance_io::traits::Writer;
use log::{debug, warn};
use object_store::path::Path;
use prost::Message;
use prost_types::Any;
use snafu::location;
use tokio::io::AsyncWriteExt;
use tracing::instrument;

use crate::datatypes::FieldsWithMeta;
use crate::format::pb;
use crate::format::pbfile;
use crate::format::pbfile::DirectEncoding;
use crate::format::MAGIC;

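/// Page buffers written to the file are padded out to a multiple of this many bytes.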
pub(crate) const PAGE_BUFFER_ALIGNMENT: usize = 64;
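// A block of filler bytes used to pad buffers out to PAGE_BUFFER_ALIGNMENT.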
const PAD_BUFFER: [u8; PAGE_BUFFER_ALIGNMENT] = [72; PAGE_BUFFER_ALIGNMENT];
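// Default maximum size of an encoded page (32 MiB) when no override is configured.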
const MAX_PAGE_BYTES: usize = 32 * 1024 * 1024;
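// Environment variable that overrides the page size limit when
// `FileWriterOptions::max_page_bytes` is not set.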
const ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES: &str = "LANCE_FILE_WRITER_MAX_PAGE_BYTES";

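/// Options controlling how a `FileWriter` encodes and writes a Lance file.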
#[derive(Debug, Clone, Default)]
pub struct FileWriterOptions {
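    /// Approximate number of bytes the writer may buffer before flushing pages.
    /// The budget is split evenly across columns; if unset, each column gets 8 MiB.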
    pub data_cache_bytes: Option<u64>,
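    /// Maximum size (in bytes) of an encoded page. If unset, the value of the
    /// `LANCE_FILE_WRITER_MAX_PAGE_BYTES` environment variable is used, falling back
    /// to 32 MiB.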
    pub max_page_bytes: Option<u64>,
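    /// Forwarded to `EncodingOptions::keep_original_array`; defaults to `false`.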
    pub keep_original_array: Option<bool>,
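    /// Strategy used to pick encodings for each field. If unset, the default strategy
    /// for the chosen format version is used.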
    pub encoding_strategy: Option<Arc<dyn FieldEncodingStrategy>>,
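    /// The Lance file format version to write. If unset, `LanceFileVersion::default()` is used.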
    pub format_version: Option<LanceFileVersion>,
}

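/// Writes record batches to a single Lance file.
///
/// A minimal usage sketch (mirroring `test_basic_write` in this module's tests); the
/// object writer, schema, and batches here are placeholders:
///
/// ```ignore
/// let object_writer = object_store.create(&path).await?;
/// let mut file_writer =
///     FileWriter::try_new(object_writer, lance_schema, FileWriterOptions::default())?;
/// for batch in batches {
///     file_writer.write_batch(&batch).await?;
/// }
/// file_writer.add_schema_metadata("foo", "bar");
/// file_writer.finish().await?;
/// ```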
pub struct FileWriter {
    writer: ObjectWriter,
    schema: Option<LanceSchema>,
    column_writers: Vec<Box<dyn FieldEncoder>>,
    column_metadata: Vec<pbfile::ColumnMetadata>,
    field_id_to_column_indices: Vec<(u32, u32)>,
    num_columns: u32,
    rows_written: u64,
    global_buffers: Vec<(u64, u64)>,
    schema_metadata: HashMap<String, String>,
    options: FileWriterOptions,
}

fn initial_column_metadata() -> pbfile::ColumnMetadata {
    pbfile::ColumnMetadata {
        pages: Vec::new(),
        buffer_offsets: Vec::new(),
        buffer_sizes: Vec::new(),
        encoding: None,
    }
}

static WARNED_ON_UNSTABLE_API: AtomicBool = AtomicBool::new(false);

impl FileWriter {
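    /// Creates a new file writer that will write `schema` to `object_writer` using the given options.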
    pub fn try_new(
        object_writer: ObjectWriter,
        schema: LanceSchema,
        options: FileWriterOptions,
    ) -> Result<Self> {
        let mut writer = Self::new_lazy(object_writer, options);
        writer.initialize(schema)?;
        Ok(writer)
    }

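    /// Creates a writer whose schema is not yet known; the schema is taken from the
    /// first batch written (see `ensure_initialized`).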
    pub fn new_lazy(object_writer: ObjectWriter, options: FileWriterOptions) -> Self {
        if let Some(format_version) = options.format_version {
            if format_version > LanceFileVersion::Stable
                && WARNED_ON_UNSTABLE_API
                    .compare_exchange(
                        false,
                        true,
                        std::sync::atomic::Ordering::Relaxed,
                        std::sync::atomic::Ordering::Relaxed,
                    )
                    .is_ok()
            {
                warn!("You have requested an unstable format version. Files written with this format version may not be readable in the future! This is a development feature and should only be used for experimentation and never for production data.");
            }
        }
        Self {
            writer: object_writer,
            schema: None,
            column_writers: Vec::new(),
            column_metadata: Vec::new(),
            num_columns: 0,
            rows_written: 0,
            field_id_to_column_indices: Vec::new(),
            global_buffers: Vec::new(),
            schema_metadata: HashMap::new(),
            options,
        }
    }

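    /// Convenience method that writes all `batches` to a new file at `path` and returns
    /// the number of rows written.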
    pub async fn create_file_with_batches(
        store: &ObjectStore,
        path: &Path,
        schema: lance_core::datatypes::Schema,
        batches: impl Iterator<Item = RecordBatch> + Send,
        options: FileWriterOptions,
    ) -> Result<usize> {
        let writer = store.create(path).await?;
        let mut writer = Self::try_new(writer, schema, options)?;
        for batch in batches {
            writer.write_batch(&batch).await?;
        }
        Ok(writer.finish().await? as usize)
    }

    async fn do_write_buffer(writer: &mut ObjectWriter, buf: &[u8]) -> Result<()> {
        writer.write_all(buf).await?;
        let pad_bytes = pad_bytes::<PAGE_BUFFER_ALIGNMENT>(buf.len());
        writer.write_all(&PAD_BUFFER[..pad_bytes]).await?;
        Ok(())
    }

    pub fn version(&self) -> LanceFileVersion {
        self.options.format_version.unwrap_or_default()
    }

    async fn write_page(&mut self, encoded_page: EncodedPage) -> Result<()> {
        let buffers = encoded_page.data;
        let mut buffer_offsets = Vec::with_capacity(buffers.len());
        let mut buffer_sizes = Vec::with_capacity(buffers.len());
        for buffer in buffers {
            buffer_offsets.push(self.writer.tell().await? as u64);
            buffer_sizes.push(buffer.len() as u64);
            Self::do_write_buffer(&mut self.writer, &buffer).await?;
        }
        let encoded_encoding = match encoded_page.description {
            PageEncoding::Legacy(array_encoding) => Any::from_msg(&array_encoding)?.encode_to_vec(),
            PageEncoding::Structural(page_layout) => Any::from_msg(&page_layout)?.encode_to_vec(),
        };
        let page = pbfile::column_metadata::Page {
            buffer_offsets,
            buffer_sizes,
            encoding: Some(pbfile::Encoding {
                location: Some(pbfile::encoding::Location::Direct(DirectEncoding {
                    encoding: encoded_encoding,
                })),
            }),
            length: encoded_page.num_rows,
            priority: encoded_page.row_number,
        };
        self.column_metadata[encoded_page.column_idx as usize]
            .pages
            .push(page);
        Ok(())
    }

    #[instrument(skip_all, level = "debug")]
    async fn write_pages(&mut self, mut encoding_tasks: FuturesOrdered<EncodeTask>) -> Result<()> {
        while let Some(encoding_task) = encoding_tasks.next().await {
            let encoded_page = encoding_task?;
            self.write_page(encoded_page).await?;
        }
        self.writer.flush().await?;
        Ok(())
    }

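    /// Writes each of the given record batches to the file.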
    pub async fn write_batches(
        &mut self,
        batches: impl Iterator<Item = &RecordBatch>,
    ) -> Result<()> {
        for batch in batches {
            self.write_batch(batch).await?;
        }
        Ok(())
    }

    fn verify_field_nullability(arr: &ArrayData, field: &Field) -> Result<()> {
        if !field.nullable && arr.null_count() > 0 {
            return Err(Error::invalid_input(format!("The field `{}` contained null values even though the field is marked non-null in the schema", field.name), location!()));
        }

        for (child_field, child_arr) in field.children.iter().zip(arr.child_data()) {
            Self::verify_field_nullability(child_arr, child_field)?;
        }

        Ok(())
    }

    fn verify_nullability_constraints(&self, batch: &RecordBatch) -> Result<()> {
        for (col, field) in batch
            .columns()
            .iter()
            .zip(self.schema.as_ref().unwrap().fields.iter())
        {
            Self::verify_field_nullability(&col.to_data(), field)?;
        }
        Ok(())
    }

    fn initialize(&mut self, mut schema: LanceSchema) -> Result<()> {
        let cache_bytes_per_column = if let Some(data_cache_bytes) = self.options.data_cache_bytes {
            data_cache_bytes / schema.fields.len() as u64
        } else {
            8 * 1024 * 1024
        };

        let max_page_bytes = self.options.max_page_bytes.unwrap_or_else(|| {
            std::env::var(ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES)
                .map(|s| {
                    s.parse::<u64>().unwrap_or_else(|e| {
                        warn!(
                            "Failed to parse {}: {}, using default",
                            ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES, e
                        );
                        MAX_PAGE_BYTES as u64
                    })
                })
                .unwrap_or(MAX_PAGE_BYTES as u64)
        });

        schema.validate()?;

        let keep_original_array = self.options.keep_original_array.unwrap_or(false);
        let encoding_strategy = self.options.encoding_strategy.clone().unwrap_or_else(|| {
            let version = self.version();
            default_encoding_strategy(version).into()
        });

        let encoding_options = EncodingOptions {
            cache_bytes_per_column,
            max_page_bytes,
            keep_original_array,
            buffer_alignment: PAGE_BUFFER_ALIGNMENT as u64,
        };
        let encoder =
            BatchEncoder::try_new(&schema, encoding_strategy.as_ref(), &encoding_options)?;
        self.num_columns = encoder.num_columns();

        self.column_writers = encoder.field_encoders;
        self.column_metadata = vec![initial_column_metadata(); self.num_columns as usize];
        self.field_id_to_column_indices = encoder.field_id_to_column_index;
        self.schema_metadata
            .extend(std::mem::take(&mut schema.metadata));
        self.schema = Some(schema);
        Ok(())
    }

    fn ensure_initialized(&mut self, batch: &RecordBatch) -> Result<&LanceSchema> {
        if self.schema.is_none() {
            let schema = LanceSchema::try_from(batch.schema().as_ref())?;
            self.initialize(schema)?;
        }
        Ok(self.schema.as_ref().unwrap())
    }

    #[instrument(skip_all, level = "debug")]
    fn encode_batch(
        &mut self,
        batch: &RecordBatch,
        external_buffers: &mut OutOfLineBuffers,
    ) -> Result<Vec<Vec<EncodeTask>>> {
        self.schema
            .as_ref()
            .unwrap()
            .fields
            .iter()
            .zip(self.column_writers.iter_mut())
            .map(|(field, column_writer)| {
                let array = batch
                    .column_by_name(&field.name)
                    .ok_or(Error::InvalidInput {
                        source: format!(
                            "Cannot write batch. The batch was missing the column `{}`",
                            field.name
                        )
                        .into(),
                        location: location!(),
                    })?;
                let repdef = RepDefBuilder::default();
                let num_rows = array.len() as u64;
                column_writer.maybe_encode(
                    array.clone(),
                    external_buffers,
                    repdef,
                    self.rows_written,
                    num_rows,
                )
            })
            .collect::<Result<Vec<_>>>()
    }

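    /// Encodes the batch and writes any pages that are ready to the file.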
    pub async fn write_batch(&mut self, batch: &RecordBatch) -> Result<()> {
        debug!(
            "write_batch called with {} bytes of data",
            batch.get_array_memory_size()
        );
        self.ensure_initialized(batch)?;
        self.verify_nullability_constraints(batch)?;
        let num_rows = batch.num_rows() as u64;
        if num_rows == 0 {
            return Ok(());
        }
        if num_rows > u32::MAX as u64 {
            return Err(Error::InvalidInput {
                source: "cannot write Lance files with more than 2^32 rows".into(),
                location: location!(),
            });
        }
        let mut external_buffers =
            OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
        let encoding_tasks = self.encode_batch(batch, &mut external_buffers)?;
        for external_buffer in external_buffers.take_buffers() {
            Self::do_write_buffer(&mut self.writer, &external_buffer).await?;
        }

        let encoding_tasks = encoding_tasks
            .into_iter()
            .flatten()
            .collect::<FuturesOrdered<_>>();

        self.rows_written = match self.rows_written.checked_add(batch.num_rows() as u64) {
            Some(rows_written) => rows_written,
            None => {
                return Err(Error::InvalidInput { source: format!("cannot write batch with {} rows because {} rows have already been written and Lance files cannot contain more than 2^64 rows", num_rows, self.rows_written).into(), location: location!() });
            }
        };

        self.write_pages(encoding_tasks).await?;

        Ok(())
    }

    async fn write_column_metadata(
        &mut self,
        metadata: pbfile::ColumnMetadata,
    ) -> Result<(u64, u64)> {
        let metadata_bytes = metadata.encode_to_vec();
        let position = self.writer.tell().await? as u64;
        let len = metadata_bytes.len() as u64;
        self.writer.write_all(&metadata_bytes).await?;
        Ok((position, len))
    }

    async fn write_column_metadatas(&mut self) -> Result<Vec<(u64, u64)>> {
        let mut metadatas = Vec::new();
        std::mem::swap(&mut self.column_metadata, &mut metadatas);
        let mut metadata_positions = Vec::with_capacity(metadatas.len());
        for metadata in metadatas {
            metadata_positions.push(self.write_column_metadata(metadata).await?);
        }
        Ok(metadata_positions)
    }

    fn make_file_descriptor(
        schema: &lance_core::datatypes::Schema,
        num_rows: u64,
    ) -> Result<pb::FileDescriptor> {
        let fields_with_meta = FieldsWithMeta::from(schema);
        Ok(pb::FileDescriptor {
            schema: Some(pb::Schema {
                fields: fields_with_meta.fields.0,
                metadata: fields_with_meta.metadata,
            }),
            length: num_rows,
        })
    }

    async fn write_global_buffers(&mut self) -> Result<Vec<(u64, u64)>> {
        let schema = self.schema.as_mut().ok_or(Error::invalid_input("No schema provided on writer open and no data provided. Schema is unknown and file cannot be created", location!()))?;
        schema.metadata = std::mem::take(&mut self.schema_metadata);
        let file_descriptor = Self::make_file_descriptor(schema, self.rows_written)?;
        let file_descriptor_bytes = file_descriptor.encode_to_vec();
        let file_descriptor_len = file_descriptor_bytes.len() as u64;
        let file_descriptor_position = self.writer.tell().await? as u64;
        self.writer.write_all(&file_descriptor_bytes).await?;
        let mut gbo_table = Vec::with_capacity(1 + self.global_buffers.len());
        gbo_table.push((file_descriptor_position, file_descriptor_len));
        gbo_table.append(&mut self.global_buffers);
        Ok(gbo_table)
    }

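    /// Adds a key/value pair to the schema metadata that is written when `finish` is called.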
    pub fn add_schema_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
        self.schema_metadata.insert(key.into(), value.into());
    }

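    /// Writes `buffer` to the file as a global buffer and returns its index in the
    /// global buffer table (index 0 is reserved for the file descriptor).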
    pub async fn add_global_buffer(&mut self, buffer: Bytes) -> Result<u32> {
        let position = self.writer.tell().await? as u64;
        let len = buffer.len() as u64;
        Self::do_write_buffer(&mut self.writer, &buffer).await?;
        self.global_buffers.push((position, len));
        Ok(self.global_buffers.len() as u32)
    }

    async fn finish_writers(&mut self) -> Result<()> {
        let mut col_idx = 0;
        for mut writer in std::mem::take(&mut self.column_writers) {
            let mut external_buffers =
                OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
            let columns = writer.finish(&mut external_buffers).await?;
            for buffer in external_buffers.take_buffers() {
                self.writer.write_all(&buffer).await?;
            }
            debug_assert_eq!(
                columns.len(),
                writer.num_columns() as usize,
                "Expected {} columns from column at index {} and got {}",
                writer.num_columns(),
                col_idx,
                columns.len()
            );
            for column in columns {
                for page in column.final_pages {
                    self.write_page(page).await?;
                }
                let column_metadata = &mut self.column_metadata[col_idx];
                let mut buffer_pos = self.writer.tell().await? as u64;
                for buffer in column.column_buffers {
                    column_metadata.buffer_offsets.push(buffer_pos);
                    let mut size = 0;
                    Self::do_write_buffer(&mut self.writer, &buffer).await?;
                    size += buffer.len() as u64;
                    buffer_pos += size;
                    column_metadata.buffer_sizes.push(size);
                }
                let encoded_encoding = Any::from_msg(&column.encoding)?.encode_to_vec();
                column_metadata.encoding = Some(pbfile::Encoding {
                    location: Some(pbfile::encoding::Location::Direct(pbfile::DirectEncoding {
                        encoding: encoded_encoding,
                    })),
                });
                col_idx += 1;
            }
        }
        if col_idx != self.column_metadata.len() {
            panic!(
                "Column writers finished with {} columns but we expected {}",
                col_idx,
                self.column_metadata.len()
            );
        }
        Ok(())
    }

    fn version_to_numbers(&self) -> (u16, u16) {
        let version = self.options.format_version.unwrap_or_default();
        match version.resolve() {
            LanceFileVersion::V2_0 => (0, 3),
            LanceFileVersion::V2_1 => (2, 1),
            LanceFileVersion::V2_2 => (2, 2),
            _ => panic!("Unsupported version: {}", version),
        }
    }

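    /// Flushes any buffered data, writes the column metadata, global buffers, and footer,
    /// and closes the file. Returns the total number of rows written.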
    pub async fn finish(&mut self) -> Result<u64> {
        let mut external_buffers =
            OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
        let encoding_tasks = self
            .column_writers
            .iter_mut()
            .map(|writer| writer.flush(&mut external_buffers))
            .collect::<Result<Vec<_>>>()?;
        for external_buffer in external_buffers.take_buffers() {
            Self::do_write_buffer(&mut self.writer, &external_buffer).await?;
        }
        let encoding_tasks = encoding_tasks
            .into_iter()
            .flatten()
            .collect::<FuturesOrdered<_>>();
        self.write_pages(encoding_tasks).await?;

        self.finish_writers().await?;

        let global_buffer_offsets = self.write_global_buffers().await?;
        let num_global_buffers = global_buffer_offsets.len() as u32;

        let column_metadata_start = self.writer.tell().await? as u64;
        let metadata_positions = self.write_column_metadatas().await?;

        let cmo_table_start = self.writer.tell().await? as u64;
        for (meta_pos, meta_len) in metadata_positions {
            self.writer.write_u64_le(meta_pos).await?;
            self.writer.write_u64_le(meta_len).await?;
        }

        let gbo_table_start = self.writer.tell().await? as u64;
        for (gbo_pos, gbo_len) in global_buffer_offsets {
            self.writer.write_u64_le(gbo_pos).await?;
            self.writer.write_u64_le(gbo_len).await?;
        }

        let (major, minor) = self.version_to_numbers();
        self.writer.write_u64_le(column_metadata_start).await?;
        self.writer.write_u64_le(cmo_table_start).await?;
        self.writer.write_u64_le(gbo_table_start).await?;
        self.writer.write_u32_le(num_global_buffers).await?;
        self.writer.write_u32_le(self.num_columns).await?;
        self.writer.write_u16_le(major).await?;
        self.writer.write_u16_le(minor).await?;
        self.writer.write_all(MAGIC).await?;

        self.writer.shutdown().await?;
        Ok(self.rows_written)
    }

    pub async fn abort(&mut self) {
        self.writer.abort().await;
    }

    pub async fn tell(&mut self) -> Result<u64> {
        Ok(self.writer.tell().await? as u64)
    }

    pub fn field_id_to_column_indices(&self) -> &[(u32, u32)] {
        &self.field_id_to_column_indices
    }
}

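/// Helpers for serializing an `EncodedBatch` as an in-memory Lance file.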
pub trait EncodedBatchWriteExt {
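    /// Serializes the batch to Lance file bytes, including the schema in a global buffer.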
    fn try_to_self_described_lance(&self, version: LanceFileVersion) -> Result<Bytes>;
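    /// Serializes the batch to Lance file bytes without the schema.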
    fn try_to_mini_lance(&self, version: LanceFileVersion) -> Result<Bytes>;
}

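/// Serializes `batch` into a complete in-memory Lance file: the encoded data followed by
/// the column metadata, offset tables, and footer. If `write_schema` is true, a file
/// descriptor containing the schema is included as a global buffer.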
fn concat_lance_footer(
    batch: &EncodedBatch,
    write_schema: bool,
    version: LanceFileVersion,
) -> Result<Bytes> {
    let mut data = BytesMut::with_capacity(batch.data.len() + 1024 * 1024);
    data.put(batch.data.clone());
    let global_buffers = if write_schema {
        let schema_start = data.len() as u64;
        let lance_schema = lance_core::datatypes::Schema::try_from(batch.schema.as_ref())?;
        let descriptor = FileWriter::make_file_descriptor(&lance_schema, batch.num_rows)?;
        let descriptor_bytes = descriptor.encode_to_vec();
        let descriptor_len = descriptor_bytes.len() as u64;
        data.put(descriptor_bytes.as_slice());

        vec![(schema_start, descriptor_len)]
    } else {
        vec![]
    };
    let col_metadata_start = data.len() as u64;

    let mut col_metadata_positions = Vec::new();
    for col in &batch.page_table {
        let position = data.len() as u64;
        let pages = col
            .page_infos
            .iter()
            .map(|page_info| {
                let encoded_encoding = match &page_info.encoding {
                    PageEncoding::Legacy(array_encoding) => {
                        Any::from_msg(array_encoding)?.encode_to_vec()
                    }
                    PageEncoding::Structural(page_layout) => {
                        Any::from_msg(page_layout)?.encode_to_vec()
                    }
                };
                let (buffer_offsets, buffer_sizes): (Vec<_>, Vec<_>) = page_info
                    .buffer_offsets_and_sizes
                    .as_ref()
                    .iter()
                    .cloned()
                    .unzip();
                Ok(pbfile::column_metadata::Page {
                    buffer_offsets,
                    buffer_sizes,
                    encoding: Some(pbfile::Encoding {
                        location: Some(pbfile::encoding::Location::Direct(DirectEncoding {
                            encoding: encoded_encoding,
                        })),
                    }),
                    length: page_info.num_rows,
                    priority: page_info.priority,
                })
            })
            .collect::<Result<Vec<_>>>()?;
        let (buffer_offsets, buffer_sizes): (Vec<_>, Vec<_>) =
            col.buffer_offsets_and_sizes.iter().cloned().unzip();
        let encoded_col_encoding = Any::from_msg(&col.encoding)?.encode_to_vec();
        let column = pbfile::ColumnMetadata {
            pages,
            buffer_offsets,
            buffer_sizes,
            encoding: Some(pbfile::Encoding {
                location: Some(pbfile::encoding::Location::Direct(pbfile::DirectEncoding {
                    encoding: encoded_col_encoding,
                })),
            }),
        };
        let column_bytes = column.encode_to_vec();
        col_metadata_positions.push((position, column_bytes.len() as u64));
        data.put(column_bytes.as_slice());
    }
    let cmo_table_start = data.len() as u64;
    for (meta_pos, meta_len) in col_metadata_positions {
        data.put_u64_le(meta_pos);
        data.put_u64_le(meta_len);
    }
    let gbo_table_start = data.len() as u64;
    let num_global_buffers = global_buffers.len() as u32;
    for (gbo_pos, gbo_len) in global_buffers {
        data.put_u64_le(gbo_pos);
        data.put_u64_le(gbo_len);
    }

    let (major, minor) = version.to_numbers();

    data.put_u64_le(col_metadata_start);
    data.put_u64_le(cmo_table_start);
    data.put_u64_le(gbo_table_start);
    data.put_u32_le(num_global_buffers);
    data.put_u32_le(batch.page_table.len() as u32);
    data.put_u16_le(major as u16);
    data.put_u16_le(minor as u16);
    data.put(MAGIC.as_slice());

    Ok(data.freeze())
}

impl EncodedBatchWriteExt for EncodedBatch {
    fn try_to_self_described_lance(&self, version: LanceFileVersion) -> Result<Bytes> {
        concat_lance_footer(self, true, version)
    }

    fn try_to_mini_lance(&self, version: LanceFileVersion) -> Result<Bytes> {
        concat_lance_footer(self, false, version)
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use crate::v2::reader::{describe_encoding, FileReader, FileReaderOptions};
    use crate::v2::testing::FsFixture;
    use crate::v2::writer::{FileWriter, FileWriterOptions, ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES};
    use arrow_array::builder::{Float32Builder, Int32Builder};
    use arrow_array::{types::Float64Type, RecordBatchReader, StringArray};
    use arrow_array::{Int32Array, RecordBatch, UInt64Array};
    use arrow_schema::{DataType, Field, Field as ArrowField, Schema, Schema as ArrowSchema};
    use lance_core::cache::LanceCache;
    use lance_core::datatypes::Schema as LanceSchema;
    use lance_datagen::{array, gen_batch, BatchCount, RowCount};
    use lance_encoding::compression_config::{CompressionFieldParams, CompressionParams};
    use lance_encoding::decoder::DecoderPlugins;
    use lance_encoding::version::LanceFileVersion;
    use lance_io::object_store::ObjectStore;
    use lance_io::utils::CachedFileSize;
    use object_store::path::Path;
    use tempfile::tempdir;

    #[tokio::test]
    async fn test_basic_write() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path: String = tmp_dir.path().to_str().unwrap().to_owned();
        let tmp_path = Path::parse(tmp_path).unwrap();
        let tmp_path = tmp_path.child("some_file.lance");
        let obj_store = Arc::new(ObjectStore::local());

        let reader = gen_batch()
            .col("score", array::rand::<Float64Type>())
            .into_reader_rows(RowCount::from(1000), BatchCount::from(10));

        let writer = obj_store.create(&tmp_path).await.unwrap();

        let lance_schema =
            lance_core::datatypes::Schema::try_from(reader.schema().as_ref()).unwrap();

        let mut file_writer =
            FileWriter::try_new(writer, lance_schema, FileWriterOptions::default()).unwrap();

        for batch in reader {
            file_writer.write_batch(&batch.unwrap()).await.unwrap();
        }
        file_writer.add_schema_metadata("foo", "bar");
        file_writer.finish().await.unwrap();
    }

    #[tokio::test]
    async fn test_write_empty() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path: String = tmp_dir.path().to_str().unwrap().to_owned();
        let tmp_path = Path::parse(tmp_path).unwrap();
        let tmp_path = tmp_path.child("some_file.lance");
        let obj_store = Arc::new(ObjectStore::local());

        let reader = gen_batch()
            .col("score", array::rand::<Float64Type>())
            .into_reader_rows(RowCount::from(0), BatchCount::from(0));

        let writer = obj_store.create(&tmp_path).await.unwrap();

        let lance_schema =
            lance_core::datatypes::Schema::try_from(reader.schema().as_ref()).unwrap();

        let mut file_writer =
            FileWriter::try_new(writer, lance_schema, FileWriterOptions::default()).unwrap();

        for batch in reader {
            file_writer.write_batch(&batch.unwrap()).await.unwrap();
        }
        file_writer.add_schema_metadata("foo", "bar");
        file_writer.finish().await.unwrap();
    }

    #[tokio::test]
    async fn test_max_page_bytes_enforced() {
        let arrow_field = Field::new("data", DataType::UInt64, false);
        let arrow_schema = Schema::new(vec![arrow_field]);
        let lance_schema = LanceSchema::try_from(&arrow_schema).unwrap();

        let data: Vec<u64> = (0..1_000_000).collect();
        let array = UInt64Array::from(data);
        let batch =
            RecordBatch::try_new(arrow_schema.clone().into(), vec![Arc::new(array)]).unwrap();

        let options = FileWriterOptions {
            max_page_bytes: Some(1024 * 1024),
            ..Default::default()
        };

        let tmp_dir = tempdir().unwrap();
        let path = tmp_dir.path().join("test.lance");
        let object_store = ObjectStore::local();
        let mut writer = FileWriter::try_new(
            object_store
                .create(&Path::from(path.to_str().unwrap()))
                .await
                .unwrap(),
            lance_schema,
            options,
        )
        .unwrap();

        writer.write_batch(&batch).await.unwrap();
        writer.finish().await.unwrap();

        let fs = FsFixture::default();
        let file_scheduler = fs
            .scheduler
            .open_file(
                &Path::from(path.to_str().unwrap()),
                &CachedFileSize::unknown(),
            )
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler,
            None,
            Arc::<DecoderPlugins>::default(),
            &LanceCache::no_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let column_meta = file_reader.metadata();

        let mut total_page_num: u32 = 0;
        for (col_idx, col_metadata) in column_meta.column_metadatas.iter().enumerate() {
            assert!(
                !col_metadata.pages.is_empty(),
                "Column {} has no pages",
                col_idx
            );

            for (page_idx, page) in col_metadata.pages.iter().enumerate() {
                total_page_num += 1;
                let total_size: u64 = page.buffer_sizes.iter().sum();
                assert!(
                    total_size <= 1024 * 1024,
                    "Column {} Page {} size {} exceeds 1MB limit",
                    col_idx,
                    page_idx,
                    total_size
                );
            }
        }

        assert_eq!(total_page_num, 8)
    }

    #[tokio::test(flavor = "current_thread")]
    async fn test_max_page_bytes_env_var() {
        let arrow_field = Field::new("data", DataType::UInt64, false);
        let arrow_schema = Schema::new(vec![arrow_field]);
        let lance_schema = LanceSchema::try_from(&arrow_schema).unwrap();
        let data: Vec<u64> = (0..500_000).collect();
        let array = UInt64Array::from(data);
        let batch =
            RecordBatch::try_new(arrow_schema.clone().into(), vec![Arc::new(array)]).unwrap();

        std::env::set_var(ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES, "2097152");

        let options = FileWriterOptions {
            max_page_bytes: None,
            ..Default::default()
        };

        let tmp_dir = tempdir().unwrap();
        let path = tmp_dir.path().join("test_env_var.lance");
        let object_store = ObjectStore::local();
        let mut writer = FileWriter::try_new(
            object_store
                .create(&Path::from(path.to_str().unwrap()))
                .await
                .unwrap(),
            lance_schema.clone(),
            options,
        )
        .unwrap();

        writer.write_batch(&batch).await.unwrap();
        writer.finish().await.unwrap();

        let fs = FsFixture::default();
        let file_scheduler = fs
            .scheduler
            .open_file(
                &Path::from(path.to_str().unwrap()),
                &CachedFileSize::unknown(),
            )
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler,
            None,
            Arc::<DecoderPlugins>::default(),
            &LanceCache::no_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        for col_metadata in file_reader.metadata().column_metadatas.iter() {
            for page in col_metadata.pages.iter() {
                let total_size: u64 = page.buffer_sizes.iter().sum();
                assert!(
                    total_size <= 2 * 1024 * 1024,
                    "Page size {} exceeds 2MB limit",
                    total_size
                );
            }
        }

        std::env::set_var(ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES, "");
    }

    #[tokio::test]
    async fn test_compression_overrides_end_to_end() {
        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new("customer_id", DataType::Int32, false),
            ArrowField::new("product_id", DataType::Int32, false),
            ArrowField::new("quantity", DataType::Int32, false),
            ArrowField::new("price", DataType::Float32, false),
            ArrowField::new("description", DataType::Utf8, false),
        ]));

        let lance_schema = LanceSchema::try_from(arrow_schema.as_ref()).unwrap();

        let mut customer_ids = Int32Builder::new();
        let mut product_ids = Int32Builder::new();
        let mut quantities = Int32Builder::new();
        let mut prices = Float32Builder::new();
        let mut descriptions = Vec::new();

        for i in 0..10000 {
            customer_ids.append_value(i / 100);

            product_ids.append_value(i / 2000);

            quantities.append_value(if i % 10 == 0 { 5 } else { 1 });

            prices.append_value(match i % 3 {
                0 => 9.99,
                1 => 19.99,
                _ => 29.99,
            });

            descriptions.push(format!("Product {}", i / 2000));
        }

        let batch = RecordBatch::try_new(
            arrow_schema.clone(),
            vec![
                Arc::new(customer_ids.finish()),
                Arc::new(product_ids.finish()),
                Arc::new(quantities.finish()),
                Arc::new(prices.finish()),
                Arc::new(StringArray::from(descriptions)),
            ],
        )
        .unwrap();

        let mut params = CompressionParams::new();

        params.columns.insert(
            "*_id".to_string(),
            CompressionFieldParams {
                rle_threshold: Some(0.5),
                compression: None,
                compression_level: None,
                bss: Some(lance_encoding::compression_config::BssMode::Off),
            },
        );

        let encoding_strategy = lance_encoding::encoder::default_encoding_strategy_with_params(
            LanceFileVersion::V2_1,
            params,
        )
        .unwrap();

        let options = FileWriterOptions {
            encoding_strategy: Some(Arc::from(encoding_strategy)),
            format_version: Some(LanceFileVersion::V2_1),
            max_page_bytes: Some(64 * 1024),
            ..Default::default()
        };

        let tmp_dir = tempdir().unwrap();
        let path = tmp_dir.path().join("test_compression_overrides.lance");
        let object_store = ObjectStore::local();

        let mut writer = FileWriter::try_new(
            object_store
                .create(&Path::from(path.to_str().unwrap()))
                .await
                .unwrap(),
            lance_schema.clone(),
            options,
        )
        .unwrap();

        writer.write_batch(&batch).await.unwrap();
        writer.add_schema_metadata("compression_test", "configured_compression");
        writer.finish().await.unwrap();

        let path_no_compression = tmp_dir.path().join("test_no_compression.lance");
        let default_options = FileWriterOptions {
            format_version: Some(LanceFileVersion::V2_1),
            max_page_bytes: Some(64 * 1024),
            ..Default::default()
        };

        let mut writer_no_compression = FileWriter::try_new(
            object_store
                .create(&Path::from(path_no_compression.to_str().unwrap()))
                .await
                .unwrap(),
            lance_schema.clone(),
            default_options,
        )
        .unwrap();

        writer_no_compression.write_batch(&batch).await.unwrap();
        writer_no_compression.finish().await.unwrap();

        let fs = FsFixture::default();
        let file_scheduler = fs
            .scheduler
            .open_file(
                &Path::from(path.to_str().unwrap()),
                &CachedFileSize::unknown(),
            )
            .await
            .unwrap();

        let file_reader = FileReader::try_open(
            file_scheduler,
            None,
            Arc::<DecoderPlugins>::default(),
            &LanceCache::no_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let metadata = file_reader.metadata();
        assert_eq!(metadata.major_version, 2);
        assert_eq!(metadata.minor_version, 1);

        let schema = file_reader.schema();
        assert_eq!(
            schema.metadata.get("compression_test"),
            Some(&"configured_compression".to_string())
        );

        let column_metadatas = &metadata.column_metadatas;

        assert!(!column_metadatas[0].pages.is_empty());
        let customer_id_encoding = describe_encoding(&column_metadatas[0].pages[0]);
        assert!(
            customer_id_encoding.contains("RLE") || customer_id_encoding.contains("Rle"),
            "customer_id column should use RLE encoding due to '*_id' pattern match, but got: {}",
            customer_id_encoding
        );

        assert!(!column_metadatas[1].pages.is_empty());
        let product_id_encoding = describe_encoding(&column_metadatas[1].pages[0]);
        assert!(
            product_id_encoding.contains("RLE") || product_id_encoding.contains("Rle"),
            "product_id column should use RLE encoding due to '*_id' pattern match, but got: {}",
            product_id_encoding
        );
    }

    #[tokio::test]
    async fn test_field_metadata_compression() {
        let mut metadata = HashMap::new();
        metadata.insert(
            lance_encoding::constants::COMPRESSION_META_KEY.to_string(),
            "zstd".to_string(),
        );
        metadata.insert(
            lance_encoding::constants::COMPRESSION_LEVEL_META_KEY.to_string(),
            "6".to_string(),
        );

        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new("id", DataType::Int32, false),
            ArrowField::new("text", DataType::Utf8, false).with_metadata(metadata.clone()),
            ArrowField::new("data", DataType::Int32, false).with_metadata(HashMap::from([(
                lance_encoding::constants::COMPRESSION_META_KEY.to_string(),
                "none".to_string(),
            )])),
        ]));

        let lance_schema = LanceSchema::try_from(arrow_schema.as_ref()).unwrap();

        let id_array = Int32Array::from_iter_values(0..1000);
        let text_array = StringArray::from_iter_values(
            (0..1000).map(|i| format!("test string {} repeated text", i)),
        );
        let data_array = Int32Array::from_iter_values((0..1000).map(|i| i * 2));

        let batch = RecordBatch::try_new(
            arrow_schema.clone(),
            vec![
                Arc::new(id_array),
                Arc::new(text_array),
                Arc::new(data_array),
            ],
        )
        .unwrap();

        let tmp_dir = tempdir().unwrap();
        let path = tmp_dir.path().join("field_metadata_test.lance");
        let object_store = ObjectStore::local();

        let params = CompressionParams::new();
        let encoding_strategy = lance_encoding::encoder::default_encoding_strategy_with_params(
            LanceFileVersion::V2_1,
            params,
        )
        .unwrap();

        let options = FileWriterOptions {
            encoding_strategy: Some(Arc::from(encoding_strategy)),
            format_version: Some(LanceFileVersion::V2_1),
            ..Default::default()
        };
        let mut writer = FileWriter::try_new(
            object_store
                .create(&Path::from(path.to_str().unwrap()))
                .await
                .unwrap(),
            lance_schema.clone(),
            options,
        )
        .unwrap();

        writer.write_batch(&batch).await.unwrap();
        writer.finish().await.unwrap();

        let fs = FsFixture::default();
        let file_scheduler = fs
            .scheduler
            .open_file(
                &Path::from(path.to_str().unwrap()),
                &CachedFileSize::unknown(),
            )
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler,
            None,
            Arc::<DecoderPlugins>::default(),
            &LanceCache::no_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let column_metadatas = &file_reader.metadata().column_metadatas;

        let text_encoding = describe_encoding(&column_metadatas[1].pages[0]);
        assert!(
            text_encoding.contains("Zstd"),
            "text column should use zstd compression from field metadata, but got: {}",
            text_encoding
        );

        let data_encoding = describe_encoding(&column_metadatas[2].pages[0]);
        assert!(
            data_encoding.contains("Flat") && data_encoding.contains("compression: None"),
            "data column should use no compression from field metadata, but got: {}",
            data_encoding
        );
    }

    #[tokio::test]
    async fn test_field_metadata_rle_threshold() {
        let mut metadata = HashMap::new();
        metadata.insert(
            lance_encoding::constants::RLE_THRESHOLD_META_KEY.to_string(),
            "0.9".to_string(),
        );
        metadata.insert(
            lance_encoding::constants::COMPRESSION_META_KEY.to_string(),
            "lz4".to_string(),
        );
        metadata.insert(
            lance_encoding::constants::BSS_META_KEY.to_string(),
            "off".to_string(),
        );

        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "status",
            DataType::Int32,
            false,
        )
        .with_metadata(metadata)]));

        let lance_schema = LanceSchema::try_from(arrow_schema.as_ref()).unwrap();

        let status_array = Int32Array::from_iter_values(
            std::iter::repeat_n(200, 8000)
                .chain(std::iter::repeat_n(404, 1500))
                .chain(std::iter::repeat_n(500, 500)),
        );

        let batch =
            RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(status_array)]).unwrap();

        let tmp_dir = tempdir().unwrap();
        let path = tmp_dir.path().join("rle_threshold_test.lance");
        let object_store = ObjectStore::local();

        let params = CompressionParams::new();
        let encoding_strategy = lance_encoding::encoder::default_encoding_strategy_with_params(
            LanceFileVersion::V2_1,
            params,
        )
        .unwrap();

        let options = FileWriterOptions {
            encoding_strategy: Some(Arc::from(encoding_strategy)),
            format_version: Some(LanceFileVersion::V2_1),
            ..Default::default()
        };
        let mut writer = FileWriter::try_new(
            object_store
                .create(&Path::from(path.to_str().unwrap()))
                .await
                .unwrap(),
            lance_schema.clone(),
            options,
        )
        .unwrap();

        writer.write_batch(&batch).await.unwrap();
        writer.finish().await.unwrap();

        let fs = FsFixture::default();
        let file_scheduler = fs
            .scheduler
            .open_file(
                &Path::from(path.to_str().unwrap()),
                &CachedFileSize::unknown(),
            )
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler,
            None,
            Arc::<DecoderPlugins>::default(),
            &LanceCache::no_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();

        let column_metadatas = &file_reader.metadata().column_metadatas;
        let status_encoding = describe_encoding(&column_metadatas[0].pages[0]);
        assert!(
            status_encoding.contains("RLE") || status_encoding.contains("Rle"),
            "status column should use RLE encoding due to metadata threshold, but got: {}",
            status_encoding
        );
    }

    #[tokio::test]
    async fn test_large_page_split_on_read() {
        use arrow_array::Array;
        use futures::TryStreamExt;
        use lance_encoding::decoder::FilterExpression;
        use lance_io::ReadBatchParams;

        let arrow_field = ArrowField::new("data", DataType::Binary, false);
        let arrow_schema = ArrowSchema::new(vec![arrow_field]);
        let lance_schema = LanceSchema::try_from(&arrow_schema).unwrap();

        let large_value = vec![42u8; 40 * 1024 * 1024];
        let array = arrow_array::BinaryArray::from(vec![
            Some(large_value.as_slice()),
            Some(b"small value"),
        ]);
        let batch = RecordBatch::try_new(Arc::new(arrow_schema), vec![Arc::new(array)]).unwrap();

        let options = FileWriterOptions {
            max_page_bytes: Some(128 * 1024 * 1024),
            format_version: Some(LanceFileVersion::V2_1),
            ..Default::default()
        };

        let fs = FsFixture::default();
        let path = fs.tmp_path.child("large_page_test.lance");

        let mut writer = FileWriter::try_new(
            fs.object_store.create(&path).await.unwrap(),
            lance_schema.clone(),
            options,
        )
        .unwrap();

        writer.write_batch(&batch).await.unwrap();
        let num_rows = writer.finish().await.unwrap();
        assert_eq!(num_rows, 2);

        let file_scheduler = fs
            .scheduler
            .open_file(&path, &CachedFileSize::unknown())
            .await
            .unwrap();

        let reader_options = FileReaderOptions {
            read_chunk_size: 10 * 1024 * 1024,
            ..Default::default()
        };

        let file_reader = FileReader::try_open(
            file_scheduler,
            None,
            Arc::<DecoderPlugins>::default(),
            &LanceCache::no_cache(),
            reader_options,
        )
        .await
        .unwrap();

        let stream = file_reader
            .read_stream(
                ReadBatchParams::RangeFull,
                1024,
                10,
                FilterExpression::no_filter(),
            )
            .unwrap();

        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
        assert_eq!(batches.len(), 1);

        let read_array = batches[0].column(0);
        let read_binary = read_array
            .as_any()
            .downcast_ref::<arrow_array::BinaryArray>()
            .unwrap();

        assert_eq!(read_binary.len(), 2);
        assert_eq!(read_binary.value(0).len(), 40 * 1024 * 1024);
        assert_eq!(read_binary.value(1), b"small value");

        assert!(read_binary.value(0).iter().all(|&b| b == 42u8));
    }
}