lance_table/io/
commit.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Trait for commit implementations.
5//!
6//! In Lance, a transaction is committed by writing the next manifest file.
7//! However, care should be taken to ensure that the manifest file is written
8//! only once, even if there are concurrent writers. Different stores have
9//! different abilities to handle concurrent writes, so a trait is provided
10//! to allow for different implementations.
11//!
//! The trait [CommitHandler] can be implemented to provide different commit
//! strategies. [RenameCommitHandler] writes the manifest to a temporary path,
//! then renames the temporary path to the final path if no object already
//! exists at the final path. This is an atomic operation in most object
//! stores, but not in AWS S3. [UnsafeCommitHandler] writes the manifest to the
//! final path without any checks. The handler that is used by default for a
//! given URL or path is chosen by [commit_handler_from_url].
//!
20//!
21//! When providing your own commit handler, most often you are implementing in
22//! terms of a lock. The trait [CommitLock] can be implemented as a simpler
23//! alternative to [CommitHandler].
24
25use std::io;
26use std::sync::atomic::AtomicBool;
27use std::sync::Arc;
28use std::{fmt::Debug, fs::DirEntry};
29
30use futures::{
31    future::{self, BoxFuture},
32    stream::BoxStream,
33    StreamExt, TryStreamExt,
34};
35use lance_io::object_writer::WriteResult;
36use log::warn;
37use object_store::PutOptions;
38use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore};
39use snafu::location;
40use url::Url;
41
42#[cfg(feature = "dynamodb")]
43pub mod dynamodb;
44pub mod external_manifest;
45
46use lance_core::{Error, Result};
47use lance_io::object_store::{ObjectStore, ObjectStoreExt, ObjectStoreParams};
48
49#[cfg(feature = "dynamodb")]
50use {
51    self::external_manifest::{ExternalManifestCommitHandler, ExternalManifestStore},
52    aws_credential_types::provider::error::CredentialsError,
53    aws_credential_types::provider::ProvideCredentials,
54    lance_io::object_store::{providers::aws::build_aws_credential, StorageOptions},
55    object_store::aws::AmazonS3ConfigKey,
56    object_store::aws::AwsCredentialProvider,
57    std::borrow::Cow,
58    std::time::{Duration, SystemTime},
59};
60
61use crate::format::{is_detached_version, Index, Manifest};
62
/// Name of the directory, directly under the dataset base path, that holds
/// the manifest files.
const VERSIONS_DIR: &str = "_versions";
/// File extension (without the leading dot) of manifest files.
const MANIFEST_EXTENSION: &str = "manifest";
/// Filename prefix that marks the manifest of a detached version.
const DETACHED_VERSION_PREFIX: &str = "d";
66
/// How manifest files should be named.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ManifestNamingScheme {
    /// `_versions/{version}.manifest`
    V1,
    /// `_versions/{u64::MAX - version}.manifest`, with the number zero-padded
    /// to 20 digits.
    ///
    /// Zero-padded and reversed for O(1) lookup of latest version on object stores.
    V2,
}
77
78impl ManifestNamingScheme {
79    pub fn manifest_path(&self, base: &Path, version: u64) -> Path {
80        let directory = base.child(VERSIONS_DIR);
81        if is_detached_version(version) {
82            // Detached versions should never show up first in a list operation which
83            // means it needs to come lexicographically after all attached manifest
84            // files and so we add the prefix `d`.  There is no need to invert the
85            // version number since detached versions are not part of the version
86            let directory = base.child(VERSIONS_DIR);
87            directory.child(format!(
88                "{DETACHED_VERSION_PREFIX}{version}.{MANIFEST_EXTENSION}"
89            ))
90        } else {
91            match self {
92                Self::V1 => directory.child(format!("{version}.{MANIFEST_EXTENSION}")),
93                Self::V2 => {
94                    let inverted_version = u64::MAX - version;
95                    directory.child(format!("{inverted_version:020}.{MANIFEST_EXTENSION}"))
96                }
97            }
98        }
99    }
100
101    pub fn parse_version(&self, filename: &str) -> Option<u64> {
102        let file_number = filename
103            .split_once('.')
104            // Detached versions will fail the `parse` step, which is ok.
105            .and_then(|(version_str, _)| version_str.parse::<u64>().ok());
106        match self {
107            Self::V1 => file_number,
108            Self::V2 => file_number.map(|v| u64::MAX - v),
109        }
110    }
111
112    pub fn detect_scheme(filename: &str) -> Option<Self> {
113        if filename.starts_with(DETACHED_VERSION_PREFIX) {
114            // Currently, detached versions must imply V2
115            return Some(Self::V2);
116        }
117        if filename.ends_with(MANIFEST_EXTENSION) {
118            const V2_LEN: usize = 20 + 1 + MANIFEST_EXTENSION.len();
119            if filename.len() == V2_LEN {
120                Some(Self::V2)
121            } else {
122                Some(Self::V1)
123            }
124        } else {
125            None
126        }
127    }
128
129    pub fn detect_scheme_staging(filename: &str) -> Self {
130        // We shouldn't have to worry about detached versions here since there is no
131        // such thing as "detached" and "staged" at the same time.
132        if filename.chars().nth(20) == Some('.') {
133            Self::V2
134        } else {
135            Self::V1
136        }
137    }
138}
139
140/// Migrate all V1 manifests to V2 naming scheme.
141///
142/// This function will rename all V1 manifests to V2 naming scheme.
143///
144/// This function is idempotent, and can be run multiple times without
145/// changing the state of the object store.
146///
147/// However, it should not be run while other concurrent operations are happening.
148/// And it should also run until completion before resuming other operations.
149pub async fn migrate_scheme_to_v2(object_store: &ObjectStore, dataset_base: &Path) -> Result<()> {
150    object_store
151        .inner
152        .list(Some(&dataset_base.child(VERSIONS_DIR)))
153        .try_filter(|res| {
154            let res = if let Some(filename) = res.location.filename() {
155                ManifestNamingScheme::detect_scheme(filename) == Some(ManifestNamingScheme::V1)
156            } else {
157                false
158            };
159            future::ready(res)
160        })
161        .try_for_each_concurrent(object_store.io_parallelism(), |meta| async move {
162            let filename = meta.location.filename().unwrap();
163            let version = ManifestNamingScheme::V1.parse_version(filename).unwrap();
164            let path = ManifestNamingScheme::V2.manifest_path(dataset_base, version);
165            object_store.inner.rename(&meta.location, &path).await?;
166            Ok(())
167        })
168        .await?;
169
170    Ok(())
171}
172
/// Function that writes the manifest to the object store.
///
/// The arguments are, in order: the object store to write to, the manifest,
/// the optional list of indices, and the destination path.
///
/// Returns the [WriteResult] of the write; the commit handlers in this module
/// read its `size` and `e_tag` fields to build a [ManifestLocation].
pub type ManifestWriter = for<'a> fn(
    object_store: &'a ObjectStore,
    manifest: &'a mut Manifest,
    indices: Option<Vec<Index>>,
    path: &'a Path,
) -> BoxFuture<'a, Result<WriteResult>>;
182
/// Where a manifest file lives, together with the metadata known about it.
#[derive(Debug, Clone)]
pub struct ManifestLocation {
    /// The version the manifest corresponds to.
    pub version: u64,
    /// Path of the manifest file, relative to the table root.
    pub path: Path,
    /// Size, in bytes, of the manifest file. If it is not known, this field should be `None`.
    pub size: Option<u64>,
    /// Naming scheme of the manifest file.
    pub naming_scheme: ManifestNamingScheme,
    /// Optional e-tag, used for integrity checks. Manifests should be immutable, so
    /// if we detect a change in the e-tag, it means the manifest was tampered with.
    /// This might happen if the dataset was deleted and then re-created.
    pub e_tag: Option<String>,
}
198
199impl TryFrom<object_store::ObjectMeta> for ManifestLocation {
200    type Error = Error;
201
202    fn try_from(meta: object_store::ObjectMeta) -> Result<Self> {
203        let filename = meta.location.filename().ok_or_else(|| Error::Internal {
204            message: "ObjectMeta location does not have a filename".to_string(),
205            location: location!(),
206        })?;
207        let scheme =
208            ManifestNamingScheme::detect_scheme(filename).ok_or_else(|| Error::Internal {
209                message: format!("Invalid manifest filename: '{}'", filename),
210                location: location!(),
211            })?;
212        let version = scheme
213            .parse_version(filename)
214            .ok_or_else(|| Error::Internal {
215                message: format!("Invalid manifest filename: '{}'", filename),
216                location: location!(),
217            })?;
218        Ok(Self {
219            version,
220            path: meta.location,
221            size: Some(meta.size as u64),
222            naming_scheme: scheme,
223            e_tag: meta.e_tag,
224        })
225    }
226}
227
228/// Get the latest manifest path
229async fn current_manifest_path(
230    object_store: &ObjectStore,
231    base: &Path,
232) -> Result<ManifestLocation> {
233    if object_store.is_local() {
234        if let Ok(Some(location)) = current_manifest_local(base) {
235            return Ok(location);
236        }
237    }
238
239    let manifest_files = object_store.list(Some(base.child(VERSIONS_DIR)));
240
241    let mut valid_manifests = manifest_files.try_filter_map(|res| {
242        if let Some(scheme) = ManifestNamingScheme::detect_scheme(res.location.filename().unwrap())
243        {
244            future::ready(Ok(Some((scheme, res))))
245        } else {
246            future::ready(Ok(None))
247        }
248    });
249
250    let first = valid_manifests.next().await.transpose()?;
251    match (first, object_store.list_is_lexically_ordered) {
252        // If the first valid manifest we see is V2, we can assume that we are using
253        // V2 naming scheme for all manifests.
254        (Some((scheme @ ManifestNamingScheme::V2, meta)), true) => {
255            let version = scheme
256                .parse_version(meta.location.filename().unwrap())
257                .unwrap();
258
259            // Sanity check: verify at least for the first 1k files that they are all V2
260            // and that the version numbers are decreasing. We use the first 1k because
261            // this is the typical size of an object store list endpoint response page.
262            for (scheme, meta) in valid_manifests.take(999).try_collect::<Vec<_>>().await? {
263                if scheme != ManifestNamingScheme::V2 {
264                    warn!(
265                        "Found V1 Manifest in a V2 directory. Use `migrate_manifest_paths_v2` \
266                         to migrate the directory."
267                    );
268                    break;
269                }
270                let next_version = scheme
271                    .parse_version(meta.location.filename().unwrap())
272                    .unwrap();
273                if next_version >= version {
274                    warn!(
275                        "List operation was expected to be lexically ordered, but was not. This \
276                         could mean a corrupt read. Please make a bug report on the lancedb/lance \
277                         GitHub repository."
278                    );
279                    break;
280                }
281            }
282
283            Ok(ManifestLocation {
284                version,
285                path: meta.location,
286                size: Some(meta.size as u64),
287                naming_scheme: scheme,
288                e_tag: meta.e_tag,
289            })
290        }
291        // If the first valid manifest we see if V1, assume for now that we are
292        // using V1 naming scheme for all manifests. Since we are listing the
293        // directory anyways, we will assert there aren't any V2 manifests.
294        (Some((scheme, meta)), _) => {
295            let mut current_version = scheme
296                .parse_version(meta.location.filename().unwrap())
297                .unwrap();
298            let mut current_meta = meta;
299
300            while let Some((scheme, meta)) = valid_manifests.next().await.transpose()? {
301                if matches!(scheme, ManifestNamingScheme::V2) {
302                    return Err(Error::Internal {
303                        message: "Found V2 manifest in a V1 manifest directory".to_string(),
304                        location: location!(),
305                    });
306                }
307                let version = scheme
308                    .parse_version(meta.location.filename().unwrap())
309                    .unwrap();
310                if version > current_version {
311                    current_version = version;
312                    current_meta = meta;
313                }
314            }
315            Ok(ManifestLocation {
316                version: current_version,
317                path: current_meta.location,
318                size: Some(current_meta.size as u64),
319                naming_scheme: scheme,
320                e_tag: current_meta.e_tag,
321            })
322        }
323        (None, _) => Err(Error::NotFound {
324            uri: base.child(VERSIONS_DIR).to_string(),
325            location: location!(),
326        }),
327    }
328}
329
330// This is an optimized function that searches for the latest manifest. In
331// object_store, list operations lookup metadata for each file listed. This
332// method only gets the metadata for the found latest manifest.
333fn current_manifest_local(base: &Path) -> std::io::Result<Option<ManifestLocation>> {
334    let path = lance_io::local::to_local_path(&base.child(VERSIONS_DIR));
335    let entries = std::fs::read_dir(path)?;
336
337    let mut latest_entry: Option<(u64, DirEntry)> = None;
338
339    let mut scheme: Option<ManifestNamingScheme> = None;
340
341    for entry in entries {
342        let entry = entry?;
343        let filename_raw = entry.file_name();
344        let filename = filename_raw.to_string_lossy();
345
346        let Some(entry_scheme) = ManifestNamingScheme::detect_scheme(&filename) else {
347            // Need to ignore temporary files, such as
348            // .tmp_7.manifest_9c100374-3298-4537-afc6-f5ee7913666d
349            continue;
350        };
351
352        if let Some(scheme) = scheme {
353            if scheme != entry_scheme {
354                return Err(io::Error::new(
355                    io::ErrorKind::InvalidData,
356                    format!(
357                        "Found multiple manifest naming schemes in the same directory: {:?} and {:?}",
358                        scheme, entry_scheme
359                    ),
360                ));
361            }
362        } else {
363            scheme = Some(entry_scheme);
364        }
365
366        let Some(version) = entry_scheme.parse_version(&filename) else {
367            continue;
368        };
369
370        if let Some((latest_version, _)) = &latest_entry {
371            if version > *latest_version {
372                latest_entry = Some((version, entry));
373            }
374        } else {
375            latest_entry = Some((version, entry));
376        }
377    }
378
379    if let Some((version, entry)) = latest_entry {
380        let path = Path::from_filesystem_path(entry.path())
381            .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;
382        let metadata = entry.metadata()?;
383        Ok(Some(ManifestLocation {
384            version,
385            path,
386            size: Some(metadata.len()),
387            naming_scheme: scheme.unwrap(),
388            e_tag: Some(get_etag(&metadata)),
389        }))
390    } else {
391        Ok(None)
392    }
393}
394
// Compute an e-tag for a local file from its metadata, formatted as
// `{inode}-{mtime}-{size}` in lowercase hex. Based on object store's
// implementation.
fn get_etag(metadata: &std::fs::Metadata) -> String {
    // Modification time in microseconds since the Unix epoch; falls back to
    // zero when it is unavailable or lies before the epoch.
    let since_epoch = match metadata.modified() {
        Ok(modified) => modified
            .duration_since(std::time::SystemTime::UNIX_EPOCH)
            .unwrap_or_default(),
        Err(_) => std::time::Duration::default(),
    };
    let mtime = since_epoch.as_micros();
    let size = metadata.len();
    let inode = get_inode(metadata);

    // Use an ETag scheme based on that used by many popular HTTP servers
    // <https://httpd.apache.org/docs/2.2/mod/core.html#fileetag>
    // <https://stackoverflow.com/questions/47512043/how-etags-are-generated-and-configured>
    format!("{inode:x}-{mtime:x}-{size:x}")
}

