lance_table/io/
commit.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Trait for commit implementations.
5//!
6//! In Lance, a transaction is committed by writing the next manifest file.
7//! However, care should be taken to ensure that the manifest file is written
8//! only once, even if there are concurrent writers. Different stores have
9//! different abilities to handle concurrent writes, so a trait is provided
10//! to allow for different implementations.
11//!
//! The trait [CommitHandler] can be implemented to provide different commit
//! strategies. [commit_handler_from_url] picks one based on the URL scheme:
//! [ConditionalPutCommitHandler] for local paths (except on Windows) and for
//! `s3://`, `gs://`, `az://`, `memory://` and `oss://`; [RenameCommitHandler]
//! for local paths on Windows; and [UnsafeCommitHandler] as the fallback for
//! unrecognized schemes. [RenameCommitHandler] writes the manifest to a temporary
//! path, then renames the temporary path to the final path if no object already
//! exists at the final path; this is atomic in most object stores, but not in
//! AWS S3. [UnsafeCommitHandler] writes the manifest to the final path without
//! any checks.
20//!
21//! When providing your own commit handler, most often you are implementing in
22//! terms of a lock. The trait [CommitLock] can be implemented as a simpler
23//! alternative to [CommitHandler].
24
25use std::io;
26use std::pin::Pin;
27use std::sync::atomic::AtomicBool;
28use std::sync::Arc;
29use std::{fmt::Debug, fs::DirEntry};
30
31use super::manifest::write_manifest;
32use futures::future::Either;
33use futures::Stream;
34use futures::{
35    future::{self, BoxFuture},
36    stream::BoxStream,
37    StreamExt, TryStreamExt,
38};
39use lance_file::format::{MAGIC, MAJOR_VERSION, MINOR_VERSION};
40use lance_io::object_writer::{ObjectWriter, WriteResult};
41use log::warn;
42use object_store::PutOptions;
43use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore};
44use snafu::location;
45use tracing::info;
46use url::Url;
47
48#[cfg(feature = "dynamodb")]
49pub mod dynamodb;
50pub mod external_manifest;
51
52use lance_core::{Error, Result};
53use lance_io::object_store::{ObjectStore, ObjectStoreExt, ObjectStoreParams};
54use lance_io::traits::WriteExt;
55
56use crate::format::{is_detached_version, IndexMetadata, Manifest, Transaction};
57use lance_core::utils::tracing::{AUDIT_MODE_CREATE, AUDIT_TYPE_MANIFEST, TRACE_FILE_AUDIT};
58#[cfg(feature = "dynamodb")]
59use {
60    self::external_manifest::{ExternalManifestCommitHandler, ExternalManifestStore},
61    aws_credential_types::provider::error::CredentialsError,
62    aws_credential_types::provider::ProvideCredentials,
63    lance_io::object_store::{providers::aws::build_aws_credential, StorageOptions},
64    object_store::aws::AmazonS3ConfigKey,
65    object_store::aws::AwsCredentialProvider,
66    std::borrow::Cow,
67    std::time::{Duration, SystemTime},
68};
69
70const VERSIONS_DIR: &str = "_versions";
71const MANIFEST_EXTENSION: &str = "manifest";
72const DETACHED_VERSION_PREFIX: &str = "d";
73
74/// How manifest files should be named.
75#[derive(Clone, Copy, Debug, PartialEq, Eq)]
76pub enum ManifestNamingScheme {
77    /// `_versions/{version}.manifest`
78    V1,
79    /// `_manifests/{u64::MAX - version}.manifest`
80    ///
81    /// Zero-padded and reversed for O(1) lookup of latest version on object stores.
82    V2,
83}
84
85impl ManifestNamingScheme {
86    pub fn manifest_path(&self, base: &Path, version: u64) -> Path {
87        let directory = base.child(VERSIONS_DIR);
88        if is_detached_version(version) {
89            // Detached versions should never show up first in a list operation which
90            // means it needs to come lexicographically after all attached manifest
91            // files and so we add the prefix `d`.  There is no need to invert the
92            // version number since detached versions are not part of the version
93            let directory = base.child(VERSIONS_DIR);
94            directory.child(format!(
95                "{DETACHED_VERSION_PREFIX}{version}.{MANIFEST_EXTENSION}"
96            ))
97        } else {
98            match self {
99                Self::V1 => directory.child(format!("{version}.{MANIFEST_EXTENSION}")),
100                Self::V2 => {
101                    let inverted_version = u64::MAX - version;
102                    directory.child(format!("{inverted_version:020}.{MANIFEST_EXTENSION}"))
103                }
104            }
105        }
106    }
107
108    pub fn parse_version(&self, filename: &str) -> Option<u64> {
109        let file_number = filename
110            .split_once('.')
111            // Detached versions will fail the `parse` step, which is ok.
112            .and_then(|(version_str, _)| version_str.parse::<u64>().ok());
113        match self {
114            Self::V1 => file_number,
115            Self::V2 => file_number.map(|v| u64::MAX - v),
116        }
117    }
118
119    pub fn detect_scheme(filename: &str) -> Option<Self> {
120        if filename.starts_with(DETACHED_VERSION_PREFIX) {
121            // Currently, detached versions must imply V2
122            return Some(Self::V2);
123        }
124        if filename.ends_with(MANIFEST_EXTENSION) {
125            const V2_LEN: usize = 20 + 1 + MANIFEST_EXTENSION.len();
126            if filename.len() == V2_LEN {
127                Some(Self::V2)
128            } else {
129                Some(Self::V1)
130            }
131        } else {
132            None
133        }
134    }
135
136    pub fn detect_scheme_staging(filename: &str) -> Self {
137        // We shouldn't have to worry about detached versions here since there is no
138        // such thing as "detached" and "staged" at the same time.
139        if filename.chars().nth(20) == Some('.') {
140            Self::V2
141        } else {
142            Self::V1
143        }
144    }
145}
146
147/// Migrate all V1 manifests to V2 naming scheme.
148///
149/// This function will rename all V1 manifests to V2 naming scheme.
150///
151/// This function is idempotent, and can be run multiple times without
152/// changing the state of the object store.
153///
154/// However, it should not be run while other concurrent operations are happening.
155/// And it should also run until completion before resuming other operations.
156pub async fn migrate_scheme_to_v2(object_store: &ObjectStore, dataset_base: &Path) -> Result<()> {
157    object_store
158        .inner
159        .list(Some(&dataset_base.child(VERSIONS_DIR)))
160        .try_filter(|res| {
161            let res = if let Some(filename) = res.location.filename() {
162                ManifestNamingScheme::detect_scheme(filename) == Some(ManifestNamingScheme::V1)
163            } else {
164                false
165            };
166            future::ready(res)
167        })
168        .try_for_each_concurrent(object_store.io_parallelism(), |meta| async move {
169            let filename = meta.location.filename().unwrap();
170            let version = ManifestNamingScheme::V1.parse_version(filename).unwrap();
171            let path = ManifestNamingScheme::V2.manifest_path(dataset_base, version);
172            object_store.inner.rename(&meta.location, &path).await?;
173            Ok(())
174        })
175        .await?;
176
177    Ok(())
178}
179
180/// Function that writes the manifest to the object store.
181///
182/// Returns the size of the written manifest.
183pub type ManifestWriter = for<'a> fn(
184    object_store: &'a ObjectStore,
185    manifest: &'a mut Manifest,
186    indices: Option<Vec<IndexMetadata>>,
187    path: &'a Path,
188    transaction: Option<Transaction>,
189) -> BoxFuture<'a, Result<WriteResult>>;
190
191/// Canonical manifest writer; its function item type exactly matches `ManifestWriter`.
192/// Rationale: keep a crate-local writer implementation so call sites can pass this function
193/// directly without non-primitive casts or lifetime coercions.
194pub fn write_manifest_file_to_path<'a>(
195    object_store: &'a ObjectStore,
196    manifest: &'a mut Manifest,
197    indices: Option<Vec<IndexMetadata>>,
198    path: &'a Path,
199    transaction: Option<Transaction>,
200) -> BoxFuture<'a, Result<WriteResult>> {
201    Box::pin(async move {
202        let mut object_writer = ObjectWriter::new(object_store, path).await?;
203        let pos = write_manifest(&mut object_writer, manifest, indices, transaction).await?;
204        object_writer
205            .write_magics(pos, MAJOR_VERSION, MINOR_VERSION, MAGIC)
206            .await?;
207        let res = object_writer.shutdown().await?;
208        info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_MANIFEST, path = path.to_string());
209        Ok(res)
210    })
211}
212
213#[derive(Debug, Clone)]
214pub struct ManifestLocation {
215    /// The version the manifest corresponds to.
216    pub version: u64,
217    /// Path of the manifest file, relative to the table root.
218    pub path: Path,
219    /// Size, in bytes, of the manifest file. If it is not known, this field should be `None`.
220    pub size: Option<u64>,
221    /// Naming scheme of the manifest file.
222    pub naming_scheme: ManifestNamingScheme,
223    /// Optional e-tag, used for integrity checks. Manifests should be immutable, so
224    /// if we detect a change in the e-tag, it means the manifest was tampered with.
225    /// This might happen if the dataset was deleted and then re-created.
226    pub e_tag: Option<String>,
227}
228
229impl TryFrom<object_store::ObjectMeta> for ManifestLocation {
230    type Error = Error;
231
232    fn try_from(meta: object_store::ObjectMeta) -> Result<Self> {
233        let filename = meta.location.filename().ok_or_else(|| Error::Internal {
234            message: "ObjectMeta location does not have a filename".to_string(),
235            location: location!(),
236        })?;
237        let scheme =
238            ManifestNamingScheme::detect_scheme(filename).ok_or_else(|| Error::Internal {
239                message: format!("Invalid manifest filename: '{}'", filename),
240                location: location!(),
241            })?;
242        let version = scheme
243            .parse_version(filename)
244            .ok_or_else(|| Error::Internal {
245                message: format!("Invalid manifest filename: '{}'", filename),
246                location: location!(),
247            })?;
248        Ok(Self {
249            version,
250            path: meta.location,
251            size: Some(meta.size),
252            naming_scheme: scheme,
253            e_tag: meta.e_tag,
254        })
255    }
256}
257
258/// Get the latest manifest path
259async fn current_manifest_path(
260    object_store: &ObjectStore,
261    base: &Path,
262) -> Result<ManifestLocation> {
263    if object_store.is_local() {
264        if let Ok(Some(location)) = current_manifest_local(base) {
265            return Ok(location);
266        }
267    }
268
269    let manifest_files = object_store.list(Some(base.child(VERSIONS_DIR)));
270
271    let mut valid_manifests = manifest_files.try_filter_map(|res| {
272        if let Some(scheme) = ManifestNamingScheme::detect_scheme(res.location.filename().unwrap())
273        {
274            future::ready(Ok(Some((scheme, res))))
275        } else {
276            future::ready(Ok(None))
277        }
278    });
279
280    let first = valid_manifests.next().await.transpose()?;
281    match (first, object_store.list_is_lexically_ordered) {
282        // If the first valid manifest we see is V2, we can assume that we are using
283        // V2 naming scheme for all manifests.
284        (Some((scheme @ ManifestNamingScheme::V2, meta)), true) => {
285            let version = scheme
286                .parse_version(meta.location.filename().unwrap())
287                .unwrap();
288
289            // Sanity check: verify at least for the first 1k files that they are all V2
290            // and that the version numbers are decreasing. We use the first 1k because
291            // this is the typical size of an object store list endpoint response page.
292            for (scheme, meta) in valid_manifests.take(999).try_collect::<Vec<_>>().await? {
293                if scheme != ManifestNamingScheme::V2 {
294                    warn!(
295                        "Found V1 Manifest in a V2 directory. Use `migrate_manifest_paths_v2` \
296                         to migrate the directory."
297                    );
298                    break;
299                }
300                let next_version = scheme
301                    .parse_version(meta.location.filename().unwrap())
302                    .unwrap();
303                if next_version >= version {
304                    warn!(
305                        "List operation was expected to be lexically ordered, but was not. This \
306                         could mean a corrupt read. Please make a bug report on the lance-format/lance \
307                         GitHub repository."
308                    );
309                    break;
310                }
311            }
312
313            Ok(ManifestLocation {
314                version,
315                path: meta.location,
316                size: Some(meta.size),
317                naming_scheme: scheme,
318                e_tag: meta.e_tag,
319            })
320        }
321        // If the first valid manifest we see if V1, assume for now that we are
322        // using V1 naming scheme for all manifests. Since we are listing the
323        // directory anyways, we will assert there aren't any V2 manifests.
324        (Some((scheme, meta)), _) => {
325            let mut current_version = scheme
326                .parse_version(meta.location.filename().unwrap())
327                .unwrap();
328            let mut current_meta = meta;
329
330            while let Some((scheme, meta)) = valid_manifests.next().await.transpose()? {
331                if matches!(scheme, ManifestNamingScheme::V2) {
332                    return Err(Error::Internal {
333                        message: "Found V2 manifest in a V1 manifest directory".to_string(),
334                        location: location!(),
335                    });
336                }
337                let version = scheme
338                    .parse_version(meta.location.filename().unwrap())
339                    .unwrap();
340                if version > current_version {
341                    current_version = version;
342                    current_meta = meta;
343                }
344            }
345            Ok(ManifestLocation {
346                version: current_version,
347                path: current_meta.location,
348                size: Some(current_meta.size),
349                naming_scheme: scheme,
350                e_tag: current_meta.e_tag,
351            })
352        }
353        (None, _) => Err(Error::NotFound {
354            uri: base.child(VERSIONS_DIR).to_string(),
355            location: location!(),
356        }),
357    }
358}
359
360// This is an optimized function that searches for the latest manifest. In
361// object_store, list operations lookup metadata for each file listed. This
362// method only gets the metadata for the found latest manifest.
363fn current_manifest_local(base: &Path) -> std::io::Result<Option<ManifestLocation>> {
364    let path = lance_io::local::to_local_path(&base.child(VERSIONS_DIR));
365    let entries = std::fs::read_dir(path)?;
366
367    let mut latest_entry: Option<(u64, DirEntry)> = None;
368
369    let mut scheme: Option<ManifestNamingScheme> = None;
370
371    for entry in entries {
372        let entry = entry?;
373        let filename_raw = entry.file_name();
374        let filename = filename_raw.to_string_lossy();
375
376        let Some(entry_scheme) = ManifestNamingScheme::detect_scheme(&filename) else {
377            // Need to ignore temporary files, such as
378            // .tmp_7.manifest_9c100374-3298-4537-afc6-f5ee7913666d
379            continue;
380        };
381
382        if let Some(scheme) = scheme {
383            if scheme != entry_scheme {
384                return Err(io::Error::new(
385                    io::ErrorKind::InvalidData,
386                    format!(
387                        "Found multiple manifest naming schemes in the same directory: {:?} and {:?}",
388                        scheme, entry_scheme
389                    ),
390                ));
391            }
392        } else {
393            scheme = Some(entry_scheme);
394        }
395
396        let Some(version) = entry_scheme.parse_version(&filename) else {
397            continue;
398        };
399
400        if let Some((latest_version, _)) = &latest_entry {
401            if version > *latest_version {
402                latest_entry = Some((version, entry));
403            }
404        } else {
405            latest_entry = Some((version, entry));
406        }
407    }
408
409    if let Some((version, entry)) = latest_entry {
410        let path = Path::from_filesystem_path(entry.path())
411            .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;
412        let metadata = entry.metadata()?;
413        Ok(Some(ManifestLocation {
414            version,
415            path,
416            size: Some(metadata.len()),
417            naming_scheme: scheme.unwrap(),
418            e_tag: Some(get_etag(&metadata)),
419        }))
420    } else {
421        Ok(None)
422    }
423}
424
425// Based on object store's implementation.
426fn get_etag(metadata: &std::fs::Metadata) -> String {
427    let inode = get_inode(metadata);
428    let size = metadata.len();
429    let mtime = metadata
430        .modified()
431        .ok()
432        .and_then(|mtime| mtime.duration_since(std::time::SystemTime::UNIX_EPOCH).ok())
433        .unwrap_or_default()
434        .as_micros();
435
436    // Use an ETag scheme based on that used by many popular HTTP servers
437    // <https://httpd.apache.org/docs/2.2/mod/core.html#fileetag>
438    // <https://stackoverflow.com/questions/47512043/how-etags-are-generated-and-configured>
439    format!("{inode:x}-{mtime:x}-{size:x}")
440}
441
442#[cfg(unix)]
443/// We include the inode when available to yield an ETag more resistant to collisions
444/// and as used by popular web servers such as [Apache](https://httpd.apache.org/docs/2.2/mod/core.html#fileetag)
445fn get_inode(metadata: &std::fs::Metadata) -> u64 {
446    std::os::unix::fs::MetadataExt::ino(metadata)
447}
448
449#[cfg(not(unix))]
450/// On platforms where an inode isn't available, fallback to just relying on size and mtime
451fn get_inode(_metadata: &std::fs::Metadata) -> u64 {
452    0
453}
454
455fn list_manifests<'a>(
456    base_path: &Path,
457    object_store: &'a dyn OSObjectStore,
458) -> impl Stream<Item = Result<ManifestLocation>> + 'a {
459    object_store
460        .read_dir_all(&base_path.child(VERSIONS_DIR), None)
461        .filter_map(|obj_meta| {
462            futures::future::ready(
463                obj_meta
464                    .map(|m| ManifestLocation::try_from(m).ok())
465                    .transpose(),
466            )
467        })
468        .boxed()
469}
470
471fn make_staging_manifest_path(base: &Path) -> Result<Path> {
472    let id = uuid::Uuid::new_v4().to_string();
473    Path::parse(format!("{base}-{id}")).map_err(|e| Error::IO {
474        source: Box::new(e),
475        location: location!(),
476    })
477}
478
479#[cfg(feature = "dynamodb")]
480const DDB_URL_QUERY_KEY: &str = "ddbTableName";
481
482/// Handle commits that prevent conflicting writes.
483///
484/// Commit implementations ensure that if there are multiple concurrent writers
485/// attempting to write the next version of a table, only one will win. In order
486/// to work, all writers must use the same commit handler type.
487/// This trait is also responsible for resolving where the manifests live.
488///
489// TODO: pub(crate)
490#[async_trait::async_trait]
491#[allow(clippy::too_many_arguments)]
492pub trait CommitHandler: Debug + Send + Sync {
493    async fn resolve_latest_location(
494        &self,
495        base_path: &Path,
496        object_store: &ObjectStore,
497    ) -> Result<ManifestLocation> {
498        Ok(current_manifest_path(object_store, base_path).await?)
499    }
500
501    async fn resolve_version_location(
502        &self,
503        base_path: &Path,
504        version: u64,
505        object_store: &dyn OSObjectStore,
506    ) -> Result<ManifestLocation> {
507        default_resolve_version(base_path, version, object_store).await
508    }
509
510    /// If `sorted_descending` is `true`, the stream will yield manifests in descending
511    /// order of version. When the object store has a lexicographically
512    /// ordered list and the naming scheme is V2, this will use an optimized
513    /// list operation. Otherwise, it will list all manifests and sort them
514    /// in memory. When `sorted_descending` is `false`, the stream will yield manifests
515    /// in arbitrary order.
516    fn list_manifest_locations<'a>(
517        &self,
518        base_path: &Path,
519        object_store: &'a ObjectStore,
520        sorted_descending: bool,
521    ) -> BoxStream<'a, Result<ManifestLocation>> {
522        let underlying_stream = list_manifests(base_path, &object_store.inner);
523
524        if !sorted_descending {
525            return underlying_stream.boxed();
526        }
527
528        async fn sort_stream(
529            input_stream: impl futures::Stream<Item = Result<ManifestLocation>> + Unpin,
530        ) -> Result<impl Stream<Item = Result<ManifestLocation>> + Unpin> {
531            let mut locations = input_stream.try_collect::<Vec<_>>().await?;
532            locations.sort_by_key(|m| std::cmp::Reverse(m.version));
533            Ok(futures::stream::iter(locations.into_iter().map(Ok)))
534        }
535
536        // If the object store supports lexicographically ordered lists and
537        // the naming scheme is V2, we can use an optimized list operation.
538        if object_store.list_is_lexically_ordered {
539            // We don't know the naming scheme until we see the first manifest.
540            let mut peekable = underlying_stream.peekable();
541
542            futures::stream::once(async move {
543                let naming_scheme = match Pin::new(&mut peekable).peek().await {
544                    Some(Ok(m)) => m.naming_scheme,
545                    // If we get an error or no manifests are found, we default
546                    // to V2 naming scheme, since it doesn't matter.
547                    Some(Err(_)) => ManifestNamingScheme::V2,
548                    None => ManifestNamingScheme::V2,
549                };
550
551                if naming_scheme == ManifestNamingScheme::V2 {
552                    // If the first manifest is V2, we can use the optimized list operation.
553                    Ok(Either::Left(peekable))
554                } else {
555                    sort_stream(peekable).await.map(Either::Right)
556                }
557            })
558            .try_flatten()
559            .boxed()
560        } else {
561            // If the object store does not support lexicographically ordered lists,
562            // we need to sort the manifests in memory. Systems where this isn't
563            // supported (local fs, S3 express) are typically fast enough
564            // that this is not a problem.
565            futures::stream::once(sort_stream(underlying_stream))
566                .try_flatten()
567                .boxed()
568        }
569    }
570
571    /// Commit a manifest.
572    ///
573    /// This function should return an [CommitError::CommitConflict] if another
574    /// transaction has already been committed to the path.
575    async fn commit(
576        &self,
577        manifest: &mut Manifest,
578        indices: Option<Vec<IndexMetadata>>,
579        base_path: &Path,
580        object_store: &ObjectStore,
581        manifest_writer: ManifestWriter,
582        naming_scheme: ManifestNamingScheme,
583        transaction: Option<Transaction>,
584    ) -> std::result::Result<ManifestLocation, CommitError>;
585
586    /// Delete the recorded manifest information for a dataset at the base_path
587    async fn delete(&self, _base_path: &Path) -> Result<()> {
588        Ok(())
589    }
590}
591
592async fn default_resolve_version(
593    base_path: &Path,
594    version: u64,
595    object_store: &dyn OSObjectStore,
596) -> Result<ManifestLocation> {
597    if is_detached_version(version) {
598        return Ok(ManifestLocation {
599            version,
600            // Detached versions are not supported with V1 naming scheme.  If we need
601            // to support in the future we could use a different prefix (e.g. 'x' or something)
602            naming_scheme: ManifestNamingScheme::V2,
603            // Both V1 and V2 should give the same path for detached versions
604            path: ManifestNamingScheme::V2.manifest_path(base_path, version),
605            size: None,
606            e_tag: None,
607        });
608    }
609
610    // try V2, fallback to V1.
611    let scheme = ManifestNamingScheme::V2;
612    let path = scheme.manifest_path(base_path, version);
613    match object_store.head(&path).await {
614        Ok(meta) => Ok(ManifestLocation {
615            version,
616            path,
617            size: Some(meta.size),
618            naming_scheme: scheme,
619            e_tag: meta.e_tag,
620        }),
621        Err(ObjectStoreError::NotFound { .. }) => {
622            // fallback to V1
623            let scheme = ManifestNamingScheme::V1;
624            Ok(ManifestLocation {
625                version,
626                path: scheme.manifest_path(base_path, version),
627                size: None,
628                naming_scheme: scheme,
629                e_tag: None,
630            })
631        }
632        Err(e) => Err(e.into()),
633    }
634}
635/// Adapt an object_store credentials into AWS SDK creds
636#[cfg(feature = "dynamodb")]
637#[derive(Debug)]
638struct OSObjectStoreToAwsCredAdaptor(AwsCredentialProvider);
639
640#[cfg(feature = "dynamodb")]
641impl ProvideCredentials for OSObjectStoreToAwsCredAdaptor {
642    fn provide_credentials<'a>(
643        &'a self,
644    ) -> aws_credential_types::provider::future::ProvideCredentials<'a>
645    where
646        Self: 'a,
647    {
648        aws_credential_types::provider::future::ProvideCredentials::new(async {
649            let creds = self
650                .0
651                .get_credential()
652                .await
653                .map_err(|e| CredentialsError::provider_error(Box::new(e)))?;
654            Ok(aws_credential_types::Credentials::new(
655                &creds.key_id,
656                &creds.secret_key,
657                creds.token.clone(),
658                Some(
659                    SystemTime::now()
660                        .checked_add(Duration::from_secs(
661                            60 * 10, //  10 min
662                        ))
663                        .expect("overflow"),
664                ),
665                "",
666            ))
667        })
668    }
669}
670
671#[cfg(feature = "dynamodb")]
672async fn build_dynamodb_external_store(
673    table_name: &str,
674    creds: AwsCredentialProvider,
675    region: &str,
676    endpoint: Option<String>,
677    app_name: &str,
678) -> Result<Arc<dyn ExternalManifestStore>> {
679    use super::commit::dynamodb::DynamoDBExternalManifestStore;
680    use aws_sdk_dynamodb::{
681        config::{retry::RetryConfig, IdentityCache, Region},
682        Client,
683    };
684
685    let mut dynamodb_config = aws_sdk_dynamodb::config::Builder::new()
686        .behavior_version_latest()
687        .region(Some(Region::new(region.to_string())))
688        .credentials_provider(OSObjectStoreToAwsCredAdaptor(creds))
689        // caching should be handled by passed AwsCredentialProvider
690        .identity_cache(IdentityCache::no_cache())
691        // Be more resilient to transient network issues.
692        // 5 attempts = 1 initial + 4 retries with exponential backoff.
693        .retry_config(RetryConfig::standard().with_max_attempts(5));
694
695    if let Some(endpoint) = endpoint {
696        dynamodb_config = dynamodb_config.endpoint_url(endpoint);
697    }
698    let client = Client::from_conf(dynamodb_config.build());
699
700    DynamoDBExternalManifestStore::new_external_store(client.into(), table_name, app_name).await
701}
702
703pub async fn commit_handler_from_url(
704    url_or_path: &str,
705    // This looks unused if dynamodb feature disabled
706    #[allow(unused_variables)] options: &Option<ObjectStoreParams>,
707) -> Result<Arc<dyn CommitHandler>> {
708    let local_handler: Arc<dyn CommitHandler> = if cfg!(windows) {
709        Arc::new(RenameCommitHandler)
710    } else {
711        Arc::new(ConditionalPutCommitHandler)
712    };
713
714    let url = match Url::parse(url_or_path) {
715        Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
716            // On Windows, the drive is parsed as a scheme
717            return Ok(local_handler);
718        }
719        Ok(url) => url,
720        Err(_) => {
721            return Ok(local_handler);
722        }
723    };
724
725    match url.scheme() {
726        "file" | "file-object-store" => Ok(local_handler),
727        "s3" | "gs" | "az" | "memory" | "oss" => Ok(Arc::new(ConditionalPutCommitHandler)),
728        #[cfg(not(feature = "dynamodb"))]
729        "s3+ddb" => Err(Error::InvalidInput {
730            source: "`s3+ddb://` scheme requires `dynamodb` feature to be enabled".into(),
731            location: location!(),
732        }),
733        #[cfg(feature = "dynamodb")]
734        "s3+ddb" => {
735            if url.query_pairs().count() != 1 {
736                return Err(Error::InvalidInput {
737                    source: "`s3+ddb://` scheme and expects exactly one query `ddbTableName`"
738                        .into(),
739                    location: location!(),
740                });
741            }
742            let table_name = match url.query_pairs().next() {
743                Some((Cow::Borrowed(key), Cow::Borrowed(table_name)))
744                    if key == DDB_URL_QUERY_KEY =>
745                {
746                    if table_name.is_empty() {
747                        return Err(Error::InvalidInput {
748                            source: "`s3+ddb://` scheme requires non empty dynamodb table name"
749                                .into(),
750                            location: location!(),
751                        });
752                    }
753                    table_name
754                }
755                _ => {
756                    return Err(Error::InvalidInput {
757                        source: "`s3+ddb://` scheme and expects exactly one query `ddbTableName`"
758                            .into(),
759                        location: location!(),
760                    });
761                }
762            };
763            let options = options.clone().unwrap_or_default();
764            let storage_options = StorageOptions(options.storage_options.unwrap_or_default());
765            let dynamo_endpoint = get_dynamodb_endpoint(&storage_options);
766            let expires_at_millis = storage_options.expires_at_millis();
767            let storage_options = storage_options.as_s3_options();
768
769            let region = storage_options.get(&AmazonS3ConfigKey::Region).cloned();
770
771            let (aws_creds, region) = build_aws_credential(
772                options.s3_credentials_refresh_offset,
773                options.aws_credentials.clone(),
774                Some(&storage_options),
775                region,
776                options.storage_options_provider.clone(),
777                expires_at_millis,
778            )
779            .await?;
780
781            Ok(Arc::new(ExternalManifestCommitHandler {
782                external_manifest_store: build_dynamodb_external_store(
783                    table_name,
784                    aws_creds.clone(),
785                    &region,
786                    dynamo_endpoint,
787                    "lancedb",
788                )
789                .await?,
790            }))
791        }
792        _ => Ok(Arc::new(UnsafeCommitHandler)),
793    }
794}
795
#[cfg(feature = "dynamodb")]
/// Resolve the DynamoDB endpoint to use for the external manifest store.
///
/// An explicit `dynamodb_endpoint` storage option takes precedence; the
/// `DYNAMODB_ENDPOINT` environment variable is consulted only when the option
/// is absent. Returns `None` when neither is set.
fn get_dynamodb_endpoint(storage_options: &StorageOptions) -> Option<String> {
    storage_options
        .0
        .get("dynamodb_endpoint")
        .cloned()
        .or_else(|| std::env::var("DYNAMODB_ENDPOINT").ok())
}
804
/// Errors that can occur when committing a manifest.
///
/// Commit handlers distinguish a lost race ([CommitError::CommitConflict]),
/// which a caller may resolve by retrying against a newer version, from any
/// other failure ([CommitError::OtherError]).
#[derive(Debug)]
pub enum CommitError {
    /// Another transaction has already been written to the path.
    ///
    /// Handlers in this module return this when the target manifest path
    /// already exists (e.g. `AlreadyExists` / `Precondition` from the store,
    /// or a successful HEAD of the path while holding a commit lock).
    CommitConflict,
    /// Something else went wrong; wraps the underlying error.
    OtherError(Error),
}
813
814impl From<Error> for CommitError {
815    fn from(e: Error) -> Self {
816        Self::OtherError(e)
817    }
818}
819
820impl From<CommitError> for Error {
821    fn from(e: CommitError) -> Self {
822        match e {
823            CommitError::CommitConflict => Self::Internal {
824                message: "Commit conflict".to_string(),
825                location: location!(),
826            },
827            CommitError::OtherError(e) => e,
828        }
829    }
830}
831
/// Whether we have issued a warning about using the unsafe commit handler.
///
/// Starts as `false` and is set to `true` by [UnsafeCommitHandler] when it
/// logs its warning, so later commits skip the warning.
static WARNED_ON_UNSAFE_COMMIT: AtomicBool = AtomicBool::new(false);

