lance_table/io/commit/
external_manifest.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Trait for external manifest handler.
5//!
6//! This trait abstracts an external storage with put_if_not_exists semantics.
7
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use lance_core::{Error, Result};
12use lance_io::object_store::ObjectStore;
13use log::warn;
14use object_store::ObjectMeta;
15use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore};
16use snafu::location;
17
18use super::{
19    current_manifest_path, default_resolve_version, make_staging_manifest_path, ManifestLocation,
20    ManifestNamingScheme, MANIFEST_EXTENSION,
21};
22use crate::format::{Index, Manifest};
23use crate::io::commit::{CommitError, CommitHandler, ManifestWriter};
24
25/// External manifest store
26///
27/// This trait abstracts an external storage for source of truth for manifests.
28/// The storage is expected to remember (uri, version) -> manifest_path
29/// and able to run transactions on the manifest_path.
30///
31/// This trait is called an **External** manifest store because the store is
32/// expected to work in tandem with the object store. We are only leveraging
33/// the external store for concurrent commit. Any manifest committed thru this
34/// trait should ultimately be materialized in the object store.
35/// For a visual explanation of the commit loop see
36/// https://github.com/lancedb/lance/assets/12615154/b0822312-0826-432a-b554-3965f8d48d04
37#[async_trait]
38pub trait ExternalManifestStore: std::fmt::Debug + Send + Sync {
39    /// Get the manifest path for a given base_uri and version
40    async fn get(&self, base_uri: &str, version: u64) -> Result<String>;
41
42    async fn get_manifest_location(
43        &self,
44        base_uri: &str,
45        version: u64,
46    ) -> Result<ManifestLocation> {
47        let path = self.get(base_uri, version).await?;
48        let path = Path::from(path);
49        let naming_scheme = detect_naming_scheme_from_path(&path)?;
50        Ok(ManifestLocation {
51            version,
52            path,
53            size: None,
54            naming_scheme,
55            e_tag: None,
56        })
57    }
58
59    /// Get the latest version of a dataset at the base_uri, and the path to the manifest.
60    /// The path is provided as an optimization. The path is deterministic based on
61    /// the version and the store should not customize it.
62    async fn get_latest_version(&self, base_uri: &str) -> Result<Option<(u64, String)>>;
63
64    /// Get the latest manifest location for a given base_uri.
65    ///
66    /// By default, this calls get_latest_version.  Impls should
67    /// override this method if they store both the location and size
68    /// of the latest manifest.
69    async fn get_latest_manifest_location(
70        &self,
71        base_uri: &str,
72    ) -> Result<Option<ManifestLocation>> {
73        self.get_latest_version(base_uri).await.and_then(|res| {
74            res.map(|(version, uri)| {
75                let path = Path::from(uri);
76                let naming_scheme = detect_naming_scheme_from_path(&path)?;
77                Ok(ManifestLocation {
78                    version,
79                    path,
80                    size: None,
81                    naming_scheme,
82                    e_tag: None,
83                })
84            })
85            .transpose()
86        })
87    }
88
89    /// Put the manifest path for a given base_uri and version, should fail if the version already exists
90    async fn put_if_not_exists(
91        &self,
92        base_uri: &str,
93        version: u64,
94        path: &str,
95        size: u64,
96        e_tag: Option<String>,
97    ) -> Result<()>;
98
99    /// Put the manifest path for a given base_uri and version, should fail if the version **does not** already exist
100    async fn put_if_exists(
101        &self,
102        base_uri: &str,
103        version: u64,
104        path: &str,
105        size: u64,
106        e_tag: Option<String>,
107    ) -> Result<()>;
108
109    /// Delete the manifest information for given base_uri from the store
110    async fn delete(&self, _base_uri: &str) -> Result<()> {
111        Ok(())
112    }
113}
114
115pub(crate) fn detect_naming_scheme_from_path(path: &Path) -> Result<ManifestNamingScheme> {
116    path.filename()
117        .and_then(|name| {
118            ManifestNamingScheme::detect_scheme(name)
119                .or_else(|| Some(ManifestNamingScheme::detect_scheme_staging(name)))
120        })
121        .ok_or_else(|| {
122            Error::corrupt_file(
123                path.clone(),
124                "Path does not follow known manifest naming convention.",
125                location!(),
126            )
127        })
128}
129
130/// External manifest commit handler
131/// This handler is used to commit a manifest to an external store
132/// for detailed design, see https://github.com/lancedb/lance/issues/1183
133#[derive(Debug)]
134pub struct ExternalManifestCommitHandler {
135    pub external_manifest_store: Arc<dyn ExternalManifestStore>,
136}
137
138impl ExternalManifestCommitHandler {
139    /// The manifest is considered committed once the staging manifest is written
140    /// to object store and that path is committed to the external store.
141    ///
142    /// However, to fully complete this, the staging manifest should be materialized
143    /// into the final path, the final path should be committed to the external store
144    /// and the staging manifest should be deleted. These steps may be completed
145    /// by any number of readers or writers, so care should be taken to ensure
146    /// that the manifest is not lost nor any errors occur due to duplicate
147    /// operations.
148    #[allow(clippy::too_many_arguments)]
149    async fn finalize_manifest(
150        &self,
151        base_path: &Path,
152        staging_manifest_path: &Path,
153        version: u64,
154        size: u64,
155        e_tag: Option<String>,
156        store: &dyn OSObjectStore,
157        naming_scheme: ManifestNamingScheme,
158    ) -> std::result::Result<ManifestLocation, Error> {
159        // step 1: copy the manifest to the final location
160        let final_manifest_path = naming_scheme.manifest_path(base_path, version);
161
162        let copied = match store
163            .copy(staging_manifest_path, &final_manifest_path)
164            .await
165        {
166            Ok(_) => true,
167            Err(ObjectStoreError::NotFound { .. }) => false, // Another writer beat us to it.
168            Err(e) => return Err(e.into()),
169        };
170
171        // On S3, the etag can change if originally was MultipartUpload and later was Copy
172        // https://docs.aws.amazon.com/AmazonS3/latest/API/API_Object.html#AmazonS3-Type-Object-ETag
173        // We only do MultipartUpload for > 5MB files, so we can skip this check
174        // if size < 5MB
175        let e_tag = if size < 5 * 1024 * 1024 {
176            e_tag
177        } else {
178            let meta = store.head(&final_manifest_path).await?;
179            meta.e_tag
180        };
181
182        let location = ManifestLocation {
183            version,
184            path: final_manifest_path,
185            size: Some(size),
186            naming_scheme,
187            e_tag,
188        };
189
190        if !copied {
191            return Ok(location);
192        }
193
194        // step 2: flip the external store to point to the final location
195        self.external_manifest_store
196            .put_if_exists(
197                base_path.as_ref(),
198                version,
199                location.path.as_ref(),
200                size,
201                location.e_tag.clone(),
202            )
203            .await?;
204
205        // step 3: delete the staging manifest
206        match store.delete(staging_manifest_path).await {
207            Ok(_) => {}
208            Err(ObjectStoreError::NotFound { .. }) => {}
209            Err(e) => return Err(e.into()),
210        }
211
212        Ok(location)
213    }
214}
215
216#[async_trait]
217impl CommitHandler for ExternalManifestCommitHandler {
218    async fn resolve_latest_location(
219        &self,
220        base_path: &Path,
221        object_store: &ObjectStore,
222    ) -> std::result::Result<ManifestLocation, Error> {
223        let location = self
224            .external_manifest_store
225            .get_latest_manifest_location(base_path.as_ref())
226            .await?;
227
228        match location {
229            Some(ManifestLocation {
230                version,
231                path,
232                size,
233                naming_scheme,
234                e_tag,
235            }) => {
236                // The path is finalized, no need to check object store
237                if path.extension() == Some(MANIFEST_EXTENSION) {
238                    return Ok(ManifestLocation {
239                        version,
240                        path,
241                        size,
242                        naming_scheme,
243                        e_tag,
244                    });
245                }
246
247                let (size, e_tag) = if let Some(size) = size {
248                    (size, e_tag)
249                } else {
250                    let meta = object_store.inner.head(&path).await?;
251                    (meta.size, meta.e_tag)
252                };
253
254                let final_location = self
255                    .finalize_manifest(
256                        base_path,
257                        &path,
258                        version,
259                        size,
260                        e_tag.clone(),
261                        &object_store.inner,
262                        naming_scheme,
263                    )
264                    .await?;
265
266                Ok(final_location)
267            }
268            // Dataset not found in the external store, this could be because the dataset did not
269            // use external store for commit before. In this case, we search for the latest manifest
270            None => current_manifest_path(object_store, base_path).await,
271        }
272    }
273
274    async fn resolve_version_location(
275        &self,
276        base_path: &Path,
277        version: u64,
278        object_store: &dyn OSObjectStore,
279    ) -> std::result::Result<ManifestLocation, Error> {
280        let location_res = self
281            .external_manifest_store
282            .get_manifest_location(base_path.as_ref(), version)
283            .await;
284
285        let location = match location_res {
286            Ok(p) => p,
287            // not board external manifest yet, direct to object store
288            Err(Error::NotFound { .. }) => {
289                let path = default_resolve_version(base_path, version, object_store)
290                    .await
291                    .map_err(|_| Error::NotFound {
292                        uri: format!("{}@{}", base_path, version),
293                        location: location!(),
294                    })?
295                    .path;
296                match object_store.head(&path).await {
297                    Ok(ObjectMeta { size, e_tag, .. }) => {
298                        let res = self
299                            .external_manifest_store
300                            .put_if_not_exists(
301                                base_path.as_ref(),
302                                version,
303                                path.as_ref(),
304                                size,
305                                e_tag.clone(),
306                            )
307                            .await;
308                        if let Err(e) = res {
309                            warn!(
310                                "could not update external manifest store during load, with error: {}",
311                                e
312                            );
313                        }
314                        let naming_scheme =
315                            ManifestNamingScheme::detect_scheme_staging(path.filename().unwrap());
316                        return Ok(ManifestLocation {
317                            version,
318                            path,
319                            size: Some(size),
320                            naming_scheme,
321                            e_tag,
322                        });
323                    }
324                    Err(ObjectStoreError::NotFound { .. }) => {
325                        return Err(Error::NotFound {
326                            uri: path.to_string(),
327                            location: location!(),
328                        });
329                    }
330                    Err(e) => return Err(e.into()),
331                }
332            }
333            Err(e) => return Err(e),
334        };
335
336        // finalized path, just return
337        if location.path.extension() == Some(MANIFEST_EXTENSION) {
338            return Ok(location);
339        }
340
341        let naming_scheme =
342            ManifestNamingScheme::detect_scheme_staging(location.path.filename().unwrap());
343
344        let (size, e_tag) = if let Some(size) = location.size {
345            (size, location.e_tag.clone())
346        } else {
347            let meta = object_store.head(&location.path).await?;
348            (meta.size as u64, meta.e_tag)
349        };
350
351        self.finalize_manifest(
352            base_path,
353            &location.path,
354            version,
355            size,
356            e_tag,
357            object_store,
358            naming_scheme,
359        )
360        .await
361    }
362
363    async fn commit(
364        &self,
365        manifest: &mut Manifest,
366        indices: Option<Vec<Index>>,
367        base_path: &Path,
368        object_store: &ObjectStore,
369        manifest_writer: ManifestWriter,
370        naming_scheme: ManifestNamingScheme,
371    ) -> std::result::Result<ManifestLocation, CommitError> {
372        // path we get here is the path to the manifest we want to write
373        // use object_store.base_path.as_ref() for getting the root of the dataset
374
375        // step 1: Write the manifest we want to commit to object store with a temporary name
376        let path = naming_scheme.manifest_path(base_path, manifest.version);
377        let staging_path = make_staging_manifest_path(&path)?;
378        let write_res = manifest_writer(object_store, manifest, indices, &staging_path).await?;
379
380        // step 2 & 3: Try to commit this version to external store, return err on failure
381        let res = self
382            .external_manifest_store
383            .put_if_not_exists(
384                base_path.as_ref(),
385                manifest.version,
386                staging_path.as_ref(),
387                write_res.size as u64,
388                write_res.e_tag.clone(),
389            )
390            .await
391            .map_err(|_| CommitError::CommitConflict {});
392
393        if let Err(err) = res {
394            // delete the staging manifest
395            match object_store.inner.delete(&staging_path).await {
396                Ok(_) => {}
397                Err(ObjectStoreError::NotFound { .. }) => {}
398                Err(e) => return Err(CommitError::OtherError(e.into())),
399            }
400            return Err(err);
401        }
402
403        Ok(self
404            .finalize_manifest(
405                base_path,
406                &staging_path,
407                manifest.version,
408                write_res.size as u64,
409                write_res.e_tag,
410                &object_store.inner,
411                naming_scheme,
412            )
413            .await?)
414    }
415
416    async fn delete(&self, base_path: &Path) -> Result<()> {
417        self.external_manifest_store
418            .delete(base_path.as_ref())
419            .await
420    }
421}