use std::sync::Arc;
use async_trait::async_trait;
use lance_core::{Error, Result};
use lance_io::object_store::{ObjectStore, ObjectStoreExt};
use log::warn;
use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore};
use snafu::{location, Location};
use super::{
current_manifest_path, default_resolve_version, make_staging_manifest_path, ManifestLocation,
ManifestNamingScheme, MANIFEST_EXTENSION,
};
use crate::format::{Index, Manifest};
use crate::io::commit::{CommitError, CommitHandler, ManifestWriter};
#[async_trait]
pub trait ExternalManifestStore: std::fmt::Debug + Send + Sync {
async fn get(&self, base_uri: &str, version: u64) -> Result<String>;
async fn get_latest_version(&self, base_uri: &str) -> Result<Option<(u64, String)>>;
async fn get_latest_manifest_location(
&self,
base_uri: &str,
) -> Result<Option<ManifestLocation>> {
self.get_latest_version(base_uri).await.and_then(|res| {
res.map(|(version, uri)| {
let path = Path::from(uri);
let naming_scheme = detect_naming_scheme_from_path(&path)?;
Ok(ManifestLocation {
version,
path,
size: None,
naming_scheme,
})
})
.transpose()
})
}
async fn put_if_not_exists(&self, base_uri: &str, version: u64, path: &str) -> Result<()>;
async fn put_if_exists(&self, base_uri: &str, version: u64, path: &str) -> Result<()>;
}
fn detect_naming_scheme_from_path(path: &Path) -> Result<ManifestNamingScheme> {
path.filename()
.and_then(ManifestNamingScheme::detect_scheme)
.ok_or_else(|| {
Error::corrupt_file(
path.clone(),
"Path does not follow known manifest naming convention.",
location!(),
)
})
}
#[derive(Debug)]
pub struct ExternalManifestCommitHandler {
pub external_manifest_store: Arc<dyn ExternalManifestStore>,
}
impl ExternalManifestCommitHandler {
async fn finalize_manifest(
&self,
base_path: &Path,
staging_manifest_path: &Path,
version: u64,
store: &dyn OSObjectStore,
naming_scheme: ManifestNamingScheme,
) -> std::result::Result<Path, Error> {
let final_manifest_path = naming_scheme.manifest_path(base_path, version);
match store
.copy(staging_manifest_path, &final_manifest_path)
.await
{
Ok(_) => {}
Err(ObjectStoreError::NotFound { .. }) => return Ok(final_manifest_path), Err(e) => return Err(e.into()),
};
self.external_manifest_store
.put_if_exists(base_path.as_ref(), version, final_manifest_path.as_ref())
.await?;
match store.delete(staging_manifest_path).await {
Ok(_) => {}
Err(ObjectStoreError::NotFound { .. }) => {}
Err(e) => return Err(e.into()),
}
Ok(final_manifest_path)
}
}
#[async_trait]
impl CommitHandler for ExternalManifestCommitHandler {
async fn resolve_latest_location(
&self,
base_path: &Path,
object_store: &ObjectStore,
) -> std::result::Result<ManifestLocation, Error> {
let path = self.resolve_latest_version(base_path, object_store).await?;
let naming_scheme = detect_naming_scheme_from_path(&path)?;
Ok(ManifestLocation {
version: self
.resolve_latest_version_id(base_path, object_store)
.await?,
path,
size: None,
naming_scheme,
})
}
async fn resolve_latest_version(
&self,
base_path: &Path,
object_store: &ObjectStore,
) -> std::result::Result<Path, Error> {
let version = self
.external_manifest_store
.get_latest_version(base_path.as_ref())
.await?;
match version {
Some((version, path)) => {
if path.ends_with(&format!(".{MANIFEST_EXTENSION}")) {
return Ok(Path::parse(path)?);
}
let staged_path = Path::parse(&path)?;
let naming_scheme =
ManifestNamingScheme::detect_scheme_staging(staged_path.filename().unwrap());
self.finalize_manifest(
base_path,
&staged_path,
version,
&object_store.inner,
naming_scheme,
)
.await
}
None => Ok(current_manifest_path(object_store, base_path).await?.path),
}
}
async fn resolve_latest_version_id(
&self,
base_path: &Path,
object_store: &ObjectStore,
) -> std::result::Result<u64, Error> {
let version = self
.external_manifest_store
.get_latest_version(base_path.as_ref())
.await?;
match version {
Some((version, _)) => Ok(version),
None => Ok(current_manifest_path(object_store, base_path)
.await?
.version),
}
}
async fn resolve_version(
&self,
base_path: &Path,
version: u64,
object_store: &dyn OSObjectStore,
) -> std::result::Result<Path, Error> {
let path_res = self
.external_manifest_store
.get(base_path.as_ref(), version)
.await;
let path = match path_res {
Ok(p) => p,
Err(Error::NotFound { .. }) => {
let path = default_resolve_version(base_path, version, object_store)
.await
.map_err(|_| Error::NotFound {
uri: format!("{}@{}", base_path, version),
location: location!(),
})?
.path;
if object_store.exists(&path).await? {
match self
.external_manifest_store
.put_if_not_exists(base_path.as_ref(), version, path.as_ref())
.await
{
Ok(_) => {}
Err(e) => {
warn!(
"could not update external manifest store during load, with error: {}",
e
);
}
}
return Ok(path);
} else {
return Err(Error::NotFound {
uri: path.to_string(),
location: location!(),
});
}
}
Err(e) => return Err(e),
};
let current_path = Path::parse(path)?;
if current_path.extension() == Some(MANIFEST_EXTENSION) {
return Ok(current_path);
}
let naming_scheme =
ManifestNamingScheme::detect_scheme_staging(current_path.filename().unwrap());
self.finalize_manifest(
base_path,
&Path::parse(¤t_path)?,
version,
object_store,
naming_scheme,
)
.await
}
async fn resolve_version_location(
&self,
base_path: &Path,
version: u64,
object_store: &dyn OSObjectStore,
) -> std::result::Result<ManifestLocation, Error> {
let path = self
.resolve_version(base_path, version, object_store)
.await?;
let naming_scheme = detect_naming_scheme_from_path(&path)?;
Ok(ManifestLocation {
version,
path,
size: None,
naming_scheme,
})
}
async fn commit(
&self,
manifest: &mut Manifest,
indices: Option<Vec<Index>>,
base_path: &Path,
object_store: &ObjectStore,
manifest_writer: ManifestWriter,
naming_scheme: ManifestNamingScheme,
) -> std::result::Result<(), CommitError> {
let path = naming_scheme.manifest_path(base_path, manifest.version);
let staging_path = make_staging_manifest_path(&path)?;
manifest_writer(object_store, manifest, indices, &staging_path).await?;
let res = self
.external_manifest_store
.put_if_not_exists(base_path.as_ref(), manifest.version, staging_path.as_ref())
.await
.map_err(|_| CommitError::CommitConflict {});
if res.is_err() {
match object_store.inner.delete(&staging_path).await {
Ok(_) => {}
Err(ObjectStoreError::NotFound { .. }) => {}
Err(e) => return Err(CommitError::OtherError(e.into())),
}
return res;
}
let scheme = detect_naming_scheme_from_path(&path)?;
self.finalize_manifest(
base_path,
&staging_path,
manifest.version,
&object_store.inner,
scheme,
)
.await?;
Ok(())
}
}