lance_table/io/
commit.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Trait for commit implementations.
5//!
6//! In Lance, a transaction is committed by writing the next manifest file.
7//! However, care should be taken to ensure that the manifest file is written
8//! only once, even if there are concurrent writers. Different stores have
9//! different abilities to handle concurrent writes, so a trait is provided
10//! to allow for different implementations.
11//!
//! The trait [CommitHandler] can be implemented to provide different commit
//! strategies. [commit_handler_from_url] picks a default from the URL scheme:
//!
//! * `s3`, `gs`, `az`, `memory`, `file` and `file-object-store` URLs use
//!   [ConditionalPutCommitHandler].
//! * Plain paths that do not parse as a URL use [RenameCommitHandler], which
//!   writes the manifest to a temporary path, then renames the temporary path to
//!   the final path if no object already exists at the final path. This rename
//!   is atomic in most object stores, but not in AWS S3.
//! * `s3+ddb` URLs (with the `dynamodb` feature) use an external manifest store
//!   backed by DynamoDB.
//! * Any other scheme falls back to [UnsafeCommitHandler], which writes the
//!   manifest to the final path without any checks.
20//!
21//! When providing your own commit handler, most often you are implementing in
22//! terms of a lock. The trait [CommitLock] can be implemented as a simpler
23//! alternative to [CommitHandler].
24
25use std::io;
26use std::sync::atomic::AtomicBool;
27use std::sync::Arc;
28use std::{fmt::Debug, fs::DirEntry};
29
30use futures::{
31    future::{self, BoxFuture},
32    stream::BoxStream,
33    StreamExt, TryStreamExt,
34};
35use log::warn;
36use object_store::PutOptions;
37use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore};
38use snafu::location;
39use url::Url;
40
41#[cfg(feature = "dynamodb")]
42pub mod dynamodb;
43pub mod external_manifest;
44
45use lance_core::{Error, Result};
46use lance_io::object_store::{ObjectStore, ObjectStoreExt, ObjectStoreParams};
47
48#[cfg(feature = "dynamodb")]
49use {
50    self::external_manifest::{ExternalManifestCommitHandler, ExternalManifestStore},
51    aws_credential_types::provider::error::CredentialsError,
52    aws_credential_types::provider::ProvideCredentials,
53    lance_io::object_store::{build_aws_credential, StorageOptions},
54    object_store::aws::AmazonS3ConfigKey,
55    object_store::aws::AwsCredentialProvider,
56    std::borrow::Cow,
57    std::time::{Duration, SystemTime},
58};
59
60use crate::format::{is_detached_version, Index, Manifest};
61
62const VERSIONS_DIR: &str = "_versions";
63const MANIFEST_EXTENSION: &str = "manifest";
64const DETACHED_VERSION_PREFIX: &str = "d";
65
66/// How manifest files should be named.
67#[derive(Clone, Copy, Debug, PartialEq, Eq)]
68pub enum ManifestNamingScheme {
69    /// `_versions/{version}.manifest`
70    V1,
71    /// `_manifests/{u64::MAX - version}.manifest`
72    ///
73    /// Zero-padded and reversed for O(1) lookup of latest version on object stores.
74    V2,
75}
76
77impl ManifestNamingScheme {
78    pub fn manifest_path(&self, base: &Path, version: u64) -> Path {
79        let directory = base.child(VERSIONS_DIR);
80        if is_detached_version(version) {
81            // Detached versions should never show up first in a list operation which
82            // means it needs to come lexicographically after all attached manifest
83            // files and so we add the prefix `d`.  There is no need to invert the
84            // version number since detached versions are not part of the version
85            let directory = base.child(VERSIONS_DIR);
86            directory.child(format!(
87                "{DETACHED_VERSION_PREFIX}{version}.{MANIFEST_EXTENSION}"
88            ))
89        } else {
90            match self {
91                Self::V1 => directory.child(format!("{version}.{MANIFEST_EXTENSION}")),
92                Self::V2 => {
93                    let inverted_version = u64::MAX - version;
94                    directory.child(format!("{inverted_version:020}.{MANIFEST_EXTENSION}"))
95                }
96            }
97        }
98    }
99
100    pub fn parse_version(&self, filename: &str) -> Option<u64> {
101        let file_number = filename
102            .split_once('.')
103            // Detached versions will fail the `parse` step, which is ok.
104            .and_then(|(version_str, _)| version_str.parse::<u64>().ok());
105        match self {
106            Self::V1 => file_number,
107            Self::V2 => file_number.map(|v| u64::MAX - v),
108        }
109    }
110
111    pub fn detect_scheme(filename: &str) -> Option<Self> {
112        if filename.starts_with(DETACHED_VERSION_PREFIX) {
113            // Currently, detached versions must imply V2
114            return Some(Self::V2);
115        }
116        if filename.ends_with(MANIFEST_EXTENSION) {
117            const V2_LEN: usize = 20 + 1 + MANIFEST_EXTENSION.len();
118            if filename.len() == V2_LEN {
119                Some(Self::V2)
120            } else {
121                Some(Self::V1)
122            }
123        } else {
124            None
125        }
126    }
127
128    pub fn detect_scheme_staging(filename: &str) -> Self {
129        // We shouldn't have to worry about detached versions here since there is no
130        // such thing as "detached" and "staged" at the same time.
131        if filename.chars().nth(20) == Some('.') {
132            Self::V2
133        } else {
134            Self::V1
135        }
136    }
137}
138
139/// Migrate all V1 manifests to V2 naming scheme.
140///
141/// This function will rename all V1 manifests to V2 naming scheme.
142///
143/// This function is idempotent, and can be run multiple times without
144/// changing the state of the object store.
145///
146/// However, it should not be run while other concurrent operations are happening.
147/// And it should also run until completion before resuming other operations.
148pub async fn migrate_scheme_to_v2(object_store: &ObjectStore, dataset_base: &Path) -> Result<()> {
149    object_store
150        .inner
151        .list(Some(&dataset_base.child(VERSIONS_DIR)))
152        .try_filter(|res| {
153            let res = if let Some(filename) = res.location.filename() {
154                ManifestNamingScheme::detect_scheme(filename) == Some(ManifestNamingScheme::V1)
155            } else {
156                false
157            };
158            future::ready(res)
159        })
160        .try_for_each_concurrent(object_store.io_parallelism(), |meta| async move {
161            let filename = meta.location.filename().unwrap();
162            let version = ManifestNamingScheme::V1.parse_version(filename).unwrap();
163            let path = ManifestNamingScheme::V2.manifest_path(dataset_base, version);
164            object_store.inner.rename(&meta.location, &path).await?;
165            Ok(())
166        })
167        .await?;
168
169    Ok(())
170}
171
172/// Function that writes the manifest to the object store.
173///
174/// Returns the size of the written manifest.
175pub type ManifestWriter = for<'a> fn(
176    object_store: &'a ObjectStore,
177    manifest: &'a mut Manifest,
178    indices: Option<Vec<Index>>,
179    path: &'a Path,
180) -> BoxFuture<'a, Result<u64>>;
181
182#[derive(Debug)]
183pub struct ManifestLocation {
184    /// The version the manifest corresponds to.
185    pub version: u64,
186    /// Path of the manifest file, relative to the table root.
187    pub path: Path,
188    /// Size, in bytes, of the manifest file. If it is not known, this field should be `None`.
189    pub size: Option<u64>,
190    /// Naming scheme of the manifest file.
191    pub naming_scheme: ManifestNamingScheme,
192}
193
194/// Get the latest manifest path
195async fn current_manifest_path(
196    object_store: &ObjectStore,
197    base: &Path,
198) -> Result<ManifestLocation> {
199    if object_store.is_local() {
200        if let Ok(Some(location)) = current_manifest_local(base) {
201            return Ok(location);
202        }
203    }
204
205    let manifest_files = object_store.inner.list(Some(&base.child(VERSIONS_DIR)));
206
207    let mut valid_manifests = manifest_files.try_filter_map(|res| {
208        if let Some(scheme) = ManifestNamingScheme::detect_scheme(res.location.filename().unwrap())
209        {
210            future::ready(Ok(Some((scheme, res))))
211        } else {
212            future::ready(Ok(None))
213        }
214    });
215
216    let first = valid_manifests.next().await.transpose()?;
217    match (first, object_store.list_is_lexically_ordered) {
218        // If the first valid manifest we see is V2, we can assume that we are using
219        // V2 naming scheme for all manifests.
220        (Some((scheme @ ManifestNamingScheme::V2, meta)), true) => {
221            let version = scheme
222                .parse_version(meta.location.filename().unwrap())
223                .unwrap();
224
225            // Sanity check: verify at least for the first 1k files that they are all V2
226            // and that the version numbers are decreasing. We use the first 1k because
227            // this is the typical size of an object store list endpoint response page.
228            for (scheme, meta) in valid_manifests.take(999).try_collect::<Vec<_>>().await? {
229                if scheme != ManifestNamingScheme::V2 {
230                    warn!(
231                        "Found V1 Manifest in a V2 directory. Use `migrate_manifest_paths_v2` \
232                         to migrate the directory."
233                    );
234                    break;
235                }
236                let next_version = scheme
237                    .parse_version(meta.location.filename().unwrap())
238                    .unwrap();
239                if next_version >= version {
240                    warn!(
241                        "List operation was expected to be lexically ordered, but was not. This \
242                         could mean a corrupt read. Please make a bug report on the lancedb/lance \
243                         GitHub repository."
244                    );
245                    break;
246                }
247            }
248
249            Ok(ManifestLocation {
250                version,
251                path: meta.location,
252                size: Some(meta.size as u64),
253                naming_scheme: scheme,
254            })
255        }
256        // If the first valid manifest we see if V1, assume for now that we are
257        // using V1 naming scheme for all manifests. Since we are listing the
258        // directory anyways, we will assert there aren't any V2 manifests.
259        (Some((scheme, meta)), _) => {
260            let mut current_version = scheme
261                .parse_version(meta.location.filename().unwrap())
262                .unwrap();
263            let mut current_meta = meta;
264
265            while let Some((scheme, meta)) = valid_manifests.next().await.transpose()? {
266                if matches!(scheme, ManifestNamingScheme::V2) {
267                    return Err(Error::Internal {
268                        message: "Found V2 manifest in a V1 manifest directory".to_string(),
269                        location: location!(),
270                    });
271                }
272                let version = scheme
273                    .parse_version(meta.location.filename().unwrap())
274                    .unwrap();
275                if version > current_version {
276                    current_version = version;
277                    current_meta = meta;
278                }
279            }
280            Ok(ManifestLocation {
281                version: current_version,
282                path: current_meta.location,
283                size: Some(current_meta.size as u64),
284                naming_scheme: scheme,
285            })
286        }
287        (None, _) => Err(Error::NotFound {
288            uri: base.child(VERSIONS_DIR).to_string(),
289            location: location!(),
290        }),
291    }
292}
293
294// This is an optimized function that searches for the latest manifest. In
295// object_store, list operations lookup metadata for each file listed. This
296// method only gets the metadata for the found latest manifest.
297fn current_manifest_local(base: &Path) -> std::io::Result<Option<ManifestLocation>> {
298    let path = lance_io::local::to_local_path(&base.child(VERSIONS_DIR));
299    let entries = std::fs::read_dir(path)?;
300
301    let mut latest_entry: Option<(u64, DirEntry)> = None;
302
303    let mut scheme: Option<ManifestNamingScheme> = None;
304
305    for entry in entries {
306        let entry = entry?;
307        let filename_raw = entry.file_name();
308        let filename = filename_raw.to_string_lossy();
309
310        let Some(entry_scheme) = ManifestNamingScheme::detect_scheme(&filename) else {
311            // Need to ignore temporary files, such as
312            // .tmp_7.manifest_9c100374-3298-4537-afc6-f5ee7913666d
313            continue;
314        };
315
316        if let Some(scheme) = scheme {
317            if scheme != entry_scheme {
318                return Err(io::Error::new(
319                    io::ErrorKind::InvalidData,
320                    format!(
321                        "Found multiple manifest naming schemes in the same directory: {:?} and {:?}",
322                        scheme, entry_scheme
323                    ),
324                ));
325            }
326        } else {
327            scheme = Some(entry_scheme);
328        }
329
330        let Some(version) = entry_scheme.parse_version(&filename) else {
331            continue;
332        };
333
334        if let Some((latest_version, _)) = &latest_entry {
335            if version > *latest_version {
336                latest_entry = Some((version, entry));
337            }
338        } else {
339            latest_entry = Some((version, entry));
340        }
341    }
342
343    if let Some((version, entry)) = latest_entry {
344        let path = Path::from_filesystem_path(entry.path())
345            .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;
346        Ok(Some(ManifestLocation {
347            version,
348            path,
349            size: Some(entry.metadata()?.len()),
350            naming_scheme: scheme.unwrap(),
351        }))
352    } else {
353        Ok(None)
354    }
355}
356
357async fn list_manifests<'a>(
358    base_path: &Path,
359    object_store: &'a dyn OSObjectStore,
360) -> Result<BoxStream<'a, Result<Path>>> {
361    Ok(object_store
362        .read_dir_all(&base_path.child(VERSIONS_DIR), None)
363        .await?
364        .try_filter_map(|obj_meta| {
365            if obj_meta.location.extension() == Some(MANIFEST_EXTENSION) {
366                future::ready(Ok(Some(obj_meta.location)))
367            } else {
368                future::ready(Ok(None))
369            }
370        })
371        .boxed())
372}
373
374pub fn parse_version_from_path(path: &Path) -> Result<u64> {
375    path.filename()
376        .and_then(|name| name.split_once('.'))
377        .filter(|(_, extension)| *extension == MANIFEST_EXTENSION)
378        .and_then(|(version, _)| version.parse::<u64>().ok())
379        .ok_or(Error::Internal {
380            message: format!("Expected manifest file, but found {}", path),
381            location: location!(),
382        })
383}
384
385fn make_staging_manifest_path(base: &Path) -> Result<Path> {
386    let id = uuid::Uuid::new_v4().to_string();
387    Path::parse(format!("{base}-{id}")).map_err(|e| Error::IO {
388        source: Box::new(e),
389        location: location!(),
390    })
391}
392
393#[cfg(feature = "dynamodb")]
394const DDB_URL_QUERY_KEY: &str = "ddbTableName";
395
396/// Handle commits that prevent conflicting writes.
397///
398/// Commit implementations ensure that if there are multiple concurrent writers
399/// attempting to write the next version of a table, only one will win. In order
400/// to work, all writers must use the same commit handler type.
401/// This trait is also responsible for resolving where the manifests live.
402///
403// TODO: pub(crate)
404#[async_trait::async_trait]
405pub trait CommitHandler: Debug + Send + Sync {
406    async fn resolve_latest_location(
407        &self,
408        base_path: &Path,
409        object_store: &ObjectStore,
410    ) -> Result<ManifestLocation> {
411        Ok(current_manifest_path(object_store, base_path).await?)
412    }
413
414    /// Get the path to the latest version manifest of a dataset at the base_path
415    async fn resolve_latest_version(
416        &self,
417        base_path: &Path,
418        object_store: &ObjectStore,
419    ) -> std::result::Result<Path, Error> {
420        // TODO: we need to pade 0's to the version number on the manifest file path
421        Ok(current_manifest_path(object_store, base_path).await?.path)
422    }
423
424    // for default implementation, parse the version from the path
425    async fn resolve_latest_version_id(
426        &self,
427        base_path: &Path,
428        object_store: &ObjectStore,
429    ) -> Result<u64> {
430        Ok(current_manifest_path(object_store, base_path)
431            .await?
432            .version)
433    }
434
435    /// Get the path to a specific versioned manifest of a dataset at the base_path
436    ///
437    /// The version must already exist.
438    async fn resolve_version(
439        &self,
440        base_path: &Path,
441        version: u64,
442        object_store: &dyn OSObjectStore,
443    ) -> std::result::Result<Path, Error> {
444        Ok(default_resolve_version(base_path, version, object_store)
445            .await?
446            .path)
447    }
448
449    async fn resolve_version_location(
450        &self,
451        base_path: &Path,
452        version: u64,
453        object_store: &dyn OSObjectStore,
454    ) -> Result<ManifestLocation> {
455        default_resolve_version(base_path, version, object_store).await
456    }
457
458    /// List manifests that are available for a dataset at the base_path
459    async fn list_manifests<'a>(
460        &self,
461        base_path: &Path,
462        object_store: &'a dyn OSObjectStore,
463    ) -> Result<BoxStream<'a, Result<Path>>> {
464        list_manifests(base_path, object_store).await
465    }
466
467    /// Commit a manifest.
468    ///
469    /// This function should return an [CommitError::CommitConflict] if another
470    /// transaction has already been committed to the path.
471    async fn commit(
472        &self,
473        manifest: &mut Manifest,
474        indices: Option<Vec<Index>>,
475        base_path: &Path,
476        object_store: &ObjectStore,
477        manifest_writer: ManifestWriter,
478        naming_scheme: ManifestNamingScheme,
479    ) -> std::result::Result<Path, CommitError>;
480
481    /// Delete the recorded manifest information for a dataset at the base_path
482    async fn delete(&self, _base_path: &Path) -> Result<()> {
483        Ok(())
484    }
485}
486
487async fn default_resolve_version(
488    base_path: &Path,
489    version: u64,
490    object_store: &dyn OSObjectStore,
491) -> Result<ManifestLocation> {
492    if is_detached_version(version) {
493        return Ok(ManifestLocation {
494            version,
495            // Detached versions are not supported with V1 naming scheme.  If we need
496            // to support in the future we could use a different prefix (e.g. 'x' or something)
497            naming_scheme: ManifestNamingScheme::V2,
498            // Both V1 and V2 should give the same path for detached versions
499            path: ManifestNamingScheme::V2.manifest_path(base_path, version),
500            size: None,
501        });
502    }
503
504    // try V2, fallback to V1.
505    let scheme = ManifestNamingScheme::V2;
506    let path = scheme.manifest_path(base_path, version);
507    match object_store.head(&path).await {
508        Ok(meta) => Ok(ManifestLocation {
509            version,
510            path,
511            size: Some(meta.size as u64),
512            naming_scheme: scheme,
513        }),
514        Err(ObjectStoreError::NotFound { .. }) => {
515            // fallback to V1
516            let scheme = ManifestNamingScheme::V1;
517            Ok(ManifestLocation {
518                version,
519                path: scheme.manifest_path(base_path, version),
520                size: None,
521                naming_scheme: scheme,
522            })
523        }
524        Err(e) => Err(e.into()),
525    }
526}
527/// Adapt an object_store credentials into AWS SDK creds
528#[cfg(feature = "dynamodb")]
529#[derive(Debug)]
530struct OSObjectStoreToAwsCredAdaptor(AwsCredentialProvider);
531
532#[cfg(feature = "dynamodb")]
533impl ProvideCredentials for OSObjectStoreToAwsCredAdaptor {
534    fn provide_credentials<'a>(
535        &'a self,
536    ) -> aws_credential_types::provider::future::ProvideCredentials<'a>
537    where
538        Self: 'a,
539    {
540        aws_credential_types::provider::future::ProvideCredentials::new(async {
541            let creds = self
542                .0
543                .get_credential()
544                .await
545                .map_err(|e| CredentialsError::provider_error(Box::new(e)))?;
546            Ok(aws_credential_types::Credentials::new(
547                &creds.key_id,
548                &creds.secret_key,
549                creds.token.clone(),
550                Some(
551                    SystemTime::now()
552                        .checked_add(Duration::from_secs(
553                            60 * 10, //  10 min
554                        ))
555                        .expect("overflow"),
556                ),
557                "",
558            ))
559        })
560    }
561}
562
563#[cfg(feature = "dynamodb")]
564async fn build_dynamodb_external_store(
565    table_name: &str,
566    creds: AwsCredentialProvider,
567    region: &str,
568    endpoint: Option<String>,
569    app_name: &str,
570) -> Result<Arc<dyn ExternalManifestStore>> {
571    use super::commit::dynamodb::DynamoDBExternalManifestStore;
572    use aws_sdk_dynamodb::{
573        config::{IdentityCache, Region},
574        Client,
575    };
576
577    let mut dynamodb_config = aws_sdk_dynamodb::config::Builder::new()
578        .behavior_version_latest()
579        .region(Some(Region::new(region.to_string())))
580        .credentials_provider(OSObjectStoreToAwsCredAdaptor(creds))
581        // caching should be handled by passed AwsCredentialProvider
582        .identity_cache(IdentityCache::no_cache());
583
584    if let Some(endpoint) = endpoint {
585        dynamodb_config = dynamodb_config.endpoint_url(endpoint);
586    }
587    let client = Client::from_conf(dynamodb_config.build());
588
589    DynamoDBExternalManifestStore::new_external_store(client.into(), table_name, app_name).await
590}
591
592pub async fn commit_handler_from_url(
593    url_or_path: &str,
594    // This looks unused if dynamodb feature disabled
595    #[allow(unused_variables)] options: &Option<ObjectStoreParams>,
596) -> Result<Arc<dyn CommitHandler>> {
597    let url = match Url::parse(url_or_path) {
598        Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
599            // On Windows, the drive is parsed as a scheme
600            return Ok(Arc::new(RenameCommitHandler));
601        }
602        Ok(url) => url,
603        Err(_) => {
604            return Ok(Arc::new(RenameCommitHandler));
605        }
606    };
607
608    match url.scheme() {
609        "s3" | "gs" | "az" | "memory" | "file" | "file-object-store" => {
610            Ok(Arc::new(ConditionalPutCommitHandler))
611        }
612        #[cfg(not(feature = "dynamodb"))]
613        "s3+ddb" => Err(Error::InvalidInput {
614            source: "`s3+ddb://` scheme requires `dynamodb` feature to be enabled".into(),
615            location: location!(),
616        }),
617        #[cfg(feature = "dynamodb")]
618        "s3+ddb" => {
619            if url.query_pairs().count() != 1 {
620                return Err(Error::InvalidInput {
621                    source: "`s3+ddb://` scheme and expects exactly one query `ddbTableName`"
622                        .into(),
623                    location: location!(),
624                });
625            }
626            let table_name = match url.query_pairs().next() {
627                Some((Cow::Borrowed(key), Cow::Borrowed(table_name)))
628                    if key == DDB_URL_QUERY_KEY =>
629                {
630                    if table_name.is_empty() {
631                        return Err(Error::InvalidInput {
632                            source: "`s3+ddb://` scheme requires non empty dynamodb table name"
633                                .into(),
634                            location: location!(),
635                        });
636                    }
637                    table_name
638                }
639                _ => {
640                    return Err(Error::InvalidInput {
641                        source: "`s3+ddb://` scheme and expects exactly one query `ddbTableName`"
642                            .into(),
643                        location: location!(),
644                    });
645                }
646            };
647            let options = options.clone().unwrap_or_default();
648            let storage_options = StorageOptions(options.storage_options.unwrap_or_default());
649            let dynamo_endpoint = get_dynamodb_endpoint(&storage_options);
650            let storage_options = storage_options.as_s3_options();
651
652            let region = storage_options
653                .get(&AmazonS3ConfigKey::Region)
654                .map(|s| s.to_string());
655
656            let (aws_creds, region) = build_aws_credential(
657                options.s3_credentials_refresh_offset,
658                options.aws_credentials.clone(),
659                Some(&storage_options),
660                region,
661            )
662            .await?;
663
664            Ok(Arc::new(ExternalManifestCommitHandler {
665                external_manifest_store: build_dynamodb_external_store(
666                    table_name,
667                    aws_creds.clone(),
668                    &region,
669                    dynamo_endpoint,
670                    "lancedb",
671                )
672                .await?,
673            }))
674        }
675        _ => Ok(Arc::new(UnsafeCommitHandler)),
676    }
677}
678
679#[cfg(feature = "dynamodb")]
680fn get_dynamodb_endpoint(storage_options: &StorageOptions) -> Option<String> {
681    if let Some(endpoint) = storage_options.0.get("dynamodb_endpoint") {
682        Some(endpoint.to_string())
683    } else {
684        std::env::var("DYNAMODB_ENDPOINT").ok()
685    }
686}
687
688/// Errors that can occur when committing a manifest.
689#[derive(Debug)]
690pub enum CommitError {
691    /// Another transaction has already been written to the path
692    CommitConflict,
693    /// Something else went wrong
694    OtherError(Error),
695}
696
697impl From<Error> for CommitError {
698    fn from(e: Error) -> Self {
699        Self::OtherError(e)
700    }
701}
702
703impl From<CommitError> for Error {
704    fn from(e: CommitError) -> Self {
705        match e {
706            CommitError::CommitConflict => Self::Internal {
707                message: "Commit conflict".to_string(),
708                location: location!(),
709            },
710            CommitError::OtherError(e) => e,
711        }
712    }
713}
714
715/// Whether we have issued a warning about using the unsafe commit handler.
716static WARNED_ON_UNSAFE_COMMIT: AtomicBool = AtomicBool::new(false);
717
718/// A naive commit implementation that does not prevent conflicting writes.
719///
720/// This will log a warning the first time it is used.
721pub struct UnsafeCommitHandler;
722
723#[async_trait::async_trait]
724impl CommitHandler for UnsafeCommitHandler {
725    async fn commit(
726        &self,
727        manifest: &mut Manifest,
728        indices: Option<Vec<Index>>,
729        base_path: &Path,
730        object_store: &ObjectStore,
731        manifest_writer: ManifestWriter,
732        naming_scheme: ManifestNamingScheme,
733    ) -> std::result::Result<Path, CommitError> {
734        // Log a one-time warning
735        if !WARNED_ON_UNSAFE_COMMIT.load(std::sync::atomic::Ordering::Relaxed) {
736            WARNED_ON_UNSAFE_COMMIT.store(true, std::sync::atomic::Ordering::Relaxed);
737            log::warn!(
738                "Using unsafe commit handler. Concurrent writes may result in data loss. \
739                 Consider providing a commit handler that prevents conflicting writes."
740            );
741        }
742
743        let version_path = naming_scheme.manifest_path(base_path, manifest.version);
744        // Write the manifest naively
745        manifest_writer(object_store, manifest, indices, &version_path).await?;
746
747        Ok(version_path)
748    }
749}
750
751impl Debug for UnsafeCommitHandler {
752    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
753        f.debug_struct("UnsafeCommitHandler").finish()
754    }
755}
756
757/// A commit implementation that uses a lock to prevent conflicting writes.
758#[async_trait::async_trait]
759pub trait CommitLock: Debug {
760    type Lease: CommitLease;
761
762    /// Attempt to lock the table for the given version.
763    ///
764    /// If it is already locked by another transaction, wait until it is unlocked.
765    /// Once it is unlocked, return [CommitError::CommitConflict] if the version
766    /// has already been committed. Otherwise, return the lock.
767    ///
768    /// To prevent poisoned locks, it's recommended to set a timeout on the lock
769    /// of at least 30 seconds.
770    ///
771    /// It is not required that the lock tracks the version. It is provided in
772    /// case the locking is handled by a catalog service that needs to know the
773    /// current version of the table.
774    async fn lock(&self, version: u64) -> std::result::Result<Self::Lease, CommitError>;
775}
776
777#[async_trait::async_trait]
778pub trait CommitLease: Send + Sync {
779    /// Return the lease, indicating whether the commit was successful.
780    async fn release(&self, success: bool) -> std::result::Result<(), CommitError>;
781}
782
783#[async_trait::async_trait]
784impl<T: CommitLock + Send + Sync> CommitHandler for T {
785    async fn commit(
786        &self,
787        manifest: &mut Manifest,
788        indices: Option<Vec<Index>>,
789        base_path: &Path,
790        object_store: &ObjectStore,
791        manifest_writer: ManifestWriter,
792        naming_scheme: ManifestNamingScheme,
793    ) -> std::result::Result<Path, CommitError> {
794        let path = naming_scheme.manifest_path(base_path, manifest.version);
795        // NOTE: once we have the lease we cannot use ? to return errors, since
796        // we must release the lease before returning.
797        let lease = self.lock(manifest.version).await?;
798
799        // Head the location and make sure it's not already committed
800        match object_store.inner.head(&path).await {
801            Ok(_) => {
802                // The path already exists, so it's already committed
803                // Release the lock
804                lease.release(false).await?;
805
806                return Err(CommitError::CommitConflict);
807            }
808            Err(ObjectStoreError::NotFound { .. }) => {}
809            Err(e) => {
810                // Something else went wrong
811                // Release the lock
812                lease.release(false).await?;
813
814                return Err(CommitError::OtherError(e.into()));
815            }
816        }
817        let res = manifest_writer(object_store, manifest, indices, &path).await;
818
819        // Release the lock
820        lease.release(res.is_ok()).await?;
821
822        res.map_err(|err| err.into()).map(|_| path)
823    }
824}
825
826#[async_trait::async_trait]
827impl<T: CommitLock + Send + Sync> CommitHandler for Arc<T> {
828    async fn commit(
829        &self,
830        manifest: &mut Manifest,
831        indices: Option<Vec<Index>>,
832        base_path: &Path,
833        object_store: &ObjectStore,
834        manifest_writer: ManifestWriter,
835        naming_scheme: ManifestNamingScheme,
836    ) -> std::result::Result<Path, CommitError> {
837        self.as_ref()
838            .commit(
839                manifest,
840                indices,
841                base_path,
842                object_store,
843                manifest_writer,
844                naming_scheme,
845            )
846            .await
847    }
848}
849
850/// A commit implementation that uses a temporary path and renames the object.
851///
852/// This only works for object stores that support atomic rename if not exist.
853pub struct RenameCommitHandler;
854
855#[async_trait::async_trait]
856impl CommitHandler for RenameCommitHandler {
857    async fn commit(
858        &self,
859        manifest: &mut Manifest,
860        indices: Option<Vec<Index>>,
861        base_path: &Path,
862        object_store: &ObjectStore,
863        manifest_writer: ManifestWriter,
864        naming_scheme: ManifestNamingScheme,
865    ) -> std::result::Result<Path, CommitError> {
866        // Create a temporary object, then use `rename_if_not_exists` to commit.
867        // If failed, clean up the temporary object.
868
869        let path = naming_scheme.manifest_path(base_path, manifest.version);
870        let tmp_path = make_staging_manifest_path(&path)?;
871
872        // Write the manifest to the temporary path
873        manifest_writer(object_store, manifest, indices, &tmp_path).await?;
874
875        match object_store
876            .inner
877            .rename_if_not_exists(&tmp_path, &path)
878            .await
879        {
880            Ok(_) => Ok(path),
881            Err(ObjectStoreError::AlreadyExists { .. }) => {
882                // Another transaction has already been committed
883                // Attempt to clean up temporary object, but ignore errors if we can't
884                let _ = object_store.delete(&tmp_path).await;
885
886                return Err(CommitError::CommitConflict);
887            }
888            Err(e) => {
889                // Something else went wrong
890                return Err(CommitError::OtherError(e.into()));
891            }
892        }
893    }
894}
895
896impl Debug for RenameCommitHandler {
897    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
898        f.debug_struct("RenameCommitHandler").finish()
899    }
900}
901
902pub struct ConditionalPutCommitHandler;
903
904#[async_trait::async_trait]
905impl CommitHandler for ConditionalPutCommitHandler {
906    async fn commit(
907        &self,
908        manifest: &mut Manifest,
909        indices: Option<Vec<Index>>,
910        base_path: &Path,
911        object_store: &ObjectStore,
912        manifest_writer: ManifestWriter,
913        naming_scheme: ManifestNamingScheme,
914    ) -> std::result::Result<Path, CommitError> {
915        let path = naming_scheme.manifest_path(base_path, manifest.version);
916
917        let memory_store = ObjectStore::memory();
918        let dummy_path = "dummy";
919        manifest_writer(&memory_store, manifest, indices, &dummy_path.into()).await?;
920        let dummy_data = memory_store.read_one_all(&dummy_path.into()).await?;
921        object_store
922            .inner
923            .put_opts(
924                &path,
925                dummy_data.into(),
926                PutOptions {
927                    mode: object_store::PutMode::Create,
928                    ..Default::default()
929                },
930            )
931            .await
932            .map_err(|err| match err {
933                ObjectStoreError::AlreadyExists { .. } | ObjectStoreError::Precondition { .. } => {
934                    CommitError::CommitConflict
935                }
936                _ => CommitError::OtherError(err.into()),
937            })?;
938
939        Ok(path)
940    }
941}
942
943impl Debug for ConditionalPutCommitHandler {
944    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
945        f.debug_struct("ConditionalPutCommitHandler").finish()
946    }
947}
948
/// Settings that control how a commit is attempted.
#[derive(Debug, Clone)]
pub struct CommitConfig {
    /// Number of commit retries. The retry loop that reads this is outside
    /// this chunk — presumably it retries on `CommitConflict`; confirm.
    pub num_retries: u32,
    // TODO: add isolation_level
}

