pub mod database;
pub mod files;
pub mod redis;
pub mod vector;
use crate::config::models::storage::StorageConfig;
use crate::utils::error::gateway_error::{GatewayError, Result};
use std::sync::Arc;
use tracing::{debug, info, warn};
/// Facade bundling every storage backend used by this module.
///
/// Each backend is held behind an `Arc`, so the derived `Clone` copies
/// reference-counted handles rather than the backends themselves.
#[derive(Debug, Clone)]
pub struct StorageLayer {
/// Handle to the database backend.
pub database: Arc<database::Database>,
/// Redis pool. `StorageLayer::new` substitutes a no-op pool when the real
/// connection cannot be established.
pub redis: Arc<redis::RedisPool>,
/// File storage backend.
pub files: Arc<files::FileStorage>,
/// Vector store backend; `None` when it is not configured or when its
/// initialization failed in `StorageLayer::new`.
pub vector: Option<Arc<vector::VectorStoreBackend>>,
}
impl StorageLayer {
/// Builds the storage layer from `config`, bringing up the backends in the
/// order database, Redis, file storage, vector store.
///
/// Failure handling differs per backend:
///
/// * database and file storage: the error is propagated to the caller;
/// * Redis: the error is logged and a no-op pool is used instead;
/// * vector store: only attempted when `config.vector_db` is set; an
///   initialization error is logged and `vector` is left as `None`.
///
/// NOTE(review): file storage is always built from
/// `FileStorageConfig::default()`; nothing from `config` reaches it.
///
/// # Errors
///
/// Returns the error produced by database or file-storage initialization.
pub async fn new(config: &StorageConfig) -> Result<Self> {
    info!("Initializing storage layer");

    debug!("Connecting to database");
    let db_handle = database::Database::new(&config.database).await?;
    let database = Arc::new(db_handle);

    debug!("Creating Redis connection pool");
    let redis_pool = match redis::RedisPool::new(&config.redis).await {
        Ok(pool) if pool.is_noop() => {
            info!("Redis caching is disabled (no-op mode)");
            pool
        }
        Ok(pool) => {
            info!("Redis connection established");
            pool
        }
        Err(err) => {
            // Caching is optional: degrade to the no-op pool instead of failing.
            warn!(
                "Redis connection failed: {}. Gateway will operate without caching.",
                err
            );
            redis::RedisPool::create_noop()
        }
    };
    let redis = Arc::new(redis_pool);

    debug!("Initializing file storage");
    let file_config = crate::config::models::file_storage::FileStorageConfig::default();
    let file_store = files::FileStorage::new(&file_config).await?;
    let files = Arc::new(file_store);

    let vector = match &config.vector_db {
        None => {
            debug!("Vector database not configured, skipping");
            None
        }
        Some(vector_config) => {
            debug!("Initializing vector database");
            // The vector store is optional: log the failure and carry on without it.
            vector::VectorStoreBackend::new(vector_config)
                .await
                .map(Arc::new)
                .map_err(|err| {
                    warn!(
                        "Vector database initialization failed: {}, continuing without vector DB",
                        err
                    );
                })
                .ok()
        }
    };

    info!("Storage layer initialized successfully");
    Ok(Self {
        database,
        redis,
        files,
        vector,
    })
}
/// Runs the database migrations by delegating to `Database::migrate`,
/// logging before and after.
///
/// # Errors
///
/// Propagates the error returned by `Database::migrate`; the completion
/// message is not logged in that case.
pub async fn migrate(&self) -> Result<()> {
info!("Running database migrations");
self.database.migrate().await?;
info!("Database migrations completed");
Ok(())
}
/// Probes every backend in turn and reports which ones are healthy.
///
/// Probes run sequentially in the order database, Redis, file storage,
/// vector store. A failing probe is logged as a warning and recorded as
/// `false`; it never turns into an `Err`. When no vector store is
/// configured its flag is reported as `true` so that it does not drag
/// `overall` down.
///
/// `overall` is `true` only when all four flags are `true`.
pub async fn health_check(&self) -> Result<StorageHealthStatus> {
    let database = match self.database.health_check().await {
        Ok(_) => true,
        Err(err) => {
            warn!("Database health check failed: {}", err);
            false
        }
    };

    let redis = match self.redis.health_check().await {
        Ok(_) => true,
        Err(err) => {
            warn!("Redis health check failed: {}", err);
            false
        }
    };

    let files = match self.files.health_check().await {
        Ok(_) => true,
        Err(err) => {
            warn!("File storage health check failed: {}", err);
            false
        }
    };

    let vector = match &self.vector {
        // An absent vector store counts as healthy.
        None => true,
        Some(backend) => match backend.health_check().await {
            Ok(_) => true,
            Err(err) => {
                warn!("Vector database health check failed: {}", err);
                false
            }
        },
    };

    Ok(StorageHealthStatus {
        database,
        redis,
        files,
        vector,
        overall: database && redis && files && vector,
    })
}
pub async fn close(&self) -> Result<()> {
info!("Closing storage connections");
self.redis.close().await?;
self.files.close().await?;
if let Some(vector) = &self.vector {
vector.close().await?;
}
info!("Storage connections closed");
Ok(())
}
/// Borrows the database backend held by this layer.
pub fn db(&self) -> &database::Database {
&self.database
}
/// Borrows the Redis pool held by this layer (which may be the no-op pool
/// installed by `StorageLayer::new` when Redis was unreachable).
pub fn redis(&self) -> &redis::RedisPool {
&self.redis
}
/// Borrows the file storage backend held by this layer.
pub fn files(&self) -> &files::FileStorage {
&self.files
}
/// Borrows the vector store backend, or returns `None` when this layer has
/// no vector store.
pub fn vector(&self) -> Option<&vector::VectorStoreBackend> {
self.vector.as_deref()
}
/// Obtains a connection from the Redis pool by delegating to
/// `RedisPool::get_connection`; its result is returned unchanged.
pub async fn redis_conn(&self) -> Result<redis::RedisConnection> {
self.redis.get_connection().await
}
/// Stores `content` under `filename` by delegating to `FileStorage::store`.
///
/// The returned `String` is whatever `FileStorage::store` produces.
/// NOTE(review): presumably the file id accepted by `get_file` /
/// `delete_file` — confirm against `FileStorage`.
pub async fn store_file(&self, filename: &str, content: &[u8]) -> Result<String> {
self.files.store(filename, content).await
}
/// Fetches the bytes stored for `file_id` by delegating to
/// `FileStorage::get`; its result is returned unchanged.
pub async fn get_file(&self, file_id: &str) -> Result<Vec<u8>> {
self.files.get(file_id).await
}
/// Deletes the file identified by `file_id` by delegating to
/// `FileStorage::delete`; its result is returned unchanged.
pub async fn delete_file(&self, file_id: &str) -> Result<()> {
self.files.delete(file_id).await
}
/// Stores one embedding vector, with optional metadata, under `id` in the
/// vector store by delegating to `VectorStoreBackend::store`.
///
/// # Errors
///
/// Returns `GatewayError::Config` when this layer has no vector store;
/// otherwise returns whatever the backend's `store` call yields.
pub async fn store_embeddings(
    &self,
    id: &str,
    embeddings: &[f32],
    metadata: Option<serde_json::Value>,
) -> Result<()> {
    match &self.vector {
        Some(backend) => backend.store(id, embeddings, metadata).await,
        None => Err(GatewayError::Config(String::from(
            "Vector database not configured",
        ))),
    }
}
/// Runs a similarity search against the vector store by delegating to
/// `VectorStoreBackend::search` with `query_vector`, `limit` and
/// `threshold` passed through unchanged.
///
/// # Errors
///
/// Returns `GatewayError::Config` when this layer has no vector store;
/// otherwise returns whatever the backend's `search` call yields.
pub async fn search_similar(
    &self,
    query_vector: &[f32],
    limit: usize,
    threshold: Option<f32>,
) -> Result<Vec<vector::SearchResult>> {
    match &self.vector {
        Some(backend) => backend.search(query_vector, limit, threshold).await,
        None => Err(GatewayError::Config(String::from(
            "Vector database not configured",
        ))),
    }
}
/// Looks up `key` in the cache by delegating to `RedisPool::get`; its
/// result is returned unchanged.
/// NOTE(review): presumably `Ok(None)` means the key is absent — confirm
/// against `RedisPool`, including its behavior in no-op mode.
pub async fn cache_get(&self, key: &str) -> Result<Option<String>> {
self.redis.get(key).await
}
/// Writes `value` under `key` by delegating to `RedisPool::set`, passing
/// `ttl` through unchanged.
/// NOTE(review): the unit of `ttl` and the meaning of `None` are defined by
/// `RedisPool::set` — confirm there (presumably seconds / no expiry).
pub async fn cache_set(&self, key: &str, value: &str, ttl: Option<u64>) -> Result<()> {
self.redis.set(key, value, ttl).await
}
/// Removes `key` from the cache by delegating to `RedisPool::delete`; its
/// result is returned unchanged.
pub async fn cache_delete(&self, key: &str) -> Result<()> {
self.redis.delete(key).await
}
/// Reports whether `key` exists by delegating to `RedisPool::exists`; its
/// result is returned unchanged.
pub async fn cache_exists(&self, key: &str) -> Result<bool> {
self.redis.exists(key).await
}
/// Looks up several keys at once by delegating to `RedisPool::mget`; its
/// result is returned unchanged.
/// NOTE(review): presumably the returned entries line up positionally with
/// `keys` — confirm against `RedisPool::mget`.
pub async fn cache_mget(&self, keys: &[String]) -> Result<Vec<Option<String>>> {
self.redis.mget(keys).await
}
/// Writes several `(key, value)` pairs at once by delegating to
/// `RedisPool::mset`, passing `ttl` through unchanged.
/// NOTE(review): the unit of `ttl` and whether it applies to every pair are
/// defined by `RedisPool::mset` — confirm there.
pub async fn cache_mset(&self, pairs: &[(String, String)], ttl: Option<u64>) -> Result<()> {
self.redis.mset(pairs, ttl).await
}
/// Pushes `value` onto the list stored at `key` by delegating to
/// `RedisPool::list_push`.
/// NOTE(review): which end of the list is used is decided by `RedisPool` —
/// confirm there.
pub async fn list_push(&self, key: &str, value: &str) -> Result<()> {
self.redis.list_push(key, value).await
}
/// Pops one element from the list stored at `key` by delegating to
/// `RedisPool::list_pop`; its result is returned unchanged.
/// NOTE(review): presumably `Ok(None)` means the list is empty or missing,
/// and the popped end is decided by `RedisPool` — confirm there.
pub async fn list_pop(&self, key: &str) -> Result<Option<String>> {
self.redis.list_pop(key).await
}
/// Returns the length of the list stored at `key` by delegating to
/// `RedisPool::list_length`.
pub async fn list_length(&self, key: &str) -> Result<usize> {
self.redis.list_length(key).await
}
/// Adds `member` to the set stored at `key` by delegating to
/// `RedisPool::set_add`.
pub async fn set_add(&self, key: &str, member: &str) -> Result<()> {
self.redis.set_add(key, member).await
}
/// Removes `member` from the set stored at `key` by delegating to
/// `RedisPool::set_remove`.
pub async fn set_remove(&self, key: &str, member: &str) -> Result<()> {
self.redis.set_remove(key, member).await
}
/// Lists the members of the set stored at `key` by delegating to
/// `RedisPool::set_members`; its result is returned unchanged.
pub async fn set_members(&self, key: &str) -> Result<Vec<String>> {
self.redis.set_members(key).await
}
/// Sets `field` to `value` in the hash stored at `key` by delegating to
/// `RedisPool::hash_set`.
pub async fn hash_set(&self, key: &str, field: &str, value: &str) -> Result<()> {
self.redis.hash_set(key, field, value).await
}
/// Reads `field` from the hash stored at `key` by delegating to
/// `RedisPool::hash_get`; its result is returned unchanged.
/// NOTE(review): presumably `Ok(None)` means the field or hash is absent —
/// confirm against `RedisPool`.
pub async fn hash_get(&self, key: &str, field: &str) -> Result<Option<String>> {
self.redis.hash_get(key, field).await
}
/// Removes `field` from the hash stored at `key` by delegating to
/// `RedisPool::hash_delete`.
pub async fn hash_delete(&self, key: &str, field: &str) -> Result<()> {
self.redis.hash_delete(key, field).await
}
/// Reads every field/value pair of the hash stored at `key` by delegating
/// to `RedisPool::hash_get_all`; its result is returned unchanged.
pub async fn hash_get_all(
&self,
key: &str,
) -> Result<std::collections::HashMap<String, String>> {
self.redis.hash_get_all(key).await
}
/// Publishes `message` on `channel` by delegating to `RedisPool::publish`.
pub async fn publish(&self, channel: &str, message: &str) -> Result<()> {
self.redis.publish(channel, message).await
}
/// Subscribes to `channels` by delegating to `RedisPool::subscribe` and
/// returns the resulting `redis::Subscription` unchanged.
pub async fn subscribe(&self, channels: &[String]) -> Result<redis::Subscription> {
self.redis.subscribe(channels).await
}
}
/// Per-backend health flags produced by `StorageLayer::health_check`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct StorageHealthStatus {
/// `true` when the database health probe succeeded.
pub database: bool,
/// `true` when the Redis health probe succeeded.
pub redis: bool,
/// `true` when the file storage health probe succeeded.
pub files: bool,
/// `true` when the vector store health probe succeeded, or when no vector
/// store is present.
pub vector: bool,
/// `true` only when all four flags above are `true`.
pub overall: bool,
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::models::storage::{DatabaseConfig, RedisConfig};

    /// Builds a `StorageConfig` by hand and checks that the URLs round-trip.
    ///
    /// No `StorageLayer` is constructed, so this test never touches a real
    /// database or Redis instance.
    #[tokio::test]
    async fn test_storage_layer_creation() {
        let database_config = DatabaseConfig {
            url: String::from("postgresql://localhost:5432/test"),
            max_connections: 5,
            connection_timeout: 5,
            ssl: false,
            enabled: true,
        };
        let redis_config = RedisConfig {
            url: String::from("redis://localhost:6379"),
            enabled: true,
            max_connections: 10,
            connection_timeout: 5,
            cluster: false,
        };

        let config = StorageConfig {
            database: database_config,
            redis: redis_config,
            vector_db: None,
        };

        assert_eq!(config.database.url, "postgresql://localhost:5432/test");
        assert_eq!(config.redis.url, "redis://localhost:6379");
    }
}