lance_table/io/
commit.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Trait for commit implementations.
5//!
6//! In Lance, a transaction is committed by writing the next manifest file.
7//! However, care should be taken to ensure that the manifest file is written
8//! only once, even if there are concurrent writers. Different stores have
9//! different abilities to handle concurrent writes, so a trait is provided
10//! to allow for different implementations.
11//!
//! The trait [CommitHandler] can be implemented to provide different commit
//! strategies. [commit_handler_from_url] picks a default from the URL scheme:
//! [ConditionalPutCommitHandler] for local paths and for the `s3`, `gs`, `az`,
//! `memory` and `oss` schemes. On Windows, local paths instead use
//! [RenameCommitHandler], which writes the manifest to a temporary path and then
//! renames it to the final path if no object already exists there.
//! `s3+ddb` URLs use an external manifest store backed by DynamoDB (this requires
//! the `dynamodb` feature). Any other scheme falls back to [UnsafeCommitHandler],
//! which writes the manifest to the final path without any checks and therefore
//! does not prevent conflicting writes.
20//!
21//! When providing your own commit handler, most often you are implementing in
22//! terms of a lock. The trait [CommitLock] can be implemented as a simpler
23//! alternative to [CommitHandler].
24
25use std::io;
26use std::pin::Pin;
27use std::sync::atomic::AtomicBool;
28use std::sync::Arc;
29use std::{fmt::Debug, fs::DirEntry};
30
31use super::manifest::write_manifest;
32use futures::future::Either;
33use futures::Stream;
34use futures::{
35    future::{self, BoxFuture},
36    stream::BoxStream,
37    StreamExt, TryStreamExt,
38};
39use lance_file::format::{MAGIC, MAJOR_VERSION, MINOR_VERSION};
40use lance_io::object_writer::{ObjectWriter, WriteResult};
41use log::warn;
42use object_store::PutOptions;
43use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore};
44use snafu::location;
45use tracing::info;
46use url::Url;
47
48#[cfg(feature = "dynamodb")]
49pub mod dynamodb;
50pub mod external_manifest;
51
52use lance_core::{Error, Result};
53use lance_io::object_store::{ObjectStore, ObjectStoreExt, ObjectStoreParams};
54use lance_io::traits::WriteExt;
55
56use crate::format::{is_detached_version, IndexMetadata, Manifest, Transaction};
57use lance_core::utils::tracing::{AUDIT_MODE_CREATE, AUDIT_TYPE_MANIFEST, TRACE_FILE_AUDIT};
58#[cfg(feature = "dynamodb")]
59use {
60    self::external_manifest::{ExternalManifestCommitHandler, ExternalManifestStore},
61    aws_credential_types::provider::error::CredentialsError,
62    aws_credential_types::provider::ProvideCredentials,
63    lance_io::object_store::{providers::aws::build_aws_credential, StorageOptions},
64    object_store::aws::AmazonS3ConfigKey,
65    object_store::aws::AwsCredentialProvider,
66    std::borrow::Cow,
67    std::time::{Duration, SystemTime},
68};
69
/// Directory, directly under the dataset root, that holds every manifest file.
const VERSIONS_DIR: &str = "_versions";
/// Extension (without the leading `.`) used for manifest file names.
const MANIFEST_EXTENSION: &str = "manifest";
/// File-name prefix that marks a detached-version manifest (`d{version}.manifest`).
const DETACHED_VERSION_PREFIX: &str = "d";
73
/// How manifest files should be named.
///
/// Both schemes store manifests in the `_versions` directory of the dataset.
/// Detached versions are named `_versions/d{version}.manifest` under either scheme.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ManifestNamingScheme {
    /// `_versions/{version}.manifest`
    V1,
    /// `_versions/{u64::MAX - version}.manifest`, zero-padded to 20 digits.
    ///
    /// Zero-padded and reversed so that, on object stores whose list operation is
    /// lexically ordered, the latest version is the first entry listed and can be
    /// found without listing every manifest.
    V2,
}
84
85impl ManifestNamingScheme {
86    pub fn manifest_path(&self, base: &Path, version: u64) -> Path {
87        let directory = base.child(VERSIONS_DIR);
88        if is_detached_version(version) {
89            // Detached versions should never show up first in a list operation which
90            // means it needs to come lexicographically after all attached manifest
91            // files and so we add the prefix `d`.  There is no need to invert the
92            // version number since detached versions are not part of the version
93            let directory = base.child(VERSIONS_DIR);
94            directory.child(format!(
95                "{DETACHED_VERSION_PREFIX}{version}.{MANIFEST_EXTENSION}"
96            ))
97        } else {
98            match self {
99                Self::V1 => directory.child(format!("{version}.{MANIFEST_EXTENSION}")),
100                Self::V2 => {
101                    let inverted_version = u64::MAX - version;
102                    directory.child(format!("{inverted_version:020}.{MANIFEST_EXTENSION}"))
103                }
104            }
105        }
106    }
107
108    pub fn parse_version(&self, filename: &str) -> Option<u64> {
109        let file_number = filename
110            .split_once('.')
111            // Detached versions will fail the `parse` step, which is ok.
112            .and_then(|(version_str, _)| version_str.parse::<u64>().ok());
113        match self {
114            Self::V1 => file_number,
115            Self::V2 => file_number.map(|v| u64::MAX - v),
116        }
117    }
118
119    pub fn detect_scheme(filename: &str) -> Option<Self> {
120        if filename.starts_with(DETACHED_VERSION_PREFIX) {
121            // Currently, detached versions must imply V2
122            return Some(Self::V2);
123        }
124        if filename.ends_with(MANIFEST_EXTENSION) {
125            const V2_LEN: usize = 20 + 1 + MANIFEST_EXTENSION.len();
126            if filename.len() == V2_LEN {
127                Some(Self::V2)
128            } else {
129                Some(Self::V1)
130            }
131        } else {
132            None
133        }
134    }
135
136    pub fn detect_scheme_staging(filename: &str) -> Self {
137        // We shouldn't have to worry about detached versions here since there is no
138        // such thing as "detached" and "staged" at the same time.
139        if filename.chars().nth(20) == Some('.') {
140            Self::V2
141        } else {
142            Self::V1
143        }
144    }
145}
146
147/// Migrate all V1 manifests to V2 naming scheme.
148///
149/// This function will rename all V1 manifests to V2 naming scheme.
150///
151/// This function is idempotent, and can be run multiple times without
152/// changing the state of the object store.
153///
154/// However, it should not be run while other concurrent operations are happening.
155/// And it should also run until completion before resuming other operations.
156pub async fn migrate_scheme_to_v2(object_store: &ObjectStore, dataset_base: &Path) -> Result<()> {
157    object_store
158        .inner
159        .list(Some(&dataset_base.child(VERSIONS_DIR)))
160        .try_filter(|res| {
161            let res = if let Some(filename) = res.location.filename() {
162                ManifestNamingScheme::detect_scheme(filename) == Some(ManifestNamingScheme::V1)
163            } else {
164                false
165            };
166            future::ready(res)
167        })
168        .try_for_each_concurrent(object_store.io_parallelism(), |meta| async move {
169            let filename = meta.location.filename().unwrap();
170            let version = ManifestNamingScheme::V1.parse_version(filename).unwrap();
171            let path = ManifestNamingScheme::V2.manifest_path(dataset_base, version);
172            object_store.inner.rename(&meta.location, &path).await?;
173            Ok(())
174        })
175        .await?;
176
177    Ok(())
178}
179
/// Function that writes the manifest to the object store.
///
/// Arguments are the target store, the manifest (borrowed mutably —
/// NOTE(review): confirm which fields a writer is expected to update), optional
/// index metadata, the destination path, and an optional transaction.
///
/// Resolves to the [`WriteResult`] of the written manifest object (for the default
/// writer this is what `ObjectWriter::shutdown` returns).
pub type ManifestWriter = for<'a> fn(
    object_store: &'a ObjectStore,
    manifest: &'a mut Manifest,
    indices: Option<Vec<IndexMetadata>>,
    path: &'a Path,
    transaction: Option<Transaction>,
) -> BoxFuture<'a, Result<WriteResult>>;
190
191/// Canonical manifest writer; its function item type exactly matches `ManifestWriter`.
192/// Rationale: keep a crate-local writer implementation so call sites can pass this function
193/// directly without non-primitive casts or lifetime coercions.
194pub fn write_manifest_file_to_path<'a>(
195    object_store: &'a ObjectStore,
196    manifest: &'a mut Manifest,
197    indices: Option<Vec<IndexMetadata>>,
198    path: &'a Path,
199    transaction: Option<Transaction>,
200) -> BoxFuture<'a, Result<WriteResult>> {
201    Box::pin(async move {
202        let mut object_writer = ObjectWriter::new(object_store, path).await?;
203        let pos = write_manifest(&mut object_writer, manifest, indices, transaction).await?;
204        object_writer
205            .write_magics(pos, MAJOR_VERSION, MINOR_VERSION, MAGIC)
206            .await?;
207        let res = object_writer.shutdown().await?;
208        info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_MANIFEST, path = path.to_string());
209        Ok(res)
210    })
211}
212
/// Where a specific manifest version lives, plus object metadata captured when it
/// was resolved.
#[derive(Debug, Clone)]
pub struct ManifestLocation {
    /// The version the manifest corresponds to.
    pub version: u64,
    /// Path of the manifest file, relative to the table root.
    ///
    /// NOTE(review): `current_manifest_local` fills this via
    /// `Path::from_filesystem_path`, which may not be relative to the table root —
    /// confirm the intended form.
    pub path: Path,
    /// Size, in bytes, of the manifest file. If it is not known, this field should be `None`.
    pub size: Option<u64>,
    /// Naming scheme of the manifest file.
    pub naming_scheme: ManifestNamingScheme,
    /// Optional e-tag, used for integrity checks. Manifests should be immutable, so
    /// if we detect a change in the e-tag, it means the manifest was tampered with.
    /// This might happen if the dataset was deleted and then re-created.
    pub e_tag: Option<String>,
}
228
229impl TryFrom<object_store::ObjectMeta> for ManifestLocation {
230    type Error = Error;
231
232    fn try_from(meta: object_store::ObjectMeta) -> Result<Self> {
233        let filename = meta.location.filename().ok_or_else(|| Error::Internal {
234            message: "ObjectMeta location does not have a filename".to_string(),
235            location: location!(),
236        })?;
237        let scheme =
238            ManifestNamingScheme::detect_scheme(filename).ok_or_else(|| Error::Internal {
239                message: format!("Invalid manifest filename: '{}'", filename),
240                location: location!(),
241            })?;
242        let version = scheme
243            .parse_version(filename)
244            .ok_or_else(|| Error::Internal {
245                message: format!("Invalid manifest filename: '{}'", filename),
246                location: location!(),
247            })?;
248        Ok(Self {
249            version,
250            path: meta.location,
251            size: Some(meta.size),
252            naming_scheme: scheme,
253            e_tag: meta.e_tag,
254        })
255    }
256}
257
258/// Get the latest manifest path
259async fn current_manifest_path(
260    object_store: &ObjectStore,
261    base: &Path,
262) -> Result<ManifestLocation> {
263    if object_store.is_local() {
264        if let Ok(Some(location)) = current_manifest_local(base) {
265            return Ok(location);
266        }
267    }
268
269    let manifest_files = object_store.list(Some(base.child(VERSIONS_DIR)));
270
271    let mut valid_manifests = manifest_files.try_filter_map(|res| {
272        if let Some(scheme) = ManifestNamingScheme::detect_scheme(res.location.filename().unwrap())
273        {
274            future::ready(Ok(Some((scheme, res))))
275        } else {
276            future::ready(Ok(None))
277        }
278    });
279
280    let first = valid_manifests.next().await.transpose()?;
281    match (first, object_store.list_is_lexically_ordered) {
282        // If the first valid manifest we see is V2, we can assume that we are using
283        // V2 naming scheme for all manifests.
284        (Some((scheme @ ManifestNamingScheme::V2, meta)), true) => {
285            let version = scheme
286                .parse_version(meta.location.filename().unwrap())
287                .unwrap();
288
289            // Sanity check: verify at least for the first 1k files that they are all V2
290            // and that the version numbers are decreasing. We use the first 1k because
291            // this is the typical size of an object store list endpoint response page.
292            for (scheme, meta) in valid_manifests.take(999).try_collect::<Vec<_>>().await? {
293                if scheme != ManifestNamingScheme::V2 {
294                    warn!(
295                        "Found V1 Manifest in a V2 directory. Use `migrate_manifest_paths_v2` \
296                         to migrate the directory."
297                    );
298                    break;
299                }
300                let next_version = scheme
301                    .parse_version(meta.location.filename().unwrap())
302                    .unwrap();
303                if next_version >= version {
304                    warn!(
305                        "List operation was expected to be lexically ordered, but was not. This \
306                         could mean a corrupt read. Please make a bug report on the lance-format/lance \
307                         GitHub repository."
308                    );
309                    break;
310                }
311            }
312
313            Ok(ManifestLocation {
314                version,
315                path: meta.location,
316                size: Some(meta.size),
317                naming_scheme: scheme,
318                e_tag: meta.e_tag,
319            })
320        }
321        // If the list is not lexically ordered, we need to iterate all manifests
322        // to find the latest version. This works for both V1 and V2 schemes.
323        (Some((first_scheme, meta)), _) => {
324            let mut current_version = first_scheme
325                .parse_version(meta.location.filename().unwrap())
326                .unwrap();
327            let mut current_meta = meta;
328            let scheme = first_scheme;
329
330            while let Some((entry_scheme, meta)) = valid_manifests.next().await.transpose()? {
331                if entry_scheme != scheme {
332                    return Err(Error::Internal {
333                        message: format!(
334                            "Found multiple manifest naming schemes in the same directory: {:?} and {:?}. \
335                             Use `migrate_manifest_paths_v2` to migrate the directory.",
336                            scheme, entry_scheme
337                        ),
338                        location: location!(),
339                    });
340                }
341                let version = entry_scheme
342                    .parse_version(meta.location.filename().unwrap())
343                    .unwrap();
344                if version > current_version {
345                    current_version = version;
346                    current_meta = meta;
347                }
348            }
349            Ok(ManifestLocation {
350                version: current_version,
351                path: current_meta.location,
352                size: Some(current_meta.size),
353                naming_scheme: scheme,
354                e_tag: current_meta.e_tag,
355            })
356        }
357        (None, _) => Err(Error::NotFound {
358            uri: base.child(VERSIONS_DIR).to_string(),
359            location: location!(),
360        }),
361    }
362}
363
364// This is an optimized function that searches for the latest manifest. In
365// object_store, list operations lookup metadata for each file listed. This
366// method only gets the metadata for the found latest manifest.
367fn current_manifest_local(base: &Path) -> std::io::Result<Option<ManifestLocation>> {
368    let path = lance_io::local::to_local_path(&base.child(VERSIONS_DIR));
369    let entries = std::fs::read_dir(path)?;
370
371    let mut latest_entry: Option<(u64, DirEntry)> = None;
372
373    let mut scheme: Option<ManifestNamingScheme> = None;
374
375    for entry in entries {
376        let entry = entry?;
377        let filename_raw = entry.file_name();
378        let filename = filename_raw.to_string_lossy();
379
380        let Some(entry_scheme) = ManifestNamingScheme::detect_scheme(&filename) else {
381            // Need to ignore temporary files, such as
382            // .tmp_7.manifest_9c100374-3298-4537-afc6-f5ee7913666d
383            continue;
384        };
385
386        if let Some(scheme) = scheme {
387            if scheme != entry_scheme {
388                return Err(io::Error::new(
389                    io::ErrorKind::InvalidData,
390                    format!(
391                        "Found multiple manifest naming schemes in the same directory: {:?} and {:?}",
392                        scheme, entry_scheme
393                    ),
394                ));
395            }
396        } else {
397            scheme = Some(entry_scheme);
398        }
399
400        let Some(version) = entry_scheme.parse_version(&filename) else {
401            continue;
402        };
403
404        if let Some((latest_version, _)) = &latest_entry {
405            if version > *latest_version {
406                latest_entry = Some((version, entry));
407            }
408        } else {
409            latest_entry = Some((version, entry));
410        }
411    }
412
413    if let Some((version, entry)) = latest_entry {
414        let path = Path::from_filesystem_path(entry.path())
415            .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;
416        let metadata = entry.metadata()?;
417        Ok(Some(ManifestLocation {
418            version,
419            path,
420            size: Some(metadata.len()),
421            naming_scheme: scheme.unwrap(),
422            e_tag: Some(get_etag(&metadata)),
423        }))
424    } else {
425        Ok(None)
426    }
427}
428
429// Based on object store's implementation.
430fn get_etag(metadata: &std::fs::Metadata) -> String {
431    let inode = get_inode(metadata);
432    let size = metadata.len();
433    let mtime = metadata
434        .modified()
435        .ok()
436        .and_then(|mtime| mtime.duration_since(std::time::SystemTime::UNIX_EPOCH).ok())
437        .unwrap_or_default()
438        .as_micros();
439
440    // Use an ETag scheme based on that used by many popular HTTP servers
441    // <https://httpd.apache.org/docs/2.2/mod/core.html#fileetag>
442    // <https://stackoverflow.com/questions/47512043/how-etags-are-generated-and-configured>
443    format!("{inode:x}-{mtime:x}-{size:x}")
444}
445
#[cfg(unix)]
/// Inode number of the file, included in the e-tag to make it more resistant to
/// collisions, as popular web servers such as
/// [Apache](https://httpd.apache.org/docs/2.2/mod/core.html#fileetag) do.
fn get_inode(metadata: &std::fs::Metadata) -> u64 {
    use std::os::unix::fs::MetadataExt;
    metadata.ino()
}
452
#[cfg(not(unix))]
/// On platforms where an inode isn't available, fall back to just relying on size and mtime.
///
/// Always returns `0`, so the inode part of the e-tag is constant on these platforms.
fn get_inode(_metadata: &std::fs::Metadata) -> u64 {
    0
}
458
459fn list_manifests<'a>(
460    base_path: &Path,
461    object_store: &'a dyn OSObjectStore,
462) -> impl Stream<Item = Result<ManifestLocation>> + 'a {
463    object_store
464        .read_dir_all(&base_path.child(VERSIONS_DIR), None)
465        .filter_map(|obj_meta| {
466            futures::future::ready(
467                obj_meta
468                    .map(|m| ManifestLocation::try_from(m).ok())
469                    .transpose(),
470            )
471        })
472        .boxed()
473}
474
475fn make_staging_manifest_path(base: &Path) -> Result<Path> {
476    let id = uuid::Uuid::new_v4().to_string();
477    Path::parse(format!("{base}-{id}")).map_err(|e| Error::IO {
478        source: Box::new(e),
479        location: location!(),
480    })
481}
482
#[cfg(feature = "dynamodb")]
/// Query parameter of an `s3+ddb://` URL that names the DynamoDB table.
const DDB_URL_QUERY_KEY: &str = "ddbTableName";
485
/// Handle commits that prevent conflicting writes.
///
/// Commit implementations ensure that if there are multiple concurrent writers
/// attempting to write the next version of a table, only one will win. In order
/// to work, all writers must use the same commit handler type.
/// This trait is also responsible for resolving where the manifests live.
///
/// Only [`CommitHandler::commit`] must be implemented. The location-resolution and
/// listing methods have default implementations that work directly against the
/// `_versions` directory, and [`CommitHandler::delete`] defaults to a no-op.
// TODO: pub(crate)
#[async_trait::async_trait]
// `commit` takes more parameters than clippy's default limit.
#[allow(clippy::too_many_arguments)]
pub trait CommitHandler: Debug + Send + Sync {
    /// Resolve the location of the latest manifest of the dataset at `base_path`.
    ///
    /// The default scans `_versions` (with a fast path for local filesystems) and
    /// returns `Error::NotFound` when no manifest exists.
    async fn resolve_latest_location(
        &self,
        base_path: &Path,
        object_store: &ObjectStore,
    ) -> Result<ManifestLocation> {
        Ok(current_manifest_path(object_store, base_path).await?)
    }

