use std::sync::Arc;
use async_trait::async_trait;
use lance_core::Result;
use lance_namespace::LanceNamespace;
use lance_namespace::models::{
CreateTableVersionRequest, DescribeTableVersionRequest, ListTableVersionsRequest,
};
use lance_table::io::commit::external_manifest::ExternalManifestStore;
use lance_table::io::commit::{ManifestLocation, ManifestNamingScheme};
use object_store::ObjectStore as OSObjectStore;
use object_store::path::Path;
use lance_namespace::error::NamespaceError;
use crate::dataset::branch_location::BranchLocation;
fn is_chain_not_found(e: &lance_core::Error) -> bool {
if let lance_core::Error::Namespace { source, .. } = e
&& let Some(ns_err) = source.downcast_ref::<NamespaceError>()
{
return matches!(
ns_err,
NamespaceError::TableNotFound { .. } | NamespaceError::TableBranchNotFound { .. }
);
}
false
}
#[derive(Debug)]
pub struct LanceNamespaceExternalManifestStore {
namespace_client: Arc<dyn LanceNamespace>,
table_id: Vec<String>,
table_root: Path,
}
impl LanceNamespaceExternalManifestStore {
pub fn new(
namespace_client: Arc<dyn LanceNamespace>,
table_id: Vec<String>,
table_root: Path,
) -> Self {
Self {
namespace_client,
table_id,
table_root,
}
}
pub fn for_table_uri(
namespace_client: Arc<dyn LanceNamespace>,
table_id: Vec<String>,
table_uri: &str,
) -> Result<Self> {
let table_root = lance_io::object_store::ObjectStore::extract_path_from_uri(
Arc::new(lance_io::object_store::ObjectStoreRegistry::default()),
table_uri,
)?;
Ok(Self::new(namespace_client, table_id, table_root))
}
fn branch_for_base(&self, base: &str) -> Result<Option<String>> {
BranchLocation::branch_of(self.table_root.as_ref(), base)
}
}
#[async_trait]
impl ExternalManifestStore for LanceNamespaceExternalManifestStore {
async fn get(&self, base_uri: &str, version: u64) -> Result<String> {
let request = DescribeTableVersionRequest {
id: Some(self.table_id.clone()),
version: Some(version as i64),
branch: self.branch_for_base(base_uri)?,
..Default::default()
};
let response = self
.namespace_client
.describe_table_version(request)
.await?;
Ok(response.version.manifest_path)
}
async fn get_latest_version(&self, base_uri: &str) -> Result<Option<(u64, String)>> {
let request = ListTableVersionsRequest {
id: Some(self.table_id.clone()),
descending: Some(true),
limit: Some(1),
branch: self.branch_for_base(base_uri)?,
..Default::default()
};
let response = match self.namespace_client.list_table_versions(request).await {
Ok(response) => response,
Err(e) if is_chain_not_found(&e) => return Ok(None),
Err(e) => return Err(e),
};
if response.versions.is_empty() {
return Ok(None);
}
let version = &response.versions[0];
Ok(Some((
version.version as u64,
version.manifest_path.clone(),
)))
}
async fn put(
&self,
base_path: &Path,
version: u64,
staging_path: &Path,
size: u64,
e_tag: Option<String>,
_object_store: &dyn OSObjectStore,
naming_scheme: ManifestNamingScheme,
) -> Result<ManifestLocation> {
let naming_scheme_str = match naming_scheme {
ManifestNamingScheme::V1 => "V1",
ManifestNamingScheme::V2 => "V2",
};
let request = CreateTableVersionRequest {
id: Some(self.table_id.clone()),
version: version as i64,
manifest_path: staging_path.to_string(),
manifest_size: Some(size as i64),
e_tag: e_tag.clone(),
naming_scheme: Some(naming_scheme_str.to_string()),
branch: self.branch_for_base(base_path.as_ref())?,
..Default::default()
};
let response = self.namespace_client.create_table_version(request).await?;
let version_info = response.version.ok_or_else(|| {
lance_core::Error::internal(
"create_table_version response missing version info".to_string(),
)
})?;
Ok(ManifestLocation {
version: version_info.version as u64,
path: Path::parse(&version_info.manifest_path).map_err(|e| {
lance_core::Error::invalid_input(format!(
"Invalid manifest path '{}': {}",
version_info.manifest_path, e
))
})?,
size: version_info.manifest_size.map(|s| s as u64),
naming_scheme,
e_tag: version_info.e_tag,
})
}
async fn put_if_not_exists(
&self,
_base_uri: &str,
_version: u64,
_path: &str,
_size: u64,
_e_tag: Option<String>,
) -> Result<()> {
Err(lance_core::Error::not_supported_source(
"put_if_not_exists is not supported for namespace-backed stores".into(),
))
}
async fn put_if_exists(
&self,
_base_uri: &str,
_version: u64,
_path: &str,
_size: u64,
_e_tag: Option<String>,
) -> Result<()> {
Err(lance_core::Error::not_supported_source(
"put_if_exists is not supported for namespace-backed stores".into(),
))
}
}
#[cfg(test)]
mod tests {
use super::*;
use lance_namespace::models::ListTableVersionsResponse;
#[derive(Debug)]
struct FailingNamespace {
error: fn() -> lance_core::Error,
}
#[async_trait]
impl LanceNamespace for FailingNamespace {
fn namespace_id(&self) -> String {
"failing".to_string()
}
async fn list_table_versions(
&self,
_request: ListTableVersionsRequest,
) -> Result<ListTableVersionsResponse> {
Err((self.error)())
}
}
fn store_with(error: fn() -> lance_core::Error) -> LanceNamespaceExternalManifestStore {
LanceNamespaceExternalManifestStore::new(
Arc::new(FailingNamespace { error }),
vec!["t".to_string()],
Path::parse("data/t.lance").unwrap(),
)
}
#[tokio::test]
async fn test_get_latest_version_error_classification() {
use lance_namespace::error::NamespaceError;
let absent = [
store_with(|| {
NamespaceError::TableNotFound {
message: "missing table".to_string(),
}
.into()
}),
store_with(|| {
NamespaceError::TableBranchNotFound {
message: "missing branch".to_string(),
}
.into()
}),
];
for store in absent {
let latest = store.get_latest_version("data/t.lance/tree/dev").await;
assert!(
matches!(latest, Ok(None)),
"a missing chain must read as no latest version, got: {:?}",
latest
);
}
let failures = [
store_with(|| {
NamespaceError::Internal {
message: "server error".to_string(),
}
.into()
}),
store_with(|| {
NamespaceError::Throttling {
message: "slow down".to_string(),
}
.into()
}),
store_with(|| lance_core::Error::io("connection reset".to_string())),
];
for store in failures {
let latest = store.get_latest_version("data/t.lance/tree/dev").await;
assert!(
latest.is_err(),
"a real failure must propagate, got: {:?}",
latest
);
}
}
}