use crate::error::{Result, StorageError};
use crate::types::{TableSchema, IndexDef};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
/// Catalog state owned by `TableRegistry`, serialized with bincode into
/// `catalog.bin`.
///
/// bincode encodes struct fields positionally, in declaration order, so
/// reordering, removing or adding fields changes the on-disk layout and can
/// break loading of catalogs written by an earlier build.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RegistryMetadata {
/// Table schemas keyed by table name.
tables: HashMap<String, TableSchema>,
/// Index name -> `(table name, column name)` for every registered index.
/// Checked on table/index creation to keep index names unique catalog-wide.
index_map: HashMap<String, (String, String)>,
/// Id assigned to the next created table (1 for a fresh catalog). It is only
/// ever incremented, so ids of dropped tables are never handed out again.
next_table_id: u32,
/// Table name -> numeric table id. May also hold a `"_default"` -> 0 entry
/// (added by `ensure_default_table_id`) that has no schema in `tables`.
table_ids: HashMap<String, u32>,
/// Table id -> table name for created tables. When loaded empty while
/// `table_ids` is not, it is rebuilt from `table_ids`.
///
/// NOTE(review): bincode does not encode field names, so `#[serde(default)]`
/// likely does not make catalogs written without this field deserialize;
/// confirm against the bincode version in use.
#[serde(default)]
id_to_name: HashMap<u32, String>,
/// Per-table auto-increment value as last set via
/// `update_auto_increment_counter`, keyed by table name. The same
/// `#[serde(default)]` caveat as `id_to_name` applies.
#[serde(default)]
auto_increment_counters: HashMap<String, i64>,
}
/// Lock-protected catalog of table schemas, indexes and table ids, backed by a
/// single serialized file on disk.
pub struct TableRegistry {
    /// File the catalog is serialized to (`<data_dir>/catalog.bin`).
    persist_path: PathBuf,
    /// Authoritative catalog state; every mutation takes this lock for writing.
    metadata: Arc<RwLock<RegistryMetadata>>,
    /// Cache of `Arc`-shared schemas keyed by table name, filled on lookup.
    schema_cache: parking_lot::RwLock<HashMap<String, Arc<TableSchema>>>,
    /// Cache of table name -> table id, filled on lookup.
    table_id_cache: parking_lot::RwLock<HashMap<String, u32>>,
}
impl TableRegistry {
    /// Opens the catalog persisted at `<data_dir>/catalog.bin`.
    ///
    /// `data_dir` is created if needed. When no catalog file exists the
    /// registry starts empty, with table ids beginning at 1.
    ///
    /// # Errors
    /// * `StorageError::Io` if the directory cannot be created or an existing
    ///   catalog file cannot be read.
    /// * `StorageError::Serialization` if the catalog file cannot be decoded.
    pub fn new<P: AsRef<Path>>(data_dir: P) -> Result<Self> {
        let persist_path = data_dir.as_ref().join("catalog.bin");
        if let Some(parent) = persist_path.parent() {
            fs::create_dir_all(parent).map_err(StorageError::Io)?;
        }
        // Read directly instead of probing with `Path::exists`. `exists`
        // returns `false` for *any* metadata error (e.g. permission denied), and
        // the registry would then start empty and a later `persist` could write
        // that empty catalog over the real one. Only a genuine "not found" means
        // "fresh catalog".
        let metadata = match fs::read(&persist_path) {
            Ok(data) => Self::decode_metadata(&data)?,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => RegistryMetadata {
                tables: HashMap::new(),
                index_map: HashMap::new(),
                next_table_id: 1,
                table_ids: HashMap::new(),
                id_to_name: HashMap::new(),
                auto_increment_counters: HashMap::new(),
            },
            Err(e) => return Err(StorageError::Io(e)),
        };
        Ok(Self {
            metadata: Arc::new(RwLock::new(metadata)),
            schema_cache: parking_lot::RwLock::new(HashMap::new()),
            table_id_cache: parking_lot::RwLock::new(HashMap::new()),
            persist_path,
        })
    }

