use serde::{Deserialize, Serialize};
use std::path::Path;
use tracing::{debug, info, instrument};
use crate::core::error::{AnamError, Result};
/// A table registered in the catalog, identified by its unique `name`.
#[derive(Debug, Clone)]
#[derive(Serialize, Deserialize)]
pub struct TableEntry {
    /// Catalog-wide name of the table; registering the same name again replaces the entry.
    pub name: String,
    /// Location of the table's Lance data, stored verbatim (not validated or normalized here).
    pub lance_path: String,
}
/// A named rule registered in the catalog.
#[derive(Debug, Clone)]
#[derive(Serialize, Deserialize)]
pub struct RuleEntry {
    /// Catalog-wide name of the rule; registering the same name again replaces the entry.
    pub name: String,
    /// Rule source text (Datalog-style), stored verbatim; it is not parsed or checked here.
    pub datalog: String,
}
/// Metadata describing a model registered in the catalog.
///
/// All fields are stored exactly as supplied by the caller; none are validated here.
#[derive(Debug, Clone)]
#[derive(Serialize, Deserialize)]
pub struct ModelEntry {
    /// Catalog-wide name of the model; registering the same name again replaces the entry.
    pub name: String,
    /// Free-form version string (e.g. "1.0.0").
    pub version: String,
    /// Location of the model artifact (e.g. "models/fraud.onnx").
    pub artifact_path: String,
    /// Identifier callers use to invoke the model; its meaning is defined by the callers.
    pub function_id: String,
    /// Number of input features the model expects.
    pub num_features: usize,
    /// Average latency in milliseconds, as reported by whoever registered the model.
    pub avg_latency_ms: f64,
    /// Reported accuracy; presumably a fraction in [0, 1] — not enforced here.
    pub accuracy: f64,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct Catalog {
pub tables: Vec<TableEntry>,
pub rules: Vec<RuleEntry>,
pub models: Vec<ModelEntry>,
}
/// A [`Catalog`] held in memory and mirrored to a JSON file on disk.
///
/// Every registration or removal writes the whole catalog back to `path`.
#[derive(Debug)]
pub struct CatalogStore {
    /// Filesystem location of the JSON catalog file.
    path: String,
    /// In-memory copy of the catalog contents.
    catalog: Catalog,
}
impl CatalogStore {
    /// Opens the catalog persisted at `path`, creating an empty one if the file does not exist.
    ///
    /// An existing file is read and parsed as JSON but is not rewritten. If the file is
    /// missing, any missing parent directories are created and an empty catalog is written
    /// immediately, so the file exists once this returns successfully.
    ///
    /// # Errors
    ///
    /// Returns [`AnamError::Serde`] if an existing file cannot be parsed as a [`Catalog`].
    /// Returns [`AnamError::Io`] in three cases: the file cannot be read for any reason other
    /// than not existing, the parent directory cannot be created, or the new catalog file
    /// cannot be written.
    #[instrument]
    pub fn open(path: &str) -> Result<Self> {
        // Read directly instead of probing with `Path::exists` first. This avoids a
        // check-then-read race, and it does not mistake an unreadable file (e.g. a permission
        // error) for a missing one.
        let store = match std::fs::read_to_string(path) {
            Ok(data) => {
                info!(path, "loading existing catalog");
                let catalog = serde_json::from_str(&data)
                    .map_err(|e| AnamError::Serde(format!("failed to parse catalog: {e}")))?;
                Self {
                    path: path.to_string(),
                    catalog,
                }
            }
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                info!(path, "creating new catalog");
                // `create_dir_all` treats an empty parent (bare file name) as a no-op.
                if let Some(parent) = Path::new(path).parent() {
                    std::fs::create_dir_all(parent).map_err(AnamError::Io)?;
                }
                let store = Self {
                    path: path.to_string(),
                    catalog: Catalog::default(),
                };
                // Persist right away so the file exists even before anything is registered.
                store.flush()?;
                store
            }
            Err(e) => return Err(AnamError::Io(e)),
        };
        Ok(store)
    }

    /// Writes the current in-memory catalog to disk.
    fn flush(&self) -> Result<()> {
        Self::write_catalog(&self.path, &self.catalog)
    }

    /// Applies `mutate` to a copy of the catalog and persists the copy. The copy replaces the
    /// in-memory catalog only after the write succeeds.
    ///
    /// On error the in-memory catalog is left unchanged. Because [`Self::write_catalog`]
    /// replaces the file via rename, the on-disk file is also the previous version.
    fn update(&mut self, mutate: impl FnOnce(&mut Catalog)) -> Result<()> {
        let mut next = self.catalog.clone();
        mutate(&mut next);
        Self::write_catalog(&self.path, &next)?;
        self.catalog = next;
        Ok(())
    }

