Skip to main content

lance_table/io/
commit.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Trait for commit implementations.
5//!
6//! In Lance, a transaction is committed by writing the next manifest file.
7//! However, care should be taken to ensure that the manifest file is written
8//! only once, even if there are concurrent writers. Different stores have
9//! different abilities to handle concurrent writes, so a trait is provided
10//! to allow for different implementations.
11//!
12//! The trait [CommitHandler] can be implemented to provide different commit
13//! strategies. The default implementation for most object stores is
14//! [RenameCommitHandler], which writes the manifest to a temporary path, then
15//! renames the temporary path to the final path if no object already exists
16//! at the final path. This is an atomic operation in most object stores, but
17//! not in AWS S3. So for AWS S3, the default commit handler is
18//! [UnsafeCommitHandler], which writes the manifest to the final path without
19//! any checks.
20//!
21//! When providing your own commit handler, most often you are implementing in
22//! terms of a lock. The trait [CommitLock] can be implemented as a simpler
23//! alternative to [CommitHandler].
24
25use std::io;
26use std::pin::Pin;
27use std::sync::Arc;
28use std::sync::atomic::AtomicBool;
29use std::{fmt::Debug, fs::DirEntry};
30
31use super::manifest::write_manifest;
32use futures::Stream;
33use futures::future::Either;
34use futures::{
35    StreamExt, TryStreamExt,
36    future::{self, BoxFuture},
37    stream::BoxStream,
38};
39use lance_file::format::{MAGIC, MAJOR_VERSION, MINOR_VERSION};
40use lance_io::object_writer::{ObjectWriter, WriteResult, get_etag};
41use log::warn;
42use object_store::ObjectStoreExt as OSObjectStoreExt;
43use object_store::PutOptions;
44use object_store::{Error as ObjectStoreError, ObjectStore as OSObjectStore, path::Path};
45use tracing::info;
46use url::Url;
47
48#[cfg(feature = "dynamodb")]
49pub mod dynamodb;
50pub mod external_manifest;
51
52use lance_core::{Error, Result};
53use lance_io::object_store::{ObjectStore, ObjectStoreExt, ObjectStoreParams};
54use lance_io::traits::{WriteExt, Writer};
55
56use crate::format::{IndexMetadata, Manifest, Transaction, is_detached_version};
57use lance_core::utils::tracing::{AUDIT_MODE_CREATE, AUDIT_TYPE_MANIFEST, TRACE_FILE_AUDIT};
58#[cfg(feature = "dynamodb")]
59use {
60    self::external_manifest::{ExternalManifestCommitHandler, ExternalManifestStore},
61    aws_credential_types::provider::ProvideCredentials,
62    aws_credential_types::provider::error::CredentialsError,
63    lance_io::object_store::{StorageOptions, providers::aws::build_aws_credential},
64    object_store::aws::AmazonS3ConfigKey,
65    object_store::aws::AwsCredentialProvider,
66    std::borrow::Cow,
67    std::time::{Duration, SystemTime},
68};
69
/// Name of the directory, directly under the dataset base, that holds the
/// manifest files (and the version hint file).
pub const VERSIONS_DIR: &str = "_versions";
/// Extension, without the leading `.`, shared by all manifest file names.
const MANIFEST_EXTENSION: &str = "manifest";
/// File-name prefix marking a detached manifest (`d{version}.manifest`).
const DETACHED_VERSION_PREFIX: &str = "d";
/// File name for the JSON version hint file, stored under `_versions/`.
///
/// The file contains `{"version":N}` where `N` is the latest committed version
/// at the time of writing. It enables O(1)/O(k) latest-version lookup via HEAD
/// requests on object stores where listing is not lexicographically ordered
/// (e.g. S3 Express, local filesystem) instead of an O(n) listing.
const VERSION_HINT_FILE: &str = "latest_version_hint.json";
80
/// How manifest files should be named.
///
/// Both schemes place attached manifests in the `_versions/` directory.
/// Detached versions use `_versions/d{version}.manifest` regardless of the
/// scheme (see [`ManifestNamingScheme::manifest_path`]).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ManifestNamingScheme {
    /// `_versions/{version}.manifest`
    V1,
    /// `_versions/{u64::MAX - version}.manifest`, with the inverted version
    /// zero-padded to 20 digits.
    ///
    /// Newer versions get lexicographically smaller names, so a lexically
    /// ordered listing yields the latest version first. This gives O(1) lookup
    /// of the latest version on such object stores.
    V2,
}
91
92impl ManifestNamingScheme {
93    pub fn manifest_path(&self, base: &Path, version: u64) -> Path {
94        if is_detached_version(version) {
95            // Detached versions should never show up first in a list operation which
96            // means it needs to come lexicographically after all attached manifest
97            // files and so we add the prefix `d`.  There is no need to invert the
98            // version number since detached versions are not part of the version
99            base.clone().join(VERSIONS_DIR).join(format!(
100                "{DETACHED_VERSION_PREFIX}{version}.{MANIFEST_EXTENSION}"
101            ))
102        } else {
103            let directory = base.clone().join(VERSIONS_DIR);
104            match self {
105                Self::V1 => directory.join(format!("{version}.{MANIFEST_EXTENSION}")),
106                Self::V2 => {
107                    let inverted_version = u64::MAX - version;
108                    directory.join(format!("{inverted_version:020}.{MANIFEST_EXTENSION}"))
109                }
110            }
111        }
112    }
113
114    pub fn parse_version(&self, filename: &str) -> Option<u64> {
115        let file_number = filename
116            .split_once('.')
117            // Detached versions will fail the `parse` step, which is ok.
118            .and_then(|(version_str, _)| version_str.parse::<u64>().ok());
119        match self {
120            Self::V1 => file_number,
121            Self::V2 => file_number.map(|v| u64::MAX - v),
122        }
123    }
124
125    /// Parse a detached version from a filename like `d123456.manifest`.
126    ///
127    /// Returns the full version number with the detached mask bit set.
128    pub fn parse_detached_version(filename: &str) -> Option<u64> {
129        if !filename.starts_with(DETACHED_VERSION_PREFIX) {
130            return None;
131        }
132        let without_prefix = &filename[DETACHED_VERSION_PREFIX.len()..];
133        without_prefix
134            .split_once('.')
135            .and_then(|(version_str, _)| version_str.parse::<u64>().ok())
136    }
137
138    pub fn detect_scheme(filename: &str) -> Option<Self> {
139        if filename.starts_with(DETACHED_VERSION_PREFIX) {
140            // Currently, detached versions must imply V2
141            return Some(Self::V2);
142        }
143        if filename.ends_with(MANIFEST_EXTENSION) {
144            const V2_LEN: usize = 20 + 1 + MANIFEST_EXTENSION.len();
145            if filename.len() == V2_LEN {
146                Some(Self::V2)
147            } else {
148                Some(Self::V1)
149            }
150        } else {
151            None
152        }
153    }
154
155    pub fn detect_scheme_staging(filename: &str) -> Self {
156        // We shouldn't have to worry about detached versions here since there is no
157        // such thing as "detached" and "staged" at the same time.
158        if filename.chars().nth(20) == Some('.') {
159            Self::V2
160        } else {
161            Self::V1
162        }
163    }
164}
165
166/// Migrate all V1 manifests to V2 naming scheme.
167///
168/// This function will rename all V1 manifests to V2 naming scheme.
169///
170/// This function is idempotent, and can be run multiple times without
171/// changing the state of the object store.
172///
173/// However, it should not be run while other concurrent operations are happening.
174/// And it should also run until completion before resuming other operations.
175pub async fn migrate_scheme_to_v2(object_store: &ObjectStore, dataset_base: &Path) -> Result<()> {
176    object_store
177        .inner
178        .list(Some(&dataset_base.clone().join(VERSIONS_DIR)))
179        .try_filter(|res| {
180            let res = if let Some(filename) = res.location.filename() {
181                ManifestNamingScheme::detect_scheme(filename) == Some(ManifestNamingScheme::V1)
182            } else {
183                false
184            };
185            future::ready(res)
186        })
187        .try_for_each_concurrent(object_store.io_parallelism(), |meta| async move {
188            let filename = meta.location.filename().unwrap();
189            let version = ManifestNamingScheme::V1.parse_version(filename).unwrap();
190            let path = ManifestNamingScheme::V2.manifest_path(dataset_base, version);
191            object_store.inner.rename(&meta.location, &path).await?;
192            Ok(())
193        })
194        .await?;
195
196    Ok(())
197}
198
/// Function that writes the manifest to the object store.
///
/// The arguments are:
/// - the target store;
/// - the manifest to write (taken mutably, so the writer may update it);
/// - optional index metadata;
/// - the destination path;
/// - an optional transaction.
///
/// Returns the [`WriteResult`] of the write. NOTE(review): presumably this
/// carries the written size (and possibly an e-tag); confirm against
/// `lance_io::object_writer::WriteResult`.
pub type ManifestWriter = for<'a> fn(
    object_store: &'a ObjectStore,
    manifest: &'a mut Manifest,
    indices: Option<Vec<IndexMetadata>>,
    path: &'a Path,
    transaction: Option<Transaction>,
) -> BoxFuture<'a, Result<WriteResult>>;
209
210/// Canonical manifest writer; its function item type exactly matches `ManifestWriter`.
211/// Rationale: keep a crate-local writer implementation so call sites can pass this function
212/// directly without non-primitive casts or lifetime coercions.
213pub fn write_manifest_file_to_path<'a>(
214    object_store: &'a ObjectStore,
215    manifest: &'a mut Manifest,
216    indices: Option<Vec<IndexMetadata>>,
217    path: &'a Path,
218    transaction: Option<Transaction>,
219) -> BoxFuture<'a, Result<WriteResult>> {
220    Box::pin(async move {
221        let mut object_writer = ObjectWriter::new(object_store, path).await?;
222        let pos = write_manifest(&mut object_writer, manifest, indices, transaction).await?;
223        object_writer
224            .write_magics(pos, MAJOR_VERSION, MINOR_VERSION, MAGIC)
225            .await?;
226        let res = Writer::shutdown(&mut object_writer).await?;
227        info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_MANIFEST, path = path.to_string());
228        Ok(res)
229    })
230}
231
232#[derive(Debug, Clone)]
233pub struct ManifestLocation {
234    /// The version the manifest corresponds to.
235    pub version: u64,
236    /// Path of the manifest file, relative to the table root.
237    pub path: Path,
238    /// Size, in bytes, of the manifest file. If it is not known, this field should be `None`.
239    pub size: Option<u64>,
240    /// Naming scheme of the manifest file.
241    pub naming_scheme: ManifestNamingScheme,
242    /// Optional e-tag, used for integrity checks. Manifests should be immutable, so
243    /// if we detect a change in the e-tag, it means the manifest was tampered with.
244    /// This might happen if the dataset was deleted and then re-created.
245    pub e_tag: Option<String>,
246}
247
248impl TryFrom<object_store::ObjectMeta> for ManifestLocation {
249    type Error = Error;
250
251    fn try_from(meta: object_store::ObjectMeta) -> Result<Self> {
252        let filename = meta.location.filename().ok_or_else(|| {
253            Error::internal("ObjectMeta location does not have a filename".to_string())
254        })?;
255        let scheme = ManifestNamingScheme::detect_scheme(filename)
256            .ok_or_else(|| Error::internal(format!("Invalid manifest filename: '{}'", filename)))?;
257        let version = scheme
258            .parse_version(filename)
259            .ok_or_else(|| Error::internal(format!("Invalid manifest filename: '{}'", filename)))?;
260        Ok(Self {
261            version,
262            path: meta.location,
263            size: Some(meta.size),
264            naming_scheme: scheme,
265            e_tag: meta.e_tag,
266        })
267    }
268}
269
270/// Get the latest manifest path.
271///
272/// - Local filesystem: a single directory read.
273/// - Stores where listing is not lexicographically ordered (e.g. S3 Express):
274///   the version hint (read the hint file, then probe higher versions with
275///   HEADs), falling back to a listing if the hint is missing or stale. A full
276///   listing on these stores is O(n) in the number of versions.
277/// - Lexicographically ordered stores (e.g. S3 Standard, GCS): the listing
278///   already resolves the latest version in roughly one request.
279async fn current_manifest_path(
280    object_store: &ObjectStore,
281    base: &Path,
282) -> Result<ManifestLocation> {
283    if object_store.is_local() {
284        if let Ok(Some(location)) = current_manifest_local(base) {
285            return Ok(location);
286        }
287    } else if uses_version_hint(object_store)
288        && let Some(location) = read_version_hint_and_probe(object_store, base).await
289    {
290        return Ok(location);
291    }
292
293    resolve_version_from_listing(object_store, base).await
294}
295
/// JSON body of the version hint file: `{"version":N}`.
///
/// Serialized by [`write_version_hint`] and parsed by [`read_version_from_hint`].
#[derive(serde::Serialize, serde::Deserialize)]
struct VersionHint {
    /// Version recorded by the writer. Readers only use it as the starting
    /// point for probing upward, so a stale value is harmless.
    version: u64,
}
301
/// Environment variable that can globally disable the version hint.
///
/// Set `LANCE_USE_VERSION_HINT` to `0`, `false`, or `off` to disable it. The
/// value is case-insensitive and surrounding whitespace is ignored. When
/// disabled, writers stop emitting the hint file and readers stop consulting
/// it, falling back to plain listing.
///
/// Intended as a benchmark/escape-hatch knob. The hint is on by default,
/// including when the variable is unset, not valid Unicode, or set to any
/// other value.
const VERSION_HINT_ENV: &str = "LANCE_USE_VERSION_HINT";

