use crate::catalog::manager::CatalogManager;
use crate::storage::data_adapter::DataAdapter;
use crate::storage::indexes::IndexManager;
use crate::storage::multi_graph::MultiGraphManager;
use crate::storage::persistent::{create_storage_driver, StorageDriver, StorageTree};
use crate::storage::StorageType;
use crate::storage::{GraphCache, StorageError};
use log::{debug, error, info};
use serde::{Deserialize, Serialize};
use std::path::Path;
use std::sync::Arc;
/// Selects which storage tiers a `StorageManager` is initialized with.
///
/// All variants are plain unit variants, so the type is also `Eq` and `Hash`
/// and can be used as a map key or in exhaustive equality checks.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum StorageMethod {
    /// Graphs are persisted through an on-disk storage driver.
    DiskOnly,
    /// Graphs are kept in memory only. `StorageManager::new` currently
    /// returns an error for this variant (not yet implemented).
    MemoryOnly,
    /// Disk-backed storage combined with a memory tier. Initialization
    /// currently delegates to the disk-only path.
    DiskAndMemory,
}
impl Default for StorageMethod {
fn default() -> Self {
StorageMethod::DiskOnly
}
}
/// Coordinates graph storage across an in-process cache and optional
/// driver-backed stores.
///
/// Cloning is cheap: every non-`Copy` field is an `Arc` (or an `Option` of
/// one), so clones share the same cache, driver and adapters.
#[derive(Clone)]
pub struct StorageManager {
// In-process graph cache; consulted first by `get_graph`.
cache: Arc<MultiGraphManager>,
// Storage driver handed to the data adapters for every persistent operation.
storage_driver: Option<Arc<Box<dyn StorageDriver<Tree = Box<dyn StorageTree>>>>>,
// Adapter used together with `storage_driver` to load/save graphs and catalog data.
persistent_store: Option<Arc<DataAdapter>>,
// Memory-tier adapter. NOTE(review): none of the constructors in this file
// sets it to `Some`; the memory-store branches look like placeholders.
memory_store: Option<Arc<DataAdapter>>,
// Storage type the manager was created with; reported by the `Debug` impl.
storage_type: StorageType,
// Index manager exposed through `get_index_manager` / `has_text_index`.
index_manager: Option<Arc<IndexManager>>,
}
impl StorageManager {
/// Creates a storage manager for `path` using the requested tier layout.
///
/// Logs the chosen method and storage type, then hands off to the
/// initializer that matches `method`.
///
/// # Errors
///
/// Propagates whatever error the selected initializer returns. The
/// memory-only initializer currently always fails.
pub fn new<P: AsRef<Path>>(
    path: P,
    method: StorageMethod,
    storage_type: StorageType,
) -> Result<Self, Box<dyn std::error::Error>> {
    info!(
        "Creating storage manager with method: {:?}, storage type: {:?}",
        method, storage_type
    );

    // Exhaustive on purpose: adding a variant must force a decision here.
    match method {
        StorageMethod::DiskAndMemory => Self::init_disk_and_memory(path, storage_type),
        StorageMethod::MemoryOnly => Self::init_memory_only(path, storage_type),
        StorageMethod::DiskOnly => Self::init_disk_only(path, storage_type),
    }
}
/// Initializes a manager backed by an on-disk storage driver.
///
/// Creates the driver for `storage_type` at `path`, opens a fixed set of
/// commonly used trees up front, and wires the driver together with a fresh
/// data adapter, index manager and empty graph cache. No memory store is
/// configured.
///
/// # Errors
///
/// Returns the error from `create_storage_driver` or from opening any of the
/// pre-created trees.
fn init_disk_only<P: AsRef<Path>>(
    path: P,
    storage_type: StorageType,
) -> Result<Self, Box<dyn std::error::Error>> {
    // Fixed list of trees opened eagerly; a const array avoids a heap
    // allocation for data that never changes.
    const COMMON_TREES: [&str; 5] = ["nodes", "edges", "metadata", "catalog", "auth"];

    info!(
        "Initializing disk-only storage with {} at path: {:?}",
        storage_type,
        path.as_ref()
    );
    let driver = create_storage_driver(storage_type, path.as_ref())?;
    for tree_name in COMMON_TREES.iter() {
        driver.open_tree(tree_name)?;
        debug!("Pre-created tree: {}", tree_name);
    }
    info!(
        "Storage driver initialized with {} pre-created trees",
        COMMON_TREES.len()
    );
    let persistent_store = Arc::new(DataAdapter::new());
    let driver_arc = Arc::new(driver);
    let index_manager = Arc::new(IndexManager::new());
    Ok(Self {
        cache: Arc::new(MultiGraphManager::new()),
        storage_driver: Some(driver_arc),
        persistent_store: Some(persistent_store),
        memory_store: None,
        storage_type,
        index_manager: Some(index_manager),
    })
}
/// Placeholder initializer for memory-only storage.
///
/// Logs the request and then fails; `_path` is accepted only so the
/// signature matches the other initializers.
///
/// # Errors
///
/// Always returns an error stating that memory-only storage is not
/// implemented.
fn init_memory_only<P: AsRef<Path>>(
    _path: P,
    storage_type: StorageType,
) -> Result<Self, Box<dyn std::error::Error>> {
    info!("Initializing memory-only storage with {:?}", storage_type);
    let not_implemented: Box<dyn std::error::Error> =
        Box::from("Memory-only storage not yet implemented");
    Err(not_implemented)
}
/// Initializes combined disk + memory storage.
///
/// After logging, this delegates to the disk-only initializer, so the
/// resulting manager has no separate memory store.
///
/// # Errors
///
/// Propagates any error from the disk-only initializer.
fn init_disk_and_memory<P: AsRef<Path>>(
    path: P,
    storage_type: StorageType,
) -> Result<Self, Box<dyn std::error::Error>> {
    let location: &Path = path.as_ref();
    info!(
        "Initializing disk and memory storage with {} at path: {:?}",
        storage_type, location
    );
    Self::init_disk_only(location, storage_type)
}
/// Looks up the graph called `name`, checking the cache before persistent
/// storage.
///
/// A graph loaded from persistent storage is inserted into the cache before
/// it is returned. A failed persistent load is logged at debug level and
/// treated as "not found".
///
/// # Errors
///
/// Returns an error if the cache lookup fails or if inserting a freshly
/// loaded graph into the cache fails.
pub fn get_graph(&self, name: &str) -> Result<Option<GraphCache>, StorageError> {
    debug!("Getting graph '{}' from storage manager", name);

    // Tier 1: the in-process cache.
    let cached = self.cache.get_graph(name).map_err(|e| {
        error!("Error checking cache for graph '{}': {}", name, e);
        e
    })?;
    if let Some(graph) = cached {
        debug!("Graph '{}' found in local cache", name);
        return Ok(Some(graph));
    }
    debug!("Graph '{}' not found in local cache", name);

    // Tier 2: memory store (placeholder only).
    if self.memory_store.is_some() {
        debug!("Memory store not yet implemented for graph '{}'", name);
    }
    debug!(
        "Graph '{}' not in memory, checking persistent storage",
        name
    );

    // Tier 3: persistent storage, which needs both the adapter and the driver.
    if let (Some(persistent_store), Some(driver)) =
        (&self.persistent_store, &self.storage_driver)
    {
        match persistent_store.load_graph_by_path(driver.as_ref().as_ref(), name) {
            Err(e) => {
                debug!(
                    "Failed to load graph '{}' from persistent storage: {}",
                    name, e
                );
            }
            Ok(graph) => {
                debug!("Graph '{}' loaded from persistent storage", name);
                self.cache.add_graph(name.to_string(), graph.clone())?;
                return Ok(Some(graph));
            }
        }
    }

    Ok(None)
}
pub fn save_graph(&self, name: &str, graph: GraphCache) -> Result<(), StorageError> {
debug!("Saving graph '{}' to storage manager", name);
let cache_name = name.to_string();
match self.cache.add_graph(cache_name.clone(), graph.clone()) {
Ok(_) => {
debug!("Successfully added graph '{}' to cache", cache_name);
}
Err(e) => {
error!("Failed to add graph '{}' to cache: {}", cache_name, e);
return Err(e);
}
}
if let Some(_memory_store) = &self.memory_store {
debug!("Memory store save not yet implemented for graph '{}'", name);
}
if let Some(persistent_store) = &self.persistent_store {
if let Some(driver) = &self.storage_driver {
debug!("Attempting to persist graph '{}' to disk", name);
persistent_store
.save_graph_by_path(driver.as_ref().as_ref(), &graph, name)
.map_err(|e| {
error!("Failed to persist graph '{}': {}", name, e);
StorageError::PersistenceError(format!(
"Failed to persist graph '{}': {}",
name, e
))
})?;
debug!("Successfully persisted graph '{}' to disk", name);
} else {
debug!(
"No storage driver available, skipping disk persistence for '{}'",
name
);
}
} else {
debug!(
"No persistent store available, skipping disk persistence for '{}'",
name
);
}
debug!("Successfully saved graph '{}' to all storage tiers", name);
Ok(())
}
/// Returns the sorted, de-duplicated names of all graphs known to any tier.
///
/// Names come from the cache plus, when a storage driver is present, the
/// persistent and memory stores. Listing the stores is best-effort: a
/// failure there is logged at debug level and the remaining tiers are still
/// reported.
///
/// # Errors
///
/// Returns an error only if the cache cannot list its graph names.
pub fn get_graph_names(&self) -> Result<Vec<String>, StorageError> {
    debug!("Getting all graph names from storage manager");
    let mut names = self.cache.get_graph_names()?;

    // Both stores need the driver, so check for it once.
    if let Some(driver) = &self.storage_driver {
        if let Some(persistent_store) = &self.persistent_store {
            match persistent_store.list_graphs(driver.as_ref().as_ref()) {
                Ok(persistent_names) => names.extend(persistent_names),
                Err(_) => debug!("Could not list graphs from persistent storage"),
            }
        }
        if let Some(memory_store) = &self.memory_store {
            match memory_store.list_graphs(driver.as_ref().as_ref()) {
                Ok(memory_names) => names.extend(memory_names),
                Err(_) => debug!("Could not list graphs from memory store"),
            }
        }
    }

    // Sorting puts duplicates next to each other, so one `dedup` pass
    // replaces the per-name linear `contains` scan (O(n log n) vs O(n^2)).
    names.sort();
    names.dedup();
    debug!("Found {} graphs in storage manager", names.len());
    Ok(names)
}
/// Removes the graph called `name` from the cache and from every configured
/// store.
///
/// The cache entry is removed first, then the memory store, then the
/// persistent store. Store deletions need the storage driver; when it is
/// missing the deletion is skipped and that is logged, rather than reporting
/// a success that never happened.
///
/// # Errors
///
/// Returns the cache error if cache removal fails, or a
/// `StorageError::PersistenceError` if a store deletion fails.
pub fn delete_graph(&self, name: &str) -> Result<(), StorageError> {
    debug!("Deleting graph '{}' from storage manager", name);
    self.cache.remove_graph(name)?;

    if let Some(memory_store) = &self.memory_store {
        if let Some(driver) = &self.storage_driver {
            memory_store
                .delete_graph(driver.as_ref().as_ref(), name)
                .map_err(|e| {
                    error!("Failed to delete graph '{}' from memory store: {}", name, e);
                    StorageError::PersistenceError(format!(
                        "Failed to delete graph '{}' from memory: {}",
                        name, e
                    ))
                })?;
            // Only claim success when a deletion was actually issued.
            debug!("Successfully deleted graph '{}' from memory store", name);
        } else {
            debug!(
                "No storage driver available, skipping memory store deletion for '{}'",
                name
            );
        }
    }

    if let Some(persistent_store) = &self.persistent_store {
        if let Some(driver) = &self.storage_driver {
            persistent_store
                .delete_graph(driver.as_ref().as_ref(), name)
                .map_err(|e| {
                    error!(
                        "Failed to delete graph '{}' from persistent storage: {}",
                        name, e
                    );
                    StorageError::PersistenceError(format!(
                        "Failed to delete graph '{}': {}",
                        name, e
                    ))
                })?;
            debug!(
                "Successfully deleted graph '{}' from persistent storage",
                name
            );
        } else {
            debug!(
                "No storage driver available, skipping persistent deletion for '{}'",
                name
            );
        }
    }

    debug!(
        "Successfully deleted graph '{}' from all storage tiers",
        name
    );
    Ok(())
}
pub fn list_graphs(&self) -> Result<Vec<String>, StorageError> {
self.get_graph_names()
}
/// Returns the cache's mutable handle for the graph called `name`, loading
/// it into the cache first if it is not there yet.
///
/// # Errors
///
/// Returns an error if a cache lookup fails or if loading the graph through
/// `get_graph` fails.
pub fn get_graph_mut(&self, name: &str) -> Result<Option<GraphCache>, StorageError> {
    if self.cache.get_graph(name)?.is_none() {
        // `get_graph` already inserts a graph it loads from persistent
        // storage into the cache, so it is called only for that side effect.
        // Adding the returned copy again would be a redundant second write.
        let _ = self.get_graph(name)?;
    }
    self.cache.get_graph_mut(name)
}
/// Borrows the shared storage driver, if one is configured.
pub fn get_storage_driver(
    &self,
) -> Option<&Arc<Box<dyn StorageDriver<Tree = Box<dyn StorageTree>>>>> {
    Option::as_ref(&self.storage_driver)
}
pub fn get_cache(&self) -> &Arc<MultiGraphManager> {
&self.cache
}
/// Borrows the shared index manager, if one is configured.
pub fn get_index_manager(&self) -> Option<&Arc<IndexManager>> {
    Option::as_ref(&self.index_manager)
}
/// Reports whether the index manager knows an index called `index_name`.
///
/// Delegates to `IndexManager::index_exists`; returns `false` when no index
/// manager is configured.
pub fn has_text_index(&self, index_name: &str) -> bool {
    self.index_manager
        .as_ref()
        .map_or(false, |manager| manager.index_exists(index_name))
}
/// Builds the union of the named graphs.
///
/// Each name is resolved through `get_graph` in order; resolution stops at
/// the first failure. The collected graphs are then passed to the cache's
/// `union_graphs`.
///
/// # Errors
///
/// Returns `StorageError::GraphNotFound` for the first name that does not
/// resolve to a graph, or any error raised while looking a graph up or
/// computing the union.
pub fn create_graph_union(&self, graph_names: Vec<String>) -> Result<GraphCache, StorageError> {
    let graphs = graph_names
        .into_iter()
        .map(|name| match self.get_graph(&name) {
            Ok(Some(graph)) => Ok(graph),
            Ok(None) => Err(StorageError::GraphNotFound(name)),
            Err(e) => Err(e),
        })
        // Collecting into `Result` short-circuits on the first error.
        .collect::<Result<Vec<_>, StorageError>>()?;
    self.cache.union_graphs(graphs)
}
/// Saves the catalog through the persistent store, falling back to the
/// memory store when no persistent store is configured.
///
/// # Errors
///
/// Returns `StorageError::PersistenceError` if the save fails or if neither
/// store is available.
pub fn save_catalog(&self, catalog_manager: &CatalogManager) -> Result<(), StorageError> {
    // The persistent store wins; the memory store is only a fallback.
    let backend = self
        .persistent_store
        .as_ref()
        .or(self.memory_store.as_ref());
    match backend {
        Some(store) => store.save_catalog(catalog_manager).map_err(|e| {
            StorageError::PersistenceError(format!("Failed to save catalog: {}", e))
        }),
        None => Err(StorageError::PersistenceError(
            "No storage backend available".to_string(),
        )),
    }
}
/// Loads the catalog into `catalog_manager` from the persistent store,
/// falling back to the memory store when no persistent store is configured.
///
/// When neither store exists this is a no-op that logs at debug level and
/// returns `Ok(())`.
///
/// # Errors
///
/// Returns `StorageError::PersistenceError` if the selected store fails to
/// load the catalog.
pub fn load_catalog(&self, catalog_manager: &mut CatalogManager) -> Result<(), StorageError> {
    let backend = self
        .persistent_store
        .as_ref()
        .or(self.memory_store.as_ref());
    match backend {
        Some(store) => store.load_catalog(catalog_manager).map_err(|e| {
            StorageError::PersistenceError(format!("Failed to load catalog: {}", e))
        }),
        None => {
            debug!("No storage backend available for loading catalog");
            Ok(())
        }
    }
}
/// Returns a new shared handle to the graph cache.
pub fn cache(&self) -> Arc<MultiGraphManager> {
    Arc::clone(&self.cache)
}
/// Returns a new shared handle to the working set, which is the same
/// object as the graph cache.
pub fn working_set(&self) -> Arc<MultiGraphManager> {
    Arc::clone(&self.cache)
}
/// Returns a new shared handle to the persistent data adapter, if any.
pub fn persistent_storage(&self) -> Option<Arc<DataAdapter>> {
    self.persistent_store.as_ref().map(Arc::clone)
}
/// Returns a new shared handle to the storage driver, if any.
pub fn storage_driver(
    &self,
) -> Option<Arc<Box<dyn StorageDriver<Tree = Box<dyn StorageTree>>>>> {
    self.storage_driver.as_ref().map(Arc::clone)
}
/// Reports whether catalog data exists in storage.
///
/// Uses the persistent store if present, otherwise the memory store.
/// Returns `Ok(false)` when no store or no storage driver is configured.
///
/// # Errors
///
/// Returns `StorageError::PersistenceError` if the store fails the check.
pub fn has_catalog_data(&self) -> Result<bool, StorageError> {
    let backend = self
        .persistent_store
        .as_ref()
        .or(self.memory_store.as_ref());
    match (backend, &self.storage_driver) {
        (Some(store), Some(driver)) => store
            .has_catalog_data(driver.as_ref().as_ref())
            .map_err(|e| {
                StorageError::PersistenceError(format!(
                    "Failed to check catalog data: {}",
                    e
                ))
            }),
        // Missing store or missing driver: nothing can be stored.
        _ => Ok(false),
    }
}
/// Empties the in-process graph cache; stores are left untouched.
///
/// # Errors
///
/// Propagates the error returned by the cache's `clear`.
pub fn clear_cache(&self) -> Result<(), StorageError> {
    debug!("Clearing storage cache");
    if let Err(e) = self.cache.clear() {
        return Err(e.into());
    }
    debug!("Successfully cleared storage cache");
    Ok(())
}
/// Returns `(entries, memory_bytes)` for the graph cache.
///
/// `memory_bytes` is not measured: it is the entry count multiplied by a
/// flat 1024 bytes per graph.
pub fn get_cache_stats(&self) -> (usize, usize) {
    // Flat per-graph figure used for the rough memory estimate.
    const ESTIMATED_BYTES_PER_GRAPH: usize = 1024;

    let graph_count = self.cache.graph_count();
    (graph_count, graph_count * ESTIMATED_BYTES_PER_GRAPH)
}
/// Clears the cache and then every configured store.
///
/// The cache is cleared first, then the persistent store, then the memory
/// store. Store clearing needs the storage driver and is skipped entirely
/// when none is configured.
///
/// # Errors
///
/// Returns the cache error, or `StorageError::PersistenceError` if clearing
/// a store fails. Work stops at the first failure.
pub fn clear_all_data(&self) -> Result<(), StorageError> {
    self.clear_cache()?;

    // Without a driver neither store can be cleared.
    let driver = match &self.storage_driver {
        Some(driver) => driver,
        None => return Ok(()),
    };

    if let Some(persistent_store) = &self.persistent_store {
        persistent_store
            .clear(driver.as_ref().as_ref())
            .map_err(|e| {
                StorageError::PersistenceError(format!(
                    "Failed to clear persistent data: {}",
                    e
                ))
            })?;
    }

    if let Some(memory_store) = &self.memory_store {
        memory_store.clear(driver.as_ref().as_ref()).map_err(|e| {
            StorageError::PersistenceError(format!("Failed to clear memory data: {}", e))
        })?;
    }

    Ok(())
}
/// Saves the raw bytes of catalog provider `name`.
///
/// Uses the persistent store if present, otherwise the memory store.
///
/// # Errors
///
/// Returns `StorageError::PersistenceError` if no store is configured, if
/// no storage driver is configured, or if the save itself fails.
pub fn save_catalog_provider(&self, name: &str, data: &[u8]) -> Result<(), StorageError> {
    // A missing store is reported before a missing driver.
    let store = self
        .persistent_store
        .as_ref()
        .or(self.memory_store.as_ref())
        .ok_or_else(|| {
            StorageError::PersistenceError("No storage backend available".to_string())
        })?;
    let driver = self.storage_driver.as_ref().ok_or_else(|| {
        StorageError::PersistenceError("No storage driver available".to_string())
    })?;

    store
        .save_catalog_provider(driver.as_ref().as_ref(), name, data)
        .map_err(|e| {
            StorageError::PersistenceError(format!(
                "Failed to save catalog provider '{}': {}",
                name, e
            ))
        })
}
/// Loads the raw bytes of catalog provider `name`, if they can be read.
///
/// Uses the persistent store if present, otherwise the memory store. A load
/// failure is logged at debug level and reported as `Ok(None)`, as is a
/// missing store or storage driver.
///
/// # Errors
///
/// This method currently never returns `Err`.
pub fn load_catalog_provider(&self, name: &str) -> Result<Option<Vec<u8>>, StorageError> {
    match (
        &self.persistent_store,
        &self.memory_store,
        &self.storage_driver,
    ) {
        (Some(persistent_store), _, Some(driver)) => {
            match persistent_store.load_catalog_provider(driver.as_ref().as_ref(), name) {
                Ok(data) => Ok(Some(data)),
                Err(e) => {
                    debug!(
                        "Could not load catalog provider '{}' from persistent store: {}",
                        name, e
                    );
                    Ok(None)
                }
            }
        }
        // The memory store is consulted only when there is no persistent store.
        (None, Some(memory_store), Some(driver)) => {
            match memory_store.load_catalog_provider(driver.as_ref().as_ref(), name) {
                Ok(data) => Ok(Some(data)),
                Err(e) => {
                    debug!(
                        "Could not load catalog provider '{}' from memory store: {}",
                        name, e
                    );
                    Ok(None)
                }
            }
        }
        _ => Ok(None),
    }
}
/// Reports whether catalog provider `name` exists in storage.
///
/// Uses the persistent store if present, otherwise the memory store.
/// Returns `Ok(false)` when no store or no storage driver is configured.
///
/// # Errors
///
/// Returns `StorageError::PersistenceError` if the store fails the check.
pub fn has_catalog_provider(&self, name: &str) -> Result<bool, StorageError> {
    let backend = self
        .persistent_store
        .as_ref()
        .or(self.memory_store.as_ref());
    match (backend, &self.storage_driver) {
        (Some(store), Some(driver)) => store
            .has_catalog_provider(driver.as_ref().as_ref(), name)
            .map_err(|e| {
                StorageError::PersistenceError(format!(
                    "Failed to check catalog provider '{}': {}",
                    name, e
                ))
            }),
        _ => Ok(false),
    }
}
/// Lists the names of all stored catalog providers.
///
/// Uses the persistent store if present, otherwise the memory store.
/// Returns an empty list when no store or no storage driver is configured.
///
/// # Errors
///
/// Returns `StorageError::PersistenceError` if the store fails to list its
/// providers.
pub fn list_catalog_providers(&self) -> Result<Vec<String>, StorageError> {
    let backend = self
        .persistent_store
        .as_ref()
        .or(self.memory_store.as_ref());
    match (backend, &self.storage_driver) {
        (Some(store), Some(driver)) => store
            .list_catalog_providers(driver.as_ref().as_ref())
            .map_err(|e| {
                StorageError::PersistenceError(format!(
                    "Failed to list catalog providers: {}",
                    e
                ))
            }),
        _ => Ok(Vec::new()),
    }
}
/// Shutdown hook for the storage manager.
///
/// NOTE(review): despite the log message, no flush is issued here; the
/// method only logs when a persistent store exists and then returns
/// `Ok(())`. Confirm whether the driver needs an explicit flush.
pub fn shutdown(&self) -> Result<(), StorageError> {
    if self.persistent_store.is_some() {
        debug!("Flushing storage manager during shutdown");
    }
    Ok(())
}
}
impl std::fmt::Debug for StorageManager {
    /// Formats a summary of the manager: the storage type plus presence
    /// flags for the driver and stores. The remaining fields are elided and
    /// the output is marked non-exhaustive.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut summary = f.debug_struct("StorageManager");
        summary.field("storage_type", &self.storage_type);
        summary.field("has_storage_driver", &self.storage_driver.is_some());
        summary.field("has_persistent_store", &self.persistent_store.is_some());
        summary.field("has_memory_store", &self.memory_store.is_some());
        summary.finish_non_exhaustive()
    }
}