use std::{sync::Arc, time::Duration};
use async_trait::async_trait;
use tokio::sync::Mutex;
use crate::cache::{CacheError, CacheKey, CacheResult, CacheStore};
/// Counter state guarded by the mutex inside `TwoLevelCacheStats`.
///
/// Every field starts at zero (via `Default`) and is only ever incremented by
/// the `record_*` helpers on `TwoLevelCacheStats`.
#[derive(Debug, Default)]
struct StatsInner {
/// Lookups for which L1 returned a value.
l1_hits: u64,
/// Lookups for which L1 returned no value.
l1_misses: u64,
/// Lookups that missed L1 and for which L2 returned a value.
l2_hits: u64,
/// Lookups that missed L1 and for which L2 returned no value either.
l2_misses: u64,
/// L1 writes that completed successfully after an L2 hit.
backfills: u64,
}
/// Plain-value copy of the cache counters, as returned by
/// `TwoLevelCacheStats::snapshot`.
///
/// All five counters are read under a single lock acquisition, so the values
/// in one snapshot belong to the same instant.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct TwoLevelCacheSnapshot {
/// Lookups for which L1 returned a value.
pub l1_hits: u64,
/// Lookups for which L1 returned no value.
pub l1_misses: u64,
/// Lookups that missed L1 and for which L2 returned a value.
pub l2_hits: u64,
/// Lookups that missed L1 and for which L2 returned no value either.
pub l2_misses: u64,
/// L1 writes that completed successfully after an L2 hit.
pub backfills: u64,
}
/// Cloneable handle to the counters of a two-level cache.
///
/// The counters live behind an `Arc`, so every clone of this handle reads and
/// updates the same underlying values.
#[derive(Debug, Clone, Default)]
pub struct TwoLevelCacheStats {
// Async mutex: the counters are updated from within async cache operations.
inner: Arc<Mutex<StatsInner>>,
}
impl TwoLevelCacheStats {
async fn record_l1_hit(&self) {
self.inner.lock().await.l1_hits += 1;
}
async fn record_l1_miss(&self) {
self.inner.lock().await.l1_misses += 1;
}
async fn record_l2_hit(&self) {
self.inner.lock().await.l2_hits += 1;
}
async fn record_l2_miss(&self) {
self.inner.lock().await.l2_misses += 1;
}
async fn record_backfill(&self) {
self.inner.lock().await.backfills += 1;
}
pub async fn snapshot(&self) -> TwoLevelCacheSnapshot {
let stats = self.inner.lock().await;
TwoLevelCacheSnapshot {
l1_hits: stats.l1_hits,
l1_misses: stats.l1_misses,
l2_hits: stats.l2_hits,
l2_misses: stats.l2_misses,
backfills: stats.backfills,
}
}
}
/// Cache store that composes two other stores, `l1` and `l2`.
///
/// Cloning the store clones both levels and shares the statistics handle, so
/// clones report into the same counters.
#[derive(Debug, Clone)]
pub struct TwoLevelCacheStore<L1, L2> {
/// First level; consulted first on reads.
l1: L1,
/// Second level; consulted when `l1` has no value for a key.
l2: L2,
/// TTL passed to `l1` when a value found in `l2` is copied into it.
/// `None` is forwarded unchanged; its meaning is defined by the L1 store.
l1_backfill_ttl: Option<Duration>,
/// Shared hit/miss/backfill counters.
stats: TwoLevelCacheStats,
/// Optional metrics registry; the field only exists when the
/// `observability` feature is enabled.
#[cfg(feature = "observability")]
metrics: Option<crate::observability::MetricsRegistry>,
}
impl<L1, L2> TwoLevelCacheStore<L1, L2> {
    /// Builds a store from the two levels, with zeroed statistics, no
    /// backfill TTL and (when the `observability` feature is on) no metrics
    /// registry.
    pub fn new(l1: L1, l2: L2) -> Self {
        let stats = TwoLevelCacheStats::default();
        Self {
            l1,
            l2,
            stats,
            l1_backfill_ttl: None,
            #[cfg(feature = "observability")]
            metrics: None,
        }
    }

