lance_table/io/commit/
external_manifest.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Trait for external manifest handler.
5//!
6//! This trait abstracts an external storage with put_if_not_exists semantics.
7
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use lance_core::{Error, Result};
12use lance_io::object_store::ObjectStore;
13use log::warn;
14use object_store::ObjectMeta;
15use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore};
16use snafu::location;
17
18use super::{
19    current_manifest_path, default_resolve_version, make_staging_manifest_path, ManifestLocation,
20    ManifestNamingScheme, MANIFEST_EXTENSION,
21};
22use crate::format::{Index, Manifest};
23use crate::io::commit::{CommitError, CommitHandler, ManifestWriter};
24
25/// External manifest store
26///
27/// This trait abstracts an external storage for source of truth for manifests.
28/// The storage is expected to remember (uri, version) -> manifest_path
29/// and able to run transactions on the manifest_path.
30///
31/// This trait is called an **External** manifest store because the store is
32/// expected to work in tandem with the object store. We are only leveraging
33/// the external store for concurrent commit. Any manifest committed thru this
34/// trait should ultimately be materialized in the object store.
35/// For a visual explanation of the commit loop see
36/// https://github.com/lancedb/lance/assets/12615154/b0822312-0826-432a-b554-3965f8d48d04
37#[async_trait]
38pub trait ExternalManifestStore: std::fmt::Debug + Send + Sync {
39    /// Get the manifest path for a given base_uri and version
40    async fn get(&self, base_uri: &str, version: u64) -> Result<String>;
41
42    async fn get_manifest_location(
43        &self,
44        base_uri: &str,
45        version: u64,
46    ) -> Result<ManifestLocation> {
47        let path = self.get(base_uri, version).await?;
48        let path = Path::from(path);
49        let naming_scheme = detect_naming_scheme_from_path(&path)?;
50        Ok(ManifestLocation {
51            version,
52            path,
53            size: None,
54            naming_scheme,
55        })
56    }
57
58    /// Get the latest version of a dataset at the base_uri, and the path to the manifest.
59    /// The path is provided as an optimization. The path is deterministic based on
60    /// the version and the store should not customize it.
61    async fn get_latest_version(&self, base_uri: &str) -> Result<Option<(u64, String)>>;
62
63    /// Get the latest manifest location for a given base_uri.
64    ///
65    /// By default, this calls get_latest_version.  Impls should
66    /// override this method if they store both the location and size
67    /// of the latest manifest.
68    async fn get_latest_manifest_location(
69        &self,
70        base_uri: &str,
71    ) -> Result<Option<ManifestLocation>> {
72        self.get_latest_version(base_uri).await.and_then(|res| {
73            res.map(|(version, uri)| {
74                let path = Path::from(uri);
75                let naming_scheme = detect_naming_scheme_from_path(&path)?;
76                Ok(ManifestLocation {
77                    version,
78                    path,
79                    size: None,
80                    naming_scheme,
81                })
82            })
83            .transpose()
84        })
85    }
86
87    /// Put the manifest path for a given base_uri and version, should fail if the version already exists
88    async fn put_if_not_exists(
89        &self,
90        base_uri: &str,
91        version: u64,
92        path: &str,
93        size: u64,
94    ) -> Result<()>;
95
96    /// Put the manifest path for a given base_uri and version, should fail if the version **does not** already exist
97    async fn put_if_exists(
98        &self,
99        base_uri: &str,
100        version: u64,
101        path: &str,
102        size: u64,
103    ) -> Result<()>;
104
105    /// Delete the manifest information for given base_uri from the store
106    async fn delete(&self, _base_uri: &str) -> Result<()> {
107        Ok(())
108    }
109}
110
111pub(crate) fn detect_naming_scheme_from_path(path: &Path) -> Result<ManifestNamingScheme> {
112    path.filename()
113        .and_then(|name| {
114            ManifestNamingScheme::detect_scheme(name)
115                .or_else(|| Some(ManifestNamingScheme::detect_scheme_staging(name)))
116        })
117        .ok_or_else(|| {
118            Error::corrupt_file(
119                path.clone(),
120                "Path does not follow known manifest naming convention.",
121                location!(),
122            )
123        })
124}
125
126/// External manifest commit handler
127/// This handler is used to commit a manifest to an external store
128/// for detailed design, see https://github.com/lancedb/lance/issues/1183
129#[derive(Debug)]
130pub struct ExternalManifestCommitHandler {
131    pub external_manifest_store: Arc<dyn ExternalManifestStore>,
132}
133
134impl ExternalManifestCommitHandler {
135    /// The manifest is considered committed once the staging manifest is written
136    /// to object store and that path is committed to the external store.
137    ///
138    /// However, to fully complete this, the staging manifest should be materialized
139    /// into the final path, the final path should be committed to the external store
140    /// and the staging manifest should be deleted. These steps may be completed
141    /// by any number of readers or writers, so care should be taken to ensure
142    /// that the manifest is not lost nor any errors occur due to duplicate
143    /// operations.
144    async fn finalize_manifest(
145        &self,
146        base_path: &Path,
147        staging_manifest_path: &Path,
148        version: u64,
149        size: u64,
150        store: &dyn OSObjectStore,
151        naming_scheme: ManifestNamingScheme,
152    ) -> std::result::Result<Path, Error> {
153        // step 1: copy the manifest to the final location
154        let final_manifest_path = naming_scheme.manifest_path(base_path, version);
155        match store
156            .copy(staging_manifest_path, &final_manifest_path)
157            .await
158        {
159            Ok(_) => {}
160            Err(ObjectStoreError::NotFound { .. }) => return Ok(final_manifest_path), // Another writer beat us to it.
161            Err(e) => return Err(e.into()),
162        };
163
164        // step 2: flip the external store to point to the final location
165        self.external_manifest_store
166            .put_if_exists(
167                base_path.as_ref(),
168                version,
169                final_manifest_path.as_ref(),
170                size,
171            )
172            .await?;
173
174        // step 3: delete the staging manifest
175        match store.delete(staging_manifest_path).await {
176            Ok(_) => {}
177            Err(ObjectStoreError::NotFound { .. }) => {}
178            Err(e) => return Err(e.into()),
179        }
180
181        Ok(final_manifest_path)
182    }
183}
184
185#[async_trait]
186impl CommitHandler for ExternalManifestCommitHandler {
187    async fn resolve_latest_location(
188        &self,
189        base_path: &Path,
190        object_store: &ObjectStore,
191    ) -> std::result::Result<ManifestLocation, Error> {
192        let location = self
193            .external_manifest_store
194            .get_latest_manifest_location(base_path.as_ref())
195            .await?;
196
197        match location {
198            Some(ManifestLocation {
199                version,
200                path,
201                size,
202                naming_scheme,
203            }) => {
204                // The path is finalized, no need to check object store
205                if path.extension() == Some(MANIFEST_EXTENSION) {
206                    return Ok(ManifestLocation {
207                        version,
208                        path,
209                        size,
210                        naming_scheme,
211                    });
212                }
213
214                let size = if let Some(size) = size {
215                    size
216                } else {
217                    object_store.size(&path).await? as u64
218                };
219
220                let final_path = self
221                    .finalize_manifest(
222                        base_path,
223                        &path,
224                        version,
225                        size,
226                        &object_store.inner,
227                        naming_scheme,
228                    )
229                    .await?;
230
231                Ok(ManifestLocation {
232                    version,
233                    path: final_path,
234                    size: Some(size),
235                    naming_scheme,
236                })
237            }
238            // Dataset not found in the external store, this could be because the dataset did not
239            // use external store for commit before. In this case, we search for the latest manifest
240            None => current_manifest_path(object_store, base_path).await,
241        }
242    }
243
244    /// Get the latest version of a dataset at the path
245    async fn resolve_latest_version(
246        &self,
247        base_path: &Path,
248        object_store: &ObjectStore,
249    ) -> std::result::Result<Path, Error> {
250        self.resolve_latest_location(base_path, object_store)
251            .await
252            .map(|l| l.path)
253    }
254
255    async fn resolve_latest_version_id(
256        &self,
257        base_path: &Path,
258        object_store: &ObjectStore,
259    ) -> std::result::Result<u64, Error> {
260        let version = self
261            .external_manifest_store
262            .get_latest_version(base_path.as_ref())
263            .await?;
264
265        match version {
266            Some((version, _)) => Ok(version),
267            None => Ok(current_manifest_path(object_store, base_path)
268                .await?
269                .version),
270        }
271    }
272
273    async fn resolve_version(
274        &self,
275        base_path: &Path,
276        version: u64,
277        object_store: &dyn OSObjectStore,
278    ) -> std::result::Result<Path, Error> {
279        Ok(self
280            .resolve_version_location(base_path, version, object_store)
281            .await?
282            .path)
283    }
284
285    async fn resolve_version_location(
286        &self,
287        base_path: &Path,
288        version: u64,
289        object_store: &dyn OSObjectStore,
290    ) -> std::result::Result<ManifestLocation, Error> {
291        let location_res = self
292            .external_manifest_store
293            .get_manifest_location(base_path.as_ref(), version)
294            .await;
295
296        let location = match location_res {
297            Ok(p) => p,
298            // not board external manifest yet, direct to object store
299            Err(Error::NotFound { .. }) => {
300                let path = default_resolve_version(base_path, version, object_store)
301                    .await
302                    .map_err(|_| Error::NotFound {
303                        uri: format!("{}@{}", base_path, version),
304                        location: location!(),
305                    })?
306                    .path;
307                match object_store.head(&path).await {
308                    Ok(ObjectMeta { size, .. }) => {
309                        let res = self
310                            .external_manifest_store
311                            .put_if_not_exists(
312                                base_path.as_ref(),
313                                version,
314                                path.as_ref(),
315                                size as u64,
316                            )
317                            .await;
318                        if let Err(e) = res {
319                            warn!(
320                                "could not update external manifest store during load, with error: {}",
321                                e
322                            );
323                        }
324                        let naming_scheme =
325                            ManifestNamingScheme::detect_scheme_staging(path.filename().unwrap());
326                        return Ok(ManifestLocation {
327                            version,
328                            path,
329                            size: Some(size as u64),
330                            naming_scheme,
331                        });
332                    }
333                    Err(ObjectStoreError::NotFound { .. }) => {
334                        return Err(Error::NotFound {
335                            uri: path.to_string(),
336                            location: location!(),
337                        });
338                    }
339                    Err(e) => return Err(e.into()),
340                }
341            }
342            Err(e) => return Err(e),
343        };
344
345        // finalized path, just return
346        if location.path.extension() == Some(MANIFEST_EXTENSION) {
347            return Ok(location);
348        }
349
350        let naming_scheme =
351            ManifestNamingScheme::detect_scheme_staging(location.path.filename().unwrap());
352
353        let size = if let Some(size) = location.size {
354            size
355        } else {
356            object_store.head(&location.path).await?.size as u64
357        };
358
359        let new_path = self
360            .finalize_manifest(
361                base_path,
362                &location.path,
363                version,
364                size,
365                object_store,
366                naming_scheme,
367            )
368            .await?;
369
370        Ok(ManifestLocation {
371            path: new_path,
372            ..location
373        })
374    }
375
376    async fn commit(
377        &self,
378        manifest: &mut Manifest,
379        indices: Option<Vec<Index>>,
380        base_path: &Path,
381        object_store: &ObjectStore,
382        manifest_writer: ManifestWriter,
383        naming_scheme: ManifestNamingScheme,
384    ) -> std::result::Result<Path, CommitError> {
385        // path we get here is the path to the manifest we want to write
386        // use object_store.base_path.as_ref() for getting the root of the dataset
387
388        // step 1: Write the manifest we want to commit to object store with a temporary name
389        let path = naming_scheme.manifest_path(base_path, manifest.version);
390        let staging_path = make_staging_manifest_path(&path)?;
391        let size = manifest_writer(object_store, manifest, indices, &staging_path).await?;
392
393        // step 2 & 3: Try to commit this version to external store, return err on failure
394        let res = self
395            .external_manifest_store
396            .put_if_not_exists(
397                base_path.as_ref(),
398                manifest.version,
399                staging_path.as_ref(),
400                size,
401            )
402            .await
403            .map_err(|_| CommitError::CommitConflict {});
404
405        if let Err(err) = res {
406            // delete the staging manifest
407            match object_store.inner.delete(&staging_path).await {
408                Ok(_) => {}
409                Err(ObjectStoreError::NotFound { .. }) => {}
410                Err(e) => return Err(CommitError::OtherError(e.into())),
411            }
412            return Err(err);
413        }
414
415        Ok(self
416            .finalize_manifest(
417                base_path,
418                &staging_path,
419                manifest.version,
420                size,
421                &object_store.inner,
422                naming_scheme,
423            )
424            .await?)
425    }
426
427    async fn delete(&self, base_path: &Path) -> Result<()> {
428        self.external_manifest_store
429            .delete(base_path.as_ref())
430            .await
431    }
432}