/// Interpret a raw `LANCE_USE_VERSION_HINT` value.
///
/// Only `0`, `false` and `off` (case-insensitive, trimmed) disable the hint.
fn version_hint_flag_enabled(raw: &str) -> bool {
    !matches!(
        raw.trim().to_ascii_lowercase().as_str(),
        "0" | "false" | "off"
    )
}

/// Whether the version hint is globally enabled.
///
/// The environment variable is read once per process and cached, so changes
/// made after the first call have no effect.
fn version_hint_globally_enabled() -> bool {
    static ENABLED: std::sync::OnceLock<bool> = std::sync::OnceLock::new();
    *ENABLED.get_or_init(|| match std::env::var(VERSION_HINT_ENV) {
        Ok(raw) => version_hint_flag_enabled(&raw),
        // Unset or non-Unicode: keep the default (enabled).
        Err(_) => true,
    })
}
318
319/// Whether this object store benefits from a version hint.
320///
321/// On stores where listing is lexicographically ordered (S3 Standard, GCS,
322/// Azure, ...) the latest version is already resolved in roughly one request,
323/// so the hint would only add a write per commit for nothing. We write (and
324/// read) it only on stores where listing is not lexicographically ordered —
325/// S3 Express and the local filesystem. Can be force-disabled with the
326/// `LANCE_USE_VERSION_HINT=0` environment variable.
327pub fn uses_version_hint(object_store: &ObjectStore) -> bool {
328    version_hint_globally_enabled() && !object_store.list_is_lexically_ordered
329}
330
331/// Path to the JSON version hint file for a dataset.
332fn version_hint_path(base: &Path) -> Path {
333    base.clone().join(VERSIONS_DIR).join(VERSION_HINT_FILE)
334}
335
336/// Write the version hint file after a successful commit.
337///
338/// The hint is stored as JSON: `{"version":N}`. This write is best-effort —
339/// failures are logged and ignored, since the hint only accelerates reads and
340/// never affects correctness (readers verify the hinted version and probe
341/// upward from there). It is a no-op for detached versions and for stores that
342/// do not benefit from a hint (see [`uses_version_hint`]).
343pub async fn write_version_hint(object_store: &ObjectStore, base: &Path, version: u64) {
344    if is_detached_version(version) || !uses_version_hint(object_store) {
345        return;
346    }
347    let hint_path = version_hint_path(base);
348    let content = serde_json::to_vec(&VersionHint { version }).expect("serialize version hint");
349    if let Err(e) = object_store.put(&hint_path, content.as_slice()).await {
350        warn!("Failed to write version hint file for version {version}: {e}");
351    }
352}
353
354/// Read the latest version from the hint file, or `None` if it does not exist
355/// or cannot be parsed.
356async fn read_version_from_hint(object_store: &ObjectStore, base: &Path) -> Option<u64> {
357    let bytes = object_store
358        .inner
359        .get(&version_hint_path(base))
360        .await
361        .ok()?
362        .bytes()
363        .await
364        .ok()?;
365    Some(serde_json::from_slice::<VersionHint>(&bytes).ok()?.version)
366}
367
368/// Read the version hint and probe upward to find the true latest manifest.
369///
370/// Returns `None` if the hint file is missing, the hinted version no longer
371/// exists, or any error occurred — callers should fall back to listing.
372async fn read_version_hint_and_probe(
373    object_store: &ObjectStore,
374    base: &Path,
375) -> Option<ManifestLocation> {
376    let hint_version = read_version_from_hint(object_store, base).await?;
377    let (version, scheme, mut probed) = probe_versions_upward(object_store, base, hint_version)
378        .await
379        .ok()
380        .flatten()?;
381    // `probed` is non-empty and its last entry is the highest version found.
382    let (_, meta) = probed.pop()?;
383    Some(ManifestLocation {
384        version,
385        path: scheme.manifest_path(base, version),
386        size: Some(meta.size),
387        naming_scheme: scheme,
388        e_tag: meta.e_tag,
389    })
390}
391
/// Maximum version gap between the hint and the read version for which we use
/// the hint-based parallel-HEAD path. Beyond this, a single (paginated) listing
/// is cheaper, so callers fall back to it.
///
/// The gap is measured as `hint_version.saturating_sub(since_version)`, so a
/// hint behind the read version never exceeds it.
const MAX_HINT_PROBE_GAP: u64 = 1000;
396
397/// Probe `from_version`, then `from_version + 1`, `+ 2`, ... with HEAD requests
398/// until one is not found.
399///
400/// Assumes attached versions are contiguous above `from_version` (true in
401/// practice: every commit increments by one, and cleanup only removes *old*
402/// versions, never ones newer than the latest). A `NotFound` therefore marks
403/// the end of the history.
404///
405/// - `Ok(Some((true_latest_version, naming_scheme, [(version, meta), ...])))`:
406///   the vec covers every version from `from_version` through the true latest
407///   in ascending order.
408/// - `Ok(None)`: `from_version` itself does not exist (a `NotFound` for both
409///   naming schemes) — i.e. the hint pointed past the end.
410/// - `Err(_)`: a transient object-store error was hit, so the probed range may
411///   be incomplete; callers should fall back to a full listing rather than
412///   trust a possibly-stale result.
413async fn probe_versions_upward(
414    object_store: &ObjectStore,
415    base: &Path,
416    from_version: u64,
417) -> Result<
418    Option<(
419        u64,
420        ManifestNamingScheme,
421        Vec<(u64, object_store::ObjectMeta)>,
422    )>,
423> {
424    // Newer datasets use V2; fall back to V1 if the V2 path is not found.
425    let mut scheme = ManifestNamingScheme::V2;
426    let meta = match object_store
427        .inner
428        .head(&scheme.manifest_path(base, from_version))
429        .await
430    {
431        Ok(meta) => meta,
432        Err(ObjectStoreError::NotFound { .. }) => {
433            scheme = ManifestNamingScheme::V1;
434            match object_store
435                .inner
436                .head(&scheme.manifest_path(base, from_version))
437                .await
438            {
439                Ok(meta) => meta,
440                Err(ObjectStoreError::NotFound { .. }) => return Ok(None),
441                Err(e) => return Err(e.into()),
442            }
443        }
444        Err(e) => return Err(e.into()),
445    };
446
447    let mut probed = vec![(from_version, meta)];
448    let mut version = from_version;
449    loop {
450        let next = version + 1;
451        match object_store
452            .inner
453            .head(&scheme.manifest_path(base, next))
454            .await
455        {
456            Ok(meta) => {
457                probed.push((next, meta));
458                version = next;
459            }
460            // NotFound means we found the latest version.
461            Err(ObjectStoreError::NotFound { .. }) => break,
462            // A transient error means a newer version might exist that we
463            // failed to observe — surface it so callers fall back to listing.
464            Err(e) => return Err(e.into()),
465        }
466    }
467    Ok(Some((version, scheme, probed)))
468}
469
/// List manifest locations with version `> since_version` using the version
/// hint, in descending order of version.
///
/// Strategy:
/// 1. Read the hint. Bail out (`None`) if it is missing, or more than
///    [`MAX_HINT_PROBE_GAP`] versions ahead of `since_version`.
/// 2. Probe upward with HEADs to find the true latest. Start from the hint,
///    or from `since_version + 1` when the hint is not ahead.
/// 3. If the hint is ahead, fill `since_version + 1 .. hint_version` with
///    concurrent HEADs (up to `io_parallelism()` in flight), using the scheme
///    the probe detected.
///
/// Returns `None` when this path is not usable and callers should fall back
/// to a full listing. That covers:
/// - the hint is missing or too stale;
/// - the hinted version does not exist;
/// - any object-store request fails.
///
/// `Some(vec![])` is the fast path where the hint confirms there are no new
/// versions.
async fn list_manifests_since_version_with_hint(
    object_store: &ObjectStore,
    base: &Path,
    since_version: u64,
) -> Option<Vec<ManifestLocation>> {
    let hint_version = read_version_from_hint(object_store, base).await?;

    // A reader that is very far behind is cheaper to serve with one paginated
    // listing than with thousands of HEADs.
    if hint_version.saturating_sub(since_version) > MAX_HINT_PROBE_GAP {
        return None;
    }

    // If the hint is not newer than the read version, the only versions that
    // could exist are right above it; otherwise start at the hint.
    let probe_from = if hint_version > since_version {
        hint_version
    } else {
        since_version + 1
    };

    let (scheme, probed) = match probe_versions_upward(object_store, base, probe_from).await {
        Ok(Some((_true_latest, scheme, probed))) => (scheme, probed),
        // Nothing at `probe_from`. If we were probing from the hint, the hint
        // is stale — bail to a full listing. If we were probing from
        // `since_version + 1`, there are simply no new versions.
        Ok(None) if hint_version > since_version => return None,
        Ok(None) => return Some(Vec::new()),
        // Transient error: don't trust the hint path, fall back to listing.
        Err(_) => return None,
    };

    // `probe_from > since_version` in both branches above, so this filter is
    // defensive: every probed version already qualifies.
    let mut locations: Vec<ManifestLocation> = probed
        .into_iter()
        .filter(|(v, _)| *v > since_version)
        .map(|(version, meta)| ManifestLocation {
            version,
            path: scheme.manifest_path(base, version),
            size: Some(meta.size),
            naming_scheme: scheme,
            e_tag: meta.e_tag,
        })
        .collect();

    // Fill the gap between `since_version` and the hint with HEADs (the probe
    // above already covered `hint_version` and up). The range is contiguous, so
    // any error here (including a `NotFound`) means we can't trust the hint path
    // — fall back to a full listing.
    if hint_version > since_version + 1 {
        let gap_locations: Vec<ManifestLocation> =
            futures::stream::iter((since_version + 1)..hint_version)
                .map(|version| async move {
                    object_store
                        .inner
                        .head(&scheme.manifest_path(base, version))
                        .await
                        .map(|meta| ManifestLocation {
                            version,
                            path: scheme.manifest_path(base, version),
                            size: Some(meta.size),
                            naming_scheme: scheme,
                            e_tag: meta.e_tag,
                        })
                })
                .buffer_unordered(object_store.io_parallelism())
                .try_collect()
                .await
                .ok()?;
        locations.extend(gap_locations);
    }

    // Probe results (ascending) and gap results (unordered) are mixed here;
    // sort newest-first as promised.
    locations.sort_by_key(|loc| std::cmp::Reverse(loc.version));
    Some(locations)
}
550
/// Resolve the latest manifest by listing the versions directory.
///
/// Only files whose naming scheme is detected and whose attached version
/// parses are considered. Detached manifests and unrelated files are skipped.
///
/// - If the store lists in lexical order and the first valid manifest is V2,
///   that manifest is taken as the latest (V2 names sort newest-first). Up to
///   999 further entries are then inspected purely as a sanity check. The
///   check logs a warning on a V1 entry or a non-decreasing version, but never
///   changes the returned location; a listing error while collecting those
///   entries is still propagated.
/// - Otherwise every manifest is scanned and the highest version wins.
///
/// # Errors
///
/// - A not-found error naming the `_versions` directory when no valid
///   manifest exists.
/// - An internal error when naming schemes are mixed on the full-scan path.
/// - Any listing error, propagated as-is.
async fn resolve_version_from_listing(
    object_store: &ObjectStore,
    base: &Path,
) -> Result<ManifestLocation> {
    let manifest_files = object_store.list(Some(base.clone().join(VERSIONS_DIR)));

    // Keep `(scheme, meta)` only for manifests with a parseable attached version.
    let mut valid_manifests = manifest_files.try_filter_map(|res| {
        let filename = res.location.filename().unwrap();
        if let Some(scheme) = ManifestNamingScheme::detect_scheme(filename) {
            // Only include if we can parse a version (skip detached versions)
            if scheme.parse_version(filename).is_some() {
                future::ready(Ok(Some((scheme, res))))
            } else {
                future::ready(Ok(None))
            }
        } else {
            future::ready(Ok(None))
        }
    });

    let first = valid_manifests.next().await.transpose()?;
    match (first, object_store.list_is_lexically_ordered) {
        // If the first valid manifest we see is V2, we can assume that we are using
        // V2 naming scheme for all manifests.
        (Some((scheme @ ManifestNamingScheme::V2, meta)), true) => {
            let version = scheme
                .parse_version(meta.location.filename().unwrap())
                .unwrap();

            // Sanity check: verify at least for the first 1k files that they are all V2
            // and that the version numbers are decreasing. We use the first 1k because
            // this is the typical size of an object store list endpoint response page.
            for (scheme, meta) in valid_manifests.take(999).try_collect::<Vec<_>>().await? {
                if scheme != ManifestNamingScheme::V2 {
                    warn!(
                        "Found V1 Manifest in a V2 directory. Use `migrate_manifest_paths_v2` \
                         to migrate the directory."
                    );
                    break;
                }
                let next_version = scheme
                    .parse_version(meta.location.filename().unwrap())
                    .unwrap();
                if next_version >= version {
                    warn!(
                        "List operation was expected to be lexically ordered, but was not. This \
                         could mean a corrupt read. Please make a bug report on the lance-format/lance \
                         GitHub repository."
                    );
                    break;
                }
            }

            Ok(ManifestLocation {
                version,
                path: meta.location,
                size: Some(meta.size),
                naming_scheme: scheme,
                e_tag: meta.e_tag,
            })
        }
        // If the list is not lexically ordered, we need to iterate all manifests
        // to find the latest version. This works for both V1 and V2 schemes.
        (Some((first_scheme, meta)), _) => {
            let mut current_version = first_scheme
                .parse_version(meta.location.filename().unwrap())
                .unwrap();
            let mut current_meta = meta;
            let scheme = first_scheme;

            // Full scan: reject mixed schemes, track the maximum version.
            while let Some((entry_scheme, meta)) = valid_manifests.next().await.transpose()? {
                if entry_scheme != scheme {
                    return Err(Error::internal(format!(
                        "Found multiple manifest naming schemes in the same directory: {:?} and {:?}. \
                         Use `migrate_manifest_paths_v2` to migrate the directory.",
                        scheme, entry_scheme
                    )));
                }
                let version = entry_scheme
                    .parse_version(meta.location.filename().unwrap())
                    .unwrap();
                if version > current_version {
                    current_version = version;
                    current_meta = meta;
                }
            }
            Ok(ManifestLocation {
                version: current_version,
                path: current_meta.location,
                size: Some(current_meta.size),
                naming_scheme: scheme,
                e_tag: current_meta.e_tag,
            })
        }
        // No attached manifest at all.
        (None, _) => Err(Error::not_found(
            base.clone().join(VERSIONS_DIR).to_string(),
        )),
    }
}
651
652// This is an optimized function that searches for the latest manifest. In
653// object_store, list operations lookup metadata for each file listed. This
654// method only gets the metadata for the found latest manifest.
655fn current_manifest_local(base: &Path) -> std::io::Result<Option<ManifestLocation>> {
656    let path = lance_io::local::to_local_path(&base.clone().join(VERSIONS_DIR));
657    let entries = std::fs::read_dir(path)?;
658
659    let mut latest_entry: Option<(u64, DirEntry)> = None;
660
661    let mut scheme: Option<ManifestNamingScheme> = None;
662
663    for entry in entries {
664        let entry = entry?;
665        let filename_raw = entry.file_name();
666        let filename = filename_raw.to_string_lossy();
667
668        let Some(entry_scheme) = ManifestNamingScheme::detect_scheme(&filename) else {
669            // Need to ignore temporary files, such as
670            // .tmp_7.manifest_9c100374-3298-4537-afc6-f5ee7913666d
671            continue;
672        };
673
674        if let Some(scheme) = scheme {
675            if scheme != entry_scheme {
676                return Err(io::Error::new(
677                    io::ErrorKind::InvalidData,
678                    format!(
679                        "Found multiple manifest naming schemes in the same directory: {:?} and {:?}",
680                        scheme, entry_scheme
681                    ),
682                ));
683            }
684        } else {
685            scheme = Some(entry_scheme);
686        }
687
688        let Some(version) = entry_scheme.parse_version(&filename) else {
689            continue;
690        };
691
692        if let Some((latest_version, _)) = &latest_entry {
693            if version > *latest_version {
694                latest_entry = Some((version, entry));
695            }
696        } else {
697            latest_entry = Some((version, entry));
698        }
699    }
700
701    if let Some((version, entry)) = latest_entry {
702        let path = Path::from_filesystem_path(entry.path())
703            .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;
704        let metadata = entry.metadata()?;
705        Ok(Some(ManifestLocation {
706            version,
707            path,
708            size: Some(metadata.len()),
709            naming_scheme: scheme.unwrap(),
710            e_tag: Some(get_etag(&metadata)),
711        }))
712    } else {
713        Ok(None)
714    }
715}
716
717fn list_manifests<'a>(
718    base_path: &Path,
719    object_store: &'a dyn OSObjectStore,
720) -> impl Stream<Item = Result<ManifestLocation>> + 'a {
721    object_store
722        .read_dir_all(&base_path.clone().join(VERSIONS_DIR), None)
723        .filter_map(|obj_meta| {
724            futures::future::ready(
725                obj_meta
726                    .map(|m| ManifestLocation::try_from(m).ok())
727                    .transpose(),
728            )
729        })
730        .boxed()
731}
732
733/// Convert object metadata to ManifestLocation for detached manifests.
734fn detached_manifest_location_from_meta(
735    meta: object_store::ObjectMeta,
736) -> Option<ManifestLocation> {
737    let filename = meta.location.filename()?;
738    let version = ManifestNamingScheme::parse_detached_version(filename)?;
739    Some(ManifestLocation {
740        version,
741        path: meta.location,
742        size: Some(meta.size),
743        naming_scheme: ManifestNamingScheme::V2,
744        e_tag: meta.e_tag,
745    })
746}
747
748/// List all detached manifest files in the versions directory.
749pub fn list_detached_manifests<'a>(
750    base_path: &Path,
751    object_store: &'a dyn OSObjectStore,
752) -> impl Stream<Item = Result<ManifestLocation>> + 'a {
753    object_store
754        .read_dir_all(&base_path.clone().join(VERSIONS_DIR), None)
755        .filter_map(|obj_meta| {
756            futures::future::ready(
757                obj_meta
758                    .map(detached_manifest_location_from_meta)
759                    .transpose(),
760            )
761        })
762        .boxed()
763}
764
765fn make_staging_manifest_path(base: &Path) -> Result<Path> {
766    let id = uuid::Uuid::new_v4().to_string();
767    Path::parse(format!("{base}-{id}")).map_err(|e| Error::io_source(Box::new(e)))
768}
769
/// Name of the single query parameter an `s3+ddb://` URL must carry; its value
/// is the DynamoDB table name (e.g. `s3+ddb://bucket/path?ddbTableName=table`).
#[cfg(feature = "dynamodb")]
const DDB_URL_QUERY_KEY: &str = "ddbTableName";
772
/// Handle commits that prevent conflicting writes.
///
/// Commit implementations ensure that if there are multiple concurrent writers
/// attempting to write the next version of a table, only one will win. In order
/// to work, all writers must use the same commit handler type.
/// This trait is also responsible for resolving where the manifests live.
///
/// Only [CommitHandler::commit] has no default; every other method has a
/// default implementation built on listing or probing the object store.
///
// TODO: pub(crate)
#[async_trait::async_trait]
// `commit` takes seven arguments besides `self`.
#[allow(clippy::too_many_arguments)]
pub trait CommitHandler: Debug + Send + Sync {
    /// Resolve the location of the latest manifest under `base_path`.
    ///
    /// The default implementation delegates to `current_manifest_path`.
    async fn resolve_latest_location(
        &self,
        base_path: &Path,
        object_store: &ObjectStore,
    ) -> Result<ManifestLocation> {
        Ok(current_manifest_path(object_store, base_path).await?)
    }

