Skip to main content

lance_table/io/
commit.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Trait for commit implementations.
5//!
6//! In Lance, a transaction is committed by writing the next manifest file.
7//! However, care should be taken to ensure that the manifest file is written
8//! only once, even if there are concurrent writers. Different stores have
9//! different abilities to handle concurrent writes, so a trait is provided
10//! to allow for different implementations.
11//!
12//! The trait [CommitHandler] can be implemented to provide different commit
13//! strategies. The default implementation for most object stores is
14//! [RenameCommitHandler], which writes the manifest to a temporary path, then
15//! renames the temporary path to the final path if no object already exists
16//! at the final path. This is an atomic operation in most object stores, but
17//! not in AWS S3. So for AWS S3, the default commit handler is
18//! [UnsafeCommitHandler], which writes the manifest to the final path without
19//! any checks.
20//!
21//! When providing your own commit handler, most often you are implementing in
22//! terms of a lock. The trait [CommitLock] can be implemented as a simpler
23//! alternative to [CommitHandler].
24
25use std::io;
26use std::pin::Pin;
27use std::sync::Arc;
28use std::sync::atomic::AtomicBool;
29use std::{fmt::Debug, fs::DirEntry};
30
31use super::manifest::write_manifest;
32use futures::Stream;
33use futures::future::Either;
34use futures::{
35    StreamExt, TryStreamExt,
36    future::{self, BoxFuture},
37    stream::BoxStream,
38};
39use lance_file::format::{MAGIC, MAJOR_VERSION, MINOR_VERSION};
40use lance_io::object_writer::{ObjectWriter, WriteResult, get_etag};
41use log::warn;
42use object_store::ObjectStoreExt as OSObjectStoreExt;
43use object_store::PutOptions;
44use object_store::{Error as ObjectStoreError, ObjectStore as OSObjectStore, path::Path};
45use tracing::info;
46use url::Url;
47
48#[cfg(feature = "dynamodb")]
49pub mod dynamodb;
50pub mod external_manifest;
51
52use lance_core::{Error, Result};
53use lance_io::object_store::{ObjectStore, ObjectStoreExt, ObjectStoreParams};
54use lance_io::traits::{WriteExt, Writer};
55
56use crate::format::{IndexMetadata, Manifest, Transaction, is_detached_version};
57use lance_core::utils::tracing::{AUDIT_MODE_CREATE, AUDIT_TYPE_MANIFEST, TRACE_FILE_AUDIT};
58#[cfg(feature = "dynamodb")]
59use {
60    self::external_manifest::{ExternalManifestCommitHandler, ExternalManifestStore},
61    aws_credential_types::provider::ProvideCredentials,
62    aws_credential_types::provider::error::CredentialsError,
63    lance_io::object_store::{StorageOptions, providers::aws::build_aws_credential},
64    object_store::aws::AmazonS3ConfigKey,
65    object_store::aws::AwsCredentialProvider,
66    std::borrow::Cow,
67    std::time::{Duration, SystemTime},
68};
69
/// Directory, relative to the dataset root, that holds manifest files (and the
/// version hint file).
pub const VERSIONS_DIR: &str = "_versions";
/// Extension of manifest files, without the leading `.`.
const MANIFEST_EXTENSION: &str = "manifest";
/// File-name prefix marking a detached-version manifest: `d{version}.manifest`.
const DETACHED_VERSION_PREFIX: &str = "d";
/// File name for the JSON version hint file, stored under `_versions/`.
///
/// The file contains `{"version":N}` where `N` is the latest committed version
/// at the time of writing. It enables O(1)/O(k) latest-version lookup via HEAD
/// requests on object stores where listing is not lexicographically ordered
/// (e.g. S3 Express, local filesystem) instead of an O(n) listing.
const VERSION_HINT_FILE: &str = "latest_version_hint.json";
80
/// How manifest files should be named.
///
/// Both schemes store attached manifests directly under `_versions/`. Detached
/// versions are always stored as `_versions/d{version}.manifest`, whichever
/// scheme is in use (see `ManifestNamingScheme::manifest_path`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ManifestNamingScheme {
    /// `_versions/{version}.manifest`
    V1,
    /// `_versions/{u64::MAX - version}.manifest`, zero-padded to 20 digits.
    ///
    /// The version is inverted so that newer versions sort first
    /// lexicographically. On object stores whose listings are lexically
    /// ordered, this gives O(1) lookup of the latest version.
    V2,
}
91
92impl ManifestNamingScheme {
93    pub fn manifest_path(&self, base: &Path, version: u64) -> Path {
94        if is_detached_version(version) {
95            // Detached versions should never show up first in a list operation which
96            // means it needs to come lexicographically after all attached manifest
97            // files and so we add the prefix `d`.  There is no need to invert the
98            // version number since detached versions are not part of the version
99            base.clone().join(VERSIONS_DIR).join(format!(
100                "{DETACHED_VERSION_PREFIX}{version}.{MANIFEST_EXTENSION}"
101            ))
102        } else {
103            let directory = base.clone().join(VERSIONS_DIR);
104            match self {
105                Self::V1 => directory.join(format!("{version}.{MANIFEST_EXTENSION}")),
106                Self::V2 => {
107                    let inverted_version = u64::MAX - version;
108                    directory.join(format!("{inverted_version:020}.{MANIFEST_EXTENSION}"))
109                }
110            }
111        }
112    }
113
114    pub fn parse_version(&self, filename: &str) -> Option<u64> {
115        let file_number = filename
116            .split_once('.')
117            // Detached versions will fail the `parse` step, which is ok.
118            .and_then(|(version_str, _)| version_str.parse::<u64>().ok());
119        match self {
120            Self::V1 => file_number,
121            Self::V2 => file_number.map(|v| u64::MAX - v),
122        }
123    }
124
125    /// Parse a detached version from a filename like `d123456.manifest`.
126    ///
127    /// Returns the full version number with the detached mask bit set.
128    pub fn parse_detached_version(filename: &str) -> Option<u64> {
129        if !filename.starts_with(DETACHED_VERSION_PREFIX) {
130            return None;
131        }
132        let without_prefix = &filename[DETACHED_VERSION_PREFIX.len()..];
133        without_prefix
134            .split_once('.')
135            .and_then(|(version_str, _)| version_str.parse::<u64>().ok())
136    }
137
138    pub fn detect_scheme(filename: &str) -> Option<Self> {
139        if filename.starts_with(DETACHED_VERSION_PREFIX) {
140            // Currently, detached versions must imply V2
141            return Some(Self::V2);
142        }
143        if filename.ends_with(MANIFEST_EXTENSION) {
144            const V2_LEN: usize = 20 + 1 + MANIFEST_EXTENSION.len();
145            if filename.len() == V2_LEN {
146                Some(Self::V2)
147            } else {
148                Some(Self::V1)
149            }
150        } else {
151            None
152        }
153    }
154
155    pub fn detect_scheme_staging(filename: &str) -> Self {
156        // We shouldn't have to worry about detached versions here since there is no
157        // such thing as "detached" and "staged" at the same time.
158        if filename.chars().nth(20) == Some('.') {
159            Self::V2
160        } else {
161            Self::V1
162        }
163    }
164}
165
166/// Migrate all V1 manifests to V2 naming scheme.
167///
168/// This function will rename all V1 manifests to V2 naming scheme.
169///
170/// This function is idempotent, and can be run multiple times without
171/// changing the state of the object store.
172///
173/// However, it should not be run while other concurrent operations are happening.
174/// And it should also run until completion before resuming other operations.
175pub async fn migrate_scheme_to_v2(object_store: &ObjectStore, dataset_base: &Path) -> Result<()> {
176    object_store
177        .inner
178        .list(Some(&dataset_base.clone().join(VERSIONS_DIR)))
179        .try_filter(|res| {
180            let res = if let Some(filename) = res.location.filename() {
181                ManifestNamingScheme::detect_scheme(filename) == Some(ManifestNamingScheme::V1)
182            } else {
183                false
184            };
185            future::ready(res)
186        })
187        .try_for_each_concurrent(object_store.io_parallelism(), |meta| async move {
188            let filename = meta.location.filename().unwrap();
189            let version = ManifestNamingScheme::V1.parse_version(filename).unwrap();
190            let path = ManifestNamingScheme::V2.manifest_path(dataset_base, version);
191            object_store.inner.rename(&meta.location, &path).await?;
192            Ok(())
193        })
194        .await?;
195
196    Ok(())
197}
198
/// Function that writes the manifest to the object store.
///
/// Writes `manifest` (plus optional `indices` and `transaction`) to `path` and
/// resolves to the [`WriteResult`] produced when the file is finalized.
/// [`write_manifest_file_to_path`] is the in-crate implementation matching
/// this signature.
pub type ManifestWriter = for<'a> fn(
    object_store: &'a ObjectStore,
    manifest: &'a mut Manifest,
    indices: Option<Vec<IndexMetadata>>,
    path: &'a Path,
    transaction: Option<Transaction>,
) -> BoxFuture<'a, Result<WriteResult>>;
209
210/// Canonical manifest writer; its function item type exactly matches `ManifestWriter`.
211/// Rationale: keep a crate-local writer implementation so call sites can pass this function
212/// directly without non-primitive casts or lifetime coercions.
213pub fn write_manifest_file_to_path<'a>(
214    object_store: &'a ObjectStore,
215    manifest: &'a mut Manifest,
216    indices: Option<Vec<IndexMetadata>>,
217    path: &'a Path,
218    transaction: Option<Transaction>,
219) -> BoxFuture<'a, Result<WriteResult>> {
220    Box::pin(async move {
221        let mut object_writer = ObjectWriter::new(object_store, path).await?;
222        let pos = write_manifest(&mut object_writer, manifest, indices, transaction).await?;
223        object_writer
224            .write_magics(pos, MAJOR_VERSION, MINOR_VERSION, MAGIC)
225            .await?;
226        let res = Writer::shutdown(&mut object_writer).await?;
227        info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_MANIFEST, path = path.to_string());
228        Ok(res)
229    })
230}
231
/// Where a manifest lives, plus the metadata needed to read and validate it.
#[derive(Debug, Clone)]
pub struct ManifestLocation {
    /// The version the manifest corresponds to.
    pub version: u64,
    /// Path of the manifest file, relative to the table root.
    pub path: Path,
    /// Size, in bytes, of the manifest file. If it is not known, this field should be `None`.
    pub size: Option<u64>,
    /// Naming scheme of the manifest file.
    pub naming_scheme: ManifestNamingScheme,
    /// Optional e-tag, used for integrity checks. Manifests should be immutable, so
    /// if we detect a change in the e-tag, it means the manifest was tampered with.
    /// This might happen if the dataset was deleted and then re-created.
    pub e_tag: Option<String>,
}
247
248impl TryFrom<object_store::ObjectMeta> for ManifestLocation {
249    type Error = Error;
250
251    fn try_from(meta: object_store::ObjectMeta) -> Result<Self> {
252        let filename = meta.location.filename().ok_or_else(|| {
253            Error::internal("ObjectMeta location does not have a filename".to_string())
254        })?;
255        let scheme = ManifestNamingScheme::detect_scheme(filename)
256            .ok_or_else(|| Error::internal(format!("Invalid manifest filename: '{}'", filename)))?;
257        let version = scheme
258            .parse_version(filename)
259            .ok_or_else(|| Error::internal(format!("Invalid manifest filename: '{}'", filename)))?;
260        Ok(Self {
261            version,
262            path: meta.location,
263            size: Some(meta.size),
264            naming_scheme: scheme,
265            e_tag: meta.e_tag,
266        })
267    }
268}
269
270/// Get the latest manifest path.
271///
272/// - Local filesystem: a single directory read.
273/// - Stores where listing is not lexicographically ordered (e.g. S3 Express):
274///   the version hint (read the hint file, then probe higher versions with
275///   HEADs), falling back to a listing if the hint is missing or stale. A full
276///   listing on these stores is O(n) in the number of versions.
277/// - Lexicographically ordered stores (e.g. S3 Standard, GCS): the listing
278///   already resolves the latest version in roughly one request.
279async fn current_manifest_path(
280    object_store: &ObjectStore,
281    base: &Path,
282) -> Result<ManifestLocation> {
283    if object_store.is_local() {
284        if let Ok(Some(location)) = current_manifest_local(base) {
285            return Ok(location);
286        }
287    } else if uses_version_hint(object_store)
288        && let Some(location) = read_version_hint_and_probe(object_store, base).await
289    {
290        return Ok(location);
291    }
292
293    resolve_version_from_listing(object_store, base).await
294}
295
/// JSON body of the version hint file: `{"version":N}`.
///
/// Serialized by `write_version_hint` and parsed by `read_version_from_hint`.
#[derive(serde::Serialize, serde::Deserialize)]
struct VersionHint {
    /// Latest committed (attached) version at the time the hint was written.
    version: u64,
}
301
/// Environment variable that globally toggles the version hint.
///
/// Setting `LANCE_USE_VERSION_HINT` to `0`, `false`, or `off` (case-insensitive,
/// surrounding whitespace ignored) disables the hint everywhere. Writers stop
/// emitting the hint file and readers stop consulting it, falling back to
/// plain listing. Intended as a benchmark/escape-hatch knob; the hint is on by
/// default.
const VERSION_HINT_ENV: &str = "LANCE_USE_VERSION_HINT";

