Skip to main content

lance/dataset/
write.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use arrow_array::RecordBatch;
5use chrono::TimeDelta;
6use datafusion::physical_plan::SendableRecordBatchStream;
7use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
8use futures::{Stream, StreamExt, TryStreamExt};
9use lance_arrow::BLOB_META_KEY;
10use lance_core::datatypes::{
11    NullabilityComparison, OnMissing, OnTypeMismatch, SchemaCompareOptions,
12};
13use lance_core::error::LanceOptionExt;
14use lance_core::utils::tempfile::TempDir;
15use lance_core::utils::tracing::{AUDIT_MODE_CREATE, AUDIT_TYPE_DATA, TRACE_FILE_AUDIT};
16use lance_core::{Error, Result, datatypes::Schema};
17use lance_datafusion::chunker::{break_stream, chunk_stream};
18use lance_datafusion::spill::{SpillReceiver, SpillSender, create_replay_spill};
19use lance_datafusion::utils::StreamingWriteSource;
20use lance_file::previous::writer::{
21    FileWriter as PreviousFileWriter, ManifestProvider as PreviousManifestProvider,
22};
23use lance_file::version::LanceFileVersion;
24use lance_file::writer::{self as current_writer, FileWriterOptions};
25use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry};
26use lance_table::format::{BasePath, DataFile, Fragment};
27use lance_table::io::commit::{CommitHandler, commit_handler_from_url};
28use lance_table::io::manifest::ManifestDescribing;
29use object_store::path::Path;
30use std::collections::{HashMap, HashSet};
31use std::num::NonZero;
32use std::sync::Arc;
33use std::sync::atomic::AtomicUsize;
34use tracing::{info, instrument};
35
36use crate::Dataset;
37use crate::dataset::blob::{
38    BlobPreprocessor, ExternalBaseCandidate, ExternalBaseResolver, preprocess_blob_batches,
39};
40use crate::session::Session;
41
42use super::DATA_DIR;
43use super::fragment::write::generate_random_filename;
44use super::progress::{NoopFragmentWriteProgress, WriteFragmentProgress};
45use super::transaction::Transaction;
46use super::utils::SchemaAdapter;
47
48mod commit;
49pub mod delete;
50mod insert;
51pub mod merge_insert;
52mod retry;
53pub mod update;
54
55pub use commit::CommitBuilder;
56pub use delete::{DeleteBuilder, DeleteResult};
57pub use insert::InsertBuilder;
58
59/// The destination to write data to.
60#[derive(Debug, Clone)]
61pub enum WriteDestination<'a> {
62    /// An existing dataset to write to.
63    Dataset(Arc<Dataset>),
64    /// A URI to write to.
65    Uri(&'a str),
66}
67
68impl WriteDestination<'_> {
69    pub fn dataset(&self) -> Option<&Dataset> {
70        match self {
71            WriteDestination::Dataset(dataset) => Some(dataset.as_ref()),
72            WriteDestination::Uri(_) => None,
73        }
74    }
75
76    pub fn uri(&self) -> String {
77        match self {
78            WriteDestination::Dataset(dataset) => dataset.uri.clone(),
79            WriteDestination::Uri(uri) => uri.to_string(),
80        }
81    }
82}
83
84impl From<Arc<Dataset>> for WriteDestination<'_> {
85    fn from(dataset: Arc<Dataset>) -> Self {
86        WriteDestination::Dataset(dataset)
87    }
88}
89
90impl<'a> From<&'a str> for WriteDestination<'a> {
91    fn from(uri: &'a str) -> Self {
92        WriteDestination::Uri(uri)
93    }
94}
95
96impl<'a> From<&'a String> for WriteDestination<'a> {
97    fn from(uri: &'a String) -> Self {
98        WriteDestination::Uri(uri.as_str())
99    }
100}
101
102impl<'a> From<&'a Path> for WriteDestination<'a> {
103    fn from(path: &'a Path) -> Self {
104        WriteDestination::Uri(path.as_ref())
105    }
106}
107
108/// The mode to write dataset.
109#[derive(Debug, Clone, Copy)]
110pub enum WriteMode {
111    /// Create a new dataset. Expect the dataset does not exist.
112    Create,
113    /// Append to an existing dataset.
114    Append,
115    /// Overwrite a dataset as a new version, or create new dataset if not exist.
116    Overwrite,
117}
118
119impl TryFrom<&str> for WriteMode {
120    type Error = Error;
121
122    fn try_from(value: &str) -> Result<Self> {
123        match value.to_lowercase().as_str() {
124            "create" => Ok(Self::Create),
125            "append" => Ok(Self::Append),
126            "overwrite" => Ok(Self::Overwrite),
127            _ => Err(Error::invalid_input(format!(
128                "Invalid write mode: {}",
129                value
130            ))),
131        }
132    }
133}
134
135/// Auto cleanup parameters
136#[derive(Debug, Clone)]
137pub struct AutoCleanupParams {
138    pub interval: usize,
139    pub older_than: TimeDelta,
140}
141
142impl Default for AutoCleanupParams {
143    fn default() -> Self {
144        Self {
145            interval: 20,
146            older_than: TimeDelta::days(14),
147        }
148    }
149}
150
151/// Dataset Write Parameters
152#[derive(Debug, Clone)]
153pub struct WriteParams {
154    /// Max number of records per file.
155    pub max_rows_per_file: usize,
156
157    /// Max number of rows per row group.
158    pub max_rows_per_group: usize,
159
160    /// Max file size in bytes.
161    ///
162    /// This is a soft limit. The actual file size may be larger than this value
163    /// by a few megabytes, since once we detect we hit this limit, we still
164    /// need to flush the footer.
165    ///
166    /// This limit is checked after writing each group, so if max_rows_per_group
167    /// is set to a large value, this limit may be exceeded by a large amount.
168    ///
169    /// The default is 90 GB. If you are using an object store such as S3, we
170    /// currently have a hard 100 GB limit.
171    pub max_bytes_per_file: usize,
172
173    /// Write mode
174    pub mode: WriteMode,
175
176    pub store_params: Option<ObjectStoreParams>,
177
178    pub progress: Arc<dyn WriteFragmentProgress>,
179
180    /// If present, dataset will use this to update the latest version
181    ///
182    /// If not set, the default will be based on the object store.  Generally this will
183    /// be RenameCommitHandler unless the object store does not handle atomic renames (e.g. S3)
184    ///
185    /// If a custom object store is provided (via store_params.object_store) then this
186    /// must also be provided.
187    pub commit_handler: Option<Arc<dyn CommitHandler>>,
188
189    /// The format version to use when writing data.
190    ///
191    /// Newer versions are more efficient but the data can only be read by more recent versions
192    /// of lance.
193    ///
194    /// If not specified then the latest stable version will be used.
195    pub data_storage_version: Option<LanceFileVersion>,
196
197    /// Experimental: if set to true, the writer will use stable row ids.
198    /// These row ids are stable after compaction operations, but not after updates.
199    /// This makes compaction more efficient, since with stable row ids no
200    /// secondary indices need to be updated to point to new row ids.
201    pub enable_stable_row_ids: bool,
202
203    /// If set to true, and this is a new dataset, uses the new v2 manifest paths.
204    /// These allow constant-time lookups for the latest manifest on object storage.
205    /// This parameter has no effect on existing datasets. To migrate an existing
206    /// dataset, use the [`super::Dataset::migrate_manifest_paths_v2`] method.
207    /// Default is True.
208    pub enable_v2_manifest_paths: bool,
209
210    pub session: Option<Arc<Session>>,
211
212    /// If Some and this is a new dataset, old dataset versions will be
213    /// automatically cleaned up according to the parameters set out in
214    /// [`AutoCleanupParams`]. This parameter has no effect on existing datasets.
215    /// To add auto-cleanup to an existing dataset, use [`Dataset::update_config`]
216    /// to set `lance.auto_cleanup.interval` and `lance.auto_cleanup.older_than`.
217    /// Both parameters must be set to invoke auto-cleanup.
218    pub auto_cleanup: Option<AutoCleanupParams>,
219
220    /// If true, skip auto cleanup during commits. This should be set to true
221    /// for high frequency writes to improve performance. This is also useful
222    /// if the writer does not have delete permissions and the clean up would
223    /// just try and log a failure anyway. Default is false.
224    pub skip_auto_cleanup: bool,
225
226    /// Configuration key-value pairs for this write operation.
227    /// This can include commit messages, engine information, etc.
228    /// this properties map will be persisted as part of Transaction object.
229    pub transaction_properties: Option<Arc<HashMap<String, String>>>,
230
231    /// New base paths to register in the manifest during dataset creation.
232    /// Each BasePath must have a properly assigned ID (non-zero).
233    /// Only used in CREATE/OVERWRITE modes for manifest registration.
234    /// IDs should be assigned by the caller before passing to WriteParams.
235    pub initial_bases: Option<Vec<BasePath>>,
236
237    /// Target base IDs for writing data files.
238    /// When provided, all new data files will be written to bases with these IDs.
239    /// Used in all modes (CREATE, APPEND, OVERWRITE) to specify where data should be written.
240    /// The IDs must correspond to either:
241    /// - IDs in initial_bases (for CREATE/OVERWRITE modes)
242    /// - IDs already registered in the existing dataset manifest (for APPEND mode)
243    pub target_bases: Option<Vec<u32>>,
244
245    /// Target base names or paths as strings (unresolved).
246    /// These will be resolved to IDs when the write operation executes.
247    /// Resolution happens at builder execution time when dataset context is available.
248    pub target_base_names_or_paths: Option<Vec<String>>,
249
250    /// Allow writing external blob URIs that cannot be mapped to any registered
251    /// non-dataset-root base path. When disabled, such rows are rejected.
252    pub allow_external_blob_outside_bases: bool,
253}
254
255impl Default for WriteParams {
256    fn default() -> Self {
257        Self {
258            max_rows_per_file: 1024 * 1024, // 1 million
259            max_rows_per_group: 1024,
260            // object-store has a 100GB limit, so we should at least make sure
261            // we are under that.
262            max_bytes_per_file: 90 * 1024 * 1024 * 1024, // 90 GB
263            mode: WriteMode::Create,
264            store_params: None,
265            progress: Arc::new(NoopFragmentWriteProgress::new()),
266            commit_handler: None,
267            data_storage_version: None,
268            enable_stable_row_ids: false,
269            enable_v2_manifest_paths: true,
270            session: None,
271            auto_cleanup: Some(AutoCleanupParams::default()),
272            skip_auto_cleanup: false,
273            transaction_properties: None,
274            initial_bases: None,
275            target_bases: None,
276            target_base_names_or_paths: None,
277            allow_external_blob_outside_bases: false,
278        }
279    }
280}
281
282impl WriteParams {
283    /// Create a new WriteParams with the given storage version.
284    /// The other fields are set to their default values.
285    pub fn with_storage_version(version: LanceFileVersion) -> Self {
286        Self {
287            data_storage_version: Some(version),
288            ..Default::default()
289        }
290    }
291
292    pub fn storage_version_or_default(&self) -> LanceFileVersion {
293        self.data_storage_version.unwrap_or_default()
294    }
295
296    pub fn store_registry(&self) -> Arc<ObjectStoreRegistry> {
297        self.session
298            .as_ref()
299            .map(|s| s.store_registry())
300            .unwrap_or_default()
301    }
302
303    /// Set the properties for this WriteParams.
304    pub fn with_transaction_properties(self, properties: HashMap<String, String>) -> Self {
305        Self {
306            transaction_properties: Some(Arc::new(properties)),
307            ..self
308        }
309    }
310
311    /// Set the initial_bases for this WriteParams.
312    ///
313    /// This specifies new base paths to register in the manifest during dataset creation.
314    /// Each BasePath must have a properly assigned ID (non-zero) before calling this method.
315    /// Only used in CREATE/OVERWRITE modes for manifest registration.
316    pub fn with_initial_bases(self, bases: Vec<BasePath>) -> Self {
317        Self {
318            initial_bases: Some(bases),
319            ..self
320        }
321    }
322
323    /// Set the target_bases for this WriteParams.
324    ///
325    /// This specifies the base IDs where data files should be written.
326    /// The IDs must correspond to either:
327    /// - IDs in initial_bases (for CREATE/OVERWRITE modes)
328    /// - IDs already registered in the existing dataset manifest (for APPEND mode)
329    pub fn with_target_bases(self, base_ids: Vec<u32>) -> Self {
330        Self {
331            target_bases: Some(base_ids),
332            ..self
333        }
334    }
335
336    /// Store target base names or paths for deferred resolution.
337    ///
338    /// This method stores the references in `target_base_names_or_paths` field
339    /// to be resolved later at execution time when the dataset manifest is available.
340    ///
341    /// Resolution will happen at write execution time and will try to match:
342    /// 1. initial_bases by name
343    /// 2. initial_bases by path
344    /// 3. existing manifest by name
345    /// 4. existing manifest by path
346    ///
347    /// # Arguments
348    ///
349    /// * `references` - Vector of base names or paths to be resolved later
350    pub fn with_target_base_names_or_paths(self, references: Vec<String>) -> Self {
351        Self {
352            target_base_names_or_paths: Some(references),
353            ..self
354        }
355    }
356
357    /// Configure whether external blobs outside registered bases are allowed.
358    pub fn with_allow_external_blob_outside_bases(self, allow: bool) -> Self {
359        Self {
360            allow_external_blob_outside_bases: allow,
361            ..self
362        }
363    }
364}
365
366/// Writes the given data to the dataset and returns fragments.
367///
368/// NOTE: the fragments have not yet been assigned an ID. That must be done
369/// by the caller. This is so this function can be called in parallel, and the
370/// IDs can be assigned after writing is complete.
371#[deprecated(
372    since = "0.20.0",
373    note = "Use [`InsertBuilder::write_uncommitted_stream`] instead"
374)]
375pub async fn write_fragments(
376    dest: impl Into<WriteDestination<'_>>,
377    data: impl StreamingWriteSource,
378    params: WriteParams,
379) -> Result<Transaction> {
380    InsertBuilder::new(dest.into())
381        .with_params(&params)
382        .execute_uncommitted_stream(data)
383        .await
384}
385
386#[allow(clippy::too_many_arguments)]
387pub async fn do_write_fragments(
388    dataset: Option<&Dataset>,
389    object_store: Arc<ObjectStore>,
390    base_dir: &Path,
391    schema: &Schema,
392    data: SendableRecordBatchStream,
393    params: WriteParams,
394    storage_version: LanceFileVersion,
395    target_bases_info: Option<Vec<TargetBaseInfo>>,
396) -> Result<Vec<Fragment>> {
397    let adapter = SchemaAdapter::new(data.schema());
398    let data = adapter.to_physical_stream(data);
399
400    let mut buffered_reader = if storage_version == LanceFileVersion::Legacy {
401        // In v1 we split the stream into row group sized batches
402        chunk_stream(data, params.max_rows_per_group)
403    } else {
404        // In v2 we don't care about group size but we do want to break
405        // the stream on file boundaries
406        break_stream(data, params.max_rows_per_file)
407            .map_ok(|batch| vec![batch])
408            .boxed()
409    };
410
411    let external_base_resolver = if storage_version >= LanceFileVersion::V2_2
412        && schema.fields.iter().any(|field| field.is_blob_v2())
413    {
414        Some(Arc::new(
415            build_external_base_resolver(dataset, &params).await?,
416        ))
417    } else {
418        None
419    };
420
421    let writer_generator = WriterGenerator::new(
422        object_store,
423        base_dir,
424        schema,
425        storage_version,
426        target_bases_info,
427        external_base_resolver,
428        params.allow_external_blob_outside_bases,
429    );
430    let mut writer: Option<Box<dyn GenericWriter>> = None;
431    let mut num_rows_in_current_file = 0;
432    let mut fragments = Vec::new();
433    while let Some(batch_chunk) = buffered_reader.next().await {
434        let batch_chunk = batch_chunk?;
435
436        if writer.is_none() {
437            let (new_writer, new_fragment) = writer_generator.new_writer().await?;
438            params.progress.begin(&new_fragment).await?;
439            writer = Some(new_writer);
440            fragments.push(new_fragment);
441        }
442
443        writer.as_mut().unwrap().write(&batch_chunk).await?;
444        for batch in batch_chunk {
445            num_rows_in_current_file += batch.num_rows() as u32;
446        }
447
448        if num_rows_in_current_file >= params.max_rows_per_file as u32
449            || writer.as_mut().unwrap().tell().await? >= params.max_bytes_per_file as u64
450        {
451            let (num_rows, data_file) = writer.take().unwrap().finish().await?;
452            info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_DATA, path = &data_file.path);
453            debug_assert_eq!(num_rows, num_rows_in_current_file);
454            params.progress.complete(fragments.last().unwrap()).await?;
455            let last_fragment = fragments.last_mut().unwrap();
456            last_fragment.physical_rows = Some(num_rows as usize);
457            last_fragment.files.push(data_file);
458            num_rows_in_current_file = 0;
459        }
460    }
461
462    // Complete the final writer
463    if let Some(mut writer) = writer.take() {
464        let (num_rows, data_file) = writer.finish().await?;
465        info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_DATA, path = &data_file.path);
466        let last_fragment = fragments.last_mut().unwrap();
467        last_fragment.physical_rows = Some(num_rows as usize);
468        last_fragment.files.push(data_file);
469    }
470
471    Ok(fragments)
472}
473
474pub async fn validate_and_resolve_target_bases(
475    params: &mut WriteParams,
476    existing_base_paths: Option<&HashMap<u32, BasePath>>,
477) -> Result<Option<Vec<TargetBaseInfo>>> {
478    // Step 1: Validations
479    if !matches!(params.mode, WriteMode::Create) && params.initial_bases.is_some() {
480        return Err(Error::invalid_input(format!(
481            "Cannot register new bases in {:?} mode. Only CREATE mode can register new bases.",
482            params.mode
483        )));
484    }
485
486    if params.target_base_names_or_paths.is_some() && params.target_bases.is_some() {
487        return Err(Error::invalid_input(
488            "Cannot specify both target_base_names_or_paths and target_bases. Use one or the other.",
489        ));
490    }
491
492    // Step 2: Assign IDs to initial_bases and add them to all_bases
493    let mut all_bases: HashMap<u32, BasePath> = existing_base_paths.cloned().unwrap_or_default();
494    if let Some(initial_bases) = &mut params.initial_bases {
495        let mut next_id = all_bases.keys().max().map(|&id| id + 1).unwrap_or(1);
496
497        for base_path in initial_bases.iter_mut() {
498            if base_path.id == 0 {
499                base_path.id = next_id;
500                next_id += 1;
501            }
502            all_bases.insert(base_path.id, base_path.clone());
503        }
504    }
505
506    // Step 3: Resolve target_base_names_or_paths to IDs
507    let target_base_ids = if let Some(ref names_or_paths) = params.target_base_names_or_paths {
508        let mut resolved_ids = Vec::new();
509        for reference in names_or_paths {
510            let ref_str = reference.as_str();
511            let id = all_bases
512                .iter()
513                .find(|(_, base)| {
514                    base.name.as_ref().map(|n| n == ref_str).unwrap_or(false)
515                        || base.path == ref_str
516                })
517                .map(|(&id, _)| id)
518                .ok_or_else(|| {
519                    Error::invalid_input(format!(
520                        "Base reference '{}' not found in available bases",
521                        ref_str
522                    ))
523                })?;
524
525            resolved_ids.push(id);
526        }
527        Some(resolved_ids)
528    } else {
529        params.target_bases.clone()
530    };
531
532    // Step 4: Prepare TargetBaseInfo structs
533    let store_registry = params
534        .session
535        .as_ref()
536        .map(|s| s.store_registry())
537        .unwrap_or_default();
538
539    if let Some(target_bases) = &target_base_ids {
540        let store_params = params.store_params.clone().unwrap_or_default();
541        let mut bases_info = Vec::new();
542
543        for &target_base_id in target_bases {
544            let base_path = all_bases.get(&target_base_id).ok_or_else(|| {
545                Error::invalid_input(format!(
546                    "Target base ID {} not found in available bases",
547                    target_base_id
548                ))
549            })?;
550
551            let (target_object_store, extracted_path) = ObjectStore::from_uri_and_params(
552                store_registry.clone(),
553                &base_path.path,
554                &store_params,
555            )
556            .await?;
557
558            bases_info.push(TargetBaseInfo {
559                base_id: target_base_id,
560                object_store: target_object_store,
561                base_dir: extracted_path,
562                is_dataset_root: base_path.is_dataset_root,
563            });
564        }
565
566        Ok(Some(bases_info))
567    } else {
568        Ok(None)
569    }
570}
571
572fn append_external_base_candidate(
573    base_path: &BasePath,
574    store_prefix: String,
575    extracted_path: Path,
576    candidates: &mut Vec<ExternalBaseCandidate>,
577    seen_base_ids: &mut HashSet<u32>,
578) {
579    if base_path.is_dataset_root {
580        return;
581    }
582    if seen_base_ids.insert(base_path.id) {
583        candidates.push(ExternalBaseCandidate {
584            base_id: base_path.id,
585            store_prefix,
586            base_path: extracted_path,
587        });
588    }
589}
590
591async fn append_external_initial_bases(
592    initial_bases: Option<&Vec<BasePath>>,
593    store_registry: Arc<ObjectStoreRegistry>,
594    store_params: &ObjectStoreParams,
595    candidates: &mut Vec<ExternalBaseCandidate>,
596    seen_base_ids: &mut HashSet<u32>,
597) -> Result<()> {
598    if let Some(initial_bases) = initial_bases {
599        for base_path in initial_bases {
600            let (store, extracted_path) = ObjectStore::from_uri_and_params(
601                store_registry.clone(),
602                &base_path.path,
603                store_params,
604            )
605            .await?;
606            append_external_base_candidate(
607                base_path,
608                store.store_prefix.clone(),
609                extracted_path,
610                candidates,
611                seen_base_ids,
612            );
613        }
614    }
615    Ok(())
616}
617
618async fn build_external_base_resolver(
619    dataset: Option<&Dataset>,
620    params: &WriteParams,
621) -> Result<ExternalBaseResolver> {
622    let store_registry = dataset
623        .map(|ds| ds.session.store_registry())
624        .unwrap_or_else(|| params.store_registry());
625    let store_params = params.store_params.clone().unwrap_or_default();
626
627    let mut seen_base_ids = HashSet::new();
628    let mut candidates = vec![];
629
630    if let Some(dataset) = dataset {
631        for base_path in dataset.manifest.base_paths.values() {
632            let (store, extracted_path) = ObjectStore::from_uri_and_params(
633                store_registry.clone(),
634                &base_path.path,
635                &store_params,
636            )
637            .await?;
638            append_external_base_candidate(
639                base_path,
640                store.store_prefix.clone(),
641                extracted_path,
642                &mut candidates,
643                &mut seen_base_ids,
644            );
645        }
646    }
647
648    append_external_initial_bases(
649        params.initial_bases.as_ref(),
650        store_registry.clone(),
651        &store_params,
652        &mut candidates,
653        &mut seen_base_ids,
654    )
655    .await?;
656
657    Ok(ExternalBaseResolver::new(
658        candidates,
659        store_registry,
660        store_params,
661    ))
662}
663
664/// Writes the given data to the dataset and returns fragments.
665///
666/// NOTE: the fragments have not yet been assigned an ID. That must be done
667/// by the caller. This is so this function can be called in parallel, and the
668/// IDs can be assigned after writing is complete.
669///
670/// This is a private variant that takes a `SendableRecordBatchStream` instead
671/// of a reader. We don't expose the stream at our interface because it is a
672/// DataFusion type.
673#[instrument(level = "debug", skip_all)]
674pub async fn write_fragments_internal(
675    dataset: Option<&Dataset>,
676    object_store: Arc<ObjectStore>,
677    base_dir: &Path,
678    schema: Schema,
679    data: SendableRecordBatchStream,
680    params: WriteParams,
681    target_bases_info: Option<Vec<TargetBaseInfo>>,
682) -> Result<(Vec<Fragment>, Schema)> {
683    let mut params = params;
684    let adapter = SchemaAdapter::new(data.schema());
685
686    let (data, converted_schema) = if adapter.requires_physical_conversion() {
687        let data = adapter.to_physical_stream(data);
688        // Update the schema to match the converted data
689        let arrow_schema = data.schema();
690        let converted_schema = Schema::try_from(arrow_schema.as_ref())?;
691        (data, converted_schema)
692    } else {
693        // No conversion needed, use original schema to preserve dictionary info
694        (data, schema)
695    };
696
697    // Make sure the max rows per group is not larger than the max rows per file
698    params.max_rows_per_group = std::cmp::min(params.max_rows_per_group, params.max_rows_per_file);
699
700    let (schema, storage_version) = if let Some(dataset) = dataset {
701        match params.mode {
702            WriteMode::Append | WriteMode::Create => {
703                // Append mode, so we need to check compatibility
704                converted_schema.check_compatible(
705                    dataset.schema(),
706                    &SchemaCompareOptions {
707                        // We don't care if the user claims their data is nullable / non-nullable.  We will
708                        // verify against the actual data.
709                        compare_nullability: NullabilityComparison::Ignore,
710                        allow_missing_if_nullable: true,
711                        ignore_field_order: true,
712                        compare_dictionary: dataset.is_legacy_storage(),
713                        ..Default::default()
714                    },
715                )?;
716                let write_schema = dataset.schema().project_by_schema(
717                    &converted_schema,
718                    OnMissing::Error,
719                    OnTypeMismatch::Error,
720                )?;
721                // Use the storage version from the dataset, ignoring any version from the user.
722                let data_storage_version = dataset
723                    .manifest()
724                    .data_storage_format
725                    .lance_file_version()?;
726                (write_schema, data_storage_version)
727            }
728            WriteMode::Overwrite => {
729                // Overwrite, use the schema from the data.  If the user specified
730                // a storage version use that.  Otherwise use the version from the
731                // dataset.
732                let data_storage_version = params.data_storage_version.unwrap_or(
733                    dataset
734                        .manifest()
735                        .data_storage_format
736                        .lance_file_version()?,
737                );
738                (converted_schema, data_storage_version)
739            }
740        }
741    } else {
742        // Brand new dataset, use the schema from the data and the storage version
743        // from the user or the default.
744        (converted_schema, params.storage_version_or_default())
745    };
746
747    if storage_version < LanceFileVersion::V2_2 && schema.fields.iter().any(|f| f.is_blob_v2()) {
748        return Err(Error::invalid_input(format!(
749            "Blob v2 requires file version >= 2.2 (got {:?})",
750            storage_version
751        )));
752    }
753
754    if storage_version >= LanceFileVersion::V2_2
755        && schema
756            .fields
757            .iter()
758            .any(|f| f.metadata.contains_key(BLOB_META_KEY))
759    {
760        return Err(Error::invalid_input(format!(
761            "Legacy blob columns (field metadata key {BLOB_META_KEY:?}) are not supported for file version >= 2.2. Use the blob v2 extension type (ARROW:extension:name = \"lance.blob.v2\") and the new blob APIs (e.g. lance::blob::blob_field / lance::blob::BlobArrayBuilder)."
762        )));
763    }
764
765    let fragments = do_write_fragments(
766        dataset,
767        object_store,
768        base_dir,
769        &schema,
770        data,
771        params,
772        storage_version,
773        target_bases_info,
774    )
775    .await?;
776
777    Ok((fragments, schema))
778}
779
780#[async_trait::async_trait]
781pub trait GenericWriter: Send {
782    /// Write the given batches to the file
783    async fn write(&mut self, batches: &[RecordBatch]) -> Result<()>;
784    /// Get the current position in the file
785    ///
786    /// We use this to know when the file is too large and we need to start
787    /// a new file
788    async fn tell(&mut self) -> Result<u64>;
789    /// Finish writing the file (flush the remaining data and write footer)
790    async fn finish(&mut self) -> Result<(u32, DataFile)>;
791}
792
793struct V1WriterAdapter<M>
794where
795    M: PreviousManifestProvider + Send + Sync,
796{
797    writer: PreviousFileWriter<M>,
798    path: String,
799    base_id: Option<u32>,
800}
801
802#[async_trait::async_trait]
803impl<M> GenericWriter for V1WriterAdapter<M>
804where
805    M: PreviousManifestProvider + Send + Sync,
806{
807    async fn write(&mut self, batches: &[RecordBatch]) -> Result<()> {
808        self.writer.write(batches).await
809    }
810    async fn tell(&mut self) -> Result<u64> {
811        Ok(self.writer.tell().await? as u64)
812    }
813    async fn finish(&mut self) -> Result<(u32, DataFile)> {
814        let size_bytes = self.writer.tell().await?;
815        Ok((
816            self.writer.finish().await? as u32,
817            DataFile::new_legacy(
818                self.path.clone(),
819                self.writer.schema(),
820                NonZero::new(size_bytes as u64),
821                self.base_id,
822            ),
823        ))
824    }
825}
826
827struct V2WriterAdapter {
828    writer: current_writer::FileWriter,
829    path: String,
830    base_id: Option<u32>,
831    preprocessor: Option<BlobPreprocessor>,
832}
833
834#[async_trait::async_trait]
835impl GenericWriter for V2WriterAdapter {
836    async fn write(&mut self, batches: &[RecordBatch]) -> Result<()> {
837        if let Some(pre) = self.preprocessor.as_mut() {
838            let processed = preprocess_blob_batches(batches, pre).await?;
839            for batch in processed {
840                self.writer.write_batch(&batch).await?;
841            }
842        } else {
843            for batch in batches {
844                self.writer.write_batch(batch).await?;
845            }
846        }
847        Ok(())
848    }
849    async fn tell(&mut self) -> Result<u64> {
850        Ok(self.writer.tell().await?)
851    }
852    async fn finish(&mut self) -> Result<(u32, DataFile)> {
853        if let Some(pre) = self.preprocessor.as_mut() {
854            pre.finish().await?;
855        }
856        let field_ids = self
857            .writer
858            .field_id_to_column_indices()
859            .iter()
860            .map(|(field_id, _)| *field_id as i32)
861            .collect::<Vec<_>>();
862        let column_indices = self
863            .writer
864            .field_id_to_column_indices()
865            .iter()
866            .map(|(_, column_index)| *column_index as i32)
867            .collect::<Vec<_>>();
868        let (major, minor) = self.writer.version().to_numbers();
869        let num_rows = self.writer.finish().await? as u32;
870        let data_file = DataFile::new(
871            std::mem::take(&mut self.path),
872            field_ids,
873            column_indices,
874            major,
875            minor,
876            NonZero::new(self.writer.tell().await?),
877            self.base_id,
878        );
879        Ok((num_rows, data_file))
880    }
881}
882
883pub async fn open_writer(
884    object_store: &ObjectStore,
885    schema: &Schema,
886    base_dir: &Path,
887    storage_version: LanceFileVersion,
888) -> Result<Box<dyn GenericWriter>> {
889    open_writer_with_options(
890        object_store,
891        schema,
892        base_dir,
893        storage_version,
894        WriterOptions {
895            add_data_dir: true,
896            ..Default::default()
897        },
898    )
899    .await
900}
901
902#[derive(Default)]
903struct WriterOptions {
904    add_data_dir: bool,
905    base_id: Option<u32>,
906    external_base_resolver: Option<Arc<ExternalBaseResolver>>,
907    allow_external_blob_outside_bases: bool,
908}
909
910async fn open_writer_with_options(
911    object_store: &ObjectStore,
912    schema: &Schema,
913    base_dir: &Path,
914    storage_version: LanceFileVersion,
915    options: WriterOptions,
916) -> Result<Box<dyn GenericWriter>> {
917    let WriterOptions {
918        add_data_dir,
919        base_id,
920        external_base_resolver,
921        allow_external_blob_outside_bases,
922    } = options;
923
924    let data_file_key = generate_random_filename();
925    let filename = format!("{}.lance", data_file_key);
926
927    let data_dir = if add_data_dir {
928        base_dir.child(DATA_DIR)
929    } else {
930        base_dir.clone()
931    };
932
933    let full_path = data_dir.child(filename.as_str());
934
935    let writer = if storage_version == LanceFileVersion::Legacy {
936        Box::new(V1WriterAdapter {
937            writer: PreviousFileWriter::<ManifestDescribing>::try_new(
938                object_store,
939                &full_path,
940                schema.clone(),
941                &Default::default(),
942            )
943            .await?,
944            path: filename,
945            base_id,
946        })
947    } else {
948        let writer = object_store.create(&full_path).await?;
949        let enable_blob_v2 = storage_version >= LanceFileVersion::V2_2;
950        let file_writer = current_writer::FileWriter::try_new(
951            writer,
952            schema.clone(),
953            FileWriterOptions {
954                format_version: Some(storage_version),
955                ..Default::default()
956            },
957        )?;
958        let preprocessor = if enable_blob_v2 {
959            Some(BlobPreprocessor::new(
960                object_store.clone(),
961                data_dir.clone(),
962                data_file_key.clone(),
963                schema,
964                external_base_resolver,
965                allow_external_blob_outside_bases,
966            ))
967        } else {
968            None
969        };
970        let writer_adapter = V2WriterAdapter {
971            writer: file_writer,
972            path: filename,
973            base_id,
974            preprocessor,
975        };
976        Box::new(writer_adapter) as Box<dyn GenericWriter>
977    };
978    Ok(writer)
979}
980
981/// Information about a target base for writing.
982/// Contains the base ID, object store, directory path, and whether it's a dataset root.
983pub struct TargetBaseInfo {
984    pub base_id: u32,
985    pub object_store: Arc<ObjectStore>,
986    /// The base directory path (without /data subdirectory)
987    pub base_dir: Path,
988    /// Whether this base path is a dataset root.
989    /// If true, /data will be added when creating file paths.
990    /// If false, files will be written directly to base_dir.
991    pub is_dataset_root: bool,
992}
993
994struct WriterGenerator {
995    /// Default object store (used when no target bases specified)
996    object_store: Arc<ObjectStore>,
997    /// Default base directory (used when no target bases specified)
998    base_dir: Path,
999    schema: Schema,
1000    storage_version: LanceFileVersion,
1001    /// Target base information (if writing to specific bases)
1002    target_bases_info: Option<Vec<TargetBaseInfo>>,
1003    external_base_resolver: Option<Arc<ExternalBaseResolver>>,
1004    allow_external_blob_outside_bases: bool,
1005    /// Counter for round-robin selection
1006    next_base_index: AtomicUsize,
1007}
1008
1009impl WriterGenerator {
1010    pub fn new(
1011        object_store: Arc<ObjectStore>,
1012        base_dir: &Path,
1013        schema: &Schema,
1014        storage_version: LanceFileVersion,
1015        target_bases_info: Option<Vec<TargetBaseInfo>>,
1016        external_base_resolver: Option<Arc<ExternalBaseResolver>>,
1017        allow_external_blob_outside_bases: bool,
1018    ) -> Self {
1019        Self {
1020            object_store,
1021            base_dir: base_dir.clone(),
1022            schema: schema.clone(),
1023            storage_version,
1024            target_bases_info,
1025            external_base_resolver,
1026            allow_external_blob_outside_bases,
1027            next_base_index: AtomicUsize::new(0),
1028        }
1029    }
1030
1031    /// Select the next target base using round-robin strategy.
1032    /// TODO: In the future, we can develop different strategies for selecting target bases
1033    fn select_target_base(&self) -> Option<&TargetBaseInfo> {
1034        self.target_bases_info.as_ref().map(|bases| {
1035            let index = self
1036                .next_base_index
1037                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1038            &bases[index % bases.len()]
1039        })
1040    }
1041
1042    pub async fn new_writer(&self) -> Result<(Box<dyn GenericWriter>, Fragment)> {
1043        // Use temporary ID 0; will assign ID later.
1044        let fragment = Fragment::new(0);
1045
1046        let writer = if let Some(base_info) = self.select_target_base() {
1047            open_writer_with_options(
1048                &base_info.object_store,
1049                &self.schema,
1050                &base_info.base_dir,
1051                self.storage_version,
1052                WriterOptions {
1053                    add_data_dir: base_info.is_dataset_root,
1054                    base_id: Some(base_info.base_id),
1055                    external_base_resolver: self.external_base_resolver.clone(),
1056                    allow_external_blob_outside_bases: self.allow_external_blob_outside_bases,
1057                },
1058            )
1059            .await?
1060        } else {
1061            open_writer_with_options(
1062                &self.object_store,
1063                &self.schema,
1064                &self.base_dir,
1065                self.storage_version,
1066                WriterOptions {
1067                    add_data_dir: true,
1068                    base_id: None,
1069                    external_base_resolver: self.external_base_resolver.clone(),
1070                    allow_external_blob_outside_bases: self.allow_external_blob_outside_bases,
1071                },
1072            )
1073            .await?
1074        };
1075
1076        Ok((writer, fragment))
1077    }
1078}
1079
1080// Given input options resolve what the commit handler should be.
1081async fn resolve_commit_handler(
1082    uri: &str,
1083    commit_handler: Option<Arc<dyn CommitHandler>>,
1084    store_options: &Option<ObjectStoreParams>,
1085) -> Result<Arc<dyn CommitHandler>> {
1086    match commit_handler {
1087        None => {
1088            #[allow(deprecated)]
1089            if store_options
1090                .as_ref()
1091                .map(|opts| opts.object_store.is_some())
1092                .unwrap_or_default()
1093            {
1094                return Err(Error::invalid_input(
1095                    "when creating a dataset with a custom object store the commit_handler must also be specified",
1096                ));
1097            }
1098            commit_handler_from_url(uri, store_options).await
1099        }
1100        Some(commit_handler) => {
1101            if uri.starts_with("s3+ddb") {
1102                Err(Error::invalid_input(
1103                    "`s3+ddb://` scheme and custom commit handler are mutually exclusive",
1104                ))
1105            } else {
1106                Ok(commit_handler)
1107            }
1108        }
1109    }
1110}
1111
1112/// Create an iterator of record batch streams from the given source.
1113///
1114/// If `enable_retries` is true, then the source will be saved either in memory
1115/// or spilled to disk to allow replaying the source in case of a failure. The
1116/// source will be kept in memory if either (1) the size hint shows that
1117/// there is only one batch or (2) the stream contains less than 100MB of
1118/// data. Otherwise, the source will be spilled to a temporary file on disk.
1119///
1120/// This is used to support retries on write operations.
1121async fn new_source_iter(
1122    source: SendableRecordBatchStream,
1123    enable_retries: bool,
1124) -> Result<Box<dyn Iterator<Item = SendableRecordBatchStream> + Send + 'static>> {
1125    if enable_retries {
1126        let schema = source.schema();
1127
1128        // If size hint shows there is only one batch, spilling has no benefit, just keep that
1129        // in memory. (This is a pretty common case.)
1130        let size_hint = source.size_hint();
1131        if size_hint.0 == 1 && size_hint.1 == Some(1) {
1132            let batches: Vec<RecordBatch> = source.try_collect().await?;
1133            Ok(Box::new(std::iter::repeat_with(move || {
1134                Box::pin(RecordBatchStreamAdapter::new(
1135                    schema.clone(),
1136                    futures::stream::iter(batches.clone().into_iter().map(Ok)),
1137                )) as SendableRecordBatchStream
1138            })))
1139        } else {
1140            // Allow buffering up to 100MB in memory before spilling to disk.
1141            Ok(Box::new(
1142                SpillStreamIter::try_new(source, 100 * 1024 * 1024).await?,
1143            ))
1144        }
1145    } else {
1146        Ok(Box::new(std::iter::once(source)))
1147    }
1148}
1149
1150struct SpillStreamIter {
1151    receiver: SpillReceiver,
1152    #[allow(dead_code)] // Exists to keep the SpillSender alive
1153    sender_handle: tokio::task::JoinHandle<SpillSender>,
1154    // This temp dir is used to store the spilled data. It is kept alive by
1155    // this struct. When this struct is dropped, the Drop implementation of
1156    // tempfile::TempDir will delete the temp dir.
1157    #[allow(dead_code)] // Exists to keep the temp dir alive
1158    tmp_dir: TempDir,
1159}
1160
1161impl SpillStreamIter {
1162    pub async fn try_new(
1163        mut source: SendableRecordBatchStream,
1164        memory_limit: usize,
1165    ) -> Result<Self> {
1166        let tmp_dir = tokio::task::spawn_blocking(|| {
1167            TempDir::try_new()
1168                .map_err(|e| Error::invalid_input(format!("Failed to create temp dir: {}", e)))
1169        })
1170        .await
1171        .ok()
1172        .expect_ok()??;
1173
1174        let tmp_path = tmp_dir.std_path().join("spill.arrows");
1175        let (mut sender, receiver) = create_replay_spill(tmp_path, source.schema(), memory_limit);
1176
1177        let sender_handle = tokio::task::spawn(async move {
1178            while let Some(res) = source.next().await {
1179                match res {
1180                    Ok(batch) => match sender.write(batch).await {
1181                        Ok(_) => {}
1182                        Err(e) => {
1183                            sender.send_error(e);
1184                            break;
1185                        }
1186                    },
1187                    Err(e) => {
1188                        sender.send_error(e);
1189                        break;
1190                    }
1191                }
1192            }
1193
1194            if let Err(err) = sender.finish().await {
1195                sender.send_error(err);
1196            }
1197            sender
1198        });
1199
1200        Ok(Self {
1201            receiver,
1202            tmp_dir,
1203            sender_handle,
1204        })
1205    }
1206}
1207
1208impl Iterator for SpillStreamIter {
1209    type Item = SendableRecordBatchStream;
1210
1211    fn next(&mut self) -> Option<Self::Item> {
1212        Some(self.receiver.read())
1213    }
1214}
1215
1216#[cfg(test)]
1217mod tests {
1218    use super::*;
1219
1220    use arrow_array::{Int32Array, RecordBatchIterator, RecordBatchReader, StructArray};
1221    use arrow_schema::{DataType, Field as ArrowField, Fields, Schema as ArrowSchema};
1222    use datafusion::{error::DataFusionError, physical_plan::stream::RecordBatchStreamAdapter};
1223    use datafusion_physical_plan::RecordBatchStream;
1224    use futures::TryStreamExt;
1225    use lance_datagen::{BatchCount, RowCount, array, gen_batch};
1226    use lance_file::previous::reader::FileReader as PreviousFileReader;
1227    use lance_io::traits::Reader;
1228
1229    #[tokio::test]
1230    async fn test_chunking_large_batches() {
1231        // Create a stream of 3 batches of 10 rows
1232        let schema = Arc::new(ArrowSchema::new(vec![arrow::datatypes::Field::new(
1233            "a",
1234            DataType::Int32,
1235            false,
1236        )]));
1237        let batch =
1238            RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from_iter(0..28))])
1239                .unwrap();
1240        let batches: Vec<RecordBatch> =
1241            vec![batch.slice(0, 10), batch.slice(10, 10), batch.slice(20, 8)];
1242        let stream = RecordBatchStreamAdapter::new(
1243            schema.clone(),
1244            futures::stream::iter(batches.into_iter().map(Ok::<_, DataFusionError>)),
1245        );
1246
1247        // Chunk into a stream of 3 row batches
1248        let chunks: Vec<Vec<RecordBatch>> = chunk_stream(Box::pin(stream), 3)
1249            .try_collect()
1250            .await
1251            .unwrap();
1252
1253        assert_eq!(chunks.len(), 10);
1254        assert_eq!(chunks[0].len(), 1);
1255
1256        for (i, chunk) in chunks.iter().enumerate() {
1257            let num_rows = chunk.iter().map(|batch| batch.num_rows()).sum::<usize>();
1258            if i < chunks.len() - 1 {
1259                assert_eq!(num_rows, 3);
1260            } else {
1261                // Last chunk is shorter
1262                assert_eq!(num_rows, 1);
1263            }
1264        }
1265
1266        // The fourth chunk is split along the boundary between the original first
1267        // two batches.
1268        assert_eq!(chunks[3].len(), 2);
1269        assert_eq!(chunks[3][0].num_rows(), 1);
1270        assert_eq!(chunks[3][1].num_rows(), 2);
1271    }
1272
1273    #[tokio::test]
1274    async fn test_chunking_small_batches() {
1275        // Create a stream of 10 batches of 3 rows
1276        let schema = Arc::new(ArrowSchema::new(vec![arrow::datatypes::Field::new(
1277            "a",
1278            DataType::Int32,
1279            false,
1280        )]));
1281        let batch =
1282            RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from_iter(0..30))])
1283                .unwrap();
1284
1285        let batches: Vec<RecordBatch> = (0..10).map(|i| batch.slice(i * 3, 3)).collect();
1286        let stream = RecordBatchStreamAdapter::new(
1287            schema.clone(),
1288            futures::stream::iter(batches.into_iter().map(Ok::<_, DataFusionError>)),
1289        );
1290
1291        // Chunk into a stream of 10 row batches
1292        let chunks: Vec<Vec<RecordBatch>> = chunk_stream(Box::pin(stream), 10)
1293            .try_collect()
1294            .await
1295            .unwrap();
1296
1297        assert_eq!(chunks.len(), 3);
1298        assert_eq!(chunks[0].len(), 4);
1299        assert_eq!(chunks[0][0], batch.slice(0, 3));
1300        assert_eq!(chunks[0][1], batch.slice(3, 3));
1301        assert_eq!(chunks[0][2], batch.slice(6, 3));
1302        assert_eq!(chunks[0][3], batch.slice(9, 1));
1303
1304        for chunk in &chunks {
1305            let num_rows = chunk.iter().map(|batch| batch.num_rows()).sum::<usize>();
1306            assert_eq!(num_rows, 10);
1307        }
1308    }
1309
1310    #[tokio::test]
1311    async fn test_file_size() {
1312        let reader_to_frags = |data_reader: Box<dyn RecordBatchReader + Send>| {
1313            let schema = data_reader.schema();
1314            let data_reader =
1315                data_reader.map(|rb| rb.map_err(datafusion::error::DataFusionError::from));
1316
1317            let data_stream = Box::pin(RecordBatchStreamAdapter::new(
1318                schema.clone(),
1319                futures::stream::iter(data_reader),
1320            ));
1321
1322            let write_params = WriteParams {
1323                max_rows_per_file: 1024 * 1024, // Won't be limited by this
1324                max_bytes_per_file: 2 * 1024,
1325                mode: WriteMode::Create,
1326                ..Default::default()
1327            };
1328
1329            async move {
1330                let schema = Schema::try_from(schema.as_ref()).unwrap();
1331
1332                let object_store = Arc::new(ObjectStore::memory());
1333                write_fragments_internal(
1334                    None,
1335                    object_store,
1336                    &Path::from("test"),
1337                    schema,
1338                    data_stream,
1339                    write_params,
1340                    None,
1341                )
1342                .await
1343            }
1344        };
1345
1346        // The writer will not generate a new file until at enough data is *written* (not
1347        // just accumulated) to justify a new file.  Since the default page size is 8MiB
1348        // we actually need to generate quite a bit of data to trigger this.
1349        //
1350        // To avoid generating and writing millions of rows (which is a bit slow for a unit
1351        // test) we can use a large data type (1KiB binary)
1352        let data_reader = Box::new(
1353            gen_batch()
1354                .anon_col(array::rand_fsb(1024))
1355                .into_reader_rows(RowCount::from(10 * 1024), BatchCount::from(2)),
1356        );
1357
1358        let (fragments, _) = reader_to_frags(data_reader).await.unwrap();
1359
1360        assert_eq!(fragments.len(), 2);
1361    }
1362
1363    #[tokio::test]
1364    async fn test_max_rows_per_file() {
1365        let reader_to_frags = |data_reader: Box<dyn RecordBatchReader + Send>| {
1366            let schema = data_reader.schema();
1367            let data_reader =
1368                data_reader.map(|rb| rb.map_err(datafusion::error::DataFusionError::from));
1369
1370            let data_stream = Box::pin(RecordBatchStreamAdapter::new(
1371                schema.clone(),
1372                futures::stream::iter(data_reader),
1373            ));
1374
1375            let write_params = WriteParams {
1376                max_rows_per_file: 5000,                // Limit by rows
1377                max_bytes_per_file: 1024 * 1024 * 1024, // Won't be limited by this
1378                mode: WriteMode::Create,
1379                ..Default::default()
1380            };
1381
1382            async move {
1383                let schema = Schema::try_from(schema.as_ref()).unwrap();
1384
1385                let object_store = Arc::new(ObjectStore::memory());
1386                write_fragments_internal(
1387                    None,
1388                    object_store,
1389                    &Path::from("test"),
1390                    schema,
1391                    data_stream,
1392                    write_params,
1393                    None,
1394                )
1395                .await
1396            }
1397        };
1398
1399        // Generate 12000 rows total, which should create 3 files:
1400        // - File 1: 5000 rows
1401        // - File 2: 5000 rows
1402        // - File 3: 2000 rows
1403        let data_reader = Box::new(
1404            gen_batch()
1405                .anon_col(array::rand_type(&DataType::Int32))
1406                .into_reader_rows(RowCount::from(12000), BatchCount::from(1)),
1407        );
1408
1409        let (fragments, _) = reader_to_frags(data_reader).await.unwrap();
1410
1411        // Should have 3 fragments
1412        assert_eq!(fragments.len(), 3);
1413
1414        // Verify the row count distribution
1415        let row_counts: Vec<usize> = fragments
1416            .iter()
1417            .map(|f| f.physical_rows.unwrap_or(0))
1418            .collect();
1419        assert_eq!(row_counts, vec![5000, 5000, 2000]);
1420    }
1421
1422    #[tokio::test]
1423    async fn test_max_rows_per_group() {
1424        let reader_to_frags = |data_reader: Box<dyn RecordBatchReader + Send>,
1425                               version: LanceFileVersion| {
1426            let schema = data_reader.schema();
1427            let data_reader =
1428                data_reader.map(|rb| rb.map_err(datafusion::error::DataFusionError::from));
1429
1430            let data_stream = Box::pin(RecordBatchStreamAdapter::new(
1431                schema.clone(),
1432                futures::stream::iter(data_reader),
1433            ));
1434
1435            let write_params = WriteParams {
1436                max_rows_per_file: 5000,  // Smaller than total data to force multiple files
1437                max_rows_per_group: 3000, // Row group size affects V1 only
1438                mode: WriteMode::Create,
1439                data_storage_version: Some(version),
1440                ..Default::default()
1441            };
1442
1443            async move {
1444                let schema = Schema::try_from(schema.as_ref()).unwrap();
1445
1446                let object_store = Arc::new(ObjectStore::memory());
1447                write_fragments_internal(
1448                    None,
1449                    object_store,
1450                    &Path::from("test"),
1451                    schema,
1452                    data_stream,
1453                    write_params,
1454                    None,
1455                )
1456                .await
1457            }
1458        };
1459
1460        // Test V1 (Legacy) version: max_rows_per_group affects chunking
1461        // With max_rows_per_group=3000 and max_rows_per_file=5000:
1462        // - Stream is chunked into batches of max 3000 rows
1463        // - Batches are written to files, splitting when file exceeds 5000 rows
1464        // For 9000 rows:
1465        //   - Chunk 1 (3000 rows) -> File 1 (6000 rows) - exceeds limit, triggers new file
1466        //   - Chunk 2 (3000 rows) -> File 2 (3000 rows) - start of new file
1467        // Result: 2 fragments with [6000, 3000] rows
1468        // Note: The exact behavior depends on when file splitting occurs
1469        let data_reader_v1 = Box::new(
1470            gen_batch()
1471                .anon_col(array::rand_type(&DataType::Int32))
1472                .into_reader_rows(RowCount::from(9000), BatchCount::from(1)),
1473        );
1474
1475        let (fragments_v1, _) = reader_to_frags(data_reader_v1, LanceFileVersion::Legacy)
1476            .await
1477            .unwrap();
1478        let row_counts_v1: Vec<usize> = fragments_v1
1479            .iter()
1480            .map(|f| f.physical_rows.unwrap_or(0))
1481            .collect();
1482
1483        // V1 creates 2 fragments based on row group chunking and file size limit
1484        assert_eq!(fragments_v1.len(), 2);
1485        assert_eq!(row_counts_v1, vec![6000, 3000]);
1486
1487        // Test V2+ version: max_rows_per_group is ignored, only max_rows_per_file matters
1488        // With max_rows_per_file=5000 and 9000 rows:
1489        // - Stream is not chunked by row group size
1490        // - Data is split only at file boundaries (5000 rows per file)
1491        // Result: 2 fragments with [5000, 4000] rows
1492        // V2 splits data more evenly at file boundaries regardless of row group size
1493        let data_reader_v2 = Box::new(
1494            gen_batch()
1495                .anon_col(array::rand_type(&DataType::Int32))
1496                .into_reader_rows(RowCount::from(9000), BatchCount::from(1)),
1497        );
1498
1499        let (fragments_v2, _) = reader_to_frags(data_reader_v2, LanceFileVersion::Stable)
1500            .await
1501            .unwrap();
1502        let row_counts_v2: Vec<usize> = fragments_v2
1503            .iter()
1504            .map(|f| f.physical_rows.unwrap_or(0))
1505            .collect();
1506
1507        // V2 should create 2 fragments based on file size only
1508        assert_eq!(fragments_v2.len(), 2);
1509        assert_eq!(row_counts_v2, vec![5000, 4000]);
1510
1511        // Key difference: Both V1 and V2 create 2 fragments, but with different distributions
1512        // - V1: [6000, 3000] - chunking by row groups affects distribution
1513        // - V2: [5000, 4000] - split only at file boundaries, more even
1514        // V2 distribution should be more even (closer to 5000/5000 split)
1515        // V1 distribution is affected by row group chunking (3000)
1516        assert_eq!(fragments_v1.len(), fragments_v2.len());
1517        assert_ne!(row_counts_v1, row_counts_v2);
1518    }
1519
1520    #[tokio::test]
1521    async fn test_file_write_version() {
1522        let schema = Arc::new(ArrowSchema::new(vec![arrow::datatypes::Field::new(
1523            "a",
1524            DataType::Int32,
1525            false,
1526        )]));
1527
1528        // Write 1024 rows
1529        let batch = RecordBatch::try_new(
1530            schema.clone(),
1531            vec![Arc::new(Int32Array::from_iter(0..1024))],
1532        )
1533        .unwrap();
1534
1535        let versions = vec![
1536            LanceFileVersion::Legacy,
1537            LanceFileVersion::V2_0,
1538            LanceFileVersion::V2_1,
1539            LanceFileVersion::V2_2,
1540            LanceFileVersion::Stable,
1541            LanceFileVersion::Next,
1542        ];
1543        for version in versions {
1544            let (major, minor) = version.to_numbers();
1545            let write_params = WriteParams {
1546                data_storage_version: Some(version),
1547                // This parameter should be ignored
1548                max_rows_per_group: 1,
1549                ..Default::default()
1550            };
1551
1552            let data_stream = Box::pin(RecordBatchStreamAdapter::new(
1553                schema.clone(),
1554                futures::stream::iter(std::iter::once(Ok(batch.clone()))),
1555            ));
1556
1557            let schema = Schema::try_from(schema.as_ref()).unwrap();
1558
1559            let object_store = Arc::new(ObjectStore::memory());
1560            let (fragments, _) = write_fragments_internal(
1561                None,
1562                object_store,
1563                &Path::from("test"),
1564                schema,
1565                data_stream,
1566                write_params,
1567                None,
1568            )
1569            .await
1570            .unwrap();
1571
1572            assert_eq!(fragments.len(), 1);
1573            let fragment = &fragments[0];
1574            assert_eq!(fragment.files.len(), 1);
1575            assert_eq!(fragment.physical_rows, Some(1024));
1576            assert_eq!(
1577                fragment.files[0].file_major_version, major,
1578                "version: {}",
1579                version
1580            );
1581            assert_eq!(
1582                fragment.files[0].file_minor_version, minor,
1583                "version: {}",
1584                version
1585            );
1586        }
1587    }
1588
1589    #[tokio::test]
1590    async fn test_file_v1_schema_order() {
1591        // Create a schema where fields ids are not in order and contain holes.
1592        // Also first field id is a struct.
1593        let struct_fields = Fields::from(vec![ArrowField::new("b", DataType::Int32, false)]);
1594        let arrow_schema = ArrowSchema::new(vec![
1595            ArrowField::new("d", DataType::Int32, false),
1596            ArrowField::new("a", DataType::Struct(struct_fields.clone()), false),
1597        ]);
1598        let mut schema = Schema::try_from(&arrow_schema).unwrap();
1599        // Make schema:
1600        // 0: a
1601        // 1: a.b
1602        // (hole at 2)
1603        // 3: d
1604        schema.mut_field_by_id(0).unwrap().id = 3;
1605        schema.mut_field_by_id(1).unwrap().id = 0;
1606        schema.mut_field_by_id(2).unwrap().id = 1;
1607
1608        let field_ids = schema.fields_pre_order().map(|f| f.id).collect::<Vec<_>>();
1609        assert_eq!(field_ids, vec![3, 0, 1]);
1610
1611        let data = RecordBatch::try_new(
1612            Arc::new(arrow_schema.clone()),
1613            vec![
1614                Arc::new(Int32Array::from(vec![1, 2])),
1615                Arc::new(StructArray::new(
1616                    struct_fields,
1617                    vec![Arc::new(Int32Array::from(vec![3, 4]))],
1618                    None,
1619                )),
1620            ],
1621        )
1622        .unwrap();
1623
1624        let write_params = WriteParams {
1625            data_storage_version: Some(LanceFileVersion::Legacy),
1626            ..Default::default()
1627        };
1628        let data_stream = Box::pin(RecordBatchStreamAdapter::new(
1629            Arc::new(arrow_schema),
1630            futures::stream::iter(std::iter::once(Ok(data.clone()))),
1631        ));
1632
1633        let object_store = Arc::new(ObjectStore::memory());
1634        let base_path = Path::from("test");
1635        let (fragments, _) = write_fragments_internal(
1636            None,
1637            object_store.clone(),
1638            &base_path,
1639            schema.clone(),
1640            data_stream,
1641            write_params,
1642            None,
1643        )
1644        .await
1645        .unwrap();
1646
1647        assert_eq!(fragments.len(), 1);
1648        let fragment = &fragments[0];
1649        assert_eq!(fragment.files.len(), 1);
1650        assert_eq!(fragment.files[0].fields, vec![0, 1, 3]);
1651
1652        let path = base_path
1653            .child(DATA_DIR)
1654            .child(fragment.files[0].path.as_str());
1655        let file_reader: Arc<dyn Reader> = object_store.open(&path).await.unwrap().into();
1656        let reader = PreviousFileReader::try_new_from_reader(
1657            &path,
1658            file_reader,
1659            None,
1660            schema.clone(),
1661            0,
1662            0,
1663            3,
1664            None,
1665        )
1666        .await
1667        .unwrap();
1668        assert_eq!(reader.num_batches(), 1);
1669        let batch = reader.read_batch(0, .., &schema).await.unwrap();
1670        assert_eq!(batch, data);
1671    }
1672
1673    #[tokio::test]
1674    async fn test_explicit_data_file_bases_writer_generator() {
1675        use arrow::datatypes::{DataType, Field as ArrowField, Schema as ArrowSchema};
1676        use lance_io::object_store::ObjectStore;
1677        use std::sync::Arc;
1678
1679        // Create test schema
1680        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
1681            "id",
1682            DataType::Int32,
1683            false,
1684        )]));
1685        let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
1686
1687        // Create in-memory object store
1688        let object_store = Arc::new(ObjectStore::memory());
1689        let base_dir = Path::from("test/bucket2");
1690
1691        // Test WriterGenerator with explicit data file bases configuration
1692        let target_bases = vec![TargetBaseInfo {
1693            base_id: 2,
1694            object_store: object_store.clone(),
1695            base_dir: base_dir.clone(),
1696            is_dataset_root: false, // Test uses direct data directory
1697        }];
1698        let writer_generator = WriterGenerator::new(
1699            object_store.clone(),
1700            &base_dir,
1701            &schema,
1702            LanceFileVersion::Stable,
1703            Some(target_bases),
1704            None,
1705            false,
1706        );
1707
1708        // Create a writer
1709        let (writer, fragment) = writer_generator.new_writer().await.unwrap();
1710
1711        // Verify fragment is created
1712        assert_eq!(fragment.id, 0); // Temporary ID
1713
1714        // Verify writer is created (we can't test much more without writing data)
1715        drop(writer); // Clean up
1716    }
1717
1718    #[tokio::test]
1719    async fn test_writer_with_base_id() {
1720        use arrow::array::Int32Array;
1721        use arrow::datatypes::{DataType, Field as ArrowField, Schema as ArrowSchema};
1722        use arrow::record_batch::RecordBatch;
1723        use lance_io::object_store::ObjectStore;
1724        use std::sync::Arc;
1725
1726        // Create test data
1727        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
1728            "id",
1729            DataType::Int32,
1730            false,
1731        )]));
1732        let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
1733
1734        let batch = RecordBatch::try_new(
1735            arrow_schema.clone(),
1736            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
1737        )
1738        .unwrap();
1739
1740        // Create in-memory object store and writer
1741        let object_store = Arc::new(ObjectStore::memory());
1742        let base_dir = Path::from("test/bucket2");
1743
1744        let mut inner_writer = open_writer_with_options(
1745            &object_store,
1746            &schema,
1747            &base_dir,
1748            LanceFileVersion::Stable,
1749            WriterOptions {
1750                add_data_dir: false, // Don't add /data
1751                ..Default::default()
1752            },
1753        )
1754        .await
1755        .unwrap();
1756
1757        // Write data
1758        inner_writer.write(&[batch]).await.unwrap();
1759
1760        // Finish and manually set base_id
1761        let base_id = 2u32;
1762        let (_num_rows, mut data_file) = inner_writer.finish().await.unwrap();
1763        data_file.base_id = Some(base_id);
1764
1765        assert_eq!(data_file.base_id, Some(base_id));
1766        assert!(!data_file.path.is_empty());
1767    }
1768
1769    #[tokio::test]
1770    async fn test_round_robin_target_base_selection() {
1771        use arrow::array::Int32Array;
1772        use arrow::datatypes::{DataType, Field as ArrowField, Schema as ArrowSchema};
1773        use arrow::record_batch::RecordBatch;
1774        use lance_io::object_store::ObjectStore;
1775        use std::sync::Arc;
1776
1777        // Create test schema
1778        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
1779            "id",
1780            DataType::Int32,
1781            false,
1782        )]));
1783        let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
1784
1785        // Create in-memory object stores for different bases
1786        let store1 = Arc::new(ObjectStore::memory());
1787        let store2 = Arc::new(ObjectStore::memory());
1788        let store3 = Arc::new(ObjectStore::memory());
1789
1790        // Create WriterGenerator with multiple target bases
1791        let target_bases = vec![
1792            TargetBaseInfo {
1793                base_id: 1,
1794                object_store: store1.clone(),
1795                base_dir: Path::from("base1"),
1796                is_dataset_root: false,
1797            },
1798            TargetBaseInfo {
1799                base_id: 2,
1800                object_store: store2.clone(),
1801                base_dir: Path::from("base2"),
1802                is_dataset_root: false,
1803            },
1804            TargetBaseInfo {
1805                base_id: 3,
1806                object_store: store3.clone(),
1807                base_dir: Path::from("base3"),
1808                is_dataset_root: false,
1809            },
1810        ];
1811
1812        let writer_generator = WriterGenerator::new(
1813            Arc::new(ObjectStore::memory()),
1814            &Path::from("default"),
1815            &schema,
1816            LanceFileVersion::Stable,
1817            Some(target_bases),
1818            None,
1819            false,
1820        );
1821
1822        // Create test batch
1823        let batch = RecordBatch::try_new(
1824            arrow_schema.clone(),
1825            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
1826        )
1827        .unwrap();
1828
1829        // Create multiple writers and verify round-robin selection
1830        let mut base_ids = Vec::new();
1831        for _ in 0..6 {
1832            let (mut writer, _fragment) = writer_generator.new_writer().await.unwrap();
1833            writer.write(std::slice::from_ref(&batch)).await.unwrap();
1834            let (_num_rows, data_file) = writer.finish().await.unwrap();
1835            base_ids.push(data_file.base_id.unwrap());
1836        }
1837
1838        // Verify round-robin pattern: 1, 2, 3, 1, 2, 3
1839        assert_eq!(base_ids, vec![1, 2, 3, 1, 2, 3]);
1840    }
1841
1842    #[tokio::test]
1843    async fn test_explicit_data_file_bases_path_parsing() {
1844        // Test URI parsing logic
1845        let test_cases = vec![
1846            ("s3://multi-path-test/test1/subBucket2", "test1/subBucket2"),
1847            ("gs://my-bucket/path/to/data", "path/to/data"),
1848            ("az://container/path/to/data", "path/to/data"),
1849            (
1850                "abfss://filesystem@account.dfs.core.windows.net/path/to/data",
1851                "path/to/data",
1852            ),
1853            ("file:///tmp/test/bucket", "tmp/test/bucket"),
1854        ];
1855
1856        for (uri, expected_path) in test_cases {
1857            let url = url::Url::parse(uri).unwrap();
1858            let parsed_path = url.path().trim_start_matches('/');
1859            assert_eq!(parsed_path, expected_path, "Failed for URI: {}", uri);
1860        }
1861    }
1862
1863    #[tokio::test]
1864    async fn test_write_params_validation() {
1865        // Test CREATE mode validation
1866        let mut params = WriteParams {
1867            mode: WriteMode::Create,
1868            initial_bases: Some(vec![
1869                BasePath {
1870                    id: 1,
1871                    name: Some("bucket1".to_string()),
1872                    path: "s3://bucket1/path1".to_string(),
1873                    is_dataset_root: true,
1874                },
1875                BasePath {
1876                    id: 2,
1877                    name: Some("bucket2".to_string()),
1878                    path: "s3://bucket2/path2".to_string(),
1879                    is_dataset_root: true,
1880                },
1881                BasePath {
1882                    id: 3,
1883                    name: Some("azure-az-base".to_string()),
1884                    path: "az://container/path1".to_string(),
1885                    is_dataset_root: true,
1886                },
1887                BasePath {
1888                    id: 4,
1889                    name: Some("azure-abfss-base".to_string()),
1890                    path: "abfss://filesystem@account.dfs.core.windows.net/path1".to_string(),
1891                    is_dataset_root: true,
1892                },
1893            ]),
1894            target_bases: Some(vec![1]), // Use ID 1 which corresponds to bucket1
1895            ..Default::default()
1896        };
1897
1898        // This should be valid
1899        let result = validate_write_params(&params);
1900        assert!(result.is_ok());
1901
1902        // Test target_bases with ID not in initial_bases (should fail)
1903        params.target_bases = Some(vec![99]); // ID 99 doesn't exist
1904        let result = validate_write_params(&params);
1905        assert!(result.is_err());
1906
1907        // Test CREATE mode with target_bases but no initial_bases (should fail)
1908        params.initial_bases = None;
1909        params.target_bases = Some(vec![1]);
1910        let result = validate_write_params(&params);
1911        assert!(result.is_err());
1912    }
1913
1914    fn validate_write_params(params: &WriteParams) -> Result<()> {
1915        // Replicate the validation logic from the main write function
1916        if matches!(params.mode, WriteMode::Create)
1917            && let Some(target_bases) = &params.target_bases
1918        {
1919            if target_bases.len() != 1 {
1920                return Err(Error::invalid_input(format!(
1921                    "target_bases with {} elements is not supported",
1922                    target_bases.len()
1923                )));
1924            }
1925            let target_base_id = target_bases[0];
1926            if let Some(initial_bases) = &params.initial_bases {
1927                if !initial_bases.iter().any(|bp| bp.id == target_base_id) {
1928                    return Err(Error::invalid_input(format!(
1929                        "target_base_id {} must be one of the initial_bases in CREATE mode",
1930                        target_base_id
1931                    )));
1932                }
1933            } else {
1934                return Err(Error::invalid_input(
1935                    "initial_bases must be provided when target_bases is specified in CREATE mode",
1936                ));
1937            }
1938        }
1939        Ok(())
1940    }
1941
1942    #[tokio::test]
1943    async fn test_multi_base_create() {
1944        use lance_testing::datagen::{BatchGenerator, IncrementingInt32};
1945
1946        // Create dataset with multi-base configuration
1947        let test_uri = "memory://multi_base_test";
1948        let primary_uri = format!("{}/primary", test_uri);
1949        let base1_uri = format!("{}/base1", test_uri);
1950        let base2_uri = format!("{}/base2", test_uri);
1951
1952        let mut data_gen =
1953            BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("id".to_owned())));
1954
1955        let dataset = crate::dataset::Dataset::write(
1956            data_gen.batch(5),
1957            &primary_uri,
1958            Some(WriteParams {
1959                mode: WriteMode::Create,
1960                initial_bases: Some(vec![
1961                    BasePath {
1962                        id: 1,
1963                        name: Some("base1".to_string()),
1964                        path: base1_uri.clone(),
1965                        is_dataset_root: true,
1966                    },
1967                    BasePath {
1968                        id: 2,
1969                        name: Some("base2".to_string()),
1970                        path: base2_uri.clone(),
1971                        is_dataset_root: true,
1972                    },
1973                ]),
1974                target_bases: Some(vec![1]),
1975                ..Default::default()
1976            }),
1977        )
1978        .await
1979        .unwrap();
1980
1981        // Verify dataset was created
1982        assert_eq!(dataset.count_rows(None).await.unwrap(), 5);
1983
1984        // Verify base_paths are registered in manifest
1985        assert_eq!(dataset.manifest.base_paths.len(), 2);
1986        assert!(
1987            dataset
1988                .manifest
1989                .base_paths
1990                .values()
1991                .any(|bp| bp.name == Some("base1".to_string()))
1992        );
1993        assert!(
1994            dataset
1995                .manifest
1996                .base_paths
1997                .values()
1998                .any(|bp| bp.name == Some("base2".to_string()))
1999        );
2000
2001        // Verify data was written to base1
2002        let fragments = dataset.get_fragments();
2003        assert!(!fragments.is_empty());
2004        for fragment in fragments {
2005            assert!(
2006                fragment
2007                    .metadata
2008                    .files
2009                    .iter()
2010                    .any(|file| file.base_id == Some(1))
2011            );
2012        }
2013
2014        // Test validation: cannot specify both target_bases and target_base_names_or_paths
2015        let mut data_gen2 =
2016            BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("id".to_owned())));
2017
2018        let result = Dataset::write(
2019            data_gen2.batch(5),
2020            &format!("{}/test_validation", test_uri),
2021            Some(WriteParams {
2022                mode: WriteMode::Create,
2023                initial_bases: Some(vec![BasePath {
2024                    id: 1,
2025                    name: Some("base1".to_string()),
2026                    path: base1_uri.clone(),
2027                    is_dataset_root: true,
2028                }]),
2029                target_bases: Some(vec![1]),
2030                target_base_names_or_paths: Some(vec!["base1".to_string()]),
2031                ..Default::default()
2032            }),
2033        )
2034        .await;
2035
2036        assert!(result.is_err());
2037        assert!(
2038            result
2039                .unwrap_err()
2040                .to_string()
2041                .contains("Cannot specify both target_base_names_or_paths and target_bases")
2042        );
2043    }
2044
2045    #[tokio::test]
2046    async fn test_multi_base_overwrite() {
2047        use lance_testing::datagen::{BatchGenerator, IncrementingInt32};
2048
2049        // Create initial dataset
2050        let test_uri = "memory://overwrite_test";
2051        let primary_uri = format!("{}/primary", test_uri);
2052        let base1_uri = format!("{}/base1", test_uri);
2053        let base2_uri = format!("{}/base2", test_uri);
2054        let _base3_uri = format!("{}/base3", test_uri);
2055
2056        let mut data_gen =
2057            BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("id".to_owned())));
2058
2059        let dataset = Dataset::write(
2060            data_gen.batch(3),
2061            &primary_uri,
2062            Some(WriteParams {
2063                mode: WriteMode::Create,
2064                initial_bases: Some(vec![
2065                    BasePath {
2066                        id: 1,
2067                        name: Some("base1".to_string()),
2068                        path: base1_uri.clone(),
2069                        is_dataset_root: true,
2070                    },
2071                    BasePath {
2072                        id: 2,
2073                        name: Some("base2".to_string()),
2074                        path: base2_uri.clone(),
2075                        is_dataset_root: true,
2076                    },
2077                ]),
2078                target_bases: Some(vec![1]),
2079                ..Default::default()
2080            }),
2081        )
2082        .await
2083        .unwrap();
2084
2085        assert_eq!(dataset.count_rows(None).await.unwrap(), 3);
2086
2087        // Overwrite - should inherit existing base configuration (base1, base2)
2088        // Write to base2
2089        let mut data_gen2 =
2090            BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("id".to_owned())));
2091
2092        let dataset = Dataset::write(
2093            data_gen2.batch(2),
2094            std::sync::Arc::new(dataset),
2095            Some(WriteParams {
2096                mode: WriteMode::Overwrite,
2097                // No initial_bases - inherits existing base_paths
2098                target_bases: Some(vec![2]), // Write to base2
2099                ..Default::default()
2100            }),
2101        )
2102        .await
2103        .unwrap();
2104
2105        // Verify data was overwritten
2106        assert_eq!(dataset.count_rows(None).await.unwrap(), 2);
2107
2108        // Verify base_paths were inherited (still base1 and base2)
2109        assert_eq!(dataset.manifest.base_paths.len(), 2);
2110        assert!(
2111            dataset
2112                .manifest
2113                .base_paths
2114                .values()
2115                .any(|bp| bp.name == Some("base1".to_string()))
2116        );
2117        assert!(
2118            dataset
2119                .manifest
2120                .base_paths
2121                .values()
2122                .any(|bp| bp.name == Some("base2".to_string()))
2123        );
2124
2125        // Verify data was written to base2 (ID 2)
2126        let fragments = dataset.get_fragments();
2127        assert!(
2128            fragments
2129                .iter()
2130                .all(|f| f.metadata.files.iter().all(|file| file.base_id == Some(2)))
2131        );
2132
2133        // Test validation: cannot specify initial_bases in OVERWRITE mode
2134        let mut data_gen3 =
2135            BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("id".to_owned())));
2136
2137        let result = Dataset::write(
2138            data_gen3.batch(2),
2139            Arc::new(dataset),
2140            Some(WriteParams {
2141                mode: WriteMode::Overwrite,
2142                initial_bases: Some(vec![BasePath {
2143                    id: 3,
2144                    name: Some("base3".to_string()),
2145                    path: _base3_uri.clone(),
2146                    is_dataset_root: true,
2147                }]),
2148                target_bases: Some(vec![1]),
2149                ..Default::default()
2150            }),
2151        )
2152        .await;
2153
2154        assert!(result.is_err());
2155        assert!(
2156            result
2157                .unwrap_err()
2158                .to_string()
2159                .contains("Cannot register new bases in Overwrite mode")
2160        );
2161    }
2162
2163    #[tokio::test]
2164    async fn test_multi_base_append() {
2165        use lance_testing::datagen::{BatchGenerator, IncrementingInt32};
2166
2167        // Create initial dataset with multi-base configuration
2168        let test_uri = "memory://append_test";
2169        let primary_uri = format!("{}/primary", test_uri);
2170        let base1_uri = format!("{}/base1", test_uri);
2171        let base2_uri = format!("{}/base2", test_uri);
2172
2173        let mut data_gen =
2174            BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("id".to_owned())));
2175
2176        let dataset = Dataset::write(
2177            data_gen.batch(3),
2178            &primary_uri,
2179            Some(WriteParams {
2180                mode: WriteMode::Create,
2181                initial_bases: Some(vec![
2182                    BasePath {
2183                        id: 1,
2184                        name: Some("base1".to_string()),
2185                        path: base1_uri.clone(),
2186                        is_dataset_root: true,
2187                    },
2188                    BasePath {
2189                        id: 2,
2190                        name: Some("base2".to_string()),
2191                        path: base2_uri.clone(),
2192                        is_dataset_root: true,
2193                    },
2194                ]),
2195                target_bases: Some(vec![1]),
2196                ..Default::default()
2197            }),
2198        )
2199        .await
2200        .unwrap();
2201
2202        assert_eq!(dataset.count_rows(None).await.unwrap(), 3);
2203
2204        // Append to base1 (same base as initial write)
2205        let mut data_gen2 =
2206            BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("id".to_owned())));
2207
2208        let dataset = Dataset::write(
2209            data_gen2.batch(2),
2210            std::sync::Arc::new(dataset),
2211            Some(WriteParams {
2212                mode: WriteMode::Append,
2213                target_bases: Some(vec![1]),
2214                ..Default::default()
2215            }),
2216        )
2217        .await
2218        .unwrap();
2219
2220        assert_eq!(dataset.count_rows(None).await.unwrap(), 5);
2221
2222        // Verify base_paths are still registered
2223        assert_eq!(dataset.manifest.base_paths.len(), 2);
2224
2225        // Append to base2 (different base)
2226        let mut data_gen3 =
2227            BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("id".to_owned())));
2228
2229        let dataset = Dataset::write(
2230            data_gen3.batch(4),
2231            Arc::new(dataset),
2232            Some(WriteParams {
2233                mode: WriteMode::Append,
2234                target_bases: Some(vec![2]),
2235                ..Default::default()
2236            }),
2237        )
2238        .await
2239        .unwrap();
2240
2241        assert_eq!(dataset.count_rows(None).await.unwrap(), 9);
2242
2243        // Verify data is distributed across both bases
2244        let fragments = dataset.get_fragments();
2245        let has_base1_data = fragments
2246            .iter()
2247            .any(|f| f.metadata.files.iter().any(|file| file.base_id == Some(1)));
2248        let has_base2_data = fragments
2249            .iter()
2250            .any(|f| f.metadata.files.iter().any(|file| file.base_id == Some(2)));
2251
2252        assert!(has_base1_data, "Should have data in base1");
2253        assert!(has_base2_data, "Should have data in base2");
2254
2255        // Test validation: cannot specify initial_bases in APPEND mode
2256        let mut data_gen4 =
2257            BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("id".to_owned())));
2258        let base3_uri = format!("{}/base3", test_uri);
2259
2260        let result = Dataset::write(
2261            data_gen4.batch(2),
2262            Arc::new(dataset),
2263            Some(WriteParams {
2264                mode: WriteMode::Append,
2265                initial_bases: Some(vec![BasePath {
2266                    id: 3,
2267                    name: Some("base3".to_string()),
2268                    path: base3_uri,
2269                    is_dataset_root: true,
2270                }]),
2271                target_bases: Some(vec![1]),
2272                ..Default::default()
2273            }),
2274        )
2275        .await;
2276
2277        assert!(result.is_err());
2278        assert!(
2279            result
2280                .unwrap_err()
2281                .to_string()
2282                .contains("Cannot register new bases in Append mode")
2283        );
2284    }
2285
2286    #[tokio::test]
2287    async fn test_multi_base_is_dataset_root_flag() {
2288        use lance_core::utils::tempfile::TempDir;
2289        use lance_testing::datagen::{BatchGenerator, IncrementingInt32};
2290
2291        // Create dataset with different is_dataset_root settings using tempdir
2292        let test_dir = TempDir::default();
2293        let primary_uri = test_dir.path_str();
2294        let base1_dir = test_dir.std_path().join("base1");
2295        let base2_dir = test_dir.std_path().join("base2");
2296
2297        std::fs::create_dir_all(&base1_dir).unwrap();
2298        std::fs::create_dir_all(&base2_dir).unwrap();
2299
2300        let base1_uri = format!("file://{}", base1_dir.display());
2301        let base2_uri = format!("file://{}", base2_dir.display());
2302
2303        let mut data_gen =
2304            BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("id".to_owned())));
2305
2306        let dataset = Dataset::write(
2307            data_gen.batch(10),
2308            &primary_uri,
2309            Some(WriteParams {
2310                mode: WriteMode::Create,
2311                max_rows_per_file: 5, // Create multiple fragments
2312                initial_bases: Some(vec![
2313                    BasePath {
2314                        id: 1,
2315                        name: Some("base1".to_string()),
2316                        path: base1_uri.clone(),
2317                        is_dataset_root: true, // Files will go to base1/data/
2318                    },
2319                    BasePath {
2320                        id: 2,
2321                        name: Some("base2".to_string()),
2322                        path: base2_uri.clone(),
2323                        is_dataset_root: false, // Files will go directly to base2/
2324                    },
2325                ]),
2326                target_bases: Some(vec![1, 2]), // Write to both bases
2327                ..Default::default()
2328            }),
2329        )
2330        .await
2331        .unwrap();
2332
2333        assert_eq!(dataset.count_rows(None).await.unwrap(), 10);
2334
2335        // Verify base_paths configuration
2336        assert_eq!(dataset.manifest.base_paths.len(), 2);
2337
2338        let base1 = dataset
2339            .manifest
2340            .base_paths
2341            .values()
2342            .find(|bp| bp.name == Some("base1".to_string()))
2343            .expect("base1 not found");
2344        let base2 = dataset
2345            .manifest
2346            .base_paths
2347            .values()
2348            .find(|bp| bp.name == Some("base2".to_string()))
2349            .expect("base2 not found");
2350
2351        // Verify is_dataset_root flags are persisted correctly in manifest
2352        assert!(
2353            base1.is_dataset_root,
2354            "base1 should have is_dataset_root=true"
2355        );
2356        assert!(
2357            !base2.is_dataset_root,
2358            "base2 should have is_dataset_root=false"
2359        );
2360
2361        // Verify data was written to both bases
2362        let fragments = dataset.get_fragments();
2363        assert!(!fragments.is_empty());
2364
2365        let has_base1_data = fragments
2366            .iter()
2367            .any(|f| f.metadata.files.iter().any(|file| file.base_id == Some(1)));
2368        let has_base2_data = fragments
2369            .iter()
2370            .any(|f| f.metadata.files.iter().any(|file| file.base_id == Some(2)));
2371
2372        assert!(has_base1_data, "Should have data in base1");
2373        assert!(has_base2_data, "Should have data in base2");
2374
2375        // Verify actual file paths on disk
2376        // For base1 (is_dataset_root=true), files should be in base1/data/
2377        let base1_data_dir = base1_dir.join("data");
2378        assert!(base1_data_dir.exists(), "base1/data directory should exist");
2379        let base1_files: Vec<_> = std::fs::read_dir(&base1_data_dir)
2380            .unwrap()
2381            .filter_map(|e| e.ok())
2382            .filter(|e| {
2383                e.path()
2384                    .extension()
2385                    .map(|ext| ext == "lance")
2386                    .unwrap_or(false)
2387            })
2388            .collect();
2389        assert!(
2390            !base1_files.is_empty(),
2391            "base1/data should contain .lance files"
2392        );
2393
2394        // For base2 (is_dataset_root=false), files should be directly in base2/
2395        let base2_files: Vec<_> = std::fs::read_dir(&base2_dir)
2396            .unwrap()
2397            .filter_map(|e| e.ok())
2398            .filter(|e| {
2399                e.path()
2400                    .extension()
2401                    .map(|ext| ext == "lance")
2402                    .unwrap_or(false)
2403            })
2404            .collect();
2405        assert!(
2406            !base2_files.is_empty(),
2407            "base2 should contain .lance files directly"
2408        );
2409
2410        // Verify base2 does NOT have a data subdirectory with lance files
2411        let base2_data_dir = base2_dir.join("data");
2412        if base2_data_dir.exists() {
2413            let base2_data_files: Vec<_> = std::fs::read_dir(&base2_data_dir)
2414                .unwrap()
2415                .filter_map(|e| e.ok())
2416                .filter(|e| {
2417                    e.path()
2418                        .extension()
2419                        .map(|ext| ext == "lance")
2420                        .unwrap_or(false)
2421                })
2422                .collect();
2423            assert!(
2424                base2_data_files.is_empty(),
2425                "base2/data should NOT contain .lance files"
2426            );
2427        }
2428    }
2429
2430    #[tokio::test]
2431    async fn test_multi_base_target_by_path_uri() {
2432        use lance_core::utils::tempfile::TempDir;
2433        use lance_testing::datagen::{BatchGenerator, IncrementingInt32};
2434
2435        // Create dataset with named bases
2436        let test_dir = TempDir::default();
2437        let primary_uri = test_dir.path_str();
2438        let base1_dir = test_dir.std_path().join("base1");
2439        let base2_dir = test_dir.std_path().join("base2");
2440
2441        std::fs::create_dir_all(&base1_dir).unwrap();
2442        std::fs::create_dir_all(&base2_dir).unwrap();
2443
2444        let base1_uri = format!("file://{}", base1_dir.display());
2445        let base2_uri = format!("file://{}", base2_dir.display());
2446
2447        let mut data_gen =
2448            BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("id".to_owned())));
2449
2450        // Create initial dataset writing to base1 using name
2451        let dataset = Dataset::write(
2452            data_gen.batch(10),
2453            &primary_uri,
2454            Some(WriteParams {
2455                mode: WriteMode::Create,
2456                max_rows_per_file: 5,
2457                initial_bases: Some(vec![
2458                    BasePath {
2459                        id: 1,
2460                        name: Some("base1".to_string()),
2461                        path: base1_uri.clone(),
2462                        is_dataset_root: true,
2463                    },
2464                    BasePath {
2465                        id: 2,
2466                        name: Some("base2".to_string()),
2467                        path: base2_uri.clone(),
2468                        is_dataset_root: true,
2469                    },
2470                ]),
2471                target_base_names_or_paths: Some(vec!["base1".to_string()]), // Use name
2472                ..Default::default()
2473            }),
2474        )
2475        .await
2476        .unwrap();
2477
2478        assert_eq!(dataset.count_rows(None).await.unwrap(), 10);
2479
2480        // Verify data was written to base1
2481        let fragments = dataset.get_fragments();
2482        assert!(
2483            fragments
2484                .iter()
2485                .all(|f| f.metadata.files.iter().all(|file| file.base_id == Some(1)))
2486        );
2487
2488        // Now append using the path URI instead of name
2489        let mut data_gen2 =
2490            BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("id".to_owned())));
2491
2492        let dataset = Dataset::write(
2493            data_gen2.batch(5),
2494            Arc::new(dataset),
2495            Some(WriteParams {
2496                mode: WriteMode::Append,
2497                // Use the actual path URI instead of the name
2498                target_base_names_or_paths: Some(vec![base2_uri.clone()]),
2499                max_rows_per_file: 5,
2500                ..Default::default()
2501            }),
2502        )
2503        .await
2504        .unwrap();
2505
2506        assert_eq!(dataset.count_rows(None).await.unwrap(), 15);
2507
2508        // Verify data is now in both bases
2509        let fragments = dataset.get_fragments();
2510        let has_base1_data = fragments
2511            .iter()
2512            .any(|f| f.metadata.files.iter().any(|file| file.base_id == Some(1)));
2513        let has_base2_data = fragments
2514            .iter()
2515            .any(|f| f.metadata.files.iter().any(|file| file.base_id == Some(2)));
2516
2517        assert!(has_base1_data, "Should have data in base1");
2518        assert!(has_base2_data, "Should have data in base2");
2519
2520        // Verify base2 has exactly 1 fragment (from the append)
2521        let base2_fragments: Vec<_> = fragments
2522            .iter()
2523            .filter(|f| f.metadata.files.iter().all(|file| file.base_id == Some(2)))
2524            .collect();
2525        assert_eq!(base2_fragments.len(), 1, "Should have 1 fragment in base2");
2526    }
2527
2528    #[tokio::test]
2529    async fn test_empty_stream_write() {
2530        use lance_io::object_store::ObjectStore;
2531
2532        // Test writing an empty stream
2533        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
2534            "id",
2535            DataType::Int32,
2536            false,
2537        )]));
2538        let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
2539
2540        // Create an empty stream
2541        let data_stream = Box::pin(RecordBatchStreamAdapter::new(
2542            arrow_schema.clone(),
2543            futures::stream::iter(std::iter::empty::<
2544                std::result::Result<RecordBatch, DataFusionError>,
2545            >()),
2546        ));
2547
2548        let object_store = Arc::new(ObjectStore::memory());
2549        let write_params = WriteParams {
2550            mode: WriteMode::Create,
2551            ..Default::default()
2552        };
2553
2554        let result = write_fragments_internal(
2555            None,
2556            object_store,
2557            &Path::from("test_empty"),
2558            schema,
2559            data_stream,
2560            write_params,
2561            None,
2562        )
2563        .await;
2564
2565        // Empty stream should be handled gracefully
2566        // It should create an empty dataset or return an appropriate result
2567        match result {
2568            Ok((fragments, _)) => {
2569                // If successful, verify it creates an empty result
2570                assert!(
2571                    fragments.is_empty(),
2572                    "Empty stream should create no fragments"
2573                );
2574            }
2575            Err(e) => {
2576                panic!("Expected write empty stream success, got error: {}", e);
2577            }
2578        }
2579    }
2580
2581    #[tokio::test]
2582    async fn test_schema_mismatch_on_append() {
2583        use arrow_array::record_batch;
2584
2585        // Create initial dataset with two Int32 columns
2586        let batch1 = record_batch!(
2587            ("id", Int32, [1, 2, 3, 4, 5]),
2588            ("value", Int32, [10, 20, 30, 40, 50])
2589        )
2590        .unwrap();
2591
2592        let dataset = InsertBuilder::new("memory://")
2593            .with_params(&WriteParams {
2594                mode: WriteMode::Create,
2595                ..Default::default()
2596            })
2597            .execute(vec![batch1])
2598            .await
2599            .unwrap();
2600
2601        // Verify initial dataset
2602        assert_eq!(dataset.count_rows(None).await.unwrap(), 5);
2603        assert_eq!(dataset.schema().fields.len(), 2);
2604
2605        // Try to append with different schema (Float64 instead of Int32 for 'value' column)
2606        let batch2 = record_batch!(
2607            ("id", Int32, [6, 7, 8]),
2608            ("value", Float64, [60.0, 70.0, 80.0])
2609        )
2610        .unwrap();
2611
2612        let result = InsertBuilder::new(Arc::new(dataset.clone()))
2613            .with_params(&WriteParams {
2614                mode: WriteMode::Append,
2615                ..Default::default()
2616            })
2617            .execute(vec![batch2])
2618            .await;
2619
2620        // Should fail due to schema mismatch
2621        assert!(result.is_err(), "Append with mismatched schema should fail");
2622        let error = result.unwrap_err();
2623        let error_msg = error.to_string().to_lowercase();
2624        assert!(
2625            error_msg.contains("schema")
2626                || error_msg.contains("type")
2627                || error_msg.contains("mismatch")
2628                || error_msg.contains("field")
2629                || error_msg.contains("not found"),
2630            "Error should mention schema or type mismatch: {}",
2631            error_msg
2632        );
2633
2634        // Verify original dataset is still intact
2635        assert_eq!(dataset.count_rows(None).await.unwrap(), 5);
2636        assert_eq!(dataset.schema().fields.len(), 2);
2637    }
2638
2639    #[tokio::test]
2640    async fn test_disk_full_error() {
2641        use std::io::{self, ErrorKind};
2642        use std::sync::Arc;
2643
2644        use async_trait::async_trait;
2645        use object_store::{
2646            GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, PutMultipartOptions,
2647            PutOptions, PutPayload, PutResult,
2648        };
2649
2650        // Create a custom ObjectStore that simulates disk full error
2651        #[derive(Debug)]
2652        struct DiskFullObjectStore;
2653
2654        impl std::fmt::Display for DiskFullObjectStore {
2655            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2656                write!(f, "DiskFullObjectStore")
2657            }
2658        }
2659
2660        #[async_trait]
2661        impl object_store::ObjectStore for DiskFullObjectStore {
2662            async fn put(
2663                &self,
2664                _location: &object_store::path::Path,
2665                _bytes: PutPayload,
2666            ) -> object_store::Result<PutResult> {
2667                Err(object_store::Error::Generic {
2668                    store: "DiskFullStore",
2669                    source: Box::new(io::Error::new(
2670                        ErrorKind::StorageFull,
2671                        "No space left on device",
2672                    )),
2673                })
2674            }
2675
2676            async fn put_opts(
2677                &self,
2678                _location: &object_store::path::Path,
2679                _bytes: PutPayload,
2680                _opts: PutOptions,
2681            ) -> object_store::Result<PutResult> {
2682                Err(object_store::Error::Generic {
2683                    store: "DiskFullStore",
2684                    source: Box::new(io::Error::new(
2685                        ErrorKind::StorageFull,
2686                        "No space left on device",
2687                    )),
2688                })
2689            }
2690
2691            async fn put_multipart(
2692                &self,
2693                _location: &object_store::path::Path,
2694            ) -> object_store::Result<Box<dyn MultipartUpload>> {
2695                Err(object_store::Error::NotSupported {
2696                    source: "Multipart upload not supported".into(),
2697                })
2698            }
2699
2700            async fn put_multipart_opts(
2701                &self,
2702                _location: &object_store::path::Path,
2703                _opts: PutMultipartOptions,
2704            ) -> object_store::Result<Box<dyn MultipartUpload>> {
2705                Err(object_store::Error::NotSupported {
2706                    source: "Multipart upload not supported".into(),
2707                })
2708            }
2709
2710            async fn get(
2711                &self,
2712                _location: &object_store::path::Path,
2713            ) -> object_store::Result<GetResult> {
2714                Err(object_store::Error::NotFound {
2715                    path: "".into(),
2716                    source: "".into(),
2717                })
2718            }
2719
2720            async fn get_opts(
2721                &self,
2722                _location: &object_store::path::Path,
2723                _options: GetOptions,
2724            ) -> object_store::Result<GetResult> {
2725                Err(object_store::Error::NotFound {
2726                    path: "".into(),
2727                    source: "".into(),
2728                })
2729            }
2730
2731            async fn delete(
2732                &self,
2733                _location: &object_store::path::Path,
2734            ) -> object_store::Result<()> {
2735                Ok(())
2736            }
2737
2738            fn list(
2739                &self,
2740                _prefix: Option<&object_store::path::Path>,
2741            ) -> futures::stream::BoxStream<'static, object_store::Result<ObjectMeta>> {
2742                Box::pin(futures::stream::empty())
2743            }
2744
2745            async fn list_with_delimiter(
2746                &self,
2747                _prefix: Option<&object_store::path::Path>,
2748            ) -> object_store::Result<ListResult> {
2749                Ok(ListResult {
2750                    common_prefixes: vec![],
2751                    objects: vec![],
2752                })
2753            }
2754
2755            async fn copy(
2756                &self,
2757                _from: &object_store::path::Path,
2758                _to: &object_store::path::Path,
2759            ) -> object_store::Result<()> {
2760                Ok(())
2761            }
2762
2763            async fn copy_if_not_exists(
2764                &self,
2765                _from: &object_store::path::Path,
2766                _to: &object_store::path::Path,
2767            ) -> object_store::Result<()> {
2768                Ok(())
2769            }
2770        }
2771
2772        let object_store = Arc::new(lance_io::object_store::ObjectStore::new(
2773            Arc::new(DiskFullObjectStore) as Arc<dyn object_store::ObjectStore>,
2774            // Use a non-"file" scheme so writes go through ObjectWriter (which
2775            // uses the DiskFullObjectStore) instead of the optimized LocalWriter.
2776            url::Url::parse("mock:///test").unwrap(),
2777            None,
2778            None,
2779            false,
2780            true,
2781            lance_io::object_store::DEFAULT_LOCAL_IO_PARALLELISM,
2782            lance_io::object_store::DEFAULT_DOWNLOAD_RETRY_COUNT,
2783            None,
2784        ));
2785
2786        // Create test data
2787        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
2788            "id",
2789            DataType::Int32,
2790            false,
2791        )]));
2792
2793        let batch = RecordBatch::try_new(
2794            arrow_schema.clone(),
2795            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]))],
2796        )
2797        .unwrap();
2798
2799        let data_reader = Box::new(RecordBatchIterator::new(
2800            vec![Ok(batch)].into_iter(),
2801            arrow_schema.clone(),
2802        ));
2803
2804        let data_stream = Box::pin(RecordBatchStreamAdapter::new(
2805            arrow_schema,
2806            futures::stream::iter(data_reader.map(|rb| rb.map_err(DataFusionError::from))),
2807        ));
2808
2809        let schema = Schema::try_from(data_stream.schema().as_ref()).unwrap();
2810
2811        let write_params = WriteParams {
2812            mode: WriteMode::Create,
2813            ..Default::default()
2814        };
2815
2816        // Attempt to write data - should fail with IO error due to disk full
2817        let result = write_fragments_internal(
2818            None,
2819            object_store,
2820            &Path::from("test_disk_full"),
2821            schema,
2822            data_stream,
2823            write_params,
2824            None,
2825        )
2826        .await;
2827
2828        // Verify that the error is an IO error (which wraps the disk full error)
2829        assert!(result.is_err(), "Write should fail when disk is full");
2830        let error = result.unwrap_err();
2831        let error_msg = error.to_string().to_lowercase();
2832
2833        // The error should mention IO, space, or storage
2834        assert!(
2835            error_msg.contains("io")
2836                || error_msg.contains("space")
2837                || error_msg.contains("storage")
2838                || error_msg.contains("full"),
2839            "Error should mention IO, space, or storage: {}",
2840            error_msg
2841        );
2842
2843        // Verify it's an IO error type
2844        assert!(
2845            matches!(error, lance_core::Error::IO { .. }),
2846            "Expected IO error, got: {}",
2847            error
2848        );
2849    }
2850
2851    /// Test that dataset remains consistent after write interruption and can recover.
2852    /// This verifies that:
2853    /// 1. The dataset is not corrupted when a write is interrupted (not committed)
2854    /// 2. Incomplete data files are not visible until committed
2855    /// 3. The transaction can be retried successfully
2856    #[tokio::test]
2857    async fn test_write_interruption_recovery() {
2858        use super::commit::CommitBuilder;
2859        use arrow_array::record_batch;
2860
2861        // Create a temporary directory for testing
2862        let temp_dir = TempDir::default();
2863        let dataset_uri = format!("file://{}", temp_dir.std_path().display());
2864
2865        // First, create a normal dataset with some initial data
2866        let batch =
2867            record_batch!(("id", Int32, [1, 2, 3]), ("value", Utf8, ["a", "b", "c"])).unwrap();
2868
2869        // Write initial dataset normally
2870        let dataset = InsertBuilder::new(&dataset_uri)
2871            .execute(vec![batch.clone()])
2872            .await
2873            .unwrap();
2874
2875        // Verify initial dataset is valid
2876        assert_eq!(dataset.count_rows(None).await.unwrap(), 3);
2877
2878        // Prepare additional data to write
2879        let new_batch =
2880            record_batch!(("id", Int32, [4, 5, 6]), ("value", Utf8, ["d", "e", "f"])).unwrap();
2881
2882        // Step 1: Write uncommitted data (simulates interrupted write before commit)
2883        let uncommitted_result = InsertBuilder::new(WriteDestination::Dataset(Arc::new(
2884            Dataset::open(&dataset_uri).await.unwrap(),
2885        )))
2886        .with_params(&WriteParams {
2887            mode: WriteMode::Append,
2888            ..Default::default()
2889        })
2890        .execute_uncommitted(vec![new_batch])
2891        .await;
2892
2893        // The uncommitted write should succeed (data is written to files)
2894        assert!(
2895            uncommitted_result.is_ok(),
2896            "Uncommitted write should succeed"
2897        );
2898        let transaction = uncommitted_result.unwrap();
2899
2900        // Step 2: Verify dataset is still consistent (uncommitted changes not visible)
2901        let dataset_before_commit = Dataset::open(&dataset_uri).await.unwrap();
2902        let row_count_before = dataset_before_commit.count_rows(None).await.unwrap();
2903        assert_eq!(
2904            row_count_before, 3,
2905            "Dataset should still have only original 3 rows (uncommitted data not visible)"
2906        );
2907
2908        // Step 3: Commit to transaction (simulates retry after interruption)
2909        let commit_result = CommitBuilder::new(&dataset_uri).execute(transaction).await;
2910        commit_result.unwrap();
2911
2912        // Step 4: Verify dataset now has all 6 rows after successful commit
2913        let dataset_after_commit = Dataset::open(&dataset_uri).await.unwrap();
2914        let row_count_after = dataset_after_commit.count_rows(None).await.unwrap();
2915        assert_eq!(
2916            row_count_after, 6,
2917            "Dataset should have all 6 rows after commit"
2918        );
2919
2920        // Verify data integrity
2921        let mut scanner = dataset_after_commit.scan();
2922        scanner.project(&["id", "value"]).unwrap();
2923        let batches = scanner
2924            .try_into_stream()
2925            .await
2926            .unwrap()
2927            .try_collect::<Vec<_>>()
2928            .await
2929            .unwrap();
2930
2931        let all_ids: Vec<i32> = batches
2932            .iter()
2933            .flat_map(|batch| {
2934                batch
2935                    .column(0)
2936                    .as_any()
2937                    .downcast_ref::<Int32Array>()
2938                    .unwrap()
2939                    .iter()
2940                    .flatten()
2941            })
2942            .collect();
2943
2944        assert_eq!(
2945            all_ids,
2946            vec![1, 2, 3, 4, 5, 6],
2947            "All data should be correctly written"
2948        );
2949    }
2950}