use async_trait::async_trait;
use std::collections::HashMap;
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::RwLock;
use crate::metadata::TableMetadata;
/// Errors returned by [`Catalog`] operations.
///
/// The `String` payloads carry the identifier that caused the failure; the in-memory
/// catalog in this module fills them with the dotted form (e.g. `db.users`).
#[derive(Debug, Error)]
pub enum CatalogError {
/// No table is registered under the given identifier.
#[error("table not found: {0}")]
TableNotFound(String),
/// No namespace is registered under the given identifier.
#[error("namespace not found: {0}")]
NamespaceNotFound(String),
/// A table with the given identifier already exists (create or rename target).
#[error("table already exists: {0}")]
TableAlreadyExists(String),
/// A namespace with the given identifier already exists.
#[error("namespace already exists: {0}")]
NamespaceAlreadyExists(String),
/// An optimistic commit was rejected: the commit was based on version `expected`,
/// but the table is currently at version `actual`.
#[error("commit conflict: expected version {expected}, found {actual}")]
CommitConflict { expected: i64, actual: i64 },
/// Wraps an underlying I/O failure; `#[from]` lets `?` convert `std::io::Error` automatically.
/// NOTE(review): not constructed in this file — presumably used by persistent catalogs.
#[error("io error: {0}")]
Io(#[from] std::io::Error),
/// Failure to encode or decode catalog data; the payload is a free-form description.
/// NOTE(review): not constructed in this file — presumably used by persistent catalogs.
#[error("serialization error: {0}")]
Serialization(String),
}
/// Result alias used by every [`Catalog`] method.
pub type CatalogResult<T> = Result<T, CatalogError>;
/// Fully-qualified table name: zero or more namespace levels followed by a table name.
///
/// The textual form joins every component with `.`, e.g. `db.schema.users`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TableIdentifier {
    /// Namespace levels, outermost first. Empty for an unqualified table.
    pub namespace: Vec<String>,
    /// The table's own name (the last component of the dotted form).
    pub name: String,
}

impl TableIdentifier {
    /// Builds an identifier from any sequence of namespace levels plus a table name.
    pub fn new(
        namespace: impl IntoIterator<Item = impl Into<String>>,
        name: impl Into<String>,
    ) -> Self {
        let levels: Vec<String> = namespace.into_iter().map(Into::into).collect();
        let table: String = name.into();
        Self {
            namespace: levels,
            name: table,
        }
    }

    /// Shorthand for an identifier with exactly one namespace level.
    pub fn of(namespace: impl Into<String>, name: impl Into<String>) -> Self {
        Self::new(std::iter::once(namespace), name)
    }

    /// Dotted textual form (`ns1.ns2.table`); just the table name when there is no namespace.
    pub fn full_name(&self) -> String {
        match self.namespace.as_slice() {
            [] => self.name.clone(),
            levels => format!("{}.{}", levels.join("."), self.name),
        }
    }

    /// Splits a dotted name into namespace levels and a table name.
    ///
    /// The text after the last `.` becomes the table name and everything before it is
    /// split on `.` into namespace levels. No validation is performed: empty components
    /// (from `a..b`, a leading `.`, or a trailing `.`) are kept as empty strings, and an
    /// input without any `.` yields an identifier with an empty namespace.
    pub fn parse(full_name: &str) -> Self {
        match full_name.rsplit_once('.') {
            None => Self {
                namespace: Vec::new(),
                name: full_name.to_string(),
            },
            Some((prefix, last)) => Self {
                namespace: prefix.split('.').map(str::to_string).collect(),
                name: last.to_string(),
            },
        }
    }
}