    /// Resolve the location of the manifest for `version`.
    ///
    /// The default implementation resolves detached versions to their V2 path
    /// without any request. For other versions it probes the V2 path with a
    /// HEAD request. If that object is not found, it returns the V1 path without
    /// checking that it exists, so `size` and `e_tag` are `None` in that case.
    async fn resolve_version_location(
        &self,
        base_path: &Path,
        version: u64,
        object_store: &dyn OSObjectStore,
    ) -> Result<ManifestLocation> {
        default_resolve_version(base_path, version, object_store).await
    }

    /// List detached manifest locations.
    ///
    /// Returns a stream of detached manifest locations in arbitrary order.
    /// The default implementation lists the versions directory under
    /// `base_path` and keeps only entries whose file name parses as a
    /// detached version.
    fn list_detached_manifest_locations<'a>(
        &self,
        base_path: &Path,
        object_store: &'a ObjectStore,
    ) -> BoxStream<'a, Result<ManifestLocation>> {
        list_detached_manifests(base_path, &object_store.inner).boxed()
    }

    /// List manifest locations under `base_path`.
    ///
    /// If `sorted_descending` is `true`, the stream will yield manifests in descending
    /// order of version. When the object store has a lexicographically
    /// ordered list and the naming scheme is V2, this will use an optimized
    /// list operation. Otherwise, it will list all manifests and sort them
    /// in memory. When `sorted_descending` is `false`, the stream will yield manifests
    /// in arbitrary order.
    fn list_manifest_locations<'a>(
        &self,
        base_path: &Path,
        object_store: &'a ObjectStore,
        sorted_descending: bool,
    ) -> BoxStream<'a, Result<ManifestLocation>> {
        let underlying_stream = list_manifests(base_path, &object_store.inner);

        if !sorted_descending {
            return underlying_stream.boxed();
        }

        // Buffer the whole listing (failing on the first error), then order it
        // newest version first.
        async fn sort_stream(
            input_stream: impl futures::Stream<Item = Result<ManifestLocation>> + Unpin,
        ) -> Result<impl Stream<Item = Result<ManifestLocation>> + Unpin> {
            let mut locations = input_stream.try_collect::<Vec<_>>().await?;
            locations.sort_by_key(|m| std::cmp::Reverse(m.version));
            Ok(futures::stream::iter(locations.into_iter().map(Ok)))
        }

        // If the object store supports lexicographically ordered lists and
        // the naming scheme is V2, we can use an optimized list operation.
        if object_store.list_is_lexically_ordered {
            // We don't know the naming scheme until we see the first manifest.
            // NOTE(review): only the first listed entry decides the scheme; if
            // it is V2 the rest of the listing is passed through unsorted, so a
            // directory mixing V1 and V2 names would not be fully ordered.
            let mut peekable = underlying_stream.peekable();

            futures::stream::once(async move {
                let naming_scheme = match Pin::new(&mut peekable).peek().await {
                    Some(Ok(m)) => m.naming_scheme,
                    // If we get an error or no manifests are found, we default
                    // to V2 naming scheme, since it doesn't matter.
                    Some(Err(_)) => ManifestNamingScheme::V2,
                    None => ManifestNamingScheme::V2,
                };

                if naming_scheme == ManifestNamingScheme::V2 {
                    // If the first manifest is V2, we can use the optimized list operation.
                    Ok(Either::Left(peekable))
                } else {
                    sort_stream(peekable).await.map(Either::Right)
                }
            })
            .try_flatten()
            .boxed()
        } else {
            // If the object store does not support lexicographically ordered lists,
            // we need to sort the manifests in memory. Systems where this isn't
            // supported (local fs, S3 express) are typically fast enough
            // that this is not a problem.
            futures::stream::once(sort_stream(underlying_stream))
                .try_flatten()
                .boxed()
        }
    }

    /// List manifest locations with version `> since_version`, in descending
    /// order of version.
    ///
    /// On lexically-ordered stores this is the standard listing with early
    /// termination. On non-lexically-ordered stores (e.g. S3 Express) it uses
    /// the version hint to avoid an O(n) listing, falling back to a full
    /// listing if the hint is missing or stale.
    fn list_manifest_locations_since<'a>(
        &self,
        base_path: &Path,
        object_store: &'a ObjectStore,
        since_version: u64,
    ) -> BoxStream<'a, Result<ManifestLocation>> {
        if !uses_version_hint(object_store) {
            // The sorted listing is newest-first, so stop at the first
            // version that is not newer than `since_version`.
            return self
                .list_manifest_locations(base_path, object_store, true)
                .try_take_while(move |loc| future::ready(Ok(loc.version > since_version)))
                .boxed();
        }

        let base_path = base_path.clone();
        futures::stream::once(async move {
            let locations = match list_manifests_since_version_with_hint(
                object_store,
                &base_path,
                since_version,
            )
            .await
            {
                Some(locations) => locations,
                // The hint could not be used: list everything, then filter
                // and sort newest-first in memory.
                None => {
                    let mut locations = list_manifests(&base_path, &object_store.inner)
                        .try_collect::<Vec<_>>()
                        .await?;
                    locations.retain(|loc| loc.version > since_version);
                    locations.sort_by_key(|loc| std::cmp::Reverse(loc.version));
                    locations
                }
            };
            Ok::<_, Error>(futures::stream::iter(locations.into_iter().map(Ok)))
        })
        .try_flatten()
        .boxed()
    }

    /// Commit a manifest.
    ///
    /// This function should return an [CommitError::CommitConflict] if another
    /// transaction has already been committed to the path. On success it
    /// returns the location of the manifest that was written.
    async fn commit(
        &self,
        manifest: &mut Manifest,
        indices: Option<Vec<IndexMetadata>>,
        base_path: &Path,
        object_store: &ObjectStore,
        manifest_writer: ManifestWriter,
        naming_scheme: ManifestNamingScheme,
        transaction: Option<Transaction>,
    ) -> std::result::Result<ManifestLocation, CommitError>;

    /// Delete the recorded manifest information for a dataset at the base_path
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    async fn delete(&self, _base_path: &Path) -> Result<()> {
        Ok(())
    }
}
938
939async fn default_resolve_version(
940    base_path: &Path,
941    version: u64,
942    object_store: &dyn OSObjectStore,
943) -> Result<ManifestLocation> {
944    if is_detached_version(version) {
945        return Ok(ManifestLocation {
946            version,
947            // Detached versions are not supported with V1 naming scheme.  If we need
948            // to support in the future we could use a different prefix (e.g. 'x' or something)
949            naming_scheme: ManifestNamingScheme::V2,
950            // Both V1 and V2 should give the same path for detached versions
951            path: ManifestNamingScheme::V2.manifest_path(base_path, version),
952            size: None,
953            e_tag: None,
954        });
955    }
956
957    // try V2, fallback to V1.
958    let scheme = ManifestNamingScheme::V2;
959    let path = scheme.manifest_path(base_path, version);
960    match object_store.head(&path).await {
961        Ok(meta) => Ok(ManifestLocation {
962            version,
963            path,
964            size: Some(meta.size),
965            naming_scheme: scheme,
966            e_tag: meta.e_tag,
967        }),
968        Err(ObjectStoreError::NotFound { .. }) => {
969            // fallback to V1
970            let scheme = ManifestNamingScheme::V1;
971            Ok(ManifestLocation {
972                version,
973                path: scheme.manifest_path(base_path, version),
974                size: None,
975                naming_scheme: scheme,
976                e_tag: None,
977            })
978        }
979        Err(e) => Err(e.into()),
980    }
981}
/// Adapt an object_store credential provider into AWS SDK credentials.
///
/// Each call asks the wrapped provider for credentials and hands them to the
/// SDK with an expiry ten minutes from now.
#[cfg(feature = "dynamodb")]
#[derive(Debug)]
struct OSObjectStoreToAwsCredAdaptor(AwsCredentialProvider);

