use crate::error::{Result, StorageError};
use crate::types::{TableSchema, IndexDef};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
/// In-memory catalog state; this whole struct is what gets bincode-encoded into `catalog.bin`.
///
/// bincode encodes struct fields positionally in declaration order, so reordering, removing,
/// or inserting fields changes the on-disk format and breaks existing catalog files.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RegistryMetadata {
/// Table schemas keyed by table name.
tables: HashMap<String, TableSchema>,
/// Index name -> (table name, column name) recorded for that index.
index_map: HashMap<String, (String, String)>,
/// Id assigned to the next table created; a fresh catalog starts at 1.
next_table_id: u32,
/// Table name -> numeric table id. May also contain `"_default"` mapped to 0
/// (inserted by `ensure_default_table_id`), which has no entry in `tables`.
table_ids: HashMap<String, u32>,
}
/// Catalog of table schemas, indexes and table ids, persisted to a single file.
///
/// State is shared behind an `Arc<RwLock<..>>`; the methods on this type take `&self` and
/// lock internally.
pub struct TableRegistry {
/// Shared catalog state.
metadata: Arc<RwLock<RegistryMetadata>>,
/// Location of the catalog file (`<data_dir>/catalog.bin`, set in `new`).
persist_path: PathBuf,
}
impl TableRegistry {
    /// Opens the catalog stored at `<data_dir>/catalog.bin`, or starts an empty one.
    ///
    /// The data directory is created if missing. When a catalog file exists it is decoded
    /// with bincode and every schema's column map is rebuilt.
    ///
    /// # Errors
    ///
    /// [`StorageError::Io`] if the directory cannot be created or the file cannot be read;
    /// [`StorageError::Serialization`] if the file cannot be decoded.
    pub fn new<P: AsRef<Path>>(data_dir: P) -> Result<Self> {
        let persist_path = data_dir.as_ref().join("catalog.bin");
        if let Some(parent) = persist_path.parent() {
            fs::create_dir_all(parent).map_err(StorageError::Io)?;
        }
        let metadata = if persist_path.exists() {
            let data = fs::read(&persist_path).map_err(StorageError::Io)?;
            let mut meta: RegistryMetadata = bincode::deserialize(&data)
                .map_err(|e| StorageError::Serialization(e.to_string()))?;
            // NOTE(review): assumes the column map is not restored by deserialization
            // (e.g. skipped by serde), hence the rebuild — confirm against TableSchema.
            for schema in meta.tables.values_mut() {
                schema.rebuild_column_map();
            }
            meta
        } else {
            RegistryMetadata {
                tables: HashMap::new(),
                index_map: HashMap::new(),
                // Id 0 stays unassigned by create_table; ensure_default_table_id uses it.
                next_table_id: 1,
                table_ids: HashMap::new(),
            }
        };
        Ok(Self {
            metadata: Arc::new(RwLock::new(metadata)),
            persist_path,
        })
    }

    /// Registers a new table (and the indexes declared on its schema) and persists the catalog.
    ///
    /// The table receives the next sequential table id. Nothing changes in memory unless the
    /// updated catalog was written to disk successfully.
    ///
    /// # Errors
    ///
    /// [`StorageError::InvalidData`] if a table with the same name exists, or if any index
    /// name is already registered or repeated within `schema`; persistence errors otherwise.
    pub fn create_table(&self, mut schema: TableSchema) -> Result<()> {
        self.commit(move |meta| {
            if meta.tables.contains_key(&schema.name) {
                return Err(StorageError::InvalidData(format!(
                    "Table '{}' already exists",
                    schema.name
                )));
            }
            for (pos, index) in schema.indexes.iter().enumerate() {
                // A name repeated inside the schema would silently collapse in index_map.
                let repeated_in_schema = schema
                    .indexes
                    .iter()
                    .take(pos)
                    .any(|earlier| earlier.name == index.name);
                if repeated_in_schema || meta.index_map.contains_key(&index.name) {
                    return Err(StorageError::InvalidData(format!(
                        "Index '{}' already exists",
                        index.name
                    )));
                }
            }
            schema.rebuild_column_map();
            for index in &schema.indexes {
                meta.index_map.insert(
                    index.name.clone(),
                    (index.table_name.clone(), index.column_name.clone()),
                );
            }
            let table_id = meta.next_table_id;
            meta.next_table_id += 1;
            meta.table_ids.insert(schema.name.clone(), table_id);
            meta.tables.insert(schema.name.clone(), schema);
            Ok(())
        })
    }