/// A naive commit implementation that does not prevent conflicting writes.
///
/// The manifest is written directly to its final versioned path with no
/// existence check or lock, so two concurrent writers committing the same
/// version can overwrite each other.
///
/// This will log a warning the first time it is used.
pub struct UnsafeCommitHandler;
839
840#[async_trait::async_trait]
841#[allow(clippy::too_many_arguments)]
842impl CommitHandler for UnsafeCommitHandler {
843    async fn commit(
844        &self,
845        manifest: &mut Manifest,
846        indices: Option<Vec<IndexMetadata>>,
847        base_path: &Path,
848        object_store: &ObjectStore,
849        manifest_writer: ManifestWriter,
850        naming_scheme: ManifestNamingScheme,
851        transaction: Option<Transaction>,
852    ) -> std::result::Result<ManifestLocation, CommitError> {
853        // Log a one-time warning
854        if !WARNED_ON_UNSAFE_COMMIT.load(std::sync::atomic::Ordering::Relaxed) {
855            WARNED_ON_UNSAFE_COMMIT.store(true, std::sync::atomic::Ordering::Relaxed);
856            log::warn!(
857                "Using unsafe commit handler. Concurrent writes may result in data loss. \
858                 Consider providing a commit handler that prevents conflicting writes."
859            );
860        }
861
862        let version_path = naming_scheme.manifest_path(base_path, manifest.version);
863        let res =
864            manifest_writer(object_store, manifest, indices, &version_path, transaction).await?;
865
866        Ok(ManifestLocation {
867            version: manifest.version,
868            size: Some(res.size as u64),
869            naming_scheme,
870            path: version_path,
871            e_tag: res.e_tag,
872        })
873    }
874}
875
876impl Debug for UnsafeCommitHandler {
877    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
878        f.debug_struct("UnsafeCommitHandler").finish()
879    }
880}
881
/// A commit implementation that uses a lock to prevent conflicting writes.
///
/// This is a simpler alternative to implementing [CommitHandler] directly:
/// every `CommitLock + Send + Sync` type (and an `Arc` of one) is a
/// [CommitHandler] through the blanket implementations in this module. Those
/// implementations acquire the lock, check that the target manifest path does
/// not exist yet, write the manifest, and then release the lease.
#[async_trait::async_trait]
pub trait CommitLock: Debug {
    /// The lease handed out by [CommitLock::lock]. It is released through
    /// [CommitLease::release] once the commit attempt finishes.
    type Lease: CommitLease;