/// Returns whether the version hint is enabled for this process.
///
/// The environment variable is read once, on the first call, and the answer
/// is cached for the rest of the process lifetime. An unset (or non-UTF-8)
/// variable counts as enabled.
fn version_hint_globally_enabled() -> bool {
    static ENABLED: std::sync::OnceLock<bool> = std::sync::OnceLock::new();
    *ENABLED.get_or_init(|| {
        let Ok(raw) = std::env::var(VERSION_HINT_ENV) else {
            return true;
        };
        let normalized = raw.trim().to_ascii_lowercase();
        !["0", "false", "off"].contains(&normalized.as_str())
    })
}
318
319/// Whether this object store benefits from a version hint.
320///
321/// On stores where listing is lexicographically ordered (S3 Standard, GCS,
322/// Azure, ...) the latest version is already resolved in roughly one request,
323/// so the hint would only add a write per commit for nothing. We write (and
324/// read) it only on stores where listing is not lexicographically ordered —
325/// S3 Express and the local filesystem. Can be force-disabled with the
326/// `LANCE_USE_VERSION_HINT=0` environment variable.
327pub fn uses_version_hint(object_store: &ObjectStore) -> bool {
328    version_hint_globally_enabled() && !object_store.list_is_lexically_ordered
329}
330
331/// Path to the JSON version hint file for a dataset.
332fn version_hint_path(base: &Path) -> Path {
333    base.clone().join(VERSIONS_DIR).join(VERSION_HINT_FILE)
334}
335
336/// Write the version hint file after a successful commit.
337///
338/// The hint is stored as JSON: `{"version":N}`. This write is best-effort —
339/// failures are logged and ignored, since the hint only accelerates reads and
340/// never affects correctness (readers verify the hinted version and probe
341/// upward from there). It is a no-op for detached versions and for stores that
342/// do not benefit from a hint (see [`uses_version_hint`]).
343pub async fn write_version_hint(object_store: &ObjectStore, base: &Path, version: u64) {
344    if is_detached_version(version) || !uses_version_hint(object_store) {
345        return;
346    }
347    let hint_path = version_hint_path(base);
348    let content = serde_json::to_vec(&VersionHint { version }).expect("serialize version hint");
349    if let Err(e) = object_store.put(&hint_path, content.as_slice()).await {
350        warn!("Failed to write version hint file for version {version}: {e}");
351    }
352}
353
354/// Read the latest version from the hint file, or `None` if it does not exist
355/// or cannot be parsed.
356async fn read_version_from_hint(object_store: &ObjectStore, base: &Path) -> Option<u64> {
357    let bytes = object_store
358        .inner
359        .get(&version_hint_path(base))
360        .await
361        .ok()?
362        .bytes()
363        .await
364        .ok()?;
365    Some(serde_json::from_slice::<VersionHint>(&bytes).ok()?.version)
366}
367
368/// Read the version hint and probe upward to find the true latest manifest.
369///
370/// Returns `None` if the hint file is missing, the hinted version no longer
371/// exists, or any error occurred — callers should fall back to listing.
372async fn read_version_hint_and_probe(
373    object_store: &ObjectStore,
374    base: &Path,
375) -> Option<ManifestLocation> {
376    let hint_version = read_version_from_hint(object_store, base).await?;
377    let (version, scheme, mut probed) = probe_versions_upward(object_store, base, hint_version)
378        .await
379        .ok()
380        .flatten()?;
381    // `probed` is non-empty and its last entry is the highest version found.
382    let (_, meta) = probed.pop()?;
383    Some(ManifestLocation {
384        version,
385        path: scheme.manifest_path(base, version),
386        size: Some(meta.size),
387        naming_scheme: scheme,
388        e_tag: meta.e_tag,
389    })
390}
391
/// Maximum version gap between the hint and the read version for which we use
/// the hint-based parallel-HEAD path. Beyond this, a single (paginated) listing
/// is cheaper, so callers fall back to it.
///
/// Compared against `hint_version.saturating_sub(since_version)` in
/// `list_manifests_since_version_with_hint`.
const MAX_HINT_PROBE_GAP: u64 = 1000;
396
397/// Probe `from_version`, then `from_version + 1`, `+ 2`, ... with HEAD requests
398/// until one is not found.
399///
400/// Assumes attached versions are contiguous above `from_version` (true in
401/// practice: every commit increments by one, and cleanup only removes *old*
402/// versions, never ones newer than the latest). A `NotFound` therefore marks
403/// the end of the history.
404///
405/// - `Ok(Some((true_latest_version, naming_scheme, [(version, meta), ...])))`:
406///   the vec covers every version from `from_version` through the true latest
407///   in ascending order.
408/// - `Ok(None)`: `from_version` itself does not exist (a `NotFound` for both
409///   naming schemes) — i.e. the hint pointed past the end.
410/// - `Err(_)`: a transient object-store error was hit, so the probed range may
411///   be incomplete; callers should fall back to a full listing rather than
412///   trust a possibly-stale result.
413async fn probe_versions_upward(
414    object_store: &ObjectStore,
415    base: &Path,
416    from_version: u64,
417) -> Result<
418    Option<(
419        u64,
420        ManifestNamingScheme,
421        Vec<(u64, object_store::ObjectMeta)>,
422    )>,
423> {
424    // Newer datasets use V2; fall back to V1 if the V2 path is not found.
425    let mut scheme = ManifestNamingScheme::V2;
426    let meta = match object_store
427        .inner
428        .head(&scheme.manifest_path(base, from_version))
429        .await
430    {
431        Ok(meta) => meta,
432        Err(ObjectStoreError::NotFound { .. }) => {
433            scheme = ManifestNamingScheme::V1;
434            match object_store
435                .inner
436                .head(&scheme.manifest_path(base, from_version))
437                .await
438            {
439                Ok(meta) => meta,
440                Err(ObjectStoreError::NotFound { .. }) => return Ok(None),
441                Err(e) => return Err(e.into()),
442            }
443        }
444        Err(e) => return Err(e.into()),
445    };
446
447    let mut probed = vec![(from_version, meta)];
448    let mut version = from_version;
449    loop {
450        let next = version + 1;
451        match object_store
452            .inner
453            .head(&scheme.manifest_path(base, next))
454            .await
455        {
456            Ok(meta) => {
457                probed.push((next, meta));
458                version = next;
459            }
460            // NotFound means we found the latest version.
461            Err(ObjectStoreError::NotFound { .. }) => break,
462            // A transient error means a newer version might exist that we
463            // failed to observe — surface it so callers fall back to listing.
464            Err(e) => return Err(e.into()),
465        }
466    }
467    Ok(Some((version, scheme, probed)))
468}
469
/// List manifest locations with version `> since_version` using the version
/// hint, in descending order of version.
///
/// Strategy:
/// 1. Read the hinted version. If it is more than [`MAX_HINT_PROBE_GAP`] ahead
///    of `since_version`, give up (a listing is cheaper).
/// 2. Probe upward (sequential HEADs) from the hint, or from
///    `since_version + 1` when the hint is not newer than `since_version`.
/// 3. If the hint was ahead, HEAD every version in
///    `(since_version + 1)..hint_version` concurrently to fill the gap.
///
/// Returns `None` if the hint is missing or stale enough that this is not
/// usable, or if any HEAD fails. Callers should then fall back to a full
/// listing. `Some(vec![])` is the fast path where the hint confirms there are
/// no new versions.
async fn list_manifests_since_version_with_hint(
    object_store: &ObjectStore,
    base: &Path,
    since_version: u64,
) -> Option<Vec<ManifestLocation>> {
    let hint_version = read_version_from_hint(object_store, base).await?;

    // A reader that is very far behind is cheaper to serve with one paginated
    // listing than with thousands of HEADs.
    if hint_version.saturating_sub(since_version) > MAX_HINT_PROBE_GAP {
        return None;
    }

    // If the hint is not newer than the read version, the only versions that
    // could exist are right above it; otherwise start at the hint.
    let probe_from = if hint_version > since_version {
        hint_version
    } else {
        since_version + 1
    };

    let (scheme, probed) = match probe_versions_upward(object_store, base, probe_from).await {
        Ok(Some((_true_latest, scheme, probed))) => (scheme, probed),
        // Nothing at `probe_from`. If we were probing from the hint, the hint
        // is stale — bail to a full listing. If we were probing from
        // `since_version + 1`, there are simply no new versions.
        Ok(None) if hint_version > since_version => return None,
        Ok(None) => return Some(Vec::new()),
        // Transient error: don't trust the hint path, fall back to listing.
        Err(_) => return None,
    };

    // Every probed version is >= `probe_from` > `since_version`, so this filter
    // is a safety net rather than something that drops entries today.
    let mut locations: Vec<ManifestLocation> = probed
        .into_iter()
        .filter(|(v, _)| *v > since_version)
        .map(|(version, meta)| ManifestLocation {
            version,
            path: scheme.manifest_path(base, version),
            size: Some(meta.size),
            naming_scheme: scheme,
            e_tag: meta.e_tag,
        })
        .collect();

    // Fill the gap between `since_version` and the hint with HEADs (the probe
    // above already covered `hint_version` and up). The range is contiguous, so
    // any error here (including a `NotFound`) means we can't trust the hint path
    // — fall back to a full listing.
    if hint_version > since_version + 1 {
        let gap_locations: Vec<ManifestLocation> =
            futures::stream::iter((since_version + 1)..hint_version)
                .map(|version| async move {
                    object_store
                        .inner
                        .head(&scheme.manifest_path(base, version))
                        .await
                        .map(|meta| ManifestLocation {
                            version,
                            path: scheme.manifest_path(base, version),
                            size: Some(meta.size),
                            naming_scheme: scheme,
                            e_tag: meta.e_tag,
                        })
                })
                // Completion order is arbitrary; the final sort restores order.
                .buffer_unordered(object_store.io_parallelism())
                .try_collect()
                .await
                .ok()?;
        locations.extend(gap_locations);
    }

    // Newest first, as documented.
    locations.sort_by_key(|loc| std::cmp::Reverse(loc.version));
    Some(locations)
}
550
/// Resolve the latest manifest by listing the versions directory.
///
/// Only entries whose names parse as attached manifests are considered.
/// Detached manifests, the version hint file, and temporary files are skipped.
///
/// - Lexically ordered listing whose first valid entry is V2: V2 names are
///   inverted, so the first entry is the latest version. Up to 999 more
///   entries are sanity-checked; problems only emit a warning and do not
///   change the result.
/// - Otherwise: the whole listing is scanned for the maximum version. Mixing
///   V1 and V2 names is an error in this path.
///
/// # Errors
///
/// Returns a not-found error if there are no valid manifests. Listing errors
/// and mixed naming schemes (in the scan path) are also returned as errors.
///
/// # Panics
///
/// Panics (via `unwrap`) if a listed entry has no file name.
async fn resolve_version_from_listing(
    object_store: &ObjectStore,
    base: &Path,
) -> Result<ManifestLocation> {
    let manifest_files = object_store.list(Some(base.clone().join(VERSIONS_DIR)));

    // Keep only attached manifests, tagged with their detected naming scheme.
    let mut valid_manifests = manifest_files.try_filter_map(|res| {
        let filename = res.location.filename().unwrap();
        if let Some(scheme) = ManifestNamingScheme::detect_scheme(filename) {
            // Only include if we can parse a version (skip detached versions)
            if scheme.parse_version(filename).is_some() {
                future::ready(Ok(Some((scheme, res))))
            } else {
                future::ready(Ok(None))
            }
        } else {
            future::ready(Ok(None))
        }
    });

    let first = valid_manifests.next().await.transpose()?;
    match (first, object_store.list_is_lexically_ordered) {
        // If the first valid manifest we see is V2, we can assume that we are using
        // V2 naming scheme for all manifests.
        (Some((scheme @ ManifestNamingScheme::V2, meta)), true) => {
            let version = scheme
                .parse_version(meta.location.filename().unwrap())
                .unwrap();

            // Sanity check: verify at least for the first 1k files that they are all V2
            // and that the version numbers are decreasing. We use the first 1k because
            // this is the typical size of an object store list endpoint response page.
            for (scheme, meta) in valid_manifests.take(999).try_collect::<Vec<_>>().await? {
                if scheme != ManifestNamingScheme::V2 {
                    warn!(
                        "Found V1 Manifest in a V2 directory. Use `migrate_manifest_paths_v2` \
                         to migrate the directory."
                    );
                    break;
                }
                let next_version = scheme
                    .parse_version(meta.location.filename().unwrap())
                    .unwrap();
                if next_version >= version {
                    warn!(
                        "List operation was expected to be lexically ordered, but was not. This \
                         could mean a corrupt read. Please make a bug report on the lance-format/lance \
                         GitHub repository."
                    );
                    break;
                }
            }

            // The warnings above are advisory; the first entry is still returned.
            Ok(ManifestLocation {
                version,
                path: meta.location,
                size: Some(meta.size),
                naming_scheme: scheme,
                e_tag: meta.e_tag,
            })
        }
        // If the list is not lexically ordered, we need to iterate all manifests
        // to find the latest version. This works for both V1 and V2 schemes.
        (Some((first_scheme, meta)), _) => {
            let mut current_version = first_scheme
                .parse_version(meta.location.filename().unwrap())
                .unwrap();
            let mut current_meta = meta;
            let scheme = first_scheme;

            while let Some((entry_scheme, meta)) = valid_manifests.next().await.transpose()? {
                if entry_scheme != scheme {
                    return Err(Error::internal(format!(
                        "Found multiple manifest naming schemes in the same directory: {:?} and {:?}. \
                         Use `migrate_manifest_paths_v2` to migrate the directory.",
                        scheme, entry_scheme
                    )));
                }
                let version = entry_scheme
                    .parse_version(meta.location.filename().unwrap())
                    .unwrap();
                if version > current_version {
                    current_version = version;
                    current_meta = meta;
                }
            }
            Ok(ManifestLocation {
                version: current_version,
                path: current_meta.location,
                size: Some(current_meta.size),
                naming_scheme: scheme,
                e_tag: current_meta.e_tag,
            })
        }
        // No attached manifests at all.
        (None, _) => Err(Error::not_found(
            base.clone().join(VERSIONS_DIR).to_string(),
        )),
    }
}
651
652// This is an optimized function that searches for the latest manifest. In
653// object_store, list operations lookup metadata for each file listed. This
654// method only gets the metadata for the found latest manifest.
655fn current_manifest_local(base: &Path) -> std::io::Result<Option<ManifestLocation>> {
656    let path = lance_io::local::to_local_path(&base.clone().join(VERSIONS_DIR));
657    let entries = std::fs::read_dir(path)?;
658
659    let mut latest_entry: Option<(u64, DirEntry)> = None;
660
661    let mut scheme: Option<ManifestNamingScheme> = None;
662
663    for entry in entries {
664        let entry = entry?;
665        let filename_raw = entry.file_name();
666        let filename = filename_raw.to_string_lossy();
667
668        let Some(entry_scheme) = ManifestNamingScheme::detect_scheme(&filename) else {
669            // Need to ignore temporary files, such as
670            // .tmp_7.manifest_9c100374-3298-4537-afc6-f5ee7913666d
671            continue;
672        };
673
674        if let Some(scheme) = scheme {
675            if scheme != entry_scheme {
676                return Err(io::Error::new(
677                    io::ErrorKind::InvalidData,
678                    format!(
679                        "Found multiple manifest naming schemes in the same directory: {:?} and {:?}",
680                        scheme, entry_scheme
681                    ),
682                ));
683            }
684        } else {
685            scheme = Some(entry_scheme);
686        }
687
688        let Some(version) = entry_scheme.parse_version(&filename) else {
689            continue;
690        };
691
692        if let Some((latest_version, _)) = &latest_entry {
693            if version > *latest_version {
694                latest_entry = Some((version, entry));
695            }
696        } else {
697            latest_entry = Some((version, entry));
698        }
699    }
700
701    if let Some((version, entry)) = latest_entry {
702        let path = Path::from_filesystem_path(entry.path())
703            .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;
704        let metadata = entry.metadata()?;
705        Ok(Some(ManifestLocation {
706            version,
707            path,
708            size: Some(metadata.len()),
709            naming_scheme: scheme.unwrap(),
710            e_tag: Some(get_etag(&metadata)),
711        }))
712    } else {
713        Ok(None)
714    }
715}
716
717fn list_manifests<'a>(
718    base_path: &Path,
719    object_store: &'a dyn OSObjectStore,
720) -> impl Stream<Item = Result<ManifestLocation>> + 'a {
721    object_store
722        .read_dir_all(&base_path.clone().join(VERSIONS_DIR), None)
723        .filter_map(|obj_meta| {
724            futures::future::ready(
725                obj_meta
726                    .map(|m| ManifestLocation::try_from(m).ok())
727                    .transpose(),
728            )
729        })
730        .boxed()
731}
732
733/// Convert object metadata to ManifestLocation for detached manifests.
734fn detached_manifest_location_from_meta(
735    meta: object_store::ObjectMeta,
736) -> Option<ManifestLocation> {
737    let filename = meta.location.filename()?;
738    let version = ManifestNamingScheme::parse_detached_version(filename)?;
739    Some(ManifestLocation {
740        version,
741        path: meta.location,
742        size: Some(meta.size),
743        naming_scheme: ManifestNamingScheme::V2,
744        e_tag: meta.e_tag,
745    })
746}
747
748/// List all detached manifest files in the versions directory.
749pub fn list_detached_manifests<'a>(
750    base_path: &Path,
751    object_store: &'a dyn OSObjectStore,
752) -> impl Stream<Item = Result<ManifestLocation>> + 'a {
753    object_store
754        .read_dir_all(&base_path.clone().join(VERSIONS_DIR), None)
755        .filter_map(|obj_meta| {
756            futures::future::ready(
757                obj_meta
758                    .map(detached_manifest_location_from_meta)
759                    .transpose(),
760            )
761        })
762        .boxed()
763}
764
765fn make_staging_manifest_path(base: &Path) -> Result<Path> {
766    let id = uuid::Uuid::new_v4().to_string();
767    Path::parse(format!("{base}-{id}")).map_err(|e| Error::io_source(Box::new(e)))
768}
769
/// Name of the URL query parameter that carries the DynamoDB table name in
/// `s3+ddb://` URLs, e.g. `s3+ddb://bucket/path?ddbTableName=my_table`.
#[cfg(feature = "dynamodb")]
const DDB_URL_QUERY_KEY: &str = "ddbTableName";
772
/// Handle commits that prevent conflicting writes.
///
/// Commit implementations ensure that if there are multiple concurrent writers
/// attempting to write the next version of a table, only one will win. In order
/// to work, all writers must use the same commit handler type.
/// This trait is also responsible for resolving where the manifests live.
///
/// Only [CommitHandler::commit] must be implemented. Every other method has a
/// default implementation that works directly against the object store.
///
// TODO: pub(crate)
#[async_trait::async_trait]
// `commit` takes more parameters than clippy's default limit.
#[allow(clippy::too_many_arguments)]
pub trait CommitHandler: Debug + Send + Sync {
    /// Locate the manifest of the latest version of the dataset at `base_path`.
    ///
    /// The default implementation delegates to `current_manifest_path`.
    async fn resolve_latest_location(
        &self,
        base_path: &Path,
        object_store: &ObjectStore,
    ) -> Result<ManifestLocation> {
        Ok(current_manifest_path(object_store, base_path).await?)
    }