    /// Removes a table, its indexes and its table-id mapping, then persists the catalog.
    ///
    /// # Errors
    ///
    /// [`StorageError::InvalidData`] if the table does not exist; persistence errors otherwise.
    pub fn drop_table(&self, table_name: &str) -> Result<()> {
        self.commit(|meta| {
            let schema = meta.tables.remove(table_name).ok_or_else(|| {
                StorageError::InvalidData(format!("Table '{}' not found", table_name))
            })?;
            for index in &schema.indexes {
                meta.index_map.remove(&index.name);
            }
            // Without this, get_table_id / get_table_name_by_id keep resolving a dropped table.
            meta.table_ids.remove(table_name);
            Ok(())
        })
    }

    /// Returns a copy of the named table's schema.
    ///
    /// # Errors
    ///
    /// [`StorageError::InvalidData`] if the table does not exist or the lock is poisoned.
    pub fn get_table(&self, table_name: &str) -> Result<TableSchema> {
        let meta = self
            .metadata
            .read()
            .map_err(|e| StorageError::InvalidData(e.to_string()))?;
        meta.tables.get(table_name).cloned().ok_or_else(|| {
            StorageError::InvalidData(format!("Table '{}' not found", table_name))
        })
    }

    /// Returns the names of all registered tables, sorted for deterministic output.
    ///
    /// # Errors
    ///
    /// [`StorageError::InvalidData`] if the lock is poisoned.
    pub fn list_tables(&self) -> Result<Vec<String>> {
        let meta = self
            .metadata
            .read()
            .map_err(|e| StorageError::InvalidData(e.to_string()))?;
        let mut names: Vec<String> = meta.tables.keys().cloned().collect();
        names.sort_unstable();
        Ok(names)
    }

    /// Reports whether a table with this name exists; a poisoned lock reads as `false`.
    pub fn table_exists(&self, table_name: &str) -> bool {
        self.metadata
            .read()
            .map(|meta| meta.tables.contains_key(table_name))
            .unwrap_or(false)
    }

    /// Attaches a new index to an existing table column and persists the catalog.
    ///
    /// # Errors
    ///
    /// [`StorageError::InvalidData`] if the index name is taken, the table does not exist, or
    /// the column is not part of the table; persistence errors otherwise.
    pub fn add_index(&self, index: IndexDef) -> Result<()> {
        self.commit(move |meta| {
            if meta.index_map.contains_key(&index.name) {
                return Err(StorageError::InvalidData(format!(
                    "Index '{}' already exists",
                    index.name
                )));
            }
            // `meta` is a plain &mut, so borrowing `tables` and `index_map` separately is fine.
            let table = meta.tables.get_mut(&index.table_name).ok_or_else(|| {
                StorageError::InvalidData(format!("Table '{}' not found", index.table_name))
            })?;
            if table.get_column(&index.column_name).is_none() {
                return Err(StorageError::InvalidData(format!(
                    "Column '{}' not found in table '{}'",
                    index.column_name, index.table_name
                )));
            }
            meta.index_map.insert(
                index.name.clone(),
                (index.table_name.clone(), index.column_name.clone()),
            );
            table.add_index(index);
            Ok(())
        })
    }