/// We include the inode when available to yield an ETag more resistant to collisions
/// and as used by popular web servers such as [Apache](https://httpd.apache.org/docs/2.2/mod/core.html#fileetag)
#[cfg(unix)]
fn get_inode(metadata: &std::fs::Metadata) -> u64 {
    use std::os::unix::fs::MetadataExt;

    metadata.ino()
}

/// On platforms where an inode isn't available, fallback to just relying on size and mtime
#[cfg(not(unix))]
fn get_inode(_metadata: &std::fs::Metadata) -> u64 {
    0
}
424
425async fn list_manifests<'a>(
426    base_path: &Path,
427    object_store: &'a dyn OSObjectStore,
428) -> Result<BoxStream<'a, Result<ManifestLocation>>> {
429    Ok(object_store
430        .read_dir_all(&base_path.child(VERSIONS_DIR), None)
431        .await?
432        .filter_map(|obj_meta| {
433            futures::future::ready(
434                obj_meta
435                    .map(|m| ManifestLocation::try_from(m).ok())
436                    .transpose(),
437            )
438        })
439        .boxed())
440}
441
442fn make_staging_manifest_path(base: &Path) -> Result<Path> {
443    let id = uuid::Uuid::new_v4().to_string();
444    Path::parse(format!("{base}-{id}")).map_err(|e| Error::IO {
445        source: Box::new(e),
446        location: location!(),
447    })
448}
449
/// Name of the single query parameter that an `s3+ddb://` URL must carry; its
/// value is the DynamoDB table name.
#[cfg(feature = "dynamodb")]
const DDB_URL_QUERY_KEY: &str = "ddbTableName";
452
453/// Handle commits that prevent conflicting writes.
454///
455/// Commit implementations ensure that if there are multiple concurrent writers
456/// attempting to write the next version of a table, only one will win. In order
457/// to work, all writers must use the same commit handler type.
458/// This trait is also responsible for resolving where the manifests live.
459///
460// TODO: pub(crate)
461#[async_trait::async_trait]
462pub trait CommitHandler: Debug + Send + Sync {
463    async fn resolve_latest_location(
464        &self,
465        base_path: &Path,
466        object_store: &ObjectStore,
467    ) -> Result<ManifestLocation> {
468        Ok(current_manifest_path(object_store, base_path).await?)
469    }
470
471    async fn resolve_version_location(
472        &self,
473        base_path: &Path,
474        version: u64,
475        object_store: &dyn OSObjectStore,
476    ) -> Result<ManifestLocation> {
477        default_resolve_version(base_path, version, object_store).await
478    }
479
480    async fn list_manifest_locations<'a>(
481        &self,
482        base_path: &Path,
483        object_store: &'a dyn OSObjectStore,
484    ) -> Result<BoxStream<'a, Result<ManifestLocation>>> {
485        list_manifests(base_path, object_store).await
486    }
487
488    /// Commit a manifest.
489    ///
490    /// This function should return an [CommitError::CommitConflict] if another
491    /// transaction has already been committed to the path.
492    async fn commit(
493        &self,
494        manifest: &mut Manifest,
495        indices: Option<Vec<Index>>,
496        base_path: &Path,
497        object_store: &ObjectStore,
498        manifest_writer: ManifestWriter,
499        naming_scheme: ManifestNamingScheme,
500    ) -> std::result::Result<ManifestLocation, CommitError>;
501
502    /// Delete the recorded manifest information for a dataset at the base_path
503    async fn delete(&self, _base_path: &Path) -> Result<()> {
504        Ok(())
505    }
506}
507
508async fn default_resolve_version(
509    base_path: &Path,
510    version: u64,
511    object_store: &dyn OSObjectStore,
512) -> Result<ManifestLocation> {
513    if is_detached_version(version) {
514        return Ok(ManifestLocation {
515            version,
516            // Detached versions are not supported with V1 naming scheme.  If we need
517            // to support in the future we could use a different prefix (e.g. 'x' or something)
518            naming_scheme: ManifestNamingScheme::V2,
519            // Both V1 and V2 should give the same path for detached versions
520            path: ManifestNamingScheme::V2.manifest_path(base_path, version),
521            size: None,
522            e_tag: None,
523        });
524    }
525
526    // try V2, fallback to V1.
527    let scheme = ManifestNamingScheme::V2;
528    let path = scheme.manifest_path(base_path, version);
529    match object_store.head(&path).await {
530        Ok(meta) => Ok(ManifestLocation {
531            version,
532            path,
533            size: Some(meta.size as u64),
534            naming_scheme: scheme,
535            e_tag: meta.e_tag,
536        }),
537        Err(ObjectStoreError::NotFound { .. }) => {
538            // fallback to V1
539            let scheme = ManifestNamingScheme::V1;
540            Ok(ManifestLocation {
541                version,
542                path: scheme.manifest_path(base_path, version),
543                size: None,
544                naming_scheme: scheme,
545                e_tag: None,
546            })
547        }
548        Err(e) => Err(e.into()),
549    }
550}
/// Adapt an object_store credentials into AWS SDK creds
#[cfg(feature = "dynamodb")]
#[derive(Debug)]
struct OSObjectStoreToAwsCredAdaptor(AwsCredentialProvider);