    /// Decodes a persisted catalog and restores derived state after loading.
    fn decode_metadata(data: &[u8]) -> Result<RegistryMetadata> {
        let mut meta: RegistryMetadata = bincode::deserialize(data)
            .map_err(|e| StorageError::Serialization(e.to_string()))?;
        // Re-run `rebuild_column_map` on every loaded schema (presumably the
        // column lookup map is not part of the serialized form; confirm in
        // `TableSchema`).
        for schema in meta.tables.values_mut() {
            schema.rebuild_column_map();
        }
        // Backfill the id -> name map when it was loaded empty but forward ids
        // exist (e.g. a catalog written before `id_to_name` was added).
        if meta.id_to_name.is_empty() && !meta.table_ids.is_empty() {
            meta.id_to_name = meta
                .table_ids
                .iter()
                .map(|(name, &id)| (id, name.clone()))
                .collect();
        }
        Ok(meta)
    }

    /// Registers a new table (and any indexes declared on its schema), assigns
    /// it the next table id, and persists the catalog.
    ///
    /// # Errors
    /// * `StorageError::InvalidData` if the table already exists, if one of the
    ///   schema's index names is already registered or repeated within the
    ///   schema, or if the metadata lock is poisoned.
    /// * I/O or serialization errors from persisting. In that case the
    ///   in-memory catalog has already been updated.
    pub fn create_table(&self, mut schema: TableSchema) -> Result<()> {
        let mut meta = self.metadata.write()
            .map_err(|e| StorageError::InvalidData(e.to_string()))?;
        if meta.tables.contains_key(&schema.name) {
            return Err(StorageError::InvalidData(format!(
                "Table '{}' already exists",
                schema.name
            )));
        }
        {
            // Also reject names repeated inside this schema. Otherwise the
            // second definition silently overwrites the first in `index_map`.
            let mut seen = std::collections::HashSet::new();
            for index in &schema.indexes {
                if meta.index_map.contains_key(&index.name) {
                    return Err(StorageError::InvalidData(format!(
                        "Index '{}' already exists",
                        index.name
                    )));
                }
                if !seen.insert(index.name.as_str()) {
                    return Err(StorageError::InvalidData(format!(
                        "Index '{}' is defined more than once in table '{}'",
                        index.name, schema.name
                    )));
                }
            }
        }
        schema.rebuild_column_map();
        for index in &schema.indexes {
            meta.index_map.insert(
                index.name.clone(),
                (index.table_name.clone(), index.column_name.clone()),
            );
        }
        let table_id = meta.next_table_id;
        meta.next_table_id += 1;
        meta.table_ids.insert(schema.name.clone(), table_id);
        meta.id_to_name.insert(table_id, schema.name.clone());
        meta.tables.insert(schema.name.clone(), schema);
        drop(meta);
        self.schema_cache.write().clear();
        self.table_id_cache.write().clear();
        self.persist()
    }

    /// Removes a table, its indexes, its id mapping and its auto-increment
    /// counter, then persists the catalog.
    ///
    /// # Errors
    /// * `StorageError::InvalidData` if the table does not exist or the
    ///   metadata lock is poisoned.
    /// * I/O or serialization errors from persisting (the in-memory removal has
    ///   already happened).
    pub fn drop_table(&self, table_name: &str) -> Result<()> {
        let mut meta = self.metadata.write()
            .map_err(|e| StorageError::InvalidData(e.to_string()))?;
        let schema = meta.tables.remove(table_name)
            .ok_or_else(|| StorageError::InvalidData(format!(
                "Table '{}' not found",
                table_name
            )))?;
        for index in &schema.indexes {
            meta.index_map.remove(&index.name);
        }
        if let Some(id) = meta.table_ids.remove(table_name) {
            meta.id_to_name.remove(&id);
        }
        // Without this, `get_auto_increment_counter` would keep returning the
        // dropped table's value for a new table created under the same name,
        // and the entry would linger in the persisted catalog.
        meta.auto_increment_counters.remove(table_name);
        drop(meta);
        self.schema_cache.write().remove(table_name);
        self.table_id_cache.write().remove(table_name);
        self.persist()
    }

