#![allow(clippy::excessive_nesting)]
use do_memory_core::{Error, Result};
use redb::{Database, TableDefinition};
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use tracing::info;
use crate::cache::Cache as CacheTrait;
mod backend_impl;
mod cache;
mod embeddings;
mod embeddings_backend;
mod embeddings_impl;
mod episodes;
mod episodes_queries;
mod episodes_summaries;
mod heuristics;
mod patterns;
mod persistence;
mod recommendations;
mod relationships;
mod statistics;
mod storage;
mod storage_ops;
mod tables;
pub use crate::cache::{
AdaptiveCache, AdaptiveCacheAdapter, AdaptiveCacheConfig, AdaptiveCacheMetrics, Cache,
CacheConfig, CacheMetrics, LRUCache,
};
pub use crate::statistics::StorageStatistics;
pub use persistence::{
CachePersistence, CacheSnapshot, IncrementalUpdate, PersistedCacheEntry, PersistenceConfig,
PersistenceManager, PersistenceMode, PersistenceStats, PersistenceStrategy,
};
pub use storage::RedbQuery;
pub const MAX_EPISODE_SIZE: u64 = 10_000_000;
pub const MAX_PATTERN_SIZE: u64 = 1_000_000;
pub const MAX_HEURISTIC_SIZE: u64 = 100_000;
pub const MAX_EMBEDDING_SIZE: u64 = 1_000_000;
pub(crate) const EPISODES_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("episodes");
pub(crate) const PATTERNS_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("patterns");
pub(crate) const HEURISTICS_TABLE: TableDefinition<&str, &[u8]> =
TableDefinition::new("heuristics");
pub(crate) const EMBEDDINGS_TABLE: TableDefinition<&str, &[u8]> =
TableDefinition::new("embeddings");
pub(crate) const METADATA_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("metadata");
pub(crate) const SUMMARIES_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("summaries");
pub(crate) const RELATIONSHIPS_TABLE: TableDefinition<&str, &[u8]> =
TableDefinition::new("relationships");
pub(crate) const RECOMMENDATION_SESSIONS_TABLE: TableDefinition<&str, &[u8]> =
TableDefinition::new("recommendation_sessions");
pub(crate) const RECOMMENDATION_FEEDBACK_TABLE: TableDefinition<&str, &[u8]> =
TableDefinition::new("recommendation_feedback");
pub(crate) const RECOMMENDATION_EPISODE_INDEX_TABLE: TableDefinition<&str, &str> =
TableDefinition::new("recommendation_episode_index");
pub(crate) const SCHEMA_VERSION: u64 = 2;
pub(crate) const SCHEMA_VERSION_TABLE: TableDefinition<&str, u64> =
TableDefinition::new("schema_version");
const DB_OPERATION_TIMEOUT: Duration = Duration::from_secs(10);
pub(crate) async fn with_db_timeout<T, F>(operation: F) -> crate::Result<T>
where
F: FnOnce() -> crate::Result<T> + Send + 'static,
T: Send + 'static,
{
match tokio::time::timeout(DB_OPERATION_TIMEOUT, tokio::task::spawn_blocking(operation)).await {
Ok(Ok(result)) => result, Ok(Err(join_err)) => Err(Error::Storage(format!("Task join error: {}", join_err))),
Err(_) => Err(Error::Storage(format!(
"Database operation timed out after {:?}",
DB_OPERATION_TIMEOUT
))),
}
}
pub struct RedbStorage {
pub(crate) db: Arc<Database>,
pub(crate) cache: Box<dyn CacheTrait>,
}
impl RedbStorage {
pub async fn new(path: &Path) -> Result<Self> {
Self::new_with_adaptive_config(path, AdaptiveCacheConfig::default()).await
}
pub async fn new_with_cache_config(path: &Path, cache_config: CacheConfig) -> Result<Self> {
info!("Opening redb database at {}", path.display());
let path_buf = path.to_path_buf();
let db = with_db_timeout(move || {
Database::create(&path_buf)
.map_err(|e| Error::Storage(format!("Failed to create redb database: {}", e)))
})
.await?;
let cache: Box<dyn CacheTrait> = Box::new(LRUCache::new(cache_config));
let storage = Self {
db: Arc::new(db),
cache,
};
storage.initialize_tables().await?;
info!("Successfully opened redb database with LRU cache");
Ok(storage)
}
pub async fn new_with_adaptive_config(
path: &Path,
config: AdaptiveCacheConfig,
) -> Result<Self> {
info!("Opening redb database at {}", path.display());
let path_buf = path.to_path_buf();
let db = with_db_timeout(move || {
Database::create(&path_buf)
.map_err(|e| Error::Storage(format!("Failed to create redb database: {}", e)))
})
.await?;
let cache: Box<dyn CacheTrait> = Box::new(AdaptiveCacheAdapter::new(config));
let storage = Self {
db: Arc::new(db),
cache,
};
storage.initialize_tables().await?;
info!("Successfully opened redb database with adaptive cache");
Ok(storage)
}
}