#[cfg(feature = "dynamodb")]
impl ProvideCredentials for OSObjectStoreToAwsCredAdaptor {
    /// Fetch a credential from the wrapped provider and convert it into AWS
    /// SDK credentials that are marked as expiring 10 minutes from now.
    fn provide_credentials<'a>(
        &'a self,
    ) -> aws_credential_types::provider::future::ProvideCredentials<'a>
    where
        Self: 'a,
    {
        // Expiry attached to every credential handed to the SDK: 10 min.
        const EXPIRES_IN: Duration = Duration::from_secs(60 * 10);

        aws_credential_types::provider::future::ProvideCredentials::new(async {
            let fetched = self.0.get_credential().await;
            let creds = fetched.map_err(|e| CredentialsError::provider_error(Box::new(e)))?;

            let expires_at = SystemTime::now()
                .checked_add(EXPIRES_IN)
                .expect("overflow");

            Ok(aws_credential_types::Credentials::new(
                &creds.key_id,
                &creds.secret_key,
                creds.token.clone(),
                Some(expires_at),
                "",
            ))
        })
    }
}
586
/// Build an external manifest store backed by the DynamoDB table `table_name`.
///
/// The DynamoDB client is configured with the given `region` and credentials,
/// and with `endpoint` as a custom endpoint URL when one is provided.
#[cfg(feature = "dynamodb")]
async fn build_dynamodb_external_store(
    table_name: &str,
    creds: AwsCredentialProvider,
    region: &str,
    endpoint: Option<String>,
    app_name: &str,
) -> Result<Arc<dyn ExternalManifestStore>> {
    use super::commit::dynamodb::DynamoDBExternalManifestStore;
    use aws_sdk_dynamodb::{
        config::{IdentityCache, Region},
        Client,
    };

    let base_config = aws_sdk_dynamodb::config::Builder::new()
        .behavior_version_latest()
        .region(Some(Region::new(region.to_string())))
        .credentials_provider(OSObjectStoreToAwsCredAdaptor(creds))
        // caching should be handled by passed AwsCredentialProvider
        .identity_cache(IdentityCache::no_cache());

    let dynamodb_config = match endpoint {
        Some(endpoint_url) => base_config.endpoint_url(endpoint_url),
        None => base_config,
    };

    let client = Client::from_conf(dynamodb_config.build());

    DynamoDBExternalManifestStore::new_external_store(client.into(), table_name, app_name).await
}
615
616pub async fn commit_handler_from_url(
617    url_or_path: &str,
618    // This looks unused if dynamodb feature disabled
619    #[allow(unused_variables)] options: &Option<ObjectStoreParams>,
620) -> Result<Arc<dyn CommitHandler>> {
621    let local_handler: Arc<dyn CommitHandler> = if cfg!(windows) {
622        Arc::new(RenameCommitHandler)
623    } else {
624        Arc::new(ConditionalPutCommitHandler)
625    };
626
627    let url = match Url::parse(url_or_path) {
628        Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
629            // On Windows, the drive is parsed as a scheme
630            return Ok(local_handler);
631        }
632        Ok(url) => url,
633        Err(_) => {
634            return Ok(local_handler);
635        }
636    };
637
638    match url.scheme() {
639        "file" | "file-object-store" => Ok(local_handler),
640        "s3" | "gs" | "az" | "memory" => Ok(Arc::new(ConditionalPutCommitHandler)),
641        #[cfg(not(feature = "dynamodb"))]
642        "s3+ddb" => Err(Error::InvalidInput {
643            source: "`s3+ddb://` scheme requires `dynamodb` feature to be enabled".into(),
644            location: location!(),
645        }),
646        #[cfg(feature = "dynamodb")]
647        "s3+ddb" => {
648            if url.query_pairs().count() != 1 {
649                return Err(Error::InvalidInput {
650                    source: "`s3+ddb://` scheme and expects exactly one query `ddbTableName`"
651                        .into(),
652                    location: location!(),
653                });
654            }
655            let table_name = match url.query_pairs().next() {
656                Some((Cow::Borrowed(key), Cow::Borrowed(table_name)))
657                    if key == DDB_URL_QUERY_KEY =>
658                {
659                    if table_name.is_empty() {
660                        return Err(Error::InvalidInput {
661                            source: "`s3+ddb://` scheme requires non empty dynamodb table name"
662                                .into(),
663                            location: location!(),
664                        });
665                    }
666                    table_name
667                }
668                _ => {
669                    return Err(Error::InvalidInput {
670                        source: "`s3+ddb://` scheme and expects exactly one query `ddbTableName`"
671                            .into(),
672                        location: location!(),
673                    });
674                }
675            };
676            let options = options.clone().unwrap_or_default();
677            let storage_options = StorageOptions(options.storage_options.unwrap_or_default());
678            let dynamo_endpoint = get_dynamodb_endpoint(&storage_options);
679            let storage_options = storage_options.as_s3_options();
680
681            let region = storage_options
682                .get(&AmazonS3ConfigKey::Region)
683                .map(|s| s.to_string());
684
685            let (aws_creds, region) = build_aws_credential(
686                options.s3_credentials_refresh_offset,
687                options.aws_credentials.clone(),
688                Some(&storage_options),
689                region,
690            )
691            .await?;
692
693            Ok(Arc::new(ExternalManifestCommitHandler {
694                external_manifest_store: build_dynamodb_external_store(
695                    table_name,
696                    aws_creds.clone(),
697                    &region,
698                    dynamo_endpoint,
699                    "lancedb",
700                )
701                .await?,
702            }))
703        }
704        _ => Ok(Arc::new(UnsafeCommitHandler)),
705    }
706}
707
/// Look up a custom DynamoDB endpoint: the `dynamodb_endpoint` storage option
/// wins, otherwise the `DYNAMODB_ENDPOINT` environment variable is used.
#[cfg(feature = "dynamodb")]
fn get_dynamodb_endpoint(storage_options: &StorageOptions) -> Option<String> {
    storage_options
        .0
        .get("dynamodb_endpoint")
        .map(|endpoint| endpoint.to_string())
        .or_else(|| std::env::var("DYNAMODB_ENDPOINT").ok())
}
716
/// Errors that can occur when committing a manifest.
#[derive(Debug)]
pub enum CommitError {
    /// Another transaction has already been written to the path
    CommitConflict,
    /// Something else went wrong
    OtherError(Error),
}