impl Default for CommitConfig {
    /// The default configuration allows 20 retries.
    fn default() -> Self {
        CommitConfig { num_retries: 20 }
    }
}
960
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_manifest_naming_scheme() {
        let v1 = ManifestNamingScheme::V1;
        let v2 = ManifestNamingScheme::V2;

        assert_eq!(
            v1.manifest_path(&Path::from("base"), 0),
            Path::from("base/_versions/0.manifest")
        );
        assert_eq!(
            v1.manifest_path(&Path::from("base"), 42),
            Path::from("base/_versions/42.manifest")
        );

        assert_eq!(
            v2.manifest_path(&Path::from("base"), 0),
            Path::from("base/_versions/18446744073709551615.manifest")
        );
        assert_eq!(
            v2.manifest_path(&Path::from("base"), 42),
            Path::from("base/_versions/18446744073709551573.manifest")
        );

        assert_eq!(v1.parse_version("0.manifest"), Some(0));
        assert_eq!(v1.parse_version("42.manifest"), Some(42));
        assert_eq!(
            v1.parse_version("42.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc"),
            Some(42)
        );

        assert_eq!(v2.parse_version("18446744073709551615.manifest"), Some(0));
        assert_eq!(v2.parse_version("18446744073709551573.manifest"), Some(42));
        assert_eq!(
            v2.parse_version("18446744073709551573.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc"),
            Some(42)
        );

        assert_eq!(ManifestNamingScheme::detect_scheme("0.manifest"), Some(v1));
        assert_eq!(
            ManifestNamingScheme::detect_scheme("18446744073709551615.manifest"),
            Some(v2)
        );
        assert_eq!(ManifestNamingScheme::detect_scheme("something else"), None);
    }

    #[tokio::test]
    async fn test_manifest_naming_migration() {
        let object_store = ObjectStore::memory();
        let base = Path::from("base");
        let versions_dir = base.child(VERSIONS_DIR);

        // Write one unrelated file, one V1 manifest (version 0) and one V2
        // manifest (version 1).
        let original_files = vec![
            versions_dir.child("irrelevant"),
            ManifestNamingScheme::V1.manifest_path(&base, 0),
            ManifestNamingScheme::V2.manifest_path(&base, 1),
        ];
        for path in original_files {
            object_store.put(&path, b"".as_slice()).await.unwrap();
        }

        migrate_scheme_to_v2(&object_store, &base).await.unwrap();

        // After migration both manifests use the V2 naming scheme, and the
        // unrelated file is left untouched.
        let mut expected_files = vec![
            ManifestNamingScheme::V2.manifest_path(&base, 1),
            ManifestNamingScheme::V2.manifest_path(&base, 0),
            versions_dir.child("irrelevant"),
        ];
        let mut actual_files = object_store
            .inner
            .list(Some(&versions_dir))
            .map_ok(|res| res.location)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        // Compare sorted lists so the assertion does not depend on the order
        // in which the object store happens to list entries.
        expected_files.sort_by_key(|p| p.to_string());
        actual_files.sort_by_key(|p| p.to_string());
        assert_eq!(actual_files, expected_files);
    }
}