Skip to main content

lance_table/io/commit/
external_manifest.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Trait for external manifest handler.
5//!
6//! This trait abstracts an external storage with put_if_not_exists semantics.
7
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use lance_core::utils::tracing::{
12    AUDIT_MODE_CREATE, AUDIT_MODE_DELETE, AUDIT_TYPE_MANIFEST, TRACE_FILE_AUDIT,
13};
14use lance_core::{Error, Result};
15use lance_io::object_store::ObjectStore;
16use log::warn;
17use object_store::ObjectMeta;
18use object_store::{Error as ObjectStoreError, ObjectStore as OSObjectStore, path::Path};
19use tracing::info;
20
21use super::{
22    MANIFEST_EXTENSION, ManifestLocation, ManifestNamingScheme, current_manifest_path,
23    default_resolve_version, make_staging_manifest_path,
24};
25use crate::format::{IndexMetadata, Manifest, Transaction};
26use crate::io::commit::{CommitError, CommitHandler};
27
28/// External manifest store
29///
30/// This trait abstracts an external storage for source of truth for manifests.
31/// The storage is expected to remember (uri, version) -> manifest_path
32/// and able to run transactions on the manifest_path.
33///
34/// This trait is called an **External** manifest store because the store is
35/// expected to work in tandem with the object store. We are only leveraging
36/// the external store for concurrent commit. Any manifest committed thru this
37/// trait should ultimately be materialized in the object store.
38/// For a visual explanation of the commit loop see
39/// <https://github.com/lance-format/lance/assets/12615154/b0822312-0826-432a-b554-3965f8d48d04>
40#[async_trait]
41pub trait ExternalManifestStore: std::fmt::Debug + Send + Sync {
42    /// Get the manifest path for a given base_uri and version
43    async fn get(&self, base_uri: &str, version: u64) -> Result<String>;
44
45    async fn get_manifest_location(
46        &self,
47        base_uri: &str,
48        version: u64,
49    ) -> Result<ManifestLocation> {
50        let path = self.get(base_uri, version).await?;
51        let path = Path::from(path);
52        let naming_scheme = detect_naming_scheme_from_path(&path)?;
53        Ok(ManifestLocation {
54            version,
55            path,
56            size: None,
57            naming_scheme,
58            e_tag: None,
59        })
60    }
61
62    /// Get the latest version of a dataset at the base_uri, and the path to the manifest.
63    /// The path is provided as an optimization. The path is deterministic based on
64    /// the version and the store should not customize it.
65    async fn get_latest_version(&self, base_uri: &str) -> Result<Option<(u64, String)>>;
66
67    /// Get the latest manifest location for a given base_uri.
68    ///
69    /// By default, this calls get_latest_version.  Impls should
70    /// override this method if they store both the location and size
71    /// of the latest manifest.
72    async fn get_latest_manifest_location(
73        &self,
74        base_uri: &str,
75    ) -> Result<Option<ManifestLocation>> {
76        self.get_latest_version(base_uri).await.and_then(|res| {
77            res.map(|(version, uri)| {
78                let path = Path::from(uri);
79                let naming_scheme = detect_naming_scheme_from_path(&path)?;
80                Ok(ManifestLocation {
81                    version,
82                    path,
83                    size: None,
84                    naming_scheme,
85                    e_tag: None,
86                })
87            })
88            .transpose()
89        })
90    }
91
92    /// Put the manifest to the external store.
93    ///
94    /// The staging manifest has been written to `staging_path` on the object store.
95    /// This method should atomically claim the version and return the final manifest location.
96    ///
97    /// The default implementation uses put_if_not_exists and put_if_exists to
98    /// implement a staging-based workflow. Implementations that can write directly
99    /// (e.g., namespace-backed stores) should override this method.
100    #[allow(clippy::too_many_arguments)]
101    async fn put(
102        &self,
103        base_path: &Path,
104        version: u64,
105        staging_path: &Path,
106        size: u64,
107        e_tag: Option<String>,
108        object_store: &dyn OSObjectStore,
109        naming_scheme: ManifestNamingScheme,
110    ) -> Result<ManifestLocation> {
111        // Default implementation: staging-based workflow
112
113        // Step 1: Record staging path atomically
114        self.put_if_not_exists(
115            base_path.as_ref(),
116            version,
117            staging_path.as_ref(),
118            size,
119            e_tag.clone(),
120        )
121        .await?;
122
123        // Step 2: Copy staging to final path
124        let final_path = naming_scheme.manifest_path(base_path, version);
125        let copied = match object_store.copy(staging_path, &final_path).await {
126            Ok(_) => true,
127            Err(ObjectStoreError::NotFound { .. }) => false,
128            Err(e) => return Err(e.into()),
129        };
130        if copied {
131            info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_MANIFEST, path = final_path.as_ref());
132        }
133
134        // Get final e_tag (may change after copy for large files)
135        let e_tag = if copied && size < 5 * 1024 * 1024 {
136            e_tag
137        } else {
138            let meta = object_store.head(&final_path).await?;
139            meta.e_tag
140        };
141
142        let location = ManifestLocation {
143            version,
144            path: final_path.clone(),
145            size: Some(size),
146            naming_scheme,
147            e_tag: e_tag.clone(),
148        };
149
150        if !copied {
151            return Ok(location);
152        }
153
154        // Step 3: Update external store to final path
155        self.put_if_exists(
156            base_path.as_ref(),
157            version,
158            final_path.as_ref(),
159            size,
160            e_tag,
161        )
162        .await?;
163
164        // Step 4: Delete staging manifest
165        match object_store.delete(staging_path).await {
166            Ok(_) => {}
167            Err(ObjectStoreError::NotFound { .. }) => {}
168            Err(e) => return Err(e.into()),
169        }
170        info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_MANIFEST, path = staging_path.as_ref());
171
172        Ok(location)
173    }
174
175    /// Put the manifest path for a given base_uri and version, should fail if the version already exists
176    async fn put_if_not_exists(
177        &self,
178        base_uri: &str,
179        version: u64,
180        path: &str,
181        size: u64,
182        e_tag: Option<String>,
183    ) -> Result<()>;
184
185    /// Put the manifest path for a given base_uri and version, should fail if the version **does not** already exist
186    async fn put_if_exists(
187        &self,
188        base_uri: &str,
189        version: u64,
190        path: &str,
191        size: u64,
192        e_tag: Option<String>,
193    ) -> Result<()>;
194
195    /// Delete the manifest information for given base_uri from the store
196    async fn delete(&self, _base_uri: &str) -> Result<()> {
197        Ok(())
198    }
199}
200
201pub(crate) fn detect_naming_scheme_from_path(path: &Path) -> Result<ManifestNamingScheme> {
202    path.filename()
203        .and_then(|name| {
204            ManifestNamingScheme::detect_scheme(name)
205                .or_else(|| Some(ManifestNamingScheme::detect_scheme_staging(name)))
206        })
207        .ok_or_else(|| {
208            Error::corrupt_file(
209                path.clone(),
210                "Path does not follow known manifest naming convention.",
211            )
212        })
213}
214
215/// External manifest commit handler
216/// This handler is used to commit a manifest to an external store
217/// for detailed design, see <https://github.com/lance-format/lance/issues/1183>
218#[derive(Debug)]
219pub struct ExternalManifestCommitHandler {
220    pub external_manifest_store: Arc<dyn ExternalManifestStore>,
221}
222
223impl ExternalManifestCommitHandler {
224    /// The manifest is considered committed once the staging manifest is written
225    /// to object store and that path is committed to the external store.
226    ///
227    /// However, to fully complete this, the staging manifest should be materialized
228    /// into the final path, the final path should be committed to the external store
229    /// and the staging manifest should be deleted. These steps may be completed
230    /// by any number of readers or writers, so care should be taken to ensure
231    /// that the manifest is not lost nor any errors occur due to duplicate
232    /// operations.
233    #[allow(clippy::too_many_arguments)]
234    async fn finalize_manifest(
235        &self,
236        base_path: &Path,
237        staging_manifest_path: &Path,
238        version: u64,
239        size: u64,
240        e_tag: Option<String>,
241        store: &dyn OSObjectStore,
242        naming_scheme: ManifestNamingScheme,
243    ) -> std::result::Result<ManifestLocation, Error> {
244        // step 1: copy the manifest to the final location
245        let final_manifest_path = naming_scheme.manifest_path(base_path, version);
246
247        let copied = match store
248            .copy(staging_manifest_path, &final_manifest_path)
249            .await
250        {
251            Ok(_) => true,
252            Err(ObjectStoreError::NotFound { .. }) => false, // Another writer beat us to it.
253            Err(e) => return Err(e.into()),
254        };
255        if copied {
256            info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_MANIFEST, path = final_manifest_path.as_ref());
257        }
258
259        // On S3, the etag can change if originally was MultipartUpload and later was Copy
260        // https://docs.aws.amazon.com/AmazonS3/latest/API/API_Object.html#AmazonS3-Type-Object-ETag
261        // We only do MultipartUpload for > 5MB files, so we can skip this check
262        // if size < 5MB. However, we need to double check the final_manifest_path
263        // exists before we change the external store, otherwise we may point to a
264        // non-existing manifest.
265        let e_tag = if copied && size < 5 * 1024 * 1024 {
266            e_tag
267        } else {
268            let meta = store.head(&final_manifest_path).await?;
269            meta.e_tag
270        };
271
272        let location = ManifestLocation {
273            version,
274            path: final_manifest_path,
275            size: Some(size),
276            naming_scheme,
277            e_tag,
278        };
279
280        if !copied {
281            return Ok(location);
282        }
283
284        // step 2: flip the external store to point to the final location
285        self.external_manifest_store
286            .put_if_exists(
287                base_path.as_ref(),
288                version,
289                location.path.as_ref(),
290                size,
291                location.e_tag.clone(),
292            )
293            .await?;
294
295        // step 3: delete the staging manifest
296        match store.delete(staging_manifest_path).await {
297            Ok(_) => {}
298            Err(ObjectStoreError::NotFound { .. }) => {}
299            Err(e) => return Err(e.into()),
300        }
301        info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_MANIFEST, path = staging_manifest_path.as_ref());
302
303        Ok(location)
304    }
305}
306
307#[async_trait]
308impl CommitHandler for ExternalManifestCommitHandler {
309    async fn resolve_latest_location(
310        &self,
311        base_path: &Path,
312        object_store: &ObjectStore,
313    ) -> std::result::Result<ManifestLocation, Error> {
314        let location = self
315            .external_manifest_store
316            .get_latest_manifest_location(base_path.as_ref())
317            .await?;
318
319        match location {
320            Some(ManifestLocation {
321                version,
322                path,
323                size,
324                naming_scheme,
325                e_tag,
326            }) => {
327                // The path is finalized, no need to check object store
328                if path.extension() == Some(MANIFEST_EXTENSION) {
329                    return Ok(ManifestLocation {
330                        version,
331                        path,
332                        size,
333                        naming_scheme,
334                        e_tag,
335                    });
336                }
337
338                let (size, e_tag) = if let Some(size) = size {
339                    (size, e_tag)
340                } else {
341                    match object_store.inner.head(&path).await {
342                        Ok(meta) => (meta.size, meta.e_tag),
343                        Err(ObjectStoreError::NotFound { .. }) => {
344                            // there may be other threads that have finished executing finalize_manifest.
345                            let new_location = self
346                                .external_manifest_store
347                                .get_manifest_location(base_path.as_ref(), version)
348                                .await?;
349                            return Ok(new_location);
350                        }
351                        Err(e) => return Err(e.into()),
352                    }
353                };
354
355                let final_location = self
356                    .finalize_manifest(
357                        base_path,
358                        &path,
359                        version,
360                        size,
361                        e_tag.clone(),
362                        &object_store.inner,
363                        naming_scheme,
364                    )
365                    .await?;
366
367                Ok(final_location)
368            }
369            // Dataset not found in the external store, this could be because the dataset did not
370            // use external store for commit before. In this case, we search for the latest manifest
371            None => current_manifest_path(object_store, base_path).await,
372        }
373    }
374
375    async fn resolve_version_location(
376        &self,
377        base_path: &Path,
378        version: u64,
379        object_store: &dyn OSObjectStore,
380    ) -> std::result::Result<ManifestLocation, Error> {
381        let location_res = self
382            .external_manifest_store
383            .get_manifest_location(base_path.as_ref(), version)
384            .await;
385
386        let location = match location_res {
387            Ok(p) => p,
388            // not board external manifest yet, direct to object store
389            Err(Error::NotFound { .. }) => {
390                let path = default_resolve_version(base_path, version, object_store)
391                    .await
392                    .map_err(|_| Error::not_found(format!("{}@{}", base_path, version)))?
393                    .path;
394                match object_store.head(&path).await {
395                    Ok(ObjectMeta { size, e_tag, .. }) => {
396                        let res = self
397                            .external_manifest_store
398                            .put_if_not_exists(
399                                base_path.as_ref(),
400                                version,
401                                path.as_ref(),
402                                size,
403                                e_tag.clone(),
404                            )
405                            .await;
406                        if let Err(e) = res {
407                            warn!(
408                                "could not update external manifest store during load, with error: {}",
409                                e
410                            );
411                        }
412                        let naming_scheme =
413                            ManifestNamingScheme::detect_scheme_staging(path.filename().unwrap());
414                        return Ok(ManifestLocation {
415                            version,
416                            path,
417                            size: Some(size),
418                            naming_scheme,
419                            e_tag,
420                        });
421                    }
422                    Err(ObjectStoreError::NotFound { .. }) => {
423                        return Err(Error::not_found(path.to_string()));
424                    }
425                    Err(e) => return Err(e.into()),
426                }
427            }
428            Err(e) => return Err(e),
429        };
430
431        // finalized path, just return
432        if location.path.extension() == Some(MANIFEST_EXTENSION) {
433            return Ok(location);
434        }
435
436        let naming_scheme =
437            ManifestNamingScheme::detect_scheme_staging(location.path.filename().unwrap());
438
439        let (size, e_tag) = if let Some(size) = location.size {
440            (size, location.e_tag.clone())
441        } else {
442            let meta = object_store.head(&location.path).await?;
443            (meta.size as u64, meta.e_tag)
444        };
445
446        self.finalize_manifest(
447            base_path,
448            &location.path,
449            version,
450            size,
451            e_tag,
452            object_store,
453            naming_scheme,
454        )
455        .await
456    }
457
458    async fn commit(
459        &self,
460        manifest: &mut Manifest,
461        indices: Option<Vec<IndexMetadata>>,
462        base_path: &Path,
463        object_store: &ObjectStore,
464        manifest_writer: super::ManifestWriter,
465        naming_scheme: ManifestNamingScheme,
466        transaction: Option<Transaction>,
467    ) -> std::result::Result<ManifestLocation, CommitError> {
468        // path we get here is the path to the manifest we want to write
469        // use object_store.base_path.as_ref() for getting the root of the dataset
470
471        // step 1: Write the manifest we want to commit to object store with a temporary name
472        let path = naming_scheme.manifest_path(base_path, manifest.version);
473        let staging_path = make_staging_manifest_path(&path)?;
474        let write_res =
475            manifest_writer(object_store, manifest, indices, &staging_path, transaction).await?;
476
477        // step 2 & 3: Put the manifest to external store
478        let result = self
479            .external_manifest_store
480            .put(
481                base_path,
482                manifest.version,
483                &staging_path,
484                write_res.size as u64,
485                write_res.e_tag,
486                &object_store.inner,
487                naming_scheme,
488            )
489            .await;
490
491        match result {
492            Ok(location) => Ok(location),
493            Err(_) => {
494                // delete the staging manifest
495                match object_store.inner.delete(&staging_path).await {
496                    Ok(_) => {}
497                    Err(ObjectStoreError::NotFound { .. }) => {}
498                    Err(e) => return Err(CommitError::OtherError(e.into())),
499                }
500                info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_MANIFEST, path = staging_path.as_ref());
501                Err(CommitError::CommitConflict {})
502            }
503        }
504    }
505
506    async fn delete(&self, base_path: &Path) -> Result<()> {
507        self.external_manifest_store
508            .delete(base_path.as_ref())
509            .await
510    }
511}