//! rs-zero 0.2.6
//!
//! Rust-first microservice framework inspired by go-zero engineering practices.
//! See the crate documentation for details.
use std::{sync::Arc, time::Duration};

use async_trait::async_trait;
use tokio::sync::Mutex;

use crate::cache::{CacheError, CacheKey, CacheResult, CacheStore};

/// Raw counter storage guarded by the mutex inside [`TwoLevelCacheStats`].
#[derive(Debug, Default)]
struct StatsInner {
    // Reads satisfied by the L1 store.
    l1_hits: u64,
    // Reads that missed the L1 store.
    l1_misses: u64,
    // Reads satisfied by the L2 store after an L1 miss.
    l2_hits: u64,
    // Reads that missed both stores.
    l2_misses: u64,
    // L2 values written back into L1 after an L2 hit.
    backfills: u64,
}

/// Snapshot of two-level cache routing counters.
///
/// Produced by [`TwoLevelCacheStats::snapshot`]; values are a point-in-time
/// copy and do not change after the snapshot is taken.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct TwoLevelCacheSnapshot {
    /// Reads satisfied by L1.
    pub l1_hits: u64,
    /// Reads that missed L1.
    pub l1_misses: u64,
    /// Reads satisfied by L2 after an L1 miss.
    pub l2_hits: u64,
    /// Reads that missed both levels.
    pub l2_misses: u64,
    /// Number of L2 values written back into L1.
    pub backfills: u64,
}

/// Shared counters for [`TwoLevelCacheStore`].
///
/// Cloning is cheap and every clone observes the same underlying counters,
/// because the state lives behind an `Arc<Mutex<_>>`.
#[derive(Debug, Clone, Default)]
pub struct TwoLevelCacheStats {
    // Shared counter state; an async mutex so increments can be awaited
    // from async cache operations.
    inner: Arc<Mutex<StatsInner>>,
}

impl TwoLevelCacheStats {
    async fn record_l1_hit(&self) {
        self.inner.lock().await.l1_hits += 1;
    }

    async fn record_l1_miss(&self) {
        self.inner.lock().await.l1_misses += 1;
    }

    async fn record_l2_hit(&self) {
        self.inner.lock().await.l2_hits += 1;
    }

    async fn record_l2_miss(&self) {
        self.inner.lock().await.l2_misses += 1;
    }

    async fn record_backfill(&self) {
        self.inner.lock().await.backfills += 1;
    }

    /// Returns the current two-level cache counters.
    pub async fn snapshot(&self) -> TwoLevelCacheSnapshot {
        let stats = self.inner.lock().await;
        TwoLevelCacheSnapshot {
            l1_hits: stats.l1_hits,
            l1_misses: stats.l1_misses,
            l2_hits: stats.l2_hits,
            l2_misses: stats.l2_misses,
            backfills: stats.backfills,
        }
    }
}

/// Cache store that composes a fast L1 store with an authoritative L2 store.
#[derive(Debug, Clone)]
pub struct TwoLevelCacheStore<L1, L2> {
    // Fast, first-consulted store.
    l1: L1,
    // Authoritative fallback store, also the first target of writes.
    l2: L2,
    // TTL applied when an L2 hit is written back into L1; `None` delegates
    // to the L1 store's own default behavior.
    l1_backfill_ttl: Option<Duration>,
    // Shared hit/miss/backfill counters; clones of this store share them.
    stats: TwoLevelCacheStats,
    // Optional metrics sink, only present when observability is compiled in.
    #[cfg(feature = "observability")]
    metrics: Option<crate::observability::MetricsRegistry>,
}

impl<L1, L2> TwoLevelCacheStore<L1, L2> {
    /// Creates a two-level cache without a custom L1 backfill TTL.
    pub fn new(l1: L1, l2: L2) -> Self {
        Self {
            l1_backfill_ttl: None,
            stats: TwoLevelCacheStats::default(),
            #[cfg(feature = "observability")]
            metrics: None,
            l1,
            l2,
        }
    }

    /// Sets the TTL used when an L2 hit is backfilled into L1.
    pub fn with_l1_backfill_ttl(self, ttl: Option<Duration>) -> Self {
        Self {
            l1_backfill_ttl: ttl,
            ..self
        }
    }

    /// Attaches a metrics registry to this two-level cache store.
    #[cfg(feature = "observability")]
    pub fn with_metrics(self, metrics: crate::observability::MetricsRegistry) -> Self {
        Self {
            metrics: Some(metrics),
            ..self
        }
    }

    /// Returns shared two-level cache counters.
    pub fn stats(&self) -> TwoLevelCacheStats {
        self.stats.clone()
    }

    /// Returns a reference to the L1 store.
    pub fn l1(&self) -> &L1 {
        &self.l1
    }

    /// Returns a reference to the L2 store.
    pub fn l2(&self) -> &L2 {
        &self.l2
    }