    /// Sets the TTL used when an L2 hit is copied into L1 and returns the
    /// updated store.
    pub fn with_l1_backfill_ttl(self, ttl: Option<Duration>) -> Self {
        let mut store = self;
        store.l1_backfill_ttl = ttl;
        store
    }

    /// Attaches a metrics registry and returns the updated store.
    #[cfg(feature = "observability")]
    pub fn with_metrics(self, metrics: crate::observability::MetricsRegistry) -> Self {
        let mut store = self;
        store.metrics = Some(metrics);
        store
    }

    /// Returns a handle to this store's counters; the handle shares state
    /// with the store rather than copying the current values.
    pub fn stats(&self) -> TwoLevelCacheStats {
        Clone::clone(&self.stats)
    }

    /// Borrows the first-level store.
    pub fn l1(&self) -> &L1 {
        let Self { l1, .. } = self;
        l1
    }

    /// Borrows the second-level store.
    pub fn l2(&self) -> &L2 {
        let Self { l2, .. } = self;
        l2
    }

    /// Reports one cache event under the `"two_level"` label.
    ///
    /// With the `observability` feature the event is forwarded to
    /// `crate::observability::cache::record_cache_event`; without it the
    /// arguments are discarded.
    fn record_event(&self, operation: &str, result: &str) {
        // Keeps the parameters "used" when the feature is compiled out.
        #[cfg(not(feature = "observability"))]
        let _ = (operation, result);
        #[cfg(feature = "observability")]
        {
            let registry = self.metrics.as_ref();
            crate::observability::cache::record_cache_event(
                registry,
                "two_level",
                operation,
                result,
            );
        }
    }
}
/// `CacheStore` implementation that places `l1` in front of `l2`.
#[async_trait]
impl<L1, L2> CacheStore for TwoLevelCacheStore<L1, L2>
where
    L1: CacheStore,
    L2: CacheStore,
{
    /// Reads `key` from L1 and falls back to L2 when L1 has no value.
    ///
    /// A value found in L2 is copied into L1 using the configured backfill
    /// TTL. The backfill is best effort: if the L1 write fails, the value
    /// obtained from L2 is still returned, no backfill is counted, and a
    /// `"backfill_error"` event is recorded. The underlying error is not
    /// propagated, because the read itself succeeded.
    ///
    /// # Errors
    ///
    /// Returns the error of the L1 read or of the L2 read, recording an
    /// `"error"` event first. L2 is not consulted when the L1 read fails.
    async fn get_raw(&self, key: &CacheKey) -> CacheResult<Option<Vec<u8>>> {
        let from_l1 = self.observe_failure("get", self.l1.get_raw(key).await)?;
        if let Some(value) = from_l1 {
            self.stats.record_l1_hit().await;
            self.record_event("get", "l1_hit");
            return Ok(Some(value));
        }
        self.stats.record_l1_miss().await;
        self.record_event("get", "l1_miss");

        let from_l2 = self.observe_failure("get", self.l2.get_raw(key).await)?;
        let Some(value) = from_l2 else {
            self.stats.record_l2_miss().await;
            self.record_event("get", "l2_miss");
            return Ok(None);
        };
        self.stats.record_l2_hit().await;
        self.record_event("get", "l2_hit");

        // Reduce the outcome to a bool before the next await so no error
        // value is kept alive across it.
        let backfilled = self
            .l1
            .set_raw(key, value.clone(), self.l1_backfill_ttl)
            .await
            .is_ok();
        if backfilled {
            self.stats.record_backfill().await;
            self.record_event("set", "backfill");
        } else {
            // L2 already answered the read; failing here would throw away a
            // valid value only because the faster level could not store it.
            self.record_event("set", "backfill_error");
        }
        Ok(Some(value))
    }

    /// Writes `value` under `key` to L2 first and then to L1, both with `ttl`.
    ///
    /// # Errors
    ///
    /// Returns the first write error after recording an `"error"` event. If
    /// the L2 write fails, L1 is not touched. If the L1 write fails, the value
    /// already written to L2 is left in place.
    async fn set_raw(
        &self,
        key: &CacheKey,
        value: Vec<u8>,
        ttl: Option<Duration>,
    ) -> CacheResult<()> {
        self.observe_failure("set", self.l2.set_raw(key, value.clone(), ttl).await)?;
        self.observe_failure("set", self.l1.set_raw(key, value, ttl).await)?;
        self.record_event("set", "success");
        Ok(())
    }

    /// Deletes `key` from both levels; L2 is attempted even if L1 fails.
    ///
    /// # Errors
    ///
    /// If exactly one level fails, that level's error is returned unchanged.
    /// If both fail, a `CacheError::Backend` carrying both messages is
    /// returned. An `"error"` event is recorded in either case.
    async fn delete(&self, key: &CacheKey) -> CacheResult<()> {
        let l1 = self.l1.delete(key).await;
        let l2 = self.l2.delete(key).await;
        match (l1, l2) {
            (Ok(()), Ok(())) => {
                self.record_event("delete", "success");
                Ok(())
            }
            (Err(error), Ok(())) | (Ok(()), Err(error)) => {
                self.record_event("delete", "error");
                Err(error)
            }
            (Err(left), Err(right)) => {
                self.record_event("delete", "error");
                Err(CacheError::Backend(format!(
                    "two-level cache delete failed: l1={left}; l2={right}"
                )))
            }
        }
    }
}

