lance_table/io/
commit.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Trait for commit implementations.
5//!
6//! In Lance, a transaction is committed by writing the next manifest file.
7//! However, care should be taken to ensure that the manifest file is written
8//! only once, even if there are concurrent writers. Different stores have
9//! different abilities to handle concurrent writes, so a trait is provided
10//! to allow for different implementations.
11//!
12//! The trait [CommitHandler] can be implemented to provide different commit
13//! strategies. The default implementation for most object stores is
14//! [RenameCommitHandler], which writes the manifest to a temporary path, then
15//! renames the temporary path to the final path if no object already exists
16//! at the final path. This is an atomic operation in most object stores, but
17//! not in AWS S3. So for AWS S3, the default commit handler is
18//! [UnsafeCommitHandler], which writes the manifest to the final path without
19//! any checks.
20//!
21//! When providing your own commit handler, most often you are implementing in
22//! terms of a lock. The trait [CommitLock] can be implemented as a simpler
23//! alternative to [CommitHandler].
24
25use std::io;
26use std::sync::atomic::AtomicBool;
27use std::sync::Arc;
28use std::{fmt::Debug, fs::DirEntry};
29
30use futures::{
31    future::{self, BoxFuture},
32    stream::BoxStream,
33    StreamExt, TryStreamExt,
34};
35use log::warn;
36use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore};
37use snafu::location;
38use url::Url;
39
40#[cfg(feature = "dynamodb")]
41pub mod dynamodb;
42pub mod external_manifest;
43
44use lance_core::{Error, Result};
45use lance_io::object_store::{ObjectStore, ObjectStoreExt, ObjectStoreParams};
46
47#[cfg(feature = "dynamodb")]
48use {
49    self::external_manifest::{ExternalManifestCommitHandler, ExternalManifestStore},
50    aws_credential_types::provider::error::CredentialsError,
51    aws_credential_types::provider::ProvideCredentials,
52    lance_io::object_store::{build_aws_credential, StorageOptions},
53    object_store::aws::AmazonS3ConfigKey,
54    object_store::aws::AwsCredentialProvider,
55    std::borrow::Cow,
56    std::time::{Duration, SystemTime},
57};
58
59use crate::format::{is_detached_version, Index, Manifest};
60
/// Name of the directory, under the dataset base path, that holds manifest files.
const VERSIONS_DIR: &str = "_versions";
/// File extension (without the leading dot) used for manifest files.
const MANIFEST_EXTENSION: &str = "manifest";
/// Filename prefix that marks the manifest of a detached version.
const DETACHED_VERSION_PREFIX: &str = "d";
64
/// How manifest files should be named.
///
/// Both schemes store manifests in the `_versions` directory; they differ only
/// in how the version number is encoded in the filename.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ManifestNamingScheme {
    /// `_versions/{version}.manifest`
    V1,
    /// `_versions/{u64::MAX - version}.manifest`
    ///
    /// Zero-padded (to 20 digits) and reversed for O(1) lookup of latest version
    /// on object stores.
    V2,
}
75
76impl ManifestNamingScheme {
77    pub fn manifest_path(&self, base: &Path, version: u64) -> Path {
78        let directory = base.child(VERSIONS_DIR);
79        if is_detached_version(version) {
80            // Detached versions should never show up first in a list operation which
81            // means it needs to come lexicographically after all attached manifest
82            // files and so we add the prefix `d`.  There is no need to invert the
83            // version number since detached versions are not part of the version
84            let directory = base.child(VERSIONS_DIR);
85            directory.child(format!(
86                "{DETACHED_VERSION_PREFIX}{version}.{MANIFEST_EXTENSION}"
87            ))
88        } else {
89            match self {
90                Self::V1 => directory.child(format!("{version}.{MANIFEST_EXTENSION}")),
91                Self::V2 => {
92                    let inverted_version = u64::MAX - version;
93                    directory.child(format!("{inverted_version:020}.{MANIFEST_EXTENSION}"))
94                }
95            }
96        }
97    }
98
99    pub fn parse_version(&self, filename: &str) -> Option<u64> {
100        let file_number = filename
101            .split_once('.')
102            // Detached versions will fail the `parse` step, which is ok.
103            .and_then(|(version_str, _)| version_str.parse::<u64>().ok());
104        match self {
105            Self::V1 => file_number,
106            Self::V2 => file_number.map(|v| u64::MAX - v),
107        }
108    }
109
110    pub fn detect_scheme(filename: &str) -> Option<Self> {
111        if filename.starts_with(DETACHED_VERSION_PREFIX) {
112            // Currently, detached versions must imply V2
113            return Some(Self::V2);
114        }
115        if filename.ends_with(MANIFEST_EXTENSION) {
116            const V2_LEN: usize = 20 + 1 + MANIFEST_EXTENSION.len();
117            if filename.len() == V2_LEN {
118                Some(Self::V2)
119            } else {
120                Some(Self::V1)
121            }
122        } else {
123            None
124        }
125    }
126
127    pub fn detect_scheme_staging(filename: &str) -> Self {
128        // We shouldn't have to worry about detached versions here since there is no
129        // such thing as "detached" and "staged" at the same time.
130        if filename.chars().nth(20) == Some('.') {
131            Self::V2
132        } else {
133            Self::V1
134        }
135    }
136}
137
138/// Migrate all V1 manifests to V2 naming scheme.
139///
140/// This function will rename all V1 manifests to V2 naming scheme.
141///
142/// This function is idempotent, and can be run multiple times without
143/// changing the state of the object store.
144///
145/// However, it should not be run while other concurrent operations are happening.
146/// And it should also run until completion before resuming other operations.
147pub async fn migrate_scheme_to_v2(object_store: &ObjectStore, dataset_base: &Path) -> Result<()> {
148    object_store
149        .inner
150        .list(Some(&dataset_base.child(VERSIONS_DIR)))
151        .try_filter(|res| {
152            let res = if let Some(filename) = res.location.filename() {
153                ManifestNamingScheme::detect_scheme(filename) == Some(ManifestNamingScheme::V1)
154            } else {
155                false
156            };
157            future::ready(res)
158        })
159        .try_for_each_concurrent(object_store.io_parallelism(), |meta| async move {
160            let filename = meta.location.filename().unwrap();
161            let version = ManifestNamingScheme::V1.parse_version(filename).unwrap();
162            let path = ManifestNamingScheme::V2.manifest_path(dataset_base, version);
163            object_store.inner.rename(&meta.location, &path).await?;
164            Ok(())
165        })
166        .await?;
167
168    Ok(())
169}
170
/// Function that writes the manifest to the object store.
///
/// The function receives the object store to write to, the manifest (borrowed
/// mutably), optional indices, and the destination path. It returns a boxed
/// future that resolves to the outcome of the write.
pub type ManifestWriter = for<'a> fn(
    object_store: &'a ObjectStore,
    manifest: &'a mut Manifest,
    indices: Option<Vec<Index>>,
    path: &'a Path,
) -> BoxFuture<'a, Result<()>>;
178
/// Where a manifest file lives, together with the metadata that was discovered
/// while locating it.
#[derive(Debug)]
pub struct ManifestLocation {
    /// The version the manifest corresponds to.
    pub version: u64,
    /// Path of the manifest file, relative to the table root.
    pub path: Path,
    /// Size, in bytes, of the manifest file. If it is not known, this field should be `None`.
    pub size: Option<u64>,
    /// Naming scheme of the manifest file.
    pub naming_scheme: ManifestNamingScheme,
}
190
191/// Get the latest manifest path
192async fn current_manifest_path(
193    object_store: &ObjectStore,
194    base: &Path,
195) -> Result<ManifestLocation> {
196    if object_store.is_local() {
197        if let Ok(Some(location)) = current_manifest_local(base) {
198            return Ok(location);
199        }
200    }
201
202    let manifest_files = object_store.inner.list(Some(&base.child(VERSIONS_DIR)));
203
204    let mut valid_manifests = manifest_files.try_filter_map(|res| {
205        if let Some(scheme) = ManifestNamingScheme::detect_scheme(res.location.filename().unwrap())
206        {
207            future::ready(Ok(Some((scheme, res))))
208        } else {
209            future::ready(Ok(None))
210        }
211    });
212
213    let first = valid_manifests.next().await.transpose()?;
214    match (first, object_store.list_is_lexically_ordered) {
215        // If the first valid manifest we see is V2, we can assume that we are using
216        // V2 naming scheme for all manifests.
217        (Some((scheme @ ManifestNamingScheme::V2, meta)), true) => {
218            let version = scheme
219                .parse_version(meta.location.filename().unwrap())
220                .unwrap();
221
222            // Sanity check: verify at least for the first 1k files that they are all V2
223            // and that the version numbers are decreasing. We use the first 1k because
224            // this is the typical size of an object store list endpoint response page.
225            for (scheme, meta) in valid_manifests.take(999).try_collect::<Vec<_>>().await? {
226                if scheme != ManifestNamingScheme::V2 {
227                    warn!(
228                        "Found V1 Manifest in a V2 directory. Use `migrate_manifest_paths_v2` \
229                         to migrate the directory."
230                    );
231                    break;
232                }
233                let next_version = scheme
234                    .parse_version(meta.location.filename().unwrap())
235                    .unwrap();
236                if next_version >= version {
237                    warn!(
238                        "List operation was expected to be lexically ordered, but was not. This \
239                         could mean a corrupt read. Please make a bug report on the lancedb/lance \
240                         GitHub repository."
241                    );
242                    break;
243                }
244            }
245
246            Ok(ManifestLocation {
247                version,
248                path: meta.location,
249                size: Some(meta.size as u64),
250                naming_scheme: scheme,
251            })
252        }
253        // If the first valid manifest we see if V1, assume for now that we are
254        // using V1 naming scheme for all manifests. Since we are listing the
255        // directory anyways, we will assert there aren't any V2 manifests.
256        (Some((scheme, meta)), _) => {
257            let mut current_version = scheme
258                .parse_version(meta.location.filename().unwrap())
259                .unwrap();
260            let mut current_meta = meta;
261
262            while let Some((scheme, meta)) = valid_manifests.next().await.transpose()? {
263                if matches!(scheme, ManifestNamingScheme::V2) {
264                    return Err(Error::Internal {
265                        message: "Found V2 manifest in a V1 manifest directory".to_string(),
266                        location: location!(),
267                    });
268                }
269                let version = scheme
270                    .parse_version(meta.location.filename().unwrap())
271                    .unwrap();
272                if version > current_version {
273                    current_version = version;
274                    current_meta = meta;
275                }
276            }
277            Ok(ManifestLocation {
278                version: current_version,
279                path: current_meta.location,
280                size: Some(current_meta.size as u64),
281                naming_scheme: scheme,
282            })
283        }
284        (None, _) => Err(Error::NotFound {
285            uri: base.child(VERSIONS_DIR).to_string(),
286            location: location!(),
287        }),
288    }
289}
290
291// This is an optimized function that searches for the latest manifest. In
292// object_store, list operations lookup metadata for each file listed. This
293// method only gets the metadata for the found latest manifest.
294fn current_manifest_local(base: &Path) -> std::io::Result<Option<ManifestLocation>> {
295    let path = lance_io::local::to_local_path(&base.child(VERSIONS_DIR));
296    let entries = std::fs::read_dir(path)?;
297
298    let mut latest_entry: Option<(u64, DirEntry)> = None;
299
300    let mut scheme: Option<ManifestNamingScheme> = None;
301
302    for entry in entries {
303        let entry = entry?;
304        let filename_raw = entry.file_name();
305        let filename = filename_raw.to_string_lossy();
306
307        let Some(entry_scheme) = ManifestNamingScheme::detect_scheme(&filename) else {
308            // Need to ignore temporary files, such as
309            // .tmp_7.manifest_9c100374-3298-4537-afc6-f5ee7913666d
310            continue;
311        };
312
313        if let Some(scheme) = scheme {
314            if scheme != entry_scheme {
315                return Err(io::Error::new(
316                    io::ErrorKind::InvalidData,
317                    format!(
318                        "Found multiple manifest naming schemes in the same directory: {:?} and {:?}",
319                        scheme, entry_scheme
320                    ),
321                ));
322            }
323        } else {
324            scheme = Some(entry_scheme);
325        }
326
327        let Some(version) = entry_scheme.parse_version(&filename) else {
328            continue;
329        };
330
331        if let Some((latest_version, _)) = &latest_entry {
332            if version > *latest_version {
333                latest_entry = Some((version, entry));
334            }
335        } else {
336            latest_entry = Some((version, entry));
337        }
338    }
339
340    if let Some((version, entry)) = latest_entry {
341        let path = Path::from_filesystem_path(entry.path())
342            .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;
343        Ok(Some(ManifestLocation {
344            version,
345            path,
346            size: Some(entry.metadata()?.len()),
347            naming_scheme: scheme.unwrap(),
348        }))
349    } else {
350        Ok(None)
351    }
352}
353
354async fn list_manifests<'a>(
355    base_path: &Path,
356    object_store: &'a dyn OSObjectStore,
357) -> Result<BoxStream<'a, Result<Path>>> {
358    Ok(object_store
359        .read_dir_all(&base_path.child(VERSIONS_DIR), None)
360        .await?
361        .try_filter_map(|obj_meta| {
362            if obj_meta.location.extension() == Some(MANIFEST_EXTENSION) {
363                future::ready(Ok(Some(obj_meta.location)))
364            } else {
365                future::ready(Ok(None))
366            }
367        })
368        .boxed())
369}
370
371pub fn parse_version_from_path(path: &Path) -> Result<u64> {
372    path.filename()
373        .and_then(|name| name.split_once('.'))
374        .filter(|(_, extension)| *extension == MANIFEST_EXTENSION)
375        .and_then(|(version, _)| version.parse::<u64>().ok())
376        .ok_or(Error::Internal {
377            message: format!("Expected manifest file, but found {}", path),
378            location: location!(),
379        })
380}
381
382fn make_staging_manifest_path(base: &Path) -> Result<Path> {
383    let id = uuid::Uuid::new_v4().to_string();
384    Path::parse(format!("{base}-{id}")).map_err(|e| Error::IO {
385        source: Box::new(e),
386        location: location!(),
387    })
388}
389
/// Name of the URL query parameter that carries the DynamoDB table name in
/// `s3+ddb://` URLs.
#[cfg(feature = "dynamodb")]
const DDB_URL_QUERY_KEY: &str = "ddbTableName";
392
393/// Handle commits that prevent conflicting writes.
394///
395/// Commit implementations ensure that if there are multiple concurrent writers
396/// attempting to write the next version of a table, only one will win. In order
397/// to work, all writers must use the same commit handler type.
398/// This trait is also responsible for resolving where the manifests live.
399///
400// TODO: pub(crate)
401#[async_trait::async_trait]
402pub trait CommitHandler: Debug + Send + Sync {
403    async fn resolve_latest_location(
404        &self,
405        base_path: &Path,
406        object_store: &ObjectStore,
407    ) -> Result<ManifestLocation> {
408        Ok(current_manifest_path(object_store, base_path).await?)
409    }
410
411    /// Get the path to the latest version manifest of a dataset at the base_path
412    async fn resolve_latest_version(
413        &self,
414        base_path: &Path,
415        object_store: &ObjectStore,
416    ) -> std::result::Result<Path, Error> {
417        // TODO: we need to pade 0's to the version number on the manifest file path
418        Ok(current_manifest_path(object_store, base_path).await?.path)
419    }
420
421    // for default implementation, parse the version from the path
422    async fn resolve_latest_version_id(
423        &self,
424        base_path: &Path,
425        object_store: &ObjectStore,
426    ) -> Result<u64> {
427        Ok(current_manifest_path(object_store, base_path)
428            .await?
429            .version)
430    }
431
432    /// Get the path to a specific versioned manifest of a dataset at the base_path
433    ///
434    /// The version must already exist.
435    async fn resolve_version(
436        &self,
437        base_path: &Path,
438        version: u64,
439        object_store: &dyn OSObjectStore,
440    ) -> std::result::Result<Path, Error> {
441        Ok(default_resolve_version(base_path, version, object_store)
442            .await?
443            .path)
444    }
445
446    async fn resolve_version_location(
447        &self,
448        base_path: &Path,
449        version: u64,
450        object_store: &dyn OSObjectStore,
451    ) -> Result<ManifestLocation> {
452        default_resolve_version(base_path, version, object_store).await
453    }
454
455    /// List manifests that are available for a dataset at the base_path
456    async fn list_manifests<'a>(
457        &self,
458        base_path: &Path,
459        object_store: &'a dyn OSObjectStore,
460    ) -> Result<BoxStream<'a, Result<Path>>> {
461        list_manifests(base_path, object_store).await
462    }
463
464    /// Commit a manifest.
465    ///
466    /// This function should return an [CommitError::CommitConflict] if another
467    /// transaction has already been committed to the path.
468    async fn commit(
469        &self,
470        manifest: &mut Manifest,
471        indices: Option<Vec<Index>>,
472        base_path: &Path,
473        object_store: &ObjectStore,
474        manifest_writer: ManifestWriter,
475        naming_scheme: ManifestNamingScheme,
476    ) -> std::result::Result<Path, CommitError>;
477
478    /// Delete the recorded manifest information for a dataset at the base_path
479    async fn delete(&self, _base_path: &Path) -> Result<()> {
480        Ok(())
481    }
482}
483
484async fn default_resolve_version(
485    base_path: &Path,
486    version: u64,
487    object_store: &dyn OSObjectStore,
488) -> Result<ManifestLocation> {
489    if is_detached_version(version) {
490        return Ok(ManifestLocation {
491            version,
492            // Detached versions are not supported with V1 naming scheme.  If we need
493            // to support in the future we could use a different prefix (e.g. 'x' or something)
494            naming_scheme: ManifestNamingScheme::V2,
495            // Both V1 and V2 should give the same path for detached versions
496            path: ManifestNamingScheme::V2.manifest_path(base_path, version),
497            size: None,
498        });
499    }
500
501    // try V2, fallback to V1.
502    let scheme = ManifestNamingScheme::V2;
503    let path = scheme.manifest_path(base_path, version);
504    match object_store.head(&path).await {
505        Ok(meta) => Ok(ManifestLocation {
506            version,
507            path,
508            size: Some(meta.size as u64),
509            naming_scheme: scheme,
510        }),
511        Err(ObjectStoreError::NotFound { .. }) => {
512            // fallback to V1
513            let scheme = ManifestNamingScheme::V1;
514            Ok(ManifestLocation {
515                version,
516                path: scheme.manifest_path(base_path, version),
517                size: None,
518                naming_scheme: scheme,
519            })
520        }
521        Err(e) => Err(e.into()),
522    }
523}
/// Adapt an object_store credentials into AWS SDK creds
///
/// Wraps an `object_store` [AwsCredentialProvider] so that it can be used
/// wherever the AWS SDK expects a [ProvideCredentials] implementation.
#[cfg(feature = "dynamodb")]
#[derive(Debug)]
struct OSObjectStoreToAwsCredAdaptor(AwsCredentialProvider);
528
#[cfg(feature = "dynamodb")]
impl ProvideCredentials for OSObjectStoreToAwsCredAdaptor {
    /// Fetch a credential from the wrapped provider and convert it into AWS SDK
    /// credentials.
    ///
    /// The returned credentials are given an expiry of ten minutes from the
    /// moment they are fetched. Errors from the wrapped provider are reported
    /// as provider errors.
    fn provide_credentials<'a>(
        &'a self,
    ) -> aws_credential_types::provider::future::ProvideCredentials<'a>
    where
        Self: 'a,
    {
        // 10 min
        const EXPIRES_AFTER: Duration = Duration::from_secs(60 * 10);

        aws_credential_types::provider::future::ProvideCredentials::new(async {
            let fetched = self.0.get_credential().await;
            let creds =
                fetched.map_err(|e| CredentialsError::provider_error(Box::new(e)))?;

            let expires_at = SystemTime::now()
                .checked_add(EXPIRES_AFTER)
                .expect("overflow");

            Ok(aws_credential_types::Credentials::new(
                &creds.key_id,
                &creds.secret_key,
                creds.token.clone(),
                Some(expires_at),
                "",
            ))
        })
    }
}
559
/// Build a DynamoDB-backed [ExternalManifestStore].
///
/// A DynamoDB client is configured for `region` with the given credentials
/// (and `endpoint`, when provided) and handed to the store together with
/// `table_name` and `app_name`.
#[cfg(feature = "dynamodb")]
async fn build_dynamodb_external_store(
    table_name: &str,
    creds: AwsCredentialProvider,
    region: &str,
    endpoint: Option<String>,
    app_name: &str,
) -> Result<Arc<dyn ExternalManifestStore>> {
    use super::commit::dynamodb::DynamoDBExternalManifestStore;
    use aws_sdk_dynamodb::{
        config::{IdentityCache, Region},
        Client,
    };

    let base_config = aws_sdk_dynamodb::config::Builder::new()
        .behavior_version_latest()
        .region(Some(Region::new(region.to_string())))
        .credentials_provider(OSObjectStoreToAwsCredAdaptor(creds))
        // caching should be handled by passed AwsCredentialProvider
        .identity_cache(IdentityCache::no_cache());

    // Only override the endpoint when the caller supplied one.
    let config = match endpoint {
        Some(custom_endpoint) => base_config.endpoint_url(custom_endpoint),
        None => base_config,
    };

    let client = Client::from_conf(config.build());
    DynamoDBExternalManifestStore::new_external_store(client.into(), table_name, app_name).await
}
588
589pub async fn commit_handler_from_url(
590    url_or_path: &str,
591    // This looks unused if dynamodb feature disabled
592    #[allow(unused_variables)] options: &Option<ObjectStoreParams>,
593) -> Result<Arc<dyn CommitHandler>> {
594    let url = match Url::parse(url_or_path) {
595        Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
596            // On Windows, the drive is parsed as a scheme
597            return Ok(Arc::new(RenameCommitHandler));
598        }
599        Ok(url) => url,
600        Err(_) => {
601            return Ok(Arc::new(RenameCommitHandler));
602        }
603    };
604
605    match url.scheme() {
606        // TODO: for Cloudflare R2 and Minio, we can provide a PutIfNotExist commit handler
607        // See: https://docs.rs/object_store/latest/object_store/aws/enum.S3ConditionalPut.html#variant.ETagMatch
608        "s3" => Ok(Arc::new(UnsafeCommitHandler)),
609        #[cfg(not(feature = "dynamodb"))]
610        "s3+ddb" => Err(Error::InvalidInput {
611            source: "`s3+ddb://` scheme requires `dynamodb` feature to be enabled".into(),
612            location: location!(),
613        }),
614        #[cfg(feature = "dynamodb")]
615        "s3+ddb" => {
616            if url.query_pairs().count() != 1 {
617                return Err(Error::InvalidInput {
618                    source: "`s3+ddb://` scheme and expects exactly one query `ddbTableName`"
619                        .into(),
620                    location: location!(),
621                });
622            }
623            let table_name = match url.query_pairs().next() {
624                Some((Cow::Borrowed(key), Cow::Borrowed(table_name)))
625                    if key == DDB_URL_QUERY_KEY =>
626                {
627                    if table_name.is_empty() {
628                        return Err(Error::InvalidInput {
629                            source: "`s3+ddb://` scheme requires non empty dynamodb table name"
630                                .into(),
631                            location: location!(),
632                        });
633                    }
634                    table_name
635                }
636                _ => {
637                    return Err(Error::InvalidInput {
638                        source: "`s3+ddb://` scheme and expects exactly one query `ddbTableName`"
639                            .into(),
640                        location: location!(),
641                    });
642                }
643            };
644            let options = options.clone().unwrap_or_default();
645            let storage_options = StorageOptions(options.storage_options.unwrap_or_default());
646            let dynamo_endpoint = get_dynamodb_endpoint(&storage_options);
647            let storage_options = storage_options.as_s3_options();
648
649            let region = storage_options
650                .get(&AmazonS3ConfigKey::Region)
651                .map(|s| s.to_string());
652
653            let (aws_creds, region) = build_aws_credential(
654                options.s3_credentials_refresh_offset,
655                options.aws_credentials.clone(),
656                Some(&storage_options),
657                region,
658            )
659            .await?;
660
661            Ok(Arc::new(ExternalManifestCommitHandler {
662                external_manifest_store: build_dynamodb_external_store(
663                    table_name,
664                    aws_creds.clone(),
665                    &region,
666                    dynamo_endpoint,
667                    "lancedb",
668                )
669                .await?,
670            }))
671        }
672        "gs" | "az" | "file" | "file-object-store" | "memory" => Ok(Arc::new(RenameCommitHandler)),
673        _ => Ok(Arc::new(UnsafeCommitHandler)),
674    }
675}
676
/// Look up the DynamoDB endpoint override.
///
/// The `dynamodb_endpoint` storage option takes precedence; when it is not
/// set, the `DYNAMODB_ENDPOINT` environment variable is consulted.
#[cfg(feature = "dynamodb")]
fn get_dynamodb_endpoint(storage_options: &StorageOptions) -> Option<String> {
    storage_options
        .0
        .get("dynamodb_endpoint")
        .map(|endpoint| endpoint.to_string())
        .or_else(|| std::env::var("DYNAMODB_ENDPOINT").ok())
}
685
/// Errors that can occur when committing a manifest.
///
/// Distinguishes a commit conflict from every other kind of failure.
#[derive(Debug)]
pub enum CommitError {
    /// Another transaction has already been written to the path
    CommitConflict,
    /// Something else went wrong
    OtherError(Error),
}
694
695impl From<Error> for CommitError {
696    fn from(e: Error) -> Self {
697        Self::OtherError(e)
698    }
699}
700
701impl From<CommitError> for Error {
702    fn from(e: CommitError) -> Self {
703        match e {
704            CommitError::CommitConflict => Self::Internal {
705                message: "Commit conflict".to_string(),
706                location: location!(),
707            },
708            CommitError::OtherError(e) => e,
709        }
710    }
711}
712
/// Whether we have issued a warning about using the unsafe commit handler.
///
/// Starts out `false` and is set to `true` once the warning has been logged,
/// so that the warning is not repeated on every commit.
static WARNED_ON_UNSAFE_COMMIT: AtomicBool = AtomicBool::new(false);
715
/// A naive commit implementation that does not prevent conflicting writes.
///
/// The manifest is written straight to its final path without any existence
/// check.
///
/// This will log a warning the first time it is used.
pub struct UnsafeCommitHandler;
720
721#[async_trait::async_trait]
722impl CommitHandler for UnsafeCommitHandler {
723    async fn commit(
724        &self,
725        manifest: &mut Manifest,
726        indices: Option<Vec<Index>>,
727        base_path: &Path,
728        object_store: &ObjectStore,
729        manifest_writer: ManifestWriter,
730        naming_scheme: ManifestNamingScheme,
731    ) -> std::result::Result<Path, CommitError> {
732        // Log a one-time warning
733        if !WARNED_ON_UNSAFE_COMMIT.load(std::sync::atomic::Ordering::Relaxed) {
734            WARNED_ON_UNSAFE_COMMIT.store(true, std::sync::atomic::Ordering::Relaxed);
735            log::warn!(
736                "Using unsafe commit handler. Concurrent writes may result in data loss. \
737                 Consider providing a commit handler that prevents conflicting writes."
738            );
739        }
740
741        let version_path = naming_scheme.manifest_path(base_path, manifest.version);
742        // Write the manifest naively
743        manifest_writer(object_store, manifest, indices, &version_path).await?;
744
745        Ok(version_path)
746    }
747}
748
749impl Debug for UnsafeCommitHandler {
750    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
751        f.debug_struct("UnsafeCommitHandler").finish()
752    }
753}
754
/// A commit implementation that uses a lock to prevent conflicting writes.
///
/// Any type implementing this trait that is also `Send + Sync` gets a
/// [CommitHandler] implementation that takes the lock, checks that the target
/// manifest does not exist yet, writes it, and releases the lock.
#[async_trait::async_trait]
pub trait CommitLock: Debug {
    /// The lease handed out by [CommitLock::lock]; it is released once the
    /// commit attempt has finished.
    type Lease: CommitLease;

