Skip to main content

lance_table/io/
commit.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Trait for commit implementations.
5//!
6//! In Lance, a transaction is committed by writing the next manifest file.
7//! However, care should be taken to ensure that the manifest file is written
8//! only once, even if there are concurrent writers. Different stores have
9//! different abilities to handle concurrent writes, so a trait is provided
10//! to allow for different implementations.
11//!
//! The trait [CommitHandler] can be implemented to provide different commit
//! strategies. [commit_handler_from_url] picks a default from the dataset URI:
//! [ConditionalPutCommitHandler] for local paths and for the `s3`, `gs`, `az`,
//! `abfss`, `memory`, `oss` and `cos` schemes; [RenameCommitHandler] for local
//! paths on Windows, which writes the manifest to a temporary path and then
//! renames it to the final path if no object already exists there; a
//! DynamoDB-backed external manifest handler for `s3+ddb` (requires the
//! `dynamodb` feature); and [UnsafeCommitHandler], which writes the manifest to
//! the final path without any checks, for any other scheme.
20//!
21//! When providing your own commit handler, most often you are implementing in
22//! terms of a lock. The trait [CommitLock] can be implemented as a simpler
23//! alternative to [CommitHandler].
24
25use std::io;
26use std::pin::Pin;
27use std::sync::Arc;
28use std::sync::atomic::AtomicBool;
29use std::{fmt::Debug, fs::DirEntry};
30
31use super::manifest::write_manifest;
32use futures::Stream;
33use futures::future::Either;
34use futures::{
35    StreamExt, TryStreamExt,
36    future::{self, BoxFuture},
37    stream::BoxStream,
38};
39use lance_file::format::{MAGIC, MAJOR_VERSION, MINOR_VERSION};
40use lance_io::object_writer::{ObjectWriter, WriteResult, get_etag};
41use log::warn;
42use object_store::PutOptions;
43use object_store::{Error as ObjectStoreError, ObjectStore as OSObjectStore, path::Path};
44use tracing::info;
45use url::Url;
46
47#[cfg(feature = "dynamodb")]
48pub mod dynamodb;
49pub mod external_manifest;
50
51use lance_core::{Error, Result};
52use lance_io::object_store::{ObjectStore, ObjectStoreExt, ObjectStoreParams};
53use lance_io::traits::{WriteExt, Writer};
54
55use crate::format::{IndexMetadata, Manifest, Transaction, is_detached_version};
56use lance_core::utils::tracing::{AUDIT_MODE_CREATE, AUDIT_TYPE_MANIFEST, TRACE_FILE_AUDIT};
57#[cfg(feature = "dynamodb")]
58use {
59    self::external_manifest::{ExternalManifestCommitHandler, ExternalManifestStore},
60    aws_credential_types::provider::ProvideCredentials,
61    aws_credential_types::provider::error::CredentialsError,
62    lance_io::object_store::{StorageOptions, providers::aws::build_aws_credential},
63    object_store::aws::AmazonS3ConfigKey,
64    object_store::aws::AwsCredentialProvider,
65    std::borrow::Cow,
66    std::time::{Duration, SystemTime},
67};
68
69pub const VERSIONS_DIR: &str = "_versions";
70const MANIFEST_EXTENSION: &str = "manifest";
71const DETACHED_VERSION_PREFIX: &str = "d";
72
73/// How manifest files should be named.
74#[derive(Clone, Copy, Debug, PartialEq, Eq)]
75pub enum ManifestNamingScheme {
76    /// `_versions/{version}.manifest`
77    V1,
78    /// `_manifests/{u64::MAX - version}.manifest`
79    ///
80    /// Zero-padded and reversed for O(1) lookup of latest version on object stores.
81    V2,
82}
83
84impl ManifestNamingScheme {
85    pub fn manifest_path(&self, base: &Path, version: u64) -> Path {
86        let directory = base.child(VERSIONS_DIR);
87        if is_detached_version(version) {
88            // Detached versions should never show up first in a list operation which
89            // means it needs to come lexicographically after all attached manifest
90            // files and so we add the prefix `d`.  There is no need to invert the
91            // version number since detached versions are not part of the version
92            let directory = base.child(VERSIONS_DIR);
93            directory.child(format!(
94                "{DETACHED_VERSION_PREFIX}{version}.{MANIFEST_EXTENSION}"
95            ))
96        } else {
97            match self {
98                Self::V1 => directory.child(format!("{version}.{MANIFEST_EXTENSION}")),
99                Self::V2 => {
100                    let inverted_version = u64::MAX - version;
101                    directory.child(format!("{inverted_version:020}.{MANIFEST_EXTENSION}"))
102                }
103            }
104        }
105    }
106
107    pub fn parse_version(&self, filename: &str) -> Option<u64> {
108        let file_number = filename
109            .split_once('.')
110            // Detached versions will fail the `parse` step, which is ok.
111            .and_then(|(version_str, _)| version_str.parse::<u64>().ok());
112        match self {
113            Self::V1 => file_number,
114            Self::V2 => file_number.map(|v| u64::MAX - v),
115        }
116    }
117
118    /// Parse a detached version from a filename like `d123456.manifest`.
119    ///
120    /// Returns the full version number with the detached mask bit set.
121    pub fn parse_detached_version(filename: &str) -> Option<u64> {
122        if !filename.starts_with(DETACHED_VERSION_PREFIX) {
123            return None;
124        }
125        let without_prefix = &filename[DETACHED_VERSION_PREFIX.len()..];
126        without_prefix
127            .split_once('.')
128            .and_then(|(version_str, _)| version_str.parse::<u64>().ok())
129    }
130
131    pub fn detect_scheme(filename: &str) -> Option<Self> {
132        if filename.starts_with(DETACHED_VERSION_PREFIX) {
133            // Currently, detached versions must imply V2
134            return Some(Self::V2);
135        }
136        if filename.ends_with(MANIFEST_EXTENSION) {
137            const V2_LEN: usize = 20 + 1 + MANIFEST_EXTENSION.len();
138            if filename.len() == V2_LEN {
139                Some(Self::V2)
140            } else {
141                Some(Self::V1)
142            }
143        } else {
144            None
145        }
146    }
147
148    pub fn detect_scheme_staging(filename: &str) -> Self {
149        // We shouldn't have to worry about detached versions here since there is no
150        // such thing as "detached" and "staged" at the same time.
151        if filename.chars().nth(20) == Some('.') {
152            Self::V2
153        } else {
154            Self::V1
155        }
156    }
157}
158
159/// Migrate all V1 manifests to V2 naming scheme.
160///
161/// This function will rename all V1 manifests to V2 naming scheme.
162///
163/// This function is idempotent, and can be run multiple times without
164/// changing the state of the object store.
165///
166/// However, it should not be run while other concurrent operations are happening.
167/// And it should also run until completion before resuming other operations.
168pub async fn migrate_scheme_to_v2(object_store: &ObjectStore, dataset_base: &Path) -> Result<()> {
169    object_store
170        .inner
171        .list(Some(&dataset_base.child(VERSIONS_DIR)))
172        .try_filter(|res| {
173            let res = if let Some(filename) = res.location.filename() {
174                ManifestNamingScheme::detect_scheme(filename) == Some(ManifestNamingScheme::V1)
175            } else {
176                false
177            };
178            future::ready(res)
179        })
180        .try_for_each_concurrent(object_store.io_parallelism(), |meta| async move {
181            let filename = meta.location.filename().unwrap();
182            let version = ManifestNamingScheme::V1.parse_version(filename).unwrap();
183            let path = ManifestNamingScheme::V2.manifest_path(dataset_base, version);
184            object_store.inner.rename(&meta.location, &path).await?;
185            Ok(())
186        })
187        .await?;
188
189    Ok(())
190}
191
192/// Function that writes the manifest to the object store.
193///
194/// Returns the size of the written manifest.
195pub type ManifestWriter = for<'a> fn(
196    object_store: &'a ObjectStore,
197    manifest: &'a mut Manifest,
198    indices: Option<Vec<IndexMetadata>>,
199    path: &'a Path,
200    transaction: Option<Transaction>,
201) -> BoxFuture<'a, Result<WriteResult>>;
202
203/// Canonical manifest writer; its function item type exactly matches `ManifestWriter`.
204/// Rationale: keep a crate-local writer implementation so call sites can pass this function
205/// directly without non-primitive casts or lifetime coercions.
206pub fn write_manifest_file_to_path<'a>(
207    object_store: &'a ObjectStore,
208    manifest: &'a mut Manifest,
209    indices: Option<Vec<IndexMetadata>>,
210    path: &'a Path,
211    transaction: Option<Transaction>,
212) -> BoxFuture<'a, Result<WriteResult>> {
213    Box::pin(async move {
214        let mut object_writer = ObjectWriter::new(object_store, path).await?;
215        let pos = write_manifest(&mut object_writer, manifest, indices, transaction).await?;
216        object_writer
217            .write_magics(pos, MAJOR_VERSION, MINOR_VERSION, MAGIC)
218            .await?;
219        let res = Writer::shutdown(&mut object_writer).await?;
220        info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_MANIFEST, path = path.to_string());
221        Ok(res)
222    })
223}
224
225#[derive(Debug, Clone)]
226pub struct ManifestLocation {
227    /// The version the manifest corresponds to.
228    pub version: u64,
229    /// Path of the manifest file, relative to the table root.
230    pub path: Path,
231    /// Size, in bytes, of the manifest file. If it is not known, this field should be `None`.
232    pub size: Option<u64>,
233    /// Naming scheme of the manifest file.
234    pub naming_scheme: ManifestNamingScheme,
235    /// Optional e-tag, used for integrity checks. Manifests should be immutable, so
236    /// if we detect a change in the e-tag, it means the manifest was tampered with.
237    /// This might happen if the dataset was deleted and then re-created.
238    pub e_tag: Option<String>,
239}
240
241impl TryFrom<object_store::ObjectMeta> for ManifestLocation {
242    type Error = Error;
243
244    fn try_from(meta: object_store::ObjectMeta) -> Result<Self> {
245        let filename = meta.location.filename().ok_or_else(|| {
246            Error::internal("ObjectMeta location does not have a filename".to_string())
247        })?;
248        let scheme = ManifestNamingScheme::detect_scheme(filename)
249            .ok_or_else(|| Error::internal(format!("Invalid manifest filename: '{}'", filename)))?;
250        let version = scheme
251            .parse_version(filename)
252            .ok_or_else(|| Error::internal(format!("Invalid manifest filename: '{}'", filename)))?;
253        Ok(Self {
254            version,
255            path: meta.location,
256            size: Some(meta.size),
257            naming_scheme: scheme,
258            e_tag: meta.e_tag,
259        })
260    }
261}
262
263/// Get the latest manifest path
264async fn current_manifest_path(
265    object_store: &ObjectStore,
266    base: &Path,
267) -> Result<ManifestLocation> {
268    if object_store.is_local()
269        && let Ok(Some(location)) = current_manifest_local(base)
270    {
271        return Ok(location);
272    }
273
274    let manifest_files = object_store.list(Some(base.child(VERSIONS_DIR)));
275
276    let mut valid_manifests = manifest_files.try_filter_map(|res| {
277        let filename = res.location.filename().unwrap();
278        if let Some(scheme) = ManifestNamingScheme::detect_scheme(filename) {
279            // Only include if we can parse a version (skip detached versions)
280            if scheme.parse_version(filename).is_some() {
281                future::ready(Ok(Some((scheme, res))))
282            } else {
283                future::ready(Ok(None))
284            }
285        } else {
286            future::ready(Ok(None))
287        }
288    });
289
290    let first = valid_manifests.next().await.transpose()?;
291    match (first, object_store.list_is_lexically_ordered) {
292        // If the first valid manifest we see is V2, we can assume that we are using
293        // V2 naming scheme for all manifests.
294        (Some((scheme @ ManifestNamingScheme::V2, meta)), true) => {
295            let version = scheme
296                .parse_version(meta.location.filename().unwrap())
297                .unwrap();
298
299            // Sanity check: verify at least for the first 1k files that they are all V2
300            // and that the version numbers are decreasing. We use the first 1k because
301            // this is the typical size of an object store list endpoint response page.
302            for (scheme, meta) in valid_manifests.take(999).try_collect::<Vec<_>>().await? {
303                if scheme != ManifestNamingScheme::V2 {
304                    warn!(
305                        "Found V1 Manifest in a V2 directory. Use `migrate_manifest_paths_v2` \
306                         to migrate the directory."
307                    );
308                    break;
309                }
310                let next_version = scheme
311                    .parse_version(meta.location.filename().unwrap())
312                    .unwrap();
313                if next_version >= version {
314                    warn!(
315                        "List operation was expected to be lexically ordered, but was not. This \
316                         could mean a corrupt read. Please make a bug report on the lance-format/lance \
317                         GitHub repository."
318                    );
319                    break;
320                }
321            }
322
323            Ok(ManifestLocation {
324                version,
325                path: meta.location,
326                size: Some(meta.size),
327                naming_scheme: scheme,
328                e_tag: meta.e_tag,
329            })
330        }
331        // If the list is not lexically ordered, we need to iterate all manifests
332        // to find the latest version. This works for both V1 and V2 schemes.
333        (Some((first_scheme, meta)), _) => {
334            let mut current_version = first_scheme
335                .parse_version(meta.location.filename().unwrap())
336                .unwrap();
337            let mut current_meta = meta;
338            let scheme = first_scheme;
339
340            while let Some((entry_scheme, meta)) = valid_manifests.next().await.transpose()? {
341                if entry_scheme != scheme {
342                    return Err(Error::internal(format!(
343                        "Found multiple manifest naming schemes in the same directory: {:?} and {:?}. \
344                         Use `migrate_manifest_paths_v2` to migrate the directory.",
345                        scheme, entry_scheme
346                    )));
347                }
348                let version = entry_scheme
349                    .parse_version(meta.location.filename().unwrap())
350                    .unwrap();
351                if version > current_version {
352                    current_version = version;
353                    current_meta = meta;
354                }
355            }
356            Ok(ManifestLocation {
357                version: current_version,
358                path: current_meta.location,
359                size: Some(current_meta.size),
360                naming_scheme: scheme,
361                e_tag: current_meta.e_tag,
362            })
363        }
364        (None, _) => Err(Error::not_found(base.child(VERSIONS_DIR).to_string())),
365    }
366}
367
368// This is an optimized function that searches for the latest manifest. In
369// object_store, list operations lookup metadata for each file listed. This
370// method only gets the metadata for the found latest manifest.
371fn current_manifest_local(base: &Path) -> std::io::Result<Option<ManifestLocation>> {
372    let path = lance_io::local::to_local_path(&base.child(VERSIONS_DIR));
373    let entries = std::fs::read_dir(path)?;
374
375    let mut latest_entry: Option<(u64, DirEntry)> = None;
376
377    let mut scheme: Option<ManifestNamingScheme> = None;
378
379    for entry in entries {
380        let entry = entry?;
381        let filename_raw = entry.file_name();
382        let filename = filename_raw.to_string_lossy();
383
384        let Some(entry_scheme) = ManifestNamingScheme::detect_scheme(&filename) else {
385            // Need to ignore temporary files, such as
386            // .tmp_7.manifest_9c100374-3298-4537-afc6-f5ee7913666d
387            continue;
388        };
389
390        if let Some(scheme) = scheme {
391            if scheme != entry_scheme {
392                return Err(io::Error::new(
393                    io::ErrorKind::InvalidData,
394                    format!(
395                        "Found multiple manifest naming schemes in the same directory: {:?} and {:?}",
396                        scheme, entry_scheme
397                    ),
398                ));
399            }
400        } else {
401            scheme = Some(entry_scheme);
402        }
403
404        let Some(version) = entry_scheme.parse_version(&filename) else {
405            continue;
406        };
407
408        if let Some((latest_version, _)) = &latest_entry {
409            if version > *latest_version {
410                latest_entry = Some((version, entry));
411            }
412        } else {
413            latest_entry = Some((version, entry));
414        }
415    }
416
417    if let Some((version, entry)) = latest_entry {
418        let path = Path::from_filesystem_path(entry.path())
419            .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;
420        let metadata = entry.metadata()?;
421        Ok(Some(ManifestLocation {
422            version,
423            path,
424            size: Some(metadata.len()),
425            naming_scheme: scheme.unwrap(),
426            e_tag: Some(get_etag(&metadata)),
427        }))
428    } else {
429        Ok(None)
430    }
431}
432
433fn list_manifests<'a>(
434    base_path: &Path,
435    object_store: &'a dyn OSObjectStore,
436) -> impl Stream<Item = Result<ManifestLocation>> + 'a {
437    object_store
438        .read_dir_all(&base_path.child(VERSIONS_DIR), None)
439        .filter_map(|obj_meta| {
440            futures::future::ready(
441                obj_meta
442                    .map(|m| ManifestLocation::try_from(m).ok())
443                    .transpose(),
444            )
445        })
446        .boxed()
447}
448
449/// Convert object metadata to ManifestLocation for detached manifests.
450fn detached_manifest_location_from_meta(
451    meta: object_store::ObjectMeta,
452) -> Option<ManifestLocation> {
453    let filename = meta.location.filename()?;
454    let version = ManifestNamingScheme::parse_detached_version(filename)?;
455    Some(ManifestLocation {
456        version,
457        path: meta.location,
458        size: Some(meta.size),
459        naming_scheme: ManifestNamingScheme::V2,
460        e_tag: meta.e_tag,
461    })
462}
463
464/// List all detached manifest files in the versions directory.
465pub fn list_detached_manifests<'a>(
466    base_path: &Path,
467    object_store: &'a dyn OSObjectStore,
468) -> impl Stream<Item = Result<ManifestLocation>> + 'a {
469    object_store
470        .read_dir_all(&base_path.child(VERSIONS_DIR), None)
471        .filter_map(|obj_meta| {
472            futures::future::ready(
473                obj_meta
474                    .map(detached_manifest_location_from_meta)
475                    .transpose(),
476            )
477        })
478        .boxed()
479}
480
481fn make_staging_manifest_path(base: &Path) -> Result<Path> {
482    let id = uuid::Uuid::new_v4().to_string();
483    Path::parse(format!("{base}-{id}")).map_err(|e| Error::io_source(Box::new(e)))
484}
485
486#[cfg(feature = "dynamodb")]
487const DDB_URL_QUERY_KEY: &str = "ddbTableName";
488
489/// Handle commits that prevent conflicting writes.
490///
491/// Commit implementations ensure that if there are multiple concurrent writers
492/// attempting to write the next version of a table, only one will win. In order
493/// to work, all writers must use the same commit handler type.
494/// This trait is also responsible for resolving where the manifests live.
495///
496// TODO: pub(crate)
497#[async_trait::async_trait]
498#[allow(clippy::too_many_arguments)]
499pub trait CommitHandler: Debug + Send + Sync {
500    async fn resolve_latest_location(
501        &self,
502        base_path: &Path,
503        object_store: &ObjectStore,
504    ) -> Result<ManifestLocation> {
505        Ok(current_manifest_path(object_store, base_path).await?)
506    }
507
508    async fn resolve_version_location(
509        &self,
510        base_path: &Path,
511        version: u64,
512        object_store: &dyn OSObjectStore,
513    ) -> Result<ManifestLocation> {
514        default_resolve_version(base_path, version, object_store).await
515    }
516
517    /// List detached manifest locations.
518    ///
519    /// Returns a stream of detached manifest locations in arbitrary order.
520    fn list_detached_manifest_locations<'a>(
521        &self,
522        base_path: &Path,
523        object_store: &'a ObjectStore,
524    ) -> BoxStream<'a, Result<ManifestLocation>> {
525        list_detached_manifests(base_path, &object_store.inner).boxed()
526    }
527
528    /// If `sorted_descending` is `true`, the stream will yield manifests in descending
529    /// order of version. When the object store has a lexicographically
530    /// ordered list and the naming scheme is V2, this will use an optimized
531    /// list operation. Otherwise, it will list all manifests and sort them
532    /// in memory. When `sorted_descending` is `false`, the stream will yield manifests
533    /// in arbitrary order.
534    fn list_manifest_locations<'a>(
535        &self,
536        base_path: &Path,
537        object_store: &'a ObjectStore,
538        sorted_descending: bool,
539    ) -> BoxStream<'a, Result<ManifestLocation>> {
540        let underlying_stream = list_manifests(base_path, &object_store.inner);
541
542        if !sorted_descending {
543            return underlying_stream.boxed();
544        }
545
546        async fn sort_stream(
547            input_stream: impl futures::Stream<Item = Result<ManifestLocation>> + Unpin,
548        ) -> Result<impl Stream<Item = Result<ManifestLocation>> + Unpin> {
549            let mut locations = input_stream.try_collect::<Vec<_>>().await?;
550            locations.sort_by_key(|m| std::cmp::Reverse(m.version));
551            Ok(futures::stream::iter(locations.into_iter().map(Ok)))
552        }
553
554        // If the object store supports lexicographically ordered lists and
555        // the naming scheme is V2, we can use an optimized list operation.
556        if object_store.list_is_lexically_ordered {
557            // We don't know the naming scheme until we see the first manifest.
558            let mut peekable = underlying_stream.peekable();
559
560            futures::stream::once(async move {
561                let naming_scheme = match Pin::new(&mut peekable).peek().await {
562                    Some(Ok(m)) => m.naming_scheme,
563                    // If we get an error or no manifests are found, we default
564                    // to V2 naming scheme, since it doesn't matter.
565                    Some(Err(_)) => ManifestNamingScheme::V2,
566                    None => ManifestNamingScheme::V2,
567                };
568
569                if naming_scheme == ManifestNamingScheme::V2 {
570                    // If the first manifest is V2, we can use the optimized list operation.
571                    Ok(Either::Left(peekable))
572                } else {
573                    sort_stream(peekable).await.map(Either::Right)
574                }
575            })
576            .try_flatten()
577            .boxed()
578        } else {
579            // If the object store does not support lexicographically ordered lists,
580            // we need to sort the manifests in memory. Systems where this isn't
581            // supported (local fs, S3 express) are typically fast enough
582            // that this is not a problem.
583            futures::stream::once(sort_stream(underlying_stream))
584                .try_flatten()
585                .boxed()
586        }
587    }
588
589    /// Commit a manifest.
590    ///
591    /// This function should return an [CommitError::CommitConflict] if another
592    /// transaction has already been committed to the path.
593    async fn commit(
594        &self,
595        manifest: &mut Manifest,
596        indices: Option<Vec<IndexMetadata>>,
597        base_path: &Path,
598        object_store: &ObjectStore,
599        manifest_writer: ManifestWriter,
600        naming_scheme: ManifestNamingScheme,
601        transaction: Option<Transaction>,
602    ) -> std::result::Result<ManifestLocation, CommitError>;
603
604    /// Delete the recorded manifest information for a dataset at the base_path
605    async fn delete(&self, _base_path: &Path) -> Result<()> {
606        Ok(())
607    }
608}
609
610async fn default_resolve_version(
611    base_path: &Path,
612    version: u64,
613    object_store: &dyn OSObjectStore,
614) -> Result<ManifestLocation> {
615    if is_detached_version(version) {
616        return Ok(ManifestLocation {
617            version,
618            // Detached versions are not supported with V1 naming scheme.  If we need
619            // to support in the future we could use a different prefix (e.g. 'x' or something)
620            naming_scheme: ManifestNamingScheme::V2,
621            // Both V1 and V2 should give the same path for detached versions
622            path: ManifestNamingScheme::V2.manifest_path(base_path, version),
623            size: None,
624            e_tag: None,
625        });
626    }
627
628    // try V2, fallback to V1.
629    let scheme = ManifestNamingScheme::V2;
630    let path = scheme.manifest_path(base_path, version);
631    match object_store.head(&path).await {
632        Ok(meta) => Ok(ManifestLocation {
633            version,
634            path,
635            size: Some(meta.size),
636            naming_scheme: scheme,
637            e_tag: meta.e_tag,
638        }),
639        Err(ObjectStoreError::NotFound { .. }) => {
640            // fallback to V1
641            let scheme = ManifestNamingScheme::V1;
642            Ok(ManifestLocation {
643                version,
644                path: scheme.manifest_path(base_path, version),
645                size: None,
646                naming_scheme: scheme,
647                e_tag: None,
648            })
649        }
650        Err(e) => Err(e.into()),
651    }
652}
/// Adapts an `object_store` AWS credential provider into an AWS SDK
/// `ProvideCredentials` implementation.
#[cfg(feature = "dynamodb")]
#[derive(Debug)]
struct OSObjectStoreToAwsCredAdaptor(AwsCredentialProvider);

