Skip to main content

lance_table/io/commit/
external_manifest.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Trait for external manifest handler.
5//!
6//! This trait abstracts an external storage with put_if_not_exists semantics.
7
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use lance_core::utils::tracing::{
12    AUDIT_MODE_CREATE, AUDIT_MODE_DELETE, AUDIT_TYPE_MANIFEST, TRACE_FILE_AUDIT,
13};
14use lance_core::{Error, Result};
15use lance_io::object_store::ObjectStore;
16use log::warn;
17use object_store::ObjectMeta;
18use object_store::ObjectStoreExt;
19use object_store::{Error as ObjectStoreError, ObjectStore as OSObjectStore, path::Path};
20use tracing::info;
21
22use super::{
23    MANIFEST_EXTENSION, ManifestLocation, ManifestNamingScheme, current_manifest_path,
24    default_resolve_version, make_staging_manifest_path, write_version_hint,
25};
26use crate::format::{IndexMetadata, Manifest, Transaction};
27use crate::io::commit::{CommitError, CommitHandler};
28
29/// External manifest store
30///
31/// This trait abstracts an external storage for source of truth for manifests.
32/// The storage is expected to remember (uri, version) -> manifest_path
33/// and able to run transactions on the manifest_path.
34///
35/// This trait is called an **External** manifest store because the store is
36/// expected to work in tandem with the object store. We are only leveraging
37/// the external store for concurrent commit. Any manifest committed thru this
38/// trait should ultimately be materialized in the object store.
39/// For a visual explanation of the commit loop see
40/// <https://github.com/lance-format/lance/assets/12615154/b0822312-0826-432a-b554-3965f8d48d04>
41#[async_trait]
42pub trait ExternalManifestStore: std::fmt::Debug + Send + Sync {
43    /// Get the manifest path for a given base_uri and version
44    async fn get(&self, base_uri: &str, version: u64) -> Result<String>;
45
46    async fn get_manifest_location(
47        &self,
48        base_uri: &str,
49        version: u64,
50    ) -> Result<ManifestLocation> {
51        let path = self.get(base_uri, version).await?;
52        let path = Path::parse(&path).map_err(|e| Error::invalid_input(e.to_string()))?;
53        let naming_scheme = detect_naming_scheme_from_path(&path)?;
54        Ok(ManifestLocation {
55            version,
56            path,
57            size: None,
58            naming_scheme,
59            e_tag: None,
60        })
61    }
62
63    /// Get the latest version of a dataset at the base_uri, and the path to the manifest.
64    /// The path is provided as an optimization. The path is deterministic based on
65    /// the version and the store should not customize it.
66    async fn get_latest_version(&self, base_uri: &str) -> Result<Option<(u64, String)>>;
67
68    /// Get the latest manifest location for a given base_uri.
69    ///
70    /// By default, this calls get_latest_version.  Impls should
71    /// override this method if they store both the location and size
72    /// of the latest manifest.
73    async fn get_latest_manifest_location(
74        &self,
75        base_uri: &str,
76    ) -> Result<Option<ManifestLocation>> {
77        self.get_latest_version(base_uri).await.and_then(|res| {
78            res.map(|(version, uri)| {
79                let path = Path::parse(&uri).map_err(|e| Error::invalid_input(e.to_string()))?;
80                let naming_scheme = detect_naming_scheme_from_path(&path)?;
81                Ok(ManifestLocation {
82                    version,
83                    path,
84                    size: None,
85                    naming_scheme,
86                    e_tag: None,
87                })
88            })
89            .transpose()
90        })
91    }
92
93    /// Put the manifest to the external store.
94    ///
95    /// The staging manifest has been written to `staging_path` on the object store.
96    /// This method should atomically claim the version and return the final manifest location.
97    ///
98    /// The default implementation uses put_if_not_exists and put_if_exists to
99    /// implement a staging-based workflow. Implementations that can write directly
100    /// (e.g., namespace-backed stores) should override this method.
101    #[allow(clippy::too_many_arguments)]
102    async fn put(
103        &self,
104        base_path: &Path,
105        version: u64,
106        staging_path: &Path,
107        size: u64,
108        e_tag: Option<String>,
109        object_store: &dyn OSObjectStore,
110        naming_scheme: ManifestNamingScheme,
111    ) -> Result<ManifestLocation> {
112        // Default implementation: staging-based workflow
113
114        // Step 1: Record staging path atomically
115        self.put_if_not_exists(
116            base_path.as_ref(),
117            version,
118            staging_path.as_ref(),
119            size,
120            e_tag.clone(),
121        )
122        .await?;
123
124        // Step 2: Copy staging to final path
125        let final_path = naming_scheme.manifest_path(base_path, version);
126        let copied = match object_store.copy(staging_path, &final_path).await {
127            Ok(_) => true,
128            Err(ObjectStoreError::NotFound { .. }) => false,
129            Err(e) => return Err(e.into()),
130        };
131        if copied {
132            info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_MANIFEST, path = final_path.as_ref());
133        }
134
135        // Get final e_tag (may change after copy for large files)
136        let e_tag = if copied && size < 5 * 1024 * 1024 {
137            e_tag
138        } else {
139            let meta = object_store.head(&final_path).await?;
140            meta.e_tag
141        };
142
143        let location = ManifestLocation {
144            version,
145            path: final_path.clone(),
146            size: Some(size),
147            naming_scheme,
148            e_tag: e_tag.clone(),
149        };
150
151        if !copied {
152            return Ok(location);
153        }
154
155        // Step 3: Update external store to final path
156        self.put_if_exists(
157            base_path.as_ref(),
158            version,
159            final_path.as_ref(),
160            size,
161            e_tag,
162        )
163        .await?;
164
165        // Step 4: Delete staging manifest
166        match object_store.delete(staging_path).await {
167            Ok(_) => {}
168            Err(ObjectStoreError::NotFound { .. }) => {}
169            Err(e) => return Err(e.into()),
170        }
171        info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_MANIFEST, path = staging_path.as_ref());
172
173        Ok(location)
174    }
175
176    /// Put the manifest path for a given base_uri and version, should fail if the version already exists
177    async fn put_if_not_exists(
178        &self,
179        base_uri: &str,
180        version: u64,
181        path: &str,
182        size: u64,
183        e_tag: Option<String>,
184    ) -> Result<()>;
185
186    /// Put the manifest path for a given base_uri and version, should fail if the version **does not** already exist
187    async fn put_if_exists(
188        &self,
189        base_uri: &str,
190        version: u64,
191        path: &str,
192        size: u64,
193        e_tag: Option<String>,
194    ) -> Result<()>;
195
196    /// Delete the manifest information for given base_uri from the store
197    async fn delete(&self, _base_uri: &str) -> Result<()> {
198        Ok(())
199    }
200}
201
202pub(crate) fn detect_naming_scheme_from_path(path: &Path) -> Result<ManifestNamingScheme> {
203    path.filename()
204        .and_then(|name| {
205            ManifestNamingScheme::detect_scheme(name)
206                .or_else(|| Some(ManifestNamingScheme::detect_scheme_staging(name)))
207        })
208        .ok_or_else(|| {
209            Error::corrupt_file(
210                path.clone(),
211                "Path does not follow known manifest naming convention.",
212            )
213        })
214}
215
216/// External manifest commit handler
217/// This handler is used to commit a manifest to an external store
218/// for detailed design, see <https://github.com/lance-format/lance/issues/1183>
219#[derive(Debug)]
220pub struct ExternalManifestCommitHandler {
221    pub external_manifest_store: Arc<dyn ExternalManifestStore>,
222}
223
224impl ExternalManifestCommitHandler {
225    /// The manifest is considered committed once the staging manifest is written
226    /// to object store and that path is committed to the external store.
227    ///
228    /// However, to fully complete this, the staging manifest should be materialized
229    /// into the final path, the final path should be committed to the external store
230    /// and the staging manifest should be deleted. These steps may be completed
231    /// by any number of readers or writers, so care should be taken to ensure
232    /// that the manifest is not lost nor any errors occur due to duplicate
233    /// operations.
234    #[allow(clippy::too_many_arguments)]
235    async fn finalize_manifest(
236        &self,
237        base_path: &Path,
238        staging_manifest_path: &Path,
239        version: u64,
240        size: u64,
241        e_tag: Option<String>,
242        store: &dyn OSObjectStore,
243        naming_scheme: ManifestNamingScheme,
244    ) -> std::result::Result<ManifestLocation, Error> {
245        // step 1: copy the manifest to the final location
246        let final_manifest_path = naming_scheme.manifest_path(base_path, version);
247
248        let copied = match store
249            .copy(staging_manifest_path, &final_manifest_path)
250            .await
251        {
252            Ok(_) => true,
253            Err(ObjectStoreError::NotFound { .. }) => false, // Another writer beat us to it.
254            Err(e) => return Err(e.into()),
255        };
256        if copied {
257            info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_MANIFEST, path = final_manifest_path.as_ref());
258        }
259
260        // On S3, the etag can change if originally was MultipartUpload and later was Copy
261        // https://docs.aws.amazon.com/AmazonS3/latest/API/API_Object.html#AmazonS3-Type-Object-ETag
262        // We only do MultipartUpload for > 5MB files, so we can skip this check
263        // if size < 5MB. However, we need to double check the final_manifest_path
264        // exists before we change the external store, otherwise we may point to a
265        // non-existing manifest.
266        let e_tag = if copied && size < 5 * 1024 * 1024 {
267            e_tag
268        } else {
269            let meta = store.head(&final_manifest_path).await?;
270            meta.e_tag
271        };
272
273        let location = ManifestLocation {
274            version,
275            path: final_manifest_path,
276            size: Some(size),
277            naming_scheme,
278            e_tag,
279        };
280
281        if !copied {
282            return Ok(location);
283        }
284
285        // step 2: flip the external store to point to the final location
286        self.external_manifest_store
287            .put_if_exists(
288                base_path.as_ref(),
289                version,
290                location.path.as_ref(),
291                size,
292                location.e_tag.clone(),
293            )
294            .await?;
295
296        // step 3: delete the staging manifest
297        match store.delete(staging_manifest_path).await {
298            Ok(_) => {}
299            Err(ObjectStoreError::NotFound { .. }) => {}
300            Err(e) => return Err(e.into()),
301        }
302        info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_MANIFEST, path = staging_manifest_path.as_ref());
303
304        Ok(location)
305    }
306}
307
308#[async_trait]
309impl CommitHandler for ExternalManifestCommitHandler {
310    async fn resolve_latest_location(
311        &self,
312        base_path: &Path,
313        object_store: &ObjectStore,
314    ) -> std::result::Result<ManifestLocation, Error> {
315        let location = self
316            .external_manifest_store
317            .get_latest_manifest_location(base_path.as_ref())
318            .await?;
319
320        match location {
321            Some(ManifestLocation {
322                version,
323                path,
324                size,
325                naming_scheme,
326                e_tag,
327            }) => {
328                // The path is finalized, no need to check object store
329                if path.extension() == Some(MANIFEST_EXTENSION) {
330                    return Ok(ManifestLocation {
331                        version,
332                        path,
333                        size,
334                        naming_scheme,
335                        e_tag,
336                    });
337                }
338
339                let (size, e_tag) = if let Some(size) = size {
340                    (size, e_tag)
341                } else {
342                    match object_store.inner.head(&path).await {
343                        Ok(meta) => (meta.size, meta.e_tag),
344                        Err(ObjectStoreError::NotFound { .. }) => {
345                            // there may be other threads that have finished executing finalize_manifest.
346                            let new_location = self
347                                .external_manifest_store
348                                .get_manifest_location(base_path.as_ref(), version)
349                                .await?;
350                            return Ok(new_location);
351                        }
352                        Err(e) => return Err(e.into()),
353                    }
354                };
355
356                let final_location = self
357                    .finalize_manifest(
358                        base_path,
359                        &path,
360                        version,
361                        size,
362                        e_tag.clone(),
363                        &object_store.inner,
364                        naming_scheme,
365                    )
366                    .await?;
367
368                Ok(final_location)
369            }
370            // Dataset not found in the external store, this could be because the dataset did not
371            // use external store for commit before. In this case, we search for the latest manifest
372            None => current_manifest_path(object_store, base_path).await,
373        }
374    }
375
376    async fn resolve_version_location(
377        &self,
378        base_path: &Path,
379        version: u64,
380        object_store: &dyn OSObjectStore,
381    ) -> std::result::Result<ManifestLocation, Error> {
382        let location_res = self
383            .external_manifest_store
384            .get_manifest_location(base_path.as_ref(), version)
385            .await;
386
387        let location = match location_res {
388            Ok(p) => p,
389            // not board external manifest yet, direct to object store
390            Err(Error::NotFound { .. }) => {
391                let path = default_resolve_version(base_path, version, object_store)
392                    .await
393                    .map_err(|_| Error::not_found(format!("{}@{}", base_path, version)))?
394                    .path;
395                match object_store.head(&path).await {
396                    Ok(ObjectMeta { size, e_tag, .. }) => {
397                        let res = self
398                            .external_manifest_store
399                            .put_if_not_exists(
400                                base_path.as_ref(),
401                                version,
402                                path.as_ref(),
403                                size,
404                                e_tag.clone(),
405                            )
406                            .await;
407                        if let Err(e) = res {
408                            warn!(
409                                "could not update external manifest store during load, with error: {}",
410                                e
411                            );
412                        }
413                        let naming_scheme =
414                            ManifestNamingScheme::detect_scheme_staging(path.filename().unwrap());
415                        return Ok(ManifestLocation {
416                            version,
417                            path,
418                            size: Some(size),
419                            naming_scheme,
420                            e_tag,
421                        });
422                    }
423                    Err(ObjectStoreError::NotFound { .. }) => {
424                        return Err(Error::not_found(path.to_string()));
425                    }
426                    Err(e) => return Err(e.into()),
427                }
428            }
429            Err(e) => return Err(e),
430        };
431
432        // finalized path, just return
433        if location.path.extension() == Some(MANIFEST_EXTENSION) {
434            return Ok(location);
435        }
436
437        let naming_scheme =
438            ManifestNamingScheme::detect_scheme_staging(location.path.filename().unwrap());
439
440        let (size, e_tag) = if let Some(size) = location.size {
441            (size, location.e_tag.clone())
442        } else {
443            let meta = object_store.head(&location.path).await?;
444            (meta.size as u64, meta.e_tag)
445        };
446
447        self.finalize_manifest(
448            base_path,
449            &location.path,
450            version,
451            size,
452            e_tag,
453            object_store,
454            naming_scheme,
455        )
456        .await
457    }
458
459    async fn commit(
460        &self,
461        manifest: &mut Manifest,
462        indices: Option<Vec<IndexMetadata>>,
463        base_path: &Path,
464        object_store: &ObjectStore,
465        manifest_writer: super::ManifestWriter,
466        naming_scheme: ManifestNamingScheme,
467        transaction: Option<Transaction>,
468    ) -> std::result::Result<ManifestLocation, CommitError> {
469        // path we get here is the path to the manifest we want to write
470        // use object_store.base_path.as_ref() for getting the root of the dataset
471
472        // step 1: Write the manifest we want to commit to object store with a temporary name
473        let path = naming_scheme.manifest_path(base_path, manifest.version);
474        let staging_path = make_staging_manifest_path(&path)?;
475        let write_res =
476            manifest_writer(object_store, manifest, indices, &staging_path, transaction).await?;
477
478        // step 2 & 3: Put the manifest to external store
479        let result = self
480            .external_manifest_store
481            .put(
482                base_path,
483                manifest.version,
484                &staging_path,
485                write_res.size as u64,
486                write_res.e_tag,
487                &object_store.inner,
488                naming_scheme,
489            )
490            .await;
491
492        match result {
493            Ok(location) => {
494                write_version_hint(object_store, base_path, manifest.version).await;
495                Ok(location)
496            }
497            Err(_) => {
498                // delete the staging manifest
499                match object_store.inner.delete(&staging_path).await {
500                    Ok(_) => {}
501                    Err(ObjectStoreError::NotFound { .. }) => {}
502                    Err(e) => return Err(CommitError::OtherError(e.into())),
503                }
504                info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_MANIFEST, path = staging_path.as_ref());
505                Err(CommitError::CommitConflict {})
506            }
507        }
508    }
509
510    async fn delete(&self, base_path: &Path) -> Result<()> {
511        self.external_manifest_store
512            .delete(base_path.as_ref())
513            .await
514    }
515}