    /// Attempt to lock the table for the given version.
    ///
    /// If it is already locked by another transaction, wait until it is unlocked.
    /// Once it is unlocked, return [CommitError::CommitConflict] if the version
    /// has already been committed. Otherwise, return the lock.
    ///
    /// To prevent poisoned locks, it's recommended to set a timeout on the lock
    /// of at least 30 seconds.
    ///
    /// It is not required that the lock tracks the version. It is provided in
    /// case the locking is handled by a catalog service that needs to know the
    /// current version of the table.
    async fn lock(&self, version: u64) -> std::result::Result<Self::Lease, CommitError>;
}
774
/// A lease on a table lock, as returned by [CommitLock::lock].
#[async_trait::async_trait]
pub trait CommitLease: Send + Sync {
    /// Return the lease, indicating whether the commit was successful.
    ///
    /// `success` is `true` when the manifest was written and `false` otherwise.
    async fn release(&self, success: bool) -> std::result::Result<(), CommitError>;
}
780
781#[async_trait::async_trait]
782impl<T: CommitLock + Send + Sync> CommitHandler for T {
783    async fn commit(
784        &self,
785        manifest: &mut Manifest,
786        indices: Option<Vec<Index>>,
787        base_path: &Path,
788        object_store: &ObjectStore,
789        manifest_writer: ManifestWriter,
790        naming_scheme: ManifestNamingScheme,
791    ) -> std::result::Result<Path, CommitError> {
792        let path = naming_scheme.manifest_path(base_path, manifest.version);
793        // NOTE: once we have the lease we cannot use ? to return errors, since
794        // we must release the lease before returning.
795        let lease = self.lock(manifest.version).await?;
796
797        // Head the location and make sure it's not already committed
798        match object_store.inner.head(&path).await {
799            Ok(_) => {
800                // The path already exists, so it's already committed
801                // Release the lock
802                lease.release(false).await?;
803
804                return Err(CommitError::CommitConflict);
805            }
806            Err(ObjectStoreError::NotFound { .. }) => {}
807            Err(e) => {
808                // Something else went wrong
809                // Release the lock
810                lease.release(false).await?;
811
812                return Err(CommitError::OtherError(e.into()));
813            }
814        }
815        let res = manifest_writer(object_store, manifest, indices, &path).await;
816
817        // Release the lock
818        lease.release(res.is_ok()).await?;
819
820        res.map_err(|err| err.into()).map(|_| path)
821    }
822}
823
824#[async_trait::async_trait]
825impl<T: CommitLock + Send + Sync> CommitHandler for Arc<T> {
826    async fn commit(
827        &self,
828        manifest: &mut Manifest,
829        indices: Option<Vec<Index>>,
830        base_path: &Path,
831        object_store: &ObjectStore,
832        manifest_writer: ManifestWriter,
833        naming_scheme: ManifestNamingScheme,
834    ) -> std::result::Result<Path, CommitError> {
835        self.as_ref()
836            .commit(
837                manifest,
838                indices,
839                base_path,
840                object_store,
841                manifest_writer,
842                naming_scheme,
843            )
844            .await
845    }
846}
847
/// A commit implementation that uses a temporary path and renames the object.
///
/// The manifest is first written to a staging path and then moved to its final
/// location with `rename_if_not_exists`, so a manifest that already exists at
/// the final path is reported as a commit conflict rather than overwritten.
///
/// This only works for object stores that support atomic rename if not exist.
pub struct RenameCommitHandler;
852
853#[async_trait::async_trait]
854impl CommitHandler for RenameCommitHandler {
855    async fn commit(
856        &self,
857        manifest: &mut Manifest,
858        indices: Option<Vec<Index>>,
859        base_path: &Path,
860        object_store: &ObjectStore,
861        manifest_writer: ManifestWriter,
862        naming_scheme: ManifestNamingScheme,
863    ) -> std::result::Result<Path, CommitError> {
864        // Create a temporary object, then use `rename_if_not_exists` to commit.
865        // If failed, clean up the temporary object.
866
867        let path = naming_scheme.manifest_path(base_path, manifest.version);
868        let tmp_path = make_staging_manifest_path(&path)?;
869
870        // Write the manifest to the temporary path
871        manifest_writer(object_store, manifest, indices, &tmp_path).await?;
872
873        match object_store
874            .inner
875            .rename_if_not_exists(&tmp_path, &path)
876            .await
877        {
878            Ok(_) => Ok(path),
879            Err(ObjectStoreError::AlreadyExists { .. }) => {
880                // Another transaction has already been committed
881                // Attempt to clean up temporary object, but ignore errors if we can't
882                let _ = object_store.delete(&tmp_path).await;
883
884                return Err(CommitError::CommitConflict);
885            }
886            Err(e) => {
887                // Something else went wrong
888                return Err(CommitError::OtherError(e.into()));
889            }
890        }
891    }
892}
893
894impl Debug for RenameCommitHandler {
895    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
896        f.debug_struct("RenameCommitHandler").finish()
897    }
898}
899
/// Tunable settings for committing.
#[derive(Debug, Clone)]
pub struct CommitConfig {
    /// Retry budget. Presumably the number of times a commit is re-attempted
    /// after a conflict -- confirm against the callers that read this field.
    pub num_retries: u32,
    // TODO: add isolation_level
}

