use super::error::{CatalogError, CatalogResult};
use super::operations::{CatalogOperation, CatalogResponse, EntityType, QueryType};
use super::registry::CatalogRegistry;
use super::traits::CatalogSchema;
use crate::storage::StorageManager;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
/// Descriptive summary of a single catalog, as assembled by
/// `CatalogManager::get_catalog_info`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CatalogInfo {
/// Name the catalog was looked up under.
pub name: String,
/// Schema value reported by the catalog's `schema()` method.
pub schema: CatalogSchema,
/// Operation names reported by the catalog's `supported_operations()` method.
pub supported_operations: Vec<String>,
}
/// Entry point for catalog access: looks catalogs up by name in a
/// `CatalogRegistry` and forwards operations and persistence calls to it.
pub struct CatalogManager {
/// Registry that every method of the manager delegates to.
registry: CatalogRegistry,
// The handle is stored at construction time but is not read by any method
// visible in this file, hence the lint allowance below.
#[allow(dead_code)]
/// Shared storage handle; a clone of it is passed to the registry in `new`.
storage: Arc<StorageManager>,
}
impl CatalogManager {
/// Builds a manager backed by `storage`.
///
/// The registry is constructed from its own clone of the `Arc`; the manager
/// keeps the handle it was given.
pub fn new(storage: Arc<StorageManager>) -> Self {
    let registry = CatalogRegistry::new(Arc::clone(&storage));
    Self { registry, storage }
}
/// Runs `operation` against the catalog registered as `catalog_name`.
///
/// The catalog is fetched mutably from the registry and the result of its
/// `execute` call is returned unchanged.
///
/// # Errors
///
/// Returns `CatalogError::CatalogNotFound` carrying `catalog_name` when the
/// registry has no such catalog; otherwise any error produced by the
/// catalog's `execute` is passed through.
pub fn execute(
    &mut self,
    catalog_name: &str,
    operation: CatalogOperation,
) -> CatalogResult<CatalogResponse> {
    match self.registry.get_mut(catalog_name) {
        Some(catalog) => catalog.execute(operation),
        None => Err(CatalogError::CatalogNotFound(catalog_name.to_string())),
    }
}
/// Runs a query against the catalog registered as `catalog_name` through a
/// shared borrow of the manager.
///
/// `query_type` and `params` are wrapped in a `CatalogOperation::Query` and
/// handed to the catalog's `execute_read_only`.
///
/// # Errors
///
/// Returns `CatalogError::CatalogNotFound` carrying `catalog_name` when the
/// registry has no such catalog; otherwise any error produced by
/// `execute_read_only` is passed through.
pub fn query_read_only(
    &self,
    catalog_name: &str,
    query_type: crate::catalog::operations::QueryType,
    params: serde_json::Value,
) -> CatalogResult<CatalogResponse> {
    use crate::catalog::operations::CatalogOperation;

    let request = CatalogOperation::Query { query_type, params };
    match self.registry.get(catalog_name) {
        Some(catalog) => catalog.execute_read_only(request),
        None => Err(CatalogError::CatalogNotFound(catalog_name.to_string())),
    }
}
/// Describes the catalog registered as `catalog_name`.
///
/// Returns `None` when the registry has no such catalog. Otherwise the
/// result carries the requested name together with the catalog's `schema()`
/// and `supported_operations()` values.
pub fn get_catalog_info(&self, catalog_name: &str) -> Option<CatalogInfo> {
    let catalog = self.registry.get(catalog_name)?;
    Some(CatalogInfo {
        name: catalog_name.to_string(),
        schema: catalog.schema(),
        supported_operations: catalog.supported_operations(),
    })
}
/// Returns the catalog names reported by the registry's `list_catalog_names`.
pub fn list_catalogs(&self) -> Vec<String> {
self.registry.list_catalog_names()
}
/// Returns the registry's answer to whether a catalog exists under
/// `catalog_name`.
pub fn has_catalog(&self, catalog_name: &str) -> bool {
self.registry.has_catalog(catalog_name)
}
/// Collects a `CatalogInfo` for every name returned by `list_catalogs`.
///
/// Names for which `get_catalog_info` yields `None` are skipped, so the
/// result may be shorter than the list of names.
pub fn list_catalog_info(&self) -> Vec<CatalogInfo> {
    let mut infos = Vec::new();
    for name in self.list_catalogs() {
        if let Some(info) = self.get_catalog_info(&name) {
            infos.push(info);
        }
    }
    infos
}
/// Delegates to the registry's `save_all` and returns its result.
///
/// Declared `async`, but the body contains no `.await`: the registry call
/// runs synchronously when the returned future is polled.
// NOTE(review): if `save_all` performs blocking I/O this would stall the
// executor thread — confirm against the registry implementation.
pub async fn persist_all(&self) -> CatalogResult<()> {
self.registry.save_all()
}
/// Delegates to the registry's `load_all` and returns its result.
///
/// Declared `async`, but the body contains no `.await`: the registry call
/// runs synchronously when the returned future is polled.
// NOTE(review): if `load_all` performs blocking I/O this would stall the
// executor thread — confirm against the registry implementation.
pub async fn load_all(&mut self) -> CatalogResult<()> {
self.registry.load_all()
}
/// Delegates to the registry's `save_catalog` for `catalog_name` and returns
/// its result.
pub fn persist_catalog(&self, catalog_name: &str) -> CatalogResult<()> {
self.registry.save_catalog(catalog_name)
}
/// Checks that `catalog_name` is registered and emits a debug log line.
///
/// # Errors
///
/// Returns `CatalogError::CatalogNotFound` carrying `catalog_name` when the
/// registry does not know the catalog.
// NOTE(review): despite the name, nothing is read from storage here — the
// success path only logs and returns `Ok(())`. Confirm whether a
// per-catalog load through the registry was intended.
pub fn load_catalog(&mut self, catalog_name: &str) -> CatalogResult<()> {
    if self.registry.has_catalog(catalog_name) {
        log::debug!("Loading catalog '{}'", catalog_name);
        Ok(())
    } else {
        Err(CatalogError::CatalogNotFound(catalog_name.to_string()))
    }
}
/// Returns the value of the registry's `catalog_count`.
pub fn catalog_count(&self) -> usize {
self.registry.catalog_count()
}
/// Convenience wrapper that runs a `CatalogOperation::Query` through
/// `execute`.
///
/// # Errors
///
/// Returns whatever `execute` returns for the constructed operation.
pub fn query(
    &mut self,
    catalog_name: &str,
    query_type: QueryType,
    params: serde_json::Value,
) -> CatalogResult<CatalogResponse> {
    let operation = CatalogOperation::Query { query_type, params };
    self.execute(catalog_name, operation)
}
/// Convenience wrapper that runs a `CatalogOperation::Create` through
/// `execute`.
///
/// `name` is copied into an owned `String` for the operation.
///
/// # Errors
///
/// Returns whatever `execute` returns for the constructed operation.
pub fn create_entity(
    &mut self,
    catalog_name: &str,
    entity_type: EntityType,
    name: &str,
    params: serde_json::Value,
) -> CatalogResult<CatalogResponse> {
    let operation = CatalogOperation::Create {
        entity_type,
        name: name.to_string(),
        params,
    };
    self.execute(catalog_name, operation)
}
/// Convenience wrapper that runs a `CatalogOperation::List` through
/// `execute`.
///
/// `filters` is forwarded as given, including `None`.
///
/// # Errors
///
/// Returns whatever `execute` returns for the constructed operation.
pub fn list_entities(
    &mut self,
    catalog_name: &str,
    entity_type: EntityType,
    filters: Option<serde_json::Value>,
) -> CatalogResult<CatalogResponse> {
    let operation = CatalogOperation::List {
        entity_type,
        filters,
    };
    self.execute(catalog_name, operation)
}
}