    /// Resolve the manifest location of a specific `version`.
    ///
    /// The default returns the detached path for detached versions. Otherwise it
    /// issues a HEAD request for the V2 path. If that is not found, it falls back to
    /// the V1 path without checking that the V1 file exists.
    async fn resolve_version_location(
        &self,
        base_path: &Path,
        version: u64,
        object_store: &dyn OSObjectStore,
    ) -> Result<ManifestLocation> {
        default_resolve_version(base_path, version, object_store).await
    }

    /// If `sorted_descending` is `true`, the stream will yield manifests in descending
    /// order of version. When the object store has a lexicographically
    /// ordered list and the naming scheme is V2, this will use an optimized
    /// list operation. Otherwise, it will list all manifests and sort them
    /// in memory. When `sorted_descending` is `false`, the stream will yield manifests
    /// in arbitrary order.
    fn list_manifest_locations<'a>(
        &self,
        base_path: &Path,
        object_store: &'a ObjectStore,
        sorted_descending: bool,
    ) -> BoxStream<'a, Result<ManifestLocation>> {
        let underlying_stream = list_manifests(base_path, &object_store.inner);

        if !sorted_descending {
            return underlying_stream.boxed();
        }

        // Buffer the entire listing and re-emit it newest-version first. The first
        // listing error aborts collection and becomes the stream's only item.
        async fn sort_stream(
            input_stream: impl futures::Stream<Item = Result<ManifestLocation>> + Unpin,
        ) -> Result<impl Stream<Item = Result<ManifestLocation>> + Unpin> {
            let mut locations = input_stream.try_collect::<Vec<_>>().await?;
            locations.sort_by_key(|m| std::cmp::Reverse(m.version));
            Ok(futures::stream::iter(locations.into_iter().map(Ok)))
        }

        // If the object store supports lexicographically ordered lists and
        // the naming scheme is V2, we can use an optimized list operation.
        if object_store.list_is_lexically_ordered {
            // We don't know the naming scheme until we see the first manifest.
            let mut peekable = underlying_stream.peekable();

            futures::stream::once(async move {
                let naming_scheme = match Pin::new(&mut peekable).peek().await {
                    Some(Ok(m)) => m.naming_scheme,
                    // If we get an error or no manifests are found, we default
                    // to V2 naming scheme, since it doesn't matter.
                    Some(Err(_)) => ManifestNamingScheme::V2,
                    None => ManifestNamingScheme::V2,
                };

                if naming_scheme == ManifestNamingScheme::V2 {
                    // If the first manifest is V2, we can use the optimized list operation.
                    // V2 names encode `u64::MAX - version`, so an ordered listing is
                    // already newest-first.
                    Ok(Either::Left(peekable))
                } else {
                    sort_stream(peekable).await.map(Either::Right)
                }
            })
            .try_flatten()
            .boxed()
        } else {
            // If the object store does not support lexicographically ordered lists,
            // we need to sort the manifests in memory. Systems where this isn't
            // supported (local fs, S3 express) are typically fast enough
            // that this is not a problem.
            futures::stream::once(sort_stream(underlying_stream))
                .try_flatten()
                .boxed()
        }
    }

    /// Commit a manifest.
    ///
    /// This function should return an [CommitError::CommitConflict] if another
    /// transaction has already been committed to the path.
    ///
    /// NOTE(review): implementations live outside this view; presumably they use
    /// `manifest_writer` to write the manifest at the path that `naming_scheme`
    /// yields for the manifest's version — confirm per implementation.
    async fn commit(
        &self,
        manifest: &mut Manifest,
        indices: Option<Vec<IndexMetadata>>,
        base_path: &Path,
        object_store: &ObjectStore,
        manifest_writer: ManifestWriter,
        naming_scheme: ManifestNamingScheme,
        transaction: Option<Transaction>,
    ) -> std::result::Result<ManifestLocation, CommitError>;

    /// Delete the recorded manifest information for a dataset at the base_path
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    async fn delete(&self, _base_path: &Path) -> Result<()> {
        Ok(())
    }
}
595
596async fn default_resolve_version(
597    base_path: &Path,
598    version: u64,
599    object_store: &dyn OSObjectStore,
600) -> Result<ManifestLocation> {
601    if is_detached_version(version) {
602        return Ok(ManifestLocation {
603            version,
604            // Detached versions are not supported with V1 naming scheme.  If we need
605            // to support in the future we could use a different prefix (e.g. 'x' or something)
606            naming_scheme: ManifestNamingScheme::V2,
607            // Both V1 and V2 should give the same path for detached versions
608            path: ManifestNamingScheme::V2.manifest_path(base_path, version),
609            size: None,
610            e_tag: None,
611        });
612    }
613
614    // try V2, fallback to V1.
615    let scheme = ManifestNamingScheme::V2;
616    let path = scheme.manifest_path(base_path, version);
617    match object_store.head(&path).await {
618        Ok(meta) => Ok(ManifestLocation {
619            version,
620            path,
621            size: Some(meta.size),
622            naming_scheme: scheme,
623            e_tag: meta.e_tag,
624        }),
625        Err(ObjectStoreError::NotFound { .. }) => {
626            // fallback to V1
627            let scheme = ManifestNamingScheme::V1;
628            Ok(ManifestLocation {
629                version,
630                path: scheme.manifest_path(base_path, version),
631                size: None,
632                naming_scheme: scheme,
633                e_tag: None,
634            })
635        }
636        Err(e) => Err(e.into()),
637    }
638}
/// Adapts an `object_store` AWS credential provider into the AWS SDK's
/// `ProvideCredentials` interface.
#[cfg(feature = "dynamodb")]
#[derive(Debug)]
struct OSObjectStoreToAwsCredAdaptor(AwsCredentialProvider);
643
#[cfg(feature = "dynamodb")]
impl ProvideCredentials for OSObjectStoreToAwsCredAdaptor {
    /// Fetch credentials from the wrapped provider and hand them to the AWS SDK.
    ///
    /// Every returned credential is stamped with an expiry ten minutes from now,
    /// and the provider name is left empty. Provider errors are wrapped as
    /// `CredentialsError::provider_error`.
    fn provide_credentials<'a>(
        &'a self,
    ) -> aws_credential_types::provider::future::ProvideCredentials<'a>
    where
        Self: 'a,
    {
        aws_credential_types::provider::future::ProvideCredentials::new(async {
            let source_creds = self
                .0
                .get_credential()
                .await
                .map_err(|err| CredentialsError::provider_error(Box::new(err)))?;
            let expires_at = SystemTime::now()
                .checked_add(Duration::from_secs(10 * 60))
                .expect("overflow");
            Ok(aws_credential_types::Credentials::new(
                &source_creds.key_id,
                &source_creds.secret_key,
                source_creds.token.clone(),
                Some(expires_at),
                "",
            ))
        })
    }
}
674
/// Build a DynamoDB-backed [`ExternalManifestStore`] for `table_name`.
///
/// The client targets `region`, uses `creds` without an SDK-side identity cache,
/// retries up to 5 attempts with the standard policy, and talks to `endpoint`
/// when one is given.
#[cfg(feature = "dynamodb")]
async fn build_dynamodb_external_store(
    table_name: &str,
    creds: AwsCredentialProvider,
    region: &str,
    endpoint: Option<String>,
    app_name: &str,
) -> Result<Arc<dyn ExternalManifestStore>> {
    use super::commit::dynamodb::DynamoDBExternalManifestStore;
    use aws_sdk_dynamodb::{
        config::{retry::RetryConfig, IdentityCache, Region},
        Client,
    };

    // Be more resilient to transient network issues:
    // 5 attempts = 1 initial + 4 retries with exponential backoff.
    let retry_config = RetryConfig::standard().with_max_attempts(5);

    let builder = aws_sdk_dynamodb::config::Builder::new()
        .behavior_version_latest()
        .region(Some(Region::new(region.to_string())))
        .credentials_provider(OSObjectStoreToAwsCredAdaptor(creds))
        // Caching is handled by the supplied AwsCredentialProvider.
        .identity_cache(IdentityCache::no_cache())
        .retry_config(retry_config);
    let builder = match endpoint {
        Some(endpoint) => builder.endpoint_url(endpoint),
        None => builder,
    };
    let client = Client::from_conf(builder.build());

    DynamoDBExternalManifestStore::new_external_store(client.into(), table_name, app_name).await
}
706
/// Choose a [`CommitHandler`] for the dataset at `url_or_path`.
///
/// - Inputs that do not parse as a URL (plain paths), single-letter schemes on
///   Windows (drive letters), and the `file` / `file-object-store` schemes get the
///   local handler: `RenameCommitHandler` on Windows, `ConditionalPutCommitHandler`
///   elsewhere.
/// - `s3`, `gs`, `az`, `memory` and `oss` get `ConditionalPutCommitHandler`.
/// - `s3+ddb` requires the `dynamodb` feature and exactly one query parameter,
///   `ddbTableName`, naming a non-empty DynamoDB table. It returns an
///   `ExternalManifestCommitHandler` backed by that table.
/// - Any other scheme falls back to `UnsafeCommitHandler`.
///
/// `options` is only read for `s3+ddb` (storage options, AWS credentials, region,
/// and the DynamoDB endpoint).
///
/// # Errors
/// - [`Error::InvalidInput`] for a malformed `s3+ddb` URL, or for `s3+ddb` without
///   the `dynamodb` feature.
/// - Credential and DynamoDB store setup errors are propagated.
pub async fn commit_handler_from_url(
    url_or_path: &str,
    // This looks unused if dynamodb feature disabled
    #[allow(unused_variables)] options: &Option<ObjectStoreParams>,
) -> Result<Arc<dyn CommitHandler>> {
    let local_handler: Arc<dyn CommitHandler> = if cfg!(windows) {
        Arc::new(RenameCommitHandler)
    } else {
        Arc::new(ConditionalPutCommitHandler)
    };

    let url = match Url::parse(url_or_path) {
        Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
            // On Windows, the drive is parsed as a scheme
            return Ok(local_handler);
        }
        Ok(url) => url,
        Err(_) => {
            // Not a URL: treat it as a local filesystem path.
            return Ok(local_handler);
        }
    };

    match url.scheme() {
        "file" | "file-object-store" => Ok(local_handler),
        "s3" | "gs" | "az" | "memory" | "oss" => Ok(Arc::new(ConditionalPutCommitHandler)),
        #[cfg(not(feature = "dynamodb"))]
        "s3+ddb" => Err(Error::InvalidInput {
            source: "`s3+ddb://` scheme requires `dynamodb` feature to be enabled".into(),
            location: location!(),
        }),
        #[cfg(feature = "dynamodb")]
        "s3+ddb" => {
            if url.query_pairs().count() != 1 {
                return Err(Error::InvalidInput {
                    source: "`s3+ddb://` scheme and expects exactly one query `ddbTableName`"
                        .into(),
                    location: location!(),
                });
            }
            // NOTE(review): `query_pairs` yields `Cow::Owned` when percent/`+`
            // decoding was needed, so an encoded key or table name falls through to
            // the error arm below — confirm this strictness is intended.
            let table_name = match url.query_pairs().next() {
                Some((Cow::Borrowed(key), Cow::Borrowed(table_name)))
                    if key == DDB_URL_QUERY_KEY =>
                {
                    if table_name.is_empty() {
                        return Err(Error::InvalidInput {
                            source: "`s3+ddb://` scheme requires non empty dynamodb table name"
                                .into(),
                            location: location!(),
                        });
                    }
                    table_name
                }
                _ => {
                    return Err(Error::InvalidInput {
                        source: "`s3+ddb://` scheme and expects exactly one query `ddbTableName`"
                            .into(),
                        location: location!(),
                    });
                }
            };
            let options = options.clone().unwrap_or_default();
            let storage_options = StorageOptions(options.storage_options.unwrap_or_default());
            // Read the endpoint and expiry before `storage_options` is shadowed by
            // its S3-specific view below.
            let dynamo_endpoint = get_dynamodb_endpoint(&storage_options);
            let expires_at_millis = storage_options.expires_at_millis();
            let storage_options = storage_options.as_s3_options();

            let region = storage_options.get(&AmazonS3ConfigKey::Region).cloned();

            let (aws_creds, region) = build_aws_credential(
                options.s3_credentials_refresh_offset,
                options.aws_credentials.clone(),
                Some(&storage_options),
                region,
                options.storage_options_provider.clone(),
                expires_at_millis,
            )
            .await?;

            Ok(Arc::new(ExternalManifestCommitHandler {
                external_manifest_store: build_dynamodb_external_store(
                    table_name,
                    aws_creds.clone(),
                    &region,
                    dynamo_endpoint,
                    "lancedb",
                )
                .await?,
            }))
        }
        // Unknown schemes get no conflict protection.
        _ => Ok(Arc::new(UnsafeCommitHandler)),
    }
}
799
#[cfg(feature = "dynamodb")]
/// Resolve the DynamoDB endpoint override used by the `s3+ddb` commit handler.
///
/// The `dynamodb_endpoint` storage option wins; the `DYNAMODB_ENDPOINT`
/// environment variable is consulted only when that option is absent.
/// Returns `None` when neither is set (or the env var is not valid Unicode).
fn get_dynamodb_endpoint(storage_options: &StorageOptions) -> Option<String> {
    storage_options
        .0
        .get("dynamodb_endpoint")
        .cloned()
        // Lazily evaluated: the environment is untouched when the option is present.
        .or_else(|| std::env::var("DYNAMODB_ENDPOINT").ok())
}
808
/// Errors that can occur when committing a manifest.
///
/// Commit handlers distinguish "someone else already committed this version"
/// ([CommitError::CommitConflict]) from every other failure
/// ([CommitError::OtherError]) so callers can tell the two apart.
#[derive(Debug)]
pub enum CommitError {
    /// Another transaction has already been written to the path.
    ///
    /// Returned when the target manifest path already exists (or the store
    /// rejects a create-only write to it).
    CommitConflict,
    /// Something else went wrong (I/O, store, or serialization errors).
    OtherError(Error),
}
817
818impl From<Error> for CommitError {
819    fn from(e: Error) -> Self {
820        Self::OtherError(e)
821    }
822}
823
824impl From<CommitError> for Error {
825    fn from(e: CommitError) -> Self {
826        match e {
827            CommitError::CommitConflict => Self::Internal {
828                message: "Commit conflict".to_string(),
829                location: location!(),
830            },
831            CommitError::OtherError(e) => e,
832        }
833    }
834}
835
/// Whether we have issued a warning about using the unsafe commit handler.
///
/// Process-wide flag: set to `true` by [UnsafeCommitHandler] when it logs its
/// warning, so later commits skip the log call.
static WARNED_ON_UNSAFE_COMMIT: AtomicBool = AtomicBool::new(false);