    /// Forwards a cache event to the metrics registry when observability is
    /// compiled in; otherwise discards the labels.
    fn record_event(&self, operation: &str, result: &str) {
        #[cfg(feature = "observability")]
        {
            crate::observability::cache::record_cache_event(
                self.metrics.as_ref(),
                "two_level",
                operation,
                result,
            );
        }

        #[cfg(not(feature = "observability"))]
        {
            let _ = (operation, result);
        }
    }
}

#[async_trait]
impl<L1, L2> CacheStore for TwoLevelCacheStore<L1, L2>
where
    L1: CacheStore,
    L2: CacheStore,
{
    /// Reads through both levels: L1 first, then L2 with an L1 backfill on hit.
    async fn get_raw(&self, key: &CacheKey) -> CacheResult<Option<Vec<u8>>> {
        // Fast path: serve straight from L1.
        if let Some(hit) = self.l1.get_raw(key).await? {
            self.stats.record_l1_hit().await;
            self.record_event("get", "l1_hit");
            return Ok(Some(hit));
        }
        self.stats.record_l1_miss().await;
        self.record_event("get", "l1_miss");

        // Slow path: consult the authoritative L2 store.
        match self.l2.get_raw(key).await? {
            None => {
                self.stats.record_l2_miss().await;
                self.record_event("get", "l2_miss");
                Ok(None)
            }
            Some(value) => {
                self.stats.record_l2_hit().await;
                self.record_event("get", "l2_hit");
                // NOTE(review): a failed backfill aborts the whole read even
                // though the value was already found in L2 — confirm that
                // propagating the L1 write error here is intentional.
                self.l1
                    .set_raw(key, value.clone(), self.l1_backfill_ttl)
                    .await?;
                self.stats.record_backfill().await;
                self.record_event("set", "backfill");
                Ok(Some(value))
            }
        }
    }

    /// Writes the value into L2 (authoritative) first, then mirrors it to L1.
    async fn set_raw(
        &self,
        key: &CacheKey,
        value: Vec<u8>,
        ttl: Option<Duration>,
    ) -> CacheResult<()> {
        let mirror = value.clone();
        self.l2.set_raw(key, value, ttl).await?;
        self.l1.set_raw(key, mirror, ttl).await?;
        self.record_event("set", "success");
        Ok(())
    }

    /// Deletes from both levels, attempting each even when the other fails.
    async fn delete(&self, key: &CacheKey) -> CacheResult<()> {
        let l1_result = self.l1.delete(key).await;
        let l2_result = self.l2.delete(key).await;
        let outcome = match (l1_result, l2_result) {
            (Ok(()), Ok(())) => Ok(()),
            (Err(error), Ok(())) | (Ok(()), Err(error)) => Err(error),
            // Both failed: fold the two errors into one backend error.
            (Err(left), Err(right)) => Err(CacheError::Backend(format!(
                "two-level cache delete failed: l1={left}; l2={right}"
            ))),
        };
        let label = if outcome.is_ok() { "success" } else { "error" };
        self.record_event("delete", label);
        outcome
    }
}

#[cfg(test)]
mod tests {
    use crate::cache::{CacheKey, CacheStore, MemoryCacheStore, TwoLevelCacheStore};

    /// An L2-only value is served through the store and copied into L1,
    /// and the counters reflect the miss/hit/backfill path.
    #[tokio::test]
    async fn two_level_cache_backfills_l1_from_l2() {
        let fast = MemoryCacheStore::new();
        let slow = MemoryCacheStore::new();
        let key = CacheKey::new("app", ["user", "1"]);
        // Seed only the authoritative level.
        slow.set_raw(&key, b"ada".to_vec(), None)
            .await
            .expect("set l2");

        let store = TwoLevelCacheStore::new(fast.clone(), slow);
        let fetched = store.get_raw(&key).await.expect("get");
        assert_eq!(fetched, Some(b"ada".to_vec()));
        // The L2 hit must have been written back into L1.
        assert_eq!(fast.get_raw(&key).await.expect("l1"), Some(b"ada".to_vec()));

        let counters = store.stats().snapshot().await;
        assert_eq!(counters.l1_misses, 1);
        assert_eq!(counters.l2_hits, 1);
        assert_eq!(counters.backfills, 1);
    }

    /// Writes and deletes are applied to both levels.
    #[tokio::test]
    async fn two_level_cache_sets_and_deletes_both_levels() {
        let fast = MemoryCacheStore::new();
        let slow = MemoryCacheStore::new();
        let store = TwoLevelCacheStore::new(fast.clone(), slow.clone());
        let key = CacheKey::new("app", ["user", "2"]);

        store
            .set_raw(&key, b"grace".to_vec(), None)
            .await
            .expect("set");
        assert_eq!(fast.get_raw(&key).await.expect("l1"), Some(b"grace".to_vec()));
        assert_eq!(slow.get_raw(&key).await.expect("l2"), Some(b"grace".to_vec()));

        store.delete(&key).await.expect("delete");
        assert!(fast.get_raw(&key).await.expect("l1").is_none());
        assert!(slow.get_raw(&key).await.expect("l2").is_none());
    }
}