    /// Locate the manifest of a specific `version`.
    ///
    /// The default implementation delegates to `default_resolve_version`:
    /// - Detached versions map straight to their V2 path without any I/O.
    /// - Otherwise the V2 path is probed with a HEAD request.
    /// - If the V2 object is not found, it falls back to the V1 path, with
    ///   unknown size and e-tag.
    async fn resolve_version_location(
        &self,
        base_path: &Path,
        version: u64,
        object_store: &dyn OSObjectStore,
    ) -> Result<ManifestLocation> {
        default_resolve_version(base_path, version, object_store).await
    }

    /// Check whether an attached manifest version exists without loading it.
    ///
    /// The default implementation probes the deterministic manifest path for
    /// the given naming scheme. Commit handlers with an external source of
    /// truth should override this method.
    ///
    /// Only a not-found response maps to `Ok(false)`. Any other object store
    /// error is propagated.
    async fn version_exists(
        &self,
        base_path: &Path,
        version: u64,
        object_store: &dyn OSObjectStore,
        naming_scheme: ManifestNamingScheme,
    ) -> Result<bool> {
        let path = naming_scheme.manifest_path(base_path, version);
        match object_store.head(&path).await {
            Ok(_) => Ok(true),
            Err(ObjectStoreError::NotFound { .. }) => Ok(false),
            Err(e) => Err(e.into()),
        }
    }