    /// Returns the schema of `table_name`, served from `schema_cache` when
    /// present and cached after the first catalog lookup otherwise.
    ///
    /// # Errors
    /// `StorageError::InvalidData` if the table does not exist or the metadata
    /// lock is poisoned.
    pub fn get_table(&self, table_name: &str) -> Result<Arc<TableSchema>> {
        {
            let cache = self.schema_cache.read();
            if let Some(cached) = cache.get(table_name) {
                return Ok(Arc::clone(cached));
            }
        }
        let meta = self.metadata.read()
            .map_err(|e| StorageError::InvalidData(e.to_string()))?;
        let schema = meta.tables.get(table_name)
            .ok_or_else(|| StorageError::InvalidData(format!(
                "Table '{}' not found",
                table_name
            )))?;
        let arc_schema = Arc::new(schema.clone());
        // The metadata read guard is still held here, so no writer can change
        // the catalog between the clone above and this cache insert.
        self.schema_cache
            .write()
            .entry(table_name.to_string())
            .or_insert_with(|| Arc::clone(&arc_schema));
        Ok(arc_schema)
    }

    /// Returns the names of all tables, in no particular order.
    ///
    /// # Errors
    /// `StorageError::InvalidData` if the metadata lock is poisoned.
    pub fn list_tables(&self) -> Result<Vec<String>> {
        let meta = self.metadata.read()
            .map_err(|e| StorageError::InvalidData(e.to_string()))?;
        Ok(meta.tables.keys().cloned().collect())
    }

    /// Returns whether `table_name` exists. A poisoned lock reports `false`.
    pub fn table_exists(&self, table_name: &str) -> bool {
        self.metadata.read()
            .map(|meta| meta.tables.contains_key(table_name))
            .unwrap_or(false)
    }

    /// Adds `index` to its table and persists the catalog.
    ///
    /// # Errors
    /// * `StorageError::InvalidData` if the index name is taken, the table or
    ///   column does not exist, or the metadata lock is poisoned.
    /// * I/O or serialization errors from persisting.
    pub fn add_index(&self, index: IndexDef) -> Result<()> {
        let table_name = index.table_name.clone();
        let mut meta = self.metadata.write()
            .map_err(|e| StorageError::InvalidData(e.to_string()))?;
        if meta.index_map.contains_key(&index.name) {
            return Err(StorageError::InvalidData(format!(
                "Index '{}' already exists",
                index.name
            )));
        }
        let table = meta.tables.get(&table_name).ok_or_else(|| {
            StorageError::InvalidData(format!("Table '{}' not found", table_name))
        })?;
        if table.get_column(&index.column_name).is_none() {
            return Err(StorageError::InvalidData(format!(
                "Column '{}' not found in table '{}'",
                index.column_name, table_name
            )));
        }
        meta.index_map.insert(
            index.name.clone(),
            (table_name.clone(), index.column_name.clone()),
        );
        if let Some(table) = meta.tables.get_mut(&table_name) {
            table.add_index(index);
        }
        drop(meta);
        self.schema_cache.write().remove(&table_name);
        self.persist()
    }

    /// Returns a copy of the definition of index `index_name`.
    ///
    /// # Errors
    /// `StorageError::InvalidData` if the index (or its owning table) is not
    /// found, or the metadata lock is poisoned.
    pub fn get_index(&self, index_name: &str) -> Result<IndexDef> {
        let meta = self.metadata.read()
            .map_err(|e| StorageError::InvalidData(e.to_string()))?;
        let (table_name, _column_name) = meta.index_map.get(index_name)
            .ok_or_else(|| StorageError::InvalidData(format!(
                "Index '{}' not found",
                index_name
            )))?;
        let table = meta.tables.get(table_name)
            .ok_or_else(|| StorageError::InvalidData(format!(
                "Table '{}' not found",
                table_name
            )))?;
        table.indexes.iter()
            .find(|idx| idx.name == index_name)
            .cloned()
            .ok_or_else(|| StorageError::InvalidData(format!(
                "Index '{}' not found",
                index_name
            )))
    }