impl<L1, L2> TwoLevelCacheStore<L1, L2> {
    /// Records an `"error"` event for `operation` when `outcome` is an error,
    /// then hands `outcome` back unchanged so the caller can apply `?`.
    fn observe_failure<T>(&self, operation: &str, outcome: CacheResult<T>) -> CacheResult<T> {
        if outcome.is_err() {
            self.record_event(operation, "error");
        }
        outcome
    }
}
#[cfg(test)]
mod tests {
    use crate::cache::{CacheKey, CacheStore, MemoryCacheStore, TwoLevelCacheStore};

    /// A value present only in L2 is returned by the composite store, copied
    /// into L1, and reflected in the statistics.
    ///
    /// NOTE(review): relies on `MemoryCacheStore` clones sharing their storage
    /// with the original instance — confirm against its implementation.
    #[tokio::test]
    async fn two_level_cache_backfills_l1_from_l2() {
        let key = CacheKey::new("app", ["user", "1"]);
        let expected = Some(b"ada".to_vec());

        let fast = MemoryCacheStore::new();
        let slow = MemoryCacheStore::new();
        slow.set_raw(&key, b"ada".to_vec(), None)
            .await
            .expect("set l2");

        let store = TwoLevelCacheStore::new(fast.clone(), slow);
        let fetched = store.get_raw(&key).await.expect("get");
        assert_eq!(fetched, expected);

        let backfilled = fast.get_raw(&key).await.expect("l1");
        assert_eq!(backfilled, expected);

        let snapshot = store.stats().snapshot().await;
        assert_eq!(snapshot.l1_misses, 1);
        assert_eq!(snapshot.l2_hits, 1);
        assert_eq!(snapshot.backfills, 1);
    }

    /// `set_raw` and `delete` on the composite store reach both levels.
    #[tokio::test]
    async fn two_level_cache_sets_and_deletes_both_levels() {
        let key = CacheKey::new("app", ["user", "2"]);
        let fast = MemoryCacheStore::new();
        let slow = MemoryCacheStore::new();
        let store = TwoLevelCacheStore::new(fast.clone(), slow.clone());

        store
            .set_raw(&key, b"grace".to_vec(), None)
            .await
            .expect("set");
        for (level, label) in [(&fast, "l1"), (&slow, "l2")] {
            let stored = level.get_raw(&key).await.expect(label);
            assert_eq!(stored, Some(b"grace".to_vec()));
        }

        store.delete(&key).await.expect("delete");
        for (level, label) in [(&fast, "l1"), (&slow, "l2")] {
            let remaining = level.get_raw(&key).await.expect(label);
            assert!(remaining.is_none());
        }
    }
}