#[cfg(feature = "dynamodb")]
impl ProvideCredentials for OSObjectStoreToAwsCredAdaptor {
    fn provide_credentials<'a>(
        &'a self,
    ) -> aws_credential_types::provider::future::ProvideCredentials<'a>
    where
        Self: 'a,
    {
        let provider = &self.0;
        aws_credential_types::provider::future::ProvideCredentials::new(async move {
            let fetched = provider
                .get_credential()
                .await
                .map_err(|e| CredentialsError::provider_error(Box::new(e)))?;
            // Ten minutes; `expect` can only fire if the system clock overflows.
            let expires_at = SystemTime::now()
                .checked_add(Duration::from_secs(10 * 60))
                .expect("overflow");
            Ok(aws_credential_types::Credentials::new(
                &fetched.key_id,
                &fetched.secret_key,
                fetched.token.clone(),
                Some(expires_at),
                "",
            ))
        })
    }
}
1017
/// Build a DynamoDB-backed [ExternalManifestStore] for `table_name`.
///
/// The SDK client takes its credentials from `creds` with the SDK identity
/// cache disabled (caching is left to the passed provider). It makes up to 5
/// attempts per request (1 initial + 4 retries with exponential backoff) to ride
/// out transient network issues. When `endpoint` is `Some`, the client targets
/// that URL instead of the default endpoint.
#[cfg(feature = "dynamodb")]
async fn build_dynamodb_external_store(
    table_name: &str,
    creds: AwsCredentialProvider,
    region: &str,
    endpoint: Option<String>,
    app_name: &str,
) -> Result<Arc<dyn ExternalManifestStore>> {
    use super::commit::dynamodb::DynamoDBExternalManifestStore;
    use aws_sdk_dynamodb::{
        Client,
        config::{IdentityCache, Region, retry::RetryConfig},
    };

    let builder = aws_sdk_dynamodb::config::Builder::new()
        .behavior_version_latest()
        .region(Some(Region::new(region.to_string())))
        .credentials_provider(OSObjectStoreToAwsCredAdaptor(creds))
        .identity_cache(IdentityCache::no_cache())
        .retry_config(RetryConfig::standard().with_max_attempts(5));
    let builder = match endpoint {
        Some(url) => builder.endpoint_url(url),
        None => builder,
    };

    let client = Client::from_conf(builder.build());
    DynamoDBExternalManifestStore::new_external_store(client.into(), table_name, app_name).await
}
1049
1050pub async fn commit_handler_from_url(
1051    url_or_path: &str,
1052    // This looks unused if dynamodb feature disabled
1053    #[allow(unused_variables)] options: &Option<ObjectStoreParams>,
1054) -> Result<Arc<dyn CommitHandler>> {
1055    let local_handler: Arc<dyn CommitHandler> = if cfg!(windows) {
1056        Arc::new(RenameCommitHandler)
1057    } else {
1058        Arc::new(ConditionalPutCommitHandler)
1059    };
1060
1061    let url = match Url::parse(url_or_path) {
1062        Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
1063            // On Windows, the drive is parsed as a scheme
1064            return Ok(local_handler);
1065        }
1066        Ok(url) => url,
1067        Err(_) => {
1068            return Ok(local_handler);
1069        }
1070    };
1071
1072    match url.scheme() {
1073        "file" | "file-object-store" => Ok(local_handler),
1074        "s3" | "gs" | "az" | "abfss" | "memory" | "oss" | "cos" | "shared-memory" => {
1075            Ok(Arc::new(ConditionalPutCommitHandler))
1076        }
1077        #[cfg(not(feature = "dynamodb"))]
1078        "s3+ddb" => Err(Error::invalid_input_source(
1079            "`s3+ddb://` scheme requires `dynamodb` feature to be enabled".into(),
1080        )),
1081        #[cfg(feature = "dynamodb")]
1082        "s3+ddb" => {
1083            if url.query_pairs().count() != 1 {
1084                return Err(Error::invalid_input_source(
1085                    "`s3+ddb://` scheme and expects exactly one query `ddbTableName`".into(),
1086                ));
1087            }
1088            let table_name = match url.query_pairs().next() {
1089                Some((Cow::Borrowed(key), Cow::Borrowed(table_name)))
1090                    if key == DDB_URL_QUERY_KEY =>
1091                {
1092                    if table_name.is_empty() {
1093                        return Err(Error::invalid_input_source(
1094                            "`s3+ddb://` scheme requires non empty dynamodb table name".into(),
1095                        ));
1096                    }
1097                    table_name
1098                }
1099                _ => {
1100                    return Err(Error::invalid_input_source(
1101                        "`s3+ddb://` scheme and expects exactly one query `ddbTableName`".into(),
1102                    ));
1103                }
1104            };
1105            let options = options.clone().unwrap_or_default();
1106            let storage_options_raw =
1107                StorageOptions(options.storage_options().cloned().unwrap_or_default());
1108            let dynamo_endpoint = get_dynamodb_endpoint(&storage_options_raw);
1109            let storage_options = storage_options_raw.as_s3_options();
1110
1111            let region = storage_options.get(&AmazonS3ConfigKey::Region).cloned();
1112
1113            // Get accessor from the options
1114            let accessor = options.get_accessor();
1115
1116            let (aws_creds, region) = build_aws_credential(
1117                options.s3_credentials_refresh_offset,
1118                options.aws_credentials.clone(),
1119                Some(&storage_options),
1120                region,
1121                accessor,
1122            )
1123            .await?;
1124
1125            Ok(Arc::new(ExternalManifestCommitHandler {
1126                external_manifest_store: build_dynamodb_external_store(
1127                    table_name,
1128                    aws_creds.clone(),
1129                    &region,
1130                    dynamo_endpoint,
1131                    "lancedb",
1132                )
1133                .await?,
1134            }))
1135        }
1136        _ => Ok(Arc::new(UnsafeCommitHandler)),
1137    }
1138}
1139
/// Resolve a custom DynamoDB endpoint.
///
/// The `dynamodb_endpoint` storage option takes precedence. Otherwise the
/// `DYNAMODB_ENDPOINT` environment variable is used when it is set and valid
/// Unicode. Returns `None` when neither is available.
#[cfg(feature = "dynamodb")]
fn get_dynamodb_endpoint(storage_options: &StorageOptions) -> Option<String> {
    storage_options
        .0
        .get("dynamodb_endpoint")
        .cloned()
        .or_else(|| std::env::var("DYNAMODB_ENDPOINT").ok())
}
1148
/// Errors that can occur when committing a manifest.
///
/// Converting into the crate [Error] turns `CommitConflict` into an internal
/// "Commit conflict" error and unwraps `OtherError`.
#[derive(Debug)]
pub enum CommitError {
    /// Another transaction has already been written to the path
    ///
    /// The handlers in this file return this when the manifest path for the
    /// target version already exists, or when a conditional put reports a
    /// failed precondition.
    CommitConflict,
    /// Something else went wrong
    OtherError(Error),
}
1157
1158impl From<Error> for CommitError {
1159    fn from(e: Error) -> Self {
1160        Self::OtherError(e)
1161    }
1162}
1163
1164impl From<CommitError> for Error {
1165    fn from(e: CommitError) -> Self {
1166        match e {
1167            CommitError::CommitConflict => Self::internal("Commit conflict".to_string()),
1168            CommitError::OtherError(e) => e,
1169        }
1170    }
1171}
1172
1173/// Whether we have issued a warning about using the unsafe commit handler.
1174static WARNED_ON_UNSAFE_COMMIT: AtomicBool = AtomicBool::new(false);
1175
1176/// A naive commit implementation that does not prevent conflicting writes.
1177///
1178/// This will log a warning the first time it is used.
1179pub struct UnsafeCommitHandler;
1180
1181#[async_trait::async_trait]
1182#[allow(clippy::too_many_arguments)]
1183impl CommitHandler for UnsafeCommitHandler {
1184    async fn commit(
1185        &self,
1186        manifest: &mut Manifest,
1187        indices: Option<Vec<IndexMetadata>>,
1188        base_path: &Path,
1189        object_store: &ObjectStore,
1190        manifest_writer: ManifestWriter,
1191        naming_scheme: ManifestNamingScheme,
1192        transaction: Option<Transaction>,
1193    ) -> std::result::Result<ManifestLocation, CommitError> {
1194        // Log a one-time warning
1195        if !WARNED_ON_UNSAFE_COMMIT.load(std::sync::atomic::Ordering::Relaxed) {
1196            WARNED_ON_UNSAFE_COMMIT.store(true, std::sync::atomic::Ordering::Relaxed);
1197            log::warn!(
1198                "Using unsafe commit handler. Concurrent writes may result in data loss. \
1199                 Consider providing a commit handler that prevents conflicting writes."
1200            );
1201        }
1202
1203        let version_path = naming_scheme.manifest_path(base_path, manifest.version);
1204        let res =
1205            manifest_writer(object_store, manifest, indices, &version_path, transaction).await?;
1206
1207        write_version_hint(object_store, base_path, manifest.version).await;
1208
1209        Ok(ManifestLocation {
1210            version: manifest.version,
1211            size: Some(res.size as u64),
1212            naming_scheme,
1213            path: version_path,
1214            e_tag: res.e_tag,
1215        })
1216    }
1217}
1218
1219impl Debug for UnsafeCommitHandler {
1220    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1221        f.debug_struct("UnsafeCommitHandler").finish()
1222    }
1223}
1224
/// A commit implementation that uses a lock to prevent conflicting writes.
///
/// Types implementing this trait that are also `Send + Sync` get a
/// [CommitHandler] implementation through the blanket
/// `impl<T: CommitLock + Send + Sync> CommitHandler for T`.
#[async_trait::async_trait]
pub trait CommitLock: Debug {
    /// Handle returned by [CommitLock::lock]; it must be released through
    /// [CommitLease::release] once the commit attempt is over.
    type Lease: CommitLease;