    /// Returns a copy of the index definition registered under `index_name`.
    ///
    /// # Errors
    ///
    /// [`StorageError::InvalidData`] if the index or its owning table cannot be found.
    pub fn get_index(&self, index_name: &str) -> Result<IndexDef> {
        let meta = self
            .metadata
            .read()
            .map_err(|e| StorageError::InvalidData(e.to_string()))?;
        let (table_name, _column_name) = meta.index_map.get(index_name).ok_or_else(|| {
            StorageError::InvalidData(format!("Index '{}' not found", index_name))
        })?;
        let table = meta.tables.get(table_name).ok_or_else(|| {
            StorageError::InvalidData(format!("Table '{}' not found", table_name))
        })?;
        table
            .indexes
            .iter()
            .find(|idx| idx.name == index_name)
            .cloned()
            .ok_or_else(|| StorageError::InvalidData(format!("Index '{}' not found", index_name)))
    }

    /// Returns the name of a vector index on `table_name.column_name`.
    ///
    /// Only the table's own indexes are scanned, in definition order, so the result is
    /// deterministic when several vector indexes cover the same column.
    ///
    /// # Errors
    ///
    /// [`StorageError::InvalidData`] if no such vector index exists.
    pub fn find_vector_index(&self, table_name: &str, column_name: &str) -> Result<String> {
        let meta = self
            .metadata
            .read()
            .map_err(|e| StorageError::InvalidData(e.to_string()))?;
        meta.tables
            .get(table_name)
            .and_then(|table| {
                table.indexes.iter().find(|idx| {
                    idx.table_name == table_name
                        && idx.column_name == column_name
                        && meta.index_map.contains_key(&idx.name)
                        && matches!(idx.index_type, crate::types::IndexType::Vector { .. })
                })
            })
            .map(|idx| idx.name.clone())
            .ok_or_else(|| {
                StorageError::InvalidData(format!(
                    "No vector index found for {}.{}",
                    table_name, column_name
                ))
            })
    }

    /// Returns the numeric id assigned to `table_name`.
    ///
    /// # Errors
    ///
    /// [`StorageError::TableNotFound`] if no id is registered for the name.
    pub fn get_table_id(&self, table_name: &str) -> Result<u32> {
        let meta = self
            .metadata
            .read()
            .map_err(|e| StorageError::InvalidData(e.to_string()))?;
        meta.table_ids
            .get(table_name)
            .copied()
            .ok_or_else(|| StorageError::TableNotFound(table_name.to_string()))
    }

    /// Reverse lookup: returns the table name registered for `table_id`.
    ///
    /// # Errors
    ///
    /// [`StorageError::InvalidData`] if no name maps to that id.
    pub fn get_table_name_by_id(&self, table_id: u32) -> Result<String> {
        let meta = self
            .metadata
            .read()
            .map_err(|e| StorageError::InvalidData(e.to_string()))?;
        meta.table_ids
            .iter()
            .find(|&(_, &id)| id == table_id)
            .map(|(name, _)| name.clone())
            .ok_or_else(|| {
                StorageError::InvalidData(format!("No table found for id {}", table_id))
            })
    }

    /// Maps the pseudo-table `"_default"` to id 0 if it is not mapped yet.
    ///
    /// This only updates memory; the entry reaches disk with the next persisted change.
    pub fn ensure_default_table_id(&self) -> Result<()> {
        let mut meta = self
            .metadata
            .write()
            .map_err(|e| StorageError::InvalidData(e.to_string()))?;
        if !meta.table_ids.contains_key("_default") {
            meta.table_ids.insert("_default".to_string(), 0);
        }
        Ok(())
    }

    /// Applies `mutate` to a copy of the catalog, persists the copy, and only then publishes it.
    ///
    /// The write lock is held for the whole sequence. Concurrent commits are therefore
    /// serialized and the file always reflects the latest published state. The cost is that
    /// readers wait while the catalog is written. If `mutate` or the write fails, the
    /// in-memory catalog is left untouched.
    fn commit<F>(&self, mutate: F) -> Result<()>
    where
        F: FnOnce(&mut RegistryMetadata) -> Result<()>,
    {
        let mut meta = self
            .metadata
            .write()
            .map_err(|e| StorageError::InvalidData(e.to_string()))?;
        let mut staged = (*meta).clone();
        mutate(&mut staged)?;
        self.persist_snapshot(&staged)?;
        *meta = staged;
        Ok(())
    }