#[cfg(feature = "dynamodb")]
impl ProvideCredentials for OSObjectStoreToAwsCredAdaptor {
    /// Fetch credentials from the wrapped provider and convert them.
    ///
    /// Every converted credential is reported as expiring 10 minutes from now.
    fn provide_credentials<'a>(
        &'a self,
    ) -> aws_credential_types::provider::future::ProvideCredentials<'a>
    where
        Self: 'a,
    {
        aws_credential_types::provider::future::ProvideCredentials::new(async {
            let source = match self.0.get_credential().await {
                Ok(credential) => credential,
                Err(err) => return Err(CredentialsError::provider_error(Box::new(err))),
            };
            let ten_minutes = Duration::from_secs(10 * 60);
            let expiry = SystemTime::now()
                .checked_add(ten_minutes)
                .expect("overflow");
            Ok(aws_credential_types::Credentials::new(
                &source.key_id,
                &source.secret_key,
                source.token.clone(),
                Some(expiry),
                "",
            ))
        })
    }
}
688
/// Build a DynamoDB-backed [ExternalManifestStore] for table `table_name`.
///
/// The client:
/// - authenticates through `creds` (adapted for the AWS SDK) with identity
///   caching disabled;
/// - targets `region`, optionally overriding the service endpoint;
/// - retries with the standard retry configuration for up to 5 attempts.
#[cfg(feature = "dynamodb")]
async fn build_dynamodb_external_store(
    table_name: &str,
    creds: AwsCredentialProvider,
    region: &str,
    endpoint: Option<String>,
    app_name: &str,
) -> Result<Arc<dyn ExternalManifestStore>> {
    use super::commit::dynamodb::DynamoDBExternalManifestStore;
    use aws_sdk_dynamodb::{
        Client,
        config::{IdentityCache, Region, retry::RetryConfig},
    };

    let builder = aws_sdk_dynamodb::config::Builder::new()
        .behavior_version_latest()
        .region(Some(Region::new(region.to_string())))
        .credentials_provider(OSObjectStoreToAwsCredAdaptor(creds))
        // Caching is left to the supplied AwsCredentialProvider.
        .identity_cache(IdentityCache::no_cache())
        // 5 attempts = 1 initial try + 4 retries with exponential backoff, to be
        // resilient to transient network issues.
        .retry_config(RetryConfig::standard().with_max_attempts(5));
    let builder = match endpoint {
        Some(url) => builder.endpoint_url(url),
        None => builder,
    };

    let client = Client::from_conf(builder.build());
    DynamoDBExternalManifestStore::new_external_store(client.into(), table_name, app_name).await
}
720
721pub async fn commit_handler_from_url(
722    url_or_path: &str,
723    // This looks unused if dynamodb feature disabled
724    #[allow(unused_variables)] options: &Option<ObjectStoreParams>,
725) -> Result<Arc<dyn CommitHandler>> {
726    let local_handler: Arc<dyn CommitHandler> = if cfg!(windows) {
727        Arc::new(RenameCommitHandler)
728    } else {
729        Arc::new(ConditionalPutCommitHandler)
730    };
731
732    let url = match Url::parse(url_or_path) {
733        Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
734            // On Windows, the drive is parsed as a scheme
735            return Ok(local_handler);
736        }
737        Ok(url) => url,
738        Err(_) => {
739            return Ok(local_handler);
740        }
741    };
742
743    match url.scheme() {
744        "file" | "file-object-store" => Ok(local_handler),
745        "s3" | "gs" | "az" | "abfss" | "memory" | "oss" | "cos" => {
746            Ok(Arc::new(ConditionalPutCommitHandler))
747        }
748        #[cfg(not(feature = "dynamodb"))]
749        "s3+ddb" => Err(Error::invalid_input_source(
750            "`s3+ddb://` scheme requires `dynamodb` feature to be enabled".into(),
751        )),
752        #[cfg(feature = "dynamodb")]
753        "s3+ddb" => {
754            if url.query_pairs().count() != 1 {
755                return Err(Error::invalid_input_source(
756                    "`s3+ddb://` scheme and expects exactly one query `ddbTableName`".into(),
757                ));
758            }
759            let table_name = match url.query_pairs().next() {
760                Some((Cow::Borrowed(key), Cow::Borrowed(table_name)))
761                    if key == DDB_URL_QUERY_KEY =>
762                {
763                    if table_name.is_empty() {
764                        return Err(Error::invalid_input_source(
765                            "`s3+ddb://` scheme requires non empty dynamodb table name".into(),
766                        ));
767                    }
768                    table_name
769                }
770                _ => {
771                    return Err(Error::invalid_input_source(
772                        "`s3+ddb://` scheme and expects exactly one query `ddbTableName`".into(),
773                    ));
774                }
775            };
776            let options = options.clone().unwrap_or_default();
777            let storage_options_raw =
778                StorageOptions(options.storage_options().cloned().unwrap_or_default());
779            let dynamo_endpoint = get_dynamodb_endpoint(&storage_options_raw);
780            let storage_options = storage_options_raw.as_s3_options();
781
782            let region = storage_options.get(&AmazonS3ConfigKey::Region).cloned();
783
784            // Get accessor from the options
785            let accessor = options.get_accessor();
786
787            let (aws_creds, region) = build_aws_credential(
788                options.s3_credentials_refresh_offset,
789                options.aws_credentials.clone(),
790                Some(&storage_options),
791                region,
792                accessor,
793            )
794            .await?;
795
796            Ok(Arc::new(ExternalManifestCommitHandler {
797                external_manifest_store: build_dynamodb_external_store(
798                    table_name,
799                    aws_creds.clone(),
800                    &region,
801                    dynamo_endpoint,
802                    "lancedb",
803                )
804                .await?,
805            }))
806        }
807        _ => Ok(Arc::new(UnsafeCommitHandler)),
808    }
809}
810
/// Resolve the DynamoDB endpoint override.
///
/// The `dynamodb_endpoint` storage option takes precedence. Only when it is absent
/// is the `DYNAMODB_ENDPOINT` environment variable consulted. Returns `None` if
/// neither is set.
#[cfg(feature = "dynamodb")]
fn get_dynamodb_endpoint(storage_options: &StorageOptions) -> Option<String> {
    storage_options
        .0
        .get("dynamodb_endpoint")
        .cloned()
        // `or_else` keeps the environment lookup lazy, as in an if/else.
        .or_else(|| std::env::var("DYNAMODB_ENDPOINT").ok())
}
819
/// Errors that can occur when committing a manifest.
///
/// Commit handlers distinguish a conflict (the target manifest version already
/// exists) from every other failure, so callers can tell the two apart.
#[derive(Debug)]
pub enum CommitError {
    /// Another transaction has already been written to the path
    /// (i.e. the manifest for this version already exists).
    CommitConflict,
    /// Something else went wrong
    OtherError(Error),
}
828
829impl From<Error> for CommitError {
830    fn from(e: Error) -> Self {
831        Self::OtherError(e)
832    }
833}
834
835impl From<CommitError> for Error {
836    fn from(e: CommitError) -> Self {
837        match e {
838            CommitError::CommitConflict => Self::internal("Commit conflict".to_string()),
839            CommitError::OtherError(e) => e,
840        }
841    }
842}
843
/// Whether we have issued a warning about using the unsafe commit handler.
///
/// This is a process-wide static shared by every `UnsafeCommitHandler`, so the
/// warning is tied to the process rather than to a handler instance.
static WARNED_ON_UNSAFE_COMMIT: AtomicBool = AtomicBool::new(false);