    /// Attempt to lock the table for the given version.
    ///
    /// If it is already locked by another transaction, wait until it is unlocked.
    /// Once it is unlocked, return [CommitError::CommitConflict] if the version
    /// has already been committed. Otherwise, return the lock.
    ///
    /// To prevent poisoned locks, it's recommended to set a timeout on the lock
    /// of at least 30 seconds.
    ///
    /// It is not required that the lock tracks the version. It is provided in
    /// case the locking is handled by a catalog service that needs to know the
    /// current version of the table.
    async fn lock(&self, version: u64) -> std::result::Result<Self::Lease, CommitError>;
}
1244
/// A held lock obtained from [CommitLock::lock].
#[async_trait::async_trait]
pub trait CommitLease: Send + Sync {
    /// Return the lease, indicating whether the commit was successful.
    ///
    /// The blanket lock-based handler passes `false` when the version was
    /// already committed or the existence check failed. Otherwise it passes
    /// whether the manifest write succeeded.
    async fn release(&self, success: bool) -> std::result::Result<(), CommitError>;
}
1250
/// Every `Send + Sync` [CommitLock] is a [CommitHandler].
///
/// The lock is taken for the target version, and the final manifest path is
/// checked with a HEAD request. While the lease is held, the manifest is written
/// directly to that path.
#[async_trait::async_trait]
impl<T: CommitLock + Send + Sync> CommitHandler for T {
    async fn commit(
        &self,
        manifest: &mut Manifest,
        indices: Option<Vec<IndexMetadata>>,
        base_path: &Path,
        object_store: &ObjectStore,
        manifest_writer: ManifestWriter,
        naming_scheme: ManifestNamingScheme,
        transaction: Option<Transaction>,
    ) -> std::result::Result<ManifestLocation, CommitError> {
        let path = naming_scheme.manifest_path(base_path, manifest.version);
        // NOTE: once we have the lease we cannot use ? to return errors, since
        // we must release the lease before returning.
        let lease = self.lock(manifest.version).await?;

        // Head the location and make sure it's not already committed
        match object_store.inner.head(&path).await {
            Ok(_) => {
                // The path already exists, so it's already committed
                // Release the lock
                // NOTE(review): if `release` fails, its error is returned
                // instead of `CommitConflict`.
                lease.release(false).await?;

                return Err(CommitError::CommitConflict);
            }
            Err(ObjectStoreError::NotFound { .. }) => {}
            Err(e) => {
                // Something else went wrong
                // Release the lock
                // NOTE(review): a failing `release` replaces the HEAD error here.
                lease.release(false).await?;

                return Err(CommitError::OtherError(e.into()));
            }
        }
        let res = manifest_writer(object_store, manifest, indices, &path, transaction).await;

        // Release the lock, reporting whether the write succeeded.
        // NOTE(review): if `release` fails, its error takes precedence over (and
        // hides) any write error held in `res`.
        lease.release(res.is_ok()).await?;

        let res = res?;

        // Only reached when both the write and the lease release succeeded.
        write_version_hint(object_store, base_path, manifest.version).await;

        Ok(ManifestLocation {
            version: manifest.version,
            size: Some(res.size as u64),
            naming_scheme,
            path,
            e_tag: res.e_tag,
        })
    }
}
1304
1305#[async_trait::async_trait]
1306impl<T: CommitLock + Send + Sync> CommitHandler for Arc<T> {
1307    async fn commit(
1308        &self,
1309        manifest: &mut Manifest,
1310        indices: Option<Vec<IndexMetadata>>,
1311        base_path: &Path,
1312        object_store: &ObjectStore,
1313        manifest_writer: ManifestWriter,
1314        naming_scheme: ManifestNamingScheme,
1315        transaction: Option<Transaction>,
1316    ) -> std::result::Result<ManifestLocation, CommitError> {
1317        self.as_ref()
1318            .commit(
1319                manifest,
1320                indices,
1321                base_path,
1322                object_store,
1323                manifest_writer,
1324                naming_scheme,
1325                transaction,
1326            )
1327            .await
1328    }
1329}
1330
1331/// A commit implementation that uses a temporary path and renames the object.
1332///
1333/// This only works for object stores that support atomic rename if not exist.
1334pub struct RenameCommitHandler;
1335
1336#[async_trait::async_trait]
1337impl CommitHandler for RenameCommitHandler {
1338    async fn commit(
1339        &self,
1340        manifest: &mut Manifest,
1341        indices: Option<Vec<IndexMetadata>>,
1342        base_path: &Path,
1343        object_store: &ObjectStore,
1344        manifest_writer: ManifestWriter,
1345        naming_scheme: ManifestNamingScheme,
1346        transaction: Option<Transaction>,
1347    ) -> std::result::Result<ManifestLocation, CommitError> {
1348        // Create a temporary object, then use `rename_if_not_exists` to commit.
1349        // If failed, clean up the temporary object.
1350
1351        let path = naming_scheme.manifest_path(base_path, manifest.version);
1352        let tmp_path = make_staging_manifest_path(&path)?;
1353
1354        let res = manifest_writer(object_store, manifest, indices, &tmp_path, transaction).await?;
1355
1356        match object_store
1357            .inner
1358            .rename_if_not_exists(&tmp_path, &path)
1359            .await
1360        {
1361            Ok(_) => {
1362                // Successfully committed
1363                write_version_hint(object_store, base_path, manifest.version).await;
1364                Ok(ManifestLocation {
1365                    version: manifest.version,
1366                    path,
1367                    size: Some(res.size as u64),
1368                    naming_scheme,
1369                    e_tag: None, // Re-name can change e-tag.
1370                })
1371            }
1372            Err(ObjectStoreError::AlreadyExists { .. }) => {
1373                // Another transaction has already been committed
1374                // Attempt to clean up temporary object, but ignore errors if we can't
1375                let _ = object_store.delete(&tmp_path).await;
1376
1377                return Err(CommitError::CommitConflict);
1378            }
1379            Err(e) => {
1380                // Something else went wrong
1381                return Err(CommitError::OtherError(e.into()));
1382            }
1383        }
1384    }
1385}
1386
1387impl Debug for RenameCommitHandler {
1388    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1389        f.debug_struct("RenameCommitHandler").finish()
1390    }
1391}
1392
1393pub struct ConditionalPutCommitHandler;
1394
1395#[async_trait::async_trait]
1396impl CommitHandler for ConditionalPutCommitHandler {
1397    async fn commit(
1398        &self,
1399        manifest: &mut Manifest,
1400        indices: Option<Vec<IndexMetadata>>,
1401        base_path: &Path,
1402        object_store: &ObjectStore,
1403        manifest_writer: ManifestWriter,
1404        naming_scheme: ManifestNamingScheme,
1405        transaction: Option<Transaction>,
1406    ) -> std::result::Result<ManifestLocation, CommitError> {
1407        let path = naming_scheme.manifest_path(base_path, manifest.version);
1408
1409        let memory_store = ObjectStore::memory();
1410        let dummy_path = "dummy";
1411        manifest_writer(
1412            &memory_store,
1413            manifest,
1414            indices,
1415            &dummy_path.into(),
1416            transaction,
1417        )
1418        .await?;
1419        let dummy_data = memory_store.read_one_all(&dummy_path.into()).await?;
1420        let size = dummy_data.len() as u64;
1421        let res = object_store
1422            .inner
1423            .put_opts(
1424                &path,
1425                dummy_data.into(),
1426                PutOptions {
1427                    mode: object_store::PutMode::Create,
1428                    ..Default::default()
1429                },
1430            )
1431            .await
1432            .map_err(|err| match err {
1433                ObjectStoreError::AlreadyExists { .. } | ObjectStoreError::Precondition { .. } => {
1434                    CommitError::CommitConflict
1435                }
1436                _ => CommitError::OtherError(err.into()),
1437            })?;
1438
1439        write_version_hint(object_store, base_path, manifest.version).await;
1440
1441        Ok(ManifestLocation {
1442            version: manifest.version,
1443            path,
1444            size: Some(size),
1445            naming_scheme,
1446            e_tag: res.e_tag,
1447        })
1448    }
1449}
1450
1451impl Debug for ConditionalPutCommitHandler {
1452    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1453        f.debug_struct("ConditionalPutCommitHandler").finish()
1454    }
1455}
1456
/// Settings for committing a new table version.
#[derive(Debug, Clone)]
pub struct CommitConfig {
    /// Retry budget for a commit. Defaults to 20.
    // NOTE(review): presumably the number of retries after a commit conflict;
    // confirm against the commit loop that reads it.
    pub num_retries: u32,
    /// Defaults to `false`.
    // NOTE(review): presumably disables automatic cleanup after a commit when
    // `true`; confirm against callers.
    pub skip_auto_cleanup: bool,
    // TODO: add isolation_level
}

