use crate::config::Credentials;
use crate::metrics::MetricRecord;
use crate::storage::{StorageBackend, adbc::AdbcBackend, duckdb::DuckDbBackend};
use std::sync::Arc;
use std::collections::HashMap;
use tonic::Status;
/// Storage backend that pairs a cache backend with a backing store.
///
/// Both layers are [`StorageBackend`] trait objects. Metric inserts are sent
/// to both layers; metric queries whose start timestamp falls inside the
/// recent window try the cache first and fall back to the store; SQL
/// statements are always served by the store.
pub struct CachedStorageBackend {
/// Backend tried first for metric queries that start inside the recent window.
cache: Arc<dyn StorageBackend>,
/// Backend that receives every insert and answers every query the cache
/// does not satisfy, as well as all SQL statements.
store: Arc<dyn StorageBackend>,
/// Width, in seconds, of the window ending at the current wall-clock time;
/// a query is eligible for the cache only if its start timestamp is not
/// older than `now - max_duration_secs`.
max_duration_secs: u64,
}
impl CachedStorageBackend {
pub fn new(
cache: Arc<dyn StorageBackend>,
store: Arc<dyn StorageBackend>,
max_duration_secs: u64,
) -> Self {
Self {
cache,
store,
max_duration_secs,
}
}
}
// `StorageBackend` implementation that puts a recent-window cache in front of
// the backing store for metric reads and writes; SQL goes straight to the store.
#[async_trait::async_trait]
impl StorageBackend for CachedStorageBackend {
    /// Initializes the cache backend and then the store backend.
    ///
    /// # Errors
    /// Returns the first error from either backend. The store is not
    /// initialized if the cache fails.
    async fn init(&self) -> Result<(), Status> {
        self.cache.init().await?;
        self.store.init().await?;
        Ok(())
    }

    /// Inserts `metrics` into the cache and then into the store.
    ///
    /// # Errors
    /// Returns the first error from either backend.
    async fn insert_metrics(&self, metrics: Vec<MetricRecord>) -> Result<(), Status> {
        // NOTE(review): the cache is written before the store, so if the store
        // write fails the cache keeps records the store never accepted.
        // Confirm this ordering is intended.
        self.cache.insert_metrics(metrics.clone()).await?;
        self.store.insert_metrics(metrics).await?;
        Ok(())
    }

    /// Returns the metrics selected by `from_timestamp`, preferring the cache
    /// for recent queries.
    ///
    /// A query is eligible for the cache when `from_timestamp` is not older
    /// than `now - max_duration_secs`, with `now` taken from the system clock
    /// as whole seconds since the Unix epoch (so `from_timestamp` is compared
    /// as seconds). A non-empty cache result is returned directly. An empty
    /// or failed cache lookup falls through to the store, and a non-empty
    /// store result for an eligible query is copied back into the cache.
    ///
    /// # Errors
    /// Returns `Status::internal` if the system clock reads earlier than the
    /// Unix epoch, or the store's error if the store query fails. Cache
    /// failures never fail the query.
    async fn query_metrics(&self, from_timestamp: i64) -> Result<Vec<MetricRecord>, Status> {
        let now_secs = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map_err(|err| {
                Status::internal(format!("system clock is before the Unix epoch: {}", err))
            })?
            .as_secs();

        // Clamp before converting so oversized values saturate instead of
        // wrapping negative, then subtract without risk of overflow.
        let now = now_secs.min(i64::MAX as u64) as i64;
        let window = self.max_duration_secs.min(i64::MAX as u64) as i64;
        let cache_cutoff = now.saturating_sub(window);
        let cacheable = from_timestamp >= cache_cutoff;

        if cacheable {
            // A cache error is treated like a miss: the store is authoritative.
            if let Ok(cached) = self.cache.query_metrics(from_timestamp).await {
                if !cached.is_empty() {
                    return Ok(cached);
                }
            }
        }

        let metrics = self.store.query_metrics(from_timestamp).await?;
        if cacheable && !metrics.is_empty() {
            // Best-effort backfill: the read already succeeded, so a failing
            // cache write must not turn it into an error. The next query
            // simply falls through to the store again.
            let _ = self.cache.insert_metrics(metrics.clone()).await;
        }
        Ok(metrics)
    }

    /// Prepares `query` on the store and returns the store's statement handle.
    /// The cache is not involved.
    async fn prepare_sql(&self, query: &str) -> Result<Vec<u8>, Status> {
        self.store.prepare_sql(query).await
    }

    /// Executes a previously prepared statement on the store.
    /// The cache is not involved.
    async fn query_sql(&self, statement_handle: &[u8]) -> Result<Vec<MetricRecord>, Status> {
        self.store.query_sql(statement_handle).await
    }

    /// Builds a cached backend from a connection string and an option map.
    ///
    /// The store is always an `AdbcBackend` created from `connection_string`,
    /// the full `options` map and `credentials`. The cache is configured
    /// through these option keys:
    ///
    /// * `max_duration_secs` - cache window in seconds; defaults to `3600`
    ///   when absent or not parseable as an unsigned integer.
    /// * `cache_engine` - `"duckdb"` (default) or `"adbc"`.
    /// * `cache_connection` - connection string for the cache; defaults to
    ///   `":memory:"`.
    ///
    /// Every option whose key starts with `cache_` is forwarded to the cache
    /// backend with that prefix removed (this includes `engine` and
    /// `connection`). The cache backend is created without credentials.
    ///
    /// # Errors
    /// Returns `Status::invalid_argument` for an unknown `cache_engine`, or
    /// whatever error the cache or store constructor reports.
    fn new_with_options(
        connection_string: &str,
        options: &HashMap<String, String>,
        credentials: Option<&Credentials>,
    ) -> Result<Self, Status> {
        let max_duration_secs = options
            .get("max_duration_secs")
            .and_then(|raw| raw.parse::<u64>().ok())
            .unwrap_or(3600);

        let cache_engine = options
            .get("cache_engine")
            .map(String::as_str)
            .unwrap_or("duckdb");
        let cache_connection = options
            .get("cache_connection")
            .map(String::as_str)
            .unwrap_or(":memory:");

        // Keep only the `cache_`-prefixed options, with the prefix removed.
        let cache_options: HashMap<String, String> = options
            .iter()
            .filter_map(|(key, value)| {
                key.strip_prefix("cache_")
                    .map(|stripped| (stripped.to_string(), value.clone()))
            })
            .collect();

        let cache: Arc<dyn StorageBackend> = match cache_engine {
            "duckdb" => Arc::new(DuckDbBackend::new_with_options(
                cache_connection,
                &cache_options,
                None,
            )?),
            "adbc" => Arc::new(AdbcBackend::new_with_options(
                cache_connection,
                &cache_options,
                None,
            )?),
            _ => return Err(Status::invalid_argument("Invalid cache engine type")),
        };

        let store = Arc::new(AdbcBackend::new_with_options(
            connection_string,
            options,
            credentials,
        )?);

        Ok(Self::new(cache, store, max_duration_secs))
    }
}