lance/io/commit.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Trait for commit implementations.
5//!
6//! In Lance, a transaction is committed by writing the next manifest file.
7//! However, care should be taken to ensure that the manifest file is written
8//! only once, even if there are concurrent writers. Different stores have
9//! different abilities to handle concurrent writes, so a trait is provided
10//! to allow for different implementations.
11//!
12//! The trait [CommitHandler] can be implemented to provide different commit
13//! strategies. The default implementation for most object stores is
14//! [ConditionalPutCommitHandler], which writes the manifest directly to its final
15//! path using a conditional put (put-if-not-exists) that fails if an object
16//! already exists at that path.
17//!
18//! When providing your own commit handler, you will most often implement it in
19//! terms of a lock. The trait [CommitLock] can be implemented as a simpler
20//! alternative to [CommitHandler].
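//!
//! As a rough, illustrative sketch (the trait shapes below match the ones exercised
//! by the tests in this module; the `MyLock`/`MyLease` names and the external lock
//! service are assumptions):
//!
//! ```ignore
//! #[derive(Debug)]
//! struct MyLock { /* handle to some external lock service */ }
//!
//! struct MyLease { version: u64 }
//!
//! #[async_trait::async_trait]
//! impl CommitLock for MyLock {
//!     type Lease = MyLease;
//!
//!     async fn lock(&self, version: u64) -> std::result::Result<Self::Lease, CommitError> {
//!         // Acquire an exclusive lock on `version`, or return
//!         // CommitError::CommitConflict if another writer already holds it.
//!         Ok(MyLease { version })
//!     }
//! }
//!
//! #[async_trait::async_trait]
//! impl CommitLease for MyLease {
//!     async fn release(&self, _success: bool) -> std::result::Result<(), CommitError> {
//!         // Release the lock so that other writers may commit.
//!         Ok(())
//!     }
//! }
//! ```
//!
//! The handler can then be passed to writes via
//! `WriteParams { commit_handler: Some(handler), ..Default::default() }`, as the
//! tests at the bottom of this file do.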
21
22use std::collections::{HashMap, HashSet};
23use std::num::NonZero;
24use std::sync::Arc;
25use std::time::Instant;
26
27use conflict_resolver::TransactionRebase;
28use lance_core::utils::backoff::{Backoff, SlotBackoff};
29use lance_core::utils::mask::RowIdTreeMap;
30use lance_file::version::LanceFileVersion;
31use lance_index::metrics::NoOpMetricsCollector;
32use lance_io::utils::CachedFileSize;
33use lance_table::format::{
34    is_detached_version, pb, DataStorageFormat, DeletionFile, Fragment, IndexMetadata, Manifest,
35    WriterVersion, DETACHED_VERSION_MASK,
36};
37use lance_table::io::commit::{
38    CommitConfig, CommitError, CommitHandler, ManifestLocation, ManifestNamingScheme,
39};
40use rand::{rng, Rng};
41use snafu::location;
42
43use futures::future::Either;
44use futures::{StreamExt, TryFutureExt, TryStreamExt};
45use lance_core::{Error, Result};
46use lance_index::{is_system_index, DatasetIndexExt};
47use log;
48use object_store::path::Path;
49use prost::Message;
50
51use super::ObjectStore;
52use crate::dataset::cleanup::auto_cleanup_hook;
53use crate::dataset::fragment::FileFragment;
54use crate::dataset::transaction::{Operation, Transaction};
55use crate::dataset::{
56    load_new_transactions, write_manifest_file, ManifestWriteConfig, NewTransactionResult, BLOB_DIR,
57};
58use crate::index::DatasetIndexInternalExt;
59use crate::io::deletion::read_dataset_deletion_file;
60use crate::session::caches::DSMetadataCache;
61use crate::session::index_caches::IndexMetadataKey;
62use crate::session::Session;
63use crate::Dataset;
64
65mod conflict_resolver;
66#[cfg(all(feature = "dynamodb_tests", test))]
67mod dynamodb;
68#[cfg(test)]
69mod external_manifest;
70#[cfg(all(feature = "dynamodb_tests", test))]
71mod s3_test;
72
73/// Read the transaction data from a transaction file.
74pub(crate) async fn read_transaction_file(
75    object_store: &ObjectStore,
76    base_path: &Path,
77    transaction_file: &str,
78) -> Result<Transaction> {
79    let path = base_path.child("_transactions").child(transaction_file);
80    let result = object_store.inner.get(&path).await?;
81    let data = result.bytes().await?;
82    let transaction = pb::Transaction::decode(data)?;
83    transaction.try_into()
84}
85
86/// Write a transaction to a file and return the relative path.
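///
/// The file is written under `_transactions/` and named `{read_version}-{uuid}.txn`.
/// A minimal round-trip sketch (mirroring `test_roundtrip_transaction_file` below;
/// `object_store`, `base_path`, and `transaction` are assumed to already exist):
///
/// ```ignore
/// let file_name = write_transaction_file(&object_store, &base_path, &transaction).await?;
/// let roundtripped = read_transaction_file(&object_store, &base_path, &file_name).await?;
/// assert_eq!(transaction.uuid, roundtripped.uuid);
/// ```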
87async fn write_transaction_file(
88    object_store: &ObjectStore,
89    base_path: &Path,
90    transaction: &Transaction,
91) -> Result<String> {
92    let file_name = format!("{}-{}.txn", transaction.read_version, transaction.uuid);
93    let path = base_path.child("_transactions").child(file_name.as_str());
94
95    let message = pb::Transaction::from(transaction);
96    let buf = message.encode_to_vec();
97    object_store.inner.put(&path, buf.into()).await?;
98
99    Ok(file_name)
100}
101
102#[allow(clippy::too_many_arguments)]
103async fn do_commit_new_dataset(
104    object_store: &ObjectStore,
105    commit_handler: &dyn CommitHandler,
106    base_path: &Path,
107    transaction: &Transaction,
108    write_config: &ManifestWriteConfig,
109    manifest_naming_scheme: ManifestNamingScheme,
110    blob_version: Option<u64>,
111    metadata_cache: &DSMetadataCache,
112) -> Result<(Manifest, ManifestLocation)> {
113    let transaction_file = write_transaction_file(object_store, base_path, transaction).await?;
114
115    let (mut manifest, indices) = if let Operation::Clone {
116        ref_name,
117        ref_version,
118        ref_path,
119        branch_name,
120        ..
121    } = &transaction.operation
122    {
123        let source_manifest_location = commit_handler
124            .resolve_version_location(
125                &Path::parse(ref_path.as_str())?,
126                *ref_version,
127                &object_store.inner,
128            )
129            .await?;
130        let source_manifest = Dataset::load_manifest(
131            object_store,
132            &source_manifest_location,
133            base_path.to_string().as_str(),
134            &Session::default(),
135        )
136        .await?;
137
138        let new_base_id = source_manifest
139            .base_paths
140            .keys()
141            .max()
142            .map(|id| *id + 1)
143            .unwrap_or(0);
144        let new_manifest = source_manifest.shallow_clone(
145            ref_name.clone(),
146            ref_path.clone(),
147            new_base_id,
148            branch_name.clone(),
149            transaction_file,
150        );
151
152        let updated_indices = if let Some(index_section_pos) = source_manifest.index_section {
153            let reader = object_store.open(&source_manifest_location.path).await?;
154            let section: pb::IndexSection =
155                lance_io::utils::read_message(reader.as_ref(), index_section_pos).await?;
156            section
157                .indices
158                .into_iter()
159                .map(|index_pb| {
160                    let mut index = IndexMetadata::try_from(index_pb)?;
161                    index.base_id = Some(new_base_id);
162                    Ok(index)
163                })
164                .collect::<Result<Vec<_>>>()?
165        } else {
166            vec![]
167        };
168        (new_manifest, updated_indices)
169    } else {
170        let (manifest, indices) = transaction.build_manifest(
171            None,
172            vec![],
173            &transaction_file,
174            write_config,
175            blob_version,
176        )?;
177        (manifest, indices)
178    };
179
180    manifest.blob_dataset_version = blob_version;
181
182    let result = write_manifest_file(
183        object_store,
184        commit_handler,
185        base_path,
186        &mut manifest,
187        if indices.is_empty() {
188            None
189        } else {
190            Some(indices.clone())
191        },
192        write_config,
193        manifest_naming_scheme,
194    )
195    .await;
196
197    // TODO: Allow Append or Overwrite mode to retry using `commit_transaction`
198    // if there is a conflict.
199    match result {
200        Ok(manifest_location) => {
201            let tx_key = crate::session::caches::TransactionKey {
202                version: manifest.version,
203            };
204            metadata_cache
205                .insert_with_key(&tx_key, Arc::new(transaction.clone()))
206                .await;
207
208            let manifest_key = crate::session::caches::ManifestKey {
209                version: manifest_location.version,
210                e_tag: manifest_location.e_tag.as_deref(),
211            };
212            metadata_cache
213                .insert_with_key(&manifest_key, Arc::new(manifest.clone()))
214                .await;
215            Ok((manifest, manifest_location))
216        }
217        Err(CommitError::CommitConflict) => Err(crate::Error::DatasetAlreadyExists {
218            uri: base_path.to_string(),
219            location: location!(),
220        }),
221        Err(CommitError::OtherError(err)) => Err(err),
222    }
223}
224
225pub(crate) async fn commit_new_dataset(
226    object_store: &ObjectStore,
227    commit_handler: &dyn CommitHandler,
228    base_path: &Path,
229    transaction: &Transaction,
230    write_config: &ManifestWriteConfig,
231    manifest_naming_scheme: ManifestNamingScheme,
232    metadata_cache: &crate::session::caches::DSMetadataCache,
233) -> Result<(Manifest, ManifestLocation)> {
234    let blob_version = if let Some(blob_op) = transaction.blobs_op.as_ref() {
235        let blob_path = base_path.child(BLOB_DIR);
236        let blob_tx = Transaction::new(0, blob_op.clone(), None, None);
237        let (blob_manifest, _) = do_commit_new_dataset(
238            object_store,
239            commit_handler,
240            &blob_path,
241            &blob_tx,
242            write_config,
243            manifest_naming_scheme,
244            None,
245            metadata_cache,
246        )
247        .await?;
248        Some(blob_manifest.version)
249    } else {
250        None
251    };
252
253    do_commit_new_dataset(
254        object_store,
255        commit_handler,
256        base_path,
257        transaction,
258        write_config,
259        manifest_naming_scheme,
260        blob_version,
261        metadata_cache,
262    )
263    .await
264}
265
266/// Internal function to check if a manifest could use some migration.
267///
268/// Manifest migrations happen on each write, but sometimes we need to run them
269/// before certain new operations. An easy way to force a migration is to run
270/// `dataset.delete("false")`, which won't modify data but will still trigger a migration.
271/// However, you don't want to always have to do this, so we provide this method
272/// to check if a migration is needed.
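///
/// A hedged usage sketch (assumes you already hold the dataset's current manifest
/// and index metadata; the `dataset` variable and its accessors are illustrative):
///
/// ```ignore
/// if manifest_needs_migration(&manifest, &indices) {
///     // A delete with an always-false predicate rewrites the manifest without
///     // modifying any data, which runs the migrations on commit.
///     dataset.delete("false").await?;
/// }
/// ```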
273pub fn manifest_needs_migration(manifest: &Manifest, indices: &[IndexMetadata]) -> bool {
274    manifest.writer_version.is_none()
275        || manifest.fragments.iter().any(|f| {
276            f.physical_rows.is_none()
277                || (f
278                    .deletion_file
279                    .as_ref()
280                    .map(|d| d.num_deleted_rows.is_none())
281                    .unwrap_or(false))
282        })
283        || indices
284            .iter()
285            .any(|i| must_recalculate_fragment_bitmap(i, manifest.writer_version.as_ref()))
286}
287
288/// Update manifest with new metadata fields.
289///
290/// Fields such as `physical_rows` and `num_deleted_rows` may not have been populated
291/// in older datasets. To bring these old manifests up-to-date, we fill them in here.
292async fn migrate_manifest(
293    dataset: &Dataset,
294    manifest: &mut Manifest,
295    recompute_stats: bool,
296) -> Result<()> {
297    if !recompute_stats
298        && manifest.fragments.iter().all(|f| {
299            f.num_rows().map(|n| n > 0).unwrap_or(false)
300                && f.files.iter().all(|f| f.file_size_bytes.get().is_some())
301        })
302    {
303        return Ok(());
304    }
305
306    manifest.fragments =
307        Arc::new(migrate_fragments(dataset, &manifest.fragments, recompute_stats).await?);
308
309    Ok(())
310}
311
312fn check_storage_version(manifest: &mut Manifest) -> Result<()> {
313    let data_storage_version = manifest.data_storage_format.lance_file_version()?;
314    if data_storage_version == LanceFileVersion::Legacy {
315        // Due to bugs in 0.16 it is possible the dataset's data storage version does not
316        // match the file version.  As a result, we need to check and see if they are out
317        // of sync.
318        if let Some(actual_file_version) =
319            Fragment::try_infer_version(&manifest.fragments).map_err(|e| Error::Internal {
320                message: format!(
321                    "The dataset contains a mixture of file versions.  You will need to rollback to an earlier version: {}",
322                    e
323                ),
324                location: location!(),
325            })? {
326                if actual_file_version > data_storage_version {
327                    log::warn!(
328                        "Data storage version {} is less than the actual file version {}.  This has been automatically updated.",
329                        data_storage_version,
330                        actual_file_version
331                    );
332                    manifest.data_storage_format = DataStorageFormat::new(actual_file_version);
333                }
334            }
335    } else {
336        // Otherwise, if we are on 2.0 or greater, we should ensure that the file versions
337        // match the data storage version.  This is a sanity assertion to prevent data corruption.
338        if let Some(actual_file_version) = Fragment::try_infer_version(&manifest.fragments)? {
339            if actual_file_version != data_storage_version {
340                return Err(Error::Internal {
341                    message: format!(
342                        "The operation added files with version {}.  However, the data storage version is {}.",
343                        actual_file_version,
344                        data_storage_version
345                    ),
346                    location: location!(),
347                });
348            }
349        }
350    }
351    Ok(())
352}
353
354/// Fix schema in case of duplicate field ids.
355///
356/// See test dataset v0.10.5/corrupt_schema
357fn fix_schema(manifest: &mut Manifest) -> Result<()> {
358    // We can short-circuit if there is only one file per fragment or no fragments.
359    if manifest.fragments.iter().all(|f| f.files.len() <= 1) {
360        return Ok(());
361    }
362
363    // First, see which fields, if any, have duplicate ids within any single fragment.
364    let mut fields_with_duplicate_ids = HashSet::new();
365    let mut seen_fields = HashSet::new();
366    for fragment in manifest.fragments.iter() {
367        for file in fragment.files.iter() {
368            for field_id in file.fields.iter() {
369                if *field_id >= 0 && !seen_fields.insert(*field_id) {
370                    fields_with_duplicate_ids.insert(*field_id);
371                }
372            }
373        }
374        seen_fields.clear();
375    }
376    if fields_with_duplicate_ids.is_empty() {
377        return Ok(());
378    }
379
380    // Now, we need to remap the field ids to be unique.
381    let mut field_id_seed = manifest.max_field_id() + 1;
382    let mut old_field_id_mapping: HashMap<i32, i32> = HashMap::new();
383    let mut fields_with_duplicate_ids = fields_with_duplicate_ids.into_iter().collect::<Vec<_>>();
384    fields_with_duplicate_ids.sort_unstable();
385    for field_id in fields_with_duplicate_ids {
386        old_field_id_mapping.insert(field_id, field_id_seed);
387        field_id_seed += 1;
388    }
389
390    let mut fragments = manifest.fragments.as_ref().clone();
391
392    // Apply mapping to fragment files list
393    // We iterate over the files in reverse order so that only the last occurrence of each duplicated field id is remapped
394    seen_fields.clear();
395    for fragment in fragments.iter_mut() {
396        for field_id in fragment
397            .files
398            .iter_mut()
399            .rev()
400            .flat_map(|file| file.fields.iter_mut())
401        {
402            if let Some(new_field_id) = old_field_id_mapping.get(field_id) {
403                if seen_fields.insert(*field_id) {
404                    *field_id = *new_field_id;
405                }
406            }
407        }
408        seen_fields.clear();
409    }
410
411    // Apply mapping to the schema
412    for (old_field_id, new_field_id) in &old_field_id_mapping {
413        let field = manifest.schema.mut_field_by_id(*old_field_id).unwrap();
414        field.id = *new_field_id;
415
416        if let Some(local_field) = manifest.local_schema.mut_field_by_id(*old_field_id) {
417            local_field.id = *new_field_id;
418        }
419    }
420
421    // Drop data files that are no longer in use.
422    let remaining_field_ids = manifest
423        .schema
424        .fields_pre_order()
425        .map(|f| f.id)
426        .collect::<HashSet<_>>();
427    for fragment in fragments.iter_mut() {
428        fragment.files.retain(|file| {
429            file.fields
430                .iter()
431                .any(|field_id| remaining_field_ids.contains(field_id))
432        });
433    }
434
435    manifest.fragments = Arc::new(fragments);
436
437    Ok(())
438}
439
440/// Get updated vector of fragments that has `physical_rows` and `num_deleted_rows`
441/// filled in. This is a no-op for newer tables, but may do IO for tables written
442/// with older versions of Lance.
443pub(crate) async fn migrate_fragments(
444    dataset: &Dataset,
445    fragments: &[Fragment],
446    recompute_stats: bool,
447) -> Result<Vec<Fragment>> {
448    let dataset = Arc::new(dataset.clone());
449    let new_fragments = futures::stream::iter(fragments)
450        .map(|fragment| async {
451            let physical_rows = if recompute_stats {
452                None
453            } else {
454                fragment.physical_rows
455            };
456            let physical_rows = if let Some(physical_rows) = physical_rows {
457                Either::Right(futures::future::ready(Ok(physical_rows)))
458            } else {
459                let file_fragment = FileFragment::new(dataset.clone(), fragment.clone());
460                Either::Left(async move { file_fragment.physical_rows().await })
461            };
462            let num_deleted_rows = match &fragment.deletion_file {
463                None => Either::Left(futures::future::ready(Ok(None))),
464                Some(DeletionFile {
465                    num_deleted_rows: Some(deleted_rows),
466                    ..
467                }) if !recompute_stats => {
468                    Either::Left(futures::future::ready(Ok(Some(*deleted_rows))))
469                }
470                Some(deletion_file) => Either::Right(async {
471                    let deletion_vector =
472                        read_dataset_deletion_file(dataset.as_ref(), fragment.id, deletion_file)
473                            .await?;
474                    Ok(Some(deletion_vector.len()))
475                }),
476            };
477
478            let (physical_rows, num_deleted_rows) =
479                futures::future::try_join(physical_rows, num_deleted_rows).await?;
480
481            let mut data_files = fragment.files.clone();
482
483            // For each of the data files in the fragment, we need to get the file size
484            let object_store = dataset.object_store();
485            let get_sizes = data_files
486                .iter()
487                .map(|file| {
488                    if let Some(size) = file.file_size_bytes.get() {
489                        Either::Left(futures::future::ready(Ok(size)))
490                    } else {
491                        Either::Right(async {
492                            object_store
493                                .size(&dataset.base.child("data").child(file.path.clone()))
494                                .map_ok(|size| {
495                                    NonZero::new(size).ok_or_else(|| Error::Internal {
496                                        message: format!("File {} has size 0", file.path),
497                                        location: location!(),
498                                    })
499                                })
500                                .await?
501                        })
502                    }
503                })
504                .collect::<Vec<_>>();
505            let sizes = futures::future::try_join_all(get_sizes).await?;
506            data_files.iter_mut().zip(sizes).for_each(|(file, size)| {
507                file.file_size_bytes = CachedFileSize::new(size.into());
508            });
509
510            let deletion_file = fragment
511                .deletion_file
512                .as_ref()
513                .map(|deletion_file| DeletionFile {
514                    num_deleted_rows,
515                    ..deletion_file.clone()
516                });
517
518            Ok::<_, Error>(Fragment {
519                physical_rows: Some(physical_rows),
520                deletion_file,
521                files: data_files,
522                ..fragment.clone()
523            })
524        })
525        .buffered(dataset.object_store.io_parallelism())
526        // Filter out empty fragments
527        .try_filter(|frag| futures::future::ready(frag.num_rows().map(|n| n > 0).unwrap_or(false)))
528        .boxed();
529
530    new_fragments.try_collect().await
531}
532
533fn must_recalculate_fragment_bitmap(
534    index: &IndexMetadata,
535    version: Option<&WriterVersion>,
536) -> bool {
537    // If the fragment bitmap was written by an old version of lance then we need to recalculate
538    // it because it could be corrupt due to a bug in versions < 0.8.15
539    index.fragment_bitmap.is_none() || version.map(|v| v.older_than(0, 8, 15)).unwrap_or(true)
540}
541
542/// Update indices with missing metadata fields.
543///
544/// Indices might be missing `fragment_bitmap`, so this function will add it.
545async fn migrate_indices(dataset: &Dataset, indices: &mut [IndexMetadata]) -> Result<()> {
546    let needs_recalculating = match detect_overlapping_fragments(indices) {
547        Ok(()) => vec![],
548        Err(BadFragmentBitmapError { bad_indices }) => {
549            bad_indices.into_iter().map(|(name, _)| name).collect()
550        }
551    };
552    for index in indices {
553        if needs_recalculating.contains(&index.name)
554            || must_recalculate_fragment_bitmap(index, dataset.manifest.writer_version.as_ref())
555                && !is_system_index(index)
556        {
557            debug_assert_eq!(index.fields.len(), 1);
558            let idx_field = dataset.schema().field_by_id(index.fields[0]).ok_or_else(|| Error::Internal { message: format!("Index with uuid {} referred to field with id {} which did not exist in dataset", index.uuid, index.fields[0]), location: location!() })?;
559            // We need to calculate the fragments covered by the index
560            let idx = dataset
561                .open_generic_index(
562                    &idx_field.name,
563                    &index.uuid.to_string(),
564                    &NoOpMetricsCollector,
565                )
566                .await?;
567            index.fragment_bitmap = Some(idx.calculate_included_frags().await?);
568        }
569        // We can't reliably recalculate the index type for label_list and bitmap indices and so we can't migrate this field.
570        // However, we still log for visibility and to help potentially diagnose issues in the future if we grow to rely on the field.
571        if index.index_details.is_none() {
572            log::debug!("the index with uuid {} is missing index metadata.  This probably means it was written with Lance version <= 0.19.2.  This is not a problem.", index.uuid);
573        }
574    }
575
576    Ok(())
577}
578
579pub(crate) struct BadFragmentBitmapError {
580    pub bad_indices: Vec<(String, Vec<u32>)>,
581}
582
583/// Detect whether a given index has overlapping fragment bitmaps in its index
584/// segments.
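///
/// A sketch of how the result is typically consumed (this mirrors `migrate_indices`
/// above, which recalculates the fragment bitmaps of any indices reported here):
///
/// ```ignore
/// match detect_overlapping_fragments(&indices) {
///     Ok(()) => { /* bitmaps are disjoint; nothing to do */ }
///     Err(BadFragmentBitmapError { bad_indices }) => {
///         for (name, fragments) in bad_indices {
///             log::warn!("index {} has overlapping fragments: {:?}", name, fragments);
///         }
///     }
/// }
/// ```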
585pub(crate) fn detect_overlapping_fragments(
586    indices: &[IndexMetadata],
587) -> std::result::Result<(), BadFragmentBitmapError> {
588    let index_names: HashSet<&str> = indices.iter().map(|i| i.name.as_str()).collect();
589    let mut bad_indices = Vec::new(); // (index_name, overlapping_fragments)
590    for name in index_names {
591        let mut seen_fragment_ids = HashSet::new();
592        let mut overlap = Vec::new();
593        for index in indices.iter().filter(|i| i.name == name) {
594            if let Some(fragment_bitmap) = index.fragment_bitmap.as_ref() {
595                for fragment in fragment_bitmap {
596                    if !seen_fragment_ids.insert(fragment) {
597                        overlap.push(fragment);
598                    }
599                }
600            }
601        }
602        if !overlap.is_empty() {
603            bad_indices.push((name.to_string(), overlap));
604        }
605    }
606    if bad_indices.is_empty() {
607        Ok(())
608    } else {
609        Err(BadFragmentBitmapError { bad_indices })
610    }
611}
612
613pub(crate) async fn do_commit_detached_transaction(
614    dataset: &Dataset,
615    object_store: &ObjectStore,
616    commit_handler: &dyn CommitHandler,
617    transaction: &Transaction,
618    write_config: &ManifestWriteConfig,
619    commit_config: &CommitConfig,
620    new_blob_version: Option<u64>,
621) -> Result<(Manifest, ManifestLocation)> {
622    // We don't strictly need a transaction file but we go ahead and create one for
623    // record-keeping if nothing else.
624    let transaction_file = write_transaction_file(object_store, &dataset.base, transaction).await?;
625
626    // We still do a loop since we may have conflicts in the random version we pick
627    let mut backoff = Backoff::default();
628    while backoff.attempt() < commit_config.num_retries {
629        // Pick a random u64 with the highest bit set to indicate it is detached
630        let random_version = rng().random::<u64>() | DETACHED_VERSION_MASK;
631
632        let (mut manifest, mut indices) = match transaction.operation {
633            Operation::Restore { version } => {
634                Transaction::restore_old_manifest(
635                    object_store,
636                    commit_handler,
637                    &dataset.base,
638                    version,
639                    write_config,
640                    &transaction_file,
641                )
642                .await?
643            }
644            _ => transaction.build_manifest(
645                Some(dataset.manifest.as_ref()),
646                dataset.load_indices().await?.as_ref().clone(),
647                &transaction_file,
648                write_config,
649                new_blob_version,
650            )?,
651        };
652
653        manifest.version = random_version;
654
655        // recompute_stats is always false so far because detached manifests are newer than
656        // the old stats bug.
657        migrate_manifest(dataset, &mut manifest, /*recompute_stats=*/ false).await?;
658        // fix_schema and check_storage_version are just for sanity-checking and consistency
659        fix_schema(&mut manifest)?;
660        check_storage_version(&mut manifest)?;
661        migrate_indices(dataset, &mut indices).await?;
662
663        // Try to commit the manifest
664        let result = write_manifest_file(
665            object_store,
666            commit_handler,
667            &dataset.base,
668            &mut manifest,
669            if indices.is_empty() {
670                None
671            } else {
672                Some(indices.clone())
673            },
674            write_config,
675            ManifestNamingScheme::V2,
676        )
677        .await;
678
679        match result {
680            Ok(location) => {
681                return Ok((manifest, location));
682            }
683            Err(CommitError::CommitConflict) => {
684                // We pick a random u64 for the version, so it's possible (though extremely unlikely)
685                // that we have a conflict. In that case, we just try again.
686                tokio::time::sleep(backoff.next_backoff()).await;
687            }
688            Err(CommitError::OtherError(err)) => {
689                // If other error, return
690                return Err(err);
691            }
692        }
693    }
694
695    // This should be extremely unlikely.  There should not be *that* many detached commits.  If
696    // this happens then it seems more likely there is a bug in our random u64 generation.
697    Err(crate::Error::CommitConflict {
698        version: 0,
699        source: format!(
700            "Failed find unused random u64 after {} retries.",
701            commit_config.num_retries
702        )
703        .into(),
704        location: location!(),
705    })
706}
707
708pub(crate) async fn commit_detached_transaction(
709    dataset: &Dataset,
710    object_store: &ObjectStore,
711    commit_handler: &dyn CommitHandler,
712    transaction: &Transaction,
713    write_config: &ManifestWriteConfig,
714    commit_config: &CommitConfig,
715) -> Result<(Manifest, ManifestLocation)> {
716    let new_blob_version = if let Some(blob_op) = transaction.blobs_op.as_ref() {
717        let blobs_dataset = dataset.blobs_dataset().await?.unwrap();
718        let blobs_tx =
719            Transaction::new(blobs_dataset.version().version, blob_op.clone(), None, None);
720        let (blobs_manifest, _) = do_commit_detached_transaction(
721            blobs_dataset.as_ref(),
722            object_store,
723            commit_handler,
724            &blobs_tx,
725            write_config,
726            commit_config,
727            None,
728        )
729        .await?;
730        Some(blobs_manifest.version)
731    } else {
732        None
733    };
734
735    do_commit_detached_transaction(
736        dataset,
737        object_store,
738        commit_handler,
739        transaction,
740        write_config,
741        commit_config,
742        new_blob_version,
743    )
744    .await
745}
746
747/// Load new transactions and sort them by version in ascending order (oldest to newest)
748async fn load_and_sort_new_transactions(
749    dataset: &Dataset,
750) -> Result<(Dataset, Vec<(u64, Arc<Transaction>)>)> {
751    let NewTransactionResult {
752        dataset: new_ds,
753        new_transactions,
754    } = load_new_transactions(dataset);
755    let new_transactions = new_transactions.try_collect::<Vec<_>>();
756    let (new_ds, mut txns) = futures::future::try_join(new_ds, new_transactions).await?;
757    txns.sort_by_key(|(version, _)| *version);
758    Ok((new_ds, txns))
759}
760
761/// Attempt to commit a transaction, with retries and conflict resolution.
762#[allow(clippy::too_many_arguments)]
763pub(crate) async fn commit_transaction(
764    dataset: &Dataset,
765    object_store: &ObjectStore,
766    commit_handler: &dyn CommitHandler,
767    transaction: &Transaction,
768    write_config: &ManifestWriteConfig,
769    commit_config: &CommitConfig,
770    manifest_naming_scheme: ManifestNamingScheme,
771    affected_rows: Option<&RowIdTreeMap>,
772) -> Result<(Manifest, ManifestLocation)> {
773    let new_blob_version = if let Some(blob_op) = transaction.blobs_op.as_ref() {
774        let blobs_dataset = dataset.blobs_dataset().await?.unwrap();
775        let blobs_tx =
776            Transaction::new(blobs_dataset.version().version, blob_op.clone(), None, None);
777        let (blobs_manifest, _) = do_commit_detached_transaction(
778            blobs_dataset.as_ref(),
779            object_store,
780            commit_handler,
781            &blobs_tx,
782            write_config,
783            commit_config,
784            None,
785        )
786        .await?;
787        Some(blobs_manifest.version)
788    } else {
789        None
790    };
791
792    // Note: object_store has been configured with WriteParams, but dataset.object_store()
793    // has not necessarily. So for anything involving writing, use `object_store`.
794    let read_version = transaction.read_version;
795    let mut target_version = read_version + 1;
796    let original_dataset = dataset.clone();
797
798    // read_version sometimes defaults to zero for overwrite.
799    // If num_retries is zero, we are in "strict overwrite" mode.
800    // Strict overwrites are not subject to any sort of automatic conflict resolution.
801    let strict_overwrite = matches!(transaction.operation, Operation::Overwrite { .. })
802        && commit_config.num_retries == 0;
803    let mut dataset =
804        if dataset.manifest.version != read_version && (read_version != 0 || strict_overwrite) {
805            // If the dataset version is not the same as the read version, we need to
806            // checkout the read version.
807            dataset.checkout_version(read_version).await?
808        } else {
809            // If the dataset version is the same as the read version, we can use it directly.
810            dataset.clone()
811        };
812
813    let mut transaction = transaction.clone();
814
815    let num_attempts = std::cmp::max(commit_config.num_retries, 1);
816    let mut backoff = SlotBackoff::default();
817    let start = Instant::now();
818
819    // Other transactions that may have been committed since the read_version.
820    // We keep pairs of (version, transaction). There are no other transactions to check initially.
821    let mut other_transactions: Vec<(u64, Arc<Transaction>)>;
822
823    while backoff.attempt() < num_attempts {
824        // We are pessimistic here and assume there may be other transactions
825        // we need to check for. We could be optimistic here and blindly
826        // attempt to commit, giving faster performance for sequential writes and
827        // slower performance for concurrent writes. But that makes the fast path
828        // faster and the slow path slower, which makes performance less predictable
829        // for users. So we always check for other transactions.
830        // We skip this for strict overwrites, because strict overwrites can't be rebased.
831        if !strict_overwrite {
832            (dataset, other_transactions) = load_and_sort_new_transactions(&dataset).await?;
833
834            // See if we can retry the commit. Try to account for all
835            // transactions that have been committed since the read_version.
836        // A small amount of backoff is used so that transactions that all
837        // started at the exact same time are handled better.
838
839            let mut rebase =
840                TransactionRebase::try_new(&original_dataset, transaction, affected_rows).await?;
841
842            for (other_version, other_transaction) in other_transactions.iter() {
843                rebase.check_txn(other_transaction, *other_version)?;
844            }
845
846            transaction = rebase.finish(&dataset).await?;
847        }
848
849        let transaction_file =
850            write_transaction_file(object_store, &dataset.base, &transaction).await?;
851
852        target_version = dataset.manifest.version + 1;
853        if is_detached_version(target_version) {
854            return Err(Error::Internal { message: "more than 2^63 versions have been created and so regular version numbers are appearing as 'detached' versions.".into(), location: location!() });
855        }
856        // Build an up-to-date manifest from the transaction and current manifest
857        let (mut manifest, mut indices) = match transaction.operation {
858            Operation::Restore { version } => {
859                Transaction::restore_old_manifest(
860                    object_store,
861                    commit_handler,
862                    &dataset.base,
863                    version,
864                    write_config,
865                    &transaction_file,
866                )
867                .await?
868            }
869            _ => transaction.build_manifest(
870                Some(dataset.manifest.as_ref()),
871                dataset.load_indices().await?.as_ref().clone(),
872                &transaction_file,
873                write_config,
874                new_blob_version,
875            )?,
876        };
877
878        manifest.version = target_version;
879
880        let previous_writer_version = &dataset.manifest.writer_version;
881        // The versions of Lance prior to when we started writing the writer version
882        // sometimes wrote incorrect `Fragment.physical_rows` values, so we should
883        // make sure to recompute them.
884        // See: https://github.com/lancedb/lance/issues/1531
885        let recompute_stats = previous_writer_version.is_none();
886
887        migrate_manifest(&dataset, &mut manifest, recompute_stats).await?;
888
889        fix_schema(&mut manifest)?;
890
891        check_storage_version(&mut manifest)?;
892
893        migrate_indices(&dataset, &mut indices).await?;
894
895        // Try to commit the manifest
896        let result = write_manifest_file(
897            object_store,
898            commit_handler,
899            &dataset.base,
900            &mut manifest,
901            if indices.is_empty() {
902                None
903            } else {
904                Some(indices.clone())
905            },
906            write_config,
907            manifest_naming_scheme,
908        )
909        .await;
910
911        match result {
912            Ok(manifest_location) => {
913                // Cache both the transaction file and manifest
914                let tx_key = crate::session::caches::TransactionKey {
915                    version: target_version,
916                };
917                dataset
918                    .metadata_cache
919                    .insert_with_key(&tx_key, Arc::new(transaction.clone()))
920                    .await;
921
922                let manifest_key = crate::session::caches::ManifestKey {
923                    version: manifest_location.version,
924                    e_tag: manifest_location.e_tag.as_deref(),
925                };
926                dataset
927                    .metadata_cache
928                    .insert_with_key(&manifest_key, Arc::new(manifest.clone()))
929                    .await;
930                if !indices.is_empty() {
931                    let key = IndexMetadataKey {
932                        version: target_version,
933                    };
934                    dataset
935                        .index_cache
936                        .insert_with_key(&key, Arc::new(indices))
937                        .await;
938                }
939
940                if !commit_config.skip_auto_cleanup {
941                    // Note: We're using the old dataset here (before the new manifest is committed).
942                    // This means cleanup runs based on the previous version's state, which may affect
943                    // which versions are available for cleanup.
944                    match auto_cleanup_hook(&dataset, &manifest).await {
945                        Ok(Some(stats)) => log::info!("Auto cleanup triggered: {:?}", stats),
946                        Err(e) => log::error!("Error encountered during auto_cleanup_hook: {}", e),
947                        _ => {}
948                    };
949                }
950                return Ok((manifest, manifest_location));
951            }
952            Err(CommitError::CommitConflict) => {
953                let next_attempt_i = backoff.attempt() + 1;
954
955                if backoff.attempt() == 0 {
956                    // We add 10% buffer here, to allow concurrent writes to complete.
957                    // We pass the first attempt's time to the backoff so it's used
958                    // as the unit for backoff time slots.
959                    // See SlotBackoff implementation for more details on how this works.
960                    backoff = backoff.with_unit((start.elapsed().as_millis() * 11 / 10) as u32);
961                }
962
963                if next_attempt_i < num_attempts {
964                    tokio::time::sleep(backoff.next_backoff()).await;
965                    continue;
966                } else {
967                    break;
968                }
969            }
970            Err(CommitError::OtherError(err)) => {
971                // If other error, return
972                return Err(err);
973            }
974        }
975    }
976
977    Err(crate::Error::CommitConflict {
978        version: target_version,
979        source: format!(
980            "Failed to commit the transaction after {} retries.",
981            commit_config.num_retries
982        )
983        .into(),
984        location: location!(),
985    })
986}
987
988#[cfg(test)]
989mod tests {
990    use std::sync::Mutex;
991
992    use arrow_array::types::Int32Type;
993    use arrow_array::{Int32Array, Int64Array, RecordBatch, RecordBatchIterator};
994    use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema};
995    use futures::future::join_all;
996    use lance_arrow::FixedSizeListArrayExt;
997    use lance_core::datatypes::{Field, Schema};
998    use lance_core::utils::tempfile::TempStrDir;
999    use lance_index::IndexType;
1000    use lance_linalg::distance::MetricType;
1001    use lance_table::format::{DataFile, DataStorageFormat};
1002    use lance_table::io::commit::{
1003        CommitLease, CommitLock, RenameCommitHandler, UnsafeCommitHandler,
1004    };
1005    use lance_testing::datagen::generate_random_array;
1006
1007    use super::*;
1008
1009    use crate::dataset::{WriteMode, WriteParams};
1010    use crate::index::vector::VectorIndexParams;
1011    use crate::utils::test::{DatagenExt, FragmentCount, FragmentRowCount};
1012    use crate::Dataset;
1013
1014    async fn test_commit_handler(handler: Arc<dyn CommitHandler>, should_succeed: bool) {
1015        // Create a dataset, passing handler as commit handler
1016        let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
1017            "x",
1018            DataType::Int64,
1019            false,
1020        )]));
1021        let data = RecordBatch::try_new(
1022            schema.clone(),
1023            vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
1024        )
1025        .unwrap();
1026        let reader = RecordBatchIterator::new(vec![Ok(data)], schema);
1027
1028        let options = WriteParams {
1029            commit_handler: Some(handler),
1030            ..Default::default()
1031        };
1032        let dataset = Dataset::write(reader, "memory://test", Some(options))
1033            .await
1034            .unwrap();
1035
1036        // Create 10 concurrent tasks to write into the table
1037        // Record how many succeed and how many fail
1038        let tasks = (0..10).map(|_| {
1039            let mut dataset = dataset.clone();
1040            tokio::task::spawn(async move {
1041                dataset
1042                    .delete("x = 2")
1043                    .await
1044                    .map(|_| dataset.manifest.version)
1045            })
1046        });
1047
1048        let task_results: Vec<Option<u64>> = join_all(tasks)
1049            .await
1050            .iter()
1051            .map(|res| match res {
1052                Ok(Ok(version)) => Some(*version),
1053                _ => None,
1054            })
1055            .collect();
1056
1057        let num_successes = task_results.iter().filter(|x| x.is_some()).count();
1058        let distinct_results: HashSet<_> = task_results.iter().filter_map(|x| x.as_ref()).collect();
1059
1060        if should_succeed {
1061            assert_eq!(
1062                num_successes,
1063                distinct_results.len(),
1064                "Expected no two tasks to succeed for the same version. Got {:?}",
1065                task_results
1066            );
1067        } else {
1068            // All we can promise here is that at least one task succeeds, but multiple
1069            // could in theory.
1070            assert!(num_successes >= distinct_results.len());
1071        }
1072    }
1073
1074    #[tokio::test]
1075    async fn test_rename_commit_handler() {
1076        // Rename is default for memory
1077        let handler = Arc::new(RenameCommitHandler);
1078        test_commit_handler(handler, true).await;
1079    }
1080
1081    #[tokio::test]
1082    async fn test_custom_commit() {
1083        #[derive(Debug)]
1084        struct CustomCommitHandler {
1085            locked_version: Arc<Mutex<Option<u64>>>,
1086        }
1087
1088        struct CustomCommitLease {
1089            version: u64,
1090            locked_version: Arc<Mutex<Option<u64>>>,
1091        }
1092
1093        #[async_trait::async_trait]
1094        impl CommitLock for CustomCommitHandler {
1095            type Lease = CustomCommitLease;
1096
1097            async fn lock(&self, version: u64) -> std::result::Result<Self::Lease, CommitError> {
1098                let mut locked_version = self.locked_version.lock().unwrap();
1099                if locked_version.is_some() {
1100                    // Already locked
1101                    return Err(CommitError::CommitConflict);
1102                }
1103
1104                // Lock the version
1105                *locked_version = Some(version);
1106
1107                Ok(CustomCommitLease {
1108                    version,
1109                    locked_version: self.locked_version.clone(),
1110                })
1111            }
1112        }
1113
1114        #[async_trait::async_trait]
1115        impl CommitLease for CustomCommitLease {
1116            async fn release(&self, _success: bool) -> std::result::Result<(), CommitError> {
1117                let mut locked_version = self.locked_version.lock().unwrap();
1118                if *locked_version != Some(self.version) {
1119                    // Already released
1120                    return Err(CommitError::CommitConflict);
1121                }
1122
1123                // Release the version
1124                *locked_version = None;
1125
1126                Ok(())
1127            }
1128        }
1129
1130        let locked_version = Arc::new(Mutex::new(None));
1131        let handler = Arc::new(CustomCommitHandler { locked_version });
1132        test_commit_handler(handler, true).await;
1133    }
1134
1135    #[tokio::test]
1136    async fn test_unsafe_commit_handler() {
1137        let handler = Arc::new(UnsafeCommitHandler);
1138        test_commit_handler(handler, false).await;
1139    }
1140
1141    #[tokio::test]
1142    async fn test_roundtrip_transaction_file() {
1143        let object_store = ObjectStore::memory();
1144        let base_path = Path::from("test");
1145        let transaction = Transaction::new(
1146            42,
1147            Operation::Append { fragments: vec![] },
1148            /*blobs_op=*/ None,
1149            Some("hello world".to_string()),
1150        );
1151
1152        let file_name = write_transaction_file(&object_store, &base_path, &transaction)
1153            .await
1154            .unwrap();
1155        let read_transaction = read_transaction_file(&object_store, &base_path, &file_name)
1156            .await
1157            .unwrap();
1158
1159        assert_eq!(transaction.read_version, read_transaction.read_version);
1160        assert_eq!(transaction.uuid, read_transaction.uuid);
1161        assert!(matches!(
1162            read_transaction.operation,
1163            Operation::Append { .. }
1164        ));
1165        assert_eq!(transaction.tag, read_transaction.tag);
1166    }
1167
1168    #[tokio::test]
1169    async fn test_concurrent_create_index() {
1170        // Create a table with two vector columns
1171        let test_dir = TempStrDir::default();
1172        let test_uri = test_dir.as_str();
1173
1174        let dimension = 16;
1175        let schema = Arc::new(ArrowSchema::new(vec![
1176            ArrowField::new(
1177                "vector1",
1178                DataType::FixedSizeList(
1179                    Arc::new(ArrowField::new("item", DataType::Float32, true)),
1180                    dimension,
1181                ),
1182                false,
1183            ),
1184            ArrowField::new(
1185                "vector2",
1186                DataType::FixedSizeList(
1187                    Arc::new(ArrowField::new("item", DataType::Float32, true)),
1188                    dimension,
1189                ),
1190                false,
1191            ),
1192        ]));
1193        let float_arr = generate_random_array(512 * dimension as usize);
1194        let vectors = Arc::new(
1195            <arrow_array::FixedSizeListArray as FixedSizeListArrayExt>::try_new_from_values(
1196                float_arr, dimension,
1197            )
1198            .unwrap(),
1199        );
1200        let batches =
1201            vec![
1202                RecordBatch::try_new(schema.clone(), vec![vectors.clone(), vectors.clone()])
1203                    .unwrap(),
1204            ];
1205
1206        let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
1207        let dataset = Dataset::write(reader, test_uri, None).await.unwrap();
1208        dataset.validate().await.unwrap();
1209
1210        // From initial version, concurrently call create index 3 times,
1211        // two of which will be for the same column.
1212        let params = VectorIndexParams::ivf_pq(10, 8, 2, MetricType::L2, 50);
1213        let futures: Vec<_> = ["vector1", "vector1", "vector2"]
1214            .iter()
1215            .map(|col_name| {
1216                let mut dataset = dataset.clone();
1217                let params = params.clone();
1218                tokio::spawn(async move {
1219                    dataset
1220                        .create_index(&[col_name], IndexType::Vector, None, &params, true)
1221                        .await
1222                })
1223            })
1224            .collect();
1225
1226        let results = join_all(futures).await;
1227        for result in results {
1228            assert!(matches!(result, Ok(Ok(_))), "{:?}", result);
1229        }
1230
1231        // Validate that each version has the anticipated number of indexes
1232        let dataset = dataset.checkout_version(1).await.unwrap();
1233        assert!(dataset.load_indices().await.unwrap().is_empty());
1234
1235        let dataset = dataset.checkout_version(2).await.unwrap();
1236        assert_eq!(dataset.load_indices().await.unwrap().len(), 1);
1237
1238        let dataset = dataset.checkout_version(3).await.unwrap();
1239        let indices = dataset.load_indices().await.unwrap();
1240        assert!(!indices.is_empty() && indices.len() <= 2);
1241
1242        // At this point, we have created two indices. If they are both for the same column,
1243        // it must be vector1 and not vector2.
1244        if indices.len() == 2 {
1245            let mut fields: Vec<i32> = indices.iter().flat_map(|i| i.fields.clone()).collect();
1246            fields.sort();
1247            assert_eq!(fields, vec![0, 1]);
1248        } else {
1249            assert_eq!(indices[0].fields, vec![0]);
1250        }
1251
1252        let dataset = dataset.checkout_version(4).await.unwrap();
1253        let indices = dataset.load_indices().await.unwrap();
1254        assert_eq!(indices.len(), 2);
1255        let mut fields: Vec<i32> = indices.iter().flat_map(|i| i.fields.clone()).collect();
1256        fields.sort();
1257        assert_eq!(fields, vec![0, 1]);
1258    }
1259
1260    #[tokio::test]
1261    async fn test_load_and_sort_new_transactions() {
1262        // Create a dataset
1263        let mut dataset = lance_datagen::gen_batch()
1264            .col("i", lance_datagen::array::step::<Int32Type>())
1265            .into_ram_dataset(FragmentCount::from(1), FragmentRowCount::from(10))
1266            .await
1267            .unwrap();
1268
1269        // Create 100 small UpdateConfig transactions
1270        for i in 0..100 {
1271            dataset
1272                .update_config(vec![(format!("key_{}", i), format!("value_{}", i))])
1273                .await
1274                .unwrap();
1275        }
1276
1277        // Now load the dataset at version 1 and check that load_and_sort_new_transactions
1278        // returns transactions in order
1279        let dataset_v1 = dataset.checkout_version(1).await.unwrap();
1280        let (_, transactions) = load_and_sort_new_transactions(&dataset_v1).await.unwrap();
1281
1282        // Verify transactions are sorted by version
1283        let versions: Vec<u64> = transactions.iter().map(|(v, _)| *v).collect();
1284        for i in 1..versions.len() {
1285            assert!(
1286                versions[i] > versions[i - 1],
1287                "Transactions not in order: version {} came after version {}",
1288                versions[i],
1289                versions[i - 1]
1290            );
1291        }
1292
1293        // Also verify we have exactly 100 transactions (versions 2-101)
1294        assert_eq!(transactions.len(), 100);
1295        assert_eq!(versions.first(), Some(&2));
1296        assert_eq!(versions.last(), Some(&101));
1297    }
1298
1299    #[tokio::test]
1300    async fn test_concurrent_writes() {
1301        for write_mode in [WriteMode::Append, WriteMode::Overwrite] {
1302            // Create an empty table
1303            let test_dir = TempStrDir::default();
1304            let test_uri = test_dir.as_str();
1305
1306            let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
1307                "i",
1308                DataType::Int32,
1309                false,
1310            )]));
1311
1312            let dataset = Dataset::write(
1313                RecordBatchIterator::new(vec![].into_iter().map(Ok), schema.clone()),
1314                test_uri,
1315                None,
1316            )
1317            .await
1318            .unwrap();
1319
1320            // Make some sample data
1321            let batch = RecordBatch::try_new(
1322                schema.clone(),
1323                vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
1324            )
1325            .unwrap();
1326
1327            // Write data concurrently in 5 tasks
1328            let futures: Vec<_> = (0..5)
1329                .map(|_| {
1330                    let batch = batch.clone();
1331                    let schema = schema.clone();
1332                    let uri = test_uri.to_string();
1333                    tokio::spawn(async move {
1334                        let reader = RecordBatchIterator::new(vec![Ok(batch)], schema);
1335                        Dataset::write(
1336                            reader,
1337                            &uri,
1338                            Some(WriteParams {
1339                                mode: write_mode,
1340                                ..Default::default()
1341                            }),
1342                        )
1343                        .await
1344                    })
1345                })
1346                .collect();
1347            let results = join_all(futures).await;
1348
1349            // Assert all succeeded
1350            for result in results {
1351                assert!(matches!(result, Ok(Ok(_))), "{:?}", result);
1352            }
1353
1354            // Assert final fragments and versions expected
1355            let dataset = dataset.checkout_version(6).await.unwrap();
1356
1357            match write_mode {
1358                WriteMode::Append => {
1359                    assert_eq!(dataset.get_fragments().len(), 5);
1360                }
1361                WriteMode::Overwrite => {
1362                    assert_eq!(dataset.get_fragments().len(), 1);
1363                }
1364                _ => unreachable!(),
1365            }
1366
1367            dataset.validate().await.unwrap()
1368        }
1369    }
1370
    async fn get_empty_dataset() -> (TempStrDir, Dataset) {
        let test_dir = TempStrDir::default();
        let test_uri = test_dir.as_str();

        let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "i",
            DataType::Int32,
            false,
        )]));

        let ds = Dataset::write(
            RecordBatchIterator::new(vec![].into_iter().map(Ok), schema.clone()),
            test_uri,
            None,
        )
        .await
        .unwrap();
        (test_dir, ds)
    }

    #[tokio::test]
    async fn test_good_concurrent_config_writes() {
        let (_tmpdir, dataset) = get_empty_dataset().await;
        let original_num_config_keys = dataset.manifest.config.len();

        // Test successful concurrent insert config operations
        let futures: Vec<_> = ["key1", "key2", "key3", "key4", "key5"]
            .iter()
            .map(|key| {
                let mut dataset = dataset.clone();
                tokio::spawn(async move {
                    dataset
                        .update_config(HashMap::from([(
                            key.to_string(),
                            Some("value".to_string()),
                        )]))
                        .await
                })
            })
            .collect();
        let results = join_all(futures).await;

        // Assert all succeeded
        for result in results {
            assert!(matches!(result, Ok(Ok(_))), "{:?}", result);
        }

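        // The empty dataset starts at version 1; each successful update_config
        // commit bumps the version, so the fifth concurrent update lands at 6.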
        let dataset = dataset.checkout_version(6).await.unwrap();
        assert_eq!(dataset.manifest.config.len(), 5 + original_num_config_keys);

        dataset.validate().await.unwrap();

        // Test successful concurrent delete operations. If multiple delete
        // operations attempt to delete the same key, they are all successful.
        let futures: Vec<_> = ["key1", "key1", "key1", "key2", "key2"]
            .iter()
            .map(|key| {
                let mut dataset = dataset.clone();
                tokio::spawn(async move {
                    dataset
                        .update_config(HashMap::from([(key.to_string(), None)]))
                        .await
                })
            })
            .collect();
        let results = join_all(futures).await;

        // Assert all succeeded
        for result in results {
            assert!(matches!(result, Ok(Ok(_))), "{:?}", result);
        }

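        // Five more update_config commits (the deletes) advance the dataset
        // from version 6 to version 11.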
        let dataset = dataset.checkout_version(11).await.unwrap();

        // There are now two fewer keys
        assert_eq!(dataset.manifest.config.len(), 3 + original_num_config_keys);

        dataset.validate().await.unwrap()
    }

    #[tokio::test]
    async fn test_bad_concurrent_config_writes() {
        // If two concurrent insert config operations occur for the same key, a
        // `CommitConflict` should be returned
        let (_tmpdir, dataset) = get_empty_dataset().await;

        let futures: Vec<_> = ["key1", "key1", "key2", "key3", "key4"]
            .iter()
            .map(|key| {
                let mut dataset = dataset.clone();
                tokio::spawn(async move {
                    dataset
                        .update_config(HashMap::from([(
                            key.to_string(),
                            Some("value".to_string()),
                        )]))
                        .await
                })
            })
            .collect();

        let results = join_all(futures).await;

        // Assert that either the first or the second operation fails
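        // "key1" is inserted by tasks 0 and 1; exactly one of the two should hit a
        // CommitConflict, while the other one and every task touching a distinct
        // key succeed.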
        let mut first_operation_failed = false;
        for (i, result) in results.into_iter().enumerate() {
            let result = result.unwrap();
            match i {
                0 => {
                    if result.is_err() {
                        first_operation_failed = true;
                        assert!(
                            matches!(&result, &Err(Error::CommitConflict { .. })),
                            "{:?}",
                            result,
                        );
                    }
                }
                1 => match first_operation_failed {
                    true => assert!(result.is_ok(), "{:?}", result),
                    false => {
                        assert!(
                            matches!(&result, &Err(Error::CommitConflict { .. })),
                            "{:?}",
                            result,
                        );
                    }
                },
                _ => assert!(result.is_ok(), "{:?}", result),
            }
        }
    }

    #[test]
    fn test_fix_schema() {
        // Manifest has a data file with no fields in use
        // Manifest has a duplicate field id in one fragment but not others.
        let mut field0 =
            Field::try_from(ArrowField::new("a", arrow_schema::DataType::Int64, false)).unwrap();
        field0.set_id(-1, &mut 0);
        let mut field2 =
            Field::try_from(ArrowField::new("b", arrow_schema::DataType::Int64, false)).unwrap();
        field2.set_id(-1, &mut 2);

        let schema = Schema {
            fields: vec![field0.clone(), field2.clone()],
            metadata: Default::default(),
        };
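        // Fragment 0: one file covering schema fields 0-2 plus an "unused" file
        // holding only field 9, which is not in the schema. Fragment 1: field id 2
        // appears in two files, i.e. a duplicate within the fragment.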
        let fragments = vec![
            Fragment {
                id: 0,
                files: vec![
                    DataFile::new_legacy_from_fields("path1", vec![0, 1, 2]),
                    DataFile::new_legacy_from_fields("unused", vec![9]),
                ],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
            },
            Fragment {
                id: 1,
                files: vec![
                    DataFile::new_legacy_from_fields("path2", vec![0, 1, 2]),
                    DataFile::new_legacy_from_fields("path3", vec![2]),
                ],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
            },
        ];

        let mut manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            /*blob_dataset_version=*/ None,
            HashMap::new(),
        );

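        // fix_schema should drop the data file whose fields are not in the schema
        // and renumber the duplicate field id, as asserted below.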
        fix_schema(&mut manifest).unwrap();

        // Because of the duplicate field id, the field id of field2 should have been changed to 10
        field2.id = 10;
        let expected_schema = Schema {
            fields: vec![field0, field2],
            metadata: Default::default(),
        };
        assert_eq!(manifest.schema, expected_schema);

        // The data file with just field 9 should have been removed, since that
        // field is not used in the current schema.
        // Field id 2 should have been changed to 10, except in the first
        // file of the second fragment.
        let expected_fragments = vec![
            Fragment {
                id: 0,
                files: vec![DataFile::new_legacy_from_fields("path1", vec![0, 1, 10])],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
            },
            Fragment {
                id: 1,
                files: vec![
                    DataFile::new_legacy_from_fields("path2", vec![0, 1, 2]),
                    DataFile::new_legacy_from_fields("path3", vec![10]),
                ],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
            },
        ];
        assert_eq!(manifest.fragments.as_ref(), &expected_fragments);
    }
}