use arrow_array::RecordBatch;
use chrono::TimeDelta;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use futures::{Stream, StreamExt, TryStreamExt};
use lance_arrow::BLOB_META_KEY;
use lance_core::datatypes::{
    NullabilityComparison, OnMissing, OnTypeMismatch, SchemaCompareOptions,
};
use lance_core::error::LanceOptionExt;
use lance_core::utils::tempfile::TempDir;
use lance_core::utils::tracing::{AUDIT_MODE_CREATE, AUDIT_TYPE_DATA, TRACE_FILE_AUDIT};
use lance_core::{Error, Result, datatypes::Schema};
use lance_datafusion::chunker::{break_stream, chunk_stream};
use lance_datafusion::spill::{SpillReceiver, SpillSender, create_replay_spill};
use lance_datafusion::utils::StreamingWriteSource;
use lance_file::previous::writer::{
    FileWriter as PreviousFileWriter, ManifestProvider as PreviousManifestProvider,
};
use lance_file::version::LanceFileVersion;
use lance_file::writer::{self as current_writer, FileWriterOptions};
use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry};
use lance_table::format::{BasePath, DataFile, Fragment};
use lance_table::io::commit::{CommitHandler, commit_handler_from_url};
use lance_table::io::manifest::ManifestDescribing;
use object_store::path::Path;
use std::collections::{HashMap, HashSet};
use std::num::NonZero;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use tracing::{info, instrument};

use crate::Dataset;
use crate::dataset::blob::{
    BlobPreprocessor, ExternalBaseCandidate, ExternalBaseResolver, preprocess_blob_batches,
};
use crate::session::Session;

use super::DATA_DIR;
use super::fragment::write::generate_random_filename;
use super::progress::{NoopFragmentWriteProgress, WriteFragmentProgress};
use super::transaction::Transaction;
use super::utils::SchemaAdapter;

mod commit;
pub mod delete;
mod insert;
pub mod merge_insert;
mod retry;
pub mod update;

pub use commit::CommitBuilder;
pub use delete::{DeleteBuilder, DeleteResult};
pub use insert::InsertBuilder;

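/// The destination of a write: either an already-open [`Dataset`] or a URI to
/// open or create one at (e.g. `WriteDestination::from("s3://bucket/table.lance")`,
/// an illustrative URI).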
#[derive(Debug, Clone)]
pub enum WriteDestination<'a> {
    Dataset(Arc<Dataset>),
    Uri(&'a str),
}

impl WriteDestination<'_> {
    pub fn dataset(&self) -> Option<&Dataset> {
        match self {
            WriteDestination::Dataset(dataset) => Some(dataset.as_ref()),
            WriteDestination::Uri(_) => None,
        }
    }

    pub fn uri(&self) -> String {
        match self {
            WriteDestination::Dataset(dataset) => dataset.uri.clone(),
            WriteDestination::Uri(uri) => uri.to_string(),
        }
    }
}

impl From<Arc<Dataset>> for WriteDestination<'_> {
    fn from(dataset: Arc<Dataset>) -> Self {
        WriteDestination::Dataset(dataset)
    }
}

impl<'a> From<&'a str> for WriteDestination<'a> {
    fn from(uri: &'a str) -> Self {
        WriteDestination::Uri(uri)
    }
}

impl<'a> From<&'a String> for WriteDestination<'a> {
    fn from(uri: &'a String) -> Self {
        WriteDestination::Uri(uri.as_str())
    }
}

impl<'a> From<&'a Path> for WriteDestination<'a> {
    fn from(path: &'a Path) -> Self {
        WriteDestination::Uri(path.as_ref())
    }
}

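/// The mode to use when writing data: `Create` a new dataset, `Append` to an
/// existing one, or `Overwrite` the existing contents. Parsed
/// case-insensitively from strings via the `TryFrom<&str>` impl below.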
#[derive(Debug, Clone, Copy)]
pub enum WriteMode {
    Create,
    Append,
    Overwrite,
}

impl TryFrom<&str> for WriteMode {
    type Error = Error;

    fn try_from(value: &str) -> Result<Self> {
        match value.to_lowercase().as_str() {
            "create" => Ok(Self::Create),
            "append" => Ok(Self::Append),
            "overwrite" => Ok(Self::Overwrite),
            _ => Err(Error::invalid_input(format!(
                "Invalid write mode: {}",
                value
            ))),
        }
    }
}

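/// Controls automatic cleanup of old dataset versions: `interval` is how many
/// commits pass between cleanup runs and `older_than` is the minimum age a
/// version must reach before it is removed (defaults: every 20 commits,
/// 14 days).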
#[derive(Debug, Clone)]
pub struct AutoCleanupParams {
    pub interval: usize,
    pub older_than: TimeDelta,
}

impl Default for AutoCleanupParams {
    fn default() -> Self {
        Self {
            interval: 20,
            older_than: TimeDelta::days(14),
        }
    }
}

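/// Parameters controlling how data is written out to a dataset: file and row
/// group sizing, the write mode, storage options, the data storage version,
/// and multi-base (multi-location) targeting.
///
/// A minimal usage sketch (the field values are illustrative, not
/// recommendations):
///
/// ```ignore
/// let params = WriteParams {
///     max_rows_per_file: 100_000,
///     mode: WriteMode::Append,
///     ..Default::default()
/// }
/// .with_transaction_properties(std::collections::HashMap::from([(
///     "source".to_string(),
///     "etl-job".to_string(),
/// )]));
/// ```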
#[derive(Debug, Clone)]
pub struct WriteParams {
    pub max_rows_per_file: usize,
    pub max_rows_per_group: usize,
    pub max_bytes_per_file: usize,
    pub mode: WriteMode,
    pub store_params: Option<ObjectStoreParams>,
    pub progress: Arc<dyn WriteFragmentProgress>,
    pub commit_handler: Option<Arc<dyn CommitHandler>>,
    pub data_storage_version: Option<LanceFileVersion>,
    pub enable_stable_row_ids: bool,
    pub enable_v2_manifest_paths: bool,
    pub session: Option<Arc<Session>>,
    pub auto_cleanup: Option<AutoCleanupParams>,
    pub skip_auto_cleanup: bool,
    pub transaction_properties: Option<Arc<HashMap<String, String>>>,
    pub initial_bases: Option<Vec<BasePath>>,
    pub target_bases: Option<Vec<u32>>,
    pub target_base_names_or_paths: Option<Vec<String>>,
    pub allow_external_blob_outside_bases: bool,
}

impl Default for WriteParams {
    fn default() -> Self {
        Self {
            max_rows_per_file: 1024 * 1024,
            max_rows_per_group: 1024,
            max_bytes_per_file: 90 * 1024 * 1024 * 1024,
            mode: WriteMode::Create,
            store_params: None,
            progress: Arc::new(NoopFragmentWriteProgress::new()),
            commit_handler: None,
            data_storage_version: None,
            enable_stable_row_ids: false,
            enable_v2_manifest_paths: true,
            session: None,
            auto_cleanup: Some(AutoCleanupParams::default()),
            skip_auto_cleanup: false,
            transaction_properties: None,
            initial_bases: None,
            target_bases: None,
            target_base_names_or_paths: None,
            allow_external_blob_outside_bases: false,
        }
    }
}

impl WriteParams {
    pub fn with_storage_version(version: LanceFileVersion) -> Self {
        Self {
            data_storage_version: Some(version),
            ..Default::default()
        }
    }

    pub fn storage_version_or_default(&self) -> LanceFileVersion {
        self.data_storage_version.unwrap_or_default()
    }

    pub fn store_registry(&self) -> Arc<ObjectStoreRegistry> {
        self.session
            .as_ref()
            .map(|s| s.store_registry())
            .unwrap_or_default()
    }

    pub fn with_transaction_properties(self, properties: HashMap<String, String>) -> Self {
        Self {
            transaction_properties: Some(Arc::new(properties)),
            ..self
        }
    }

    pub fn with_initial_bases(self, bases: Vec<BasePath>) -> Self {
        Self {
            initial_bases: Some(bases),
            ..self
        }
    }

    pub fn with_target_bases(self, base_ids: Vec<u32>) -> Self {
        Self {
            target_bases: Some(base_ids),
            ..self
        }
    }

    pub fn with_target_base_names_or_paths(self, references: Vec<String>) -> Self {
        Self {
            target_base_names_or_paths: Some(references),
            ..self
        }
    }

    pub fn with_allow_external_blob_outside_bases(self, allow: bool) -> Self {
        Self {
            allow_external_blob_outside_bases: allow,
            ..self
        }
    }
}

#[deprecated(
    since = "0.20.0",
    note = "Use [`InsertBuilder::write_uncommitted_stream`] instead"
)]
pub async fn write_fragments(
    dest: impl Into<WriteDestination<'_>>,
    data: impl StreamingWriteSource,
    params: WriteParams,
) -> Result<Transaction> {
    InsertBuilder::new(dest.into())
        .with_params(&params)
        .execute_uncommitted_stream(data)
        .await
}

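/// Writes a stream of record batches out as data files, starting a new file
/// whenever `max_rows_per_file` or `max_bytes_per_file` is reached, and
/// returns the metadata of the fragments that were written.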
#[allow(clippy::too_many_arguments)]
pub async fn do_write_fragments(
    dataset: Option<&Dataset>,
    object_store: Arc<ObjectStore>,
    base_dir: &Path,
    schema: &Schema,
    data: SendableRecordBatchStream,
    params: WriteParams,
    storage_version: LanceFileVersion,
    target_bases_info: Option<Vec<TargetBaseInfo>>,
) -> Result<Vec<Fragment>> {
    let adapter = SchemaAdapter::new(data.schema());
    let data = adapter.to_physical_stream(data);

    let mut buffered_reader = if storage_version == LanceFileVersion::Legacy {
        chunk_stream(data, params.max_rows_per_group)
    } else {
        break_stream(data, params.max_rows_per_file)
            .map_ok(|batch| vec![batch])
            .boxed()
    };

    let external_base_resolver = if storage_version >= LanceFileVersion::V2_2
        && schema.fields.iter().any(|field| field.is_blob_v2())
    {
        Some(Arc::new(
            build_external_base_resolver(dataset, &params).await?,
        ))
    } else {
        None
    };

    let writer_generator = WriterGenerator::new(
        object_store,
        base_dir,
        schema,
        storage_version,
        target_bases_info,
        external_base_resolver,
        params.allow_external_blob_outside_bases,
    );
    let mut writer: Option<Box<dyn GenericWriter>> = None;
    let mut num_rows_in_current_file = 0;
    let mut fragments = Vec::new();
    while let Some(batch_chunk) = buffered_reader.next().await {
        let batch_chunk = batch_chunk?;

        if writer.is_none() {
            let (new_writer, new_fragment) = writer_generator.new_writer().await?;
            params.progress.begin(&new_fragment).await?;
            writer = Some(new_writer);
            fragments.push(new_fragment);
        }

        writer.as_mut().unwrap().write(&batch_chunk).await?;
        for batch in batch_chunk {
            num_rows_in_current_file += batch.num_rows() as u32;
        }

        if num_rows_in_current_file >= params.max_rows_per_file as u32
            || writer.as_mut().unwrap().tell().await? >= params.max_bytes_per_file as u64
        {
            let (num_rows, data_file) = writer.take().unwrap().finish().await?;
            info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_DATA, path = &data_file.path);
            debug_assert_eq!(num_rows, num_rows_in_current_file);
            params.progress.complete(fragments.last().unwrap()).await?;
            let last_fragment = fragments.last_mut().unwrap();
            last_fragment.physical_rows = Some(num_rows as usize);
            last_fragment.files.push(data_file);
            num_rows_in_current_file = 0;
        }
    }

    if let Some(mut writer) = writer.take() {
        let (num_rows, data_file) = writer.finish().await?;
        info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_DATA, path = &data_file.path);
        let last_fragment = fragments.last_mut().unwrap();
        last_fragment.physical_rows = Some(num_rows as usize);
        last_fragment.files.push(data_file);
    }

    Ok(fragments)
}

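/// Validates the multi-base settings in `params` (new bases may only be
/// registered in CREATE mode; base IDs and name/path references are mutually
/// exclusive) and resolves the requested target bases, assigning fresh IDs to
/// newly registered bases and opening an object store for each target.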
pub async fn validate_and_resolve_target_bases(
    params: &mut WriteParams,
    existing_base_paths: Option<&HashMap<u32, BasePath>>,
) -> Result<Option<Vec<TargetBaseInfo>>> {
    if !matches!(params.mode, WriteMode::Create) && params.initial_bases.is_some() {
        return Err(Error::invalid_input(format!(
            "Cannot register new bases in {:?} mode. Only CREATE mode can register new bases.",
            params.mode
        )));
    }

    if params.target_base_names_or_paths.is_some() && params.target_bases.is_some() {
        return Err(Error::invalid_input(
            "Cannot specify both target_base_names_or_paths and target_bases. Use one or the other.",
        ));
    }

    let mut all_bases: HashMap<u32, BasePath> = existing_base_paths.cloned().unwrap_or_default();
    if let Some(initial_bases) = &mut params.initial_bases {
        let mut next_id = all_bases.keys().max().map(|&id| id + 1).unwrap_or(1);

        for base_path in initial_bases.iter_mut() {
            if base_path.id == 0 {
                base_path.id = next_id;
                next_id += 1;
            }
            all_bases.insert(base_path.id, base_path.clone());
        }
    }

    let target_base_ids = if let Some(ref names_or_paths) = params.target_base_names_or_paths {
        let mut resolved_ids = Vec::new();
        for reference in names_or_paths {
            let ref_str = reference.as_str();
            let id = all_bases
                .iter()
                .find(|(_, base)| {
                    base.name.as_ref().map(|n| n == ref_str).unwrap_or(false)
                        || base.path == ref_str
                })
                .map(|(&id, _)| id)
                .ok_or_else(|| {
                    Error::invalid_input(format!(
                        "Base reference '{}' not found in available bases",
                        ref_str
                    ))
                })?;

            resolved_ids.push(id);
        }
        Some(resolved_ids)
    } else {
        params.target_bases.clone()
    };

    let store_registry = params
        .session
        .as_ref()
        .map(|s| s.store_registry())
        .unwrap_or_default();

    if let Some(target_bases) = &target_base_ids {
        let store_params = params.store_params.clone().unwrap_or_default();
        let mut bases_info = Vec::new();

        for &target_base_id in target_bases {
            let base_path = all_bases.get(&target_base_id).ok_or_else(|| {
                Error::invalid_input(format!(
                    "Target base ID {} not found in available bases",
                    target_base_id
                ))
            })?;

            let (target_object_store, extracted_path) = ObjectStore::from_uri_and_params(
                store_registry.clone(),
                &base_path.path,
                &store_params,
            )
            .await?;

            bases_info.push(TargetBaseInfo {
                base_id: target_base_id,
                object_store: target_object_store,
                base_dir: extracted_path,
                is_dataset_root: base_path.is_dataset_root,
            });
        }

        Ok(Some(bases_info))
    } else {
        Ok(None)
    }
}

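/// Records a base path as a candidate for resolving external blob references,
/// skipping dataset-root bases and deduplicating by base ID.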
fn append_external_base_candidate(
    base_path: &BasePath,
    store_prefix: String,
    extracted_path: Path,
    candidates: &mut Vec<ExternalBaseCandidate>,
    seen_base_ids: &mut HashSet<u32>,
) {
    if base_path.is_dataset_root {
        return;
    }
    if seen_base_ids.insert(base_path.id) {
        candidates.push(ExternalBaseCandidate {
            base_id: base_path.id,
            store_prefix,
            base_path: extracted_path,
        });
    }
}

async fn append_external_initial_bases(
    initial_bases: Option<&Vec<BasePath>>,
    store_registry: Arc<ObjectStoreRegistry>,
    store_params: &ObjectStoreParams,
    candidates: &mut Vec<ExternalBaseCandidate>,
    seen_base_ids: &mut HashSet<u32>,
) -> Result<()> {
    if let Some(initial_bases) = initial_bases {
        for base_path in initial_bases {
            let (store, extracted_path) = ObjectStore::from_uri_and_params(
                store_registry.clone(),
                &base_path.path,
                store_params,
            )
            .await?;
            append_external_base_candidate(
                base_path,
                store.store_prefix.clone(),
                extracted_path,
                candidates,
                seen_base_ids,
            );
        }
    }
    Ok(())
}

async fn build_external_base_resolver(
    dataset: Option<&Dataset>,
    params: &WriteParams,
) -> Result<ExternalBaseResolver> {
    let store_registry = dataset
        .map(|ds| ds.session.store_registry())
        .unwrap_or_else(|| params.store_registry());
    let store_params = params.store_params.clone().unwrap_or_default();

    let mut seen_base_ids = HashSet::new();
    let mut candidates = vec![];

    if let Some(dataset) = dataset {
        for base_path in dataset.manifest.base_paths.values() {
            let (store, extracted_path) = ObjectStore::from_uri_and_params(
                store_registry.clone(),
                &base_path.path,
                &store_params,
            )
            .await?;
            append_external_base_candidate(
                base_path,
                store.store_prefix.clone(),
                extracted_path,
                &mut candidates,
                &mut seen_base_ids,
            );
        }
    }

    append_external_initial_bases(
        params.initial_bases.as_ref(),
        store_registry.clone(),
        &store_params,
        &mut candidates,
        &mut seen_base_ids,
    )
    .await?;

    Ok(ExternalBaseResolver::new(
        candidates,
        store_registry,
        store_params,
    ))
}

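/// Checks the incoming stream's schema against the destination dataset (when
/// one is given), resolves the data storage version, enforces the blob-related
/// version constraints, and then delegates to [`do_write_fragments`]. Returns
/// the written fragments along with the schema they were written under.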
#[instrument(level = "debug", skip_all)]
pub async fn write_fragments_internal(
    dataset: Option<&Dataset>,
    object_store: Arc<ObjectStore>,
    base_dir: &Path,
    schema: Schema,
    data: SendableRecordBatchStream,
    params: WriteParams,
    target_bases_info: Option<Vec<TargetBaseInfo>>,
) -> Result<(Vec<Fragment>, Schema)> {
    let mut params = params;
    let adapter = SchemaAdapter::new(data.schema());

    let (data, converted_schema) = if adapter.requires_physical_conversion() {
        let data = adapter.to_physical_stream(data);
        let arrow_schema = data.schema();
        let converted_schema = Schema::try_from(arrow_schema.as_ref())?;
        (data, converted_schema)
    } else {
        (data, schema)
    };

    params.max_rows_per_group = std::cmp::min(params.max_rows_per_group, params.max_rows_per_file);

    let (schema, storage_version) = if let Some(dataset) = dataset {
        match params.mode {
            WriteMode::Append | WriteMode::Create => {
                converted_schema.check_compatible(
                    dataset.schema(),
                    &SchemaCompareOptions {
                        compare_nullability: NullabilityComparison::Ignore,
                        allow_missing_if_nullable: true,
                        ignore_field_order: true,
                        compare_dictionary: dataset.is_legacy_storage(),
                        ..Default::default()
                    },
                )?;
                let write_schema = dataset.schema().project_by_schema(
                    &converted_schema,
                    OnMissing::Error,
                    OnTypeMismatch::Error,
                )?;
                let data_storage_version = dataset
                    .manifest()
                    .data_storage_format
                    .lance_file_version()?;
                (write_schema, data_storage_version)
            }
            WriteMode::Overwrite => {
                let data_storage_version = params.data_storage_version.unwrap_or(
                    dataset
                        .manifest()
                        .data_storage_format
                        .lance_file_version()?,
                );
                (converted_schema, data_storage_version)
            }
        }
    } else {
        (converted_schema, params.storage_version_or_default())
    };

    if storage_version < LanceFileVersion::V2_2 && schema.fields.iter().any(|f| f.is_blob_v2()) {
        return Err(Error::invalid_input(format!(
            "Blob v2 requires file version >= 2.2 (got {:?})",
            storage_version
        )));
    }

    if storage_version >= LanceFileVersion::V2_2
        && schema
            .fields
            .iter()
            .any(|f| f.metadata.contains_key(BLOB_META_KEY))
    {
        return Err(Error::invalid_input(format!(
            "Legacy blob columns (field metadata key {BLOB_META_KEY:?}) are not supported for file version >= 2.2. Use the blob v2 extension type (ARROW:extension:name = \"lance.blob.v2\") and the new blob APIs (e.g. lance::blob::blob_field / lance::blob::BlobArrayBuilder)."
        )));
    }

    let fragments = do_write_fragments(
        dataset,
        object_store,
        base_dir,
        &schema,
        data,
        params,
        storage_version,
        target_bases_info,
    )
    .await?;

    Ok((fragments, schema))
}

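/// A thin abstraction over the legacy (v1) and current (v2) file writers so
/// that the fragment-writing loop does not need to know which format is in
/// use.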
#[async_trait::async_trait]
pub trait GenericWriter: Send {
    async fn write(&mut self, batches: &[RecordBatch]) -> Result<()>;
    async fn tell(&mut self) -> Result<u64>;
    async fn finish(&mut self) -> Result<(u32, DataFile)>;
}

struct V1WriterAdapter<M>
where
    M: PreviousManifestProvider + Send + Sync,
{
    writer: PreviousFileWriter<M>,
    path: String,
    base_id: Option<u32>,
}

#[async_trait::async_trait]
impl<M> GenericWriter for V1WriterAdapter<M>
where
    M: PreviousManifestProvider + Send + Sync,
{
    async fn write(&mut self, batches: &[RecordBatch]) -> Result<()> {
        self.writer.write(batches).await
    }
    async fn tell(&mut self) -> Result<u64> {
        Ok(self.writer.tell().await? as u64)
    }
    async fn finish(&mut self) -> Result<(u32, DataFile)> {
        let size_bytes = self.writer.tell().await?;
        Ok((
            self.writer.finish().await? as u32,
            DataFile::new_legacy(
                self.path.clone(),
                self.writer.schema(),
                NonZero::new(size_bytes as u64),
                self.base_id,
            ),
        ))
    }
}

struct V2WriterAdapter {
    writer: current_writer::FileWriter,
    path: String,
    base_id: Option<u32>,
    preprocessor: Option<BlobPreprocessor>,
}

#[async_trait::async_trait]
impl GenericWriter for V2WriterAdapter {
    async fn write(&mut self, batches: &[RecordBatch]) -> Result<()> {
        if let Some(pre) = self.preprocessor.as_mut() {
            let processed = preprocess_blob_batches(batches, pre).await?;
            for batch in processed {
                self.writer.write_batch(&batch).await?;
            }
        } else {
            for batch in batches {
                self.writer.write_batch(batch).await?;
            }
        }
        Ok(())
    }
    async fn tell(&mut self) -> Result<u64> {
        Ok(self.writer.tell().await?)
    }
    async fn finish(&mut self) -> Result<(u32, DataFile)> {
        if let Some(pre) = self.preprocessor.as_mut() {
            pre.finish().await?;
        }
        let field_ids = self
            .writer
            .field_id_to_column_indices()
            .iter()
            .map(|(field_id, _)| *field_id as i32)
            .collect::<Vec<_>>();
        let column_indices = self
            .writer
            .field_id_to_column_indices()
            .iter()
            .map(|(_, column_index)| *column_index as i32)
            .collect::<Vec<_>>();
        let (major, minor) = self.writer.version().to_numbers();
        let num_rows = self.writer.finish().await? as u32;
        let data_file = DataFile::new(
            std::mem::take(&mut self.path),
            field_ids,
            column_indices,
            major,
            minor,
            NonZero::new(self.writer.tell().await?),
            self.base_id,
        );
        Ok((num_rows, data_file))
    }
}

pub async fn open_writer(
    object_store: &ObjectStore,
    schema: &Schema,
    base_dir: &Path,
    storage_version: LanceFileVersion,
) -> Result<Box<dyn GenericWriter>> {
    open_writer_with_options(
        object_store,
        schema,
        base_dir,
        storage_version,
        WriterOptions {
            add_data_dir: true,
            ..Default::default()
        },
    )
    .await
}

#[derive(Default)]
struct WriterOptions {
    add_data_dir: bool,
    base_id: Option<u32>,
    external_base_resolver: Option<Arc<ExternalBaseResolver>>,
    allow_external_blob_outside_bases: bool,
}

async fn open_writer_with_options(
    object_store: &ObjectStore,
    schema: &Schema,
    base_dir: &Path,
    storage_version: LanceFileVersion,
    options: WriterOptions,
) -> Result<Box<dyn GenericWriter>> {
    let WriterOptions {
        add_data_dir,
        base_id,
        external_base_resolver,
        allow_external_blob_outside_bases,
    } = options;

    let data_file_key = generate_random_filename();
    let filename = format!("{}.lance", data_file_key);

    let data_dir = if add_data_dir {
        base_dir.child(DATA_DIR)
    } else {
        base_dir.clone()
    };

    let full_path = data_dir.child(filename.as_str());

    let writer = if storage_version == LanceFileVersion::Legacy {
        Box::new(V1WriterAdapter {
            writer: PreviousFileWriter::<ManifestDescribing>::try_new(
                object_store,
                &full_path,
                schema.clone(),
                &Default::default(),
            )
            .await?,
            path: filename,
            base_id,
        })
    } else {
        let writer = object_store.create(&full_path).await?;
        let enable_blob_v2 = storage_version >= LanceFileVersion::V2_2;
        let file_writer = current_writer::FileWriter::try_new(
            writer,
            schema.clone(),
            FileWriterOptions {
                format_version: Some(storage_version),
                ..Default::default()
            },
        )?;
        let preprocessor = if enable_blob_v2 {
            Some(BlobPreprocessor::new(
                object_store.clone(),
                data_dir.clone(),
                data_file_key.clone(),
                schema,
                external_base_resolver,
                allow_external_blob_outside_bases,
            ))
        } else {
            None
        };
        let writer_adapter = V2WriterAdapter {
            writer: file_writer,
            path: filename,
            base_id,
            preprocessor,
        };
        Box::new(writer_adapter) as Box<dyn GenericWriter>
    };
    Ok(writer)
}

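/// A resolved write target for multi-base datasets: which base to record in
/// the data file metadata, the object store and directory to write into, and
/// whether the base uses the standard dataset layout (files under a `data/`
/// subdirectory) or receives files directly.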
pub struct TargetBaseInfo {
    pub base_id: u32,
    pub object_store: Arc<ObjectStore>,
    pub base_dir: Path,
    pub is_dataset_root: bool,
}

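/// Creates a new file writer (and fragment placeholder) on demand, rotating
/// round-robin through the configured target bases when more than one is
/// given.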
struct WriterGenerator {
    object_store: Arc<ObjectStore>,
    base_dir: Path,
    schema: Schema,
    storage_version: LanceFileVersion,
    target_bases_info: Option<Vec<TargetBaseInfo>>,
    external_base_resolver: Option<Arc<ExternalBaseResolver>>,
    allow_external_blob_outside_bases: bool,
    next_base_index: AtomicUsize,
}

impl WriterGenerator {
    pub fn new(
        object_store: Arc<ObjectStore>,
        base_dir: &Path,
        schema: &Schema,
        storage_version: LanceFileVersion,
        target_bases_info: Option<Vec<TargetBaseInfo>>,
        external_base_resolver: Option<Arc<ExternalBaseResolver>>,
        allow_external_blob_outside_bases: bool,
    ) -> Self {
        Self {
            object_store,
            base_dir: base_dir.clone(),
            schema: schema.clone(),
            storage_version,
            target_bases_info,
            external_base_resolver,
            allow_external_blob_outside_bases,
            next_base_index: AtomicUsize::new(0),
        }
    }

    fn select_target_base(&self) -> Option<&TargetBaseInfo> {
        self.target_bases_info.as_ref().map(|bases| {
            let index = self
                .next_base_index
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            &bases[index % bases.len()]
        })
    }

    pub async fn new_writer(&self) -> Result<(Box<dyn GenericWriter>, Fragment)> {
        let fragment = Fragment::new(0);

        let writer = if let Some(base_info) = self.select_target_base() {
            open_writer_with_options(
                &base_info.object_store,
                &self.schema,
                &base_info.base_dir,
                self.storage_version,
                WriterOptions {
                    add_data_dir: base_info.is_dataset_root,
                    base_id: Some(base_info.base_id),
                    external_base_resolver: self.external_base_resolver.clone(),
                    allow_external_blob_outside_bases: self.allow_external_blob_outside_bases,
                },
            )
            .await?
        } else {
            open_writer_with_options(
                &self.object_store,
                &self.schema,
                &self.base_dir,
                self.storage_version,
                WriterOptions {
                    add_data_dir: true,
                    base_id: None,
                    external_base_resolver: self.external_base_resolver.clone(),
                    allow_external_blob_outside_bases: self.allow_external_blob_outside_bases,
                },
            )
            .await?
        };

        Ok((writer, fragment))
    }
}

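/// Chooses the commit handler for a write: a caller-supplied handler is used
/// as-is (except with the `s3+ddb://` scheme, which carries its own handler),
/// otherwise one is derived from the URI. A custom object store requires an
/// explicit handler.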
async fn resolve_commit_handler(
    uri: &str,
    commit_handler: Option<Arc<dyn CommitHandler>>,
    store_options: &Option<ObjectStoreParams>,
) -> Result<Arc<dyn CommitHandler>> {
    match commit_handler {
        None => {
            #[allow(deprecated)]
            if store_options
                .as_ref()
                .map(|opts| opts.object_store.is_some())
                .unwrap_or_default()
            {
                return Err(Error::invalid_input(
                    "when creating a dataset with a custom object store the commit_handler must also be specified",
                ));
            }
            commit_handler_from_url(uri, store_options).await
        }
        Some(commit_handler) => {
            if uri.starts_with("s3+ddb") {
                Err(Error::invalid_input(
                    "`s3+ddb://` scheme and custom commit handler are mutually exclusive",
                ))
            } else {
                Ok(commit_handler)
            }
        }
    }
}

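/// Turns the source stream into an iterator of streams so a failed write can
/// be retried. A source that advertises exactly one batch is collected and
/// replayed from memory; anything larger is teed through a disk-backed spill;
/// with retries disabled the source is yielded exactly once.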
async fn new_source_iter(
    source: SendableRecordBatchStream,
    enable_retries: bool,
) -> Result<Box<dyn Iterator<Item = SendableRecordBatchStream> + Send + 'static>> {
    if enable_retries {
        let schema = source.schema();

        let size_hint = source.size_hint();
        if size_hint.0 == 1 && size_hint.1 == Some(1) {
            let batches: Vec<RecordBatch> = source.try_collect().await?;
            Ok(Box::new(std::iter::repeat_with(move || {
                Box::pin(RecordBatchStreamAdapter::new(
                    schema.clone(),
                    futures::stream::iter(batches.clone().into_iter().map(Ok)),
                )) as SendableRecordBatchStream
            })))
        } else {
            Ok(Box::new(
                SpillStreamIter::try_new(source, 100 * 1024 * 1024).await?,
            ))
        }
    } else {
        Ok(Box::new(std::iter::once(source)))
    }
}

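/// Replays a record batch stream an arbitrary number of times: a background
/// task drains the source into a spill file while each call to `next` hands
/// out a fresh reader over the spilled data.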
struct SpillStreamIter {
    receiver: SpillReceiver,
    #[allow(dead_code)]
    sender_handle: tokio::task::JoinHandle<SpillSender>,
    #[allow(dead_code)]
    tmp_dir: TempDir,
}

impl SpillStreamIter {
    pub async fn try_new(
        mut source: SendableRecordBatchStream,
        memory_limit: usize,
    ) -> Result<Self> {
        let tmp_dir = tokio::task::spawn_blocking(|| {
            TempDir::try_new()
                .map_err(|e| Error::invalid_input(format!("Failed to create temp dir: {}", e)))
        })
        .await
        .ok()
        .expect_ok()??;

        let tmp_path = tmp_dir.std_path().join("spill.arrows");
        let (mut sender, receiver) = create_replay_spill(tmp_path, source.schema(), memory_limit);

        let sender_handle = tokio::task::spawn(async move {
            while let Some(res) = source.next().await {
                match res {
                    Ok(batch) => match sender.write(batch).await {
                        Ok(_) => {}
                        Err(e) => {
                            sender.send_error(e);
                            break;
                        }
                    },
                    Err(e) => {
                        sender.send_error(e);
                        break;
                    }
                }
            }

            if let Err(err) = sender.finish().await {
                sender.send_error(err);
            }
            sender
        });

        Ok(Self {
            receiver,
            tmp_dir,
            sender_handle,
        })
    }
}

impl Iterator for SpillStreamIter {
    type Item = SendableRecordBatchStream;

    fn next(&mut self) -> Option<Self::Item> {
        Some(self.receiver.read())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use arrow_array::{Int32Array, RecordBatchIterator, RecordBatchReader, StructArray};
    use arrow_schema::{DataType, Field as ArrowField, Fields, Schema as ArrowSchema};
    use datafusion::{error::DataFusionError, physical_plan::stream::RecordBatchStreamAdapter};
    use datafusion_physical_plan::RecordBatchStream;
    use futures::TryStreamExt;
    use lance_datagen::{BatchCount, RowCount, array, gen_batch};
    use lance_file::previous::reader::FileReader as PreviousFileReader;
    use lance_io::traits::Reader;

    #[tokio::test]
    async fn test_chunking_large_batches() {
        let schema = Arc::new(ArrowSchema::new(vec![arrow::datatypes::Field::new(
            "a",
            DataType::Int32,
            false,
        )]));
        let batch =
            RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from_iter(0..28))])
                .unwrap();
        let batches: Vec<RecordBatch> =
            vec![batch.slice(0, 10), batch.slice(10, 10), batch.slice(20, 8)];
        let stream = RecordBatchStreamAdapter::new(
            schema.clone(),
            futures::stream::iter(batches.into_iter().map(Ok::<_, DataFusionError>)),
        );

        let chunks: Vec<Vec<RecordBatch>> = chunk_stream(Box::pin(stream), 3)
            .try_collect()
            .await
            .unwrap();

        assert_eq!(chunks.len(), 10);
        assert_eq!(chunks[0].len(), 1);

        for (i, chunk) in chunks.iter().enumerate() {
            let num_rows = chunk.iter().map(|batch| batch.num_rows()).sum::<usize>();
            if i < chunks.len() - 1 {
                assert_eq!(num_rows, 3);
            } else {
                assert_eq!(num_rows, 1);
            }
        }

        assert_eq!(chunks[3].len(), 2);
        assert_eq!(chunks[3][0].num_rows(), 1);
        assert_eq!(chunks[3][1].num_rows(), 2);
    }

    #[tokio::test]
    async fn test_chunking_small_batches() {
        let schema = Arc::new(ArrowSchema::new(vec![arrow::datatypes::Field::new(
            "a",
            DataType::Int32,
            false,
        )]));
        let batch =
            RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from_iter(0..30))])
                .unwrap();

        let batches: Vec<RecordBatch> = (0..10).map(|i| batch.slice(i * 3, 3)).collect();
        let stream = RecordBatchStreamAdapter::new(
            schema.clone(),
            futures::stream::iter(batches.into_iter().map(Ok::<_, DataFusionError>)),
        );

        let chunks: Vec<Vec<RecordBatch>> = chunk_stream(Box::pin(stream), 10)
            .try_collect()
            .await
            .unwrap();

        assert_eq!(chunks.len(), 3);
        assert_eq!(chunks[0].len(), 4);
        assert_eq!(chunks[0][0], batch.slice(0, 3));
        assert_eq!(chunks[0][1], batch.slice(3, 3));
        assert_eq!(chunks[0][2], batch.slice(6, 3));
        assert_eq!(chunks[0][3], batch.slice(9, 1));

        for chunk in &chunks {
            let num_rows = chunk.iter().map(|batch| batch.num_rows()).sum::<usize>();
            assert_eq!(num_rows, 10);
        }
    }

    #[tokio::test]
    async fn test_file_size() {
        let reader_to_frags = |data_reader: Box<dyn RecordBatchReader + Send>| {
            let schema = data_reader.schema();
            let data_reader =
                data_reader.map(|rb| rb.map_err(datafusion::error::DataFusionError::from));

            let data_stream = Box::pin(RecordBatchStreamAdapter::new(
                schema.clone(),
                futures::stream::iter(data_reader),
            ));

            let write_params = WriteParams {
                max_rows_per_file: 1024 * 1024,
                max_bytes_per_file: 2 * 1024,
                mode: WriteMode::Create,
                ..Default::default()
            };

            async move {
                let schema = Schema::try_from(schema.as_ref()).unwrap();

                let object_store = Arc::new(ObjectStore::memory());
                write_fragments_internal(
                    None,
                    object_store,
                    &Path::from("test"),
                    schema,
                    data_stream,
                    write_params,
                    None,
                )
                .await
            }
        };

        let data_reader = Box::new(
            gen_batch()
                .anon_col(array::rand_fsb(1024))
                .into_reader_rows(RowCount::from(10 * 1024), BatchCount::from(2)),
        );

        let (fragments, _) = reader_to_frags(data_reader).await.unwrap();

        assert_eq!(fragments.len(), 2);
    }

    #[tokio::test]
    async fn test_max_rows_per_file() {
        let reader_to_frags = |data_reader: Box<dyn RecordBatchReader + Send>| {
            let schema = data_reader.schema();
            let data_reader =
                data_reader.map(|rb| rb.map_err(datafusion::error::DataFusionError::from));

            let data_stream = Box::pin(RecordBatchStreamAdapter::new(
                schema.clone(),
                futures::stream::iter(data_reader),
            ));

            let write_params = WriteParams {
                max_rows_per_file: 5000,
                max_bytes_per_file: 1024 * 1024 * 1024,
                mode: WriteMode::Create,
                ..Default::default()
            };

            async move {
                let schema = Schema::try_from(schema.as_ref()).unwrap();

                let object_store = Arc::new(ObjectStore::memory());
                write_fragments_internal(
                    None,
                    object_store,
                    &Path::from("test"),
                    schema,
                    data_stream,
                    write_params,
                    None,
                )
                .await
            }
        };

        let data_reader = Box::new(
            gen_batch()
                .anon_col(array::rand_type(&DataType::Int32))
                .into_reader_rows(RowCount::from(12000), BatchCount::from(1)),
        );

        let (fragments, _) = reader_to_frags(data_reader).await.unwrap();

        assert_eq!(fragments.len(), 3);

        let row_counts: Vec<usize> = fragments
            .iter()
            .map(|f| f.physical_rows.unwrap_or(0))
            .collect();
        assert_eq!(row_counts, vec![5000, 5000, 2000]);
    }

    #[tokio::test]
    async fn test_max_rows_per_group() {
        let reader_to_frags = |data_reader: Box<dyn RecordBatchReader + Send>,
                               version: LanceFileVersion| {
            let schema = data_reader.schema();
            let data_reader =
                data_reader.map(|rb| rb.map_err(datafusion::error::DataFusionError::from));

            let data_stream = Box::pin(RecordBatchStreamAdapter::new(
                schema.clone(),
                futures::stream::iter(data_reader),
            ));

            let write_params = WriteParams {
                max_rows_per_file: 5000,
                max_rows_per_group: 3000,
                mode: WriteMode::Create,
                data_storage_version: Some(version),
                ..Default::default()
            };

            async move {
                let schema = Schema::try_from(schema.as_ref()).unwrap();

                let object_store = Arc::new(ObjectStore::memory());
                write_fragments_internal(
                    None,
                    object_store,
                    &Path::from("test"),
                    schema,
                    data_stream,
                    write_params,
                    None,
                )
                .await
            }
        };

        let data_reader_v1 = Box::new(
            gen_batch()
                .anon_col(array::rand_type(&DataType::Int32))
                .into_reader_rows(RowCount::from(9000), BatchCount::from(1)),
        );

        let (fragments_v1, _) = reader_to_frags(data_reader_v1, LanceFileVersion::Legacy)
            .await
            .unwrap();
        let row_counts_v1: Vec<usize> = fragments_v1
            .iter()
            .map(|f| f.physical_rows.unwrap_or(0))
            .collect();

        assert_eq!(fragments_v1.len(), 2);
        assert_eq!(row_counts_v1, vec![6000, 3000]);

        let data_reader_v2 = Box::new(
            gen_batch()
                .anon_col(array::rand_type(&DataType::Int32))
                .into_reader_rows(RowCount::from(9000), BatchCount::from(1)),
        );

        let (fragments_v2, _) = reader_to_frags(data_reader_v2, LanceFileVersion::Stable)
            .await
            .unwrap();
        let row_counts_v2: Vec<usize> = fragments_v2
            .iter()
            .map(|f| f.physical_rows.unwrap_or(0))
            .collect();

        assert_eq!(fragments_v2.len(), 2);
        assert_eq!(row_counts_v2, vec![5000, 4000]);

        assert_eq!(fragments_v1.len(), fragments_v2.len());
        assert_ne!(row_counts_v1, row_counts_v2);
    }

    #[tokio::test]
    async fn test_file_write_version() {
        let schema = Arc::new(ArrowSchema::new(vec![arrow::datatypes::Field::new(
            "a",
            DataType::Int32,
            false,
        )]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from_iter(0..1024))],
        )
        .unwrap();

        let versions = vec![
            LanceFileVersion::Legacy,
            LanceFileVersion::V2_0,
            LanceFileVersion::V2_1,
            LanceFileVersion::V2_2,
            LanceFileVersion::Stable,
            LanceFileVersion::Next,
        ];
        for version in versions {
            let (major, minor) = version.to_numbers();
            let write_params = WriteParams {
                data_storage_version: Some(version),
                max_rows_per_group: 1,
                ..Default::default()
            };

            let data_stream = Box::pin(RecordBatchStreamAdapter::new(
                schema.clone(),
                futures::stream::iter(std::iter::once(Ok(batch.clone()))),
            ));

            let schema = Schema::try_from(schema.as_ref()).unwrap();

            let object_store = Arc::new(ObjectStore::memory());
            let (fragments, _) = write_fragments_internal(
                None,
                object_store,
                &Path::from("test"),
                schema,
                data_stream,
                write_params,
                None,
            )
            .await
            .unwrap();

            assert_eq!(fragments.len(), 1);
            let fragment = &fragments[0];
            assert_eq!(fragment.files.len(), 1);
            assert_eq!(fragment.physical_rows, Some(1024));
            assert_eq!(
                fragment.files[0].file_major_version, major,
                "version: {}",
                version
            );
            assert_eq!(
                fragment.files[0].file_minor_version, minor,
                "version: {}",
                version
            );
        }
    }

    #[tokio::test]
    async fn test_file_v1_schema_order() {
        let struct_fields = Fields::from(vec![ArrowField::new("b", DataType::Int32, false)]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("d", DataType::Int32, false),
            ArrowField::new("a", DataType::Struct(struct_fields.clone()), false),
        ]);
        let mut schema = Schema::try_from(&arrow_schema).unwrap();
        schema.mut_field_by_id(0).unwrap().id = 3;
        schema.mut_field_by_id(1).unwrap().id = 0;
        schema.mut_field_by_id(2).unwrap().id = 1;

        let field_ids = schema.fields_pre_order().map(|f| f.id).collect::<Vec<_>>();
        assert_eq!(field_ids, vec![3, 0, 1]);

        let data = RecordBatch::try_new(
            Arc::new(arrow_schema.clone()),
            vec![
                Arc::new(Int32Array::from(vec![1, 2])),
                Arc::new(StructArray::new(
                    struct_fields,
                    vec![Arc::new(Int32Array::from(vec![3, 4]))],
                    None,
                )),
            ],
        )
        .unwrap();

        let write_params = WriteParams {
            data_storage_version: Some(LanceFileVersion::Legacy),
            ..Default::default()
        };
        let data_stream = Box::pin(RecordBatchStreamAdapter::new(
            Arc::new(arrow_schema),
            futures::stream::iter(std::iter::once(Ok(data.clone()))),
        ));

        let object_store = Arc::new(ObjectStore::memory());
        let base_path = Path::from("test");
        let (fragments, _) = write_fragments_internal(
            None,
            object_store.clone(),
            &base_path,
            schema.clone(),
            data_stream,
            write_params,
            None,
        )
        .await
        .unwrap();

        assert_eq!(fragments.len(), 1);
        let fragment = &fragments[0];
        assert_eq!(fragment.files.len(), 1);
        assert_eq!(fragment.files[0].fields, vec![0, 1, 3]);

        let path = base_path
            .child(DATA_DIR)
            .child(fragment.files[0].path.as_str());
        let file_reader: Arc<dyn Reader> = object_store.open(&path).await.unwrap().into();
        let reader = PreviousFileReader::try_new_from_reader(
            &path,
            file_reader,
            None,
            schema.clone(),
            0,
            0,
            3,
            None,
        )
        .await
        .unwrap();
        assert_eq!(reader.num_batches(), 1);
        let batch = reader.read_batch(0, .., &schema).await.unwrap();
        assert_eq!(batch, data);
    }

    #[tokio::test]
    async fn test_explicit_data_file_bases_writer_generator() {
        use arrow::datatypes::{DataType, Field as ArrowField, Schema as ArrowSchema};
        use lance_io::object_store::ObjectStore;
        use std::sync::Arc;

        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "id",
            DataType::Int32,
            false,
        )]));
        let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();

        let object_store = Arc::new(ObjectStore::memory());
        let base_dir = Path::from("test/bucket2");

        let target_bases = vec![TargetBaseInfo {
            base_id: 2,
            object_store: object_store.clone(),
            base_dir: base_dir.clone(),
            is_dataset_root: false,
        }];
        let writer_generator = WriterGenerator::new(
            object_store.clone(),
            &base_dir,
            &schema,
            LanceFileVersion::Stable,
            Some(target_bases),
            None,
            false,
        );

        let (writer, fragment) = writer_generator.new_writer().await.unwrap();

        assert_eq!(fragment.id, 0);
        drop(writer);
    }

    #[tokio::test]
    async fn test_writer_with_base_id() {
        use arrow::array::Int32Array;
        use arrow::datatypes::{DataType, Field as ArrowField, Schema as ArrowSchema};
        use arrow::record_batch::RecordBatch;
        use lance_io::object_store::ObjectStore;
        use std::sync::Arc;

        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "id",
            DataType::Int32,
            false,
        )]));
        let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();

        let batch = RecordBatch::try_new(
            arrow_schema.clone(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )
        .unwrap();

        let object_store = Arc::new(ObjectStore::memory());
        let base_dir = Path::from("test/bucket2");

        let mut inner_writer = open_writer_with_options(
            &object_store,
            &schema,
            &base_dir,
            LanceFileVersion::Stable,
            WriterOptions {
                add_data_dir: false,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        inner_writer.write(&[batch]).await.unwrap();

        let base_id = 2u32;
        let (_num_rows, mut data_file) = inner_writer.finish().await.unwrap();
        data_file.base_id = Some(base_id);

        assert_eq!(data_file.base_id, Some(base_id));
        assert!(!data_file.path.is_empty());
    }

    #[tokio::test]
    async fn test_round_robin_target_base_selection() {
        use arrow::array::Int32Array;
        use arrow::datatypes::{DataType, Field as ArrowField, Schema as ArrowSchema};
        use arrow::record_batch::RecordBatch;
        use lance_io::object_store::ObjectStore;
        use std::sync::Arc;

        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "id",
            DataType::Int32,
            false,
        )]));
        let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();

        let store1 = Arc::new(ObjectStore::memory());
        let store2 = Arc::new(ObjectStore::memory());
        let store3 = Arc::new(ObjectStore::memory());

        let target_bases = vec![
            TargetBaseInfo {
                base_id: 1,
                object_store: store1.clone(),
                base_dir: Path::from("base1"),
                is_dataset_root: false,
            },
            TargetBaseInfo {
                base_id: 2,
                object_store: store2.clone(),
                base_dir: Path::from("base2"),
                is_dataset_root: false,
            },
            TargetBaseInfo {
                base_id: 3,
                object_store: store3.clone(),
                base_dir: Path::from("base3"),
                is_dataset_root: false,
            },
        ];

        let writer_generator = WriterGenerator::new(
            Arc::new(ObjectStore::memory()),
            &Path::from("default"),
            &schema,
            LanceFileVersion::Stable,
            Some(target_bases),
            None,
            false,
        );

        let batch = RecordBatch::try_new(
            arrow_schema.clone(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )
        .unwrap();

        let mut base_ids = Vec::new();
        for _ in 0..6 {
            let (mut writer, _fragment) = writer_generator.new_writer().await.unwrap();
            writer.write(std::slice::from_ref(&batch)).await.unwrap();
            let (_num_rows, data_file) = writer.finish().await.unwrap();
            base_ids.push(data_file.base_id.unwrap());
        }

        assert_eq!(base_ids, vec![1, 2, 3, 1, 2, 3]);
    }

    #[tokio::test]
    async fn test_explicit_data_file_bases_path_parsing() {
        let test_cases = vec![
            ("s3://multi-path-test/test1/subBucket2", "test1/subBucket2"),
            ("gs://my-bucket/path/to/data", "path/to/data"),
            ("az://container/path/to/data", "path/to/data"),
            (
                "abfss://filesystem@account.dfs.core.windows.net/path/to/data",
                "path/to/data",
            ),
            ("file:///tmp/test/bucket", "tmp/test/bucket"),
        ];

        for (uri, expected_path) in test_cases {
            let url = url::Url::parse(uri).unwrap();
            let parsed_path = url.path().trim_start_matches('/');
            assert_eq!(parsed_path, expected_path, "Failed for URI: {}", uri);
        }
    }

    #[tokio::test]
    async fn test_write_params_validation() {
        let mut params = WriteParams {
            mode: WriteMode::Create,
            initial_bases: Some(vec![
                BasePath {
                    id: 1,
                    name: Some("bucket1".to_string()),
                    path: "s3://bucket1/path1".to_string(),
                    is_dataset_root: true,
                },
                BasePath {
                    id: 2,
                    name: Some("bucket2".to_string()),
                    path: "s3://bucket2/path2".to_string(),
                    is_dataset_root: true,
                },
                BasePath {
                    id: 3,
                    name: Some("azure-az-base".to_string()),
                    path: "az://container/path1".to_string(),
                    is_dataset_root: true,
                },
                BasePath {
                    id: 4,
                    name: Some("azure-abfss-base".to_string()),
                    path: "abfss://filesystem@account.dfs.core.windows.net/path1".to_string(),
                    is_dataset_root: true,
                },
            ]),
            target_bases: Some(vec![1]),
            ..Default::default()
        };

        let result = validate_write_params(&params);
        assert!(result.is_ok());

        params.target_bases = Some(vec![99]);
        let result = validate_write_params(&params);
        assert!(result.is_err());

        params.initial_bases = None;
        params.target_bases = Some(vec![1]);
        let result = validate_write_params(&params);
        assert!(result.is_err());
    }

    fn validate_write_params(params: &WriteParams) -> Result<()> {
        if matches!(params.mode, WriteMode::Create)
            && let Some(target_bases) = &params.target_bases
        {
            if target_bases.len() != 1 {
                return Err(Error::invalid_input(format!(
                    "target_bases with {} elements is not supported",
                    target_bases.len()
                )));
            }
            let target_base_id = target_bases[0];
            if let Some(initial_bases) = &params.initial_bases {
                if !initial_bases.iter().any(|bp| bp.id == target_base_id) {
                    return Err(Error::invalid_input(format!(
                        "target_base_id {} must be one of the initial_bases in CREATE mode",
                        target_base_id
                    )));
                }
            } else {
                return Err(Error::invalid_input(
                    "initial_bases must be provided when target_bases is specified in CREATE mode",
                ));
            }
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_multi_base_create() {
        use lance_testing::datagen::{BatchGenerator, IncrementingInt32};

        let test_uri = "memory://multi_base_test";
        let primary_uri = format!("{}/primary", test_uri);
        let base1_uri = format!("{}/base1", test_uri);
        let base2_uri = format!("{}/base2", test_uri);

        let mut data_gen =
            BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("id".to_owned())));

        let dataset = crate::dataset::Dataset::write(
            data_gen.batch(5),
            &primary_uri,
            Some(WriteParams {
                mode: WriteMode::Create,
                initial_bases: Some(vec![
                    BasePath {
                        id: 1,
                        name: Some("base1".to_string()),
                        path: base1_uri.clone(),
                        is_dataset_root: true,
                    },
                    BasePath {
                        id: 2,
                        name: Some("base2".to_string()),
                        path: base2_uri.clone(),
                        is_dataset_root: true,
                    },
                ]),
                target_bases: Some(vec![1]),
                ..Default::default()
            }),
        )
        .await
        .unwrap();

        assert_eq!(dataset.count_rows(None).await.unwrap(), 5);

        assert_eq!(dataset.manifest.base_paths.len(), 2);
        assert!(
            dataset
                .manifest
                .base_paths
                .values()
                .any(|bp| bp.name == Some("base1".to_string()))
        );
        assert!(
            dataset
                .manifest
                .base_paths
                .values()
                .any(|bp| bp.name == Some("base2".to_string()))
        );

        let fragments = dataset.get_fragments();
        assert!(!fragments.is_empty());
        for fragment in fragments {
            assert!(
                fragment
                    .metadata
                    .files
                    .iter()
                    .any(|file| file.base_id == Some(1))
            );
        }

        let mut data_gen2 =
            BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("id".to_owned())));

        let result = Dataset::write(
            data_gen2.batch(5),
            &format!("{}/test_validation", test_uri),
            Some(WriteParams {
                mode: WriteMode::Create,
                initial_bases: Some(vec![BasePath {
                    id: 1,
                    name: Some("base1".to_string()),
                    path: base1_uri.clone(),
                    is_dataset_root: true,
                }]),
                target_bases: Some(vec![1]),
                target_base_names_or_paths: Some(vec!["base1".to_string()]),
                ..Default::default()
            }),
        )
        .await;

        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("Cannot specify both target_base_names_or_paths and target_bases")
        );
    }

    #[tokio::test]
    async fn test_multi_base_overwrite() {
        use lance_testing::datagen::{BatchGenerator, IncrementingInt32};

        let test_uri = "memory://overwrite_test";
        let primary_uri = format!("{}/primary", test_uri);
        let base1_uri = format!("{}/base1", test_uri);
        let base2_uri = format!("{}/base2", test_uri);
        let _base3_uri = format!("{}/base3", test_uri);

        let mut data_gen =
            BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("id".to_owned())));

        let dataset = Dataset::write(
            data_gen.batch(3),
            &primary_uri,
            Some(WriteParams {
                mode: WriteMode::Create,
                initial_bases: Some(vec![
                    BasePath {
                        id: 1,
                        name: Some("base1".to_string()),
                        path: base1_uri.clone(),
                        is_dataset_root: true,
                    },
                    BasePath {
                        id: 2,
                        name: Some("base2".to_string()),
                        path: base2_uri.clone(),
                        is_dataset_root: true,
                    },
                ]),
                target_bases: Some(vec![1]),
                ..Default::default()
            }),
        )
        .await
        .unwrap();

        assert_eq!(dataset.count_rows(None).await.unwrap(), 3);

        let mut data_gen2 =
            BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("id".to_owned())));

        let dataset = Dataset::write(
            data_gen2.batch(2),
            std::sync::Arc::new(dataset),
            Some(WriteParams {
                mode: WriteMode::Overwrite,
                target_bases: Some(vec![2]),
                ..Default::default()
            }),
        )
        .await
        .unwrap();

        assert_eq!(dataset.count_rows(None).await.unwrap(), 2);

        assert_eq!(dataset.manifest.base_paths.len(), 2);
        assert!(
            dataset
                .manifest
                .base_paths
                .values()
                .any(|bp| bp.name == Some("base1".to_string()))
        );
        assert!(
            dataset
                .manifest
                .base_paths
                .values()
                .any(|bp| bp.name == Some("base2".to_string()))
        );

        let fragments = dataset.get_fragments();
        assert!(
            fragments
                .iter()
                .all(|f| f.metadata.files.iter().all(|file| file.base_id == Some(2)))
        );

        let mut data_gen3 =
            BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("id".to_owned())));

        let result = Dataset::write(
            data_gen3.batch(2),
            Arc::new(dataset),
            Some(WriteParams {
                mode: WriteMode::Overwrite,
                initial_bases: Some(vec![BasePath {
                    id: 3,
                    name: Some("base3".to_string()),
                    path: _base3_uri.clone(),
                    is_dataset_root: true,
                }]),
                target_bases: Some(vec![1]),
                ..Default::default()
            }),
        )
        .await;

        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("Cannot register new bases in Overwrite mode")
        );
    }

    #[tokio::test]
    async fn test_multi_base_append() {
        use lance_testing::datagen::{BatchGenerator, IncrementingInt32};

        let test_uri = "memory://append_test";
        let primary_uri = format!("{}/primary", test_uri);
        let base1_uri = format!("{}/base1", test_uri);
        let base2_uri = format!("{}/base2", test_uri);

        let mut data_gen =
            BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("id".to_owned())));

        let dataset = Dataset::write(
            data_gen.batch(3),
            &primary_uri,
            Some(WriteParams {
                mode: WriteMode::Create,
                initial_bases: Some(vec![
                    BasePath {
                        id: 1,
                        name: Some("base1".to_string()),
                        path: base1_uri.clone(),
                        is_dataset_root: true,
                    },
                    BasePath {
                        id: 2,
                        name: Some("base2".to_string()),
                        path: base2_uri.clone(),
                        is_dataset_root: true,
                    },
                ]),
                target_bases: Some(vec![1]),
                ..Default::default()
            }),
        )
        .await
        .unwrap();

        assert_eq!(dataset.count_rows(None).await.unwrap(), 3);

        let mut data_gen2 =
            BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("id".to_owned())));

        let dataset = Dataset::write(
            data_gen2.batch(2),
            std::sync::Arc::new(dataset),
            Some(WriteParams {
                mode: WriteMode::Append,
                target_bases: Some(vec![1]),
                ..Default::default()
            }),
        )
        .await
        .unwrap();

        assert_eq!(dataset.count_rows(None).await.unwrap(), 5);

        assert_eq!(dataset.manifest.base_paths.len(), 2);

        let mut data_gen3 =
            BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("id".to_owned())));

        let dataset = Dataset::write(
            data_gen3.batch(4),
            Arc::new(dataset),
            Some(WriteParams {
                mode: WriteMode::Append,
                target_bases: Some(vec![2]),
                ..Default::default()
            }),
        )
        .await
        .unwrap();

        assert_eq!(dataset.count_rows(None).await.unwrap(), 9);

        let fragments = dataset.get_fragments();
        let has_base1_data = fragments
            .iter()
            .any(|f| f.metadata.files.iter().any(|file| file.base_id == Some(1)));
        let has_base2_data = fragments
            .iter()
            .any(|f| f.metadata.files.iter().any(|file| file.base_id == Some(2)));

        assert!(has_base1_data, "Should have data in base1");
        assert!(has_base2_data, "Should have data in base2");

        let mut data_gen4 =
            BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("id".to_owned())));
        let base3_uri = format!("{}/base3", test_uri);

        let result = Dataset::write(
            data_gen4.batch(2),
            Arc::new(dataset),
            Some(WriteParams {
                mode: WriteMode::Append,
                initial_bases: Some(vec![BasePath {
                    id: 3,
                    name: Some("base3".to_string()),
                    path: base3_uri,
                    is_dataset_root: true,
                }]),
                target_bases: Some(vec![1]),
                ..Default::default()
            }),
        )
        .await;

        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("Cannot register new bases in Append mode")
        );
    }

2286 #[tokio::test]
2287 async fn test_multi_base_is_dataset_root_flag() {
2288 use lance_core::utils::tempfile::TempDir;
2289 use lance_testing::datagen::{BatchGenerator, IncrementingInt32};
2290
2291 let test_dir = TempDir::default();
        let primary_uri = test_dir.path_str();
        let base1_dir = test_dir.std_path().join("base1");
        let base2_dir = test_dir.std_path().join("base2");

        std::fs::create_dir_all(&base1_dir).unwrap();
        std::fs::create_dir_all(&base2_dir).unwrap();

        let base1_uri = format!("file://{}", base1_dir.display());
        let base2_uri = format!("file://{}", base2_dir.display());

        let mut data_gen =
            BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("id".to_owned())));

        let dataset = Dataset::write(
            data_gen.batch(10),
            &primary_uri,
            Some(WriteParams {
                mode: WriteMode::Create,
                max_rows_per_file: 5,
                initial_bases: Some(vec![
                    BasePath {
                        id: 1,
                        name: Some("base1".to_string()),
                        path: base1_uri.clone(),
                        is_dataset_root: true,
                    },
                    BasePath {
                        id: 2,
                        name: Some("base2".to_string()),
                        path: base2_uri.clone(),
                        is_dataset_root: false,
                    },
                ]),
                target_bases: Some(vec![1, 2]),
                ..Default::default()
            }),
        )
        .await
        .unwrap();

        assert_eq!(dataset.count_rows(None).await.unwrap(), 10);

        assert_eq!(dataset.manifest.base_paths.len(), 2);

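        // Look up each registered base path in the manifest by name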
        let base1 = dataset
            .manifest
            .base_paths
            .values()
            .find(|bp| bp.name == Some("base1".to_string()))
            .expect("base1 not found");
        let base2 = dataset
            .manifest
            .base_paths
            .values()
            .find(|bp| bp.name == Some("base2".to_string()))
            .expect("base2 not found");

        assert!(
            base1.is_dataset_root,
            "base1 should have is_dataset_root=true"
        );
        assert!(
            !base2.is_dataset_root,
            "base2 should have is_dataset_root=false"
        );

        let fragments = dataset.get_fragments();
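        // Fragments should have been written to both bases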
        assert!(!fragments.is_empty());

        let has_base1_data = fragments
            .iter()
            .any(|f| f.metadata.files.iter().any(|file| file.base_id == Some(1)));
        let has_base2_data = fragments
            .iter()
            .any(|f| f.metadata.files.iter().any(|file| file.base_id == Some(2)));

        assert!(has_base1_data, "Should have data in base1");
        assert!(has_base2_data, "Should have data in base2");

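        // base1 has is_dataset_root=true, so its files should land under base1/data/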
        let base1_data_dir = base1_dir.join("data");
        assert!(base1_data_dir.exists(), "base1/data directory should exist");
        let base1_files: Vec<_> = std::fs::read_dir(&base1_data_dir)
            .unwrap()
            .filter_map(|e| e.ok())
            .filter(|e| {
                e.path()
                    .extension()
                    .map(|ext| ext == "lance")
                    .unwrap_or(false)
            })
            .collect();
        assert!(
            !base1_files.is_empty(),
            "base1/data should contain .lance files"
        );

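        // base2 has is_dataset_root=false, so its files are written directly under base2/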
        let base2_files: Vec<_> = std::fs::read_dir(&base2_dir)
            .unwrap()
            .filter_map(|e| e.ok())
            .filter(|e| {
                e.path()
                    .extension()
                    .map(|ext| ext == "lance")
                    .unwrap_or(false)
            })
            .collect();
        assert!(
            !base2_files.is_empty(),
            "base2 should contain .lance files directly"
        );

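        // If a base2/data directory exists at all, it must not hold any .lance files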
        let base2_data_dir = base2_dir.join("data");
        if base2_data_dir.exists() {
            let base2_data_files: Vec<_> = std::fs::read_dir(&base2_data_dir)
                .unwrap()
                .filter_map(|e| e.ok())
                .filter(|e| {
                    e.path()
                        .extension()
                        .map(|ext| ext == "lance")
                        .unwrap_or(false)
                })
                .collect();
            assert!(
                base2_data_files.is_empty(),
                "base2/data should NOT contain .lance files"
            );
        }
    }

    #[tokio::test]
    async fn test_multi_base_target_by_path_uri() {
        use lance_core::utils::tempfile::TempDir;
        use lance_testing::datagen::{BatchGenerator, IncrementingInt32};

        let test_dir = TempDir::default();
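        // Lay out the primary dataset root plus two base directories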
        let primary_uri = test_dir.path_str();
        let base1_dir = test_dir.std_path().join("base1");
        let base2_dir = test_dir.std_path().join("base2");

        std::fs::create_dir_all(&base1_dir).unwrap();
        std::fs::create_dir_all(&base2_dir).unwrap();

        let base1_uri = format!("file://{}", base1_dir.display());
        let base2_uri = format!("file://{}", base2_dir.display());

        let mut data_gen =
            BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("id".to_owned())));

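        // Create the dataset, addressing base1 by name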
        let dataset = Dataset::write(
            data_gen.batch(10),
            &primary_uri,
            Some(WriteParams {
                mode: WriteMode::Create,
                max_rows_per_file: 5,
                initial_bases: Some(vec![
                    BasePath {
                        id: 1,
                        name: Some("base1".to_string()),
                        path: base1_uri.clone(),
                        is_dataset_root: true,
                    },
                    BasePath {
                        id: 2,
                        name: Some("base2".to_string()),
                        path: base2_uri.clone(),
                        is_dataset_root: true,
                    },
                ]),
                target_base_names_or_paths: Some(vec!["base1".to_string()]),
                ..Default::default()
            }),
        )
        .await
        .unwrap();

        assert_eq!(dataset.count_rows(None).await.unwrap(), 10);

        let fragments = dataset.get_fragments();
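        // Everything written so far should live in base1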
        assert!(
            fragments
                .iter()
                .all(|f| f.metadata.files.iter().all(|file| file.base_id == Some(1)))
        );

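        // Append again, this time addressing base2 by its full path URI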
        let mut data_gen2 =
            BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("id".to_owned())));

        let dataset = Dataset::write(
            data_gen2.batch(5),
            Arc::new(dataset),
            Some(WriteParams {
                mode: WriteMode::Append,
                target_base_names_or_paths: Some(vec![base2_uri.clone()]),
                max_rows_per_file: 5,
                ..Default::default()
            }),
        )
        .await
        .unwrap();

        assert_eq!(dataset.count_rows(None).await.unwrap(), 15);

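        // Data should now exist in both bases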
        let fragments = dataset.get_fragments();
        let has_base1_data = fragments
            .iter()
            .any(|f| f.metadata.files.iter().any(|file| file.base_id == Some(1)));
        let has_base2_data = fragments
            .iter()
            .any(|f| f.metadata.files.iter().any(|file| file.base_id == Some(2)));

        assert!(has_base1_data, "Should have data in base1");
        assert!(has_base2_data, "Should have data in base2");

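        // With max_rows_per_file = 5, the five appended rows fit in a single base2 fragment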
        let base2_fragments: Vec<_> = fragments
            .iter()
            .filter(|f| f.metadata.files.iter().all(|file| file.base_id == Some(2)))
            .collect();
        assert_eq!(base2_fragments.len(), 1, "Should have 1 fragment in base2");
    }

    #[tokio::test]
    async fn test_empty_stream_write() {
        use lance_io::object_store::ObjectStore;

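        // A single-column schema, but no batches to go with it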
        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "id",
            DataType::Int32,
            false,
        )]));
        let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();

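        // A stream that yields no record batches at all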
        let data_stream = Box::pin(RecordBatchStreamAdapter::new(
            arrow_schema.clone(),
            futures::stream::iter(std::iter::empty::<
                std::result::Result<RecordBatch, DataFusionError>,
            >()),
        ));

        let object_store = Arc::new(ObjectStore::memory());
        let write_params = WriteParams {
            mode: WriteMode::Create,
            ..Default::default()
        };

        let result = write_fragments_internal(
            None,
            object_store,
            &Path::from("test_empty"),
            schema,
            data_stream,
            write_params,
            None,
        )
        .await;

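        // Writing an empty stream should succeed and produce zero fragments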
        match result {
            Ok((fragments, _)) => {
                assert!(
                    fragments.is_empty(),
                    "Empty stream should create no fragments"
                );
            }
            Err(e) => {
                panic!("Expected write empty stream success, got error: {}", e);
            }
        }
    }

    #[tokio::test]
    async fn test_schema_mismatch_on_append() {
        use arrow_array::record_batch;

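        // Seed a dataset with (id: Int32, value: Int32)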
        let batch1 = record_batch!(
            ("id", Int32, [1, 2, 3, 4, 5]),
            ("value", Int32, [10, 20, 30, 40, 50])
        )
        .unwrap();

        let dataset = InsertBuilder::new("memory://")
            .with_params(&WriteParams {
                mode: WriteMode::Create,
                ..Default::default()
            })
            .execute(vec![batch1])
            .await
            .unwrap();

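        // Baseline: five rows across two columns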
        assert_eq!(dataset.count_rows(None).await.unwrap(), 5);
        assert_eq!(dataset.schema().fields.len(), 2);

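        // Same column names, but "value" is now Float64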
        let batch2 = record_batch!(
            ("id", Int32, [6, 7, 8]),
            ("value", Float64, [60.0, 70.0, 80.0])
        )
        .unwrap();

        let result = InsertBuilder::new(Arc::new(dataset.clone()))
            .with_params(&WriteParams {
                mode: WriteMode::Append,
                ..Default::default()
            })
            .execute(vec![batch2])
            .await;

        assert!(result.is_err(), "Append with mismatched schema should fail");
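        // The error should point at the schema/type mismatch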
        let error = result.unwrap_err();
        let error_msg = error.to_string().to_lowercase();
        assert!(
            error_msg.contains("schema")
                || error_msg.contains("type")
                || error_msg.contains("mismatch")
                || error_msg.contains("field")
                || error_msg.contains("not found"),
            "Error should mention schema or type mismatch: {}",
            error_msg
        );

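        // The original dataset must be left unchanged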
        assert_eq!(dataset.count_rows(None).await.unwrap(), 5);
        assert_eq!(dataset.schema().fields.len(), 2);
    }

    #[tokio::test]
    async fn test_disk_full_error() {
        use std::io::{self, ErrorKind};
        use std::sync::Arc;

        use async_trait::async_trait;
        use object_store::{
            GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, PutMultipartOptions,
            PutOptions, PutPayload, PutResult,
        };

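        // An object store stub whose writes always fail with "no space left on device"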
        #[derive(Debug)]
        struct DiskFullObjectStore;

        impl std::fmt::Display for DiskFullObjectStore {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                write!(f, "DiskFullObjectStore")
            }
        }

        #[async_trait]
        impl object_store::ObjectStore for DiskFullObjectStore {
            async fn put(
                &self,
                _location: &object_store::path::Path,
                _bytes: PutPayload,
            ) -> object_store::Result<PutResult> {
                Err(object_store::Error::Generic {
                    store: "DiskFullStore",
                    source: Box::new(io::Error::new(
                        ErrorKind::StorageFull,
                        "No space left on device",
                    )),
                })
            }

            async fn put_opts(
                &self,
                _location: &object_store::path::Path,
                _bytes: PutPayload,
                _opts: PutOptions,
            ) -> object_store::Result<PutResult> {
                Err(object_store::Error::Generic {
                    store: "DiskFullStore",
                    source: Box::new(io::Error::new(
                        ErrorKind::StorageFull,
                        "No space left on device",
                    )),
                })
            }

            async fn put_multipart(
                &self,
                _location: &object_store::path::Path,
            ) -> object_store::Result<Box<dyn MultipartUpload>> {
                Err(object_store::Error::NotSupported {
                    source: "Multipart upload not supported".into(),
                })
            }

            async fn put_multipart_opts(
                &self,
                _location: &object_store::path::Path,
                _opts: PutMultipartOptions,
            ) -> object_store::Result<Box<dyn MultipartUpload>> {
                Err(object_store::Error::NotSupported {
                    source: "Multipart upload not supported".into(),
                })
            }

            async fn get(
                &self,
                _location: &object_store::path::Path,
            ) -> object_store::Result<GetResult> {
                Err(object_store::Error::NotFound {
                    path: "".into(),
                    source: "".into(),
                })
            }

            async fn get_opts(
                &self,
                _location: &object_store::path::Path,
                _options: GetOptions,
            ) -> object_store::Result<GetResult> {
                Err(object_store::Error::NotFound {
                    path: "".into(),
                    source: "".into(),
                })
            }

            async fn delete(
                &self,
                _location: &object_store::path::Path,
            ) -> object_store::Result<()> {
                Ok(())
            }

            fn list(
                &self,
                _prefix: Option<&object_store::path::Path>,
            ) -> futures::stream::BoxStream<'static, object_store::Result<ObjectMeta>> {
                Box::pin(futures::stream::empty())
            }

            async fn list_with_delimiter(
                &self,
                _prefix: Option<&object_store::path::Path>,
            ) -> object_store::Result<ListResult> {
                Ok(ListResult {
                    common_prefixes: vec![],
                    objects: vec![],
                })
            }

            async fn copy(
                &self,
                _from: &object_store::path::Path,
                _to: &object_store::path::Path,
            ) -> object_store::Result<()> {
                Ok(())
            }

            async fn copy_if_not_exists(
                &self,
                _from: &object_store::path::Path,
                _to: &object_store::path::Path,
            ) -> object_store::Result<()> {
                Ok(())
            }
        }

        let object_store = Arc::new(lance_io::object_store::ObjectStore::new(
            Arc::new(DiskFullObjectStore) as Arc<dyn object_store::ObjectStore>,
            url::Url::parse("mock:///test").unwrap(),
            None,
            None,
            false,
            true,
            lance_io::object_store::DEFAULT_LOCAL_IO_PARALLELISM,
            lance_io::object_store::DEFAULT_DOWNLOAD_RETRY_COUNT,
            None,
        ));

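        // A single-column batch to attempt to write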
        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "id",
            DataType::Int32,
            false,
        )]));

        let batch = RecordBatch::try_new(
            arrow_schema.clone(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]))],
        )
        .unwrap();

        let data_reader = Box::new(RecordBatchIterator::new(
            vec![Ok(batch)].into_iter(),
            arrow_schema.clone(),
        ));

        let data_stream = Box::pin(RecordBatchStreamAdapter::new(
            arrow_schema,
            futures::stream::iter(data_reader.map(|rb| rb.map_err(DataFusionError::from))),
        ));

        let schema = Schema::try_from(data_stream.schema().as_ref()).unwrap();

        let write_params = WriteParams {
            mode: WriteMode::Create,
            ..Default::default()
        };

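        // The write should surface the storage-full failure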
        let result = write_fragments_internal(
            None,
            object_store,
            &Path::from("test_disk_full"),
            schema,
            data_stream,
            write_params,
            None,
        )
        .await;

        assert!(result.is_err(), "Write should fail when disk is full");
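        // Both the message and the error kind should indicate an IO/storage problem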
        let error = result.unwrap_err();
        let error_msg = error.to_string().to_lowercase();

        assert!(
            error_msg.contains("io")
                || error_msg.contains("space")
                || error_msg.contains("storage")
                || error_msg.contains("full"),
            "Error should mention IO, space, or storage: {}",
            error_msg
        );

        assert!(
            matches!(error, lance_core::Error::IO { .. }),
            "Expected IO error, got: {}",
            error
        );
    }

    #[tokio::test]
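    // Simulates recovery from an interrupted write: stage an append with
    // execute_uncommitted, verify the staged rows are invisible to readers,
    // then commit the transaction and verify the rows appear.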
    async fn test_write_interruption_recovery() {
        use super::commit::CommitBuilder;
        use arrow_array::record_batch;

        let temp_dir = TempDir::default();
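        // Use a file:// URI so the dataset can be reopened between steps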
        let dataset_uri = format!("file://{}", temp_dir.std_path().display());

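        // Initial committed write: rows 1-3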
        let batch =
            record_batch!(("id", Int32, [1, 2, 3]), ("value", Utf8, ["a", "b", "c"])).unwrap();

        let dataset = InsertBuilder::new(&dataset_uri)
            .execute(vec![batch.clone()])
            .await
            .unwrap();

        assert_eq!(dataset.count_rows(None).await.unwrap(), 3);

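        // Stage rows 4-6 without committing them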
        let new_batch =
            record_batch!(("id", Int32, [4, 5, 6]), ("value", Utf8, ["d", "e", "f"])).unwrap();

        let uncommitted_result = InsertBuilder::new(WriteDestination::Dataset(Arc::new(
            Dataset::open(&dataset_uri).await.unwrap(),
        )))
        .with_params(&WriteParams {
            mode: WriteMode::Append,
            ..Default::default()
        })
        .execute_uncommitted(vec![new_batch])
        .await;

        assert!(
            uncommitted_result.is_ok(),
            "Uncommitted write should succeed"
        );
        let transaction = uncommitted_result.unwrap();

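        // Before the commit, the staged rows must not be visible to readers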
        let dataset_before_commit = Dataset::open(&dataset_uri).await.unwrap();
        let row_count_before = dataset_before_commit.count_rows(None).await.unwrap();
        assert_eq!(
            row_count_before, 3,
            "Dataset should still have only original 3 rows (uncommitted data not visible)"
        );

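        // Commit the staged transaction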
        let commit_result = CommitBuilder::new(&dataset_uri).execute(transaction).await;
        commit_result.unwrap();

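        // After the commit, all six rows should be visible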
        let dataset_after_commit = Dataset::open(&dataset_uri).await.unwrap();
        let row_count_after = dataset_after_commit.count_rows(None).await.unwrap();
        assert_eq!(
            row_count_after, 6,
            "Dataset should have all 6 rows after commit"
        );

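        // Scan everything back and verify the row contents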
        let mut scanner = dataset_after_commit.scan();
        scanner.project(&["id", "value"]).unwrap();
        let batches = scanner
            .try_into_stream()
            .await
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        let all_ids: Vec<i32> = batches
            .iter()
            .flat_map(|batch| {
                batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<Int32Array>()
                    .unwrap()
                    .iter()
                    .flatten()
            })
            .collect();

        assert_eq!(
            all_ids,
            vec![1, 2, 3, 4, 5, 6],
            "All data should be correctly written"
        );
    }
}