lance_table/io/
commit.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Trait for commit implementations.
5//!
6//! In Lance, a transaction is committed by writing the next manifest file.
7//! However, care should be taken to ensure that the manifest file is written
8//! only once, even if there are concurrent writers. Different stores have
9//! different abilities to handle concurrent writes, so a trait is provided
10//! to allow for different implementations.
11//!
//! The trait [CommitHandler] can be implemented to provide different commit
//! strategies. [RenameCommitHandler] writes the manifest to a temporary path,
//! then renames the temporary path to the final path if no object already
//! exists at the final path. This is an atomic operation in most object
//! stores, but not in AWS S3. [UnsafeCommitHandler] writes the manifest to the
//! final path without any checks. The handler picked by default for each URL
//! scheme is determined by [commit_handler_from_url].
20//!
21//! When providing your own commit handler, most often you are implementing in
22//! terms of a lock. The trait [CommitLock] can be implemented as a simpler
23//! alternative to [CommitHandler].
24
25use std::io;
26use std::pin::Pin;
27use std::sync::atomic::AtomicBool;
28use std::sync::Arc;
29use std::{fmt::Debug, fs::DirEntry};
30
31use futures::future::Either;
32use futures::Stream;
33use futures::{
34    future::{self, BoxFuture},
35    stream::BoxStream,
36    StreamExt, TryStreamExt,
37};
38use lance_io::object_writer::WriteResult;
39use log::warn;
40use object_store::PutOptions;
41use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore};
42use snafu::location;
43use url::Url;
44
45#[cfg(feature = "dynamodb")]
46pub mod dynamodb;
47pub mod external_manifest;
48
49use lance_core::{Error, Result};
50use lance_io::object_store::{ObjectStore, ObjectStoreExt, ObjectStoreParams};
51
52#[cfg(feature = "dynamodb")]
53use {
54    self::external_manifest::{ExternalManifestCommitHandler, ExternalManifestStore},
55    aws_credential_types::provider::error::CredentialsError,
56    aws_credential_types::provider::ProvideCredentials,
57    lance_io::object_store::{providers::aws::build_aws_credential, StorageOptions},
58    object_store::aws::AmazonS3ConfigKey,
59    object_store::aws::AwsCredentialProvider,
60    std::borrow::Cow,
61    std::time::{Duration, SystemTime},
62};
63
64use crate::format::{is_detached_version, Index, Manifest};
65
/// Directory, under the dataset base path, that holds the manifest files.
const VERSIONS_DIR: &str = "_versions";
/// File extension (without the leading dot) of manifest files.
const MANIFEST_EXTENSION: &str = "manifest";
/// Filename prefix that marks the manifest of a detached version.
const DETACHED_VERSION_PREFIX: &str = "d";
69
/// How manifest files should be named.
///
/// Both schemes place manifests in the `_versions` directory; they differ only
/// in how the filename is derived from the version number.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ManifestNamingScheme {
    /// `_versions/{version}.manifest`
    V1,
    /// `_versions/{u64::MAX - version}.manifest`
    ///
    /// Zero-padded and reversed for O(1) lookup of latest version on object stores.
    V2,
}
80
81impl ManifestNamingScheme {
82    pub fn manifest_path(&self, base: &Path, version: u64) -> Path {
83        let directory = base.child(VERSIONS_DIR);
84        if is_detached_version(version) {
85            // Detached versions should never show up first in a list operation which
86            // means it needs to come lexicographically after all attached manifest
87            // files and so we add the prefix `d`.  There is no need to invert the
88            // version number since detached versions are not part of the version
89            let directory = base.child(VERSIONS_DIR);
90            directory.child(format!(
91                "{DETACHED_VERSION_PREFIX}{version}.{MANIFEST_EXTENSION}"
92            ))
93        } else {
94            match self {
95                Self::V1 => directory.child(format!("{version}.{MANIFEST_EXTENSION}")),
96                Self::V2 => {
97                    let inverted_version = u64::MAX - version;
98                    directory.child(format!("{inverted_version:020}.{MANIFEST_EXTENSION}"))
99                }
100            }
101        }
102    }
103
104    pub fn parse_version(&self, filename: &str) -> Option<u64> {
105        let file_number = filename
106            .split_once('.')
107            // Detached versions will fail the `parse` step, which is ok.
108            .and_then(|(version_str, _)| version_str.parse::<u64>().ok());
109        match self {
110            Self::V1 => file_number,
111            Self::V2 => file_number.map(|v| u64::MAX - v),
112        }
113    }
114
115    pub fn detect_scheme(filename: &str) -> Option<Self> {
116        if filename.starts_with(DETACHED_VERSION_PREFIX) {
117            // Currently, detached versions must imply V2
118            return Some(Self::V2);
119        }
120        if filename.ends_with(MANIFEST_EXTENSION) {
121            const V2_LEN: usize = 20 + 1 + MANIFEST_EXTENSION.len();
122            if filename.len() == V2_LEN {
123                Some(Self::V2)
124            } else {
125                Some(Self::V1)
126            }
127        } else {
128            None
129        }
130    }
131
132    pub fn detect_scheme_staging(filename: &str) -> Self {
133        // We shouldn't have to worry about detached versions here since there is no
134        // such thing as "detached" and "staged" at the same time.
135        if filename.chars().nth(20) == Some('.') {
136            Self::V2
137        } else {
138            Self::V1
139        }
140    }
141}
142
143/// Migrate all V1 manifests to V2 naming scheme.
144///
145/// This function will rename all V1 manifests to V2 naming scheme.
146///
147/// This function is idempotent, and can be run multiple times without
148/// changing the state of the object store.
149///
150/// However, it should not be run while other concurrent operations are happening.
151/// And it should also run until completion before resuming other operations.
152pub async fn migrate_scheme_to_v2(object_store: &ObjectStore, dataset_base: &Path) -> Result<()> {
153    object_store
154        .inner
155        .list(Some(&dataset_base.child(VERSIONS_DIR)))
156        .try_filter(|res| {
157            let res = if let Some(filename) = res.location.filename() {
158                ManifestNamingScheme::detect_scheme(filename) == Some(ManifestNamingScheme::V1)
159            } else {
160                false
161            };
162            future::ready(res)
163        })
164        .try_for_each_concurrent(object_store.io_parallelism(), |meta| async move {
165            let filename = meta.location.filename().unwrap();
166            let version = ManifestNamingScheme::V1.parse_version(filename).unwrap();
167            let path = ManifestNamingScheme::V2.manifest_path(dataset_base, version);
168            object_store.inner.rename(&meta.location, &path).await?;
169            Ok(())
170        })
171        .await?;
172
173    Ok(())
174}
175
/// Function that writes the manifest to the object store.
///
/// Takes the target object store, the manifest to write, the optional indices
/// and the destination path. Returns the [WriteResult] of the write; commit
/// handlers in this module read its `size` and `e_tag` fields.
pub type ManifestWriter = for<'a> fn(
    object_store: &'a ObjectStore,
    manifest: &'a mut Manifest,
    indices: Option<Vec<Index>>,
    path: &'a Path,
) -> BoxFuture<'a, Result<WriteResult>>;
185
/// Where a manifest file lives, plus the metadata known about it.
#[derive(Debug, Clone)]
pub struct ManifestLocation {
    /// The version the manifest corresponds to.
    pub version: u64,
    /// Path of the manifest file, relative to the table root.
    pub path: Path,
    /// Size, in bytes, of the manifest file. If it is not known, this field should be `None`.
    pub size: Option<u64>,
    /// Naming scheme of the manifest file.
    pub naming_scheme: ManifestNamingScheme,
    /// Optional e-tag, used for integrity checks. Manifests should be immutable, so
    /// if we detect a change in the e-tag, it means the manifest was tampered with.
    /// This might happen if the dataset was deleted and then re-created.
    pub e_tag: Option<String>,
}
201
202impl TryFrom<object_store::ObjectMeta> for ManifestLocation {
203    type Error = Error;
204
205    fn try_from(meta: object_store::ObjectMeta) -> Result<Self> {
206        let filename = meta.location.filename().ok_or_else(|| Error::Internal {
207            message: "ObjectMeta location does not have a filename".to_string(),
208            location: location!(),
209        })?;
210        let scheme =
211            ManifestNamingScheme::detect_scheme(filename).ok_or_else(|| Error::Internal {
212                message: format!("Invalid manifest filename: '{}'", filename),
213                location: location!(),
214            })?;
215        let version = scheme
216            .parse_version(filename)
217            .ok_or_else(|| Error::Internal {
218                message: format!("Invalid manifest filename: '{}'", filename),
219                location: location!(),
220            })?;
221        Ok(Self {
222            version,
223            path: meta.location,
224            size: Some(meta.size),
225            naming_scheme: scheme,
226            e_tag: meta.e_tag,
227        })
228    }
229}
230
231/// Get the latest manifest path
232async fn current_manifest_path(
233    object_store: &ObjectStore,
234    base: &Path,
235) -> Result<ManifestLocation> {
236    if object_store.is_local() {
237        if let Ok(Some(location)) = current_manifest_local(base) {
238            return Ok(location);
239        }
240    }
241
242    let manifest_files = object_store.list(Some(base.child(VERSIONS_DIR)));
243
244    let mut valid_manifests = manifest_files.try_filter_map(|res| {
245        if let Some(scheme) = ManifestNamingScheme::detect_scheme(res.location.filename().unwrap())
246        {
247            future::ready(Ok(Some((scheme, res))))
248        } else {
249            future::ready(Ok(None))
250        }
251    });
252
253    let first = valid_manifests.next().await.transpose()?;
254    match (first, object_store.list_is_lexically_ordered) {
255        // If the first valid manifest we see is V2, we can assume that we are using
256        // V2 naming scheme for all manifests.
257        (Some((scheme @ ManifestNamingScheme::V2, meta)), true) => {
258            let version = scheme
259                .parse_version(meta.location.filename().unwrap())
260                .unwrap();
261
262            // Sanity check: verify at least for the first 1k files that they are all V2
263            // and that the version numbers are decreasing. We use the first 1k because
264            // this is the typical size of an object store list endpoint response page.
265            for (scheme, meta) in valid_manifests.take(999).try_collect::<Vec<_>>().await? {
266                if scheme != ManifestNamingScheme::V2 {
267                    warn!(
268                        "Found V1 Manifest in a V2 directory. Use `migrate_manifest_paths_v2` \
269                         to migrate the directory."
270                    );
271                    break;
272                }
273                let next_version = scheme
274                    .parse_version(meta.location.filename().unwrap())
275                    .unwrap();
276                if next_version >= version {
277                    warn!(
278                        "List operation was expected to be lexically ordered, but was not. This \
279                         could mean a corrupt read. Please make a bug report on the lancedb/lance \
280                         GitHub repository."
281                    );
282                    break;
283                }
284            }
285
286            Ok(ManifestLocation {
287                version,
288                path: meta.location,
289                size: Some(meta.size),
290                naming_scheme: scheme,
291                e_tag: meta.e_tag,
292            })
293        }
294        // If the first valid manifest we see if V1, assume for now that we are
295        // using V1 naming scheme for all manifests. Since we are listing the
296        // directory anyways, we will assert there aren't any V2 manifests.
297        (Some((scheme, meta)), _) => {
298            let mut current_version = scheme
299                .parse_version(meta.location.filename().unwrap())
300                .unwrap();
301            let mut current_meta = meta;
302
303            while let Some((scheme, meta)) = valid_manifests.next().await.transpose()? {
304                if matches!(scheme, ManifestNamingScheme::V2) {
305                    return Err(Error::Internal {
306                        message: "Found V2 manifest in a V1 manifest directory".to_string(),
307                        location: location!(),
308                    });
309                }
310                let version = scheme
311                    .parse_version(meta.location.filename().unwrap())
312                    .unwrap();
313                if version > current_version {
314                    current_version = version;
315                    current_meta = meta;
316                }
317            }
318            Ok(ManifestLocation {
319                version: current_version,
320                path: current_meta.location,
321                size: Some(current_meta.size),
322                naming_scheme: scheme,
323                e_tag: current_meta.e_tag,
324            })
325        }
326        (None, _) => Err(Error::NotFound {
327            uri: base.child(VERSIONS_DIR).to_string(),
328            location: location!(),
329        }),
330    }
331}
332
333// This is an optimized function that searches for the latest manifest. In
334// object_store, list operations lookup metadata for each file listed. This
335// method only gets the metadata for the found latest manifest.
336fn current_manifest_local(base: &Path) -> std::io::Result<Option<ManifestLocation>> {
337    let path = lance_io::local::to_local_path(&base.child(VERSIONS_DIR));
338    let entries = std::fs::read_dir(path)?;
339
340    let mut latest_entry: Option<(u64, DirEntry)> = None;
341
342    let mut scheme: Option<ManifestNamingScheme> = None;
343
344    for entry in entries {
345        let entry = entry?;
346        let filename_raw = entry.file_name();
347        let filename = filename_raw.to_string_lossy();
348
349        let Some(entry_scheme) = ManifestNamingScheme::detect_scheme(&filename) else {
350            // Need to ignore temporary files, such as
351            // .tmp_7.manifest_9c100374-3298-4537-afc6-f5ee7913666d
352            continue;
353        };
354
355        if let Some(scheme) = scheme {
356            if scheme != entry_scheme {
357                return Err(io::Error::new(
358                    io::ErrorKind::InvalidData,
359                    format!(
360                        "Found multiple manifest naming schemes in the same directory: {:?} and {:?}",
361                        scheme, entry_scheme
362                    ),
363                ));
364            }
365        } else {
366            scheme = Some(entry_scheme);
367        }
368
369        let Some(version) = entry_scheme.parse_version(&filename) else {
370            continue;
371        };
372
373        if let Some((latest_version, _)) = &latest_entry {
374            if version > *latest_version {
375                latest_entry = Some((version, entry));
376            }
377        } else {
378            latest_entry = Some((version, entry));
379        }
380    }
381
382    if let Some((version, entry)) = latest_entry {
383        let path = Path::from_filesystem_path(entry.path())
384            .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;
385        let metadata = entry.metadata()?;
386        Ok(Some(ManifestLocation {
387            version,
388            path,
389            size: Some(metadata.len()),
390            naming_scheme: scheme.unwrap(),
391            e_tag: Some(get_etag(&metadata)),
392        }))
393    } else {
394        Ok(None)
395    }
396}
397
398// Based on object store's implementation.
399fn get_etag(metadata: &std::fs::Metadata) -> String {
400    let inode = get_inode(metadata);
401    let size = metadata.len();
402    let mtime = metadata
403        .modified()
404        .ok()
405        .and_then(|mtime| mtime.duration_since(std::time::SystemTime::UNIX_EPOCH).ok())
406        .unwrap_or_default()
407        .as_micros();
408
409    // Use an ETag scheme based on that used by many popular HTTP servers
410    // <https://httpd.apache.org/docs/2.2/mod/core.html#fileetag>
411    // <https://stackoverflow.com/questions/47512043/how-etags-are-generated-and-configured>
412    format!("{inode:x}-{mtime:x}-{size:x}")
413}
414
/// We include the inode when available to yield an ETag more resistant to collisions
/// and as used by popular web servers such as [Apache](https://httpd.apache.org/docs/2.2/mod/core.html#fileetag)
#[cfg(unix)]
fn get_inode(metadata: &std::fs::Metadata) -> u64 {
    use std::os::unix::fs::MetadataExt;

    metadata.ino()
}