    /// Serializes `catalog` as pretty-printed JSON and stores it at `path`.
    ///
    /// The JSON is first written and synced to the sibling file `<path>.tmp`, which is then
    /// renamed over `path`. A crash mid-write therefore leaves either the old or the new
    /// catalog, never a truncated one (rename is atomic on POSIX filesystems).
    fn write_catalog(path: &str, catalog: &Catalog) -> Result<()> {
        let data = serde_json::to_string_pretty(catalog)
            .map_err(|e| AnamError::Serde(format!("failed to serialize catalog: {e}")))?;
        let tmp_path = format!("{path}.tmp");
        let written = Self::write_and_sync(&tmp_path, data.as_bytes())
            .and_then(|()| std::fs::rename(&tmp_path, path));
        if let Err(e) = written {
            // Best-effort cleanup of the temp file; the original error is what callers need.
            let _ = std::fs::remove_file(&tmp_path);
            return Err(AnamError::Io(e));
        }
        debug!(path = %path, "catalog flushed to disk");
        Ok(())
    }

    /// Creates (or truncates) `path`, writes `bytes`, and fsyncs the file before returning.
    fn write_and_sync(path: &str, bytes: &[u8]) -> std::io::Result<()> {
        let mut file = std::fs::File::create(path)?;
        std::io::Write::write_all(&mut file, bytes)?;
        file.sync_all()
    }

    /// Registers a table, replacing any existing table with the same `name`.
    ///
    /// The new entry is appended to the end of the table list.
    ///
    /// # Errors
    ///
    /// Returns an error if the catalog cannot be persisted; the in-memory catalog is then
    /// left unchanged.
    pub fn register_table(&mut self, name: &str, lance_path: &str) -> Result<()> {
        let entry = TableEntry {
            name: name.to_string(),
            lance_path: lance_path.to_string(),
        };
        self.update(|catalog| {
            catalog.tables.retain(|t| t.name != name);
            catalog.tables.push(entry);
        })
    }

    /// Returns all registered tables in registration order.
    pub fn list_tables(&self) -> &[TableEntry] {
        &self.catalog.tables
    }

    /// Removes the table named `name`; removing an unknown name is a no-op (but still persists).
    ///
    /// # Errors
    ///
    /// Returns an error if the catalog cannot be persisted; the in-memory catalog is then
    /// left unchanged.
    pub fn remove_table(&mut self, name: &str) -> Result<()> {
        self.update(|catalog| catalog.tables.retain(|t| t.name != name))
    }

    /// Registers a rule, replacing any existing rule with the same `name`.
    ///
    /// The new entry is appended to the end of the rule list.
    ///
    /// # Errors
    ///
    /// Returns an error if the catalog cannot be persisted; the in-memory catalog is then
    /// left unchanged.
    pub fn register_rule(&mut self, name: &str, datalog: &str) -> Result<()> {
        let entry = RuleEntry {
            name: name.to_string(),
            datalog: datalog.to_string(),
        };
        self.update(|catalog| {
            catalog.rules.retain(|r| r.name != name);
            catalog.rules.push(entry);
        })
    }

    /// Returns all registered rules in registration order.
    pub fn list_rules(&self) -> &[RuleEntry] {
        &self.catalog.rules
    }

    /// Removes the rule named `name`; removing an unknown name is a no-op (but still persists).
    ///
    /// # Errors
    ///
    /// Returns an error if the catalog cannot be persisted; the in-memory catalog is then
    /// left unchanged.
    pub fn remove_rule(&mut self, name: &str) -> Result<()> {
        self.update(|catalog| catalog.rules.retain(|r| r.name != name))
    }

    /// Registers a model, replacing any existing model with the same `entry.name`.
    ///
    /// The new entry is appended to the end of the model list.
    ///
    /// # Errors
    ///
    /// Returns an error if the catalog cannot be persisted; the in-memory catalog is then
    /// left unchanged.
    pub fn register_model(&mut self, entry: ModelEntry) -> Result<()> {
        self.update(|catalog| {
            catalog.models.retain(|m| m.name != entry.name);
            catalog.models.push(entry);
        })
    }

    /// Returns all registered models in registration order.
    pub fn list_models(&self) -> &[ModelEntry] {
        &self.catalog.models
    }

    /// Removes the model named `name`; removing an unknown name is a no-op (but still persists).
    ///
    /// # Errors
    ///
    /// Returns an error if the catalog cannot be persisted; the in-memory catalog is then
    /// left unchanged.
    pub fn remove_model(&mut self, name: &str) -> Result<()> {
        self.update(|catalog| catalog.models.retain(|m| m.name != name))
    }

    /// Returns a read-only view of the entire in-memory catalog.
    pub fn snapshot(&self) -> &Catalog {
        &self.catalog
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a fresh temporary directory and returns it with a catalog path inside it.
    ///
    /// The returned `TempDir` deletes the directory when dropped, so callers must keep it
    /// bound for as long as they use the path.
    fn temp_catalog() -> (tempfile::TempDir, String) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("catalog.json").to_str().unwrap().to_string();
        (dir, path)
    }

