use std::sync::Arc;
use async_trait::async_trait;
use lance_core::utils::tracing::{
AUDIT_MODE_CREATE, AUDIT_MODE_DELETE, AUDIT_TYPE_MANIFEST, TRACE_FILE_AUDIT,
};
use lance_core::{Error, Result};
use lance_io::object_store::ObjectStore;
use log::warn;
use object_store::ObjectMeta;
use object_store::{Error as ObjectStoreError, ObjectStore as OSObjectStore, path::Path};
use tracing::info;
use super::{
MANIFEST_EXTENSION, ManifestLocation, ManifestNamingScheme, current_manifest_path,
default_resolve_version, make_staging_manifest_path,
};
use crate::format::{IndexMetadata, Manifest, Transaction};
use crate::io::commit::{CommitError, CommitHandler};
#[async_trait]
pub trait ExternalManifestStore: std::fmt::Debug + Send + Sync {
async fn get(&self, base_uri: &str, version: u64) -> Result<String>;
async fn get_manifest_location(
&self,
base_uri: &str,
version: u64,
) -> Result<ManifestLocation> {
let path = self.get(base_uri, version).await?;
let path = Path::from(path);
let naming_scheme = detect_naming_scheme_from_path(&path)?;
Ok(ManifestLocation {
version,
path,
size: None,
naming_scheme,
e_tag: None,
})
}
async fn get_latest_version(&self, base_uri: &str) -> Result<Option<(u64, String)>>;
async fn get_latest_manifest_location(
&self,
base_uri: &str,
) -> Result<Option<ManifestLocation>> {
self.get_latest_version(base_uri).await.and_then(|res| {
res.map(|(version, uri)| {
let path = Path::from(uri);
let naming_scheme = detect_naming_scheme_from_path(&path)?;
Ok(ManifestLocation {
version,
path,
size: None,
naming_scheme,
e_tag: None,
})
})
.transpose()
})
}
#[allow(clippy::too_many_arguments)]
async fn put(
&self,
base_path: &Path,
version: u64,
staging_path: &Path,
size: u64,
e_tag: Option<String>,
object_store: &dyn OSObjectStore,
naming_scheme: ManifestNamingScheme,
) -> Result<ManifestLocation> {
self.put_if_not_exists(
base_path.as_ref(),
version,
staging_path.as_ref(),
size,
e_tag.clone(),
)
.await?;
let final_path = naming_scheme.manifest_path(base_path, version);
let copied = match object_store.copy(staging_path, &final_path).await {
Ok(_) => true,
Err(ObjectStoreError::NotFound { .. }) => false,
Err(e) => return Err(e.into()),
};
if copied {
info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_MANIFEST, path = final_path.as_ref());
}
let e_tag = if copied && size < 5 * 1024 * 1024 {
e_tag
} else {
let meta = object_store.head(&final_path).await?;
meta.e_tag
};
let location = ManifestLocation {
version,
path: final_path.clone(),
size: Some(size),
naming_scheme,
e_tag: e_tag.clone(),
};
if !copied {
return Ok(location);
}
self.put_if_exists(
base_path.as_ref(),
version,
final_path.as_ref(),
size,
e_tag,
)
.await?;
match object_store.delete(staging_path).await {
Ok(_) => {}
Err(ObjectStoreError::NotFound { .. }) => {}
Err(e) => return Err(e.into()),
}
info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_MANIFEST, path = staging_path.as_ref());
Ok(location)
}
async fn put_if_not_exists(
&self,
base_uri: &str,
version: u64,
path: &str,
size: u64,
e_tag: Option<String>,
) -> Result<()>;
async fn put_if_exists(
&self,
base_uri: &str,
version: u64,
path: &str,
size: u64,
e_tag: Option<String>,
) -> Result<()>;
async fn delete(&self, _base_uri: &str) -> Result<()> {
Ok(())
}
}
pub(crate) fn detect_naming_scheme_from_path(path: &Path) -> Result<ManifestNamingScheme> {
path.filename()
.and_then(|name| {
ManifestNamingScheme::detect_scheme(name)
.or_else(|| Some(ManifestNamingScheme::detect_scheme_staging(name)))
})
.ok_or_else(|| {
Error::corrupt_file(
path.clone(),
"Path does not follow known manifest naming convention.",
)
})
}
#[derive(Debug)]
pub struct ExternalManifestCommitHandler {
pub external_manifest_store: Arc<dyn ExternalManifestStore>,
}
impl ExternalManifestCommitHandler {
#[allow(clippy::too_many_arguments)]
async fn finalize_manifest(
&self,
base_path: &Path,
staging_manifest_path: &Path,
version: u64,
size: u64,
e_tag: Option<String>,
store: &dyn OSObjectStore,
naming_scheme: ManifestNamingScheme,
) -> std::result::Result<ManifestLocation, Error> {
let final_manifest_path = naming_scheme.manifest_path(base_path, version);
let copied = match store
.copy(staging_manifest_path, &final_manifest_path)
.await
{
Ok(_) => true,
Err(ObjectStoreError::NotFound { .. }) => false, Err(e) => return Err(e.into()),
};
if copied {
info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_MANIFEST, path = final_manifest_path.as_ref());
}
let e_tag = if copied && size < 5 * 1024 * 1024 {
e_tag
} else {
let meta = store.head(&final_manifest_path).await?;
meta.e_tag
};
let location = ManifestLocation {
version,
path: final_manifest_path,
size: Some(size),
naming_scheme,
e_tag,
};
if !copied {
return Ok(location);
}
self.external_manifest_store
.put_if_exists(
base_path.as_ref(),
version,
location.path.as_ref(),
size,
location.e_tag.clone(),
)
.await?;
match store.delete(staging_manifest_path).await {
Ok(_) => {}
Err(ObjectStoreError::NotFound { .. }) => {}
Err(e) => return Err(e.into()),
}
info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_MANIFEST, path = staging_manifest_path.as_ref());
Ok(location)
}
}
#[async_trait]
impl CommitHandler for ExternalManifestCommitHandler {
async fn resolve_latest_location(
&self,
base_path: &Path,
object_store: &ObjectStore,
) -> std::result::Result<ManifestLocation, Error> {
let location = self
.external_manifest_store
.get_latest_manifest_location(base_path.as_ref())
.await?;
match location {
Some(ManifestLocation {
version,
path,
size,
naming_scheme,
e_tag,
}) => {
if path.extension() == Some(MANIFEST_EXTENSION) {
return Ok(ManifestLocation {
version,
path,
size,
naming_scheme,
e_tag,
});
}
let (size, e_tag) = if let Some(size) = size {
(size, e_tag)
} else {
match object_store.inner.head(&path).await {
Ok(meta) => (meta.size, meta.e_tag),
Err(ObjectStoreError::NotFound { .. }) => {
let new_location = self
.external_manifest_store
.get_manifest_location(base_path.as_ref(), version)
.await?;
return Ok(new_location);
}
Err(e) => return Err(e.into()),
}
};
let final_location = self
.finalize_manifest(
base_path,
&path,
version,
size,
e_tag.clone(),
&object_store.inner,
naming_scheme,
)
.await?;
Ok(final_location)
}
None => current_manifest_path(object_store, base_path).await,
}
}
async fn resolve_version_location(
&self,
base_path: &Path,
version: u64,
object_store: &dyn OSObjectStore,
) -> std::result::Result<ManifestLocation, Error> {
let location_res = self
.external_manifest_store
.get_manifest_location(base_path.as_ref(), version)
.await;
let location = match location_res {
Ok(p) => p,
Err(Error::NotFound { .. }) => {
let path = default_resolve_version(base_path, version, object_store)
.await
.map_err(|_| Error::not_found(format!("{}@{}", base_path, version)))?
.path;
match object_store.head(&path).await {
Ok(ObjectMeta { size, e_tag, .. }) => {
let res = self
.external_manifest_store
.put_if_not_exists(
base_path.as_ref(),
version,
path.as_ref(),
size,
e_tag.clone(),
)
.await;
if let Err(e) = res {
warn!(
"could not update external manifest store during load, with error: {}",
e
);
}
let naming_scheme =
ManifestNamingScheme::detect_scheme_staging(path.filename().unwrap());
return Ok(ManifestLocation {
version,
path,
size: Some(size),
naming_scheme,
e_tag,
});
}
Err(ObjectStoreError::NotFound { .. }) => {
return Err(Error::not_found(path.to_string()));
}
Err(e) => return Err(e.into()),
}
}
Err(e) => return Err(e),
};
if location.path.extension() == Some(MANIFEST_EXTENSION) {
return Ok(location);
}
let naming_scheme =
ManifestNamingScheme::detect_scheme_staging(location.path.filename().unwrap());
let (size, e_tag) = if let Some(size) = location.size {
(size, location.e_tag.clone())
} else {
let meta = object_store.head(&location.path).await?;
(meta.size as u64, meta.e_tag)
};
self.finalize_manifest(
base_path,
&location.path,
version,
size,
e_tag,
object_store,
naming_scheme,
)
.await
}
async fn commit(
&self,
manifest: &mut Manifest,
indices: Option<Vec<IndexMetadata>>,
base_path: &Path,
object_store: &ObjectStore,
manifest_writer: super::ManifestWriter,
naming_scheme: ManifestNamingScheme,
transaction: Option<Transaction>,
) -> std::result::Result<ManifestLocation, CommitError> {
let path = naming_scheme.manifest_path(base_path, manifest.version);
let staging_path = make_staging_manifest_path(&path)?;
let write_res =
manifest_writer(object_store, manifest, indices, &staging_path, transaction).await?;
let result = self
.external_manifest_store
.put(
base_path,
manifest.version,
&staging_path,
write_res.size as u64,
write_res.e_tag,
&object_store.inner,
naming_scheme,
)
.await;
match result {
Ok(location) => Ok(location),
Err(_) => {
match object_store.inner.delete(&staging_path).await {
Ok(_) => {}
Err(ObjectStoreError::NotFound { .. }) => {}
Err(e) => return Err(CommitError::OtherError(e.into())),
}
info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_MANIFEST, path = staging_path.as_ref());
Err(CommitError::CommitConflict {})
}
}
}
async fn delete(&self, base_path: &Path) -> Result<()> {
self.external_manifest_store
.delete(base_path.as_ref())
.await
}
}