/// On platforms where an inode isn't available, fallback to just relying on size and mtime
#[cfg(not(unix))]
fn get_inode(_meta: &std::fs::Metadata) -> u64 {
    // Constant placeholder so the e-tag keeps the same three-field shape.
    0
}
427
428fn list_manifests<'a>(
429    base_path: &Path,
430    object_store: &'a dyn OSObjectStore,
431) -> impl Stream<Item = Result<ManifestLocation>> + 'a {
432    object_store
433        .read_dir_all(&base_path.child(VERSIONS_DIR), None)
434        .filter_map(|obj_meta| {
435            futures::future::ready(
436                obj_meta
437                    .map(|m| ManifestLocation::try_from(m).ok())
438                    .transpose(),
439            )
440        })
441        .boxed()
442}
443
444fn make_staging_manifest_path(base: &Path) -> Result<Path> {
445    let id = uuid::Uuid::new_v4().to_string();
446    Path::parse(format!("{base}-{id}")).map_err(|e| Error::IO {
447        source: Box::new(e),
448        location: location!(),
449    })
450}
451
/// Name of the URL query parameter that carries the DynamoDB table name in
/// `s3+ddb://` URLs.
#[cfg(feature = "dynamodb")]
const DDB_URL_QUERY_KEY: &str = "ddbTableName";
454
455/// Handle commits that prevent conflicting writes.
456///
457/// Commit implementations ensure that if there are multiple concurrent writers
458/// attempting to write the next version of a table, only one will win. In order
459/// to work, all writers must use the same commit handler type.
460/// This trait is also responsible for resolving where the manifests live.
461///
462// TODO: pub(crate)
463#[async_trait::async_trait]
464pub trait CommitHandler: Debug + Send + Sync {
465    async fn resolve_latest_location(
466        &self,
467        base_path: &Path,
468        object_store: &ObjectStore,
469    ) -> Result<ManifestLocation> {
470        Ok(current_manifest_path(object_store, base_path).await?)
471    }
472
473    async fn resolve_version_location(
474        &self,
475        base_path: &Path,
476        version: u64,
477        object_store: &dyn OSObjectStore,
478    ) -> Result<ManifestLocation> {
479        default_resolve_version(base_path, version, object_store).await
480    }
481
482    /// If `sorted_descending` is `true`, the stream will yield manifests in descending
483    /// order of version. When the object store has a lexicographically
484    /// ordered list and the naming scheme is V2, this will use an optimized
485    /// list operation. Otherwise, it will list all manifests and sort them
486    /// in memory. When `sorted_descending` is `false`, the stream will yield manifests
487    /// in arbitrary order.
488    fn list_manifest_locations<'a>(
489        &self,
490        base_path: &Path,
491        object_store: &'a ObjectStore,
492        sorted_descending: bool,
493    ) -> BoxStream<'a, Result<ManifestLocation>> {
494        let underlying_stream = list_manifests(base_path, &object_store.inner);
495
496        if !sorted_descending {
497            return underlying_stream.boxed();
498        }
499
500        async fn sort_stream(
501            input_stream: impl futures::Stream<Item = Result<ManifestLocation>> + Unpin,
502        ) -> Result<impl Stream<Item = Result<ManifestLocation>> + Unpin> {
503            let mut locations = input_stream.try_collect::<Vec<_>>().await?;
504            locations.sort_by_key(|m| std::cmp::Reverse(m.version));
505            Ok(futures::stream::iter(locations.into_iter().map(Ok)))
506        }
507
508        // If the object store supports lexicographically ordered lists and
509        // the naming scheme is V2, we can use an optimized list operation.
510        if object_store.list_is_lexically_ordered {
511            // We don't know the naming scheme until we see the first manifest.
512            let mut peekable = underlying_stream.peekable();
513
514            futures::stream::once(async move {
515                let naming_scheme = match Pin::new(&mut peekable).peek().await {
516                    Some(Ok(m)) => m.naming_scheme,
517                    // If we get an error or no manifests are found, we default
518                    // to V2 naming scheme, since it doesn't matter.
519                    Some(Err(_)) => ManifestNamingScheme::V2,
520                    None => ManifestNamingScheme::V2,
521                };
522
523                if naming_scheme == ManifestNamingScheme::V2 {
524                    // If the first manifest is V2, we can use the optimized list operation.
525                    Ok(Either::Left(peekable))
526                } else {
527                    sort_stream(peekable).await.map(Either::Right)
528                }
529            })
530            .try_flatten()
531            .boxed()
532        } else {
533            // If the object store does not support lexicographically ordered lists,
534            // we need to sort the manifests in memory. Systems where this isn't
535            // supported (local fs, S3 express) are typically fast enough
536            // that this is not a problem.
537            futures::stream::once(sort_stream(underlying_stream))
538                .try_flatten()
539                .boxed()
540        }
541    }
542
543    /// Commit a manifest.
544    ///
545    /// This function should return an [CommitError::CommitConflict] if another
546    /// transaction has already been committed to the path.
547    async fn commit(
548        &self,
549        manifest: &mut Manifest,
550        indices: Option<Vec<Index>>,
551        base_path: &Path,
552        object_store: &ObjectStore,
553        manifest_writer: ManifestWriter,
554        naming_scheme: ManifestNamingScheme,
555    ) -> std::result::Result<ManifestLocation, CommitError>;
556
557    /// Delete the recorded manifest information for a dataset at the base_path
558    async fn delete(&self, _base_path: &Path) -> Result<()> {
559        Ok(())
560    }
561}
562
563async fn default_resolve_version(
564    base_path: &Path,
565    version: u64,
566    object_store: &dyn OSObjectStore,
567) -> Result<ManifestLocation> {
568    if is_detached_version(version) {
569        return Ok(ManifestLocation {
570            version,
571            // Detached versions are not supported with V1 naming scheme.  If we need
572            // to support in the future we could use a different prefix (e.g. 'x' or something)
573            naming_scheme: ManifestNamingScheme::V2,
574            // Both V1 and V2 should give the same path for detached versions
575            path: ManifestNamingScheme::V2.manifest_path(base_path, version),
576            size: None,
577            e_tag: None,
578        });
579    }
580
581    // try V2, fallback to V1.
582    let scheme = ManifestNamingScheme::V2;
583    let path = scheme.manifest_path(base_path, version);
584    match object_store.head(&path).await {
585        Ok(meta) => Ok(ManifestLocation {
586            version,
587            path,
588            size: Some(meta.size),
589            naming_scheme: scheme,
590            e_tag: meta.e_tag,
591        }),
592        Err(ObjectStoreError::NotFound { .. }) => {
593            // fallback to V1
594            let scheme = ManifestNamingScheme::V1;
595            Ok(ManifestLocation {
596                version,
597                path: scheme.manifest_path(base_path, version),
598                size: None,
599                naming_scheme: scheme,
600                e_tag: None,
601            })
602        }
603        Err(e) => Err(e.into()),
604    }
605}
/// Adapt an object_store credentials into AWS SDK creds
///
/// Wraps an `object_store` [AwsCredentialProvider] so that it can be handed to
/// the AWS SDK through the [ProvideCredentials] implementation for this type.
#[cfg(feature = "dynamodb")]
#[derive(Debug)]
struct OSObjectStoreToAwsCredAdaptor(AwsCredentialProvider);
610
/// Serves AWS SDK credential requests from the wrapped `object_store`
/// credential provider.
#[cfg(feature = "dynamodb")]
impl ProvideCredentials for OSObjectStoreToAwsCredAdaptor {
    fn provide_credentials<'a>(
        &'a self,
    ) -> aws_credential_types::provider::future::ProvideCredentials<'a>
    where
        Self: 'a,
    {
        let fetch = async {
            let creds = match self.0.get_credential().await {
                Ok(creds) => creds,
                Err(e) => return Err(CredentialsError::provider_error(Box::new(e))),
            };

            // The SDK-side credentials are marked as expiring 10 min from now.
            let expires_after = SystemTime::now()
                .checked_add(Duration::from_secs(60 * 10))
                .expect("overflow");

            Ok(aws_credential_types::Credentials::new(
                &creds.key_id,
                &creds.secret_key,
                creds.token.clone(),
                Some(expires_after),
                "",
            ))
        };
        aws_credential_types::provider::future::ProvideCredentials::new(fetch)
    }
}
641
/// Build a DynamoDB-backed [ExternalManifestStore].
///
/// A DynamoDB client is configured for `region` with `creds` as its
/// credentials source and, when given, `endpoint` as an endpoint override. The
/// store is then created for `table_name` and `app_name`.
#[cfg(feature = "dynamodb")]
async fn build_dynamodb_external_store(
    table_name: &str,
    creds: AwsCredentialProvider,
    region: &str,
    endpoint: Option<String>,
    app_name: &str,
) -> Result<Arc<dyn ExternalManifestStore>> {
    use super::commit::dynamodb::DynamoDBExternalManifestStore;
    use aws_sdk_dynamodb::{
        config::{IdentityCache, Region},
        Client,
    };

    let base_config = aws_sdk_dynamodb::config::Builder::new()
        .behavior_version_latest()
        .region(Some(Region::new(region.to_string())))
        .credentials_provider(OSObjectStoreToAwsCredAdaptor(creds))
        // caching should be handled by passed AwsCredentialProvider
        .identity_cache(IdentityCache::no_cache());

    let dynamodb_config = match endpoint {
        Some(endpoint_url) => base_config.endpoint_url(endpoint_url),
        None => base_config,
    };
    let client = Client::from_conf(dynamodb_config.build());

    DynamoDBExternalManifestStore::new_external_store(client.into(), table_name, app_name).await
}
670
671pub async fn commit_handler_from_url(
672    url_or_path: &str,
673    // This looks unused if dynamodb feature disabled
674    #[allow(unused_variables)] options: &Option<ObjectStoreParams>,
675) -> Result<Arc<dyn CommitHandler>> {
676    let local_handler: Arc<dyn CommitHandler> = if cfg!(windows) {
677        Arc::new(RenameCommitHandler)
678    } else {
679        Arc::new(ConditionalPutCommitHandler)
680    };
681
682    let url = match Url::parse(url_or_path) {
683        Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
684            // On Windows, the drive is parsed as a scheme
685            return Ok(local_handler);
686        }
687        Ok(url) => url,
688        Err(_) => {
689            return Ok(local_handler);
690        }
691    };
692
693    match url.scheme() {
694        "file" | "file-object-store" => Ok(local_handler),
695        "s3" | "gs" | "az" | "memory" => Ok(Arc::new(ConditionalPutCommitHandler)),
696        #[cfg(not(feature = "dynamodb"))]
697        "s3+ddb" => Err(Error::InvalidInput {
698            source: "`s3+ddb://` scheme requires `dynamodb` feature to be enabled".into(),
699            location: location!(),
700        }),
701        #[cfg(feature = "dynamodb")]
702        "s3+ddb" => {
703            if url.query_pairs().count() != 1 {
704                return Err(Error::InvalidInput {
705                    source: "`s3+ddb://` scheme and expects exactly one query `ddbTableName`"
706                        .into(),
707                    location: location!(),
708                });
709            }
710            let table_name = match url.query_pairs().next() {
711                Some((Cow::Borrowed(key), Cow::Borrowed(table_name)))
712                    if key == DDB_URL_QUERY_KEY =>
713                {
714                    if table_name.is_empty() {
715                        return Err(Error::InvalidInput {
716                            source: "`s3+ddb://` scheme requires non empty dynamodb table name"
717                                .into(),
718                            location: location!(),
719                        });
720                    }
721                    table_name
722                }
723                _ => {
724                    return Err(Error::InvalidInput {
725                        source: "`s3+ddb://` scheme and expects exactly one query `ddbTableName`"
726                            .into(),
727                        location: location!(),
728                    });
729                }
730            };
731            let options = options.clone().unwrap_or_default();
732            let storage_options = StorageOptions(options.storage_options.unwrap_or_default());
733            let dynamo_endpoint = get_dynamodb_endpoint(&storage_options);
734            let storage_options = storage_options.as_s3_options();
735
736            let region = storage_options.get(&AmazonS3ConfigKey::Region).cloned();
737
738            let (aws_creds, region) = build_aws_credential(
739                options.s3_credentials_refresh_offset,
740                options.aws_credentials.clone(),
741                Some(&storage_options),
742                region,
743            )
744            .await?;
745
746            Ok(Arc::new(ExternalManifestCommitHandler {
747                external_manifest_store: build_dynamodb_external_store(
748                    table_name,
749                    aws_creds.clone(),
750                    &region,
751                    dynamo_endpoint,
752                    "lancedb",
753                )
754                .await?,
755            }))
756        }
757        _ => Ok(Arc::new(UnsafeCommitHandler)),
758    }
759}
760
/// Look up the DynamoDB endpoint override: the `dynamodb_endpoint` storage
/// option wins, otherwise the `DYNAMODB_ENDPOINT` environment variable is
/// used. Returns `None` when neither is set.
#[cfg(feature = "dynamodb")]
fn get_dynamodb_endpoint(storage_options: &StorageOptions) -> Option<String> {
    storage_options
        .0
        .get("dynamodb_endpoint")
        .cloned()
        .or_else(|| std::env::var("DYNAMODB_ENDPOINT").ok())
}
769
/// Errors that can occur when committing a manifest.
///
/// Converts into [Error]: a conflict becomes an internal "Commit conflict"
/// error, any other error is unwrapped as-is.
#[derive(Debug)]
pub enum CommitError {
    /// Another transaction has already been written to the path
    CommitConflict,
    /// Something else went wrong
    OtherError(Error),
}
778
779impl From<Error> for CommitError {
780    fn from(e: Error) -> Self {
781        Self::OtherError(e)
782    }
783}
784
785impl From<CommitError> for Error {
786    fn from(e: CommitError) -> Self {
787        match e {
788            CommitError::CommitConflict => Self::Internal {
789                message: "Commit conflict".to_string(),
790                location: location!(),
791            },
792            CommitError::OtherError(e) => e,
793        }
794    }
795}
796
/// Whether we have issued a warning about using the unsafe commit handler.
///
/// Starts out `false` and is set to `true` by [UnsafeCommitHandler] when it
/// logs its warning.
static WARNED_ON_UNSAFE_COMMIT: AtomicBool = AtomicBool::new(false);
799
/// A naive commit implementation that does not prevent conflicting writes.
///
/// The manifest is written straight to its final path, so concurrent writers
/// can overwrite each other.
///
/// This will log a warning the first time it is used.
pub struct UnsafeCommitHandler;
804
805#[async_trait::async_trait]
806impl CommitHandler for UnsafeCommitHandler {
807    async fn commit(
808        &self,
809        manifest: &mut Manifest,
810        indices: Option<Vec<Index>>,
811        base_path: &Path,
812        object_store: &ObjectStore,
813        manifest_writer: ManifestWriter,
814        naming_scheme: ManifestNamingScheme,
815    ) -> std::result::Result<ManifestLocation, CommitError> {
816        // Log a one-time warning
817        if !WARNED_ON_UNSAFE_COMMIT.load(std::sync::atomic::Ordering::Relaxed) {
818            WARNED_ON_UNSAFE_COMMIT.store(true, std::sync::atomic::Ordering::Relaxed);
819            log::warn!(
820                "Using unsafe commit handler. Concurrent writes may result in data loss. \
821                 Consider providing a commit handler that prevents conflicting writes."
822            );
823        }
824
825        let version_path = naming_scheme.manifest_path(base_path, manifest.version);
826        // Write the manifest naively
827        let res = manifest_writer(object_store, manifest, indices, &version_path).await?;
828
829        Ok(ManifestLocation {
830            version: manifest.version,
831            size: Some(res.size as u64),
832            naming_scheme,
833            path: version_path,
834            e_tag: res.e_tag,
835        })
836    }
837}
838
839impl Debug for UnsafeCommitHandler {
840    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
841        f.debug_struct("UnsafeCommitHandler").finish()
842    }
843}
844
/// A commit implementation that uses a lock to prevent conflicting writes.
///
/// Any `CommitLock` that is also `Send + Sync` gets a [`CommitHandler`]
/// implementation through a blanket impl in this module: the table is locked,
/// the target manifest path is checked for an existing object, the manifest
/// is written, and the lease is released.
#[async_trait::async_trait]
pub trait CommitLock: Debug {
    /// The lease returned by a successful [`lock`](Self::lock) call. It is
    /// handed back through [`CommitLease::release`] once the commit attempt
    /// has finished.
    type Lease: CommitLease;