impl Default for CommitConfig {
    fn default() -> Self {
        let num_retries = 20;
        let skip_auto_cleanup = false;
        Self {
            num_retries,
            skip_auto_cleanup,
        }
    }
}
1472
1473#[cfg(test)]
1474mod tests {
1475    use lance_core::utils::tempfile::TempObjDir;
1476
1477    use super::*;
1478
1479    #[test]
1480    fn test_manifest_naming_scheme() {
1481        let v1 = ManifestNamingScheme::V1;
1482        let v2 = ManifestNamingScheme::V2;
1483
1484        assert_eq!(
1485            v1.manifest_path(&Path::from("base"), 0),
1486            Path::from("base/_versions/0.manifest")
1487        );
1488        assert_eq!(
1489            v1.manifest_path(&Path::from("base"), 42),
1490            Path::from("base/_versions/42.manifest")
1491        );
1492
1493        assert_eq!(
1494            v2.manifest_path(&Path::from("base"), 0),
1495            Path::from("base/_versions/18446744073709551615.manifest")
1496        );
1497        assert_eq!(
1498            v2.manifest_path(&Path::from("base"), 42),
1499            Path::from("base/_versions/18446744073709551573.manifest")
1500        );
1501
1502        assert_eq!(v1.parse_version("0.manifest"), Some(0));
1503        assert_eq!(v1.parse_version("42.manifest"), Some(42));
1504        assert_eq!(
1505            v1.parse_version("42.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc"),
1506            Some(42)
1507        );
1508
1509        assert_eq!(v2.parse_version("18446744073709551615.manifest"), Some(0));
1510        assert_eq!(v2.parse_version("18446744073709551573.manifest"), Some(42));
1511        assert_eq!(
1512            v2.parse_version("18446744073709551573.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc"),
1513            Some(42)
1514        );
1515
1516        assert_eq!(ManifestNamingScheme::detect_scheme("0.manifest"), Some(v1));
1517        assert_eq!(
1518            ManifestNamingScheme::detect_scheme("18446744073709551615.manifest"),
1519            Some(v2)
1520        );
1521        assert_eq!(ManifestNamingScheme::detect_scheme("something else"), None);
1522    }
1523
1524    #[tokio::test]
1525    async fn test_manifest_naming_migration() {
1526        let object_store = ObjectStore::memory();
1527        let base = Path::from("base");
1528        let versions_dir = base.clone().join(VERSIONS_DIR);
1529
1530        // Write two v1 files and one v1
1531        let original_files = vec![
1532            versions_dir.clone().join("irrelevant"),
1533            ManifestNamingScheme::V1.manifest_path(&base, 0),
1534            ManifestNamingScheme::V2.manifest_path(&base, 1),
1535        ];
1536        for path in original_files {
1537            object_store.put(&path, b"".as_slice()).await.unwrap();
1538        }
1539
1540        migrate_scheme_to_v2(&object_store, &base).await.unwrap();
1541
1542        let expected_files = vec![
1543            ManifestNamingScheme::V2.manifest_path(&base, 1),
1544            ManifestNamingScheme::V2.manifest_path(&base, 0),
1545            versions_dir.clone().join("irrelevant"),
1546        ];
1547        let actual_files = object_store
1548            .inner
1549            .list(Some(&versions_dir))
1550            .map_ok(|res| res.location)
1551            .try_collect::<Vec<_>>()
1552            .await
1553            .unwrap();
1554        assert_eq!(actual_files, expected_files);
1555    }
1556
1557    #[tokio::test]
1558    #[rstest::rstest]
1559    async fn test_list_manifests_sorted(
1560        #[values(true, false)] lexical_list_store: bool,
1561        #[values(ManifestNamingScheme::V1, ManifestNamingScheme::V2)]
1562        naming_scheme: ManifestNamingScheme,
1563    ) {
1564        let tempdir;
1565        let (object_store, base) = if lexical_list_store {
1566            (Box::new(ObjectStore::memory()), Path::from("base"))
1567        } else {
1568            tempdir = TempObjDir::default();
1569            let path = tempdir.clone().join("base");
1570            let store = Box::new(ObjectStore::local());
1571            assert!(!store.list_is_lexically_ordered);
1572            (store, path)
1573        };
1574
1575        // Write 12 manifest files, latest first
1576        let mut expected_paths = Vec::new();
1577        for i in (0..12).rev() {
1578            let path = naming_scheme.manifest_path(&base, i);
1579            object_store.put(&path, b"".as_slice()).await.unwrap();
1580            expected_paths.push(path);
1581        }
1582
1583        let actual_versions = ConditionalPutCommitHandler
1584            .list_manifest_locations(&base, &object_store, true)
1585            .map_ok(|location| location.path)
1586            .try_collect::<Vec<_>>()
1587            .await
1588            .unwrap();
1589
1590        assert_eq!(actual_versions, expected_paths);
1591    }
1592
1593    #[tokio::test]
1594    #[rstest::rstest]
1595    async fn test_current_manifest_path(
1596        #[values(true, false)] lexical_list_store: bool,
1597        #[values(ManifestNamingScheme::V1, ManifestNamingScheme::V2)]
1598        naming_scheme: ManifestNamingScheme,
1599    ) {
1600        // Use memory store for both cases to avoid local FS special codepath.
1601        // Modify list_is_lexically_ordered to simulate different object stores.
1602        let mut object_store = ObjectStore::memory();
1603        object_store.list_is_lexically_ordered = lexical_list_store;
1604        let object_store = Box::new(object_store);
1605        let base = Path::from("base");
1606
1607        // Write 12 manifest files in non-sequential order
1608        for version in [5, 2, 11, 0, 8, 3, 10, 1, 7, 4, 9, 6] {
1609            let path = naming_scheme.manifest_path(&base, version);
1610            object_store.put(&path, b"".as_slice()).await.unwrap();
1611        }
1612
1613        let location = current_manifest_path(&object_store, &base).await.unwrap();
1614
1615        assert_eq!(location.version, 11);
1616        assert_eq!(location.naming_scheme, naming_scheme);
1617        assert_eq!(location.path, naming_scheme.manifest_path(&base, 11));
1618    }
1619
1620    /// A memory store that reports `list_is_lexically_ordered == false`, like
1621    /// S3 Express, so the version-hint paths are exercised.
1622    fn non_lexical_memory_store() -> Box<ObjectStore> {
1623        let mut object_store = ObjectStore::memory();
1624        object_store.list_is_lexically_ordered = false;
1625        Box::new(object_store)
1626    }
1627
1628    #[tokio::test]
1629    async fn test_write_version_hint() {
1630        let base = Path::from("base");
1631
1632        // No hint is written on lexically-ordered stores (it would not be read).
1633        let lexical = ObjectStore::memory();
1634        write_version_hint(&lexical, &base, 42).await;
1635        assert_eq!(read_version_from_hint(&lexical, &base).await, None);
1636
1637        let object_store = non_lexical_memory_store();
1638        write_version_hint(&object_store, &base, 42).await;
1639        assert_eq!(read_version_from_hint(&object_store, &base).await, Some(42));
1640
1641        // A later commit overwrites the hint.
1642        write_version_hint(&object_store, &base, 100).await;
1643        assert_eq!(
1644            read_version_from_hint(&object_store, &base).await,
1645            Some(100)
1646        );
1647
1648        // Detached versions are never written to the hint.
1649        write_version_hint(
1650            &object_store,
1651            &base,
1652            crate::format::DETACHED_VERSION_MASK | 7,
1653        )
1654        .await;
1655        assert_eq!(
1656            read_version_from_hint(&object_store, &base).await,
1657            Some(100)
1658        );
1659
1660        // A corrupt / non-JSON hint file is treated as missing.
1661        let hint_path = version_hint_path(&base);
1662        object_store
1663            .put(&hint_path, b"not json".as_slice())
1664            .await
1665            .unwrap();
1666        assert_eq!(read_version_from_hint(&object_store, &base).await, None);
1667    }
1668
1669    #[tokio::test]
1670    #[rstest::rstest]
1671    async fn test_read_version_hint_and_probe(
1672        #[values(ManifestNamingScheme::V1, ManifestNamingScheme::V2)]
1673        naming_scheme: ManifestNamingScheme,
1674    ) {
1675        let object_store = non_lexical_memory_store();
1676        let base = Path::from("base");
1677
1678        // No hint file yet.
1679        assert!(
1680            read_version_hint_and_probe(&object_store, &base)
1681                .await
1682                .is_none()
1683        );
1684
1685        for version in 1..=5 {
1686            object_store
1687                .put(&naming_scheme.manifest_path(&base, version), b"".as_slice())
1688                .await
1689                .unwrap();
1690        }
1691
1692        // Stale hint: should probe forward and find version 5.
1693        write_version_hint(&object_store, &base, 3).await;
1694        let location = read_version_hint_and_probe(&object_store, &base)
1695            .await
1696            .unwrap();
1697        assert_eq!(location.version, 5);
1698        assert_eq!(location.naming_scheme, naming_scheme);
1699
1700        // Up-to-date hint: returns version 5 directly.
1701        write_version_hint(&object_store, &base, 5).await;
1702        let location = read_version_hint_and_probe(&object_store, &base)
1703            .await
1704            .unwrap();
1705        assert_eq!(location.version, 5);
1706
1707        // Hint points past the latest version: not usable.
1708        write_version_hint(&object_store, &base, 10).await;
1709        assert!(
1710            read_version_hint_and_probe(&object_store, &base)
1711                .await
1712                .is_none()
1713        );
1714    }
1715
1716    #[tokio::test]
1717    async fn test_list_manifests_since_version_with_hint() {
1718        let object_store = non_lexical_memory_store();
1719        let base = Path::from("base");
1720        let scheme = ManifestNamingScheme::V2;
1721
1722        for version in 1..=10 {
1723            object_store
1724                .put(&scheme.manifest_path(&base, version), b"".as_slice())
1725                .await
1726                .unwrap();
1727        }
1728
1729        // No hint yet -> not usable, caller must fall back.
1730        assert!(
1731            list_manifests_since_version_with_hint(&object_store, &base, 7)
1732                .await
1733                .is_none()
1734        );
1735
1736        // Hint exactly at the read version -> fast path, nothing new.
1737        write_version_hint(&object_store, &base, 10).await;
1738        assert!(matches!(
1739            list_manifests_since_version_with_hint(&object_store, &base, 10).await,
1740            Some(v) if v.is_empty()
1741        ));
1742
1743        // Hint ahead of the read version, with a gap to fill (8, 9) plus probing
1744        // from the hint (10). Results are descending by version.
1745        let locations = list_manifests_since_version_with_hint(&object_store, &base, 7)
1746            .await
1747            .unwrap();
1748        assert_eq!(
1749            locations.iter().map(|l| l.version).collect::<Vec<_>>(),
1750            vec![10, 9, 8]
1751        );
1752
1753        // Slightly stale hint (points at 8) still probes up to the true latest.
1754        write_version_hint(&object_store, &base, 8).await;
1755        let locations = list_manifests_since_version_with_hint(&object_store, &base, 7)
1756            .await
1757            .unwrap();
1758        assert_eq!(
1759            locations.iter().map(|l| l.version).collect::<Vec<_>>(),
1760            vec![10, 9, 8]
1761        );
1762
1763        // Hint points past the latest -> not usable, caller falls back.
1764        write_version_hint(&object_store, &base, 20).await;
1765        assert!(
1766            list_manifests_since_version_with_hint(&object_store, &base, 7)
1767                .await
1768                .is_none()
1769        );
1770    }
1771
1772    #[tokio::test]
1773    async fn test_current_manifest_path_with_hint_non_lexical() {
1774        // Simulate S3 Express (non-lexically ordered list) with many versions.
1775        let object_store = non_lexical_memory_store();
1776        let base = Path::from("base");
1777        let naming_scheme = ManifestNamingScheme::V2;
1778
1779        for version in 1..=100 {
1780            object_store
1781                .put(&naming_scheme.manifest_path(&base, version), b"".as_slice())
1782                .await
1783                .unwrap();
1784        }
1785
1786        // Slightly stale hint: probing from 98 still resolves the true latest.
1787        write_version_hint(&object_store, &base, 98).await;
1788        let location = current_manifest_path(&object_store, &base).await.unwrap();
1789        assert_eq!(location.version, 100);
1790    }
1791
1792    #[tokio::test]
1793    async fn test_current_manifest_path_with_stale_hint_falls_back_to_listing() {
1794        let object_store = non_lexical_memory_store();
1795        let base = Path::from("base");
1796        let naming_scheme = ManifestNamingScheme::V2;
1797
1798        // Only version 5 exists, but the hint claims version 10.
1799        object_store
1800            .put(&naming_scheme.manifest_path(&base, 5), b"".as_slice())
1801            .await
1802            .unwrap();
1803        write_version_hint(&object_store, &base, 10).await;
1804
1805        // The stale hint is ignored; listing finds version 5.
1806        let location = current_manifest_path(&object_store, &base).await.unwrap();
1807        assert_eq!(location.version, 5);
1808    }
1809
1810    #[test]
1811    fn test_parse_detached_version() {
1812        // Valid detached version filenames
1813        assert_eq!(
1814            ManifestNamingScheme::parse_detached_version("d12345.manifest"),
1815            Some(12345)
1816        );
1817        assert_eq!(
1818            ManifestNamingScheme::parse_detached_version("d9223372036854775808.manifest"),
1819            Some(9223372036854775808)
1820        );
1821
1822        // Invalid: not starting with 'd' prefix
1823        assert_eq!(
1824            ManifestNamingScheme::parse_detached_version("12345.manifest"),
1825            None
1826        );
1827
1828        // Invalid: regular V2 manifest
1829        assert_eq!(
1830            ManifestNamingScheme::parse_detached_version("18446744073709551615.manifest"),
1831            None
1832        );
1833
1834        // Invalid: no extension
1835        assert_eq!(ManifestNamingScheme::parse_detached_version("d12345"), None);
1836    }
1837
1838    #[tokio::test]
1839    async fn test_list_detached_manifests() {
1840        use crate::format::DETACHED_VERSION_MASK;
1841        use futures::TryStreamExt;
1842
1843        let object_store = ObjectStore::memory();
1844        let base = Path::from("base");
1845        let versions_dir = base.clone().join(VERSIONS_DIR);
1846
1847        // Create some regular manifests
1848        for version in [1, 2, 3] {
1849            let path = ManifestNamingScheme::V2.manifest_path(&base, version);
1850            object_store.put(&path, b"".as_slice()).await.unwrap();
1851        }
1852
1853        // Create some detached manifests
1854        let detached_versions: Vec<u64> = vec![
1855            100 | DETACHED_VERSION_MASK,
1856            200 | DETACHED_VERSION_MASK,
1857            300 | DETACHED_VERSION_MASK,
1858        ];
1859        for version in &detached_versions {
1860            let path = versions_dir.clone().join(format!("d{}.manifest", version));
1861            object_store.put(&path, b"".as_slice()).await.unwrap();
1862        }
1863
1864        // List detached manifests
1865        let detached_locations: Vec<ManifestLocation> =
1866            list_detached_manifests(&base, &object_store.inner)
1867                .try_collect()
1868                .await
1869                .unwrap();
1870
1871        assert_eq!(detached_locations.len(), 3);
1872        for loc in &detached_locations {
1873            assert_eq!(loc.naming_scheme, ManifestNamingScheme::V2);
1874        }
1875
1876        let mut found_versions: Vec<u64> = detached_locations.iter().map(|l| l.version).collect();
1877        found_versions.sort();
1878        let mut expected_versions = detached_versions.clone();
1879        expected_versions.sort();
1880        assert_eq!(found_versions, expected_versions);
1881    }
1882
1883    #[tokio::test]
1884    async fn test_commit_handler_from_url_memory_schemes() {
1885        // Both `memory://` and `shared-memory://` must route to
1886        // ConditionalPutCommitHandler — otherwise concurrent writers fall
1887        // through to UnsafeCommitHandler and silently clobber each other's
1888        // manifests.
1889        for url in ["memory://bucket-a/ds", "shared-memory://bucket-a/ds"] {
1890            let handler = commit_handler_from_url(url, &None).await.unwrap();
1891            assert_eq!(
1892                format!("{:?}", handler),
1893                "ConditionalPutCommitHandler",
1894                "{url} should route to ConditionalPutCommitHandler",
1895            );
1896        }
1897    }
1898}