    /// Attempt to lock the table for the given version.
    ///
    /// If it is already locked by another transaction, wait until it is unlocked.
    /// Once it is unlocked, return [CommitError::CommitConflict] if the version
    /// has already been committed. Otherwise, return the lock.
    ///
    /// To prevent poisoned locks, it's recommended to set a timeout on the lock
    /// of at least 30 seconds.
    ///
    /// It is not required that the lock tracks the version. It is provided in
    /// case the locking is handled by a catalog service that needs to know the
    /// current version of the table.
    async fn lock(&self, version: u64) -> std::result::Result<Self::Lease, CommitError>;
}
901
/// A lease on the commit lock, obtained from [CommitLock::lock].
///
/// The blanket [CommitHandler] implementation for [CommitLock] types calls
/// [CommitLease::release] after the commit attempt. It passes `true` only
/// when the manifest was written successfully.
#[async_trait::async_trait]
pub trait CommitLease: Send + Sync {
    /// Return the lease, indicating whether the commit was successful.
    ///
    /// `success` is `false` when the version turned out to be committed
    /// already, or when checking or writing the manifest failed.
    async fn release(&self, success: bool) -> std::result::Result<(), CommitError>;
}
907
908#[async_trait::async_trait]
909impl<T: CommitLock + Send + Sync> CommitHandler for T {
910    async fn commit(
911        &self,
912        manifest: &mut Manifest,
913        indices: Option<Vec<IndexMetadata>>,
914        base_path: &Path,
915        object_store: &ObjectStore,
916        manifest_writer: ManifestWriter,
917        naming_scheme: ManifestNamingScheme,
918        transaction: Option<Transaction>,
919    ) -> std::result::Result<ManifestLocation, CommitError> {
920        let path = naming_scheme.manifest_path(base_path, manifest.version);
921        // NOTE: once we have the lease we cannot use ? to return errors, since
922        // we must release the lease before returning.
923        let lease = self.lock(manifest.version).await?;
924
925        // Head the location and make sure it's not already committed
926        match object_store.inner.head(&path).await {
927            Ok(_) => {
928                // The path already exists, so it's already committed
929                // Release the lock
930                lease.release(false).await?;
931
932                return Err(CommitError::CommitConflict);
933            }
934            Err(ObjectStoreError::NotFound { .. }) => {}
935            Err(e) => {
936                // Something else went wrong
937                // Release the lock
938                lease.release(false).await?;
939
940                return Err(CommitError::OtherError(e.into()));
941            }
942        }
943        let res = manifest_writer(object_store, manifest, indices, &path, transaction).await;
944
945        // Release the lock
946        lease.release(res.is_ok()).await?;
947
948        let res = res?;
949        Ok(ManifestLocation {
950            version: manifest.version,
951            size: Some(res.size as u64),
952            naming_scheme,
953            path,
954            e_tag: res.e_tag,
955        })
956    }
957}
958
959#[async_trait::async_trait]
960impl<T: CommitLock + Send + Sync> CommitHandler for Arc<T> {
961    async fn commit(
962        &self,
963        manifest: &mut Manifest,
964        indices: Option<Vec<IndexMetadata>>,
965        base_path: &Path,
966        object_store: &ObjectStore,
967        manifest_writer: ManifestWriter,
968        naming_scheme: ManifestNamingScheme,
969        transaction: Option<Transaction>,
970    ) -> std::result::Result<ManifestLocation, CommitError> {
971        self.as_ref()
972            .commit(
973                manifest,
974                indices,
975                base_path,
976                object_store,
977                manifest_writer,
978                naming_scheme,
979                transaction,
980            )
981            .await
982    }
983}
984
/// A commit implementation that uses a temporary path and renames the object.
///
/// The manifest is first written to a staging path derived from the final
/// manifest path. It is then moved into place with `rename_if_not_exists`. If
/// the final path already exists, the commit fails with
/// [CommitError::CommitConflict].
///
/// This only works for object stores that support atomic rename if not exist.
pub struct RenameCommitHandler;
989
990#[async_trait::async_trait]
991impl CommitHandler for RenameCommitHandler {
992    async fn commit(
993        &self,
994        manifest: &mut Manifest,
995        indices: Option<Vec<IndexMetadata>>,
996        base_path: &Path,
997        object_store: &ObjectStore,
998        manifest_writer: ManifestWriter,
999        naming_scheme: ManifestNamingScheme,
1000        transaction: Option<Transaction>,
1001    ) -> std::result::Result<ManifestLocation, CommitError> {
1002        // Create a temporary object, then use `rename_if_not_exists` to commit.
1003        // If failed, clean up the temporary object.
1004
1005        let path = naming_scheme.manifest_path(base_path, manifest.version);
1006        let tmp_path = make_staging_manifest_path(&path)?;
1007
1008        let res = manifest_writer(object_store, manifest, indices, &tmp_path, transaction).await?;
1009
1010        match object_store
1011            .inner
1012            .rename_if_not_exists(&tmp_path, &path)
1013            .await
1014        {
1015            Ok(_) => {
1016                // Successfully committed
1017                Ok(ManifestLocation {
1018                    version: manifest.version,
1019                    path,
1020                    size: Some(res.size as u64),
1021                    naming_scheme,
1022                    e_tag: None, // Re-name can change e-tag.
1023                })
1024            }
1025            Err(ObjectStoreError::AlreadyExists { .. }) => {
1026                // Another transaction has already been committed
1027                // Attempt to clean up temporary object, but ignore errors if we can't
1028                let _ = object_store.delete(&tmp_path).await;
1029
1030                return Err(CommitError::CommitConflict);
1031            }
1032            Err(e) => {
1033                // Something else went wrong
1034                return Err(CommitError::OtherError(e.into()));
1035            }
1036        }
1037    }
1038}
1039
1040impl Debug for RenameCommitHandler {
1041    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1042        f.debug_struct("RenameCommitHandler").finish()
1043    }
1044}
1045
/// A commit implementation based on a conditional "create if absent" put.
///
/// The manifest is serialized in memory and uploaded to the final versioned
/// path with `PutMode::Create`. If the store rejects the put because the object
/// already exists (`AlreadyExists` or `Precondition`), the commit fails with
/// [CommitError::CommitConflict].
pub struct ConditionalPutCommitHandler;
1047
1048#[async_trait::async_trait]
1049impl CommitHandler for ConditionalPutCommitHandler {
1050    async fn commit(
1051        &self,
1052        manifest: &mut Manifest,
1053        indices: Option<Vec<IndexMetadata>>,
1054        base_path: &Path,
1055        object_store: &ObjectStore,
1056        manifest_writer: ManifestWriter,
1057        naming_scheme: ManifestNamingScheme,
1058        transaction: Option<Transaction>,
1059    ) -> std::result::Result<ManifestLocation, CommitError> {
1060        let path = naming_scheme.manifest_path(base_path, manifest.version);
1061
1062        let memory_store = ObjectStore::memory();
1063        let dummy_path = "dummy";
1064        manifest_writer(
1065            &memory_store,
1066            manifest,
1067            indices,
1068            &dummy_path.into(),
1069            transaction,
1070        )
1071        .await?;
1072        let dummy_data = memory_store.read_one_all(&dummy_path.into()).await?;
1073        let size = dummy_data.len() as u64;
1074        let res = object_store
1075            .inner
1076            .put_opts(
1077                &path,
1078                dummy_data.into(),
1079                PutOptions {
1080                    mode: object_store::PutMode::Create,
1081                    ..Default::default()
1082                },
1083            )
1084            .await
1085            .map_err(|err| match err {
1086                ObjectStoreError::AlreadyExists { .. } | ObjectStoreError::Precondition { .. } => {
1087                    CommitError::CommitConflict
1088                }
1089                _ => CommitError::OtherError(err.into()),
1090            })?;
1091
1092        Ok(ManifestLocation {
1093            version: manifest.version,
1094            path,
1095            size: Some(size),
1096            naming_scheme,
1097            e_tag: res.e_tag,
1098        })
1099    }
1100}
1101
1102impl Debug for ConditionalPutCommitHandler {
1103    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1104        f.debug_struct("ConditionalPutCommitHandler").finish()
1105    }
1106}
1107
/// Settings that control how a commit is attempted.
#[derive(Debug, Clone)]
pub struct CommitConfig {
    /// Number of retries allowed for a commit (20 by default). Presumably
    /// consumed by the commit retry loop; confirm at the call site.
    pub num_retries: u32,
    /// When `true`, automatic cleanup is skipped (off by default). Presumably
    /// refers to post-commit cleanup; confirm at the call site.
    pub skip_auto_cleanup: bool,
    // TODO: add isolation_level
}

