Skip to main content

lance_table/io/
commit.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Trait for commit implementations.
5//!
6//! In Lance, a transaction is committed by writing the next manifest file.
7//! However, care should be taken to ensure that the manifest file is written
8//! only once, even if there are concurrent writers. Different stores have
9//! different abilities to handle concurrent writes, so a trait is provided
10//! to allow for different implementations.
11//!
//! The trait [CommitHandler] can be implemented to provide different commit
//! strategies. [commit_handler_from_url] picks the handler for a dataset URI:
//! [ConditionalPutCommitHandler] for the `s3`, `gs`, `az`, `memory` and `oss`
//! schemes and for local paths, except on Windows, where local paths use
//! [RenameCommitHandler]. [RenameCommitHandler] writes the manifest to a
//! temporary path, then renames the temporary path to the final path if no
//! object already exists at the final path. This is an atomic operation in most
//! object stores, but not in AWS S3. Unrecognized schemes fall back to
//! [UnsafeCommitHandler], which writes the manifest to the final path without
//! any checks.
20//!
21//! When providing your own commit handler, most often you are implementing in
22//! terms of a lock. The trait [CommitLock] can be implemented as a simpler
23//! alternative to [CommitHandler].
24
25use std::io;
26use std::pin::Pin;
27use std::sync::atomic::AtomicBool;
28use std::sync::Arc;
29use std::{fmt::Debug, fs::DirEntry};
30
31use super::manifest::write_manifest;
32use futures::future::Either;
33use futures::Stream;
34use futures::{
35    future::{self, BoxFuture},
36    stream::BoxStream,
37    StreamExt, TryStreamExt,
38};
39use lance_file::format::{MAGIC, MAJOR_VERSION, MINOR_VERSION};
40use lance_io::object_writer::{ObjectWriter, WriteResult};
41use log::warn;
42use object_store::PutOptions;
43use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore};
44use snafu::location;
45use tracing::info;
46use url::Url;
47
48#[cfg(feature = "dynamodb")]
49pub mod dynamodb;
50pub mod external_manifest;
51
52use lance_core::{Error, Result};
53use lance_io::object_store::{ObjectStore, ObjectStoreExt, ObjectStoreParams};
54use lance_io::traits::WriteExt;
55
56use crate::format::{is_detached_version, IndexMetadata, Manifest, Transaction};
57use lance_core::utils::tracing::{AUDIT_MODE_CREATE, AUDIT_TYPE_MANIFEST, TRACE_FILE_AUDIT};
58#[cfg(feature = "dynamodb")]
59use {
60    self::external_manifest::{ExternalManifestCommitHandler, ExternalManifestStore},
61    aws_credential_types::provider::error::CredentialsError,
62    aws_credential_types::provider::ProvideCredentials,
63    lance_io::object_store::{providers::aws::build_aws_credential, StorageOptions},
64    object_store::aws::AmazonS3ConfigKey,
65    object_store::aws::AwsCredentialProvider,
66    std::borrow::Cow,
67    std::time::{Duration, SystemTime},
68};
69
/// Name of the directory, under the dataset base path, that holds manifest files.
const VERSIONS_DIR: &str = "_versions";
/// File extension (without the leading dot) used by manifest files.
const MANIFEST_EXTENSION: &str = "manifest";
/// Filename prefix that marks the manifest of a detached version.
const DETACHED_VERSION_PREFIX: &str = "d";
73
/// How manifest files should be named.
///
/// Both schemes store manifests in the `_versions` directory under the dataset
/// base path; they differ only in how the version is encoded in the filename.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ManifestNamingScheme {
    /// `_versions/{version}.manifest`
    V1,
    /// `_versions/{u64::MAX - version}.manifest`
    ///
    /// Zero-padded (to 20 digits) and reversed for O(1) lookup of latest version
    /// on object stores.
    V2,
}
84
85impl ManifestNamingScheme {
86    pub fn manifest_path(&self, base: &Path, version: u64) -> Path {
87        let directory = base.child(VERSIONS_DIR);
88        if is_detached_version(version) {
89            // Detached versions should never show up first in a list operation which
90            // means it needs to come lexicographically after all attached manifest
91            // files and so we add the prefix `d`.  There is no need to invert the
92            // version number since detached versions are not part of the version
93            let directory = base.child(VERSIONS_DIR);
94            directory.child(format!(
95                "{DETACHED_VERSION_PREFIX}{version}.{MANIFEST_EXTENSION}"
96            ))
97        } else {
98            match self {
99                Self::V1 => directory.child(format!("{version}.{MANIFEST_EXTENSION}")),
100                Self::V2 => {
101                    let inverted_version = u64::MAX - version;
102                    directory.child(format!("{inverted_version:020}.{MANIFEST_EXTENSION}"))
103                }
104            }
105        }
106    }
107
108    pub fn parse_version(&self, filename: &str) -> Option<u64> {
109        let file_number = filename
110            .split_once('.')
111            // Detached versions will fail the `parse` step, which is ok.
112            .and_then(|(version_str, _)| version_str.parse::<u64>().ok());
113        match self {
114            Self::V1 => file_number,
115            Self::V2 => file_number.map(|v| u64::MAX - v),
116        }
117    }
118
119    pub fn detect_scheme(filename: &str) -> Option<Self> {
120        if filename.starts_with(DETACHED_VERSION_PREFIX) {
121            // Currently, detached versions must imply V2
122            return Some(Self::V2);
123        }
124        if filename.ends_with(MANIFEST_EXTENSION) {
125            const V2_LEN: usize = 20 + 1 + MANIFEST_EXTENSION.len();
126            if filename.len() == V2_LEN {
127                Some(Self::V2)
128            } else {
129                Some(Self::V1)
130            }
131        } else {
132            None
133        }
134    }
135
136    pub fn detect_scheme_staging(filename: &str) -> Self {
137        // We shouldn't have to worry about detached versions here since there is no
138        // such thing as "detached" and "staged" at the same time.
139        if filename.chars().nth(20) == Some('.') {
140            Self::V2
141        } else {
142            Self::V1
143        }
144    }
145}
146
147/// Migrate all V1 manifests to V2 naming scheme.
148///
149/// This function will rename all V1 manifests to V2 naming scheme.
150///
151/// This function is idempotent, and can be run multiple times without
152/// changing the state of the object store.
153///
154/// However, it should not be run while other concurrent operations are happening.
155/// And it should also run until completion before resuming other operations.
156pub async fn migrate_scheme_to_v2(object_store: &ObjectStore, dataset_base: &Path) -> Result<()> {
157    object_store
158        .inner
159        .list(Some(&dataset_base.child(VERSIONS_DIR)))
160        .try_filter(|res| {
161            let res = if let Some(filename) = res.location.filename() {
162                ManifestNamingScheme::detect_scheme(filename) == Some(ManifestNamingScheme::V1)
163            } else {
164                false
165            };
166            future::ready(res)
167        })
168        .try_for_each_concurrent(object_store.io_parallelism(), |meta| async move {
169            let filename = meta.location.filename().unwrap();
170            let version = ManifestNamingScheme::V1.parse_version(filename).unwrap();
171            let path = ManifestNamingScheme::V2.manifest_path(dataset_base, version);
172            object_store.inner.rename(&meta.location, &path).await?;
173            Ok(())
174        })
175        .await?;
176
177    Ok(())
178}
179
/// Function that writes the manifest to the object store.
///
/// The arguments are, in order: the object store to write to, the manifest to
/// write (borrowed mutably), optional index metadata, the destination path, and
/// an optional transaction.
///
/// Returns the size of the written manifest (as part of the [WriteResult] the
/// returned future resolves to).
pub type ManifestWriter = for<'a> fn(
    object_store: &'a ObjectStore,
    manifest: &'a mut Manifest,
    indices: Option<Vec<IndexMetadata>>,
    path: &'a Path,
    transaction: Option<Transaction>,
) -> BoxFuture<'a, Result<WriteResult>>;
190
191/// Canonical manifest writer; its function item type exactly matches `ManifestWriter`.
192/// Rationale: keep a crate-local writer implementation so call sites can pass this function
193/// directly without non-primitive casts or lifetime coercions.
194pub fn write_manifest_file_to_path<'a>(
195    object_store: &'a ObjectStore,
196    manifest: &'a mut Manifest,
197    indices: Option<Vec<IndexMetadata>>,
198    path: &'a Path,
199    transaction: Option<Transaction>,
200) -> BoxFuture<'a, Result<WriteResult>> {
201    Box::pin(async move {
202        let mut object_writer = ObjectWriter::new(object_store, path).await?;
203        let pos = write_manifest(&mut object_writer, manifest, indices, transaction).await?;
204        object_writer
205            .write_magics(pos, MAJOR_VERSION, MINOR_VERSION, MAGIC)
206            .await?;
207        let res = object_writer.shutdown().await?;
208        info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_MANIFEST, path = path.to_string());
209        Ok(res)
210    })
211}
212
/// Where a manifest file lives, plus the metadata known about it.
#[derive(Debug, Clone)]
pub struct ManifestLocation {
    /// The version the manifest corresponds to.
    pub version: u64,
    /// Path of the manifest file, relative to the table root.
    pub path: Path,
    /// Size, in bytes, of the manifest file. If it is not known, this field should be `None`.
    pub size: Option<u64>,
    /// Naming scheme of the manifest file.
    pub naming_scheme: ManifestNamingScheme,
    /// Optional e-tag, used for integrity checks. Manifests should be immutable, so
    /// if we detect a change in the e-tag, it means the manifest was tampered with.
    /// This might happen if the dataset was deleted and then re-created.
    pub e_tag: Option<String>,
}
228
229impl TryFrom<object_store::ObjectMeta> for ManifestLocation {
230    type Error = Error;
231
232    fn try_from(meta: object_store::ObjectMeta) -> Result<Self> {
233        let filename = meta.location.filename().ok_or_else(|| Error::Internal {
234            message: "ObjectMeta location does not have a filename".to_string(),
235            location: location!(),
236        })?;
237        let scheme =
238            ManifestNamingScheme::detect_scheme(filename).ok_or_else(|| Error::Internal {
239                message: format!("Invalid manifest filename: '{}'", filename),
240                location: location!(),
241            })?;
242        let version = scheme
243            .parse_version(filename)
244            .ok_or_else(|| Error::Internal {
245                message: format!("Invalid manifest filename: '{}'", filename),
246                location: location!(),
247            })?;
248        Ok(Self {
249            version,
250            path: meta.location,
251            size: Some(meta.size),
252            naming_scheme: scheme,
253            e_tag: meta.e_tag,
254        })
255    }
256}
257
258/// Get the latest manifest path
259async fn current_manifest_path(
260    object_store: &ObjectStore,
261    base: &Path,
262) -> Result<ManifestLocation> {
263    if object_store.is_local() {
264        if let Ok(Some(location)) = current_manifest_local(base) {
265            return Ok(location);
266        }
267    }
268
269    let manifest_files = object_store.list(Some(base.child(VERSIONS_DIR)));
270
271    let mut valid_manifests = manifest_files.try_filter_map(|res| {
272        if let Some(scheme) = ManifestNamingScheme::detect_scheme(res.location.filename().unwrap())
273        {
274            future::ready(Ok(Some((scheme, res))))
275        } else {
276            future::ready(Ok(None))
277        }
278    });
279
280    let first = valid_manifests.next().await.transpose()?;
281    match (first, object_store.list_is_lexically_ordered) {
282        // If the first valid manifest we see is V2, we can assume that we are using
283        // V2 naming scheme for all manifests.
284        (Some((scheme @ ManifestNamingScheme::V2, meta)), true) => {
285            let version = scheme
286                .parse_version(meta.location.filename().unwrap())
287                .unwrap();
288
289            // Sanity check: verify at least for the first 1k files that they are all V2
290            // and that the version numbers are decreasing. We use the first 1k because
291            // this is the typical size of an object store list endpoint response page.
292            for (scheme, meta) in valid_manifests.take(999).try_collect::<Vec<_>>().await? {
293                if scheme != ManifestNamingScheme::V2 {
294                    warn!(
295                        "Found V1 Manifest in a V2 directory. Use `migrate_manifest_paths_v2` \
296                         to migrate the directory."
297                    );
298                    break;
299                }
300                let next_version = scheme
301                    .parse_version(meta.location.filename().unwrap())
302                    .unwrap();
303                if next_version >= version {
304                    warn!(
305                        "List operation was expected to be lexically ordered, but was not. This \
306                         could mean a corrupt read. Please make a bug report on the lance-format/lance \
307                         GitHub repository."
308                    );
309                    break;
310                }
311            }
312
313            Ok(ManifestLocation {
314                version,
315                path: meta.location,
316                size: Some(meta.size),
317                naming_scheme: scheme,
318                e_tag: meta.e_tag,
319            })
320        }
321        // If the list is not lexically ordered, we need to iterate all manifests
322        // to find the latest version. This works for both V1 and V2 schemes.
323        (Some((first_scheme, meta)), _) => {
324            let mut current_version = first_scheme
325                .parse_version(meta.location.filename().unwrap())
326                .unwrap();
327            let mut current_meta = meta;
328            let scheme = first_scheme;
329
330            while let Some((entry_scheme, meta)) = valid_manifests.next().await.transpose()? {
331                if entry_scheme != scheme {
332                    return Err(Error::Internal {
333                        message: format!(
334                            "Found multiple manifest naming schemes in the same directory: {:?} and {:?}. \
335                             Use `migrate_manifest_paths_v2` to migrate the directory.",
336                            scheme, entry_scheme
337                        ),
338                        location: location!(),
339                    });
340                }
341                let version = entry_scheme
342                    .parse_version(meta.location.filename().unwrap())
343                    .unwrap();
344                if version > current_version {
345                    current_version = version;
346                    current_meta = meta;
347                }
348            }
349            Ok(ManifestLocation {
350                version: current_version,
351                path: current_meta.location,
352                size: Some(current_meta.size),
353                naming_scheme: scheme,
354                e_tag: current_meta.e_tag,
355            })
356        }
357        (None, _) => Err(Error::NotFound {
358            uri: base.child(VERSIONS_DIR).to_string(),
359            location: location!(),
360        }),
361    }
362}
363
364// This is an optimized function that searches for the latest manifest. In
365// object_store, list operations lookup metadata for each file listed. This
366// method only gets the metadata for the found latest manifest.
367fn current_manifest_local(base: &Path) -> std::io::Result<Option<ManifestLocation>> {
368    let path = lance_io::local::to_local_path(&base.child(VERSIONS_DIR));
369    let entries = std::fs::read_dir(path)?;
370
371    let mut latest_entry: Option<(u64, DirEntry)> = None;
372
373    let mut scheme: Option<ManifestNamingScheme> = None;
374
375    for entry in entries {
376        let entry = entry?;
377        let filename_raw = entry.file_name();
378        let filename = filename_raw.to_string_lossy();
379
380        let Some(entry_scheme) = ManifestNamingScheme::detect_scheme(&filename) else {
381            // Need to ignore temporary files, such as
382            // .tmp_7.manifest_9c100374-3298-4537-afc6-f5ee7913666d
383            continue;
384        };
385
386        if let Some(scheme) = scheme {
387            if scheme != entry_scheme {
388                return Err(io::Error::new(
389                    io::ErrorKind::InvalidData,
390                    format!(
391                        "Found multiple manifest naming schemes in the same directory: {:?} and {:?}",
392                        scheme, entry_scheme
393                    ),
394                ));
395            }
396        } else {
397            scheme = Some(entry_scheme);
398        }
399
400        let Some(version) = entry_scheme.parse_version(&filename) else {
401            continue;
402        };
403
404        if let Some((latest_version, _)) = &latest_entry {
405            if version > *latest_version {
406                latest_entry = Some((version, entry));
407            }
408        } else {
409            latest_entry = Some((version, entry));
410        }
411    }
412
413    if let Some((version, entry)) = latest_entry {
414        let path = Path::from_filesystem_path(entry.path())
415            .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;
416        let metadata = entry.metadata()?;
417        Ok(Some(ManifestLocation {
418            version,
419            path,
420            size: Some(metadata.len()),
421            naming_scheme: scheme.unwrap(),
422            e_tag: Some(get_etag(&metadata)),
423        }))
424    } else {
425        Ok(None)
426    }
427}
428
429// Based on object store's implementation.
430fn get_etag(metadata: &std::fs::Metadata) -> String {
431    let inode = get_inode(metadata);
432    let size = metadata.len();
433    let mtime = metadata
434        .modified()
435        .ok()
436        .and_then(|mtime| mtime.duration_since(std::time::SystemTime::UNIX_EPOCH).ok())
437        .unwrap_or_default()
438        .as_micros();
439
440    // Use an ETag scheme based on that used by many popular HTTP servers
441    // <https://httpd.apache.org/docs/2.2/mod/core.html#fileetag>
442    // <https://stackoverflow.com/questions/47512043/how-etags-are-generated-and-configured>
443    format!("{inode:x}-{mtime:x}-{size:x}")
444}
445
#[cfg(unix)]
/// Returns the file's inode number, which is folded into the ETag to make it more
/// resistant to collisions, as done by popular web servers such as
/// [Apache](https://httpd.apache.org/docs/2.2/mod/core.html#fileetag)
fn get_inode(metadata: &std::fs::Metadata) -> u64 {
    use std::os::unix::fs::MetadataExt;

    metadata.ino()
}