/// A naive commit implementation that does not prevent conflicting writes.
///
/// The manifest is written directly to its final path without checking whether
/// a manifest for that version already exists, so concurrent writers may
/// overwrite each other.
///
/// This will log a warning the first time it is used.
pub struct UnsafeCommitHandler;
843
844#[async_trait::async_trait]
845#[allow(clippy::too_many_arguments)]
846impl CommitHandler for UnsafeCommitHandler {
847    async fn commit(
848        &self,
849        manifest: &mut Manifest,
850        indices: Option<Vec<IndexMetadata>>,
851        base_path: &Path,
852        object_store: &ObjectStore,
853        manifest_writer: ManifestWriter,
854        naming_scheme: ManifestNamingScheme,
855        transaction: Option<Transaction>,
856    ) -> std::result::Result<ManifestLocation, CommitError> {
857        // Log a one-time warning
858        if !WARNED_ON_UNSAFE_COMMIT.load(std::sync::atomic::Ordering::Relaxed) {
859            WARNED_ON_UNSAFE_COMMIT.store(true, std::sync::atomic::Ordering::Relaxed);
860            log::warn!(
861                "Using unsafe commit handler. Concurrent writes may result in data loss. \
862                 Consider providing a commit handler that prevents conflicting writes."
863            );
864        }
865
866        let version_path = naming_scheme.manifest_path(base_path, manifest.version);
867        let res =
868            manifest_writer(object_store, manifest, indices, &version_path, transaction).await?;
869
870        Ok(ManifestLocation {
871            version: manifest.version,
872            size: Some(res.size as u64),
873            naming_scheme,
874            path: version_path,
875            e_tag: res.e_tag,
876        })
877    }
878}
879
880impl Debug for UnsafeCommitHandler {
881    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
882        f.debug_struct("UnsafeCommitHandler").finish()
883    }
884}
885
/// A commit implementation that uses a lock to prevent conflicting writes.
///
/// Every `CommitLock + Send + Sync` type (and any `Arc` of one) automatically
/// implements [CommitHandler]. That implementation acquires a lease with
/// [CommitLock::lock], checks that no manifest exists yet at the target path,
/// writes the manifest, and then hands the lease back via [CommitLease::release].
#[async_trait::async_trait]
pub trait CommitLock: Debug {
    /// Lease returned by [CommitLock::lock]. The blanket [CommitHandler]
    /// implementation releases it exactly once per commit attempt.
    type Lease: CommitLease;