    /// List detached manifest locations.
    ///
    /// Returns a stream of detached manifest locations in arbitrary order.
    /// The default implementation lists the versions directory (`VERSIONS_DIR`)
    /// through `list_detached_manifests`.
    fn list_detached_manifest_locations<'a>(
        &self,
        base_path: &Path,
        object_store: &'a ObjectStore,
    ) -> BoxStream<'a, Result<ManifestLocation>> {
        list_detached_manifests(base_path, &object_store.inner).boxed()
    }

    /// List the attached manifest locations under `base_path`.
    ///
    /// If `sorted_descending` is `true`, the stream will yield manifests in descending
    /// order of version. When the object store has a lexicographically
    /// ordered list and the naming scheme is V2, this will use an optimized
    /// list operation. Otherwise, it will list all manifests and sort them
    /// in memory. When `sorted_descending` is `false`, the stream will yield manifests
    /// in arbitrary order.
    fn list_manifest_locations<'a>(
        &self,
        base_path: &Path,
        object_store: &'a ObjectStore,
        sorted_descending: bool,
    ) -> BoxStream<'a, Result<ManifestLocation>> {
        let underlying_stream = list_manifests(base_path, &object_store.inner);

        if !sorted_descending {
            return underlying_stream.boxed();
        }

        // Collect the whole listing (stopping at the first error) and re-emit it
        // sorted by version, highest first.
        async fn sort_stream(
            input_stream: impl futures::Stream<Item = Result<ManifestLocation>> + Unpin,
        ) -> Result<impl Stream<Item = Result<ManifestLocation>> + Unpin> {
            let mut locations = input_stream.try_collect::<Vec<_>>().await?;
            locations.sort_by_key(|m| std::cmp::Reverse(m.version));
            Ok(futures::stream::iter(locations.into_iter().map(Ok)))
        }

        // If the object store supports lexicographically ordered lists and
        // the naming scheme is V2, we can use an optimized list operation.
        // V2 file names encode `u64::MAX - version` (see `test_manifest_naming_scheme`),
        // so, assuming fixed-width names, an ascending lexical listing already
        // yields versions in descending order.
        if object_store.list_is_lexically_ordered {
            // We don't know the naming scheme until we see the first manifest.
            let mut peekable = underlying_stream.peekable();

            // `peek` does not consume the first entry, so the stream returned
            // below still yields it.
            futures::stream::once(async move {
                let naming_scheme = match Pin::new(&mut peekable).peek().await {
                    Some(Ok(m)) => m.naming_scheme,
                    // If we get an error or no manifests are found, we default
                    // to V2 naming scheme, since it doesn't matter.
                    Some(Err(_)) => ManifestNamingScheme::V2,
                    None => ManifestNamingScheme::V2,
                };

                if naming_scheme == ManifestNamingScheme::V2 {
                    // If the first manifest is V2, we can use the optimized list operation.
                    Ok(Either::Left(peekable))
                } else {
                    sort_stream(peekable).await.map(Either::Right)
                }
            })
            .try_flatten()
            .boxed()
        } else {
            // If the object store does not support lexicographically ordered lists,
            // we need to sort the manifests in memory. Systems where this isn't
            // supported (local fs, S3 express) are typically fast enough
            // that this is not a problem.
            futures::stream::once(sort_stream(underlying_stream))
                .try_flatten()
                .boxed()
        }
    }

    /// List manifest locations with version `> since_version`, in descending
    /// order of version.
    ///
    /// On lexically-ordered stores this is the standard listing with early
    /// termination. On non-lexically-ordered stores (e.g. S3 Express) it uses
    /// the version hint to avoid an O(n) listing, falling back to a full
    /// listing if the hint is missing or stale.
    fn list_manifest_locations_since<'a>(
        &self,
        base_path: &Path,
        object_store: &'a ObjectStore,
        since_version: u64,
    ) -> BoxStream<'a, Result<ManifestLocation>> {
        if !uses_version_hint(object_store) {
            // The listing is descending, so stop at the first version that is
            // not newer than `since_version`.
            return self
                .list_manifest_locations(base_path, object_store, true)
                .try_take_while(move |loc| future::ready(Ok(loc.version > since_version)))
                .boxed();
        }

        let base_path = base_path.clone();
        futures::stream::once(async move {
            let locations = match list_manifests_since_version_with_hint(
                object_store,
                &base_path,
                since_version,
            )
            .await
            {
                Some(locations) => locations,
                // The hint could not be used: list everything, then filter and
                // sort in memory.
                None => {
                    let mut locations = list_manifests(&base_path, &object_store.inner)
                        .try_collect::<Vec<_>>()
                        .await?;
                    locations.retain(|loc| loc.version > since_version);
                    locations.sort_by_key(|loc| std::cmp::Reverse(loc.version));
                    locations
                }
            };
            Ok::<_, Error>(futures::stream::iter(locations.into_iter().map(Ok)))
        })
        .try_flatten()
        .boxed()
    }

    /// Commit a manifest.
    ///
    /// This function should return an [CommitError::CommitConflict] if another
    /// transaction has already been committed to the path.
    ///
    /// The implementations in this file:
    /// - use `naming_scheme` to compute the final manifest path under `base_path`;
    /// - call `manifest_writer` to serialize `manifest` (together with `indices`
    ///   and `transaction`) to a path;
    /// - on success, return the location of the committed manifest.
    async fn commit(
        &self,
        manifest: &mut Manifest,
        indices: Option<Vec<IndexMetadata>>,
        base_path: &Path,
        object_store: &ObjectStore,
        manifest_writer: ManifestWriter,
        naming_scheme: ManifestNamingScheme,
        transaction: Option<Transaction>,
    ) -> std::result::Result<ManifestLocation, CommitError>;

    /// Delete the recorded manifest information for a dataset at the base_path
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    async fn delete(&self, _base_path: &Path) -> Result<()> {
        Ok(())
    }
}
958
959async fn default_resolve_version(
960    base_path: &Path,
961    version: u64,
962    object_store: &dyn OSObjectStore,
963) -> Result<ManifestLocation> {
964    if is_detached_version(version) {
965        return Ok(ManifestLocation {
966            version,
967            // Detached versions are not supported with V1 naming scheme.  If we need
968            // to support in the future we could use a different prefix (e.g. 'x' or something)
969            naming_scheme: ManifestNamingScheme::V2,
970            // Both V1 and V2 should give the same path for detached versions
971            path: ManifestNamingScheme::V2.manifest_path(base_path, version),
972            size: None,
973            e_tag: None,
974        });
975    }
976
977    // try V2, fallback to V1.
978    let scheme = ManifestNamingScheme::V2;
979    let path = scheme.manifest_path(base_path, version);
980    match object_store.head(&path).await {
981        Ok(meta) => Ok(ManifestLocation {
982            version,
983            path,
984            size: Some(meta.size),
985            naming_scheme: scheme,
986            e_tag: meta.e_tag,
987        }),
988        Err(ObjectStoreError::NotFound { .. }) => {
989            // fallback to V1
990            let scheme = ManifestNamingScheme::V1;
991            Ok(ManifestLocation {
992                version,
993                path: scheme.manifest_path(base_path, version),
994                size: None,
995                naming_scheme: scheme,
996                e_tag: None,
997            })
998        }
999        Err(e) => Err(e.into()),
1000    }
1001}
/// Adapt an object_store credentials into AWS SDK creds
///
/// Wraps an `object_store` `AwsCredentialProvider` so that it can be passed to
/// the AWS SDK as a `ProvideCredentials` implementation. The DynamoDB client
/// built in `build_dynamodb_external_store` uses it to reuse the credentials
/// resolved for the S3 object store.
#[cfg(feature = "dynamodb")]
#[derive(Debug)]
struct OSObjectStoreToAwsCredAdaptor(AwsCredentialProvider);
1006
#[cfg(feature = "dynamodb")]
impl ProvideCredentials for OSObjectStoreToAwsCredAdaptor {
    /// Fetch credentials from the wrapped provider and convert them to AWS SDK
    /// credentials carrying a fixed ten-minute expiry.
    fn provide_credentials<'a>(
        &'a self,
    ) -> aws_credential_types::provider::future::ProvideCredentials<'a>
    where
        Self: 'a,
    {
        aws_credential_types::provider::future::ProvideCredentials::new(async {
            // Any failure of the wrapped provider is reported as a provider error.
            let source = self
                .0
                .get_credential()
                .await
                .map_err(|e| CredentialsError::provider_error(Box::new(e)))?;
            let expires_at = SystemTime::now()
                .checked_add(Duration::from_secs(10 * 60))
                .expect("overflow");
            Ok(aws_credential_types::Credentials::new(
                &source.key_id,
                &source.secret_key,
                source.token.clone(),
                Some(expires_at),
                "",
            ))
        })
    }
}
1037
/// Create a DynamoDB-backed [ExternalManifestStore] for `table_name`.
///
/// The client:
/// - uses `region` and, if given, a custom `endpoint`;
/// - takes credentials from `creds` with the SDK identity cache disabled;
/// - retries transient failures up to 5 attempts in total.
#[cfg(feature = "dynamodb")]
async fn build_dynamodb_external_store(
    table_name: &str,
    creds: AwsCredentialProvider,
    region: &str,
    endpoint: Option<String>,
    app_name: &str,
) -> Result<Arc<dyn ExternalManifestStore>> {
    use super::commit::dynamodb::DynamoDBExternalManifestStore;
    use aws_sdk_dynamodb::{
        Client,
        config::{IdentityCache, Region, retry::RetryConfig},
    };

    // 5 attempts = 1 initial try + 4 retries with exponential backoff, to ride
    // out transient network issues.
    let retries = RetryConfig::standard().with_max_attempts(5);

    let builder = aws_sdk_dynamodb::config::Builder::new()
        .behavior_version_latest()
        .region(Some(Region::new(region.to_string())))
        .credentials_provider(OSObjectStoreToAwsCredAdaptor(creds))
        // Caching is the responsibility of the supplied AwsCredentialProvider.
        .identity_cache(IdentityCache::no_cache())
        .retry_config(retries);

    let builder = match endpoint {
        Some(url) => builder.endpoint_url(url),
        None => builder,
    };

    let client = Client::from_conf(builder.build());
    DynamoDBExternalManifestStore::new_external_store(client.into(), table_name, app_name).await
}
1069
1070pub async fn commit_handler_from_url(
1071    url_or_path: &str,
1072    // This looks unused if dynamodb feature disabled
1073    #[allow(unused_variables)] options: &Option<ObjectStoreParams>,
1074) -> Result<Arc<dyn CommitHandler>> {
1075    let local_handler: Arc<dyn CommitHandler> = if cfg!(windows) {
1076        Arc::new(RenameCommitHandler)
1077    } else {
1078        Arc::new(ConditionalPutCommitHandler)
1079    };
1080
1081    let url = match Url::parse(url_or_path) {
1082        Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
1083            // On Windows, the drive is parsed as a scheme
1084            return Ok(local_handler);
1085        }
1086        Ok(url) => url,
1087        Err(_) => {
1088            return Ok(local_handler);
1089        }
1090    };
1091
1092    match url.scheme() {
1093        "file" | "file-object-store" => Ok(local_handler),
1094        "s3" | "gs" | "az" | "abfss" | "memory" | "oss" | "cos" | "shared-memory" => {
1095            Ok(Arc::new(ConditionalPutCommitHandler))
1096        }
1097        #[cfg(not(feature = "dynamodb"))]
1098        "s3+ddb" => Err(Error::invalid_input_source(
1099            "`s3+ddb://` scheme requires `dynamodb` feature to be enabled".into(),
1100        )),
1101        #[cfg(feature = "dynamodb")]
1102        "s3+ddb" => {
1103            if url.query_pairs().count() != 1 {
1104                return Err(Error::invalid_input_source(
1105                    "`s3+ddb://` scheme and expects exactly one query `ddbTableName`".into(),
1106                ));
1107            }
1108            let table_name = match url.query_pairs().next() {
1109                Some((Cow::Borrowed(key), Cow::Borrowed(table_name)))
1110                    if key == DDB_URL_QUERY_KEY =>
1111                {
1112                    if table_name.is_empty() {
1113                        return Err(Error::invalid_input_source(
1114                            "`s3+ddb://` scheme requires non empty dynamodb table name".into(),
1115                        ));
1116                    }
1117                    table_name
1118                }
1119                _ => {
1120                    return Err(Error::invalid_input_source(
1121                        "`s3+ddb://` scheme and expects exactly one query `ddbTableName`".into(),
1122                    ));
1123                }
1124            };
1125            let options = options.clone().unwrap_or_default();
1126            let storage_options_raw =
1127                StorageOptions(options.storage_options().cloned().unwrap_or_default());
1128            let dynamo_endpoint = get_dynamodb_endpoint(&storage_options_raw);
1129            let storage_options = storage_options_raw.as_s3_options();
1130
1131            let region = storage_options.get(&AmazonS3ConfigKey::Region).cloned();
1132
1133            // Get accessor from the options
1134            let accessor = options.get_accessor();
1135
1136            let (aws_creds, region) = build_aws_credential(
1137                options.s3_credentials_refresh_offset,
1138                options.aws_credentials.clone(),
1139                Some(&storage_options),
1140                region,
1141                accessor,
1142            )
1143            .await?;
1144
1145            Ok(Arc::new(ExternalManifestCommitHandler {
1146                external_manifest_store: build_dynamodb_external_store(
1147                    table_name,
1148                    aws_creds.clone(),
1149                    &region,
1150                    dynamo_endpoint,
1151                    "lancedb",
1152                )
1153                .await?,
1154            }))
1155        }
1156        _ => Ok(Arc::new(UnsafeCommitHandler)),
1157    }
1158}
1159
/// Resolve a custom DynamoDB endpoint.
///
/// The `dynamodb_endpoint` storage option takes precedence. Otherwise the
/// `DYNAMODB_ENDPOINT` environment variable is used, if it is set and valid
/// Unicode.
#[cfg(feature = "dynamodb")]
fn get_dynamodb_endpoint(storage_options: &StorageOptions) -> Option<String> {
    storage_options
        .0
        .get("dynamodb_endpoint")
        .cloned()
        .or_else(|| std::env::var("DYNAMODB_ENDPOINT").ok())
}
1168
1169/// Errors that can occur when committing a manifest.
1170#[derive(Debug)]
1171pub enum CommitError {
1172    /// Another transaction has already been written to the path
1173    CommitConflict,
1174    /// Something else went wrong
1175    OtherError(Error),
1176}
1177
1178impl From<Error> for CommitError {
1179    fn from(e: Error) -> Self {
1180        Self::OtherError(e)
1181    }
1182}
1183
1184impl From<CommitError> for Error {
1185    fn from(e: CommitError) -> Self {
1186        match e {
1187            CommitError::CommitConflict => Self::internal("Commit conflict".to_string()),
1188            CommitError::OtherError(e) => e,
1189        }
1190    }
1191}
1192
/// Whether we have issued a warning about using the unsafe commit handler.
///
/// This is a process-wide `static`, shared by every `UnsafeCommitHandler`.
static WARNED_ON_UNSAFE_COMMIT: AtomicBool = AtomicBool::new(false);