impl std::fmt::Display for TableIdentifier {
    // Emits exactly the text of `full_name()`, writing the parts directly instead of
    // building an intermediate `String`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        for level in &self.namespace {
            write!(f, "{}.", level)?;
        }
        f.write_str(&self.name)
    }
}
/// Free-form string key/value properties attached to a namespace.
pub type NamespaceProperties = HashMap<String, String>;
/// A registry of namespaces and the tables they contain.
///
/// Implementations must be `Send + Sync` so one catalog can be shared across tasks
/// (e.g. behind an `Arc`). Every fallible operation reports failures as [`CatalogError`].
#[async_trait]
pub trait Catalog: Send + Sync {
/// Human-readable name of this catalog instance.
fn name(&self) -> &str;
/// Lists namespaces under `parent`.
///
/// NOTE(review): the in-memory implementation returns only *direct* children of
/// `parent`, and only single-level namespaces when `parent` is `None`; confirm other
/// implementations follow the same rule.
async fn list_namespaces(&self, parent: Option<&[String]>) -> CatalogResult<Vec<Vec<String>>>;
/// Registers `namespace` with the given properties.
///
/// Fails with [`CatalogError::NamespaceAlreadyExists`] in the in-memory implementation
/// when the namespace is already registered.
async fn create_namespace(
&self,
namespace: &[String],
properties: NamespaceProperties,
) -> CatalogResult<()>;
/// Removes `namespace`; [`CatalogError::NamespaceNotFound`] if it is not registered.
async fn drop_namespace(&self, namespace: &[String]) -> CatalogResult<()>;
/// Returns the properties stored for `namespace`; [`CatalogError::NamespaceNotFound`]
/// if it is not registered.
async fn namespace_properties(
&self,
namespace: &[String],
) -> CatalogResult<NamespaceProperties>;
/// Lists the tables whose namespace is exactly `namespace`.
async fn list_tables(&self, namespace: &[String]) -> CatalogResult<Vec<TableIdentifier>>;
/// Registers a new table and returns the metadata as stored.
///
/// Fails with [`CatalogError::TableAlreadyExists`] if `identifier` is already taken.
async fn create_table(
&self,
identifier: &TableIdentifier,
metadata: TableMetadata,
) -> CatalogResult<TableMetadata>;
/// Returns the current metadata of a table; [`CatalogError::TableNotFound`] if absent.
async fn load_table(&self, identifier: &TableIdentifier) -> CatalogResult<TableMetadata>;
/// Removes a table; [`CatalogError::TableNotFound`] if absent.
///
/// NOTE(review): `purge` presumably asks the implementation to delete the table's data
/// as well; the in-memory implementation ignores it.
async fn drop_table(&self, identifier: &TableIdentifier, purge: bool) -> CatalogResult<()>;
/// Moves a table from `from` to `to`.
///
/// Fails with [`CatalogError::TableNotFound`] if `from` is absent, or
/// [`CatalogError::TableAlreadyExists`] if `to` is already taken.
async fn rename_table(&self, from: &TableIdentifier, to: &TableIdentifier)
-> CatalogResult<()>;
/// Reports whether a table is registered under `identifier`.
async fn table_exists(&self, identifier: &TableIdentifier) -> CatalogResult<bool>;
/// Optimistically replaces a table's metadata.
///
/// The commit succeeds only if the table is still at `base_version`; otherwise it fails
/// with [`CatalogError::CommitConflict`]. The in-memory implementation compares
/// `base_version` against the stored metadata's `last_sequence_number`.
async fn commit_table(
&self,
identifier: &TableIdentifier,
base_version: i64,
metadata: TableMetadata,
) -> CatalogResult<TableMetadata>;
}
/// A [`Catalog`] that keeps every namespace and table in process memory.
///
/// Nothing is persisted: all state is lost when the value is dropped. Each map sits
/// behind its own async `RwLock`, so readers of one map do not block the other.
#[derive(Debug)]
pub struct InMemoryCatalog {
    /// Name reported by [`Catalog::name`].
    name: String,
    /// Registered namespaces (keyed by their level list) and their properties.
    namespaces: RwLock<HashMap<Vec<String>, NamespaceProperties>>,
    /// Registered tables and their current metadata.
    tables: RwLock<HashMap<TableIdentifier, TableMetadata>>,
}

impl InMemoryCatalog {
    /// Creates an empty catalog with the given name.
    pub fn new(name: impl Into<String>) -> Self {
        let catalog_name: String = name.into();
        Self {
            namespaces: RwLock::new(HashMap::default()),
            tables: RwLock::new(HashMap::default()),
            name: catalog_name,
        }
    }

