Skip to main content

lance_table/io/
commit.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Trait for commit implementations.
5//!
6//! In Lance, a transaction is committed by writing the next manifest file.
7//! However, care should be taken to ensure that the manifest file is written
8//! only once, even if there are concurrent writers. Different stores have
9//! different abilities to handle concurrent writes, so a trait is provided
10//! to allow for different implementations.
11//!
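//! The trait [CommitHandler] can be implemented to provide different commit
//! strategies. [RenameCommitHandler] writes the manifest to a temporary path,
//! then renames the temporary path to the final path if no object already
//! exists at the final path. This is an atomic operation in most object
//! stores, but not in AWS S3. [UnsafeCommitHandler] writes the manifest to the
//! final path without any checks.
//!
//! The default handler is chosen by [commit_handler_from_url] from the dataset
//! URL: `ConditionalPutCommitHandler` for local paths (except on Windows,
//! where [RenameCommitHandler] is used) and for the `s3`, `gs`, `az`,
//! `memory`, `oss` and `cos` schemes; an external manifest store backed by
//! DynamoDB for `s3+ddb` (requires the `dynamodb` feature); and
//! [UnsafeCommitHandler] for any other scheme.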
20//!
21//! When providing your own commit handler, most often you are implementing in
22//! terms of a lock. The trait [CommitLock] can be implemented as a simpler
23//! alternative to [CommitHandler].
24
25use std::io;
26use std::pin::Pin;
27use std::sync::Arc;
28use std::sync::atomic::AtomicBool;
29use std::{fmt::Debug, fs::DirEntry};
30
31use super::manifest::write_manifest;
32use futures::Stream;
33use futures::future::Either;
34use futures::{
35    StreamExt, TryStreamExt,
36    future::{self, BoxFuture},
37    stream::BoxStream,
38};
39use lance_file::format::{MAGIC, MAJOR_VERSION, MINOR_VERSION};
40use lance_io::object_writer::{ObjectWriter, WriteResult, get_etag};
41use log::warn;
42use object_store::PutOptions;
43use object_store::{Error as ObjectStoreError, ObjectStore as OSObjectStore, path::Path};
44use tracing::info;
45use url::Url;
46
47#[cfg(feature = "dynamodb")]
48pub mod dynamodb;
49pub mod external_manifest;
50
51use lance_core::{Error, Result};
52use lance_io::object_store::{ObjectStore, ObjectStoreExt, ObjectStoreParams};
53use lance_io::traits::{WriteExt, Writer};
54
55use crate::format::{IndexMetadata, Manifest, Transaction, is_detached_version};
56use lance_core::utils::tracing::{AUDIT_MODE_CREATE, AUDIT_TYPE_MANIFEST, TRACE_FILE_AUDIT};
57#[cfg(feature = "dynamodb")]
58use {
59    self::external_manifest::{ExternalManifestCommitHandler, ExternalManifestStore},
60    aws_credential_types::provider::ProvideCredentials,
61    aws_credential_types::provider::error::CredentialsError,
62    lance_io::object_store::{StorageOptions, providers::aws::build_aws_credential},
63    object_store::aws::AmazonS3ConfigKey,
64    object_store::aws::AwsCredentialProvider,
65    std::borrow::Cow,
66    std::time::{Duration, SystemTime},
67};
68
/// Name of the directory, directly under the dataset base path, that holds manifest files.
pub const VERSIONS_DIR: &str = "_versions";
/// File extension (without the leading dot) given to manifest files.
const MANIFEST_EXTENSION: &str = "manifest";
/// Filename prefix that marks the manifest of a detached version.
const DETACHED_VERSION_PREFIX: &str = "d";
72
/// How manifest files should be named.
///
/// Both schemes store manifests in the same `_versions` directory; only the
/// filename differs.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ManifestNamingScheme {
    /// `_versions/{version}.manifest`
    V1,
    /// `_versions/{u64::MAX - version}.manifest`
    ///
    /// Zero-padded to 20 digits and reversed for O(1) lookup of latest version on object stores.
    V2,
}
83
84impl ManifestNamingScheme {
85    pub fn manifest_path(&self, base: &Path, version: u64) -> Path {
86        let directory = base.child(VERSIONS_DIR);
87        if is_detached_version(version) {
88            // Detached versions should never show up first in a list operation which
89            // means it needs to come lexicographically after all attached manifest
90            // files and so we add the prefix `d`.  There is no need to invert the
91            // version number since detached versions are not part of the version
92            let directory = base.child(VERSIONS_DIR);
93            directory.child(format!(
94                "{DETACHED_VERSION_PREFIX}{version}.{MANIFEST_EXTENSION}"
95            ))
96        } else {
97            match self {
98                Self::V1 => directory.child(format!("{version}.{MANIFEST_EXTENSION}")),
99                Self::V2 => {
100                    let inverted_version = u64::MAX - version;
101                    directory.child(format!("{inverted_version:020}.{MANIFEST_EXTENSION}"))
102                }
103            }
104        }
105    }
106
107    pub fn parse_version(&self, filename: &str) -> Option<u64> {
108        let file_number = filename
109            .split_once('.')
110            // Detached versions will fail the `parse` step, which is ok.
111            .and_then(|(version_str, _)| version_str.parse::<u64>().ok());
112        match self {
113            Self::V1 => file_number,
114            Self::V2 => file_number.map(|v| u64::MAX - v),
115        }
116    }
117
118    pub fn detect_scheme(filename: &str) -> Option<Self> {
119        if filename.starts_with(DETACHED_VERSION_PREFIX) {
120            // Currently, detached versions must imply V2
121            return Some(Self::V2);
122        }
123        if filename.ends_with(MANIFEST_EXTENSION) {
124            const V2_LEN: usize = 20 + 1 + MANIFEST_EXTENSION.len();
125            if filename.len() == V2_LEN {
126                Some(Self::V2)
127            } else {
128                Some(Self::V1)
129            }
130        } else {
131            None
132        }
133    }
134
135    pub fn detect_scheme_staging(filename: &str) -> Self {
136        // We shouldn't have to worry about detached versions here since there is no
137        // such thing as "detached" and "staged" at the same time.
138        if filename.chars().nth(20) == Some('.') {
139            Self::V2
140        } else {
141            Self::V1
142        }
143    }
144}
145
146/// Migrate all V1 manifests to V2 naming scheme.
147///
148/// This function will rename all V1 manifests to V2 naming scheme.
149///
150/// This function is idempotent, and can be run multiple times without
151/// changing the state of the object store.
152///
153/// However, it should not be run while other concurrent operations are happening.
154/// And it should also run until completion before resuming other operations.
155pub async fn migrate_scheme_to_v2(object_store: &ObjectStore, dataset_base: &Path) -> Result<()> {
156    object_store
157        .inner
158        .list(Some(&dataset_base.child(VERSIONS_DIR)))
159        .try_filter(|res| {
160            let res = if let Some(filename) = res.location.filename() {
161                ManifestNamingScheme::detect_scheme(filename) == Some(ManifestNamingScheme::V1)
162            } else {
163                false
164            };
165            future::ready(res)
166        })
167        .try_for_each_concurrent(object_store.io_parallelism(), |meta| async move {
168            let filename = meta.location.filename().unwrap();
169            let version = ManifestNamingScheme::V1.parse_version(filename).unwrap();
170            let path = ManifestNamingScheme::V2.manifest_path(dataset_base, version);
171            object_store.inner.rename(&meta.location, &path).await?;
172            Ok(())
173        })
174        .await?;
175
176    Ok(())
177}
178
/// Function that writes the manifest to the object store.
///
/// The arguments are, in order: the object store to write to, the manifest to
/// write, optional index metadata, the destination path, and an optional
/// transaction.
///
/// Returns the [`WriteResult`] of the written manifest; commit handlers use its
/// `size` and `e_tag` to fill in the resulting [`ManifestLocation`].
pub type ManifestWriter = for<'a> fn(
    object_store: &'a ObjectStore,
    manifest: &'a mut Manifest,
    indices: Option<Vec<IndexMetadata>>,
    path: &'a Path,
    transaction: Option<Transaction>,
) -> BoxFuture<'a, Result<WriteResult>>;
189
190/// Canonical manifest writer; its function item type exactly matches `ManifestWriter`.
191/// Rationale: keep a crate-local writer implementation so call sites can pass this function
192/// directly without non-primitive casts or lifetime coercions.
193pub fn write_manifest_file_to_path<'a>(
194    object_store: &'a ObjectStore,
195    manifest: &'a mut Manifest,
196    indices: Option<Vec<IndexMetadata>>,
197    path: &'a Path,
198    transaction: Option<Transaction>,
199) -> BoxFuture<'a, Result<WriteResult>> {
200    Box::pin(async move {
201        let mut object_writer = ObjectWriter::new(object_store, path).await?;
202        let pos = write_manifest(&mut object_writer, manifest, indices, transaction).await?;
203        object_writer
204            .write_magics(pos, MAJOR_VERSION, MINOR_VERSION, MAGIC)
205            .await?;
206        let res = Writer::shutdown(&mut object_writer).await?;
207        info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_MANIFEST, path = path.to_string());
208        Ok(res)
209    })
210}
211
/// Where a single manifest file lives, plus the metadata known about it.
///
/// `size` and `e_tag` are optional because some code paths build a location
/// from the version number alone, without asking the object store about the file.
#[derive(Debug, Clone)]
pub struct ManifestLocation {
    /// The version the manifest corresponds to.
    pub version: u64,
    /// Path of the manifest file, relative to the table root.
    pub path: Path,
    /// Size, in bytes, of the manifest file. If it is not known, this field should be `None`.
    pub size: Option<u64>,
    /// Naming scheme of the manifest file.
    pub naming_scheme: ManifestNamingScheme,
    /// Optional e-tag, used for integrity checks. Manifests should be immutable, so
    /// if we detect a change in the e-tag, it means the manifest was tampered with.
    /// This might happen if the dataset was deleted and then re-created.
    pub e_tag: Option<String>,
}
227
228impl TryFrom<object_store::ObjectMeta> for ManifestLocation {
229    type Error = Error;
230
231    fn try_from(meta: object_store::ObjectMeta) -> Result<Self> {
232        let filename = meta.location.filename().ok_or_else(|| {
233            Error::internal("ObjectMeta location does not have a filename".to_string())
234        })?;
235        let scheme = ManifestNamingScheme::detect_scheme(filename)
236            .ok_or_else(|| Error::internal(format!("Invalid manifest filename: '{}'", filename)))?;
237        let version = scheme
238            .parse_version(filename)
239            .ok_or_else(|| Error::internal(format!("Invalid manifest filename: '{}'", filename)))?;
240        Ok(Self {
241            version,
242            path: meta.location,
243            size: Some(meta.size),
244            naming_scheme: scheme,
245            e_tag: meta.e_tag,
246        })
247    }
248}
249
250/// Get the latest manifest path
251async fn current_manifest_path(
252    object_store: &ObjectStore,
253    base: &Path,
254) -> Result<ManifestLocation> {
255    if object_store.is_local()
256        && let Ok(Some(location)) = current_manifest_local(base)
257    {
258        return Ok(location);
259    }
260
261    let manifest_files = object_store.list(Some(base.child(VERSIONS_DIR)));
262
263    let mut valid_manifests = manifest_files.try_filter_map(|res| {
264        if let Some(scheme) = ManifestNamingScheme::detect_scheme(res.location.filename().unwrap())
265        {
266            future::ready(Ok(Some((scheme, res))))
267        } else {
268            future::ready(Ok(None))
269        }
270    });
271
272    let first = valid_manifests.next().await.transpose()?;
273    match (first, object_store.list_is_lexically_ordered) {
274        // If the first valid manifest we see is V2, we can assume that we are using
275        // V2 naming scheme for all manifests.
276        (Some((scheme @ ManifestNamingScheme::V2, meta)), true) => {
277            let version = scheme
278                .parse_version(meta.location.filename().unwrap())
279                .unwrap();
280
281            // Sanity check: verify at least for the first 1k files that they are all V2
282            // and that the version numbers are decreasing. We use the first 1k because
283            // this is the typical size of an object store list endpoint response page.
284            for (scheme, meta) in valid_manifests.take(999).try_collect::<Vec<_>>().await? {
285                if scheme != ManifestNamingScheme::V2 {
286                    warn!(
287                        "Found V1 Manifest in a V2 directory. Use `migrate_manifest_paths_v2` \
288                         to migrate the directory."
289                    );
290                    break;
291                }
292                let next_version = scheme
293                    .parse_version(meta.location.filename().unwrap())
294                    .unwrap();
295                if next_version >= version {
296                    warn!(
297                        "List operation was expected to be lexically ordered, but was not. This \
298                         could mean a corrupt read. Please make a bug report on the lance-format/lance \
299                         GitHub repository."
300                    );
301                    break;
302                }
303            }
304
305            Ok(ManifestLocation {
306                version,
307                path: meta.location,
308                size: Some(meta.size),
309                naming_scheme: scheme,
310                e_tag: meta.e_tag,
311            })
312        }
313        // If the list is not lexically ordered, we need to iterate all manifests
314        // to find the latest version. This works for both V1 and V2 schemes.
315        (Some((first_scheme, meta)), _) => {
316            let mut current_version = first_scheme
317                .parse_version(meta.location.filename().unwrap())
318                .unwrap();
319            let mut current_meta = meta;
320            let scheme = first_scheme;
321
322            while let Some((entry_scheme, meta)) = valid_manifests.next().await.transpose()? {
323                if entry_scheme != scheme {
324                    return Err(Error::internal(format!(
325                        "Found multiple manifest naming schemes in the same directory: {:?} and {:?}. \
326                         Use `migrate_manifest_paths_v2` to migrate the directory.",
327                        scheme, entry_scheme
328                    )));
329                }
330                let version = entry_scheme
331                    .parse_version(meta.location.filename().unwrap())
332                    .unwrap();
333                if version > current_version {
334                    current_version = version;
335                    current_meta = meta;
336                }
337            }
338            Ok(ManifestLocation {
339                version: current_version,
340                path: current_meta.location,
341                size: Some(current_meta.size),
342                naming_scheme: scheme,
343                e_tag: current_meta.e_tag,
344            })
345        }
346        (None, _) => Err(Error::not_found(base.child(VERSIONS_DIR).to_string())),
347    }
348}
349
350// This is an optimized function that searches for the latest manifest. In
351// object_store, list operations lookup metadata for each file listed. This
352// method only gets the metadata for the found latest manifest.
353fn current_manifest_local(base: &Path) -> std::io::Result<Option<ManifestLocation>> {
354    let path = lance_io::local::to_local_path(&base.child(VERSIONS_DIR));
355    let entries = std::fs::read_dir(path)?;
356
357    let mut latest_entry: Option<(u64, DirEntry)> = None;
358
359    let mut scheme: Option<ManifestNamingScheme> = None;
360
361    for entry in entries {
362        let entry = entry?;
363        let filename_raw = entry.file_name();
364        let filename = filename_raw.to_string_lossy();
365
366        let Some(entry_scheme) = ManifestNamingScheme::detect_scheme(&filename) else {
367            // Need to ignore temporary files, such as
368            // .tmp_7.manifest_9c100374-3298-4537-afc6-f5ee7913666d
369            continue;
370        };
371
372        if let Some(scheme) = scheme {
373            if scheme != entry_scheme {
374                return Err(io::Error::new(
375                    io::ErrorKind::InvalidData,
376                    format!(
377                        "Found multiple manifest naming schemes in the same directory: {:?} and {:?}",
378                        scheme, entry_scheme
379                    ),
380                ));
381            }
382        } else {
383            scheme = Some(entry_scheme);
384        }
385
386        let Some(version) = entry_scheme.parse_version(&filename) else {
387            continue;
388        };
389
390        if let Some((latest_version, _)) = &latest_entry {
391            if version > *latest_version {
392                latest_entry = Some((version, entry));
393            }
394        } else {
395            latest_entry = Some((version, entry));
396        }
397    }
398
399    if let Some((version, entry)) = latest_entry {
400        let path = Path::from_filesystem_path(entry.path())
401            .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;
402        let metadata = entry.metadata()?;
403        Ok(Some(ManifestLocation {
404            version,
405            path,
406            size: Some(metadata.len()),
407            naming_scheme: scheme.unwrap(),
408            e_tag: Some(get_etag(&metadata)),
409        }))
410    } else {
411        Ok(None)
412    }
413}
414
415fn list_manifests<'a>(
416    base_path: &Path,
417    object_store: &'a dyn OSObjectStore,
418) -> impl Stream<Item = Result<ManifestLocation>> + 'a {
419    object_store
420        .read_dir_all(&base_path.child(VERSIONS_DIR), None)
421        .filter_map(|obj_meta| {
422            futures::future::ready(
423                obj_meta
424                    .map(|m| ManifestLocation::try_from(m).ok())
425                    .transpose(),
426            )
427        })
428        .boxed()
429}
430
431fn make_staging_manifest_path(base: &Path) -> Result<Path> {
432    let id = uuid::Uuid::new_v4().to_string();
433    Path::parse(format!("{base}-{id}")).map_err(|e| Error::io_source(Box::new(e)))
434}
435
/// Name of the URL query parameter that carries the DynamoDB table name in
/// `s3+ddb://` dataset URLs.
#[cfg(feature = "dynamodb")]
const DDB_URL_QUERY_KEY: &str = "ddbTableName";
438
439/// Handle commits that prevent conflicting writes.
440///
441/// Commit implementations ensure that if there are multiple concurrent writers
442/// attempting to write the next version of a table, only one will win. In order
443/// to work, all writers must use the same commit handler type.
444/// This trait is also responsible for resolving where the manifests live.
445///
446// TODO: pub(crate)
447#[async_trait::async_trait]
448#[allow(clippy::too_many_arguments)]
449pub trait CommitHandler: Debug + Send + Sync {
450    async fn resolve_latest_location(
451        &self,
452        base_path: &Path,
453        object_store: &ObjectStore,
454    ) -> Result<ManifestLocation> {
455        Ok(current_manifest_path(object_store, base_path).await?)
456    }
457
458    async fn resolve_version_location(
459        &self,
460        base_path: &Path,
461        version: u64,
462        object_store: &dyn OSObjectStore,
463    ) -> Result<ManifestLocation> {
464        default_resolve_version(base_path, version, object_store).await
465    }
466
467    /// If `sorted_descending` is `true`, the stream will yield manifests in descending
468    /// order of version. When the object store has a lexicographically
469    /// ordered list and the naming scheme is V2, this will use an optimized
470    /// list operation. Otherwise, it will list all manifests and sort them
471    /// in memory. When `sorted_descending` is `false`, the stream will yield manifests
472    /// in arbitrary order.
473    fn list_manifest_locations<'a>(
474        &self,
475        base_path: &Path,
476        object_store: &'a ObjectStore,
477        sorted_descending: bool,
478    ) -> BoxStream<'a, Result<ManifestLocation>> {
479        let underlying_stream = list_manifests(base_path, &object_store.inner);
480
481        if !sorted_descending {
482            return underlying_stream.boxed();
483        }
484
485        async fn sort_stream(
486            input_stream: impl futures::Stream<Item = Result<ManifestLocation>> + Unpin,
487        ) -> Result<impl Stream<Item = Result<ManifestLocation>> + Unpin> {
488            let mut locations = input_stream.try_collect::<Vec<_>>().await?;
489            locations.sort_by_key(|m| std::cmp::Reverse(m.version));
490            Ok(futures::stream::iter(locations.into_iter().map(Ok)))
491        }
492
493        // If the object store supports lexicographically ordered lists and
494        // the naming scheme is V2, we can use an optimized list operation.
495        if object_store.list_is_lexically_ordered {
496            // We don't know the naming scheme until we see the first manifest.
497            let mut peekable = underlying_stream.peekable();
498
499            futures::stream::once(async move {
500                let naming_scheme = match Pin::new(&mut peekable).peek().await {
501                    Some(Ok(m)) => m.naming_scheme,
502                    // If we get an error or no manifests are found, we default
503                    // to V2 naming scheme, since it doesn't matter.
504                    Some(Err(_)) => ManifestNamingScheme::V2,
505                    None => ManifestNamingScheme::V2,
506                };
507
508                if naming_scheme == ManifestNamingScheme::V2 {
509                    // If the first manifest is V2, we can use the optimized list operation.
510                    Ok(Either::Left(peekable))
511                } else {
512                    sort_stream(peekable).await.map(Either::Right)
513                }
514            })
515            .try_flatten()
516            .boxed()
517        } else {
518            // If the object store does not support lexicographically ordered lists,
519            // we need to sort the manifests in memory. Systems where this isn't
520            // supported (local fs, S3 express) are typically fast enough
521            // that this is not a problem.
522            futures::stream::once(sort_stream(underlying_stream))
523                .try_flatten()
524                .boxed()
525        }
526    }
527
528    /// Commit a manifest.
529    ///
530    /// This function should return an [CommitError::CommitConflict] if another
531    /// transaction has already been committed to the path.
532    async fn commit(
533        &self,
534        manifest: &mut Manifest,
535        indices: Option<Vec<IndexMetadata>>,
536        base_path: &Path,
537        object_store: &ObjectStore,
538        manifest_writer: ManifestWriter,
539        naming_scheme: ManifestNamingScheme,
540        transaction: Option<Transaction>,
541    ) -> std::result::Result<ManifestLocation, CommitError>;
542
543    /// Delete the recorded manifest information for a dataset at the base_path
544    async fn delete(&self, _base_path: &Path) -> Result<()> {
545        Ok(())
546    }
547}
548
549async fn default_resolve_version(
550    base_path: &Path,
551    version: u64,
552    object_store: &dyn OSObjectStore,
553) -> Result<ManifestLocation> {
554    if is_detached_version(version) {
555        return Ok(ManifestLocation {
556            version,
557            // Detached versions are not supported with V1 naming scheme.  If we need
558            // to support in the future we could use a different prefix (e.g. 'x' or something)
559            naming_scheme: ManifestNamingScheme::V2,
560            // Both V1 and V2 should give the same path for detached versions
561            path: ManifestNamingScheme::V2.manifest_path(base_path, version),
562            size: None,
563            e_tag: None,
564        });
565    }
566
567    // try V2, fallback to V1.
568    let scheme = ManifestNamingScheme::V2;
569    let path = scheme.manifest_path(base_path, version);
570    match object_store.head(&path).await {
571        Ok(meta) => Ok(ManifestLocation {
572            version,
573            path,
574            size: Some(meta.size),
575            naming_scheme: scheme,
576            e_tag: meta.e_tag,
577        }),
578        Err(ObjectStoreError::NotFound { .. }) => {
579            // fallback to V1
580            let scheme = ManifestNamingScheme::V1;
581            Ok(ManifestLocation {
582                version,
583                path: scheme.manifest_path(base_path, version),
584                size: None,
585                naming_scheme: scheme,
586                e_tag: None,
587            })
588        }
589        Err(e) => Err(e.into()),
590    }
591}
/// Adapt an object_store credentials into AWS SDK creds
#[cfg(feature = "dynamodb")]
#[derive(Debug)]
struct OSObjectStoreToAwsCredAdaptor(AwsCredentialProvider);

#[cfg(feature = "dynamodb")]
impl ProvideCredentials for OSObjectStoreToAwsCredAdaptor {
    /// Fetch a credential from the wrapped provider and convert it into AWS SDK
    /// credentials that are marked as expiring ten minutes from now.
    fn provide_credentials<'a>(
        &'a self,
    ) -> aws_credential_types::provider::future::ProvideCredentials<'a>
    where
        Self: 'a,
    {
        use aws_credential_types::provider::future::ProvideCredentials as CredentialsFuture;

        CredentialsFuture::new(async {
            let credential = self
                .0
                .get_credential()
                .await
                .map_err(|err| CredentialsError::provider_error(Box::new(err)))?;

            //  10 min
            let expires_at = SystemTime::now()
                .checked_add(Duration::from_secs(60 * 10))
                .expect("overflow");

            Ok(aws_credential_types::Credentials::new(
                &credential.key_id,
                &credential.secret_key,
                credential.token.clone(),
                Some(expires_at),
                "",
            ))
        })
    }
}
627
/// Build a DynamoDB-backed external manifest store.
///
/// The DynamoDB client is configured for `region`, authenticates through
/// `creds`, and uses `endpoint` as its endpoint URL when one is given.
#[cfg(feature = "dynamodb")]
async fn build_dynamodb_external_store(
    table_name: &str,
    creds: AwsCredentialProvider,
    region: &str,
    endpoint: Option<String>,
    app_name: &str,
) -> Result<Arc<dyn ExternalManifestStore>> {
    use super::commit::dynamodb::DynamoDBExternalManifestStore;
    use aws_sdk_dynamodb::{
        Client,
        config::{IdentityCache, Region, retry::RetryConfig},
    };

    let base_config = aws_sdk_dynamodb::config::Builder::new()
        .behavior_version_latest()
        .region(Some(Region::new(region.to_string())))
        .credentials_provider(OSObjectStoreToAwsCredAdaptor(creds))
        // caching should be handled by passed AwsCredentialProvider
        .identity_cache(IdentityCache::no_cache())
        // Be more resilient to transient network issues.
        // 5 attempts = 1 initial + 4 retries with exponential backoff.
        .retry_config(RetryConfig::standard().with_max_attempts(5));

    // Only override the endpoint when the caller supplied one.
    let config = match endpoint {
        Some(endpoint_url) => base_config.endpoint_url(endpoint_url),
        None => base_config,
    };

    let client = Client::from_conf(config.build());
    DynamoDBExternalManifestStore::new_external_store(client.into(), table_name, app_name).await
}
659
660pub async fn commit_handler_from_url(
661    url_or_path: &str,
662    // This looks unused if dynamodb feature disabled
663    #[allow(unused_variables)] options: &Option<ObjectStoreParams>,
664) -> Result<Arc<dyn CommitHandler>> {
665    let local_handler: Arc<dyn CommitHandler> = if cfg!(windows) {
666        Arc::new(RenameCommitHandler)
667    } else {
668        Arc::new(ConditionalPutCommitHandler)
669    };
670
671    let url = match Url::parse(url_or_path) {
672        Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
673            // On Windows, the drive is parsed as a scheme
674            return Ok(local_handler);
675        }
676        Ok(url) => url,
677        Err(_) => {
678            return Ok(local_handler);
679        }
680    };
681
682    match url.scheme() {
683        "file" | "file-object-store" => Ok(local_handler),
684        "s3" | "gs" | "az" | "memory" | "oss" | "cos" => Ok(Arc::new(ConditionalPutCommitHandler)),
685        #[cfg(not(feature = "dynamodb"))]
686        "s3+ddb" => Err(Error::invalid_input_source(
687            "`s3+ddb://` scheme requires `dynamodb` feature to be enabled".into(),
688        )),
689        #[cfg(feature = "dynamodb")]
690        "s3+ddb" => {
691            if url.query_pairs().count() != 1 {
692                return Err(Error::invalid_input_source(
693                    "`s3+ddb://` scheme and expects exactly one query `ddbTableName`".into(),
694                ));
695            }
696            let table_name = match url.query_pairs().next() {
697                Some((Cow::Borrowed(key), Cow::Borrowed(table_name)))
698                    if key == DDB_URL_QUERY_KEY =>
699                {
700                    if table_name.is_empty() {
701                        return Err(Error::invalid_input_source(
702                            "`s3+ddb://` scheme requires non empty dynamodb table name".into(),
703                        ));
704                    }
705                    table_name
706                }
707                _ => {
708                    return Err(Error::invalid_input_source(
709                        "`s3+ddb://` scheme and expects exactly one query `ddbTableName`".into(),
710                    ));
711                }
712            };
713            let options = options.clone().unwrap_or_default();
714            let storage_options_raw =
715                StorageOptions(options.storage_options().cloned().unwrap_or_default());
716            let dynamo_endpoint = get_dynamodb_endpoint(&storage_options_raw);
717            let storage_options = storage_options_raw.as_s3_options();
718
719            let region = storage_options.get(&AmazonS3ConfigKey::Region).cloned();
720
721            // Get accessor from the options
722            let accessor = options.get_accessor();
723
724            let (aws_creds, region) = build_aws_credential(
725                options.s3_credentials_refresh_offset,
726                options.aws_credentials.clone(),
727                Some(&storage_options),
728                region,
729                accessor,
730            )
731            .await?;
732
733            Ok(Arc::new(ExternalManifestCommitHandler {
734                external_manifest_store: build_dynamodb_external_store(
735                    table_name,
736                    aws_creds.clone(),
737                    &region,
738                    dynamo_endpoint,
739                    "lancedb",
740                )
741                .await?,
742            }))
743        }
744        _ => Ok(Arc::new(UnsafeCommitHandler)),
745    }
746}
747
/// Look up the DynamoDB endpoint override: the `dynamodb_endpoint` storage
/// option wins, otherwise the `DYNAMODB_ENDPOINT` environment variable is used.
#[cfg(feature = "dynamodb")]
fn get_dynamodb_endpoint(storage_options: &StorageOptions) -> Option<String> {
    storage_options
        .0
        .get("dynamodb_endpoint")
        .cloned()
        .or_else(|| std::env::var("DYNAMODB_ENDPOINT").ok())
}
756
757/// Errors that can occur when committing a manifest.
758#[derive(Debug)]
759pub enum CommitError {
760    /// Another transaction has already been written to the path
761    CommitConflict,
762    /// Something else went wrong
763    OtherError(Error),
764}
765
766impl From<Error> for CommitError {
767    fn from(e: Error) -> Self {
768        Self::OtherError(e)
769    }
770}
771
772impl From<CommitError> for Error {
773    fn from(e: CommitError) -> Self {
774        match e {
775            CommitError::CommitConflict => Self::internal("Commit conflict".to_string()),
776            CommitError::OtherError(e) => e,
777        }
778    }
779}
780
781/// Whether we have issued a warning about using the unsafe commit handler.
782static WARNED_ON_UNSAFE_COMMIT: AtomicBool = AtomicBool::new(false);
783
784/// A naive commit implementation that does not prevent conflicting writes.
785///
786/// This will log a warning the first time it is used.
787pub struct UnsafeCommitHandler;
788
789#[async_trait::async_trait]
790#[allow(clippy::too_many_arguments)]
791impl CommitHandler for UnsafeCommitHandler {
792    async fn commit(
793        &self,
794        manifest: &mut Manifest,
795        indices: Option<Vec<IndexMetadata>>,
796        base_path: &Path,
797        object_store: &ObjectStore,
798        manifest_writer: ManifestWriter,
799        naming_scheme: ManifestNamingScheme,
800        transaction: Option<Transaction>,
801    ) -> std::result::Result<ManifestLocation, CommitError> {
802        // Log a one-time warning
803        if !WARNED_ON_UNSAFE_COMMIT.load(std::sync::atomic::Ordering::Relaxed) {
804            WARNED_ON_UNSAFE_COMMIT.store(true, std::sync::atomic::Ordering::Relaxed);
805            log::warn!(
806                "Using unsafe commit handler. Concurrent writes may result in data loss. \
807                 Consider providing a commit handler that prevents conflicting writes."
808            );
809        }
810
811        let version_path = naming_scheme.manifest_path(base_path, manifest.version);
812        let res =
813            manifest_writer(object_store, manifest, indices, &version_path, transaction).await?;
814
815        Ok(ManifestLocation {
816            version: manifest.version,
817            size: Some(res.size as u64),
818            naming_scheme,
819            path: version_path,
820            e_tag: res.e_tag,
821        })
822    }
823}
824
825impl Debug for UnsafeCommitHandler {
826    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
827        f.debug_struct("UnsafeCommitHandler").finish()
828    }
829}
830
831/// A commit implementation that uses a lock to prevent conflicting writes.
832#[async_trait::async_trait]
833pub trait CommitLock: Debug {
834    type Lease: CommitLease;
835
836    /// Attempt to lock the table for the given version.
837    ///
838    /// If it is already locked by another transaction, wait until it is unlocked.
839    /// Once it is unlocked, return [CommitError::CommitConflict] if the version
840    /// has already been committed. Otherwise, return the lock.
841    ///
842    /// To prevent poisoned locks, it's recommended to set a timeout on the lock
843    /// of at least 30 seconds.
844    ///
845    /// It is not required that the lock tracks the version. It is provided in
846    /// case the locking is handled by a catalog service that needs to know the
847    /// current version of the table.
848    async fn lock(&self, version: u64) -> std::result::Result<Self::Lease, CommitError>;
849}
850
851#[async_trait::async_trait]
852pub trait CommitLease: Send + Sync {
853    /// Return the lease, indicating whether the commit was successful.
854    async fn release(&self, success: bool) -> std::result::Result<(), CommitError>;
855}
856
857#[async_trait::async_trait]
858impl<T: CommitLock + Send + Sync> CommitHandler for T {
859    async fn commit(
860        &self,
861        manifest: &mut Manifest,
862        indices: Option<Vec<IndexMetadata>>,
863        base_path: &Path,
864        object_store: &ObjectStore,
865        manifest_writer: ManifestWriter,
866        naming_scheme: ManifestNamingScheme,
867        transaction: Option<Transaction>,
868    ) -> std::result::Result<ManifestLocation, CommitError> {
869        let path = naming_scheme.manifest_path(base_path, manifest.version);
870        // NOTE: once we have the lease we cannot use ? to return errors, since
871        // we must release the lease before returning.
872        let lease = self.lock(manifest.version).await?;
873
874        // Head the location and make sure it's not already committed
875        match object_store.inner.head(&path).await {
876            Ok(_) => {
877                // The path already exists, so it's already committed
878                // Release the lock
879                lease.release(false).await?;
880
881                return Err(CommitError::CommitConflict);
882            }
883            Err(ObjectStoreError::NotFound { .. }) => {}
884            Err(e) => {
885                // Something else went wrong
886                // Release the lock
887                lease.release(false).await?;
888
889                return Err(CommitError::OtherError(e.into()));
890            }
891        }
892        let res = manifest_writer(object_store, manifest, indices, &path, transaction).await;
893
894        // Release the lock
895        lease.release(res.is_ok()).await?;
896
897        let res = res?;
898        Ok(ManifestLocation {
899            version: manifest.version,
900            size: Some(res.size as u64),
901            naming_scheme,
902            path,
903            e_tag: res.e_tag,
904        })
905    }
906}
907
908#[async_trait::async_trait]
909impl<T: CommitLock + Send + Sync> CommitHandler for Arc<T> {
910    async fn commit(
911        &self,
912        manifest: &mut Manifest,
913        indices: Option<Vec<IndexMetadata>>,
914        base_path: &Path,
915        object_store: &ObjectStore,
916        manifest_writer: ManifestWriter,
917        naming_scheme: ManifestNamingScheme,
918        transaction: Option<Transaction>,
919    ) -> std::result::Result<ManifestLocation, CommitError> {
920        self.as_ref()
921            .commit(
922                manifest,
923                indices,
924                base_path,
925                object_store,
926                manifest_writer,
927                naming_scheme,
928                transaction,
929            )
930            .await
931    }
932}
933
934/// A commit implementation that uses a temporary path and renames the object.
935///
936/// This only works for object stores that support atomic rename if not exist.
937pub struct RenameCommitHandler;
938
939#[async_trait::async_trait]
940impl CommitHandler for RenameCommitHandler {
941    async fn commit(
942        &self,
943        manifest: &mut Manifest,
944        indices: Option<Vec<IndexMetadata>>,
945        base_path: &Path,
946        object_store: &ObjectStore,
947        manifest_writer: ManifestWriter,
948        naming_scheme: ManifestNamingScheme,
949        transaction: Option<Transaction>,
950    ) -> std::result::Result<ManifestLocation, CommitError> {
951        // Create a temporary object, then use `rename_if_not_exists` to commit.
952        // If failed, clean up the temporary object.
953
954        let path = naming_scheme.manifest_path(base_path, manifest.version);
955        let tmp_path = make_staging_manifest_path(&path)?;
956
957        let res = manifest_writer(object_store, manifest, indices, &tmp_path, transaction).await?;
958
959        match object_store
960            .inner
961            .rename_if_not_exists(&tmp_path, &path)
962            .await
963        {
964            Ok(_) => {
965                // Successfully committed
966                Ok(ManifestLocation {
967                    version: manifest.version,
968                    path,
969                    size: Some(res.size as u64),
970                    naming_scheme,
971                    e_tag: None, // Re-name can change e-tag.
972                })
973            }
974            Err(ObjectStoreError::AlreadyExists { .. }) => {
975                // Another transaction has already been committed
976                // Attempt to clean up temporary object, but ignore errors if we can't
977                let _ = object_store.delete(&tmp_path).await;
978
979                return Err(CommitError::CommitConflict);
980            }
981            Err(e) => {
982                // Something else went wrong
983                return Err(CommitError::OtherError(e.into()));
984            }
985        }
986    }
987}
988
989impl Debug for RenameCommitHandler {
990    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
991        f.debug_struct("RenameCommitHandler").finish()
992    }
993}
994
995pub struct ConditionalPutCommitHandler;
996
997#[async_trait::async_trait]
998impl CommitHandler for ConditionalPutCommitHandler {
999    async fn commit(
1000        &self,
1001        manifest: &mut Manifest,
1002        indices: Option<Vec<IndexMetadata>>,
1003        base_path: &Path,
1004        object_store: &ObjectStore,
1005        manifest_writer: ManifestWriter,
1006        naming_scheme: ManifestNamingScheme,
1007        transaction: Option<Transaction>,
1008    ) -> std::result::Result<ManifestLocation, CommitError> {
1009        let path = naming_scheme.manifest_path(base_path, manifest.version);
1010
1011        let memory_store = ObjectStore::memory();
1012        let dummy_path = "dummy";
1013        manifest_writer(
1014            &memory_store,
1015            manifest,
1016            indices,
1017            &dummy_path.into(),
1018            transaction,
1019        )
1020        .await?;
1021        let dummy_data = memory_store.read_one_all(&dummy_path.into()).await?;
1022        let size = dummy_data.len() as u64;
1023        let res = object_store
1024            .inner
1025            .put_opts(
1026                &path,
1027                dummy_data.into(),
1028                PutOptions {
1029                    mode: object_store::PutMode::Create,
1030                    ..Default::default()
1031                },
1032            )
1033            .await
1034            .map_err(|err| match err {
1035                ObjectStoreError::AlreadyExists { .. } | ObjectStoreError::Precondition { .. } => {
1036                    CommitError::CommitConflict
1037                }
1038                _ => CommitError::OtherError(err.into()),
1039            })?;
1040
1041        Ok(ManifestLocation {
1042            version: manifest.version,
1043            path,
1044            size: Some(size),
1045            naming_scheme,
1046            e_tag: res.e_tag,
1047        })
1048    }
1049}
1050
1051impl Debug for ConditionalPutCommitHandler {
1052    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1053        f.debug_struct("ConditionalPutCommitHandler").finish()
1054    }
1055}
1056
/// Tunable settings for a commit.
#[derive(Debug, Clone)]
pub struct CommitConfig {
    /// Retry count for a commit; defaults to 20.
    /// NOTE(review): the exact retry semantics live in the caller — confirm.
    pub num_retries: u32,
    /// Flag to skip automatic cleanup; defaults to `false`.
    /// NOTE(review): what "auto cleanup" covers is defined elsewhere — confirm.
    pub skip_auto_cleanup: bool,
    // TODO: add isolation_level
}

impl Default for CommitConfig {
    /// Returns a configuration with 20 retries and auto cleanup not skipped.
    fn default() -> Self {
        let num_retries = 20;
        let skip_auto_cleanup = false;
        CommitConfig {
            num_retries,
            skip_auto_cleanup,
        }
    }
}
1072
1073#[cfg(test)]
1074mod tests {
1075    use lance_core::utils::tempfile::TempObjDir;
1076
1077    use super::*;
1078
1079    #[test]
1080    fn test_manifest_naming_scheme() {
1081        let v1 = ManifestNamingScheme::V1;
1082        let v2 = ManifestNamingScheme::V2;
1083
1084        assert_eq!(
1085            v1.manifest_path(&Path::from("base"), 0),
1086            Path::from("base/_versions/0.manifest")
1087        );
1088        assert_eq!(
1089            v1.manifest_path(&Path::from("base"), 42),
1090            Path::from("base/_versions/42.manifest")
1091        );
1092
1093        assert_eq!(
1094            v2.manifest_path(&Path::from("base"), 0),
1095            Path::from("base/_versions/18446744073709551615.manifest")
1096        );
1097        assert_eq!(
1098            v2.manifest_path(&Path::from("base"), 42),
1099            Path::from("base/_versions/18446744073709551573.manifest")
1100        );
1101
1102        assert_eq!(v1.parse_version("0.manifest"), Some(0));
1103        assert_eq!(v1.parse_version("42.manifest"), Some(42));
1104        assert_eq!(
1105            v1.parse_version("42.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc"),
1106            Some(42)
1107        );
1108
1109        assert_eq!(v2.parse_version("18446744073709551615.manifest"), Some(0));
1110        assert_eq!(v2.parse_version("18446744073709551573.manifest"), Some(42));
1111        assert_eq!(
1112            v2.parse_version("18446744073709551573.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc"),
1113            Some(42)
1114        );
1115
1116        assert_eq!(ManifestNamingScheme::detect_scheme("0.manifest"), Some(v1));
1117        assert_eq!(
1118            ManifestNamingScheme::detect_scheme("18446744073709551615.manifest"),
1119            Some(v2)
1120        );
1121        assert_eq!(ManifestNamingScheme::detect_scheme("something else"), None);
1122    }
1123
1124    #[tokio::test]
1125    async fn test_manifest_naming_migration() {
1126        let object_store = ObjectStore::memory();
1127        let base = Path::from("base");
1128        let versions_dir = base.child(VERSIONS_DIR);
1129
1130        // Write two v1 files and one v1
1131        let original_files = vec![
1132            versions_dir.child("irrelevant"),
1133            ManifestNamingScheme::V1.manifest_path(&base, 0),
1134            ManifestNamingScheme::V2.manifest_path(&base, 1),
1135        ];
1136        for path in original_files {
1137            object_store.put(&path, b"".as_slice()).await.unwrap();
1138        }
1139
1140        migrate_scheme_to_v2(&object_store, &base).await.unwrap();
1141
1142        let expected_files = vec![
1143            ManifestNamingScheme::V2.manifest_path(&base, 1),
1144            ManifestNamingScheme::V2.manifest_path(&base, 0),
1145            versions_dir.child("irrelevant"),
1146        ];
1147        let actual_files = object_store
1148            .inner
1149            .list(Some(&versions_dir))
1150            .map_ok(|res| res.location)
1151            .try_collect::<Vec<_>>()
1152            .await
1153            .unwrap();
1154        assert_eq!(actual_files, expected_files);
1155    }
1156
1157    #[tokio::test]
1158    #[rstest::rstest]
1159    async fn test_list_manifests_sorted(
1160        #[values(true, false)] lexical_list_store: bool,
1161        #[values(ManifestNamingScheme::V1, ManifestNamingScheme::V2)]
1162        naming_scheme: ManifestNamingScheme,
1163    ) {
1164        let tempdir;
1165        let (object_store, base) = if lexical_list_store {
1166            (Box::new(ObjectStore::memory()), Path::from("base"))
1167        } else {
1168            tempdir = TempObjDir::default();
1169            let path = tempdir.child("base");
1170            let store = Box::new(ObjectStore::local());
1171            assert!(!store.list_is_lexically_ordered);
1172            (store, path)
1173        };
1174
1175        // Write 12 manifest files, latest first
1176        let mut expected_paths = Vec::new();
1177        for i in (0..12).rev() {
1178            let path = naming_scheme.manifest_path(&base, i);
1179            object_store.put(&path, b"".as_slice()).await.unwrap();
1180            expected_paths.push(path);
1181        }
1182
1183        let actual_versions = ConditionalPutCommitHandler
1184            .list_manifest_locations(&base, &object_store, true)
1185            .map_ok(|location| location.path)
1186            .try_collect::<Vec<_>>()
1187            .await
1188            .unwrap();
1189
1190        assert_eq!(actual_versions, expected_paths);
1191    }
1192
1193    #[tokio::test]
1194    #[rstest::rstest]
1195    async fn test_current_manifest_path(
1196        #[values(true, false)] lexical_list_store: bool,
1197        #[values(ManifestNamingScheme::V1, ManifestNamingScheme::V2)]
1198        naming_scheme: ManifestNamingScheme,
1199    ) {
1200        // Use memory store for both cases to avoid local FS special codepath.
1201        // Modify list_is_lexically_ordered to simulate different object stores.
1202        let mut object_store = ObjectStore::memory();
1203        object_store.list_is_lexically_ordered = lexical_list_store;
1204        let object_store = Box::new(object_store);
1205        let base = Path::from("base");
1206
1207        // Write 12 manifest files in non-sequential order
1208        for version in [5, 2, 11, 0, 8, 3, 10, 1, 7, 4, 9, 6] {
1209            let path = naming_scheme.manifest_path(&base, version);
1210            object_store.put(&path, b"".as_slice()).await.unwrap();
1211        }
1212
1213        let location = current_manifest_path(&object_store, &base).await.unwrap();
1214
1215        assert_eq!(location.version, 11);
1216        assert_eq!(location.naming_scheme, naming_scheme);
1217        assert_eq!(location.path, naming_scheme.manifest_path(&base, 11));
1218    }
1219}