#[cfg(not(unix))]
/// Always returns zero: on platforms where an inode isn't available, the ETag
/// falls back to just relying on size and mtime
fn get_inode(_metadata: &std::fs::Metadata) -> u64 {
    0
}
458
459fn list_manifests<'a>(
460    base_path: &Path,
461    object_store: &'a dyn OSObjectStore,
462) -> impl Stream<Item = Result<ManifestLocation>> + 'a {
463    object_store
464        .read_dir_all(&base_path.child(VERSIONS_DIR), None)
465        .filter_map(|obj_meta| {
466            futures::future::ready(
467                obj_meta
468                    .map(|m| ManifestLocation::try_from(m).ok())
469                    .transpose(),
470            )
471        })
472        .boxed()
473}
474
475fn make_staging_manifest_path(base: &Path) -> Result<Path> {
476    let id = uuid::Uuid::new_v4().to_string();
477    Path::parse(format!("{base}-{id}")).map_err(|e| Error::IO {
478        source: Box::new(e),
479        location: location!(),
480    })
481}
482
/// Name of the URL query parameter that carries the DynamoDB table name in
/// `s3+ddb://` URLs.
#[cfg(feature = "dynamodb")]
const DDB_URL_QUERY_KEY: &str = "ddbTableName";
485
486/// Handle commits that prevent conflicting writes.
487///
488/// Commit implementations ensure that if there are multiple concurrent writers
489/// attempting to write the next version of a table, only one will win. In order
490/// to work, all writers must use the same commit handler type.
491/// This trait is also responsible for resolving where the manifests live.
492///
493// TODO: pub(crate)
494#[async_trait::async_trait]
495#[allow(clippy::too_many_arguments)]
496pub trait CommitHandler: Debug + Send + Sync {
497    async fn resolve_latest_location(
498        &self,
499        base_path: &Path,
500        object_store: &ObjectStore,
501    ) -> Result<ManifestLocation> {
502        Ok(current_manifest_path(object_store, base_path).await?)
503    }
504
505    async fn resolve_version_location(
506        &self,
507        base_path: &Path,
508        version: u64,
509        object_store: &dyn OSObjectStore,
510    ) -> Result<ManifestLocation> {
511        default_resolve_version(base_path, version, object_store).await
512    }
513
514    /// If `sorted_descending` is `true`, the stream will yield manifests in descending
515    /// order of version. When the object store has a lexicographically
516    /// ordered list and the naming scheme is V2, this will use an optimized
517    /// list operation. Otherwise, it will list all manifests and sort them
518    /// in memory. When `sorted_descending` is `false`, the stream will yield manifests
519    /// in arbitrary order.
520    fn list_manifest_locations<'a>(
521        &self,
522        base_path: &Path,
523        object_store: &'a ObjectStore,
524        sorted_descending: bool,
525    ) -> BoxStream<'a, Result<ManifestLocation>> {
526        let underlying_stream = list_manifests(base_path, &object_store.inner);
527
528        if !sorted_descending {
529            return underlying_stream.boxed();
530        }
531
532        async fn sort_stream(
533            input_stream: impl futures::Stream<Item = Result<ManifestLocation>> + Unpin,
534        ) -> Result<impl Stream<Item = Result<ManifestLocation>> + Unpin> {
535            let mut locations = input_stream.try_collect::<Vec<_>>().await?;
536            locations.sort_by_key(|m| std::cmp::Reverse(m.version));
537            Ok(futures::stream::iter(locations.into_iter().map(Ok)))
538        }
539
540        // If the object store supports lexicographically ordered lists and
541        // the naming scheme is V2, we can use an optimized list operation.
542        if object_store.list_is_lexically_ordered {
543            // We don't know the naming scheme until we see the first manifest.
544            let mut peekable = underlying_stream.peekable();
545
546            futures::stream::once(async move {
547                let naming_scheme = match Pin::new(&mut peekable).peek().await {
548                    Some(Ok(m)) => m.naming_scheme,
549                    // If we get an error or no manifests are found, we default
550                    // to V2 naming scheme, since it doesn't matter.
551                    Some(Err(_)) => ManifestNamingScheme::V2,
552                    None => ManifestNamingScheme::V2,
553                };
554
555                if naming_scheme == ManifestNamingScheme::V2 {
556                    // If the first manifest is V2, we can use the optimized list operation.
557                    Ok(Either::Left(peekable))
558                } else {
559                    sort_stream(peekable).await.map(Either::Right)
560                }
561            })
562            .try_flatten()
563            .boxed()
564        } else {
565            // If the object store does not support lexicographically ordered lists,
566            // we need to sort the manifests in memory. Systems where this isn't
567            // supported (local fs, S3 express) are typically fast enough
568            // that this is not a problem.
569            futures::stream::once(sort_stream(underlying_stream))
570                .try_flatten()
571                .boxed()
572        }
573    }
574
575    /// Commit a manifest.
576    ///
577    /// This function should return an [CommitError::CommitConflict] if another
578    /// transaction has already been committed to the path.
579    async fn commit(
580        &self,
581        manifest: &mut Manifest,
582        indices: Option<Vec<IndexMetadata>>,
583        base_path: &Path,
584        object_store: &ObjectStore,
585        manifest_writer: ManifestWriter,
586        naming_scheme: ManifestNamingScheme,
587        transaction: Option<Transaction>,
588    ) -> std::result::Result<ManifestLocation, CommitError>;
589
590    /// Delete the recorded manifest information for a dataset at the base_path
591    async fn delete(&self, _base_path: &Path) -> Result<()> {
592        Ok(())
593    }
594}
595
596async fn default_resolve_version(
597    base_path: &Path,
598    version: u64,
599    object_store: &dyn OSObjectStore,
600) -> Result<ManifestLocation> {
601    if is_detached_version(version) {
602        return Ok(ManifestLocation {
603            version,
604            // Detached versions are not supported with V1 naming scheme.  If we need
605            // to support in the future we could use a different prefix (e.g. 'x' or something)
606            naming_scheme: ManifestNamingScheme::V2,
607            // Both V1 and V2 should give the same path for detached versions
608            path: ManifestNamingScheme::V2.manifest_path(base_path, version),
609            size: None,
610            e_tag: None,
611        });
612    }
613
614    // try V2, fallback to V1.
615    let scheme = ManifestNamingScheme::V2;
616    let path = scheme.manifest_path(base_path, version);
617    match object_store.head(&path).await {
618        Ok(meta) => Ok(ManifestLocation {
619            version,
620            path,
621            size: Some(meta.size),
622            naming_scheme: scheme,
623            e_tag: meta.e_tag,
624        }),
625        Err(ObjectStoreError::NotFound { .. }) => {
626            // fallback to V1
627            let scheme = ManifestNamingScheme::V1;
628            Ok(ManifestLocation {
629                version,
630                path: scheme.manifest_path(base_path, version),
631                size: None,
632                naming_scheme: scheme,
633                e_tag: None,
634            })
635        }
636        Err(e) => Err(e.into()),
637    }
638}
/// Adapts an `object_store` [AwsCredentialProvider] into an AWS SDK credentials
/// provider (see its [ProvideCredentials] implementation).
#[cfg(feature = "dynamodb")]
#[derive(Debug)]
struct OSObjectStoreToAwsCredAdaptor(AwsCredentialProvider);
643
#[cfg(feature = "dynamodb")]
impl ProvideCredentials for OSObjectStoreToAwsCredAdaptor {
    /// Fetch a credential from the wrapped provider and convert it into AWS SDK
    /// credentials that are marked as expiring ten minutes from now.
    ///
    /// Failures of the wrapped provider are reported as provider errors.
    fn provide_credentials<'a>(
        &'a self,
    ) -> aws_credential_types::provider::future::ProvideCredentials<'a>
    where
        Self: 'a,
    {
        aws_credential_types::provider::future::ProvideCredentials::new(async {
            let fetched = self.0.get_credential().await;
            let creds = fetched.map_err(|e| CredentialsError::provider_error(Box::new(e)))?;

            //  10 min
            let lifetime = Duration::from_secs(60 * 10);
            let expires_at = SystemTime::now().checked_add(lifetime).expect("overflow");

            Ok(aws_credential_types::Credentials::new(
                &creds.key_id,
                &creds.secret_key,
                creds.token.clone(),
                Some(expires_at),
                "",
            ))
        })
    }
}
674
/// Build a DynamoDB-backed [ExternalManifestStore] for `table_name`.
///
/// The DynamoDB client is configured with the given `region` and `creds`, an
/// optional custom `endpoint`, no identity caching, and up to five attempts per
/// request.
#[cfg(feature = "dynamodb")]
async fn build_dynamodb_external_store(
    table_name: &str,
    creds: AwsCredentialProvider,
    region: &str,
    endpoint: Option<String>,
    app_name: &str,
) -> Result<Arc<dyn ExternalManifestStore>> {
    use super::commit::dynamodb::DynamoDBExternalManifestStore;
    use aws_sdk_dynamodb::{
        config::{retry::RetryConfig, IdentityCache, Region},
        Client,
    };

    // Be more resilient to transient network issues.
    // 5 attempts = 1 initial + 4 retries with exponential backoff.
    let retry_config = RetryConfig::standard().with_max_attempts(5);

    let builder = aws_sdk_dynamodb::config::Builder::new()
        .behavior_version_latest()
        .region(Some(Region::new(region.to_string())))
        .credentials_provider(OSObjectStoreToAwsCredAdaptor(creds))
        // caching should be handled by passed AwsCredentialProvider
        .identity_cache(IdentityCache::no_cache())
        .retry_config(retry_config);

    // Only override the endpoint when one was supplied.
    let builder = match endpoint {
        Some(endpoint_url) => builder.endpoint_url(endpoint_url),
        None => builder,
    };

    let client = Client::from_conf(builder.build());
    DynamoDBExternalManifestStore::new_external_store(client.into(), table_name, app_name).await
}
706
707pub async fn commit_handler_from_url(
708    url_or_path: &str,
709    // This looks unused if dynamodb feature disabled
710    #[allow(unused_variables)] options: &Option<ObjectStoreParams>,
711) -> Result<Arc<dyn CommitHandler>> {
712    let local_handler: Arc<dyn CommitHandler> = if cfg!(windows) {
713        Arc::new(RenameCommitHandler)
714    } else {
715        Arc::new(ConditionalPutCommitHandler)
716    };
717
718    let url = match Url::parse(url_or_path) {
719        Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
720            // On Windows, the drive is parsed as a scheme
721            return Ok(local_handler);
722        }
723        Ok(url) => url,
724        Err(_) => {
725            return Ok(local_handler);
726        }
727    };
728
729    match url.scheme() {
730        "file" | "file-object-store" => Ok(local_handler),
731        "s3" | "gs" | "az" | "memory" | "oss" => Ok(Arc::new(ConditionalPutCommitHandler)),
732        #[cfg(not(feature = "dynamodb"))]
733        "s3+ddb" => Err(Error::InvalidInput {
734            source: "`s3+ddb://` scheme requires `dynamodb` feature to be enabled".into(),
735            location: location!(),
736        }),
737        #[cfg(feature = "dynamodb")]
738        "s3+ddb" => {
739            if url.query_pairs().count() != 1 {
740                return Err(Error::InvalidInput {
741                    source: "`s3+ddb://` scheme and expects exactly one query `ddbTableName`"
742                        .into(),
743                    location: location!(),
744                });
745            }
746            let table_name = match url.query_pairs().next() {
747                Some((Cow::Borrowed(key), Cow::Borrowed(table_name)))
748                    if key == DDB_URL_QUERY_KEY =>
749                {
750                    if table_name.is_empty() {
751                        return Err(Error::InvalidInput {
752                            source: "`s3+ddb://` scheme requires non empty dynamodb table name"
753                                .into(),
754                            location: location!(),
755                        });
756                    }
757                    table_name
758                }
759                _ => {
760                    return Err(Error::InvalidInput {
761                        source: "`s3+ddb://` scheme and expects exactly one query `ddbTableName`"
762                            .into(),
763                        location: location!(),
764                    });
765                }
766            };
767            let options = options.clone().unwrap_or_default();
768            let storage_options_raw =
769                StorageOptions(options.storage_options().cloned().unwrap_or_default());
770            let dynamo_endpoint = get_dynamodb_endpoint(&storage_options_raw);
771            let storage_options = storage_options_raw.as_s3_options();
772
773            let region = storage_options.get(&AmazonS3ConfigKey::Region).cloned();
774
775            // Get accessor from the options
776            let accessor = options.get_accessor();
777
778            let (aws_creds, region) = build_aws_credential(
779                options.s3_credentials_refresh_offset,
780                options.aws_credentials.clone(),
781                Some(&storage_options),
782                region,
783                accessor,
784            )
785            .await?;
786
787            Ok(Arc::new(ExternalManifestCommitHandler {
788                external_manifest_store: build_dynamodb_external_store(
789                    table_name,
790                    aws_creds.clone(),
791                    &region,
792                    dynamo_endpoint,
793                    "lancedb",
794                )
795                .await?,
796            }))
797        }
798        _ => Ok(Arc::new(UnsafeCommitHandler)),
799    }
800}
801
/// Resolves an optional override for the DynamoDB endpoint.
///
/// The `dynamodb_endpoint` storage option wins when present; otherwise the
/// `DYNAMODB_ENDPOINT` environment variable is consulted. Returns `None` when
/// neither yields a value (including when the variable cannot be read as a
/// valid Unicode string).
#[cfg(feature = "dynamodb")]
fn get_dynamodb_endpoint(storage_options: &StorageOptions) -> Option<String> {
    storage_options
        .0
        .get("dynamodb_endpoint")
        .cloned()
        .or_else(|| std::env::var("DYNAMODB_ENDPOINT").ok())
}
810
/// Errors that can occur when committing a manifest.
///
/// Converts to and from the general [Error] type: any [Error] becomes
/// [CommitError::OtherError], while converting a [CommitError::CommitConflict]
/// back into an [Error] yields an internal error with the message
/// "Commit conflict".
#[derive(Debug)]
pub enum CommitError {
    /// Another transaction has already been written to the path
    CommitConflict,
    /// Something else went wrong
    OtherError(Error),
}
819
820impl From<Error> for CommitError {
821    fn from(e: Error) -> Self {
822        Self::OtherError(e)
823    }
824}
825
826impl From<CommitError> for Error {
827    fn from(e: CommitError) -> Self {
828        match e {
829            CommitError::CommitConflict => Self::Internal {
830                message: "Commit conflict".to_string(),
831                location: location!(),
832            },
833            CommitError::OtherError(e) => e,
834        }
835    }
836}
837
/// Whether we have issued a warning about using the unsafe commit handler.
///
/// Starts as `false`; [UnsafeCommitHandler]'s `commit` flips it to `true` when
/// it logs its one-time warning, so later commits stay silent.
static WARNED_ON_UNSAFE_COMMIT: AtomicBool = AtomicBool::new(false);
840
/// A naive commit implementation that does not prevent conflicting writes.
///
/// The manifest is written straight to its final versioned path without
/// checking whether something already exists there.
///
/// This will log a warning the first time it is used.
pub struct UnsafeCommitHandler;
845
846#[async_trait::async_trait]
847#[allow(clippy::too_many_arguments)]
848impl CommitHandler for UnsafeCommitHandler {
849    async fn commit(
850        &self,
851        manifest: &mut Manifest,
852        indices: Option<Vec<IndexMetadata>>,
853        base_path: &Path,
854        object_store: &ObjectStore,
855        manifest_writer: ManifestWriter,
856        naming_scheme: ManifestNamingScheme,
857        transaction: Option<Transaction>,
858    ) -> std::result::Result<ManifestLocation, CommitError> {
859        // Log a one-time warning
860        if !WARNED_ON_UNSAFE_COMMIT.load(std::sync::atomic::Ordering::Relaxed) {
861            WARNED_ON_UNSAFE_COMMIT.store(true, std::sync::atomic::Ordering::Relaxed);
862            log::warn!(
863                "Using unsafe commit handler. Concurrent writes may result in data loss. \
864                 Consider providing a commit handler that prevents conflicting writes."
865            );
866        }
867
868        let version_path = naming_scheme.manifest_path(base_path, manifest.version);
869        let res =
870            manifest_writer(object_store, manifest, indices, &version_path, transaction).await?;
871
872        Ok(ManifestLocation {
873            version: manifest.version,
874            size: Some(res.size as u64),
875            naming_scheme,
876            path: version_path,
877            e_tag: res.e_tag,
878        })
879    }
880}
881
882impl Debug for UnsafeCommitHandler {
883    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
884        f.debug_struct("UnsafeCommitHandler").finish()
885    }
886}
887
/// A commit implementation that uses a lock to prevent conflicting writes.
///
/// Any type that implements this trait and is `Send + Sync` automatically
/// gets a [CommitHandler] implementation that acquires the lock, writes the
/// manifest, and then releases the lease.
#[async_trait::async_trait]
pub trait CommitLock: Debug {
    /// The lease handed out by [CommitLock::lock]; it is released once the
    /// commit attempt has finished, successfully or not.
    type Lease: CommitLease;