/// A naive commit implementation that does not prevent conflicting writes.
///
/// The manifest is written directly to its final versioned path with no
/// existence check and no lock. Concurrent writers targeting the same version
/// can therefore overwrite each other's manifest.
///
/// This will log a warning the first time it is used.
pub struct UnsafeCommitHandler;
851
852#[async_trait::async_trait]
853#[allow(clippy::too_many_arguments)]
854impl CommitHandler for UnsafeCommitHandler {
855    async fn commit(
856        &self,
857        manifest: &mut Manifest,
858        indices: Option<Vec<IndexMetadata>>,
859        base_path: &Path,
860        object_store: &ObjectStore,
861        manifest_writer: ManifestWriter,
862        naming_scheme: ManifestNamingScheme,
863        transaction: Option<Transaction>,
864    ) -> std::result::Result<ManifestLocation, CommitError> {
865        // Log a one-time warning
866        if !WARNED_ON_UNSAFE_COMMIT.load(std::sync::atomic::Ordering::Relaxed) {
867            WARNED_ON_UNSAFE_COMMIT.store(true, std::sync::atomic::Ordering::Relaxed);
868            log::warn!(
869                "Using unsafe commit handler. Concurrent writes may result in data loss. \
870                 Consider providing a commit handler that prevents conflicting writes."
871            );
872        }
873
874        let version_path = naming_scheme.manifest_path(base_path, manifest.version);
875        let res =
876            manifest_writer(object_store, manifest, indices, &version_path, transaction).await?;
877
878        Ok(ManifestLocation {
879            version: manifest.version,
880            size: Some(res.size as u64),
881            naming_scheme,
882            path: version_path,
883            e_tag: res.e_tag,
884        })
885    }
886}
887
888impl Debug for UnsafeCommitHandler {
889    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
890        f.debug_struct("UnsafeCommitHandler").finish()
891    }
892}
893
/// A commit implementation that uses a lock to prevent conflicting writes.
///
/// Any `CommitLock + Send + Sync` type (and `Arc` of one) automatically
/// implements [CommitHandler]. The lock is acquired, the target manifest path is
/// checked for an existing commit, the manifest is written, and the lease is
/// released.
#[async_trait::async_trait]
pub trait CommitLock: Debug {
    /// The lease handed out by [CommitLock::lock]. It is released through
    /// [CommitLease::release] once the commit attempt finishes.
    type Lease: CommitLease;