    /// Returns the name of a vector index on `table_name.column_name`.
    ///
    /// # Errors
    /// `StorageError::InvalidData` if no such index exists or the metadata lock
    /// is poisoned.
    pub fn find_vector_index(&self, table_name: &str, column_name: &str) -> Result<String> {
        let meta = self.metadata.read()
            .map_err(|e| StorageError::InvalidData(e.to_string()))?;
        // Walk the table's own `indexes` list (presumably a `Vec`, i.e. in
        // definition order) instead of the `index_map` HashMap, whose iteration
        // order is arbitrary. This makes the pick deterministic when several
        // vector indexes cover the same column, and avoids scanning every index
        // in the catalog.
        meta.tables
            .get(table_name)
            .and_then(|table| {
                table.indexes.iter().find(|idx| {
                    matches!(idx.index_type, crate::types::IndexType::Vector { .. })
                        && matches!(
                            meta.index_map.get(&idx.name),
                            Some((t, c)) if t == table_name && c == column_name
                        )
                })
            })
            .map(|idx| idx.name.clone())
            .ok_or_else(|| StorageError::InvalidData(format!(
                "No vector index found for {}.{}",
                table_name, column_name
            )))
    }

    /// Returns the numeric id of `table_name`, caching successful lookups.
    ///
    /// # Errors
    /// `StorageError::TableNotFound` if no id is registered for the name;
    /// `StorageError::InvalidData` if the metadata lock is poisoned.
    pub fn get_table_id(&self, table_name: &str) -> Result<u32> {
        if let Some(&id) = self.table_id_cache.read().get(table_name) {
            return Ok(id);
        }
        let meta = self.metadata.read()
            .map_err(|e| StorageError::InvalidData(e.to_string()))?;
        let id = meta.table_ids.get(table_name)
            .copied()
            .ok_or_else(|| StorageError::TableNotFound(table_name.to_string()))?;
        self.table_id_cache
            .write()
            .entry(table_name.to_string())
            .or_insert(id);
        Ok(id)
    }

    /// Returns the table name registered for `table_id`.
    ///
    /// # Errors
    /// `StorageError::InvalidData` if the id is unknown or the metadata lock is
    /// poisoned.
    pub fn get_table_name_by_id(&self, table_id: u32) -> Result<String> {
        let meta = self.metadata.read()
            .map_err(|e| StorageError::InvalidData(e.to_string()))?;
        meta.id_to_name.get(&table_id)
            .cloned()
            .ok_or_else(|| StorageError::InvalidData(format!(
                "No table found for id {}",
                table_id
            )))
    }

    /// Reserves table id 0 under the name `"_default"` if it is not present.
    ///
    /// Only `table_ids` is touched: `id_to_name` and `next_table_id` are left
    /// alone, and nothing is written to disk until the catalog is next
    /// persisted.
    ///
    /// # Errors
    /// `StorageError::InvalidData` if the metadata lock is poisoned.
    pub fn ensure_default_table_id(&self) -> Result<()> {
        let mut meta = self.metadata.write()
            .map_err(|e| StorageError::InvalidData(e.to_string()))?;
        meta.table_ids.entry("_default".to_string()).or_insert(0);
        Ok(())
    }

    /// Returns the stored auto-increment value for `table_name`, or `None` if
    /// none is stored or the metadata lock is poisoned.
    pub fn get_auto_increment_counter(&self, table_name: &str) -> Option<i64> {
        let meta = self.metadata.read().ok()?;
        meta.auto_increment_counters.get(table_name).copied()
    }

    /// Stores `value` as the auto-increment value for `table_name` in memory
    /// only. Call `persist_auto_increment_counters` to write it to disk.
    ///
    /// # Errors
    /// `StorageError::InvalidData` if the metadata lock is poisoned.
    pub fn update_auto_increment_counter(&self, table_name: &str, value: i64) -> Result<()> {
        let mut meta = self.metadata.write()
            .map_err(|e| StorageError::InvalidData(e.to_string()))?;
        meta.auto_increment_counters.insert(table_name.to_string(), value);
        Ok(())
    }

    /// Persists the whole catalog, including the auto-increment counters.
    pub fn persist_auto_increment_counters(&self) -> Result<()> {
        self.persist()
    }