    /// Creates an empty catalog already wrapped in an [`Arc`] for sharing across tasks.
    pub fn shared(name: impl Into<String>) -> Arc<Self> {
        Self::new(name).into()
    }
}
#[async_trait]
impl Catalog for InMemoryCatalog {
fn name(&self) -> &str {
&self.name
}
async fn list_namespaces(&self, parent: Option<&[String]>) -> CatalogResult<Vec<Vec<String>>> {
let namespaces: tokio::sync::RwLockReadGuard<HashMap<Vec<String>, crate::catalog::NamespaceProperties>> = self.namespaces.read().await;
let result: Vec<Vec<String>> = namespaces
.keys()
.filter(|ns| match parent {
Some(p) => ns.starts_with(p) && ns.len() == p.len() + 1,
None => ns.len() == 1,
})
.cloned()
.collect();
Ok(result)
}
async fn create_namespace(
&self,
namespace: &[String],
properties: NamespaceProperties,
) -> CatalogResult<()> {
let mut namespaces: tokio::sync::RwLockWriteGuard<HashMap<Vec<String>, crate::catalog::NamespaceProperties>> = self.namespaces.write().await;
let ns_vec = namespace.to_vec();
if namespaces.contains_key(&ns_vec) {
return Err(CatalogError::NamespaceAlreadyExists(namespace.join(".")));
}
namespaces.insert(ns_vec, properties);
Ok(())
}
async fn drop_namespace(&self, namespace: &[String]) -> CatalogResult<()> {
let mut namespaces = self.namespaces.write().await;
let ns_vec = namespace.to_vec();
if namespaces.remove(&ns_vec).is_none() {
return Err(CatalogError::NamespaceNotFound(namespace.join(".")));
}
Ok(())
}
async fn namespace_properties(
&self,
namespace: &[String],
) -> CatalogResult<NamespaceProperties> {
let namespaces = self.namespaces.read().await;
namespaces
.get(namespace)
.cloned()
.ok_or_else(|| CatalogError::NamespaceNotFound(namespace.join(".")))
}
async fn list_tables(&self, namespace: &[String]) -> CatalogResult<Vec<TableIdentifier>> {
let tables = self.tables.read().await;
let result: Vec<TableIdentifier> = tables
.keys()
.filter(|id| id.namespace == namespace)
.cloned()
.collect();
Ok(result)
}
async fn create_table(
&self,
identifier: &TableIdentifier,
metadata: TableMetadata,
) -> CatalogResult<TableMetadata> {
let mut tables = self.tables.write().await;
if tables.contains_key(identifier) {
return Err(CatalogError::TableAlreadyExists(identifier.full_name()));
}
tables.insert(identifier.clone(), metadata.clone());
Ok(metadata)
}
async fn load_table(&self, identifier: &TableIdentifier) -> CatalogResult<TableMetadata> {
let tables = self.tables.read().await;
tables
.get(identifier)
.cloned()
.ok_or_else(|| CatalogError::TableNotFound(identifier.full_name()))
}
async fn drop_table(&self, identifier: &TableIdentifier, _purge: bool) -> CatalogResult<()> {
let mut tables = self.tables.write().await;
if tables.remove(identifier).is_none() {
return Err(CatalogError::TableNotFound(identifier.full_name()));
}
Ok(())
}
async fn rename_table(
&self,
from: &TableIdentifier,
to: &TableIdentifier,
) -> CatalogResult<()> {
let mut tables = self.tables.write().await;
let metadata = tables
.remove(from)
.ok_or_else(|| CatalogError::TableNotFound(from.full_name()))?;
if tables.contains_key(to) {
tables.insert(from.clone(), metadata);
return Err(CatalogError::TableAlreadyExists(to.full_name()));
}
tables.insert(to.clone(), metadata);
Ok(())
}
async fn table_exists(&self, identifier: &TableIdentifier) -> CatalogResult<bool> {
let tables: tokio::sync::RwLockReadGuard<HashMap<TableIdentifier, TableMetadata>> = self.tables.read().await;
Ok(tables.contains_key(identifier))
}
async fn commit_table(
&self,
identifier: &TableIdentifier,
base_version: i64,
metadata: TableMetadata,
) -> CatalogResult<TableMetadata> {
let mut tables: tokio::sync::RwLockWriteGuard<HashMap<TableIdentifier, TableMetadata>> = self.tables.write().await;
let current = tables
.get(identifier)
.ok_or_else(|| CatalogError::TableNotFound(identifier.full_name()))?;
if current.last_sequence_number != base_version {
return Err(CatalogError::CommitConflict {
expected: base_version,
actual: current.last_sequence_number,
});
}
tables.insert(identifier.clone(), metadata.clone());
Ok(metadata)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::schema::{Schema, Type};

    /// Builds minimal table metadata (a single nullable `id: long` column) at `location`.
    fn sample_metadata(location: &str) -> TableMetadata {
        let schema = Schema::builder(0)
            .with_field(1, "id", Type::Long, true)
            .build();
        TableMetadata::builder(location, schema).build()
    }

    #[tokio::test]
    async fn test_create_namespace() {
        let catalog = InMemoryCatalog::new("test");
        catalog
            .create_namespace(&["db".into()], HashMap::new())
            .await
            .unwrap();
        let namespaces = catalog.list_namespaces(None).await.unwrap();
        assert_eq!(namespaces.len(), 1);
        assert_eq!(namespaces[0], vec!["db".to_string()]);
    }

    #[tokio::test]
    async fn test_create_namespace_twice_fails() {
        let catalog = InMemoryCatalog::new("test");
        let ns = vec!["db".to_string()];
        catalog.create_namespace(&ns, HashMap::new()).await.unwrap();
        let result = catalog.create_namespace(&ns, HashMap::new()).await;
        assert!(matches!(result, Err(CatalogError::NamespaceAlreadyExists(_))));
    }

    #[tokio::test]
    async fn test_list_child_namespaces_returns_direct_children_only() {
        let catalog = InMemoryCatalog::new("test");
        let parent = vec!["db".to_string()];
        let child = vec!["db".to_string(), "staging".to_string()];
        let grandchild = vec![
            "db".to_string(),
            "staging".to_string(),
            "tmp".to_string(),
        ];
        catalog.create_namespace(&parent, HashMap::new()).await.unwrap();
        catalog.create_namespace(&child, HashMap::new()).await.unwrap();
        catalog
            .create_namespace(&grandchild, HashMap::new())
            .await
            .unwrap();
        let children = catalog
            .list_namespaces(Some(parent.as_slice()))
            .await
            .unwrap();
        assert_eq!(children, vec![child]);
    }

    #[tokio::test]
    async fn test_namespace_properties_round_trip_and_drop() {
        let catalog = InMemoryCatalog::new("test");
        let ns = vec!["db".to_string()];
        let mut props = HashMap::new();
        props.insert("owner".to_string(), "alice".to_string());
        catalog.create_namespace(&ns, props.clone()).await.unwrap();
        assert_eq!(catalog.namespace_properties(&ns).await.unwrap(), props);
        catalog.drop_namespace(&ns).await.unwrap();
        assert!(matches!(
            catalog.namespace_properties(&ns).await,
            Err(CatalogError::NamespaceNotFound(_))
        ));
        assert!(matches!(
            catalog.drop_namespace(&ns).await,
            Err(CatalogError::NamespaceNotFound(_))
        ));
    }

    #[tokio::test]
    async fn test_create_table() {
        let catalog = InMemoryCatalog::new("test");
        let identifier = TableIdentifier::of("db", "users");
        let metadata = sample_metadata("s3://bucket/users");
        catalog.create_table(&identifier, metadata).await.unwrap();
        let exists = catalog.table_exists(&identifier).await.unwrap();
        assert!(exists);
    }

    #[tokio::test]
    async fn test_list_tables_filters_by_namespace() {
        let catalog = InMemoryCatalog::new("test");
        let users = TableIdentifier::of("db", "users");
        let events = TableIdentifier::of("other", "events");
        catalog
            .create_table(&users, sample_metadata("s3://bucket/users"))
            .await
            .unwrap();
        catalog
            .create_table(&events, sample_metadata("s3://bucket/events"))
            .await
            .unwrap();
        let listed = catalog.list_tables(&["db".to_string()]).await.unwrap();
        assert_eq!(listed, vec![users]);
    }

    #[tokio::test]
    async fn test_drop_table() {
        let catalog = InMemoryCatalog::new("test");
        let identifier = TableIdentifier::of("db", "users");
        catalog
            .create_table(&identifier, sample_metadata("s3://bucket/users"))
            .await
            .unwrap();
        catalog.drop_table(&identifier, false).await.unwrap();
        assert!(!catalog.table_exists(&identifier).await.unwrap());
        assert!(matches!(
            catalog.drop_table(&identifier, false).await,
            Err(CatalogError::TableNotFound(_))
        ));
    }

    #[tokio::test]
    async fn test_rename_table() {
        let catalog = InMemoryCatalog::new("test");
        let from = TableIdentifier::of("db", "users");
        let to = TableIdentifier::of("db", "customers");
        catalog
            .create_table(&from, sample_metadata("s3://bucket/users"))
            .await
            .unwrap();
        catalog.rename_table(&from, &to).await.unwrap();
        assert!(!catalog.table_exists(&from).await.unwrap());
        assert!(catalog.table_exists(&to).await.unwrap());
    }

    #[tokio::test]
    async fn test_rename_onto_existing_table_keeps_source() {
        let catalog = InMemoryCatalog::new("test");
        let from = TableIdentifier::of("db", "users");
        let to = TableIdentifier::of("db", "customers");
        catalog
            .create_table(&from, sample_metadata("s3://bucket/users"))
            .await
            .unwrap();
        catalog
            .create_table(&to, sample_metadata("s3://bucket/customers"))
            .await
            .unwrap();
        let result = catalog.rename_table(&from, &to).await;
        assert!(matches!(result, Err(CatalogError::TableAlreadyExists(_))));
        assert!(catalog.table_exists(&from).await.unwrap());
        assert!(catalog.table_exists(&to).await.unwrap());
    }

    #[tokio::test]
    async fn test_commit_with_matching_base_version() {
        let catalog = InMemoryCatalog::new("test");
        let identifier = TableIdentifier::of("db", "users");
        let metadata = sample_metadata("s3://bucket/users");
        let base = metadata.last_sequence_number;
        catalog
            .create_table(&identifier, metadata.clone())
            .await
            .unwrap();
        catalog
            .commit_table(&identifier, base, metadata)
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_commit_conflict() {
        let catalog = InMemoryCatalog::new("test");
        let identifier = TableIdentifier::of("db", "users");
        let metadata = sample_metadata("s3://bucket/users");
        catalog
            .create_table(&identifier, metadata.clone())
            .await
            .unwrap();
        let result = catalog
            .commit_table(&identifier, 999, metadata.clone())
            .await;
        assert!(matches!(result, Err(CatalogError::CommitConflict { .. })));
    }

    #[tokio::test]
    async fn test_commit_missing_table() {
        let catalog = InMemoryCatalog::new("test");
        let identifier = TableIdentifier::of("db", "ghost");
        let result = catalog
            .commit_table(&identifier, 0, sample_metadata("s3://bucket/ghost"))
            .await;
        assert!(matches!(result, Err(CatalogError::TableNotFound(_))));
    }

    // Pure, synchronous parsing logic: no async runtime needed.
    #[test]
    fn test_table_identifier_parsing() {
        let id = TableIdentifier::parse("db.schema.users");
        assert_eq!(id.namespace, vec!["db", "schema"]);
        assert_eq!(id.name, "users");
        assert_eq!(id.full_name(), "db.schema.users");
        let id2 = TableIdentifier::parse("simple_table");
        assert!(id2.namespace.is_empty());
        assert_eq!(id2.name, "simple_table");
    }
}