#![cfg(feature = "local-catalog")]
use std::collections::HashMap;
use std::fs;
use std::io::ErrorKind;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use async_trait::async_trait;
use iceberg::io::LocalFsStorageFactory;
use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalog, MemoryCatalogBuilder};
use iceberg::table::Table;
use iceberg::{
Catalog, CatalogBuilder, Namespace, NamespaceIdent, Result as IcebergResult, TableCommit,
TableCreation, TableIdent,
};
use krishiv_common::validate::validate_safe_id;
use crate::catalog::LakehouseError;
/// File name of the per-table pointer file; its content is the table's current
/// metadata location as written by `write_version_hint`.
const VERSION_HINT: &str = "version-hint.text";
/// Name of the subdirectory, inside each table directory, that holds the
/// version hint file.
const METADATA_DIR: &str = "metadata";
/// Catalog that delegates to an in-memory [`MemoryCatalog`] and additionally
/// records each table's metadata location in a `version-hint.text` file under
/// a local warehouse directory, so tables can be re-registered when a new
/// instance is created over the same directory.
#[derive(Debug)]
pub struct LocalCatalog {
/// In-memory catalog that the `Catalog` operations are forwarded to.
inner: Arc<MemoryCatalog>,
/// Canonicalized root directory of the warehouse.
warehouse: PathBuf,
}
impl LocalCatalog {
pub async fn new(warehouse: &Path) -> Result<Self, LakehouseError> {
fs::create_dir_all(warehouse).map_err(|e| LakehouseError::Io(e.to_string()))?;
let warehouse = warehouse
.canonicalize()
.map_err(|e| LakehouseError::Io(e.to_string()))?;
let warehouse_uri = path_to_uri(&warehouse)?;
let inner = MemoryCatalogBuilder::default()
.with_storage_factory(Arc::new(LocalFsStorageFactory))
.load(
"local",
HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_uri)]),
)
.await
.map_err(|e| LakehouseError::Iceberg(e.to_string()))?;
let inner = Arc::new(inner);
let catalog = Self {
inner,
warehouse: warehouse.clone(),
};
catalog.recover_from_disk().await?;
Ok(catalog)
}
pub fn warehouse(&self) -> &Path {
&self.warehouse
}
fn table_location_uri(
&self,
namespace: &NamespaceIdent,
table: &str,
) -> Result<String, LakehouseError> {
let dir = self.table_dir(namespace, table)?;
fs::create_dir_all(&dir).map_err(|e| LakehouseError::Io(e.to_string()))?;
path_to_uri(&dir)
}
fn table_dir(
&self,
namespace: &NamespaceIdent,
table: &str,
) -> Result<PathBuf, LakehouseError> {
validate_namespace(namespace)?;
validate_path_component("table name", table)?;
let mut dir = self.warehouse.clone();
for part in namespace.clone().inner() {
dir.push(part);
}
dir.push(table);
Ok(dir)
}
fn table_metadata_dir(
&self,
namespace: &NamespaceIdent,
table: &str,
) -> Result<PathBuf, LakehouseError> {
let mut dir = self.table_dir(namespace, table)?;
dir.push(METADATA_DIR);
Ok(dir)
}
fn write_version_hint(
&self,
namespace: &NamespaceIdent,
table: &str,
metadata_location: &str,
) -> Result<(), LakehouseError> {
let dir = self.table_metadata_dir(namespace, table)?;
fs::create_dir_all(&dir).map_err(|e| LakehouseError::Io(e.to_string()))?;
fs::write(dir.join(VERSION_HINT), metadata_location)
.map_err(|e| LakehouseError::Io(e.to_string()))
}
async fn recover_from_disk(&self) -> Result<(), LakehouseError> {
let mut discovered: Vec<(NamespaceIdent, String, String)> = Vec::new();
discover_tables(&self.warehouse, &self.warehouse, &mut discovered)?;
for (namespace, table_name, metadata_location) in discovered {
let _ = self
.inner
.create_namespace(&namespace, HashMap::new())
.await;
let ident = TableIdent::new(namespace, table_name);
if !self.inner.table_exists(&ident).await.unwrap_or(false) {
self.inner
.register_table(&ident, metadata_location)
.await
.map_err(|e| LakehouseError::Iceberg(e.to_string()))?;
}
}
Ok(())
}
}
#[async_trait]
impl Catalog for LocalCatalog {
async fn list_namespaces(
&self,
parent: Option<&NamespaceIdent>,
) -> IcebergResult<Vec<NamespaceIdent>> {
self.inner.list_namespaces(parent).await
}
async fn create_namespace(
&self,
namespace: &NamespaceIdent,
properties: HashMap<String, String>,
) -> IcebergResult<Namespace> {
validate_namespace(namespace).map_err(to_iceberg_err)?;
let mut dir = self.warehouse.clone();
for part in namespace.clone().inner() {
dir.push(part);
}
fs::create_dir_all(&dir).map_err(|e| to_iceberg_err(LakehouseError::Io(e.to_string())))?;
self.inner.create_namespace(namespace, properties).await
}
async fn get_namespace(&self, namespace: &NamespaceIdent) -> IcebergResult<Namespace> {
self.inner.get_namespace(namespace).await
}
async fn namespace_exists(&self, namespace: &NamespaceIdent) -> IcebergResult<bool> {
self.inner.namespace_exists(namespace).await
}
async fn update_namespace(
&self,
namespace: &NamespaceIdent,
properties: HashMap<String, String>,
) -> IcebergResult<()> {
self.inner.update_namespace(namespace, properties).await
}
async fn drop_namespace(&self, namespace: &NamespaceIdent) -> IcebergResult<()> {
self.inner.drop_namespace(namespace).await
}
async fn list_tables(&self, namespace: &NamespaceIdent) -> IcebergResult<Vec<TableIdent>> {
self.inner.list_tables(namespace).await
}
async fn create_table(
&self,
namespace: &NamespaceIdent,
creation: TableCreation,
) -> IcebergResult<Table> {
let creation = if creation.location.is_some() {
creation
} else {
let location = self
.table_location_uri(namespace, &creation.name)
.map_err(to_iceberg_err)?;
TableCreation {
location: Some(location),
..creation
}
};
let table_name = creation.name.clone();
let table = self.inner.create_table(namespace, creation).await?;
if let Some(loc) = table.metadata_location() {
self.write_version_hint(namespace, &table_name, loc)
.map_err(to_iceberg_err)?;
}
Ok(table)
}
async fn load_table(&self, table: &TableIdent) -> IcebergResult<Table> {
self.inner.load_table(table).await
}
async fn drop_table(&self, table: &TableIdent) -> IcebergResult<()> {
let dir = self
.table_metadata_dir(table.namespace(), table.name())
.map_err(to_iceberg_err)?;
self.inner.drop_table(table).await?;
remove_file_if_exists(&dir.join(VERSION_HINT)).map_err(to_iceberg_err)?;
Ok(())
}
async fn table_exists(&self, table: &TableIdent) -> IcebergResult<bool> {
self.inner.table_exists(table).await
}
async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> IcebergResult<()> {
let src_hint = self
.table_metadata_dir(src.namespace(), src.name())
.map_err(to_iceberg_err)?
.join(VERSION_HINT);
self.table_metadata_dir(dest.namespace(), dest.name())
.map_err(to_iceberg_err)?;
self.inner.rename_table(src, dest).await?;
let table = self.inner.load_table(dest).await?;
let loc = table.metadata_location().ok_or_else(|| {
iceberg::Error::new(
iceberg::ErrorKind::Unexpected,
format!(
"renamed table {}.{} has no metadata location",
dest.namespace().clone().inner().join("."),
dest.name()
),
)
})?;
self.write_version_hint(dest.namespace(), dest.name(), loc)
.map_err(to_iceberg_err)?;
remove_file_if_exists(&src_hint).map_err(to_iceberg_err)?;
Ok(())
}
async fn register_table(
&self,
table: &TableIdent,
metadata_location: String,
) -> IcebergResult<Table> {
let registered = self
.inner
.register_table(table, metadata_location.clone())
.await?;
self.write_version_hint(table.namespace(), table.name(), &metadata_location)
.map_err(to_iceberg_err)?;
Ok(registered)
}
async fn update_table(&self, commit: TableCommit) -> IcebergResult<Table> {
let ident = commit.identifier().clone();
let updated = self.inner.update_table(commit).await?;
if let Some(loc) = updated.metadata_location() {
self.write_version_hint(ident.namespace(), ident.name(), loc)
.map_err(to_iceberg_err)?;
}
Ok(updated)
}
}
fn to_iceberg_err(e: LakehouseError) -> iceberg::Error {
iceberg::Error::new(iceberg::ErrorKind::Unexpected, e.to_string())
}
/// Validates every component of `namespace` as a path component, stopping at
/// the first one that is rejected.
fn validate_namespace(namespace: &NamespaceIdent) -> Result<(), LakehouseError> {
    namespace
        .clone()
        .inner()
        .iter()
        .try_for_each(|component| validate_path_component("namespace component", component))
}
/// Runs `validate_safe_id` on `value`, using `label` to describe it, and
/// reports a rejection as [`LakehouseError::Io`].
fn validate_path_component(label: &str, value: &str) -> Result<(), LakehouseError> {
    match validate_safe_id(value, label) {
        Ok(()) => Ok(()),
        Err(reason) => Err(LakehouseError::Io(reason.to_string())),
    }
}
/// Deletes the file at `path`, treating "not found" as success.
///
/// Any other removal failure is returned as [`LakehouseError::Io`].
fn remove_file_if_exists(path: &Path) -> Result<(), LakehouseError> {
    let Err(failure) = fs::remove_file(path) else {
        return Ok(());
    };
    if failure.kind() == ErrorKind::NotFound {
        Ok(())
    } else {
        Err(LakehouseError::Io(failure.to_string()))
    }
}
/// Converts a filesystem path into its `file://` URI string.
///
/// Returns [`LakehouseError::Io`] when the path cannot be represented as a
/// file URI.
fn path_to_uri(path: &Path) -> Result<String, LakehouseError> {
    match url::Url::from_file_path(path) {
        Ok(uri) => Ok(uri.to_string()),
        Err(()) => Err(LakehouseError::Io(format!(
            "cannot convert path to URI: {}",
            path.display()
        ))),
    }
}
fn discover_tables(
warehouse_root: &Path,
dir: &Path,
out: &mut Vec<(NamespaceIdent, String, String)>,
) -> Result<(), LakehouseError> {
let hint = dir.join(METADATA_DIR).join(VERSION_HINT);
if hint.is_file() {
let rel = dir
.strip_prefix(warehouse_root)
.map_err(|e| LakehouseError::Io(e.to_string()))?;
let parts: Vec<String> = rel
.components()
.map(|c| c.as_os_str().to_string_lossy().into_owned())
.collect();
if let Some((table_name, ns_parts)) = parts.split_last()
&& !ns_parts.is_empty()
{
let namespace = NamespaceIdent::from_vec(ns_parts.to_vec())
.map_err(|e| LakehouseError::Iceberg(e.to_string()))?;
let metadata_location = fs::read_to_string(&hint)
.map_err(|e| LakehouseError::Io(e.to_string()))?
.trim()
.to_string();
if !metadata_location.is_empty() {
out.push((namespace, table_name.clone(), metadata_location));
}
}
return Ok(());
}
let entries = match fs::read_dir(dir) {
Ok(e) => e,
Err(_) => return Ok(()),
};
for entry in entries.flatten() {
let path = entry.path();
if path.is_dir() {
let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
if name == METADATA_DIR || name == "data" {
continue;
}
discover_tables(warehouse_root, &path, out)?;
}
}
Ok(())
}
// Integration-style tests that exercise `LocalCatalog` against a temporary
// warehouse directory.
#[cfg(test)]
mod tests {
use super::*;
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
// Builds a two-column schema: required long `id` (field id 1) and optional
// string `name` (field id 2).
fn sample_schema() -> Schema {
Schema::builder()
.with_schema_id(0)
.with_fields(vec![
Arc::new(NestedField::required(
1,
"id",
Type::Primitive(PrimitiveType::Long),
)),
Arc::new(NestedField::optional(
2,
"name",
Type::Primitive(PrimitiveType::String),
)),
])
.build()
.unwrap()
}
// Creates namespace `ns` (ignoring any error, e.g. when it already exists)
// and then creates `table` in it with the sample schema.
async fn create_table(catalog: &LocalCatalog, ns: &str, table: &str) -> Table {
let namespace = NamespaceIdent::new(ns.to_string());
let _ = catalog.create_namespace(&namespace, HashMap::new()).await;
let creation = TableCreation::builder()
.name(table.to_string())
.schema(sample_schema())
.build();
catalog.create_table(&namespace, creation).await.unwrap()
}
// A created table can be loaded back with its schema intact, and its
// version hint file exists on disk.
#[tokio::test]
async fn local_catalog_create_and_load_table() {
let dir = tempfile::tempdir().unwrap();
let catalog = LocalCatalog::new(dir.path()).await.unwrap();
let created = create_table(&catalog, "sales", "orders").await;
assert_eq!(created.identifier().name(), "orders");
let ident = TableIdent::new(
NamespaceIdent::new("sales".to_string()),
"orders".to_string(),
);
let loaded = catalog.load_table(&ident).await.unwrap();
assert_eq!(
loaded
.metadata()
.current_schema()
.as_ref()
.field_id_by_name("id"),
Some(1)
);
assert_eq!(
loaded
.metadata()
.current_schema()
.as_ref()
.field_id_by_name("name"),
Some(2)
);
let hint = catalog
.table_metadata_dir(&NamespaceIdent::new("sales".to_string()), "orders")
.unwrap()
.join(VERSION_HINT);
assert!(hint.is_file(), "version-hint.text should be persisted");
}
// Namespaces created through the catalog are returned by `list_namespaces`.
#[tokio::test]
async fn local_catalog_list_namespaces() {
let dir = tempfile::tempdir().unwrap();
let catalog = LocalCatalog::new(dir.path()).await.unwrap();
catalog
.create_namespace(&NamespaceIdent::new("alpha".to_string()), HashMap::new())
.await
.unwrap();
catalog
.create_namespace(&NamespaceIdent::new("beta".to_string()), HashMap::new())
.await
.unwrap();
let mut names: Vec<String> = catalog
.list_namespaces(None)
.await
.unwrap()
.into_iter()
.map(|n| n.inner().join("."))
.collect();
// Sort so the assertion does not depend on listing order.
names.sort();
assert_eq!(names, vec!["alpha", "beta"]);
}
// Tables created in one namespace are returned by `list_tables`.
#[tokio::test]
async fn local_catalog_list_tables() {
let dir = tempfile::tempdir().unwrap();
let catalog = LocalCatalog::new(dir.path()).await.unwrap();
create_table(&catalog, "sales", "orders").await;
create_table(&catalog, "sales", "customers").await;
let namespace = NamespaceIdent::new("sales".to_string());
let mut tables: Vec<String> = catalog
.list_tables(&namespace)
.await
.unwrap()
.into_iter()
.map(|t| t.name().to_string())
.collect();
// Sort so the assertion does not depend on listing order.
tables.sort();
assert_eq!(tables, vec!["customers", "orders"]);
}
// A second catalog instance over the same directory sees the table created
// by the first one.
#[tokio::test]
async fn local_catalog_recovers_tables_after_restart() {
let dir = tempfile::tempdir().unwrap();
{
let catalog = LocalCatalog::new(dir.path()).await.unwrap();
create_table(&catalog, "sales", "orders").await;
}
{
let catalog = LocalCatalog::new(dir.path()).await.unwrap();
let namespace = NamespaceIdent::new("sales".to_string());
let tables = catalog.list_tables(&namespace).await.unwrap();
assert_eq!(tables.len(), 1, "table should survive a restart");
assert_eq!(tables[0].name(), "orders");
let loaded = catalog.load_table(&tables[0]).await.unwrap();
assert!(
loaded
.metadata()
.current_schema()
.as_ref()
.field_id_by_name("id")
.is_some()
);
}
}
// After `drop_table`, the catalog no longer reports the table as existing.
#[tokio::test]
async fn local_catalog_drop_table_removes_it() {
let dir = tempfile::tempdir().unwrap();
let catalog = LocalCatalog::new(dir.path()).await.unwrap();
create_table(&catalog, "sales", "orders").await;
let ident = TableIdent::new(
NamespaceIdent::new("sales".to_string()),
"orders".to_string(),
);
assert!(catalog.table_exists(&ident).await.unwrap());
catalog.drop_table(&ident).await.unwrap();
assert!(!catalog.table_exists(&ident).await.unwrap());
}
// Namespace and table identifiers containing `..` or `/` are rejected by
// the path helpers.
#[tokio::test]
async fn local_catalog_rejects_path_traversal_identifiers() {
let dir = tempfile::tempdir().unwrap();
let catalog = LocalCatalog::new(dir.path()).await.unwrap();
let bad_ns = NamespaceIdent::new("..".to_string());
assert!(catalog.table_location_uri(&bad_ns, "orders").is_err());
let good_ns = NamespaceIdent::new("sales".to_string());
assert!(catalog.table_location_uri(&good_ns, "../orders").is_err());
assert!(catalog.table_metadata_dir(&good_ns, "orders/2026").is_err());
}
}