    /// Attempt to lock the table for the given version.
    ///
    /// If it is already locked by another transaction, wait until it is unlocked.
    /// Once it is unlocked, return [CommitError::CommitConflict] if the version
    /// has already been committed. Otherwise, return the lock.
    ///
    /// To prevent poisoned locks, it's recommended to set a timeout on the lock
    /// of at least 30 seconds.
    ///
    /// It is not required that the lock tracks the version. It is provided in
    /// case the locking is handled by a catalog service that needs to know the
    /// current version of the table.
    async fn lock(&self, version: u64) -> std::result::Result<Self::Lease, CommitError>;
}
913
/// A held lock obtained from [CommitLock::lock].
#[async_trait::async_trait]
pub trait CommitLease: Send + Sync {
    /// Release the lease, indicating whether the commit was successful.
    ///
    /// `success` is `true` only when the manifest was written. Conflicts and
    /// other failures release with `false`.
    async fn release(&self, success: bool) -> std::result::Result<(), CommitError>;
}
919
920#[async_trait::async_trait]
921impl<T: CommitLock + Send + Sync> CommitHandler for T {
922    async fn commit(
923        &self,
924        manifest: &mut Manifest,
925        indices: Option<Vec<IndexMetadata>>,
926        base_path: &Path,
927        object_store: &ObjectStore,
928        manifest_writer: ManifestWriter,
929        naming_scheme: ManifestNamingScheme,
930        transaction: Option<Transaction>,
931    ) -> std::result::Result<ManifestLocation, CommitError> {
932        let path = naming_scheme.manifest_path(base_path, manifest.version);
933        // NOTE: once we have the lease we cannot use ? to return errors, since
934        // we must release the lease before returning.
935        let lease = self.lock(manifest.version).await?;
936
937        // Head the location and make sure it's not already committed
938        match object_store.inner.head(&path).await {
939            Ok(_) => {
940                // The path already exists, so it's already committed
941                // Release the lock
942                lease.release(false).await?;
943
944                return Err(CommitError::CommitConflict);
945            }
946            Err(ObjectStoreError::NotFound { .. }) => {}
947            Err(e) => {
948                // Something else went wrong
949                // Release the lock
950                lease.release(false).await?;
951
952                return Err(CommitError::OtherError(e.into()));
953            }
954        }
955        let res = manifest_writer(object_store, manifest, indices, &path, transaction).await;
956
957        // Release the lock
958        lease.release(res.is_ok()).await?;
959
960        let res = res?;
961        Ok(ManifestLocation {
962            version: manifest.version,
963            size: Some(res.size as u64),
964            naming_scheme,
965            path,
966            e_tag: res.e_tag,
967        })
968    }
969}
970
971#[async_trait::async_trait]
972impl<T: CommitLock + Send + Sync> CommitHandler for Arc<T> {
973    async fn commit(
974        &self,
975        manifest: &mut Manifest,
976        indices: Option<Vec<IndexMetadata>>,
977        base_path: &Path,
978        object_store: &ObjectStore,
979        manifest_writer: ManifestWriter,
980        naming_scheme: ManifestNamingScheme,
981        transaction: Option<Transaction>,
982    ) -> std::result::Result<ManifestLocation, CommitError> {
983        self.as_ref()
984            .commit(
985                manifest,
986                indices,
987                base_path,
988                object_store,
989                manifest_writer,
990                naming_scheme,
991                transaction,
992            )
993            .await
994    }
995}
996
/// A commit implementation that uses a temporary path and renames the object.
///
/// The manifest is first written to a staging path. It is then moved onto the
/// final versioned path with `rename_if_not_exists`. If the final path already
/// exists, the commit is reported as [CommitError::CommitConflict].
///
/// This only works for object stores that support atomic rename if not exist.
pub struct RenameCommitHandler;
1001
1002#[async_trait::async_trait]
1003impl CommitHandler for RenameCommitHandler {
1004    async fn commit(
1005        &self,
1006        manifest: &mut Manifest,
1007        indices: Option<Vec<IndexMetadata>>,
1008        base_path: &Path,
1009        object_store: &ObjectStore,
1010        manifest_writer: ManifestWriter,
1011        naming_scheme: ManifestNamingScheme,
1012        transaction: Option<Transaction>,
1013    ) -> std::result::Result<ManifestLocation, CommitError> {
1014        // Create a temporary object, then use `rename_if_not_exists` to commit.
1015        // If failed, clean up the temporary object.
1016
1017        let path = naming_scheme.manifest_path(base_path, manifest.version);
1018        let tmp_path = make_staging_manifest_path(&path)?;
1019
1020        let res = manifest_writer(object_store, manifest, indices, &tmp_path, transaction).await?;
1021
1022        match object_store
1023            .inner
1024            .rename_if_not_exists(&tmp_path, &path)
1025            .await
1026        {
1027            Ok(_) => {
1028                // Successfully committed
1029                Ok(ManifestLocation {
1030                    version: manifest.version,
1031                    path,
1032                    size: Some(res.size as u64),
1033                    naming_scheme,
1034                    e_tag: None, // Re-name can change e-tag.
1035                })
1036            }
1037            Err(ObjectStoreError::AlreadyExists { .. }) => {
1038                // Another transaction has already been committed
1039                // Attempt to clean up temporary object, but ignore errors if we can't
1040                let _ = object_store.delete(&tmp_path).await;
1041
1042                return Err(CommitError::CommitConflict);
1043            }
1044            Err(e) => {
1045                // Something else went wrong
1046                return Err(CommitError::OtherError(e.into()));
1047            }
1048        }
1049    }
1050}
1051
1052impl Debug for RenameCommitHandler {
1053    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1054        f.debug_struct("RenameCommitHandler").finish()
1055    }
1056}
1057
/// A commit implementation that uploads the manifest with a create-only
/// conditional put (`PutMode::Create`).
///
/// The object store rejects the put if a manifest already exists at the target
/// path. `AlreadyExists` and `Precondition` errors are reported as
/// [CommitError::CommitConflict].
pub struct ConditionalPutCommitHandler;
1059
1060#[async_trait::async_trait]
1061impl CommitHandler for ConditionalPutCommitHandler {
1062    async fn commit(
1063        &self,
1064        manifest: &mut Manifest,
1065        indices: Option<Vec<IndexMetadata>>,
1066        base_path: &Path,
1067        object_store: &ObjectStore,
1068        manifest_writer: ManifestWriter,
1069        naming_scheme: ManifestNamingScheme,
1070        transaction: Option<Transaction>,
1071    ) -> std::result::Result<ManifestLocation, CommitError> {
1072        let path = naming_scheme.manifest_path(base_path, manifest.version);
1073
1074        let memory_store = ObjectStore::memory();
1075        let dummy_path = "dummy";
1076        manifest_writer(
1077            &memory_store,
1078            manifest,
1079            indices,
1080            &dummy_path.into(),
1081            transaction,
1082        )
1083        .await?;
1084        let dummy_data = memory_store.read_one_all(&dummy_path.into()).await?;
1085        let size = dummy_data.len() as u64;
1086        let res = object_store
1087            .inner
1088            .put_opts(
1089                &path,
1090                dummy_data.into(),
1091                PutOptions {
1092                    mode: object_store::PutMode::Create,
1093                    ..Default::default()
1094                },
1095            )
1096            .await
1097            .map_err(|err| match err {
1098                ObjectStoreError::AlreadyExists { .. } | ObjectStoreError::Precondition { .. } => {
1099                    CommitError::CommitConflict
1100                }
1101                _ => CommitError::OtherError(err.into()),
1102            })?;
1103
1104        Ok(ManifestLocation {
1105            version: manifest.version,
1106            path,
1107            size: Some(size),
1108            naming_scheme,
1109            e_tag: res.e_tag,
1110        })
1111    }
1112}
1113
1114impl Debug for ConditionalPutCommitHandler {
1115    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1116        f.debug_struct("ConditionalPutCommitHandler").finish()
1117    }
1118}
1119
/// Configuration for committing a transaction.
#[derive(Debug, Clone)]
pub struct CommitConfig {
    /// Number of commit retries allowed; defaults to 20.
    /// NOTE(review): presumably retries after commit conflicts — confirm at the call site.
    pub num_retries: u32,
    /// When `true`, automatic cleanup is skipped; defaults to `false`.
    pub skip_auto_cleanup: bool,
    // TODO: add isolation_level
}
1126
1127impl Default for CommitConfig {
1128    fn default() -> Self {
1129        Self {
1130            num_retries: 20,
1131            skip_auto_cleanup: false,
1132        }
1133    }
1134}
1135
#[cfg(test)]
mod tests {
    use lance_core::utils::tempfile::TempObjDir;