    /// Attempt to lock the table for the given version.
    ///
    /// If it is already locked by another transaction, wait until it is unlocked.
    /// Once it is unlocked, return [CommitError::CommitConflict] if the version
    /// has already been committed. Otherwise, return the lock.
    ///
    /// To prevent poisoned locks, it's recommended to set a timeout on the lock
    /// of at least 30 seconds.
    ///
    /// It is not required that the lock tracks the version. It is provided in
    /// case the locking is handled by a catalog service that needs to know the
    /// current version of the table.
    async fn lock(&self, version: u64) -> std::result::Result<Self::Lease, CommitError>;
}
905
/// A lease obtained from [CommitLock::lock], held while a lock-based commit runs.
#[async_trait::async_trait]
pub trait CommitLease: Send + Sync {
    /// Return the lease, indicating whether the commit was successful.
    ///
    /// In the blanket [CommitHandler] implementation, `success` is `false` when
    /// the commit is abandoned (the manifest already existed, or checking or
    /// writing it failed) and `true` once the manifest has been written. An
    /// error returned here is propagated to the committer.
    async fn release(&self, success: bool) -> std::result::Result<(), CommitError>;
}
911
912#[async_trait::async_trait]
913impl<T: CommitLock + Send + Sync> CommitHandler for T {
914    async fn commit(
915        &self,
916        manifest: &mut Manifest,
917        indices: Option<Vec<IndexMetadata>>,
918        base_path: &Path,
919        object_store: &ObjectStore,
920        manifest_writer: ManifestWriter,
921        naming_scheme: ManifestNamingScheme,
922        transaction: Option<Transaction>,
923    ) -> std::result::Result<ManifestLocation, CommitError> {
924        let path = naming_scheme.manifest_path(base_path, manifest.version);
925        // NOTE: once we have the lease we cannot use ? to return errors, since
926        // we must release the lease before returning.
927        let lease = self.lock(manifest.version).await?;
928
929        // Head the location and make sure it's not already committed
930        match object_store.inner.head(&path).await {
931            Ok(_) => {
932                // The path already exists, so it's already committed
933                // Release the lock
934                lease.release(false).await?;
935
936                return Err(CommitError::CommitConflict);
937            }
938            Err(ObjectStoreError::NotFound { .. }) => {}
939            Err(e) => {
940                // Something else went wrong
941                // Release the lock
942                lease.release(false).await?;
943
944                return Err(CommitError::OtherError(e.into()));
945            }
946        }
947        let res = manifest_writer(object_store, manifest, indices, &path, transaction).await;
948
949        // Release the lock
950        lease.release(res.is_ok()).await?;
951
952        let res = res?;
953        Ok(ManifestLocation {
954            version: manifest.version,
955            size: Some(res.size as u64),
956            naming_scheme,
957            path,
958            e_tag: res.e_tag,
959        })
960    }
961}
962
963#[async_trait::async_trait]
964impl<T: CommitLock + Send + Sync> CommitHandler for Arc<T> {
965    async fn commit(
966        &self,
967        manifest: &mut Manifest,
968        indices: Option<Vec<IndexMetadata>>,
969        base_path: &Path,
970        object_store: &ObjectStore,
971        manifest_writer: ManifestWriter,
972        naming_scheme: ManifestNamingScheme,
973        transaction: Option<Transaction>,
974    ) -> std::result::Result<ManifestLocation, CommitError> {
975        self.as_ref()
976            .commit(
977                manifest,
978                indices,
979                base_path,
980                object_store,
981                manifest_writer,
982                naming_scheme,
983                transaction,
984            )
985            .await
986    }
987}
988
/// A commit implementation that uses a temporary path and renames the object.
///
/// This only works for object stores that support atomic rename if not exist.
/// The manifest is first written to a staging path derived from the final
/// manifest path, then moved into place with `rename_if_not_exists`; if the
/// final path is already taken the commit fails with
/// [CommitError::CommitConflict].
pub struct RenameCommitHandler;
993
994#[async_trait::async_trait]
995impl CommitHandler for RenameCommitHandler {
996    async fn commit(
997        &self,
998        manifest: &mut Manifest,
999        indices: Option<Vec<IndexMetadata>>,
1000        base_path: &Path,
1001        object_store: &ObjectStore,
1002        manifest_writer: ManifestWriter,
1003        naming_scheme: ManifestNamingScheme,
1004        transaction: Option<Transaction>,
1005    ) -> std::result::Result<ManifestLocation, CommitError> {
1006        // Create a temporary object, then use `rename_if_not_exists` to commit.
1007        // If failed, clean up the temporary object.
1008
1009        let path = naming_scheme.manifest_path(base_path, manifest.version);
1010        let tmp_path = make_staging_manifest_path(&path)?;
1011
1012        let res = manifest_writer(object_store, manifest, indices, &tmp_path, transaction).await?;
1013
1014        match object_store
1015            .inner
1016            .rename_if_not_exists(&tmp_path, &path)
1017            .await
1018        {
1019            Ok(_) => {
1020                // Successfully committed
1021                Ok(ManifestLocation {
1022                    version: manifest.version,
1023                    path,
1024                    size: Some(res.size as u64),
1025                    naming_scheme,
1026                    e_tag: None, // Re-name can change e-tag.
1027                })
1028            }
1029            Err(ObjectStoreError::AlreadyExists { .. }) => {
1030                // Another transaction has already been committed
1031                // Attempt to clean up temporary object, but ignore errors if we can't
1032                let _ = object_store.delete(&tmp_path).await;
1033
1034                return Err(CommitError::CommitConflict);
1035            }
1036            Err(e) => {
1037                // Something else went wrong
1038                return Err(CommitError::OtherError(e.into()));
1039            }
1040        }
1041    }
1042}
1043
1044impl Debug for RenameCommitHandler {
1045    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1046        f.debug_struct("RenameCommitHandler").finish()
1047    }
1048}
1049
/// A commit implementation that relies on the object store's conditional put.
///
/// The manifest is serialized in memory and then uploaded to its final path
/// with `PutMode::Create`, so the upload is rejected — and the commit reports
/// [CommitError::CommitConflict] — if a manifest already exists at that path.
pub struct ConditionalPutCommitHandler;
1051
1052#[async_trait::async_trait]
1053impl CommitHandler for ConditionalPutCommitHandler {
1054    async fn commit(
1055        &self,
1056        manifest: &mut Manifest,
1057        indices: Option<Vec<IndexMetadata>>,
1058        base_path: &Path,
1059        object_store: &ObjectStore,
1060        manifest_writer: ManifestWriter,
1061        naming_scheme: ManifestNamingScheme,
1062        transaction: Option<Transaction>,
1063    ) -> std::result::Result<ManifestLocation, CommitError> {
1064        let path = naming_scheme.manifest_path(base_path, manifest.version);
1065
1066        let memory_store = ObjectStore::memory();
1067        let dummy_path = "dummy";
1068        manifest_writer(
1069            &memory_store,
1070            manifest,
1071            indices,
1072            &dummy_path.into(),
1073            transaction,
1074        )
1075        .await?;
1076        let dummy_data = memory_store.read_one_all(&dummy_path.into()).await?;
1077        let size = dummy_data.len() as u64;
1078        let res = object_store
1079            .inner
1080            .put_opts(
1081                &path,
1082                dummy_data.into(),
1083                PutOptions {
1084                    mode: object_store::PutMode::Create,
1085                    ..Default::default()
1086                },
1087            )
1088            .await
1089            .map_err(|err| match err {
1090                ObjectStoreError::AlreadyExists { .. } | ObjectStoreError::Precondition { .. } => {
1091                    CommitError::CommitConflict
1092                }
1093                _ => CommitError::OtherError(err.into()),
1094            })?;
1095
1096        Ok(ManifestLocation {
1097            version: manifest.version,
1098            path,
1099            size: Some(size),
1100            naming_scheme,
1101            e_tag: res.e_tag,
1102        })
1103    }
1104}
1105
1106impl Debug for ConditionalPutCommitHandler {
1107    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1108        f.debug_struct("ConditionalPutCommitHandler").finish()
1109    }
1110}
1111
/// Tunables for committing a transaction.
#[derive(Debug, Clone)]
pub struct CommitConfig {
    /// Number of commit retries allowed (presumably after a commit conflict —
    /// confirm at the call site). Defaults to 20.
    pub num_retries: u32,
    /// When `true`, the automatic cleanup step is skipped (exact cleanup
    /// semantics live with the caller — confirm there). Defaults to `false`.
    pub skip_auto_cleanup: bool,
    // TODO: add isolation_level
}