/// Wrap any [Error] as [CommitError::OtherError], so `?` can be used on
/// `Result<_, Error>` values inside commit implementations.
impl From<Error> for CommitError {
    fn from(e: Error) -> Self {
        Self::OtherError(e)
    }
}

/// Convert back into a plain [Error]: a conflict becomes an
/// `Error::Internal` with the message "Commit conflict", while a wrapped
/// error is returned unchanged.
impl From<CommitError> for Error {
    fn from(e: CommitError) -> Self {
        match e {
            CommitError::CommitConflict => Self::Internal {
                message: "Commit conflict".to_string(),
                location: location!(),
            },
            CommitError::OtherError(e) => e,
        }
    }
}
743
744/// Whether we have issued a warning about using the unsafe commit handler.
745static WARNED_ON_UNSAFE_COMMIT: AtomicBool = AtomicBool::new(false);
746
747/// A naive commit implementation that does not prevent conflicting writes.
748///
749/// This will log a warning the first time it is used.
750pub struct UnsafeCommitHandler;
751
752#[async_trait::async_trait]
753impl CommitHandler for UnsafeCommitHandler {
754    async fn commit(
755        &self,
756        manifest: &mut Manifest,
757        indices: Option<Vec<Index>>,
758        base_path: &Path,
759        object_store: &ObjectStore,
760        manifest_writer: ManifestWriter,
761        naming_scheme: ManifestNamingScheme,
762    ) -> std::result::Result<ManifestLocation, CommitError> {
763        // Log a one-time warning
764        if !WARNED_ON_UNSAFE_COMMIT.load(std::sync::atomic::Ordering::Relaxed) {
765            WARNED_ON_UNSAFE_COMMIT.store(true, std::sync::atomic::Ordering::Relaxed);
766            log::warn!(
767                "Using unsafe commit handler. Concurrent writes may result in data loss. \
768                 Consider providing a commit handler that prevents conflicting writes."
769            );
770        }
771
772        let version_path = naming_scheme.manifest_path(base_path, manifest.version);
773        // Write the manifest naively
774        let res = manifest_writer(object_store, manifest, indices, &version_path).await?;
775
776        Ok(ManifestLocation {
777            version: manifest.version,
778            size: Some(res.size as u64),
779            naming_scheme,
780            path: version_path,
781            e_tag: res.e_tag,
782        })
783    }
784}
785
786impl Debug for UnsafeCommitHandler {
787    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
788        f.debug_struct("UnsafeCommitHandler").finish()
789    }
790}
791
/// A commit implementation that uses a lock to prevent conflicting writes.
///
/// Every `CommitLock` that is also `Send + Sync` gets a [CommitHandler]
/// implementation: it takes the lock, checks that the manifest path does not
/// exist yet, writes the manifest and then releases the lease.
#[async_trait::async_trait]
pub trait CommitLock: Debug {
    /// The lease handed out by [CommitLock::lock]; it is released once the
    /// commit attempt has finished.
    type Lease: CommitLease;