    use super::*;

    /// Checks path formatting, version parsing, and scheme detection for the
    /// V1 and V2 manifest naming schemes.
    #[test]
    fn test_manifest_naming_scheme() {
        let v1 = ManifestNamingScheme::V1;
        let v2 = ManifestNamingScheme::V2;

        // V1 file names use the version directly.
        assert_eq!(
            v1.manifest_path(&Path::from("base"), 0),
            Path::from("base/_versions/0.manifest")
        );
        assert_eq!(
            v1.manifest_path(&Path::from("base"), 42),
            Path::from("base/_versions/42.manifest")
        );

        // V2 file names use `u64::MAX - version` (18446744073709551615 == u64::MAX).
        assert_eq!(
            v2.manifest_path(&Path::from("base"), 0),
            Path::from("base/_versions/18446744073709551615.manifest")
        );
        assert_eq!(
            v2.manifest_path(&Path::from("base"), 42),
            Path::from("base/_versions/18446744073709551573.manifest")
        );

        // A trailing `-<uuid>` after `.manifest` still parses to the version.
        assert_eq!(v1.parse_version("0.manifest"), Some(0));
        assert_eq!(v1.parse_version("42.manifest"), Some(42));
        assert_eq!(
            v1.parse_version("42.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc"),
            Some(42)
        );

        assert_eq!(v2.parse_version("18446744073709551615.manifest"), Some(0));
        assert_eq!(v2.parse_version("18446744073709551573.manifest"), Some(42));
        assert_eq!(
            v2.parse_version("18446744073709551573.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc"),
            Some(42)
        );

        assert_eq!(ManifestNamingScheme::detect_scheme("0.manifest"), Some(v1));
        assert_eq!(
            ManifestNamingScheme::detect_scheme("18446744073709551615.manifest"),
            Some(v2)
        );
        assert_eq!(ManifestNamingScheme::detect_scheme("something else"), None);
    }