    /// Builds a `fraud_detector` model entry with the given version.
    fn fraud_model(version: &str) -> ModelEntry {
        ModelEntry {
            name: "fraud_detector".to_string(),
            version: version.to_string(),
            artifact_path: "models/fraud.onnx".to_string(),
            function_id: "fraud_detector".to_string(),
            num_features: 3,
            avg_latency_ms: 5.0,
            accuracy: 0.95,
        }
    }

    #[test]
    fn catalog_roundtrip() {
        let (_dir, path) = temp_catalog();
        let mut store = CatalogStore::open(&path).unwrap();
        store.register_table("txns", "/data/txns.lance").unwrap();
        store
            .register_rule("high_risk", "high_risk(X) :- txns(X), fraud_prob > 0.80")
            .unwrap();
        store.register_model(fraud_model("1.0.0")).unwrap();

        let reopened = CatalogStore::open(&path).unwrap();
        assert_eq!(reopened.list_tables().len(), 1);
        assert_eq!(reopened.list_tables()[0].name, "txns");
        assert_eq!(reopened.list_rules().len(), 1);
        assert_eq!(reopened.list_rules()[0].name, "high_risk");
        assert_eq!(reopened.list_models().len(), 1);
        assert_eq!(reopened.list_models()[0].name, "fraud_detector");
    }

    #[test]
    fn catalog_upsert() {
        let (_dir, path) = temp_catalog();
        let mut store = CatalogStore::open(&path).unwrap();
        store.register_table("txns", "/old/path.lance").unwrap();
        store.register_table("txns", "/new/path.lance").unwrap();
        assert_eq!(store.list_tables().len(), 1);
        assert_eq!(store.list_tables()[0].lance_path, "/new/path.lance");
    }

    #[test]
    fn catalog_upsert_rules_and_models_persist() {
        let (_dir, path) = temp_catalog();
        let mut store = CatalogStore::open(&path).unwrap();
        store.register_rule("r", "r(X) :- a(X)").unwrap();
        store.register_rule("r", "r(X) :- b(X)").unwrap();
        store.register_model(fraud_model("1.0.0")).unwrap();
        store.register_model(fraud_model("2.0.0")).unwrap();

        let reopened = CatalogStore::open(&path).unwrap();
        assert_eq!(reopened.list_rules().len(), 1);
        assert_eq!(reopened.list_rules()[0].datalog, "r(X) :- b(X)");
        assert_eq!(reopened.list_models().len(), 1);
        assert_eq!(reopened.list_models()[0].version, "2.0.0");
    }

    #[test]
    fn catalog_remove() {
        let (_dir, path) = temp_catalog();
        let mut store = CatalogStore::open(&path).unwrap();
        store.register_table("a", "/a.lance").unwrap();
        store.register_table("b", "/b.lance").unwrap();
        assert_eq!(store.list_tables().len(), 2);
        store.remove_table("a").unwrap();
        assert_eq!(store.list_tables().len(), 1);
        assert_eq!(store.list_tables()[0].name, "b");
    }

    #[test]
    fn catalog_remove_persists_across_reopen() {
        let (_dir, path) = temp_catalog();
        let mut store = CatalogStore::open(&path).unwrap();
        store.register_table("a", "/a.lance").unwrap();
        store.register_rule("r", "r(X) :- a(X)").unwrap();
        store.register_model(fraud_model("1.0.0")).unwrap();
        store.remove_table("a").unwrap();
        store.remove_rule("r").unwrap();
        store.remove_model("fraud_detector").unwrap();

        let reopened = CatalogStore::open(&path).unwrap();
        assert!(reopened.list_tables().is_empty());
        assert!(reopened.list_rules().is_empty());
        assert!(reopened.list_models().is_empty());
    }

    #[test]
    fn catalog_remove_missing_is_noop() {
        let (_dir, path) = temp_catalog();
        let mut store = CatalogStore::open(&path).unwrap();
        store.register_table("a", "/a.lance").unwrap();
        store.remove_table("does_not_exist").unwrap();
        store.remove_rule("does_not_exist").unwrap();
        store.remove_model("does_not_exist").unwrap();
        assert_eq!(store.list_tables().len(), 1);
        assert_eq!(store.list_tables()[0].name, "a");
    }

    #[test]
    fn catalog_open_creates_parent_dirs() {
        let dir = tempfile::tempdir().unwrap();
        let nested = dir.path().join("nested").join("deeper").join("catalog.json");
        let nested_str = nested.to_str().unwrap();
        let store = CatalogStore::open(nested_str).unwrap();
        assert!(nested.exists());
        assert!(store.snapshot().tables.is_empty());
    }

    #[test]
    fn catalog_open_rejects_corrupt_file() {
        let (_dir, path) = temp_catalog();
        std::fs::write(&path, "this is not json").unwrap();
        assert!(matches!(
            CatalogStore::open(&path),
            Err(AnamError::Serde(_))
        ));
    }
}