impl Default for CommitConfig {
    fn default() -> Self {
        const DEFAULT_NUM_RETRIES: u32 = 20;
        Self {
            skip_auto_cleanup: false,
            num_retries: DEFAULT_NUM_RETRIES,
        }
    }
}
1123
#[cfg(test)]
mod tests {
    use lance_core::utils::tempfile::TempObjDir;

    use super::*;

    /// Round-trips versions through both naming schemes and checks scheme detection.
    #[test]
    fn test_manifest_naming_scheme() {
        let v1 = ManifestNamingScheme::V1;
        let v2 = ManifestNamingScheme::V2;

        // V1 names use the version number directly.
        assert_eq!(
            v1.manifest_path(&Path::from("base"), 0),
            Path::from("base/_versions/0.manifest")
        );
        assert_eq!(
            v1.manifest_path(&Path::from("base"), 42),
            Path::from("base/_versions/42.manifest")
        );

        // V2 names store `u64::MAX - version`, so higher versions get smaller numbers.
        assert_eq!(
            v2.manifest_path(&Path::from("base"), 0),
            Path::from("base/_versions/18446744073709551615.manifest")
        );
        assert_eq!(
            v2.manifest_path(&Path::from("base"), 42),
            Path::from("base/_versions/18446744073709551573.manifest")
        );

        // A trailing suffix after `.manifest` is ignored by `parse_version`.
        assert_eq!(v1.parse_version("0.manifest"), Some(0));
        assert_eq!(v1.parse_version("42.manifest"), Some(42));
        assert_eq!(
            v1.parse_version("42.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc"),
            Some(42)
        );

        assert_eq!(v2.parse_version("18446744073709551615.manifest"), Some(0));
        assert_eq!(v2.parse_version("18446744073709551573.manifest"), Some(42));
        assert_eq!(
            v2.parse_version("18446744073709551573.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc"),
            Some(42)
        );

        assert_eq!(ManifestNamingScheme::detect_scheme("0.manifest"), Some(v1));
        assert_eq!(
            ManifestNamingScheme::detect_scheme("18446744073709551615.manifest"),
            Some(v2)
        );
        assert_eq!(ManifestNamingScheme::detect_scheme("something else"), None);
    }