    /// Migrating to V2 leaves every manifest under the V2 naming scheme and
    /// does not touch unrelated files in the versions directory.
    #[tokio::test]
    async fn test_manifest_naming_migration() {
        let object_store = ObjectStore::memory();
        let base = Path::from("base");
        let versions_dir = base.child(VERSIONS_DIR);

        // Write one unrelated file, one V1 manifest (version 0) and one V2
        // manifest (version 1).
        let original_files = vec![
            versions_dir.child("irrelevant"),
            ManifestNamingScheme::V1.manifest_path(&base, 0),
            ManifestNamingScheme::V2.manifest_path(&base, 1),
        ];
        for path in original_files {
            object_store.put(&path, b"".as_slice()).await.unwrap();
        }

        migrate_scheme_to_v2(&object_store, &base).await.unwrap();

        // Expected in lexical listing order: V2 names are digits (newer version =
        // smaller number, so it sorts first) and sort before "irrelevant".
        let expected_files = vec![
            ManifestNamingScheme::V2.manifest_path(&base, 1),
            ManifestNamingScheme::V2.manifest_path(&base, 0),
            versions_dir.child("irrelevant"),
        ];
        let actual_files = object_store
            .inner
            .list(Some(&versions_dir))
            .map_ok(|res| res.location)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(actual_files, expected_files);
    }