    /// Attempt to lock the table for the given version.
    ///
    /// If it is already locked by another transaction, wait until it is unlocked.
    /// Once it is unlocked, return [CommitError::CommitConflict] if the version
    /// has already been committed. Otherwise, return the lock.
    ///
    /// To prevent poisoned locks, it's recommended to set a timeout on the lock
    /// of at least 30 seconds.
    ///
    /// It is not required that the lock tracks the version. It is provided in
    /// case the locking is handled by a catalog service that needs to know the
    /// current version of the table.
    async fn lock(&self, version: u64) -> std::result::Result<Self::Lease, CommitError>;
}
811
/// A lease obtained from a [CommitLock], held for the duration of one commit
/// attempt.
#[async_trait::async_trait]
pub trait CommitLease: Send + Sync {
    /// Return the lease, indicating whether the commit was successful.
    ///
    /// `success` is `true` when the manifest was written, and `false` when
    /// the commit was abandoned or the write failed.
    async fn release(&self, success: bool) -> std::result::Result<(), CommitError>;
}
817
818#[async_trait::async_trait]
819impl<T: CommitLock + Send + Sync> CommitHandler for T {
820    async fn commit(
821        &self,
822        manifest: &mut Manifest,
823        indices: Option<Vec<Index>>,
824        base_path: &Path,
825        object_store: &ObjectStore,
826        manifest_writer: ManifestWriter,
827        naming_scheme: ManifestNamingScheme,
828    ) -> std::result::Result<ManifestLocation, CommitError> {
829        let path = naming_scheme.manifest_path(base_path, manifest.version);
830        // NOTE: once we have the lease we cannot use ? to return errors, since
831        // we must release the lease before returning.
832        let lease = self.lock(manifest.version).await?;
833
834        // Head the location and make sure it's not already committed
835        match object_store.inner.head(&path).await {
836            Ok(_) => {
837                // The path already exists, so it's already committed
838                // Release the lock
839                lease.release(false).await?;
840
841                return Err(CommitError::CommitConflict);
842            }
843            Err(ObjectStoreError::NotFound { .. }) => {}
844            Err(e) => {
845                // Something else went wrong
846                // Release the lock
847                lease.release(false).await?;
848
849                return Err(CommitError::OtherError(e.into()));
850            }
851        }
852        let res = manifest_writer(object_store, manifest, indices, &path).await;
853
854        // Release the lock
855        lease.release(res.is_ok()).await?;
856
857        let res = res?;
858        Ok(ManifestLocation {
859            version: manifest.version,
860            size: Some(res.size as u64),
861            naming_scheme,
862            path,
863            e_tag: res.e_tag,
864        })
865    }
866}
867
868#[async_trait::async_trait]
869impl<T: CommitLock + Send + Sync> CommitHandler for Arc<T> {
870    async fn commit(
871        &self,
872        manifest: &mut Manifest,
873        indices: Option<Vec<Index>>,
874        base_path: &Path,
875        object_store: &ObjectStore,
876        manifest_writer: ManifestWriter,
877        naming_scheme: ManifestNamingScheme,
878    ) -> std::result::Result<ManifestLocation, CommitError> {
879        self.as_ref()
880            .commit(
881                manifest,
882                indices,
883                base_path,
884                object_store,
885                manifest_writer,
886                naming_scheme,
887            )
888            .await
889    }
890}
891
/// A commit implementation that uses a temporary path and renames the object.
///
/// The manifest is first written to a staging path and then moved to its
/// final location with `rename_if_not_exists`; a destination that already
/// exists is reported as [CommitError::CommitConflict].
///
/// This only works for object stores that support atomic rename if not exist.
pub struct RenameCommitHandler;
896
897#[async_trait::async_trait]
898impl CommitHandler for RenameCommitHandler {
899    async fn commit(
900        &self,
901        manifest: &mut Manifest,
902        indices: Option<Vec<Index>>,
903        base_path: &Path,
904        object_store: &ObjectStore,
905        manifest_writer: ManifestWriter,
906        naming_scheme: ManifestNamingScheme,
907    ) -> std::result::Result<ManifestLocation, CommitError> {
908        // Create a temporary object, then use `rename_if_not_exists` to commit.
909        // If failed, clean up the temporary object.
910
911        let path = naming_scheme.manifest_path(base_path, manifest.version);
912        let tmp_path = make_staging_manifest_path(&path)?;
913
914        // Write the manifest to the temporary path
915        let res = manifest_writer(object_store, manifest, indices, &tmp_path).await?;
916
917        match object_store
918            .inner
919            .rename_if_not_exists(&tmp_path, &path)
920            .await
921        {
922            Ok(_) => {
923                // Successfully committed
924                Ok(ManifestLocation {
925                    version: manifest.version,
926                    path,
927                    size: Some(res.size as u64),
928                    naming_scheme,
929                    e_tag: None, // Re-name can change e-tag.
930                })
931            }
932            Err(ObjectStoreError::AlreadyExists { .. }) => {
933                // Another transaction has already been committed
934                // Attempt to clean up temporary object, but ignore errors if we can't
935                let _ = object_store.delete(&tmp_path).await;
936
937                return Err(CommitError::CommitConflict);
938            }
939            Err(e) => {
940                // Something else went wrong
941                return Err(CommitError::OtherError(e.into()));
942            }
943        }
944    }
945}
946
947impl Debug for RenameCommitHandler {
948    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
949        f.debug_struct("RenameCommitHandler").finish()
950    }
951}
952
/// A commit implementation that uploads the manifest with a create-only put.
///
/// The manifest is written to the final path using
/// [object_store::PutMode::Create]. An `AlreadyExists` or `Precondition`
/// error from the store is reported as [CommitError::CommitConflict].
pub struct ConditionalPutCommitHandler;
954
955#[async_trait::async_trait]
956impl CommitHandler for ConditionalPutCommitHandler {
957    async fn commit(
958        &self,
959        manifest: &mut Manifest,
960        indices: Option<Vec<Index>>,
961        base_path: &Path,
962        object_store: &ObjectStore,
963        manifest_writer: ManifestWriter,
964        naming_scheme: ManifestNamingScheme,
965    ) -> std::result::Result<ManifestLocation, CommitError> {
966        let path = naming_scheme.manifest_path(base_path, manifest.version);
967
968        let memory_store = ObjectStore::memory();
969        let dummy_path = "dummy";
970        manifest_writer(&memory_store, manifest, indices, &dummy_path.into()).await?;
971        let dummy_data = memory_store.read_one_all(&dummy_path.into()).await?;
972        let size = dummy_data.len() as u64;
973        let res = object_store
974            .inner
975            .put_opts(
976                &path,
977                dummy_data.into(),
978                PutOptions {
979                    mode: object_store::PutMode::Create,
980                    ..Default::default()
981                },
982            )
983            .await
984            .map_err(|err| match err {
985                ObjectStoreError::AlreadyExists { .. } | ObjectStoreError::Precondition { .. } => {
986                    CommitError::CommitConflict
987                }
988                _ => CommitError::OtherError(err.into()),
989            })?;
990
991        Ok(ManifestLocation {
992            version: manifest.version,
993            path,
994            size: Some(size),
995            naming_scheme,
996            e_tag: res.e_tag,
997        })
998    }
999}
1000
1001impl Debug for ConditionalPutCommitHandler {
1002    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1003        f.debug_struct("ConditionalPutCommitHandler").finish()
1004    }
1005}
1006
/// Configuration options for committing.
#[derive(Debug, Clone)]
pub struct CommitConfig {
    /// Retry budget; the `Default` implementation sets it to 20.
    ///
    /// NOTE(review): presumably the number of times a commit is retried after
    /// a conflict; confirm against the callers that read this field.
    pub num_retries: u32,
    // TODO: add isolation_level
}
1012
1013impl Default for CommitConfig {
1014    fn default() -> Self {
1015        Self { num_retries: 20 }
1016    }
1017}
1018
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks path generation, version parsing, and scheme detection for both
    /// manifest naming schemes.
    #[test]
    fn test_manifest_naming_scheme() {
        let v1 = ManifestNamingScheme::V1;
        let v2 = ManifestNamingScheme::V2;
        let base = Path::from("base");

        // (scheme, version, expected manifest path)
        let path_cases = [
            (&v1, 0, "base/_versions/0.manifest"),
            (&v1, 42, "base/_versions/42.manifest"),
            (&v2, 0, "base/_versions/18446744073709551615.manifest"),
            (&v2, 42, "base/_versions/18446744073709551573.manifest"),
        ];
        for &(scheme, version, expected) in &path_cases {
            assert_eq!(
                scheme.manifest_path(&base, version),
                Path::from(expected)
            );
        }

        // (scheme, file name, expected parsed version); the last entry of each
        // scheme carries a trailing suffix after `.manifest`.
        let parse_cases = [
            (&v1, "0.manifest", 0),
            (&v1, "42.manifest", 42),
            (
                &v1,
                "42.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc",
                42,
            ),
            (&v2, "18446744073709551615.manifest", 0),
            (&v2, "18446744073709551573.manifest", 42),
            (
                &v2,
                "18446744073709551573.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc",
                42,
            ),
        ];
        for &(scheme, filename, version) in &parse_cases {
            assert_eq!(scheme.parse_version(filename), Some(version));
        }

        let detected_v1 = ManifestNamingScheme::detect_scheme("0.manifest");
        assert_eq!(detected_v1, Some(v1));
        let detected_v2 = ManifestNamingScheme::detect_scheme("18446744073709551615.manifest");
        assert_eq!(detected_v2, Some(v2));
        let detected_none = ManifestNamingScheme::detect_scheme("something else");
        assert_eq!(detected_none, None);
    }

    /// Checks that migrating to the V2 scheme renames V1 manifests and leaves
    /// every other file in the versions directory alone.
    #[tokio::test]
    async fn test_manifest_naming_migration() {
        let object_store = ObjectStore::memory();
        let base = Path::from("base");
        let versions_dir = base.child(VERSIONS_DIR);

        // Seed the directory with one unrelated file, one V1 manifest
        // (version 0), and one manifest that is already V2 (version 1).
        let seeded_files = [
            versions_dir.child("irrelevant"),
            ManifestNamingScheme::V1.manifest_path(&base, 0),
            ManifestNamingScheme::V2.manifest_path(&base, 1),
        ];
        for seeded in &seeded_files {
            object_store.put(seeded, b"".as_slice()).await.unwrap();
        }

        migrate_scheme_to_v2(&object_store, &base).await.unwrap();

        // Both manifests should now use V2 names; the unrelated file stays.
        let expected_files = vec![
            ManifestNamingScheme::V2.manifest_path(&base, 1),
            ManifestNamingScheme::V2.manifest_path(&base, 0),
            versions_dir.child("irrelevant"),
        ];
        let listing = object_store.inner.list(Some(&versions_dir));
        let actual_files = listing
            .map_ok(|meta| meta.location)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(actual_files, expected_files);
    }
}