use core::panic;
use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;

use arrow_array::RecordBatch;

use arrow_data::ArrayData;
use bytes::{BufMut, Bytes, BytesMut};
use futures::stream::FuturesOrdered;
use futures::StreamExt;
use lance_core::datatypes::{Field, Schema as LanceSchema};
use lance_core::utils::bit::pad_bytes;
use lance_core::{Error, Result};
use lance_encoding::decoder::PageEncoding;
use lance_encoding::encoder::{
    default_encoding_strategy, BatchEncoder, EncodeTask, EncodedBatch, EncodedPage,
    EncodingOptions, FieldEncoder, FieldEncodingStrategy, OutOfLineBuffers,
};
use lance_encoding::repdef::RepDefBuilder;
use lance_encoding::version::LanceFileVersion;
use lance_io::object_store::ObjectStore;
use lance_io::object_writer::ObjectWriter;
use lance_io::traits::Writer;
use log::{debug, warn};
use object_store::path::Path;
use prost::Message;
use prost_types::Any;
use snafu::location;
use tokio::io::AsyncWriteExt;
use tracing::instrument;

use crate::datatypes::FieldsWithMeta;
use crate::format::pb;
use crate::format::pbfile;
use crate::format::pbfile::DirectEncoding;
use crate::format::MAGIC;
/// The alignment, in bytes, of every buffer written to the file
pub(crate) const PAGE_BUFFER_ALIGNMENT: usize = 64;
// Scratch buffer of pad bytes used to align writes; the byte value (72) is arbitrary
const PAD_BUFFER: [u8; PAGE_BUFFER_ALIGNMENT] = [72; PAGE_BUFFER_ALIGNMENT];
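/// Options for the v2 [`FileWriter`]. All fields are optional; unset fields
/// fall back to the defaults noted below.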
#[derive(Debug, Clone, Default)]
pub struct FileWriterOptions {
    /// Bytes of data to buffer before flushing pages, split evenly across all
    /// columns (defaults to 8MiB per column when unset)
    pub data_cache_bytes: Option<u64>,
    /// Maximum size, in bytes, of a single page (defaults to 32MiB when unset)
    pub max_page_bytes: Option<u64>,
    /// If true, the encoders hold onto the original arrays instead of copying
    /// the buffered data (defaults to false when unset)
    pub keep_original_array: Option<bool>,
    /// Custom strategy for selecting encodings (defaults to the standard
    /// strategy for the chosen format version when unset)
    pub encoding_strategy: Option<Arc<dyn FieldEncodingStrategy>>,
    /// The file format version to write (defaults to the crate's default
    /// version when unset)
    pub format_version: Option<LanceFileVersion>,
}
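/// A writer for the Lance v2 file format.
///
/// A minimal usage sketch (hypothetical `path`, `schema`, and `batch` values;
/// assumes a tokio runtime and the imports used in this module):
///
/// ```ignore
/// let store = ObjectStore::local();
/// let object_writer = store.create(&path).await?;
/// let mut writer = FileWriter::try_new(object_writer, schema, FileWriterOptions::default())?;
/// writer.write_batch(&batch).await?;
/// writer.finish().await?;
/// ```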
pub struct FileWriter {
    writer: ObjectWriter,
    schema: Option<LanceSchema>,
    column_writers: Vec<Box<dyn FieldEncoder>>,
    column_metadata: Vec<pbfile::ColumnMetadata>,
    // Pairs of (field id, column index)
    field_id_to_column_indices: Vec<(u32, u32)>,
    num_columns: u32,
    rows_written: u64,
    // Positions and sizes of global buffers already written to the file
    global_buffers: Vec<(u64, u64)>,
    schema_metadata: HashMap<String, String>,
    options: FileWriterOptions,
}
/// Returns an empty column metadata record; pages and buffers are appended as
/// the column is written
fn initial_column_metadata() -> pbfile::ColumnMetadata {
    pbfile::ColumnMetadata {
        pages: Vec::new(),
        buffer_offsets: Vec::new(),
        buffer_sizes: Vec::new(),
        encoding: None,
    }
}
// Ensures the unstable-format-version warning is only logged once per process
static WARNED_ON_UNSTABLE_API: AtomicBool = AtomicBool::new(false);
impl FileWriter {
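    /// Creates a new file writer with the given schema
    ///
    /// The schema is validated and the column encoders are created immediately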
    pub fn try_new(
        object_writer: ObjectWriter,
        schema: LanceSchema,
        options: FileWriterOptions,
    ) -> Result<Self> {
        let mut writer = Self::new_lazy(object_writer, options);
        writer.initialize(schema)?;
        Ok(writer)
    }
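    /// Creates a new file writer without a schema
    ///
    /// The schema is inferred from the first batch of data and initialization
    /// is deferred until then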
    pub fn new_lazy(object_writer: ObjectWriter, options: FileWriterOptions) -> Self {
        if let Some(format_version) = options.format_version {
            if format_version > LanceFileVersion::Stable
                && WARNED_ON_UNSTABLE_API
                    .compare_exchange(
                        false,
                        true,
                        std::sync::atomic::Ordering::Relaxed,
                        std::sync::atomic::Ordering::Relaxed,
                    )
                    .is_ok()
            {
                warn!("You have requested an unstable format version. Files written with this format version may not be readable in the future! This is a development feature and should only be used for experimentation and never for production data.");
            }
        }
        Self {
            writer: object_writer,
            schema: None,
            column_writers: Vec::new(),
            column_metadata: Vec::new(),
            num_columns: 0,
            rows_written: 0,
            field_id_to_column_indices: Vec::new(),
            global_buffers: Vec::new(),
            schema_metadata: HashMap::new(),
            options,
        }
    }
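    /// Writes an entire file from an iterator of batches and returns the
    /// number of rows written.
    ///
    /// A sketch of intended use (hypothetical `store`, `path`, `schema`, and
    /// `batches` values):
    ///
    /// ```ignore
    /// let num_rows = FileWriter::create_file_with_batches(
    ///     &store,
    ///     &path,
    ///     schema,
    ///     batches.into_iter(),
    ///     FileWriterOptions::default(),
    /// )
    /// .await?;
    /// ```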
    pub async fn create_file_with_batches(
        store: &ObjectStore,
        path: &Path,
        schema: lance_core::datatypes::Schema,
        batches: impl Iterator<Item = RecordBatch> + Send,
        options: FileWriterOptions,
    ) -> Result<usize> {
        let writer = store.create(path).await?;
        let mut writer = Self::try_new(writer, schema, options)?;
        for batch in batches {
            writer.write_batch(&batch).await?;
        }
        Ok(writer.finish().await? as usize)
    }
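    /// Writes a buffer and then pads the file out to PAGE_BUFFER_ALIGNMENT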
    async fn do_write_buffer(writer: &mut ObjectWriter, buf: &[u8]) -> Result<()> {
        writer.write_all(buf).await?;
        let pad_bytes = pad_bytes::<PAGE_BUFFER_ALIGNMENT>(buf.len());
        writer.write_all(&PAD_BUFFER[..pad_bytes]).await?;
        Ok(())
    }
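    /// Returns the file format version this writer is configured to produce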
    pub fn version(&self) -> LanceFileVersion {
        self.options.format_version.unwrap_or_default()
    }
    /// Writes a page's buffers to the file and records the page description in
    /// the column metadata
    async fn write_page(&mut self, encoded_page: EncodedPage) -> Result<()> {
        let buffers = encoded_page.data;
        let mut buffer_offsets = Vec::with_capacity(buffers.len());
        let mut buffer_sizes = Vec::with_capacity(buffers.len());
        for buffer in buffers {
            buffer_offsets.push(self.writer.tell().await? as u64);
            buffer_sizes.push(buffer.len() as u64);
            Self::do_write_buffer(&mut self.writer, &buffer).await?;
        }
        let encoded_encoding = match encoded_page.description {
            PageEncoding::Legacy(array_encoding) => Any::from_msg(&array_encoding)?.encode_to_vec(),
            PageEncoding::Structural(page_layout) => Any::from_msg(&page_layout)?.encode_to_vec(),
        };
        let page = pbfile::column_metadata::Page {
            buffer_offsets,
            buffer_sizes,
            encoding: Some(pbfile::Encoding {
                location: Some(pbfile::encoding::Location::Direct(DirectEncoding {
                    encoding: encoded_encoding,
                })),
            }),
            length: encoded_page.num_rows,
            priority: encoded_page.row_number,
        };
        self.column_metadata[encoded_page.column_idx as usize]
            .pages
            .push(page);
        Ok(())
    }
    #[instrument(skip_all, level = "debug")]
    async fn write_pages(&mut self, mut encoding_tasks: FuturesOrdered<EncodeTask>) -> Result<()> {
        while let Some(encoding_task) = encoding_tasks.next().await {
            let encoded_page = encoding_task?;
            self.write_page(encoded_page).await?;
        }
        self.writer.flush().await?;
        Ok(())
    }
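    /// Writes each of the given batches to the file, in order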
    pub async fn write_batches(
        &mut self,
        batches: impl Iterator<Item = &RecordBatch>,
    ) -> Result<()> {
        for batch in batches {
            self.write_batch(batch).await?;
        }
        Ok(())
    }
    fn verify_field_nullability(arr: &ArrayData, field: &Field) -> Result<()> {
        if !field.nullable && arr.null_count() > 0 {
            return Err(Error::invalid_input(
                format!(
                    "The field `{}` contained null values even though the field is marked non-null in the schema",
                    field.name
                ),
                location!(),
            ));
        }

        for (child_field, child_arr) in field.children.iter().zip(arr.child_data()) {
            Self::verify_field_nullability(child_arr, child_field)?;
        }

        Ok(())
    }
    fn verify_nullability_constraints(&self, batch: &RecordBatch) -> Result<()> {
        for (col, field) in batch
            .columns()
            .iter()
            .zip(self.schema.as_ref().unwrap().fields.iter())
        {
            Self::verify_field_nullability(&col.to_data(), field)?;
        }
        Ok(())
    }
    fn initialize(&mut self, mut schema: LanceSchema) -> Result<()> {
        // Divide the cache budget evenly across columns; default to 8MiB per column
        let cache_bytes_per_column = if let Some(data_cache_bytes) = self.options.data_cache_bytes {
            data_cache_bytes / schema.fields.len() as u64
        } else {
            8 * 1024 * 1024
        };

        let max_page_bytes = self.options.max_page_bytes.unwrap_or(32 * 1024 * 1024);

        schema.validate()?;

        let keep_original_array = self.options.keep_original_array.unwrap_or(false);
        let encoding_strategy = self.options.encoding_strategy.clone().unwrap_or_else(|| {
            let version = self.version();
            default_encoding_strategy(version).into()
        });

        let encoding_options = EncodingOptions {
            cache_bytes_per_column,
            max_page_bytes,
            keep_original_array,
            buffer_alignment: PAGE_BUFFER_ALIGNMENT as u64,
        };
        let encoder =
            BatchEncoder::try_new(&schema, encoding_strategy.as_ref(), &encoding_options)?;
        self.num_columns = encoder.num_columns();

        self.column_writers = encoder.field_encoders;
        self.column_metadata = vec![initial_column_metadata(); self.num_columns as usize];
        self.field_id_to_column_indices = encoder.field_id_to_column_index;
        self.schema_metadata
            .extend(std::mem::take(&mut schema.metadata));
        self.schema = Some(schema);
        Ok(())
    }
    fn ensure_initialized(&mut self, batch: &RecordBatch) -> Result<&LanceSchema> {
        if self.schema.is_none() {
            let schema = LanceSchema::try_from(batch.schema().as_ref())?;
            self.initialize(schema)?;
        }
        Ok(self.schema.as_ref().unwrap())
    }
    #[instrument(skip_all, level = "debug")]
    fn encode_batch(
        &mut self,
        batch: &RecordBatch,
        external_buffers: &mut OutOfLineBuffers,
    ) -> Result<Vec<Vec<EncodeTask>>> {
        self.schema
            .as_ref()
            .unwrap()
            .fields
            .iter()
            .zip(self.column_writers.iter_mut())
            .map(|(field, column_writer)| {
                let array = batch
                    .column_by_name(&field.name)
                    .ok_or(Error::InvalidInput {
                        source: format!(
                            "Cannot write batch. The batch was missing the column `{}`",
                            field.name
                        )
                        .into(),
                        location: location!(),
                    })?;
                let repdef = RepDefBuilder::default();
                let num_rows = array.len() as u64;
                column_writer.maybe_encode(
                    array.clone(),
                    external_buffers,
                    repdef,
                    self.rows_written,
                    num_rows,
                )
            })
            .collect::<Result<Vec<_>>>()
    }
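    /// Schedules the batch for encoding and writes any pages that are ready
    ///
    /// The batch is not guaranteed to be flushed to disk when this returns;
    /// call [`Self::finish`] to flush all remaining data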
    pub async fn write_batch(&mut self, batch: &RecordBatch) -> Result<()> {
        debug!(
            "write_batch called with {} bytes of data",
            batch.get_array_memory_size()
        );
        self.ensure_initialized(batch)?;
        self.verify_nullability_constraints(batch)?;
        let num_rows = batch.num_rows() as u64;
        if num_rows == 0 {
            return Ok(());
        }
        if num_rows > u32::MAX as u64 {
            return Err(Error::InvalidInput {
                source: "cannot write Lance files with more than 2^32 rows".into(),
                location: location!(),
            });
        }
        let mut external_buffers =
            OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
        let encoding_tasks = self.encode_batch(batch, &mut external_buffers)?;
        for external_buffer in external_buffers.take_buffers() {
            Self::do_write_buffer(&mut self.writer, &external_buffer).await?;
        }

        let encoding_tasks = encoding_tasks
            .into_iter()
            .flatten()
            .collect::<FuturesOrdered<_>>();

        self.rows_written = match self.rows_written.checked_add(num_rows) {
            Some(rows_written) => rows_written,
            None => {
                return Err(Error::InvalidInput {
                    source: format!(
                        "cannot write batch with {} rows because {} rows have already been written and Lance files cannot contain more than 2^64 rows",
                        num_rows, self.rows_written
                    )
                    .into(),
                    location: location!(),
                });
            }
        };

        self.write_pages(encoding_tasks).await?;

        Ok(())
    }
    async fn write_column_metadata(
        &mut self,
        metadata: pbfile::ColumnMetadata,
    ) -> Result<(u64, u64)> {
        let metadata_bytes = metadata.encode_to_vec();
        let position = self.writer.tell().await? as u64;
        let len = metadata_bytes.len() as u64;
        self.writer.write_all(&metadata_bytes).await?;
        Ok((position, len))
    }
    async fn write_column_metadatas(&mut self) -> Result<Vec<(u64, u64)>> {
        let metadatas = std::mem::take(&mut self.column_metadata);
        let mut metadata_positions = Vec::with_capacity(metadatas.len());
        for metadata in metadatas {
            metadata_positions.push(self.write_column_metadata(metadata).await?);
        }
        Ok(metadata_positions)
    }
    fn make_file_descriptor(
        schema: &lance_core::datatypes::Schema,
        num_rows: u64,
    ) -> Result<pb::FileDescriptor> {
        let fields_with_meta = FieldsWithMeta::from(schema);
        Ok(pb::FileDescriptor {
            schema: Some(pb::Schema {
                fields: fields_with_meta.fields.0,
                metadata: fields_with_meta.metadata,
            }),
            length: num_rows,
        })
    }
    async fn write_global_buffers(&mut self) -> Result<Vec<(u64, u64)>> {
        let schema = self.schema.as_mut().ok_or(Error::invalid_input(
            "No schema provided on writer open and no data provided. Schema is unknown and file cannot be created",
            location!(),
        ))?;
        schema.metadata = std::mem::take(&mut self.schema_metadata);
        let file_descriptor = Self::make_file_descriptor(schema, self.rows_written)?;
        let file_descriptor_bytes = file_descriptor.encode_to_vec();
        let file_descriptor_len = file_descriptor_bytes.len() as u64;
        let file_descriptor_position = self.writer.tell().await? as u64;
        self.writer.write_all(&file_descriptor_bytes).await?;
        // The file descriptor always occupies the first slot in the global buffer table
        let mut gbo_table = Vec::with_capacity(1 + self.global_buffers.len());
        gbo_table.push((file_descriptor_position, file_descriptor_len));
        gbo_table.append(&mut self.global_buffers);
        Ok(gbo_table)
    }
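    /// Adds a key/value pair to the schema metadata that will be written in
    /// the file descriptor, overwriting any existing value for the key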
    pub fn add_schema_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
        self.schema_metadata.insert(key.into(), value.into());
    }
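    /// Writes a global buffer to the file immediately and returns its index
    ///
    /// The file descriptor always occupies index 0, so the indices returned
    /// here start at 1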
    pub async fn add_global_buffer(&mut self, buffer: Bytes) -> Result<u32> {
        let position = self.writer.tell().await? as u64;
        let len = buffer.len() as u64;
        Self::do_write_buffer(&mut self.writer, &buffer).await?;
        self.global_buffers.push((position, len));
        Ok(self.global_buffers.len() as u32)
    }
    /// Finishes each column writer, writing any final pages and column buffers
    /// and recording each column's encoding
    async fn finish_writers(&mut self) -> Result<()> {
        let mut col_idx = 0;
        for mut writer in std::mem::take(&mut self.column_writers) {
            let mut external_buffers =
                OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
            let columns = writer.finish(&mut external_buffers).await?;
            for buffer in external_buffers.take_buffers() {
                self.writer.write_all(&buffer).await?;
            }
            debug_assert_eq!(
                columns.len(),
                writer.num_columns() as usize,
                "Expected {} columns from column at index {} and got {}",
                writer.num_columns(),
                col_idx,
                columns.len()
            );
            for column in columns {
                for page in column.final_pages {
                    self.write_page(page).await?;
                }
                let column_metadata = &mut self.column_metadata[col_idx];
                let mut buffer_pos = self.writer.tell().await? as u64;
                for buffer in column.column_buffers {
                    column_metadata.buffer_offsets.push(buffer_pos);
                    // Note: the recorded size does not include alignment padding
                    let size = buffer.len() as u64;
                    Self::do_write_buffer(&mut self.writer, &buffer).await?;
                    buffer_pos += size;
                    column_metadata.buffer_sizes.push(size);
                }
                let encoded_encoding = Any::from_msg(&column.encoding)?.encode_to_vec();
                column_metadata.encoding = Some(pbfile::Encoding {
                    location: Some(pbfile::encoding::Location::Direct(pbfile::DirectEncoding {
                        encoding: encoded_encoding,
                    })),
                });
                col_idx += 1;
            }
        }
        if col_idx != self.column_metadata.len() {
            panic!(
                "Column writers finished with {} columns but we expected {}",
                col_idx,
                self.column_metadata.len()
            );
        }
        Ok(())
    }
    // Maps the logical format version to the (major, minor) pair recorded in
    // the footer; note that V2_0 files record version 0.3 on disk
    fn version_to_numbers(&self) -> (u16, u16) {
        let version = self.options.format_version.unwrap_or_default();
        match version.resolve() {
            LanceFileVersion::V2_0 => (0, 3),
            LanceFileVersion::V2_1 => (2, 1),
            _ => panic!("Unsupported version: {}", version),
        }
    }
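    /// Finishes writing the file: flushes all column writers, writes the
    /// column metadata, offset tables, and footer, and closes the underlying
    /// writer
    ///
    /// Returns the total number of rows written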
    pub async fn finish(&mut self) -> Result<u64> {
        // Flush any remaining buffered data out of the column writers
        let mut external_buffers =
            OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64);
        let encoding_tasks = self
            .column_writers
            .iter_mut()
            .map(|writer| writer.flush(&mut external_buffers))
            .collect::<Result<Vec<_>>>()?;
        for external_buffer in external_buffers.take_buffers() {
            Self::do_write_buffer(&mut self.writer, &external_buffer).await?;
        }
        let encoding_tasks = encoding_tasks
            .into_iter()
            .flatten()
            .collect::<FuturesOrdered<_>>();
        self.write_pages(encoding_tasks).await?;

        self.finish_writers().await?;

        let global_buffer_offsets = self.write_global_buffers().await?;
        let num_global_buffers = global_buffer_offsets.len() as u32;

        let column_metadata_start = self.writer.tell().await? as u64;
        let metadata_positions = self.write_column_metadatas().await?;

        // Write the table of column metadata offsets/sizes
        let cmo_table_start = self.writer.tell().await? as u64;
        for (meta_pos, meta_len) in metadata_positions {
            self.writer.write_u64_le(meta_pos).await?;
            self.writer.write_u64_le(meta_len).await?;
        }

        // Write the table of global buffer offsets/sizes
        let gbo_table_start = self.writer.tell().await? as u64;
        for (gbo_pos, gbo_len) in global_buffer_offsets {
            self.writer.write_u64_le(gbo_pos).await?;
            self.writer.write_u64_le(gbo_len).await?;
        }

        // Write the fixed-size footer
        let (major, minor) = self.version_to_numbers();
        self.writer.write_u64_le(column_metadata_start).await?;
        self.writer.write_u64_le(cmo_table_start).await?;
        self.writer.write_u64_le(gbo_table_start).await?;
        self.writer.write_u32_le(num_global_buffers).await?;
        self.writer.write_u32_le(self.num_columns).await?;
        self.writer.write_u16_le(major).await?;
        self.writer.write_u16_le(minor).await?;
        self.writer.write_all(MAGIC).await?;

        self.writer.shutdown().await?;
        Ok(self.rows_written)
    }
    /// Returns the current position, in bytes, in the file
    pub async fn tell(&mut self) -> Result<u64> {
        Ok(self.writer.tell().await? as u64)
    }

    /// Returns the (field id, column index) pairs for the schema being written
    pub fn field_id_to_column_indices(&self) -> &[(u32, u32)] {
        &self.field_id_to_column_indices
    }
}
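/// Extension trait for serializing an [`EncodedBatch`] into the Lance file
/// format.
///
/// A sketch of intended use (hypothetical `encoded_batch` value):
///
/// ```ignore
/// let bytes = encoded_batch.try_to_self_described_lance()?;
/// ```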
pub trait EncodedBatchWriteExt {
    /// Serializes into a Lance file, including the schema
    fn try_to_self_described_lance(&self) -> Result<Bytes>;
    /// Serializes into a Lance file, without the schema; the schema must be
    /// supplied separately when deserializing
    fn try_to_mini_lance(&self) -> Result<Bytes>;
}
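// Serializes the batch data followed by a Lance footer (column metadata,
// offset tables, and trailer); the schema is included as global buffer 0 only
// when write_schema is true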
fn concat_lance_footer(batch: &EncodedBatch, write_schema: bool) -> Result<Bytes> {
    // Reserve room for the data plus a rough 1MiB allowance for metadata
    let mut data = BytesMut::with_capacity(batch.data.len() + 1024 * 1024);
    data.put(batch.data.clone());
    let global_buffers = if write_schema {
        let schema_start = data.len() as u64;
        let lance_schema = lance_core::datatypes::Schema::try_from(batch.schema.as_ref())?;
        let descriptor = FileWriter::make_file_descriptor(&lance_schema, batch.num_rows)?;
        let descriptor_bytes = descriptor.encode_to_vec();
        let descriptor_len = descriptor_bytes.len() as u64;
        data.put(descriptor_bytes.as_slice());

        vec![(schema_start, descriptor_len)]
    } else {
        vec![]
    };
    let col_metadata_start = data.len() as u64;

    let mut col_metadata_positions = Vec::new();
    for col in &batch.page_table {
        let position = data.len() as u64;
        let pages = col
            .page_infos
            .iter()
            .map(|page_info| {
                let encoded_encoding = match &page_info.encoding {
                    PageEncoding::Legacy(array_encoding) => {
                        Any::from_msg(array_encoding)?.encode_to_vec()
                    }
                    PageEncoding::Structural(page_layout) => {
                        Any::from_msg(page_layout)?.encode_to_vec()
                    }
                };
                let (buffer_offsets, buffer_sizes): (Vec<_>, Vec<_>) = page_info
                    .buffer_offsets_and_sizes
                    .as_ref()
                    .iter()
                    .cloned()
                    .unzip();
                Ok(pbfile::column_metadata::Page {
                    buffer_offsets,
                    buffer_sizes,
                    encoding: Some(pbfile::Encoding {
                        location: Some(pbfile::encoding::Location::Direct(DirectEncoding {
                            encoding: encoded_encoding,
                        })),
                    }),
                    length: page_info.num_rows,
                    priority: page_info.priority,
                })
            })
            .collect::<Result<Vec<_>>>()?;
        let (buffer_offsets, buffer_sizes): (Vec<_>, Vec<_>) =
            col.buffer_offsets_and_sizes.iter().cloned().unzip();
        let encoded_col_encoding = Any::from_msg(&col.encoding)?.encode_to_vec();
        let column = pbfile::ColumnMetadata {
            pages,
            buffer_offsets,
            buffer_sizes,
            encoding: Some(pbfile::Encoding {
                location: Some(pbfile::encoding::Location::Direct(pbfile::DirectEncoding {
                    encoding: encoded_col_encoding,
                })),
            }),
        };
        let column_bytes = column.encode_to_vec();
        col_metadata_positions.push((position, column_bytes.len() as u64));
        data.put(column_bytes.as_slice());
    }
    // Table of column metadata offsets/sizes
    let cmo_table_start = data.len() as u64;
    for (meta_pos, meta_len) in col_metadata_positions {
        data.put_u64_le(meta_pos);
        data.put_u64_le(meta_len);
    }
    // Table of global buffer offsets/sizes
    let gbo_table_start = data.len() as u64;
    let num_global_buffers = global_buffers.len() as u32;
    for (gbo_pos, gbo_len) in global_buffers {
        data.put_u64_le(gbo_pos);
        data.put_u64_le(gbo_len);
    }

    let (major, minor) = LanceFileVersion::default().to_numbers();

    // Fixed-size footer
    data.put_u64_le(col_metadata_start);
    data.put_u64_le(cmo_table_start);
    data.put_u64_le(gbo_table_start);
    data.put_u32_le(num_global_buffers);
    data.put_u32_le(batch.page_table.len() as u32);
    data.put_u16_le(major as u16);
    data.put_u16_le(minor as u16);
    data.put(MAGIC.as_slice());

    Ok(data.freeze())
}
impl EncodedBatchWriteExt for EncodedBatch {
    fn try_to_self_described_lance(&self) -> Result<Bytes> {
        concat_lance_footer(self, true)
    }

    fn try_to_mini_lance(&self) -> Result<Bytes> {
        concat_lance_footer(self, false)
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::{types::Float64Type, RecordBatchReader};
    use lance_datagen::{array, gen, BatchCount, RowCount};
    use lance_io::object_store::ObjectStore;
    use object_store::path::Path;

    use crate::v2::writer::{FileWriter, FileWriterOptions};

    #[tokio::test]
    async fn test_basic_write() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path: String = tmp_dir.path().to_str().unwrap().to_owned();
        let tmp_path = Path::parse(tmp_path).unwrap();
        let tmp_path = tmp_path.child("some_file.lance");
        let obj_store = Arc::new(ObjectStore::local());

        let reader = gen()
            .col("score", array::rand::<Float64Type>())
            .into_reader_rows(RowCount::from(1000), BatchCount::from(10));

        let writer = obj_store.create(&tmp_path).await.unwrap();

        let lance_schema =
            lance_core::datatypes::Schema::try_from(reader.schema().as_ref()).unwrap();

        let mut file_writer =
            FileWriter::try_new(writer, lance_schema, FileWriterOptions::default()).unwrap();

        for batch in reader {
            file_writer.write_batch(&batch.unwrap()).await.unwrap();
        }
        file_writer.add_schema_metadata("foo", "bar");
        file_writer.finish().await.unwrap();
    }

    #[tokio::test]
    async fn test_write_empty() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path: String = tmp_dir.path().to_str().unwrap().to_owned();
        let tmp_path = Path::parse(tmp_path).unwrap();
        let tmp_path = tmp_path.child("some_file.lance");
        let obj_store = Arc::new(ObjectStore::local());

        // Zero batches: the writer should still produce a valid (empty) file
        let reader = gen()
            .col("score", array::rand::<Float64Type>())
            .into_reader_rows(RowCount::from(0), BatchCount::from(0));

        let writer = obj_store.create(&tmp_path).await.unwrap();

        let lance_schema =
            lance_core::datatypes::Schema::try_from(reader.schema().as_ref()).unwrap();

        let mut file_writer =
            FileWriter::try_new(writer, lance_schema, FileWriterOptions::default()).unwrap();

        for batch in reader {
            file_writer.write_batch(&batch.unwrap()).await.unwrap();
        }
        file_writer.add_schema_metadata("foo", "bar");
        file_writer.finish().await.unwrap();
    }
}