/// A naive commit implementation that does not prevent conflicting writes.
///
/// The manifest is written directly to its final path without checking whether
/// it already exists, so concurrent writers of the same version may overwrite
/// each other. `commit_handler_from_url` falls back to this handler for URL
/// schemes it does not recognize.
///
/// This will log a warning the first time it is used.
pub struct UnsafeCommitHandler;
1200
1201#[async_trait::async_trait]
1202#[allow(clippy::too_many_arguments)]
1203impl CommitHandler for UnsafeCommitHandler {
1204    async fn commit(
1205        &self,
1206        manifest: &mut Manifest,
1207        indices: Option<Vec<IndexMetadata>>,
1208        base_path: &Path,
1209        object_store: &ObjectStore,
1210        manifest_writer: ManifestWriter,
1211        naming_scheme: ManifestNamingScheme,
1212        transaction: Option<Transaction>,
1213    ) -> std::result::Result<ManifestLocation, CommitError> {
1214        // Log a one-time warning
1215        if !WARNED_ON_UNSAFE_COMMIT.load(std::sync::atomic::Ordering::Relaxed) {
1216            WARNED_ON_UNSAFE_COMMIT.store(true, std::sync::atomic::Ordering::Relaxed);
1217            log::warn!(
1218                "Using unsafe commit handler. Concurrent writes may result in data loss. \
1219                 Consider providing a commit handler that prevents conflicting writes."
1220            );
1221        }
1222
1223        let version_path = naming_scheme.manifest_path(base_path, manifest.version);
1224        let res =
1225            manifest_writer(object_store, manifest, indices, &version_path, transaction).await?;
1226
1227        write_version_hint(object_store, base_path, manifest.version).await;
1228
1229        Ok(ManifestLocation {
1230            version: manifest.version,
1231            size: Some(res.size as u64),
1232            naming_scheme,
1233            path: version_path,
1234            e_tag: res.e_tag,
1235        })
1236    }
1237}
1238
1239impl Debug for UnsafeCommitHandler {
1240    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1241        f.debug_struct("UnsafeCommitHandler").finish()
1242    }
1243}
1244
/// A commit implementation that uses a lock to prevent conflicting writes.
///
/// Any `CommitLock + Send + Sync` whose `Lease` is `'static` (and an `Arc` of
/// one) automatically implements [CommitHandler]. That implementation:
/// 1. acquires the lock;
/// 2. checks that the target manifest does not exist yet;
/// 3. writes the manifest;
/// 4. releases the lease, reporting whether the write succeeded.
#[async_trait::async_trait]
pub trait CommitLock: Debug {
    /// The handle returned by [CommitLock::lock]; releasing it unlocks the table.
    type Lease: CommitLease;