    /// Attempt to lock the table for the given version.
    ///
    /// If it is already locked by another transaction, wait until it is unlocked.
    /// Once it is unlocked, return [CommitError::CommitConflict] if the version
    /// has already been committed. Otherwise, return the lock.
    ///
    /// To prevent poisoned locks, it's recommended to set a timeout on the lock
    /// of at least 30 seconds.
    ///
    /// It is not required that the lock tracks the version. It is provided in
    /// case the locking is handled by a catalog service that needs to know the
    /// current version of the table.
    async fn lock(&self, version: u64) -> std::result::Result<Self::Lease, CommitError>;
}
907
/// A lease on a table, obtained from [CommitLock::lock] and held for the
/// duration of a single commit attempt.
#[async_trait::async_trait]
pub trait CommitLease: Send + Sync {
    /// Return the lease, indicating whether the commit was successful.
    ///
    /// `success` is `true` when the manifest was written and `false` when the
    /// commit attempt was abandoned or failed.
    async fn release(&self, success: bool) -> std::result::Result<(), CommitError>;
}
913
914#[async_trait::async_trait]
915impl<T: CommitLock + Send + Sync> CommitHandler for T {
916    async fn commit(
917        &self,
918        manifest: &mut Manifest,
919        indices: Option<Vec<IndexMetadata>>,
920        base_path: &Path,
921        object_store: &ObjectStore,
922        manifest_writer: ManifestWriter,
923        naming_scheme: ManifestNamingScheme,
924        transaction: Option<Transaction>,
925    ) -> std::result::Result<ManifestLocation, CommitError> {
926        let path = naming_scheme.manifest_path(base_path, manifest.version);
927        // NOTE: once we have the lease we cannot use ? to return errors, since
928        // we must release the lease before returning.
929        let lease = self.lock(manifest.version).await?;
930
931        // Head the location and make sure it's not already committed
932        match object_store.inner.head(&path).await {
933            Ok(_) => {
934                // The path already exists, so it's already committed
935                // Release the lock
936                lease.release(false).await?;
937
938                return Err(CommitError::CommitConflict);
939            }
940            Err(ObjectStoreError::NotFound { .. }) => {}
941            Err(e) => {
942                // Something else went wrong
943                // Release the lock
944                lease.release(false).await?;
945
946                return Err(CommitError::OtherError(e.into()));
947            }
948        }
949        let res = manifest_writer(object_store, manifest, indices, &path, transaction).await;
950
951        // Release the lock
952        lease.release(res.is_ok()).await?;
953
954        let res = res?;
955        Ok(ManifestLocation {
956            version: manifest.version,
957            size: Some(res.size as u64),
958            naming_scheme,
959            path,
960            e_tag: res.e_tag,
961        })
962    }
963}
964
965#[async_trait::async_trait]
966impl<T: CommitLock + Send + Sync> CommitHandler for Arc<T> {
967    async fn commit(
968        &self,
969        manifest: &mut Manifest,
970        indices: Option<Vec<IndexMetadata>>,
971        base_path: &Path,
972        object_store: &ObjectStore,
973        manifest_writer: ManifestWriter,
974        naming_scheme: ManifestNamingScheme,
975        transaction: Option<Transaction>,
976    ) -> std::result::Result<ManifestLocation, CommitError> {
977        self.as_ref()
978            .commit(
979                manifest,
980                indices,
981                base_path,
982                object_store,
983                manifest_writer,
984                naming_scheme,
985                transaction,
986            )
987            .await
988    }
989}
990
/// A commit implementation that uses a temporary path and renames the object.
///
/// The manifest is first written to a staging path and then moved to its
/// final path with `rename_if_not_exists`; an `AlreadyExists` error from that
/// rename is reported as [CommitError::CommitConflict].
///
/// This only works for object stores that support atomic rename if not exist.
pub struct RenameCommitHandler;
995
996#[async_trait::async_trait]
997impl CommitHandler for RenameCommitHandler {
998    async fn commit(
999        &self,
1000        manifest: &mut Manifest,
1001        indices: Option<Vec<IndexMetadata>>,
1002        base_path: &Path,
1003        object_store: &ObjectStore,
1004        manifest_writer: ManifestWriter,
1005        naming_scheme: ManifestNamingScheme,
1006        transaction: Option<Transaction>,
1007    ) -> std::result::Result<ManifestLocation, CommitError> {
1008        // Create a temporary object, then use `rename_if_not_exists` to commit.
1009        // If failed, clean up the temporary object.
1010
1011        let path = naming_scheme.manifest_path(base_path, manifest.version);
1012        let tmp_path = make_staging_manifest_path(&path)?;
1013
1014        let res = manifest_writer(object_store, manifest, indices, &tmp_path, transaction).await?;
1015
1016        match object_store
1017            .inner
1018            .rename_if_not_exists(&tmp_path, &path)
1019            .await
1020        {
1021            Ok(_) => {
1022                // Successfully committed
1023                Ok(ManifestLocation {
1024                    version: manifest.version,
1025                    path,
1026                    size: Some(res.size as u64),
1027                    naming_scheme,
1028                    e_tag: None, // Re-name can change e-tag.
1029                })
1030            }
1031            Err(ObjectStoreError::AlreadyExists { .. }) => {
1032                // Another transaction has already been committed
1033                // Attempt to clean up temporary object, but ignore errors if we can't
1034                let _ = object_store.delete(&tmp_path).await;
1035
1036                return Err(CommitError::CommitConflict);
1037            }
1038            Err(e) => {
1039                // Something else went wrong
1040                return Err(CommitError::OtherError(e.into()));
1041            }
1042        }
1043    }
1044}
1045
1046impl Debug for RenameCommitHandler {
1047    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1048        f.debug_struct("RenameCommitHandler").finish()
1049    }
1050}
1051
/// A commit implementation that writes the manifest with a create-only put.
///
/// The manifest bytes are put to the final path using
/// [object_store::PutMode::Create]; an `AlreadyExists` or `Precondition`
/// error from the store is reported as [CommitError::CommitConflict].
pub struct ConditionalPutCommitHandler;
1053
1054#[async_trait::async_trait]
1055impl CommitHandler for ConditionalPutCommitHandler {
1056    async fn commit(
1057        &self,
1058        manifest: &mut Manifest,
1059        indices: Option<Vec<IndexMetadata>>,
1060        base_path: &Path,
1061        object_store: &ObjectStore,
1062        manifest_writer: ManifestWriter,
1063        naming_scheme: ManifestNamingScheme,
1064        transaction: Option<Transaction>,
1065    ) -> std::result::Result<ManifestLocation, CommitError> {
1066        let path = naming_scheme.manifest_path(base_path, manifest.version);
1067
1068        let memory_store = ObjectStore::memory();
1069        let dummy_path = "dummy";
1070        manifest_writer(
1071            &memory_store,
1072            manifest,
1073            indices,
1074            &dummy_path.into(),
1075            transaction,
1076        )
1077        .await?;
1078        let dummy_data = memory_store.read_one_all(&dummy_path.into()).await?;
1079        let size = dummy_data.len() as u64;
1080        let res = object_store
1081            .inner
1082            .put_opts(
1083                &path,
1084                dummy_data.into(),
1085                PutOptions {
1086                    mode: object_store::PutMode::Create,
1087                    ..Default::default()
1088                },
1089            )
1090            .await
1091            .map_err(|err| match err {
1092                ObjectStoreError::AlreadyExists { .. } | ObjectStoreError::Precondition { .. } => {
1093                    CommitError::CommitConflict
1094                }
1095                _ => CommitError::OtherError(err.into()),
1096            })?;
1097
1098        Ok(ManifestLocation {
1099            version: manifest.version,
1100            path,
1101            size: Some(size),
1102            naming_scheme,
1103            e_tag: res.e_tag,
1104        })
1105    }
1106}
1107
1108impl Debug for ConditionalPutCommitHandler {
1109    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1110        f.debug_struct("ConditionalPutCommitHandler").finish()
1111    }
1112}
1113
/// Options controlling how a commit is carried out.
///
/// The [Default] implementation uses 20 retries and leaves
/// `skip_auto_cleanup` off.
#[derive(Debug, Clone)]
pub struct CommitConfig {
    /// NOTE(review): presumably the maximum number of times a commit is
    /// retried -- confirm against the code that consumes this config.
    pub num_retries: u32,
    /// NOTE(review): presumably disables automatic cleanup when set to
    /// `true` -- confirm against the code that consumes this config.
    pub skip_auto_cleanup: bool,
    // TODO: add isolation_level
}
1120
1121impl Default for CommitConfig {
1122    fn default() -> Self {
1123        Self {
1124            num_retries: 20,
1125            skip_auto_cleanup: false,
1126        }
1127    }
1128}
1129
1130#[cfg(test)]
1131mod tests {
1132    use lance_core::utils::tempfile::TempObjDir;
1133
1134    use super::*;
1135
1136    #[test]
1137    fn test_manifest_naming_scheme() {
1138        let v1 = ManifestNamingScheme::V1;
1139        let v2 = ManifestNamingScheme::V2;
1140
1141        assert_eq!(
1142            v1.manifest_path(&Path::from("base"), 0),
1143            Path::from("base/_versions/0.manifest")
1144        );
1145        assert_eq!(
1146            v1.manifest_path(&Path::from("base"), 42),
1147            Path::from("base/_versions/42.manifest")
1148        );
1149
1150        assert_eq!(
1151            v2.manifest_path(&Path::from("base"), 0),
1152            Path::from("base/_versions/18446744073709551615.manifest")
1153        );
1154        assert_eq!(
1155            v2.manifest_path(&Path::from("base"), 42),
1156            Path::from("base/_versions/18446744073709551573.manifest")
1157        );
1158
1159        assert_eq!(v1.parse_version("0.manifest"), Some(0));
1160        assert_eq!(v1.parse_version("42.manifest"), Some(42));
1161        assert_eq!(
1162            v1.parse_version("42.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc"),
1163            Some(42)
1164        );
1165
1166        assert_eq!(v2.parse_version("18446744073709551615.manifest"), Some(0));
1167        assert_eq!(v2.parse_version("18446744073709551573.manifest"), Some(42));
1168        assert_eq!(
1169            v2.parse_version("18446744073709551573.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc"),
1170            Some(42)
1171        );
1172
1173        assert_eq!(ManifestNamingScheme::detect_scheme("0.manifest"), Some(v1));
1174        assert_eq!(
1175            ManifestNamingScheme::detect_scheme("18446744073709551615.manifest"),
1176            Some(v2)
1177        );
1178        assert_eq!(ManifestNamingScheme::detect_scheme("something else"), None);
1179    }
1180
1181    #[tokio::test]
1182    async fn test_manifest_naming_migration() {
1183        let object_store = ObjectStore::memory();
1184        let base = Path::from("base");
1185        let versions_dir = base.child(VERSIONS_DIR);
1186
1187        // Write two v1 files and one v1
1188        let original_files = vec![
1189            versions_dir.child("irrelevant"),
1190            ManifestNamingScheme::V1.manifest_path(&base, 0),
1191            ManifestNamingScheme::V2.manifest_path(&base, 1),
1192        ];
1193        for path in original_files {
1194            object_store.put(&path, b"".as_slice()).await.unwrap();
1195        }
1196
1197        migrate_scheme_to_v2(&object_store, &base).await.unwrap();
1198
1199        let expected_files = vec![
1200            ManifestNamingScheme::V2.manifest_path(&base, 1),
1201            ManifestNamingScheme::V2.manifest_path(&base, 0),
1202            versions_dir.child("irrelevant"),
1203        ];
1204        let actual_files = object_store
1205            .inner
1206            .list(Some(&versions_dir))
1207            .map_ok(|res| res.location)
1208            .try_collect::<Vec<_>>()
1209            .await
1210            .unwrap();
1211        assert_eq!(actual_files, expected_files);
1212    }
1213
1214    #[tokio::test]
1215    #[rstest::rstest]
1216    async fn test_list_manifests_sorted(
1217        #[values(true, false)] lexical_list_store: bool,
1218        #[values(ManifestNamingScheme::V1, ManifestNamingScheme::V2)]
1219        naming_scheme: ManifestNamingScheme,
1220    ) {
1221        let tempdir;
1222        let (object_store, base) = if lexical_list_store {
1223            (Box::new(ObjectStore::memory()), Path::from("base"))
1224        } else {
1225            tempdir = TempObjDir::default();
1226            let path = tempdir.child("base");
1227            let store = Box::new(ObjectStore::local());
1228            assert!(!store.list_is_lexically_ordered);
1229            (store, path)
1230        };
1231
1232        // Write 12 manifest files, latest first
1233        let mut expected_paths = Vec::new();
1234        for i in (0..12).rev() {
1235            let path = naming_scheme.manifest_path(&base, i);
1236            object_store.put(&path, b"".as_slice()).await.unwrap();
1237            expected_paths.push(path);
1238        }
1239
1240        let actual_versions = ConditionalPutCommitHandler
1241            .list_manifest_locations(&base, &object_store, true)
1242            .map_ok(|location| location.path)
1243            .try_collect::<Vec<_>>()
1244            .await
1245            .unwrap();
1246
1247        assert_eq!(actual_versions, expected_paths);
1248    }
1249
1250    #[tokio::test]
1251    #[rstest::rstest]
1252    async fn test_current_manifest_path(
1253        #[values(true, false)] lexical_list_store: bool,
1254        #[values(ManifestNamingScheme::V1, ManifestNamingScheme::V2)]
1255        naming_scheme: ManifestNamingScheme,
1256    ) {
1257        // Use memory store for both cases to avoid local FS special codepath.
1258        // Modify list_is_lexically_ordered to simulate different object stores.
1259        let mut object_store = ObjectStore::memory();
1260        object_store.list_is_lexically_ordered = lexical_list_store;
1261        let object_store = Box::new(object_store);
1262        let base = Path::from("base");
1263
1264        // Write 12 manifest files in non-sequential order
1265        for version in [5, 2, 11, 0, 8, 3, 10, 1, 7, 4, 9, 6] {
1266            let path = naming_scheme.manifest_path(&base, version);
1267            object_store.put(&path, b"".as_slice()).await.unwrap();
1268        }
1269
1270        let location = current_manifest_path(&object_store, &base).await.unwrap();
1271
1272        assert_eq!(location.version, 11);
1273        assert_eq!(location.naming_scheme, naming_scheme);
1274        assert_eq!(location.path, naming_scheme.manifest_path(&base, 11));
1275    }
1276}