lance_table/io/commit/
external_manifest.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Trait for external manifest handler.
5//!
6//! This trait abstracts an external storage with put_if_not_exists semantics.
7
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use lance_core::{Error, Result};
12use lance_io::object_store::{ObjectStore, ObjectStoreExt};
13use log::warn;
14use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore};
15use snafu::location;
16
17use super::{
18    current_manifest_path, default_resolve_version, make_staging_manifest_path, ManifestLocation,
19    ManifestNamingScheme, MANIFEST_EXTENSION,
20};
21use crate::format::{Index, Manifest};
22use crate::io::commit::{CommitError, CommitHandler, ManifestWriter};
23
24/// External manifest store
25///
26/// This trait abstracts an external storage for source of truth for manifests.
27/// The storage is expected to remember (uri, version) -> manifest_path
28/// and able to run transactions on the manifest_path.
29///
30/// This trait is called an **External** manifest store because the store is
31/// expected to work in tandem with the object store. We are only leveraging
32/// the external store for concurrent commit. Any manifest committed thru this
33/// trait should ultimately be materialized in the object store.
34/// For a visual explanation of the commit loop see
35/// https://github.com/lancedb/lance/assets/12615154/b0822312-0826-432a-b554-3965f8d48d04
36#[async_trait]
37pub trait ExternalManifestStore: std::fmt::Debug + Send + Sync {
38    /// Get the manifest path for a given base_uri and version
39    async fn get(&self, base_uri: &str, version: u64) -> Result<String>;
40
41    /// Get the latest version of a dataset at the base_uri, and the path to the manifest.
42    /// The path is provided as an optimization. The path is deterministic based on
43    /// the version and the store should not customize it.
44    async fn get_latest_version(&self, base_uri: &str) -> Result<Option<(u64, String)>>;
45
46    /// Get the latest manifest location for a given base_uri.
47    ///
48    /// By default, this calls get_latest_version.  Impls should
49    /// override this method if they store both the location and size
50    /// of the latest manifest.
51    async fn get_latest_manifest_location(
52        &self,
53        base_uri: &str,
54    ) -> Result<Option<ManifestLocation>> {
55        self.get_latest_version(base_uri).await.and_then(|res| {
56            res.map(|(version, uri)| {
57                let path = Path::from(uri);
58                let naming_scheme = detect_naming_scheme_from_path(&path)?;
59                Ok(ManifestLocation {
60                    version,
61                    path,
62                    size: None,
63                    naming_scheme,
64                })
65            })
66            .transpose()
67        })
68    }
69
70    /// Put the manifest path for a given base_uri and version, should fail if the version already exists
71    async fn put_if_not_exists(&self, base_uri: &str, version: u64, path: &str) -> Result<()>;
72
73    /// Put the manifest path for a given base_uri and version, should fail if the version **does not** already exist
74    async fn put_if_exists(&self, base_uri: &str, version: u64, path: &str) -> Result<()>;
75
76    /// Delete the manifest information for given base_uri from the store
77    async fn delete(&self, _base_uri: &str) -> Result<()> {
78        Ok(())
79    }
80}
81
82fn detect_naming_scheme_from_path(path: &Path) -> Result<ManifestNamingScheme> {
83    path.filename()
84        .and_then(ManifestNamingScheme::detect_scheme)
85        .ok_or_else(|| {
86            Error::corrupt_file(
87                path.clone(),
88                "Path does not follow known manifest naming convention.",
89                location!(),
90            )
91        })
92}
93
/// External manifest commit handler
///
/// This handler is used to commit a manifest to an external store: the
/// manifest itself is written to the object store, while the path of each
/// version is recorded in an [`ExternalManifestStore`].
/// For the detailed design, see https://github.com/lancedb/lance/issues/1183
#[derive(Debug)]
pub struct ExternalManifestCommitHandler {
    /// The external store that is consulted when resolving manifests and
    /// updated when committing them.
    pub external_manifest_store: Arc<dyn ExternalManifestStore>,
}
101
102impl ExternalManifestCommitHandler {
103    /// The manifest is considered committed once the staging manifest is written
104    /// to object store and that path is committed to the external store.
105    ///
106    /// However, to fully complete this, the staging manifest should be materialized
107    /// into the final path, the final path should be committed to the external store
108    /// and the staging manifest should be deleted. These steps may be completed
109    /// by any number of readers or writers, so care should be taken to ensure
110    /// that the manifest is not lost nor any errors occur due to duplicate
111    /// operations.
112    async fn finalize_manifest(
113        &self,
114        base_path: &Path,
115        staging_manifest_path: &Path,
116        version: u64,
117        store: &dyn OSObjectStore,
118        naming_scheme: ManifestNamingScheme,
119    ) -> std::result::Result<Path, Error> {
120        // step 1: copy the manifest to the final location
121        let final_manifest_path = naming_scheme.manifest_path(base_path, version);
122        match store
123            .copy(staging_manifest_path, &final_manifest_path)
124            .await
125        {
126            Ok(_) => {}
127            Err(ObjectStoreError::NotFound { .. }) => return Ok(final_manifest_path), // Another writer beat us to it.
128            Err(e) => return Err(e.into()),
129        };
130
131        // step 2: flip the external store to point to the final location
132        self.external_manifest_store
133            .put_if_exists(base_path.as_ref(), version, final_manifest_path.as_ref())
134            .await?;
135
136        // step 3: delete the staging manifest
137        match store.delete(staging_manifest_path).await {
138            Ok(_) => {}
139            Err(ObjectStoreError::NotFound { .. }) => {}
140            Err(e) => return Err(e.into()),
141        }
142
143        Ok(final_manifest_path)
144    }
145}
146
147#[async_trait]
148impl CommitHandler for ExternalManifestCommitHandler {
149    async fn resolve_latest_location(
150        &self,
151        base_path: &Path,
152        object_store: &ObjectStore,
153    ) -> std::result::Result<ManifestLocation, Error> {
154        let path = self.resolve_latest_version(base_path, object_store).await?;
155        let naming_scheme = detect_naming_scheme_from_path(&path)?;
156        Ok(ManifestLocation {
157            version: self
158                .resolve_latest_version_id(base_path, object_store)
159                .await?,
160            path,
161            size: None,
162            naming_scheme,
163        })
164    }
165
166    /// Get the latest version of a dataset at the path
167    async fn resolve_latest_version(
168        &self,
169        base_path: &Path,
170        object_store: &ObjectStore,
171    ) -> std::result::Result<Path, Error> {
172        let version = self
173            .external_manifest_store
174            .get_latest_version(base_path.as_ref())
175            .await?;
176
177        match version {
178            Some((version, path)) => {
179                // The path is finalized, no need to check object store
180                if path.ends_with(&format!(".{MANIFEST_EXTENSION}")) {
181                    return Ok(Path::parse(path)?);
182                }
183
184                // Detect naming scheme based on presence of zero padding.
185                let staged_path = Path::parse(&path)?;
186                let naming_scheme =
187                    ManifestNamingScheme::detect_scheme_staging(staged_path.filename().unwrap());
188
189                self.finalize_manifest(
190                    base_path,
191                    &staged_path,
192                    version,
193                    &object_store.inner,
194                    naming_scheme,
195                )
196                .await
197            }
198            // Dataset not found in the external store, this could be because the dataset did not
199            // use external store for commit before. In this case, we search for the latest manifest
200            None => Ok(current_manifest_path(object_store, base_path).await?.path),
201        }
202    }
203
204    async fn resolve_latest_version_id(
205        &self,
206        base_path: &Path,
207        object_store: &ObjectStore,
208    ) -> std::result::Result<u64, Error> {
209        let version = self
210            .external_manifest_store
211            .get_latest_version(base_path.as_ref())
212            .await?;
213
214        match version {
215            Some((version, _)) => Ok(version),
216            None => Ok(current_manifest_path(object_store, base_path)
217                .await?
218                .version),
219        }
220    }
221
222    async fn resolve_version(
223        &self,
224        base_path: &Path,
225        version: u64,
226        object_store: &dyn OSObjectStore,
227    ) -> std::result::Result<Path, Error> {
228        let path_res = self
229            .external_manifest_store
230            .get(base_path.as_ref(), version)
231            .await;
232
233        let path = match path_res {
234            Ok(p) => p,
235            // not board external manifest yet, direct to object store
236            Err(Error::NotFound { .. }) => {
237                let path = default_resolve_version(base_path, version, object_store)
238                    .await
239                    .map_err(|_| Error::NotFound {
240                        uri: format!("{}@{}", base_path, version),
241                        location: location!(),
242                    })?
243                    .path;
244                if object_store.exists(&path).await? {
245                    // best effort put, if it fails, it's okay
246                    match self
247                        .external_manifest_store
248                        .put_if_not_exists(base_path.as_ref(), version, path.as_ref())
249                        .await
250                    {
251                        Ok(_) => {}
252                        Err(e) => {
253                            warn!(
254                            "could not update external manifest store during load, with error: {}",
255                            e
256                        );
257                        }
258                    }
259                    return Ok(path);
260                } else {
261                    return Err(Error::NotFound {
262                        uri: path.to_string(),
263                        location: location!(),
264                    });
265                }
266            }
267            Err(e) => return Err(e),
268        };
269
270        // finalized path, just return
271        let current_path = Path::parse(path)?;
272        if current_path.extension() == Some(MANIFEST_EXTENSION) {
273            return Ok(current_path);
274        }
275
276        let naming_scheme =
277            ManifestNamingScheme::detect_scheme_staging(current_path.filename().unwrap());
278
279        self.finalize_manifest(
280            base_path,
281            &Path::parse(&current_path)?,
282            version,
283            object_store,
284            naming_scheme,
285        )
286        .await
287    }
288
289    async fn resolve_version_location(
290        &self,
291        base_path: &Path,
292        version: u64,
293        object_store: &dyn OSObjectStore,
294    ) -> std::result::Result<ManifestLocation, Error> {
295        let path = self
296            .resolve_version(base_path, version, object_store)
297            .await?;
298        let naming_scheme = detect_naming_scheme_from_path(&path)?;
299        Ok(ManifestLocation {
300            version,
301            path,
302            size: None,
303            naming_scheme,
304        })
305    }
306
307    async fn commit(
308        &self,
309        manifest: &mut Manifest,
310        indices: Option<Vec<Index>>,
311        base_path: &Path,
312        object_store: &ObjectStore,
313        manifest_writer: ManifestWriter,
314        naming_scheme: ManifestNamingScheme,
315    ) -> std::result::Result<Path, CommitError> {
316        // path we get here is the path to the manifest we want to write
317        // use object_store.base_path.as_ref() for getting the root of the dataset
318
319        // step 1: Write the manifest we want to commit to object store with a temporary name
320        let path = naming_scheme.manifest_path(base_path, manifest.version);
321        let staging_path = make_staging_manifest_path(&path)?;
322        manifest_writer(object_store, manifest, indices, &staging_path).await?;
323
324        // step 2 & 3: Try to commit this version to external store, return err on failure
325        let res = self
326            .external_manifest_store
327            .put_if_not_exists(base_path.as_ref(), manifest.version, staging_path.as_ref())
328            .await
329            .map_err(|_| CommitError::CommitConflict {});
330
331        if let Err(err) = res {
332            // delete the staging manifest
333            match object_store.inner.delete(&staging_path).await {
334                Ok(_) => {}
335                Err(ObjectStoreError::NotFound { .. }) => {}
336                Err(e) => return Err(CommitError::OtherError(e.into())),
337            }
338            return Err(err);
339        }
340
341        let scheme = detect_naming_scheme_from_path(&path)?;
342
343        Ok(self
344            .finalize_manifest(
345                base_path,
346                &staging_path,
347                manifest.version,
348                &object_store.inner,
349                scheme,
350            )
351            .await?)
352    }
353
354    async fn delete(&self, base_path: &Path) -> Result<()> {
355        self.external_manifest_store
356            .delete(base_path.as_ref())
357            .await
358    }
359}