impl Default for CommitConfig {
    fn default() -> Self {
        let num_retries = 20;
        let skip_auto_cleanup = false;
        Self {
            num_retries,
            skip_auto_cleanup,
        }
    }
}
1127
#[cfg(test)]
mod tests {
    use lance_core::utils::tempfile::TempObjDir;

    use super::*;

    /// Round-trips versions through both manifest naming schemes.
    #[test]
    fn test_manifest_naming_scheme() {
        let v1 = ManifestNamingScheme::V1;
        let v2 = ManifestNamingScheme::V2;

        // V1 file names are the version number itself.
        assert_eq!(
            v1.manifest_path(&Path::from("base"), 0),
            Path::from("base/_versions/0.manifest")
        );
        assert_eq!(
            v1.manifest_path(&Path::from("base"), 42),
            Path::from("base/_versions/42.manifest")
        );

        // V2 file names are `u64::MAX - version` (18446744073709551615 == u64::MAX),
        // so a larger version produces a smaller name.
        assert_eq!(
            v2.manifest_path(&Path::from("base"), 0),
            Path::from("base/_versions/18446744073709551615.manifest")
        );
        assert_eq!(
            v2.manifest_path(&Path::from("base"), 42),
            Path::from("base/_versions/18446744073709551573.manifest")
        );

        // Parsing also accepts names with a trailing `-<uuid>` suffix.
        assert_eq!(v1.parse_version("0.manifest"), Some(0));
        assert_eq!(v1.parse_version("42.manifest"), Some(42));
        assert_eq!(
            v1.parse_version("42.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc"),
            Some(42)
        );

        assert_eq!(v2.parse_version("18446744073709551615.manifest"), Some(0));
        assert_eq!(v2.parse_version("18446744073709551573.manifest"), Some(42));
        assert_eq!(
            v2.parse_version("18446744073709551573.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc"),
            Some(42)
        );

        assert_eq!(ManifestNamingScheme::detect_scheme("0.manifest"), Some(v1));
        assert_eq!(
            ManifestNamingScheme::detect_scheme("18446744073709551615.manifest"),
            Some(v2)
        );
        assert_eq!(ManifestNamingScheme::detect_scheme("something else"), None);
    }