    /// `list_manifest_locations` yields manifests newest-first, both for a
    /// lexically-listing store (memory) and a non-lexical one (local FS).
    #[tokio::test]
    #[rstest::rstest]
    async fn test_list_manifests_sorted(
        #[values(true, false)] lexical_list_store: bool,
        #[values(ManifestNamingScheme::V1, ManifestNamingScheme::V2)]
        naming_scheme: ManifestNamingScheme,
    ) {
        let tempdir;
        let (object_store, base) = if lexical_list_store {
            (Box::new(ObjectStore::memory()), Path::from("base"))
        } else {
            tempdir = TempObjDir::default();
            let path = tempdir.child("base");
            let store = Box::new(ObjectStore::local());
            assert!(!store.list_is_lexically_ordered);
            (store, path)
        };

        // Write 12 manifest files, latest first
        let mut expected_paths = Vec::new();
        for i in (0..12).rev() {
            let path = naming_scheme.manifest_path(&base, i);
            object_store.put(&path, b"".as_slice()).await.unwrap();
            expected_paths.push(path);
        }

        let actual_versions = ConditionalPutCommitHandler
            .list_manifest_locations(&base, &object_store, true)
            .map_ok(|location| location.path)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        assert_eq!(actual_versions, expected_paths);
    }

    /// `current_manifest_path` finds the highest version regardless of write
    /// order or whether the store lists lexically.
    #[tokio::test]
    #[rstest::rstest]
    async fn test_current_manifest_path(
        #[values(true, false)] lexical_list_store: bool,
        #[values(ManifestNamingScheme::V1, ManifestNamingScheme::V2)]
        naming_scheme: ManifestNamingScheme,
    ) {
        // Use memory store for both cases to avoid local FS special codepath.
        // Modify list_is_lexically_ordered to simulate different object stores.
        let mut object_store = ObjectStore::memory();
        object_store.list_is_lexically_ordered = lexical_list_store;
        let object_store = Box::new(object_store);
        let base = Path::from("base");

        // Write 12 manifest files in non-sequential order
        for version in [5, 2, 11, 0, 8, 3, 10, 1, 7, 4, 9, 6] {
            let path = naming_scheme.manifest_path(&base, version);
            object_store.put(&path, b"".as_slice()).await.unwrap();
        }

        let location = current_manifest_path(&object_store, &base).await.unwrap();

        assert_eq!(location.version, 11);
        assert_eq!(location.naming_scheme, naming_scheme);
        assert_eq!(location.path, naming_scheme.manifest_path(&base, 11));
    }