    /// Migrating renames V1 manifests to V2 names and leaves other files alone.
    #[tokio::test]
    async fn test_manifest_naming_migration() {
        let object_store = ObjectStore::memory();
        let base = Path::from("base");
        let versions_dir = base.child(VERSIONS_DIR);

        // Write one unrelated file, one V1 manifest (version 0) and one V2
        // manifest (version 1)
        let original_files = vec![
            versions_dir.child("irrelevant"),
            ManifestNamingScheme::V1.manifest_path(&base, 0),
            ManifestNamingScheme::V2.manifest_path(&base, 1),
        ];
        for path in original_files {
            object_store.put(&path, b"".as_slice()).await.unwrap();
        }

        migrate_scheme_to_v2(&object_store, &base).await.unwrap();

        // Both manifests now use V2 names; the unrelated file is untouched.
        let expected_files = vec![
            ManifestNamingScheme::V2.manifest_path(&base, 1),
            ManifestNamingScheme::V2.manifest_path(&base, 0),
            versions_dir.child("irrelevant"),
        ];
        let actual_files = object_store
            .inner
            .list(Some(&versions_dir))
            .map_ok(|res| res.location)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(actual_files, expected_files);
    }

    /// Listed manifest locations come back newest-first for both naming
    /// schemes, whether or not the store lists in lexical order.
    #[tokio::test]
    #[rstest::rstest]
    async fn test_list_manifests_sorted(
        #[values(true, false)] lexical_list_store: bool,
        #[values(ManifestNamingScheme::V1, ManifestNamingScheme::V2)]
        naming_scheme: ManifestNamingScheme,
    ) {
        // Declared outside the `if` so it lives for the whole test
        // (TempObjDir is presumably cleaned up on drop).
        let tempdir;
        // Memory store: lexically ordered listing; local store: not (asserted below).
        let (object_store, base) = if lexical_list_store {
            (Box::new(ObjectStore::memory()), Path::from("base"))
        } else {
            tempdir = TempObjDir::default();
            let path = tempdir.child("base");
            let store = Box::new(ObjectStore::local());
            assert!(!store.list_is_lexically_ordered);
            (store, path)
        };

        // Write 12 manifest files, latest first
        let mut expected_paths = Vec::new();
        for i in (0..12).rev() {
            let path = naming_scheme.manifest_path(&base, i);
            object_store.put(&path, b"".as_slice()).await.unwrap();
            expected_paths.push(path);
        }

        // The `true` flag presumably requests sorted output; the expected order
        // is newest version first.
        let actual_versions = ConditionalPutCommitHandler
            .list_manifest_locations(&base, &object_store, true)
            .map_ok(|location| location.path)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        assert_eq!(actual_versions, expected_paths);
    }
}