    /// Attempt to lock the table for the given version.
    ///
    /// If it is already locked by another transaction, wait until it is unlocked.
    /// Once it is unlocked, return [CommitError::CommitConflict] if the version
    /// has already been committed. Otherwise, return the lock.
    ///
    /// To prevent poisoned locks, it's recommended to set a timeout on the lock
    /// of at least 30 seconds.
    ///
    /// It is not required that the lock tracks the version. It is provided in
    /// case the locking is handled by a catalog service that needs to know the
    /// current version of the table.
    async fn lock(&self, version: u64) -> std::result::Result<Self::Lease, CommitError>;
}
1264
/// A held commit lock, obtained from [CommitLock::lock].
#[async_trait::async_trait]
pub trait CommitLease: Send + Sync {
    /// Return the lease, indicating whether the commit was successful.
    ///
    /// Implementations should tolerate being called more than once: if a commit
    /// is cancelled (e.g. by a timeout) while `release` is in flight, a
    /// best-effort `release(false)` may be issued afterwards from the drop path.
    async fn release(&self, success: bool) -> std::result::Result<(), CommitError>;
}
1274
/// Guards a [CommitLease] so the lock is released even if the commit future is
/// dropped (e.g. cancelled by a commit timeout) before reaching an explicit
/// release.
///
/// [CommitLease::release] is async and cannot be awaited from `Drop`, so on the
/// drop path we spawn a best-effort background task that releases the lock with
/// `success = false`. Without this, a cancelled commit would leak the lock until
/// the lease's own TTL expired, blocking other writers in the meantime.
///
/// The background task is only spawned when a Tokio runtime is available on
/// the dropping thread. Otherwise the lease is left for its TTL to reclaim.
struct LeaseGuard<L: CommitLease + 'static> {
    // `Some` while the lease is held and has not been explicitly released.
    // Cleared only after an explicit release completes, so that `Drop` is a
    // no-op on the normal path.
    lease: Option<L>,
}
1286
1287impl<L: CommitLease + 'static> LeaseGuard<L> {
1288    fn new(lease: L) -> Self {
1289        Self { lease: Some(lease) }
1290    }
1291
1292    /// Explicitly release the lease, consuming the guard so `Drop` is a no-op.
1293    async fn release(mut self, success: bool) -> std::result::Result<(), CommitError> {
1294        // Keep the lease inside the guard across the await so that, if this
1295        // future is cancelled mid-release (e.g. the release call itself hangs
1296        // and the commit timeout fires), `Drop` still issues a best-effort
1297        // release. Only clear it once the release has fully completed.
1298        let result = {
1299            let lease = self
1300                .lease
1301                .as_ref()
1302                .expect("LeaseGuard released more than once");
1303            lease.release(success).await
1304        };
1305        self.lease = None;
1306        result
1307    }
1308}
1309
1310impl<L: CommitLease + 'static> Drop for LeaseGuard<L> {
1311    fn drop(&mut self) {
1312        if let Some(lease) = self.lease.take() {
1313            // The guard was dropped without an explicit release, meaning the
1314            // commit future was cancelled while holding the lock. We can't await
1315            // in `Drop`, so spawn a best-effort release. If there is no runtime,
1316            // leave the lease for its TTL to reclaim.
1317            if let Ok(handle) = tokio::runtime::Handle::try_current() {
1318                handle.spawn(async move {
1319                    let _ = lease.release(false).await;
1320                });
1321            }
1322        }
1323    }
1324}
1325
1326#[async_trait::async_trait]
1327impl<T: CommitLock + Send + Sync> CommitHandler for T
1328where
1329    T::Lease: 'static,
1330{
1331    async fn commit(
1332        &self,
1333        manifest: &mut Manifest,
1334        indices: Option<Vec<IndexMetadata>>,
1335        base_path: &Path,
1336        object_store: &ObjectStore,
1337        manifest_writer: ManifestWriter,
1338        naming_scheme: ManifestNamingScheme,
1339        transaction: Option<Transaction>,
1340    ) -> std::result::Result<ManifestLocation, CommitError> {
1341        let path = naming_scheme.manifest_path(base_path, manifest.version);
1342        // Hold the lease in a guard so the lock is released even if this future
1343        // is cancelled before we reach an explicit release below. The explicit
1344        // releases are still preferred since they report the correct success
1345        // flag and surface release errors; the guard only covers cancellation.
1346        let lease = LeaseGuard::new(self.lock(manifest.version).await?);
1347
1348        // Head the location and make sure it's not already committed
1349        match object_store.inner.head(&path).await {
1350            Ok(_) => {
1351                // The path already exists, so it's already committed
1352                // Release the lock
1353                lease.release(false).await?;
1354
1355                return Err(CommitError::CommitConflict);
1356            }
1357            Err(ObjectStoreError::NotFound { .. }) => {}
1358            Err(e) => {
1359                // Something else went wrong
1360                // Release the lock
1361                lease.release(false).await?;
1362
1363                return Err(CommitError::OtherError(e.into()));
1364            }
1365        }
1366        let res = manifest_writer(object_store, manifest, indices, &path, transaction).await;
1367
1368        // Release the lock
1369        lease.release(res.is_ok()).await?;
1370
1371        let res = res?;
1372
1373        write_version_hint(object_store, base_path, manifest.version).await;
1374
1375        Ok(ManifestLocation {
1376            version: manifest.version,
1377            size: Some(res.size as u64),
1378            naming_scheme,
1379            path,
1380            e_tag: res.e_tag,
1381        })
1382    }
1383}
1384
1385#[async_trait::async_trait]
1386impl<T: CommitLock + Send + Sync> CommitHandler for Arc<T>
1387where
1388    T::Lease: 'static,
1389{
1390    async fn commit(
1391        &self,
1392        manifest: &mut Manifest,
1393        indices: Option<Vec<IndexMetadata>>,
1394        base_path: &Path,
1395        object_store: &ObjectStore,
1396        manifest_writer: ManifestWriter,
1397        naming_scheme: ManifestNamingScheme,
1398        transaction: Option<Transaction>,
1399    ) -> std::result::Result<ManifestLocation, CommitError> {
1400        self.as_ref()
1401            .commit(
1402                manifest,
1403                indices,
1404                base_path,
1405                object_store,
1406                manifest_writer,
1407                naming_scheme,
1408                transaction,
1409            )
1410            .await
1411    }
1412}
1413
/// A commit implementation that uses a temporary path and renames the object.
///
/// The manifest is first written to a unique staging path derived from its
/// final path. It is then moved into place with `rename_if_not_exists`. An
/// `AlreadyExists` error from the rename is reported as
/// [CommitError::CommitConflict].
///
/// This only works for object stores that support atomic rename if not exist.
/// `commit_handler_from_url` uses it as the local handler on Windows.
pub struct RenameCommitHandler;
1418
1419#[async_trait::async_trait]
1420impl CommitHandler for RenameCommitHandler {
1421    async fn commit(
1422        &self,
1423        manifest: &mut Manifest,
1424        indices: Option<Vec<IndexMetadata>>,
1425        base_path: &Path,
1426        object_store: &ObjectStore,
1427        manifest_writer: ManifestWriter,
1428        naming_scheme: ManifestNamingScheme,
1429        transaction: Option<Transaction>,
1430    ) -> std::result::Result<ManifestLocation, CommitError> {
1431        // Create a temporary object, then use `rename_if_not_exists` to commit.
1432        // If failed, clean up the temporary object.
1433
1434        let path = naming_scheme.manifest_path(base_path, manifest.version);
1435        let tmp_path = make_staging_manifest_path(&path)?;
1436
1437        let res = manifest_writer(object_store, manifest, indices, &tmp_path, transaction).await?;
1438
1439        match object_store
1440            .inner
1441            .rename_if_not_exists(&tmp_path, &path)
1442            .await
1443        {
1444            Ok(_) => {
1445                // Successfully committed
1446                write_version_hint(object_store, base_path, manifest.version).await;
1447                Ok(ManifestLocation {
1448                    version: manifest.version,
1449                    path,
1450                    size: Some(res.size as u64),
1451                    naming_scheme,
1452                    e_tag: None, // Re-name can change e-tag.
1453                })
1454            }
1455            Err(ObjectStoreError::AlreadyExists { .. }) => {
1456                // Another transaction has already been committed
1457                // Attempt to clean up temporary object, but ignore errors if we can't
1458                let _ = object_store.delete(&tmp_path).await;
1459
1460                return Err(CommitError::CommitConflict);
1461            }
1462            Err(e) => {
1463                // Something else went wrong
1464                return Err(CommitError::OtherError(e.into()));
1465            }
1466        }
1467    }
1468}
1469
1470impl Debug for RenameCommitHandler {
1471    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1472        f.debug_struct("RenameCommitHandler").finish()
1473    }
1474}
1475
/// A commit implementation that writes the manifest with a single conditional
/// "create if absent" PUT (`PutMode::Create`).
///
/// If the manifest already exists, the store rejects the PUT with
/// `AlreadyExists` or `Precondition`. That rejection is reported as
/// [CommitError::CommitConflict].
///
/// `commit_handler_from_url` uses this handler for:
/// - local paths outside Windows;
/// - `s3`, `gs`, `az`, `abfss`, `memory`, `oss`, `cos` and `shared-memory` URLs.
pub struct ConditionalPutCommitHandler;
1477
1478#[async_trait::async_trait]
1479impl CommitHandler for ConditionalPutCommitHandler {
1480    async fn commit(
1481        &self,
1482        manifest: &mut Manifest,
1483        indices: Option<Vec<IndexMetadata>>,
1484        base_path: &Path,
1485        object_store: &ObjectStore,
1486        manifest_writer: ManifestWriter,
1487        naming_scheme: ManifestNamingScheme,
1488        transaction: Option<Transaction>,
1489    ) -> std::result::Result<ManifestLocation, CommitError> {
1490        let path = naming_scheme.manifest_path(base_path, manifest.version);
1491
1492        let memory_store = ObjectStore::memory();
1493        let dummy_path = "dummy";
1494        manifest_writer(
1495            &memory_store,
1496            manifest,
1497            indices,
1498            &dummy_path.into(),
1499            transaction,
1500        )
1501        .await?;
1502        let dummy_data = memory_store.read_one_all(&dummy_path.into()).await?;
1503        let size = dummy_data.len() as u64;
1504        let res = object_store
1505            .inner
1506            .put_opts(
1507                &path,
1508                dummy_data.into(),
1509                PutOptions {
1510                    mode: object_store::PutMode::Create,
1511                    ..Default::default()
1512                },
1513            )
1514            .await
1515            .map_err(|err| match err {
1516                ObjectStoreError::AlreadyExists { .. } | ObjectStoreError::Precondition { .. } => {
1517                    CommitError::CommitConflict
1518                }
1519                _ => CommitError::OtherError(err.into()),
1520            })?;
1521
1522        write_version_hint(object_store, base_path, manifest.version).await;
1523
1524        Ok(ManifestLocation {
1525            version: manifest.version,
1526            path,
1527            size: Some(size),
1528            naming_scheme,
1529            e_tag: res.e_tag,
1530        })
1531    }
1532}
1533
1534impl Debug for ConditionalPutCommitHandler {
1535    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1536        f.debug_struct("ConditionalPutCommitHandler").finish()
1537    }
1538}
1539
/// Settings that control how a commit is attempted.
#[derive(Debug, Clone)]
pub struct CommitConfig {
    /// Maximum number of commit retries (20 by default).
    pub num_retries: u32,
    /// When `true`, automatic cleanup is skipped (`false` by default). What
    /// "cleanup" covers is defined by the code that reads this flag.
    pub skip_auto_cleanup: bool,
    // TODO: add isolation_level
}