    /// Detached manifests are named `d<version>.manifest`; other names are rejected.
    #[test]
    fn test_parse_detached_version() {
        // Valid detached version filenames
        assert_eq!(
            ManifestNamingScheme::parse_detached_version("d12345.manifest"),
            Some(12345)
        );
        assert_eq!(
            ManifestNamingScheme::parse_detached_version("d9223372036854775808.manifest"),
            Some(9223372036854775808)
        );

        // Invalid: not starting with 'd' prefix
        assert_eq!(
            ManifestNamingScheme::parse_detached_version("12345.manifest"),
            None
        );

        // Invalid: regular V2 manifest
        assert_eq!(
            ManifestNamingScheme::parse_detached_version("18446744073709551615.manifest"),
            None
        );

        // Invalid: no extension
        assert_eq!(ManifestNamingScheme::parse_detached_version("d12345"), None);
    }

    /// `list_detached_manifests` returns only the detached manifests, ignoring
    /// regular manifests stored in the same versions directory.
    #[tokio::test]
    async fn test_list_detached_manifests() {
        use crate::format::DETACHED_VERSION_MASK;
        use futures::TryStreamExt;

        let object_store = ObjectStore::memory();
        let base = Path::from("base");
        let versions_dir = base.child(VERSIONS_DIR);

        // Create some regular manifests
        for version in [1, 2, 3] {
            let path = ManifestNamingScheme::V2.manifest_path(&base, version);
            object_store.put(&path, b"".as_slice()).await.unwrap();
        }

        // Create some detached manifests
        let detached_versions: Vec<u64> = vec![
            100 | DETACHED_VERSION_MASK,
            200 | DETACHED_VERSION_MASK,
            300 | DETACHED_VERSION_MASK,
        ];
        for version in &detached_versions {
            let path = versions_dir.child(format!("d{}.manifest", version));
            object_store.put(&path, b"".as_slice()).await.unwrap();
        }

        // List detached manifests
        let detached_locations: Vec<ManifestLocation> =
            list_detached_manifests(&base, &object_store.inner)
                .try_collect()
                .await
                .unwrap();

        assert_eq!(detached_locations.len(), 3);
        for loc in &detached_locations {
            assert_eq!(loc.naming_scheme, ManifestNamingScheme::V2);
        }

        // Listing order is not asserted, so compare the versions after sorting.
        let mut found_versions: Vec<u64> = detached_locations.iter().map(|l| l.version).collect();
        found_versions.sort();
        let mut expected_versions = detached_versions.clone();
        expected_versions.sort();
        assert_eq!(found_versions, expected_versions);
    }
}