    /// Writes `meta` to the catalog file crash-safely.
    ///
    /// The snapshot goes to a sibling temp file, which is fsynced and then renamed over
    /// `catalog.bin`. A crash mid-write therefore leaves the previous catalog intact.
    fn persist_snapshot(&self, meta: &RegistryMetadata) -> Result<()> {
        use std::io::Write;

        let data =
            bincode::serialize(meta).map_err(|e| StorageError::Serialization(e.to_string()))?;
        let tmp_path = self.persist_path.with_extension("bin.tmp");
        let mut file = fs::File::create(&tmp_path).map_err(StorageError::Io)?;
        file.write_all(&data).map_err(StorageError::Io)?;
        file.sync_all().map_err(StorageError::Io)?;
        drop(file);
        fs::rename(&tmp_path, &self.persist_path).map_err(StorageError::Io)?;
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{ColumnDef, ColumnType, IndexType};

    /// Opens a registry in a brand-new temporary directory.
    ///
    /// The `TempDir` guard is returned too; callers must keep it bound so the directory is
    /// not deleted while the registry is still in use.
    fn scratch_registry() -> (tempfile::TempDir, TableRegistry) {
        let dir = tempfile::tempdir().unwrap();
        let registry = TableRegistry::new(dir.path()).unwrap();
        (dir, registry)
    }

    #[test]
    fn test_create_and_get_table() {
        let (_dir, registry) = scratch_registry();
        let columns = vec![
            ColumnDef::new("id".into(), ColumnType::Integer, 0),
            ColumnDef::new("name".into(), ColumnType::Text, 1),
        ];
        registry
            .create_table(TableSchema::new("users".into(), columns))
            .unwrap();

        let fetched = registry.get_table("users").unwrap();
        assert_eq!(fetched.name, "users");
        assert_eq!(fetched.column_count(), 2);
    }

    #[test]
    fn test_drop_table() {
        let (_dir, registry) = scratch_registry();
        registry
            .create_table(TableSchema::new("test".into(), vec![]))
            .unwrap();
        assert!(registry.table_exists("test"));

        registry.drop_table("test").unwrap();
        assert!(!registry.table_exists("test"));
    }

    #[test]
    fn test_list_tables() {
        let (_dir, registry) = scratch_registry();
        let wanted = ["t1", "t2"];
        for name in wanted.iter() {
            registry
                .create_table(TableSchema::new(name.to_string(), vec![]))
                .unwrap();
        }

        let listed = registry.list_tables().unwrap();
        assert_eq!(listed.len(), wanted.len());
        for name in wanted.iter() {
            assert!(listed.contains(&name.to_string()));
        }
    }

    #[test]
    fn test_add_index() {
        let (_dir, registry) = scratch_registry();
        let columns = vec![
            ColumnDef::new("id".into(), ColumnType::Integer, 0),
            ColumnDef::new("title".into(), ColumnType::Text, 1),
        ];
        registry
            .create_table(TableSchema::new("articles".into(), columns))
            .unwrap();

        let title_index = IndexDef::new(
            "articles_title_idx".into(),
            "articles".into(),
            "title".into(),
            IndexType::FullText,
        );
        registry.add_index(title_index).unwrap();

        let fetched = registry.get_index("articles_title_idx").unwrap();
        assert_eq!(fetched.column_name, "title");
    }

    #[test]
    fn test_persistence() {
        let dir = tempfile::tempdir().unwrap();

        // First session: create a table, then let the registry go out of scope.
        {
            let writer = TableRegistry::new(dir.path()).unwrap();
            let columns = vec![ColumnDef::new("id".into(), ColumnType::Integer, 0)];
            writer
                .create_table(TableSchema::new("persistent".into(), columns))
                .unwrap();
        }

        // Second session over the same directory must see the table.
        let reader = TableRegistry::new(dir.path()).unwrap();
        assert!(reader.table_exists("persistent"));
        assert_eq!(reader.get_table("persistent").unwrap().column_count(), 1);
    }
}