impl Default for CommitConfig {
    fn default() -> Self {
        const DEFAULT_NUM_RETRIES: u32 = 20;
        CommitConfig {
            skip_auto_cleanup: false,
            num_retries: DEFAULT_NUM_RETRIES,
        }
    }
}
1555
#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicUsize;

    use lance_core::utils::tempfile::TempObjDir;

    use super::*;

    /// Checks path construction, version parsing, and scheme detection for
    /// both manifest naming schemes.
    #[test]
    fn test_manifest_naming_scheme() {
        let v1 = ManifestNamingScheme::V1;
        let v2 = ManifestNamingScheme::V2;

        assert_eq!(
            v1.manifest_path(&Path::from("base"), 0),
            Path::from("base/_versions/0.manifest")
        );
        assert_eq!(
            v1.manifest_path(&Path::from("base"), 42),
            Path::from("base/_versions/42.manifest")
        );

        assert_eq!(
            v2.manifest_path(&Path::from("base"), 0),
            Path::from("base/_versions/18446744073709551615.manifest")
        );
        assert_eq!(
            v2.manifest_path(&Path::from("base"), 42),
            Path::from("base/_versions/18446744073709551573.manifest")
        );

        assert_eq!(v1.parse_version("0.manifest"), Some(0));
        assert_eq!(v1.parse_version("42.manifest"), Some(42));
        assert_eq!(
            v1.parse_version("42.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc"),
            Some(42)
        );

        assert_eq!(v2.parse_version("18446744073709551615.manifest"), Some(0));
        assert_eq!(v2.parse_version("18446744073709551573.manifest"), Some(42));
        assert_eq!(
            v2.parse_version("18446744073709551573.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc"),
            Some(42)
        );

        assert_eq!(ManifestNamingScheme::detect_scheme("0.manifest"), Some(v1));
        assert_eq!(
            ManifestNamingScheme::detect_scheme("18446744073709551615.manifest"),
            Some(v2)
        );
        assert_eq!(ManifestNamingScheme::detect_scheme("something else"), None);
    }

    /// `migrate_scheme_to_v2` moves V1 manifests to their V2 paths and leaves
    /// V2 manifests and unrelated files in place.
    #[tokio::test]
    async fn test_manifest_naming_migration() {
        let object_store = ObjectStore::memory();
        let base = Path::from("base");
        let versions_dir = base.clone().join(VERSIONS_DIR);

        // Write one non-manifest file, one V1 manifest (version 0), and one V2
        // manifest (version 1).
        let original_files = vec![
            versions_dir.clone().join("irrelevant"),
            ManifestNamingScheme::V1.manifest_path(&base, 0),
            ManifestNamingScheme::V2.manifest_path(&base, 1),
        ];
        for path in original_files {
            object_store.put(&path, b"".as_slice()).await.unwrap();
        }

        migrate_scheme_to_v2(&object_store, &base).await.unwrap();

        // Version 0 now lives at its V2 path and the non-manifest file is kept.
        // The memory store lists paths lexically, which fixes this ordering.
        let expected_files = vec![
            ManifestNamingScheme::V2.manifest_path(&base, 1),
            ManifestNamingScheme::V2.manifest_path(&base, 0),
            versions_dir.clone().join("irrelevant"),
        ];
        let actual_files = object_store
            .inner
            .list(Some(&versions_dir))
            .map_ok(|res| res.location)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(actual_files, expected_files);
    }

    // `list_manifest_locations(.., true)` is expected to yield manifests
    // newest-first on both a lexically listing store (memory) and a
    // non-lexical one (local filesystem).
    #[tokio::test]
    #[rstest::rstest]
    async fn test_list_manifests_sorted(
        #[values(true, false)] lexical_list_store: bool,
        #[values(ManifestNamingScheme::V1, ManifestNamingScheme::V2)]
        naming_scheme: ManifestNamingScheme,
    ) {
        // Declared here so the temp dir outlives the store that points into it.
        let tempdir;
        let (object_store, base) = if lexical_list_store {
            (Box::new(ObjectStore::memory()), Path::from("base"))
        } else {
            tempdir = TempObjDir::default();
            let path = tempdir.clone().join("base");
            let store = Box::new(ObjectStore::local());
            assert!(!store.list_is_lexically_ordered);
            (store, path)
        };

        // Write 12 manifest files, latest first
        let mut expected_paths = Vec::new();
        for i in (0..12).rev() {
            let path = naming_scheme.manifest_path(&base, i);
            object_store.put(&path, b"".as_slice()).await.unwrap();
            expected_paths.push(path);
        }

        let actual_versions = ConditionalPutCommitHandler
            .list_manifest_locations(&base, &object_store, true)
            .map_ok(|location| location.path)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        assert_eq!(actual_versions, expected_paths);
    }

    // `current_manifest_path` resolves the highest version regardless of the
    // order files were written in or whether the store lists lexically.
    #[tokio::test]
    #[rstest::rstest]
    async fn test_current_manifest_path(
        #[values(true, false)] lexical_list_store: bool,
        #[values(ManifestNamingScheme::V1, ManifestNamingScheme::V2)]
        naming_scheme: ManifestNamingScheme,
    ) {
        // Use memory store for both cases to avoid local FS special codepath.
        // Modify list_is_lexically_ordered to simulate different object stores.
        let mut object_store = ObjectStore::memory();
        object_store.list_is_lexically_ordered = lexical_list_store;
        let object_store = Box::new(object_store);
        let base = Path::from("base");

        // Write 12 manifest files in non-sequential order
        for version in [5, 2, 11, 0, 8, 3, 10, 1, 7, 4, 9, 6] {
            let path = naming_scheme.manifest_path(&base, version);
            object_store.put(&path, b"".as_slice()).await.unwrap();
        }

        let location = current_manifest_path(&object_store, &base).await.unwrap();

        assert_eq!(location.version, 11);
        assert_eq!(location.naming_scheme, naming_scheme);
        assert_eq!(location.path, naming_scheme.manifest_path(&base, 11));
    }

    /// A memory store that reports `list_is_lexically_ordered == false`, like
    /// S3 Express, so the version-hint paths are exercised.
    fn non_lexical_memory_store() -> Box<ObjectStore> {
        let mut object_store = ObjectStore::memory();
        object_store.list_is_lexically_ordered = false;
        Box::new(object_store)
    }

    /// Round-trips the version hint, covering overwrite, detached-version
    /// filtering, lexical stores, and corrupt hint files.
    #[tokio::test]
    async fn test_write_version_hint() {
        let base = Path::from("base");

        // No hint is written on lexically-ordered stores (it would not be read).
        let lexical = ObjectStore::memory();
        write_version_hint(&lexical, &base, 42).await;
        assert_eq!(read_version_from_hint(&lexical, &base).await, None);

        let object_store = non_lexical_memory_store();
        write_version_hint(&object_store, &base, 42).await;
        assert_eq!(read_version_from_hint(&object_store, &base).await, Some(42));

        // A later commit overwrites the hint.
        write_version_hint(&object_store, &base, 100).await;
        assert_eq!(
            read_version_from_hint(&object_store, &base).await,
            Some(100)
        );

        // Detached versions are never written to the hint.
        write_version_hint(
            &object_store,
            &base,
            crate::format::DETACHED_VERSION_MASK | 7,
        )
        .await;
        assert_eq!(
            read_version_from_hint(&object_store, &base).await,
            Some(100)
        );

        // A corrupt / non-JSON hint file is treated as missing.
        let hint_path = version_hint_path(&base);
        object_store
            .put(&hint_path, b"not json".as_slice())
            .await
            .unwrap();
        assert_eq!(read_version_from_hint(&object_store, &base).await, None);
    }

    // Exercises the hint-and-probe lookup: missing hint, stale hint, exact
    // hint, and a hint past the latest existing manifest.
    #[tokio::test]
    #[rstest::rstest]
    async fn test_read_version_hint_and_probe(
        #[values(ManifestNamingScheme::V1, ManifestNamingScheme::V2)]
        naming_scheme: ManifestNamingScheme,
    ) {
        let object_store = non_lexical_memory_store();
        let base = Path::from("base");

        // No hint file yet.
        assert!(
            read_version_hint_and_probe(&object_store, &base)
                .await
                .is_none()
        );

        for version in 1..=5 {
            object_store
                .put(&naming_scheme.manifest_path(&base, version), b"".as_slice())
                .await
                .unwrap();
        }

        // Stale hint: should probe forward and find version 5.
        write_version_hint(&object_store, &base, 3).await;
        let location = read_version_hint_and_probe(&object_store, &base)
            .await
            .unwrap();
        assert_eq!(location.version, 5);
        assert_eq!(location.naming_scheme, naming_scheme);

        // Up-to-date hint: returns version 5 directly.
        write_version_hint(&object_store, &base, 5).await;
        let location = read_version_hint_and_probe(&object_store, &base)
            .await
            .unwrap();
        assert_eq!(location.version, 5);

        // Hint points past the latest version: not usable.
        write_version_hint(&object_store, &base, 10).await;
        assert!(
            read_version_hint_and_probe(&object_store, &base)
                .await
                .is_none()
        );
    }

    /// `list_manifests_since_version_with_hint` returns the versions newer than
    /// the read version (descending), or `None` when the hint is unusable.
    #[tokio::test]
    async fn test_list_manifests_since_version_with_hint() {
        let object_store = non_lexical_memory_store();
        let base = Path::from("base");
        let scheme = ManifestNamingScheme::V2;

        for version in 1..=10 {
            object_store
                .put(&scheme.manifest_path(&base, version), b"".as_slice())
                .await
                .unwrap();
        }

        // No hint yet -> not usable, caller must fall back.
        assert!(
            list_manifests_since_version_with_hint(&object_store, &base, 7)
                .await
                .is_none()
        );

        // Hint exactly at the read version -> fast path, nothing new.
        write_version_hint(&object_store, &base, 10).await;
        assert!(matches!(
            list_manifests_since_version_with_hint(&object_store, &base, 10).await,
            Some(v) if v.is_empty()
        ));

        // Hint ahead of the read version, with a gap to fill (8, 9) plus probing
        // from the hint (10). Results are descending by version.
        let locations = list_manifests_since_version_with_hint(&object_store, &base, 7)
            .await
            .unwrap();
        assert_eq!(
            locations.iter().map(|l| l.version).collect::<Vec<_>>(),
            vec![10, 9, 8]
        );

        // Slightly stale hint (points at 8) still probes up to the true latest.
        write_version_hint(&object_store, &base, 8).await;
        let locations = list_manifests_since_version_with_hint(&object_store, &base, 7)
            .await
            .unwrap();
        assert_eq!(
            locations.iter().map(|l| l.version).collect::<Vec<_>>(),
            vec![10, 9, 8]
        );

        // Hint points past the latest -> not usable, caller falls back.
        write_version_hint(&object_store, &base, 20).await;
        assert!(
            list_manifests_since_version_with_hint(&object_store, &base, 7)
                .await
                .is_none()
        );
    }

    /// On a non-lexical store, a slightly stale hint still leads
    /// `current_manifest_path` to the true latest version.
    #[tokio::test]
    async fn test_current_manifest_path_with_hint_non_lexical() {
        // Simulate S3 Express (non-lexically ordered list) with many versions.
        let object_store = non_lexical_memory_store();
        let base = Path::from("base");
        let naming_scheme = ManifestNamingScheme::V2;

        for version in 1..=100 {
            object_store
                .put(&naming_scheme.manifest_path(&base, version), b"".as_slice())
                .await
                .unwrap();
        }

        // Slightly stale hint: probing from 98 still resolves the true latest.
        write_version_hint(&object_store, &base, 98).await;
        let location = current_manifest_path(&object_store, &base).await.unwrap();
        assert_eq!(location.version, 100);
    }

    /// A hint that points past every existing manifest is ignored and
    /// `current_manifest_path` falls back to listing.
    #[tokio::test]
    async fn test_current_manifest_path_with_stale_hint_falls_back_to_listing() {
        let object_store = non_lexical_memory_store();
        let base = Path::from("base");
        let naming_scheme = ManifestNamingScheme::V2;

        // Only version 5 exists, but the hint claims version 10.
        object_store
            .put(&naming_scheme.manifest_path(&base, 5), b"".as_slice())
            .await
            .unwrap();
        write_version_hint(&object_store, &base, 10).await;

        // The stale hint is ignored; listing finds version 5.
        let location = current_manifest_path(&object_store, &base).await.unwrap();
        assert_eq!(location.version, 5);
    }

    /// Only `d<version>.manifest` file names parse as detached versions.
    #[test]
    fn test_parse_detached_version() {
        // Valid detached version filenames
        assert_eq!(
            ManifestNamingScheme::parse_detached_version("d12345.manifest"),
            Some(12345)
        );
        assert_eq!(
            ManifestNamingScheme::parse_detached_version("d9223372036854775808.manifest"),
            Some(9223372036854775808)
        );

        // Invalid: not starting with 'd' prefix
        assert_eq!(
            ManifestNamingScheme::parse_detached_version("12345.manifest"),
            None
        );

        // Invalid: regular V2 manifest
        assert_eq!(
            ManifestNamingScheme::parse_detached_version("18446744073709551615.manifest"),
            None
        );

        // Invalid: no extension
        assert_eq!(ManifestNamingScheme::parse_detached_version("d12345"), None);
    }

    /// `list_detached_manifests` returns only the detached manifests, each
    /// reported with the V2 naming scheme.
    #[tokio::test]
    async fn test_list_detached_manifests() {
        // `TryStreamExt` (for `try_collect`) is already in scope via `super::*`.
        use crate::format::DETACHED_VERSION_MASK;

        let object_store = ObjectStore::memory();
        let base = Path::from("base");
        let versions_dir = base.clone().join(VERSIONS_DIR);

        // Create some regular manifests
        for version in [1, 2, 3] {
            let path = ManifestNamingScheme::V2.manifest_path(&base, version);
            object_store.put(&path, b"".as_slice()).await.unwrap();
        }

        // Create some detached manifests
        let detached_versions: Vec<u64> = vec![
            100 | DETACHED_VERSION_MASK,
            200 | DETACHED_VERSION_MASK,
            300 | DETACHED_VERSION_MASK,
        ];
        for version in &detached_versions {
            let path = versions_dir.clone().join(format!("d{}.manifest", version));
            object_store.put(&path, b"".as_slice()).await.unwrap();
        }

        // List detached manifests
        let detached_locations: Vec<ManifestLocation> =
            list_detached_manifests(&base, &object_store.inner)
                .try_collect()
                .await
                .unwrap();

        assert_eq!(detached_locations.len(), 3);
        for loc in &detached_locations {
            assert_eq!(loc.naming_scheme, ManifestNamingScheme::V2);
        }

        let mut found_versions: Vec<u64> = detached_locations.iter().map(|l| l.version).collect();
        found_versions.sort();
        let mut expected_versions = detached_versions.clone();
        expected_versions.sort();
        assert_eq!(found_versions, expected_versions);
    }

    /// Both in-memory URL schemes must resolve to the conditional-put handler.
    #[tokio::test]
    async fn test_commit_handler_from_url_memory_schemes() {
        // Both `memory://` and `shared-memory://` must route to
        // ConditionalPutCommitHandler — otherwise concurrent writers fall
        // through to UnsafeCommitHandler and silently clobber each other's
        // manifests.
        for url in ["memory://bucket-a/ds", "shared-memory://bucket-a/ds"] {
            let handler = commit_handler_from_url(url, &None).await.unwrap();
            assert_eq!(
                format!("{:?}", handler),
                "ConditionalPutCommitHandler",
                "{url} should route to ConditionalPutCommitHandler",
            );
        }
    }

    /// A [CommitLock] whose lease records whether it was released, so we can
    /// assert the lock does not leak when the commit future is cancelled.
    #[derive(Debug)]
    struct TrackingLock {
        released: Arc<AtomicBool>,
    }

    struct TrackingLease {
        released: Arc<AtomicBool>,
    }

    #[async_trait::async_trait]
    impl CommitLock for TrackingLock {
        type Lease = TrackingLease;
        async fn lock(&self, _version: u64) -> std::result::Result<Self::Lease, CommitError> {
            Ok(TrackingLease {
                released: self.released.clone(),
            })
        }
    }

    #[async_trait::async_trait]
    impl CommitLease for TrackingLease {
        async fn release(&self, _success: bool) -> std::result::Result<(), CommitError> {
            self.released
                .store(true, std::sync::atomic::Ordering::SeqCst);
            Ok(())
        }
    }

    /// A [CommitLock] whose lease hangs on its first `release` call but completes
    /// on subsequent ones, so we can assert the drop-path best-effort release
    /// fires when a commit is cancelled *during* the explicit release.
    #[derive(Debug)]
    struct HangingReleaseLock {
        release_calls: Arc<AtomicUsize>,
        released: Arc<AtomicBool>,
    }

    struct HangingReleaseLease {
        release_calls: Arc<AtomicUsize>,
        released: Arc<AtomicBool>,
    }

    #[async_trait::async_trait]
    impl CommitLock for HangingReleaseLock {
        type Lease = HangingReleaseLease;
        async fn lock(&self, _version: u64) -> std::result::Result<Self::Lease, CommitError> {
            Ok(HangingReleaseLease {
                release_calls: self.release_calls.clone(),
                released: self.released.clone(),
            })
        }
    }

    #[async_trait::async_trait]
    impl CommitLease for HangingReleaseLease {
        async fn release(&self, _success: bool) -> std::result::Result<(), CommitError> {
            // The first release (the explicit one) hangs, simulating a release
            // call that stalls long enough for the commit timeout to fire. The
            // best-effort release issued from `Drop` is the second call and
            // succeeds.
            if self
                .release_calls
                .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
                == 0
            {
                future::pending::<()>().await;
                unreachable!()
            }
            self.released
                .store(true, std::sync::atomic::Ordering::SeqCst);
            Ok(())
        }
    }

    /// A manifest writer that succeeds immediately, so the commit reaches the
    /// explicit lease release.
    fn succeeding_manifest_writer<'a>(
        _object_store: &'a ObjectStore,
        _manifest: &'a mut Manifest,
        _indices: Option<Vec<IndexMetadata>>,
        _path: &'a Path,
        _transaction: Option<Transaction>,
    ) -> BoxFuture<'a, Result<WriteResult>> {
        Box::pin(async move { Ok(WriteResult::default()) })
    }

    /// A manifest writer that never completes, simulating a hung object store.
    fn hanging_manifest_writer<'a>(
        _object_store: &'a ObjectStore,
        _manifest: &'a mut Manifest,
        _indices: Option<Vec<IndexMetadata>>,
        _path: &'a Path,
        _transaction: Option<Transaction>,
    ) -> BoxFuture<'a, Result<WriteResult>> {
        Box::pin(async move {
            future::pending::<()>().await;
            unreachable!()
        })
    }

    /// Builds a minimal manifest with a single non-nullable `Int32` column and
    /// no fragments, for the commit-lock cancellation tests.
    fn test_manifest() -> Manifest {
        use std::collections::HashMap;

        use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema};
        use lance_core::datatypes::Schema;
        use lance_file::version::LanceFileVersion;

        use crate::format::DataStorageFormat;

        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("i", DataType::Int32, false)]);
        Manifest::new(
            Schema::try_from(&arrow_schema).unwrap(),
            Arc::new(vec![]),
            DataStorageFormat::new(LanceFileVersion::Stable),
            HashMap::new(),
        )
    }

    /// Polls `released` every 10ms for up to 100 attempts (~1s), giving the
    /// drop guard's background release task time to run. Returns the final
    /// value of the flag.
    async fn wait_for_release(released: &AtomicBool) -> bool {
        use std::sync::atomic::Ordering;
        use std::time::Duration;

        for _ in 0..100 {
            if released.load(Ordering::SeqCst) {
                return true;
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
        released.load(Ordering::SeqCst)
    }

    /// Cancelling a commit (as a commit timeout does) while the lock is held must
    /// still release the lock; otherwise it leaks until the lease's TTL expires.
    #[tokio::test]
    async fn test_commit_lock_released_on_cancellation() {
        use std::time::Duration;

        let released = Arc::new(AtomicBool::new(false));
        let lock = TrackingLock {
            released: released.clone(),
        };

        let object_store = ObjectStore::memory();
        let base_path = Path::from("test");
        let mut manifest = test_manifest();

        // The commit will hang on the manifest writer while holding the lock.
        // Cancel it the same way a commit timeout would: drop the future.
        let commit_fut = lock.commit(
            &mut manifest,
            None,
            &base_path,
            &object_store,
            hanging_manifest_writer,
            ManifestNamingScheme::V2,
            None,
        );
        let timed_out = tokio::time::timeout(Duration::from_millis(50), commit_fut).await;
        assert!(timed_out.is_err(), "commit should not have completed");

        // The drop guard releases the lock on a background task; wait for it.
        assert!(
            wait_for_release(&released).await,
            "lock must be released after the commit future is cancelled"
        );
    }

    /// Cancelling a commit *during* the explicit lease release (e.g. the release
    /// call itself hangs and the commit timeout fires) must still release the
    /// lock via the drop-path best-effort release.
    #[tokio::test]
    async fn test_commit_lock_released_on_cancellation_during_release() {
        use std::sync::atomic::Ordering;
        use std::time::Duration;

        let release_calls = Arc::new(AtomicUsize::new(0));
        let released = Arc::new(AtomicBool::new(false));
        let lock = HangingReleaseLock {
            release_calls: release_calls.clone(),
            released: released.clone(),
        };

        let object_store = ObjectStore::memory();
        let base_path = Path::from("test");
        let mut manifest = test_manifest();

        // The manifest writer succeeds, so the commit reaches the explicit
        // release, which hangs. Cancel it the same way a commit timeout would.
        let commit_fut = lock.commit(
            &mut manifest,
            None,
            &base_path,
            &object_store,
            succeeding_manifest_writer,
            ManifestNamingScheme::V2,
            None,
        );
        let timed_out = tokio::time::timeout(Duration::from_millis(50), commit_fut).await;
        assert!(timed_out.is_err(), "commit should not have completed");

        // The drop guard issues a best-effort release on a background task; wait
        // for it. This is the second release call (the first one hung).
        assert!(
            wait_for_release(&released).await,
            "lock must be released even when cancelled during the explicit release"
        );
        assert_eq!(
            release_calls.load(Ordering::SeqCst),
            2,
            "expected the hung explicit release plus one best-effort drop release"
        );
    }
}