    /// Migrating to V2 renames V1 manifests and leaves other files alone.
    #[tokio::test]
    async fn test_manifest_naming_migration() {
        let object_store = ObjectStore::memory();
        let base = Path::from("base");
        let versions_dir = base.child(VERSIONS_DIR);

        // Write one unrelated file, one V1 manifest (version 0) and one V2 manifest (version 1)
        let original_files = vec![
            versions_dir.child("irrelevant"),
            ManifestNamingScheme::V1.manifest_path(&base, 0),
            ManifestNamingScheme::V2.manifest_path(&base, 1),
        ];
        for path in original_files {
            object_store.put(&path, b"".as_slice()).await.unwrap();
        }

        migrate_scheme_to_v2(&object_store, &base).await.unwrap();

        // Both manifests now use V2 names; listed in the in-memory store's lexical order.
        let expected_files = vec![
            ManifestNamingScheme::V2.manifest_path(&base, 1),
            ManifestNamingScheme::V2.manifest_path(&base, 0),
            versions_dir.child("irrelevant"),
        ];
        let actual_files = object_store
            .inner
            .list(Some(&versions_dir))
            .map_ok(|res| res.location)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(actual_files, expected_files);
    }

    /// Manifest listing yields newest-first for both lexically and
    /// non-lexically ordered stores, under both naming schemes.
    #[tokio::test]
    #[rstest::rstest]
    async fn test_list_manifests_sorted(
        #[values(true, false)] lexical_list_store: bool,
        #[values(ManifestNamingScheme::V1, ManifestNamingScheme::V2)]
        naming_scheme: ManifestNamingScheme,
    ) {
        // Declared outside the branch so the temp directory outlives the listing below.
        let tempdir;
        let (object_store, base) = if lexical_list_store {
            (Box::new(ObjectStore::memory()), Path::from("base"))
        } else {
            tempdir = TempObjDir::default();
            let path = tempdir.child("base");
            let store = Box::new(ObjectStore::local());
            assert!(!store.list_is_lexically_ordered);
            (store, path)
        };

        // Write 12 manifest files, latest first
        let mut expected_paths = Vec::new();
        for i in (0..12).rev() {
            let path = naming_scheme.manifest_path(&base, i);
            object_store.put(&path, b"".as_slice()).await.unwrap();
            expected_paths.push(path);
        }

        let actual_versions = ConditionalPutCommitHandler
            .list_manifest_locations(&base, &object_store, true)
            .map_ok(|location| location.path)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        assert_eq!(actual_versions, expected_paths);
    }

    /// `current_manifest_path` finds the highest version regardless of write order.
    #[tokio::test]
    #[rstest::rstest]
    async fn test_current_manifest_path(
        #[values(true, false)] lexical_list_store: bool,
        #[values(ManifestNamingScheme::V1, ManifestNamingScheme::V2)]
        naming_scheme: ManifestNamingScheme,
    ) {
        // Use memory store for both cases to avoid local FS special codepath.
        // Modify list_is_lexically_ordered to simulate different object stores.
        let mut object_store = ObjectStore::memory();
        object_store.list_is_lexically_ordered = lexical_list_store;
        let object_store = Box::new(object_store);
        let base = Path::from("base");

        // Write 12 manifest files in non-sequential order
        for version in [5, 2, 11, 0, 8, 3, 10, 1, 7, 4, 9, 6] {
            let path = naming_scheme.manifest_path(&base, version);
            object_store.put(&path, b"".as_slice()).await.unwrap();
        }

        let location = current_manifest_path(&object_store, &base).await.unwrap();

        assert_eq!(location.version, 11);
        assert_eq!(location.naming_scheme, naming_scheme);
        assert_eq!(location.path, naming_scheme.manifest_path(&base, 11));
    }
}