    /// Serializes the catalog and atomically replaces the file at
    /// `persist_path` with it.
    fn persist(&self) -> Result<()> {
        let meta = self.metadata.read()
            .map_err(|e| StorageError::InvalidData(e.to_string()))?;
        let data = bincode::serialize(&*meta)
            .map_err(|e| StorageError::Serialization(e.to_string()))?;
        // Keep the read guard alive until the file has been replaced. No writer
        // can get in meanwhile, so concurrent persists all write the same
        // snapshot and an older snapshot can never land after a newer one.
        let written = Self::write_atomically(&self.persist_path, &data);
        drop(meta);
        written.map_err(StorageError::Io)
    }

    /// Writes `data` to a sibling temporary file, syncs it to disk, and renames
    /// it over `target`. A crash mid-write therefore leaves either the old or
    /// the new file, never a truncated one.
    fn write_atomically(target: &Path, data: &[u8]) -> std::io::Result<()> {
        use std::io::Write;
        use std::sync::atomic::{AtomicU64, Ordering};

        // Several `persist` calls may hold the read lock at once; a unique temp
        // name keeps them from truncating each other's in-progress file.
        static TMP_SEQ: AtomicU64 = AtomicU64::new(0);
        let seq = TMP_SEQ.fetch_add(1, Ordering::Relaxed);
        let mut tmp_name = target.as_os_str().to_owned();
        tmp_name.push(format!(".tmp.{}.{}", std::process::id(), seq));
        let tmp_path = PathBuf::from(tmp_name);

        let result = fs::File::create(&tmp_path)
            .and_then(|mut file| {
                file.write_all(data)?;
                file.sync_all()
            })
            .and_then(|()| fs::rename(&tmp_path, target));
        if result.is_err() {
            // Best-effort cleanup; the original error is the one reported.
            let _ = fs::remove_file(&tmp_path);
        }
        result
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests for `TableRegistry`, each run against a fresh temp directory.
    use super::*;
    use crate::types::{ColumnDef, ColumnType, IndexType};

    #[test]
    fn test_create_and_get_table() {
        let temp_dir = tempfile::tempdir().unwrap();
        let registry = TableRegistry::new(temp_dir.path()).unwrap();
        let schema = TableSchema::new(
            "users".into(),
            vec![
                ColumnDef::new("id".into(), ColumnType::Integer, 0),
                ColumnDef::new("name".into(), ColumnType::Text, 1),
            ],
        );
        registry.create_table(schema.clone()).unwrap();
        let retrieved = registry.get_table("users").unwrap();
        assert_eq!(retrieved.name, "users");
        assert_eq!(retrieved.column_count(), 2);
    }

    #[test]
    fn test_create_duplicate_table_fails() {
        let temp_dir = tempfile::tempdir().unwrap();
        let registry = TableRegistry::new(temp_dir.path()).unwrap();
        registry.create_table(TableSchema::new("dup".into(), vec![])).unwrap();
        assert!(registry.create_table(TableSchema::new("dup".into(), vec![])).is_err());
    }

    #[test]
    fn test_drop_table() {
        let temp_dir = tempfile::tempdir().unwrap();
        let registry = TableRegistry::new(temp_dir.path()).unwrap();
        let schema = TableSchema::new("test".into(), vec![]);
        registry.create_table(schema).unwrap();
        assert!(registry.table_exists("test"));
        registry.drop_table("test").unwrap();
        assert!(!registry.table_exists("test"));
    }

    #[test]
    fn test_drop_missing_table_fails() {
        let temp_dir = tempfile::tempdir().unwrap();
        let registry = TableRegistry::new(temp_dir.path()).unwrap();
        assert!(registry.drop_table("missing").is_err());
    }

    #[test]
    fn test_list_tables() {
        let temp_dir = tempfile::tempdir().unwrap();
        let registry = TableRegistry::new(temp_dir.path()).unwrap();
        registry.create_table(TableSchema::new("t1".into(), vec![])).unwrap();
        registry.create_table(TableSchema::new("t2".into(), vec![])).unwrap();
        let tables = registry.list_tables().unwrap();
        assert_eq!(tables.len(), 2);
        assert!(tables.contains(&"t1".to_string()));
        assert!(tables.contains(&"t2".to_string()));
    }

    #[test]
    fn test_add_index() {
        let temp_dir = tempfile::tempdir().unwrap();
        let registry = TableRegistry::new(temp_dir.path()).unwrap();
        let schema = TableSchema::new(
            "articles".into(),
            vec![
                ColumnDef::new("id".into(), ColumnType::Integer, 0),
                ColumnDef::new("title".into(), ColumnType::Text, 1),
            ],
        );
        registry.create_table(schema.clone()).unwrap();
        let index = IndexDef::new(
            "articles_title_idx".into(),
            "articles".into(),
            "title".into(),
            IndexType::FullText,
        );
        registry.add_index(index).unwrap();
        let retrieved_index = registry.get_index("articles_title_idx").unwrap();
        assert_eq!(retrieved_index.column_name, "title");
    }

    #[test]
    fn test_add_index_rejects_unknown_column_and_table() {
        let temp_dir = tempfile::tempdir().unwrap();
        let registry = TableRegistry::new(temp_dir.path()).unwrap();
        let schema = TableSchema::new(
            "articles".into(),
            vec![ColumnDef::new("id".into(), ColumnType::Integer, 0)],
        );
        registry.create_table(schema).unwrap();

        let bad_column = IndexDef::new(
            "idx_missing_col".into(),
            "articles".into(),
            "nope".into(),
            IndexType::FullText,
        );
        assert!(registry.add_index(bad_column).is_err());

        let bad_table = IndexDef::new(
            "idx_missing_table".into(),
            "ghost".into(),
            "id".into(),
            IndexType::FullText,
        );
        assert!(registry.add_index(bad_table).is_err());

        // Rejected indexes must not be registered.
        assert!(registry.get_index("idx_missing_col").is_err());
        assert!(registry.get_index("idx_missing_table").is_err());
    }

    #[test]
    fn test_drop_table_releases_index_names() {
        let temp_dir = tempfile::tempdir().unwrap();
        let registry = TableRegistry::new(temp_dir.path()).unwrap();
        let make_schema = || {
            TableSchema::new(
                "articles".into(),
                vec![
                    ColumnDef::new("id".into(), ColumnType::Integer, 0),
                    ColumnDef::new("title".into(), ColumnType::Text, 1),
                ],
            )
        };
        let make_index = || {
            IndexDef::new(
                "articles_title_idx".into(),
                "articles".into(),
                "title".into(),
                IndexType::FullText,
            )
        };
        registry.create_table(make_schema()).unwrap();
        registry.add_index(make_index()).unwrap();
        registry.drop_table("articles").unwrap();
        assert!(registry.get_index("articles_title_idx").is_err());

        // The name is free again once the owning table is gone.
        registry.create_table(make_schema()).unwrap();
        registry.add_index(make_index()).unwrap();
    }

    #[test]
    fn test_table_id_round_trip_and_persistence() {
        let temp_dir = tempfile::tempdir().unwrap();
        let (id_a, id_b) = {
            let registry = TableRegistry::new(temp_dir.path()).unwrap();
            registry.create_table(TableSchema::new("a".into(), vec![])).unwrap();
            registry.create_table(TableSchema::new("b".into(), vec![])).unwrap();
            let id_a = registry.get_table_id("a").unwrap();
            let id_b = registry.get_table_id("b").unwrap();
            assert_ne!(id_a, id_b);
            assert_eq!(registry.get_table_name_by_id(id_a).unwrap(), "a");
            (id_a, id_b)
        };
        let registry = TableRegistry::new(temp_dir.path()).unwrap();
        assert_eq!(registry.get_table_id("a").unwrap(), id_a);
        assert_eq!(registry.get_table_id("b").unwrap(), id_b);
        assert_eq!(registry.get_table_name_by_id(id_b).unwrap(), "b");
        assert!(registry.get_table_id("missing").is_err());
    }

    #[test]
    fn test_persistence() {
        let temp_dir = tempfile::tempdir().unwrap();
        {
            let registry = TableRegistry::new(temp_dir.path()).unwrap();
            let schema = TableSchema::new(
                "persistent".into(),
                vec![ColumnDef::new("id".into(), ColumnType::Integer, 0)],
            );
            registry.create_table(schema).unwrap();
        }
        {
            let registry = TableRegistry::new(temp_dir.path()).unwrap();
            assert!(registry.table_exists("persistent"));
            let schema = registry.get_table("persistent").unwrap();
            assert_eq!(schema.column_count(), 1);
        }
    }
}