impl Default for CommitConfig {
    /// Builds the default configuration, which allows 20 retries.
    fn default() -> Self {
        const DEFAULT_NUM_RETRIES: u32 = 20;

        Self {
            num_retries: DEFAULT_NUM_RETRIES,
        }
    }
}
911
912#[cfg(test)]
913mod tests {
914    use super::*;
915
916    #[test]
917    fn test_manifest_naming_scheme() {
918        let v1 = ManifestNamingScheme::V1;
919        let v2 = ManifestNamingScheme::V2;
920
921        assert_eq!(
922            v1.manifest_path(&Path::from("base"), 0),
923            Path::from("base/_versions/0.manifest")
924        );
925        assert_eq!(
926            v1.manifest_path(&Path::from("base"), 42),
927            Path::from("base/_versions/42.manifest")
928        );
929
930        assert_eq!(
931            v2.manifest_path(&Path::from("base"), 0),
932            Path::from("base/_versions/18446744073709551615.manifest")
933        );
934        assert_eq!(
935            v2.manifest_path(&Path::from("base"), 42),
936            Path::from("base/_versions/18446744073709551573.manifest")
937        );
938
939        assert_eq!(v1.parse_version("0.manifest"), Some(0));
940        assert_eq!(v1.parse_version("42.manifest"), Some(42));
941        assert_eq!(
942            v1.parse_version("42.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc"),
943            Some(42)
944        );
945
946        assert_eq!(v2.parse_version("18446744073709551615.manifest"), Some(0));
947        assert_eq!(v2.parse_version("18446744073709551573.manifest"), Some(42));
948        assert_eq!(
949            v2.parse_version("18446744073709551573.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc"),
950            Some(42)
951        );
952
953        assert_eq!(ManifestNamingScheme::detect_scheme("0.manifest"), Some(v1));
954        assert_eq!(
955            ManifestNamingScheme::detect_scheme("18446744073709551615.manifest"),
956            Some(v2)
957        );
958        assert_eq!(ManifestNamingScheme::detect_scheme("something else"), None);
959    }
960
961    #[tokio::test]
962    async fn test_manifest_naming_migration() {
963        let object_store = ObjectStore::memory();
964        let base = Path::from("base");
965        let versions_dir = base.child(VERSIONS_DIR);
966
967        // Write two v1 files and one v1
968        let original_files = vec![
969            versions_dir.child("irrelevant"),
970            ManifestNamingScheme::V1.manifest_path(&base, 0),
971            ManifestNamingScheme::V2.manifest_path(&base, 1),
972        ];
973        for path in original_files {
974            object_store.put(&path, b"".as_slice()).await.unwrap();
975        }
976
977        migrate_scheme_to_v2(&object_store, &base).await.unwrap();
978
979        let expected_files = vec![
980            ManifestNamingScheme::V2.manifest_path(&base, 1),
981            ManifestNamingScheme::V2.manifest_path(&base, 0),
982            versions_dir.child("irrelevant"),
983        ];
984        let actual_files = object_store
985            .inner
986            .list(Some(&versions_dir))
987            .map_ok(|res| res.location)
988            .try_collect::<Vec<_>>()
989            .await
990            .unwrap();
991        assert_eq!(actual_files, expected_files);
992    }
993}