    /// Attempt to lock the table for the given version.
    ///
    /// If it is already locked by another transaction, wait until it is unlocked.
    /// Once it is unlocked, return [CommitError::CommitConflict] if the version
    /// has already been committed. Otherwise, return the lock.
    ///
    /// To prevent poisoned locks, it's recommended to set a timeout on the lock
    /// of at least 30 seconds.
    ///
    /// It is not required that the lock tracks the version. It is provided in
    /// case the locking is handled by a catalog service that needs to know the
    /// current version of the table.
    async fn lock(&self, version: u64) -> std::result::Result<Self::Lease, CommitError>;
}
864
/// A lease on the table, obtained from [`CommitLock::lock`].
#[async_trait::async_trait]
pub trait CommitLease: Send + Sync {
    /// Return the lease, indicating whether the commit was successful.
    ///
    /// The lock-based [`CommitHandler`] in this module passes `true` only when
    /// the manifest write succeeded, and `false` on every failure path.
    async fn release(&self, success: bool) -> std::result::Result<(), CommitError>;
}
870
871#[async_trait::async_trait]
872impl<T: CommitLock + Send + Sync> CommitHandler for T {
873    async fn commit(
874        &self,
875        manifest: &mut Manifest,
876        indices: Option<Vec<Index>>,
877        base_path: &Path,
878        object_store: &ObjectStore,
879        manifest_writer: ManifestWriter,
880        naming_scheme: ManifestNamingScheme,
881    ) -> std::result::Result<ManifestLocation, CommitError> {
882        let path = naming_scheme.manifest_path(base_path, manifest.version);
883        // NOTE: once we have the lease we cannot use ? to return errors, since
884        // we must release the lease before returning.
885        let lease = self.lock(manifest.version).await?;
886
887        // Head the location and make sure it's not already committed
888        match object_store.inner.head(&path).await {
889            Ok(_) => {
890                // The path already exists, so it's already committed
891                // Release the lock
892                lease.release(false).await?;
893
894                return Err(CommitError::CommitConflict);
895            }
896            Err(ObjectStoreError::NotFound { .. }) => {}
897            Err(e) => {
898                // Something else went wrong
899                // Release the lock
900                lease.release(false).await?;
901
902                return Err(CommitError::OtherError(e.into()));
903            }
904        }
905        let res = manifest_writer(object_store, manifest, indices, &path).await;
906
907        // Release the lock
908        lease.release(res.is_ok()).await?;
909
910        let res = res?;
911        Ok(ManifestLocation {
912            version: manifest.version,
913            size: Some(res.size as u64),
914            naming_scheme,
915            path,
916            e_tag: res.e_tag,
917        })
918    }
919}
920
921#[async_trait::async_trait]
922impl<T: CommitLock + Send + Sync> CommitHandler for Arc<T> {
923    async fn commit(
924        &self,
925        manifest: &mut Manifest,
926        indices: Option<Vec<Index>>,
927        base_path: &Path,
928        object_store: &ObjectStore,
929        manifest_writer: ManifestWriter,
930        naming_scheme: ManifestNamingScheme,
931    ) -> std::result::Result<ManifestLocation, CommitError> {
932        self.as_ref()
933            .commit(
934                manifest,
935                indices,
936                base_path,
937                object_store,
938                manifest_writer,
939                naming_scheme,
940            )
941            .await
942    }
943}
944
/// A commit implementation that uses a temporary path and renames the object.
///
/// The manifest is first written to a staging path and then moved into place
/// with `rename_if_not_exists`, so the commit is reported as
/// [`CommitError::CommitConflict`] when the final path already exists.
///
/// This only works for object stores that support atomic rename if not exist.
pub struct RenameCommitHandler;
949
950#[async_trait::async_trait]
951impl CommitHandler for RenameCommitHandler {
952    async fn commit(
953        &self,
954        manifest: &mut Manifest,
955        indices: Option<Vec<Index>>,
956        base_path: &Path,
957        object_store: &ObjectStore,
958        manifest_writer: ManifestWriter,
959        naming_scheme: ManifestNamingScheme,
960    ) -> std::result::Result<ManifestLocation, CommitError> {
961        // Create a temporary object, then use `rename_if_not_exists` to commit.
962        // If failed, clean up the temporary object.
963
964        let path = naming_scheme.manifest_path(base_path, manifest.version);
965        let tmp_path = make_staging_manifest_path(&path)?;
966
967        // Write the manifest to the temporary path
968        let res = manifest_writer(object_store, manifest, indices, &tmp_path).await?;
969
970        match object_store
971            .inner
972            .rename_if_not_exists(&tmp_path, &path)
973            .await
974        {
975            Ok(_) => {
976                // Successfully committed
977                Ok(ManifestLocation {
978                    version: manifest.version,
979                    path,
980                    size: Some(res.size as u64),
981                    naming_scheme,
982                    e_tag: None, // Re-name can change e-tag.
983                })
984            }
985            Err(ObjectStoreError::AlreadyExists { .. }) => {
986                // Another transaction has already been committed
987                // Attempt to clean up temporary object, but ignore errors if we can't
988                let _ = object_store.delete(&tmp_path).await;
989
990                return Err(CommitError::CommitConflict);
991            }
992            Err(e) => {
993                // Something else went wrong
994                return Err(CommitError::OtherError(e.into()));
995            }
996        }
997    }
998}
999
1000impl Debug for RenameCommitHandler {
1001    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1002        f.debug_struct("RenameCommitHandler").finish()
1003    }
1004}
1005
/// A commit implementation that writes the manifest with a create-only put.
///
/// The manifest is uploaded to its final path using `PutMode::Create`. If the
/// store reports that the object already exists or that a precondition
/// failed, the commit is reported as [`CommitError::CommitConflict`].
pub struct ConditionalPutCommitHandler;
1007
1008#[async_trait::async_trait]
1009impl CommitHandler for ConditionalPutCommitHandler {
1010    async fn commit(
1011        &self,
1012        manifest: &mut Manifest,
1013        indices: Option<Vec<Index>>,
1014        base_path: &Path,
1015        object_store: &ObjectStore,
1016        manifest_writer: ManifestWriter,
1017        naming_scheme: ManifestNamingScheme,
1018    ) -> std::result::Result<ManifestLocation, CommitError> {
1019        let path = naming_scheme.manifest_path(base_path, manifest.version);
1020
1021        let memory_store = ObjectStore::memory();
1022        let dummy_path = "dummy";
1023        manifest_writer(&memory_store, manifest, indices, &dummy_path.into()).await?;
1024        let dummy_data = memory_store.read_one_all(&dummy_path.into()).await?;
1025        let size = dummy_data.len() as u64;
1026        let res = object_store
1027            .inner
1028            .put_opts(
1029                &path,
1030                dummy_data.into(),
1031                PutOptions {
1032                    mode: object_store::PutMode::Create,
1033                    ..Default::default()
1034                },
1035            )
1036            .await
1037            .map_err(|err| match err {
1038                ObjectStoreError::AlreadyExists { .. } | ObjectStoreError::Precondition { .. } => {
1039                    CommitError::CommitConflict
1040                }
1041                _ => CommitError::OtherError(err.into()),
1042            })?;
1043
1044        Ok(ManifestLocation {
1045            version: manifest.version,
1046            path,
1047            size: Some(size),
1048            naming_scheme,
1049            e_tag: res.e_tag,
1050        })
1051    }
1052}
1053
1054impl Debug for ConditionalPutCommitHandler {
1055    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1056        f.debug_struct("ConditionalPutCommitHandler").finish()
1057    }
1058}
1059
/// Options that tune how a commit is carried out.
#[derive(Debug, Clone)]
pub struct CommitConfig {
    /// Retry budget; defaults to 20.
    // NOTE(review): presumably how many times a commit is retried after a
    // conflict; confirm against the callers.
    pub num_retries: u32,
    /// Defaults to `false`.
    // NOTE(review): presumably suppresses automatic cleanup after a commit;
    // confirm against the callers.
    pub skip_auto_cleanup: bool,
    // TODO: add isolation_level
}

