lance_table/io/commit/
external_manifest.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Trait for external manifest handler.
5//!
6//! This trait abstracts an external storage with put_if_not_exists semantics.
7
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use lance_core::utils::tracing::{
12    AUDIT_MODE_CREATE, AUDIT_MODE_DELETE, AUDIT_TYPE_MANIFEST, TRACE_FILE_AUDIT,
13};
14use lance_core::{Error, Result};
15use lance_io::object_store::ObjectStore;
16use log::warn;
17use object_store::ObjectMeta;
18use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore};
19use snafu::location;
20use tracing::info;
21
22use super::{
23    current_manifest_path, default_resolve_version, make_staging_manifest_path, ManifestLocation,
24    ManifestNamingScheme, MANIFEST_EXTENSION,
25};
26use crate::format::{IndexMetadata, Manifest, Transaction};
27use crate::io::commit::{CommitError, CommitHandler};
28
29/// External manifest store
30///
31/// This trait abstracts an external storage for source of truth for manifests.
32/// The storage is expected to remember (uri, version) -> manifest_path
33/// and able to run transactions on the manifest_path.
34///
35/// This trait is called an **External** manifest store because the store is
36/// expected to work in tandem with the object store. We are only leveraging
37/// the external store for concurrent commit. Any manifest committed thru this
38/// trait should ultimately be materialized in the object store.
39/// For a visual explanation of the commit loop see
40/// https://github.com/lance-format/lance/assets/12615154/b0822312-0826-432a-b554-3965f8d48d04
41#[async_trait]
42pub trait ExternalManifestStore: std::fmt::Debug + Send + Sync {
43    /// Get the manifest path for a given base_uri and version
44    async fn get(&self, base_uri: &str, version: u64) -> Result<String>;
45
46    async fn get_manifest_location(
47        &self,
48        base_uri: &str,
49        version: u64,
50    ) -> Result<ManifestLocation> {
51        let path = self.get(base_uri, version).await?;
52        let path = Path::from(path);
53        let naming_scheme = detect_naming_scheme_from_path(&path)?;
54        Ok(ManifestLocation {
55            version,
56            path,
57            size: None,
58            naming_scheme,
59            e_tag: None,
60        })
61    }
62
63    /// Get the latest version of a dataset at the base_uri, and the path to the manifest.
64    /// The path is provided as an optimization. The path is deterministic based on
65    /// the version and the store should not customize it.
66    async fn get_latest_version(&self, base_uri: &str) -> Result<Option<(u64, String)>>;
67
68    /// Get the latest manifest location for a given base_uri.
69    ///
70    /// By default, this calls get_latest_version.  Impls should
71    /// override this method if they store both the location and size
72    /// of the latest manifest.
73    async fn get_latest_manifest_location(
74        &self,
75        base_uri: &str,
76    ) -> Result<Option<ManifestLocation>> {
77        self.get_latest_version(base_uri).await.and_then(|res| {
78            res.map(|(version, uri)| {
79                let path = Path::from(uri);
80                let naming_scheme = detect_naming_scheme_from_path(&path)?;
81                Ok(ManifestLocation {
82                    version,
83                    path,
84                    size: None,
85                    naming_scheme,
86                    e_tag: None,
87                })
88            })
89            .transpose()
90        })
91    }
92
93    /// Put the manifest path for a given base_uri and version, should fail if the version already exists
94    async fn put_if_not_exists(
95        &self,
96        base_uri: &str,
97        version: u64,
98        path: &str,
99        size: u64,
100        e_tag: Option<String>,
101    ) -> Result<()>;
102
103    /// Put the manifest path for a given base_uri and version, should fail if the version **does not** already exist
104    async fn put_if_exists(
105        &self,
106        base_uri: &str,
107        version: u64,
108        path: &str,
109        size: u64,
110        e_tag: Option<String>,
111    ) -> Result<()>;
112
113    /// Delete the manifest information for given base_uri from the store
114    async fn delete(&self, _base_uri: &str) -> Result<()> {
115        Ok(())
116    }
117}
118
119pub(crate) fn detect_naming_scheme_from_path(path: &Path) -> Result<ManifestNamingScheme> {
120    path.filename()
121        .and_then(|name| {
122            ManifestNamingScheme::detect_scheme(name)
123                .or_else(|| Some(ManifestNamingScheme::detect_scheme_staging(name)))
124        })
125        .ok_or_else(|| {
126            Error::corrupt_file(
127                path.clone(),
128                "Path does not follow known manifest naming convention.",
129                location!(),
130            )
131        })
132}
133
134/// External manifest commit handler
135/// This handler is used to commit a manifest to an external store
136/// for detailed design, see https://github.com/lance-format/lance/issues/1183
137#[derive(Debug)]
138pub struct ExternalManifestCommitHandler {
139    pub external_manifest_store: Arc<dyn ExternalManifestStore>,
140}
141
142impl ExternalManifestCommitHandler {
143    /// The manifest is considered committed once the staging manifest is written
144    /// to object store and that path is committed to the external store.
145    ///
146    /// However, to fully complete this, the staging manifest should be materialized
147    /// into the final path, the final path should be committed to the external store
148    /// and the staging manifest should be deleted. These steps may be completed
149    /// by any number of readers or writers, so care should be taken to ensure
150    /// that the manifest is not lost nor any errors occur due to duplicate
151    /// operations.
152    #[allow(clippy::too_many_arguments)]
153    async fn finalize_manifest(
154        &self,
155        base_path: &Path,
156        staging_manifest_path: &Path,
157        version: u64,
158        size: u64,
159        e_tag: Option<String>,
160        store: &dyn OSObjectStore,
161        naming_scheme: ManifestNamingScheme,
162    ) -> std::result::Result<ManifestLocation, Error> {
163        // step 1: copy the manifest to the final location
164        let final_manifest_path = naming_scheme.manifest_path(base_path, version);
165
166        let copied = match store
167            .copy(staging_manifest_path, &final_manifest_path)
168            .await
169        {
170            Ok(_) => true,
171            Err(ObjectStoreError::NotFound { .. }) => false, // Another writer beat us to it.
172            Err(e) => return Err(e.into()),
173        };
174        if copied {
175            info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_MANIFEST, path = final_manifest_path.as_ref());
176        }
177
178        // On S3, the etag can change if originally was MultipartUpload and later was Copy
179        // https://docs.aws.amazon.com/AmazonS3/latest/API/API_Object.html#AmazonS3-Type-Object-ETag
180        // We only do MultipartUpload for > 5MB files, so we can skip this check
181        // if size < 5MB. However, we need to double check the final_manifest_path
182        // exists before we change the external store, otherwise we may point to a
183        // non-existing manifest.
184        let e_tag = if copied && size < 5 * 1024 * 1024 {
185            e_tag
186        } else {
187            let meta = store.head(&final_manifest_path).await?;
188            meta.e_tag
189        };
190
191        let location = ManifestLocation {
192            version,
193            path: final_manifest_path,
194            size: Some(size),
195            naming_scheme,
196            e_tag,
197        };
198
199        if !copied {
200            return Ok(location);
201        }
202
203        // step 2: flip the external store to point to the final location
204        self.external_manifest_store
205            .put_if_exists(
206                base_path.as_ref(),
207                version,
208                location.path.as_ref(),
209                size,
210                location.e_tag.clone(),
211            )
212            .await?;
213
214        // step 3: delete the staging manifest
215        match store.delete(staging_manifest_path).await {
216            Ok(_) => {}
217            Err(ObjectStoreError::NotFound { .. }) => {}
218            Err(e) => return Err(e.into()),
219        }
220        info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_MANIFEST, path = staging_manifest_path.as_ref());
221
222        Ok(location)
223    }
224}
225
226#[async_trait]
227impl CommitHandler for ExternalManifestCommitHandler {
228    async fn resolve_latest_location(
229        &self,
230        base_path: &Path,
231        object_store: &ObjectStore,
232    ) -> std::result::Result<ManifestLocation, Error> {
233        let location = self
234            .external_manifest_store
235            .get_latest_manifest_location(base_path.as_ref())
236            .await?;
237
238        match location {
239            Some(ManifestLocation {
240                version,
241                path,
242                size,
243                naming_scheme,
244                e_tag,
245            }) => {
246                // The path is finalized, no need to check object store
247                if path.extension() == Some(MANIFEST_EXTENSION) {
248                    return Ok(ManifestLocation {
249                        version,
250                        path,
251                        size,
252                        naming_scheme,
253                        e_tag,
254                    });
255                }
256
257                let (size, e_tag) = if let Some(size) = size {
258                    (size, e_tag)
259                } else {
260                    match object_store.inner.head(&path).await {
261                        Ok(meta) => (meta.size, meta.e_tag),
262                        Err(ObjectStoreError::NotFound { .. }) => {
263                            // there may be other threads that have finished executing finalize_manifest.
264                            let new_location = self
265                                .external_manifest_store
266                                .get_manifest_location(base_path.as_ref(), version)
267                                .await?;
268                            return Ok(new_location);
269                        }
270                        Err(e) => return Err(e.into()),
271                    }
272                };
273
274                let final_location = self
275                    .finalize_manifest(
276                        base_path,
277                        &path,
278                        version,
279                        size,
280                        e_tag.clone(),
281                        &object_store.inner,
282                        naming_scheme,
283                    )
284                    .await?;
285
286                Ok(final_location)
287            }
288            // Dataset not found in the external store, this could be because the dataset did not
289            // use external store for commit before. In this case, we search for the latest manifest
290            None => current_manifest_path(object_store, base_path).await,
291        }
292    }
293
294    async fn resolve_version_location(
295        &self,
296        base_path: &Path,
297        version: u64,
298        object_store: &dyn OSObjectStore,
299    ) -> std::result::Result<ManifestLocation, Error> {
300        let location_res = self
301            .external_manifest_store
302            .get_manifest_location(base_path.as_ref(), version)
303            .await;
304
305        let location = match location_res {
306            Ok(p) => p,
307            // not board external manifest yet, direct to object store
308            Err(Error::NotFound { .. }) => {
309                let path = default_resolve_version(base_path, version, object_store)
310                    .await
311                    .map_err(|_| Error::NotFound {
312                        uri: format!("{}@{}", base_path, version),
313                        location: location!(),
314                    })?
315                    .path;
316                match object_store.head(&path).await {
317                    Ok(ObjectMeta { size, e_tag, .. }) => {
318                        let res = self
319                            .external_manifest_store
320                            .put_if_not_exists(
321                                base_path.as_ref(),
322                                version,
323                                path.as_ref(),
324                                size,
325                                e_tag.clone(),
326                            )
327                            .await;
328                        if let Err(e) = res {
329                            warn!(
330                                "could not update external manifest store during load, with error: {}",
331                                e
332                            );
333                        }
334                        let naming_scheme =
335                            ManifestNamingScheme::detect_scheme_staging(path.filename().unwrap());
336                        return Ok(ManifestLocation {
337                            version,
338                            path,
339                            size: Some(size),
340                            naming_scheme,
341                            e_tag,
342                        });
343                    }
344                    Err(ObjectStoreError::NotFound { .. }) => {
345                        return Err(Error::NotFound {
346                            uri: path.to_string(),
347                            location: location!(),
348                        });
349                    }
350                    Err(e) => return Err(e.into()),
351                }
352            }
353            Err(e) => return Err(e),
354        };
355
356        // finalized path, just return
357        if location.path.extension() == Some(MANIFEST_EXTENSION) {
358            return Ok(location);
359        }
360
361        let naming_scheme =
362            ManifestNamingScheme::detect_scheme_staging(location.path.filename().unwrap());
363
364        let (size, e_tag) = if let Some(size) = location.size {
365            (size, location.e_tag.clone())
366        } else {
367            let meta = object_store.head(&location.path).await?;
368            (meta.size as u64, meta.e_tag)
369        };
370
371        self.finalize_manifest(
372            base_path,
373            &location.path,
374            version,
375            size,
376            e_tag,
377            object_store,
378            naming_scheme,
379        )
380        .await
381    }
382
383    async fn commit(
384        &self,
385        manifest: &mut Manifest,
386        indices: Option<Vec<IndexMetadata>>,
387        base_path: &Path,
388        object_store: &ObjectStore,
389        manifest_writer: super::ManifestWriter,
390        naming_scheme: ManifestNamingScheme,
391        transaction: Option<Transaction>,
392    ) -> std::result::Result<ManifestLocation, CommitError> {
393        // path we get here is the path to the manifest we want to write
394        // use object_store.base_path.as_ref() for getting the root of the dataset
395
396        // step 1: Write the manifest we want to commit to object store with a temporary name
397        let path = naming_scheme.manifest_path(base_path, manifest.version);
398        let staging_path = make_staging_manifest_path(&path)?;
399        let write_res =
400            manifest_writer(object_store, manifest, indices, &staging_path, transaction).await?;
401
402        // step 2 & 3: Try to commit this version to external store, return err on failure
403        let res = self
404            .external_manifest_store
405            .put_if_not_exists(
406                base_path.as_ref(),
407                manifest.version,
408                staging_path.as_ref(),
409                write_res.size as u64,
410                write_res.e_tag.clone(),
411            )
412            .await
413            .map_err(|_| CommitError::CommitConflict {});
414
415        if let Err(err) = res {
416            // delete the staging manifest
417            match object_store.inner.delete(&staging_path).await {
418                Ok(_) => {}
419                Err(ObjectStoreError::NotFound { .. }) => {}
420                Err(e) => return Err(CommitError::OtherError(e.into())),
421            }
422            info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_MANIFEST, path = staging_path.as_ref());
423            return Err(err);
424        }
425
426        Ok(self
427            .finalize_manifest(
428                base_path,
429                &staging_path,
430                manifest.version,
431                write_res.size as u64,
432                write_res.e_tag,
433                &object_store.inner,
434                naming_scheme,
435            )
436            .await?)
437    }
438
439    async fn delete(&self, base_path: &Path) -> Result<()> {
440        self.external_manifest_store
441            .delete(base_path.as_ref())
442            .await
443    }
444}