impl Default for CommitConfig {
    /// Build the default configuration: 20 retries, auto cleanup not skipped.
    fn default() -> Self {
        const DEFAULT_NUM_RETRIES: u32 = 20;
        const DEFAULT_SKIP_AUTO_CLEANUP: bool = false;

        Self {
            skip_auto_cleanup: DEFAULT_SKIP_AUTO_CLEANUP,
            num_retries: DEFAULT_NUM_RETRIES,
        }
    }
}
1075
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks path generation, version parsing and scheme detection for both
    /// manifest naming schemes.
    #[test]
    fn test_manifest_naming_scheme() {
        let v1 = ManifestNamingScheme::V1;
        let v2 = ManifestNamingScheme::V2;

        // V1 file names are the plain version number.
        assert_eq!(
            v1.manifest_path(&Path::from("base"), 0),
            Path::from("base/_versions/0.manifest")
        );
        assert_eq!(
            v1.manifest_path(&Path::from("base"), 42),
            Path::from("base/_versions/42.manifest")
        );

        // The expected V2 file names below equal `u64::MAX - version`.
        assert_eq!(
            v2.manifest_path(&Path::from("base"), 0),
            Path::from("base/_versions/18446744073709551615.manifest")
        );
        assert_eq!(
            v2.manifest_path(&Path::from("base"), 42),
            Path::from("base/_versions/18446744073709551573.manifest")
        );

        // Parsing must also accept names that carry a `-<uuid>` suffix.
        assert_eq!(v1.parse_version("0.manifest"), Some(0));
        assert_eq!(v1.parse_version("42.manifest"), Some(42));
        assert_eq!(
            v1.parse_version("42.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc"),
            Some(42)
        );

        assert_eq!(v2.parse_version("18446744073709551615.manifest"), Some(0));
        assert_eq!(v2.parse_version("18446744073709551573.manifest"), Some(42));
        assert_eq!(
            v2.parse_version("18446744073709551573.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc"),
            Some(42)
        );

        // Scheme detection from a bare file name.
        assert_eq!(ManifestNamingScheme::detect_scheme("0.manifest"), Some(v1));
        assert_eq!(
            ManifestNamingScheme::detect_scheme("18446744073709551615.manifest"),
            Some(v2)
        );
        assert_eq!(ManifestNamingScheme::detect_scheme("something else"), None);
    }

    /// Checks that migrating to V2 renames V1 manifests and leaves other
    /// files in the versions directory alone.
    #[tokio::test]
    async fn test_manifest_naming_migration() {
        let object_store = ObjectStore::memory();
        let base = Path::from("base");
        let versions_dir = base.child(VERSIONS_DIR);

        // Write one unrelated file, one V1 manifest (version 0) and one V2
        // manifest (version 1)
        let original_files = vec![
            versions_dir.child("irrelevant"),
            ManifestNamingScheme::V1.manifest_path(&base, 0),
            ManifestNamingScheme::V2.manifest_path(&base, 1),
        ];
        for path in original_files {
            object_store.put(&path, b"".as_slice()).await.unwrap();
        }

        migrate_scheme_to_v2(&object_store, &base).await.unwrap();

        // Both manifests now use V2 names; the unrelated file is still there.
        let expected_files = vec![
            ManifestNamingScheme::V2.manifest_path(&base, 1),
            ManifestNamingScheme::V2.manifest_path(&base, 0),
            versions_dir.child("irrelevant"),
        ];
        let actual_files = object_store
            .inner
            .list(Some(&versions_dir))
            .map_ok(|res| res.location)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(actual_files, expected_files);
    }

    /// Checks that manifest locations are listed newest-first, for both
    /// naming schemes and for both an in-memory and a local object store.
    #[tokio::test]
    #[rstest::rstest]
    async fn test_list_manifests_sorted(
        #[values(true, false)] lexical_list_store: bool,
        #[values(ManifestNamingScheme::V1, ManifestNamingScheme::V2)]
        naming_scheme: ManifestNamingScheme,
    ) {
        // Declared outside the branches so the temporary directory, when one
        // is created, lives until the end of the test.
        let tempdir;
        let (object_store, base) = if lexical_list_store {
            (Box::new(ObjectStore::memory()), Path::from("base"))
        } else {
            tempdir = Some(tempfile::tempdir().unwrap());
            let base = Path::from_absolute_path(tempdir.as_ref().unwrap().path().to_str().unwrap())
                .unwrap();
            let store = Box::new(ObjectStore::local());
            // This branch is meant to cover a store whose listing is not
            // lexically ordered.
            assert!(!store.list_is_lexically_ordered);
            (store, base)
        };

        // Write 12 manifest files, latest first
        let mut expected_paths = Vec::new();
        for i in (0..12).rev() {
            let path = naming_scheme.manifest_path(&base, i);
            object_store.put(&path, b"".as_slice()).await.unwrap();
            expected_paths.push(path);
        }

        let actual_versions = ConditionalPutCommitHandler
            .list_manifest_locations(&base, &object_store, true)
            .map_ok(|location| location.path)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        // `expected_paths` is in descending version order (11 down to 0).
        assert_eq!(actual_versions, expected_paths);
    }
}