// rs-zero 0.2.4
//
// Rust-first microservice framework inspired by go-zero engineering practices.
// Documentation
use std::{
    collections::HashMap,
    sync::Arc,
    time::{Duration, Instant},
};

use async_trait::async_trait;
use tokio::sync::RwLock;

use crate::cache::{CacheError, CacheKey, CacheResult, CacheStore};

/// One cached value together with its expiry deadline and its links in the
/// LRU ordering.
#[derive(Debug, Clone)]
pub(super) struct Entry {
    /// Raw cached bytes.
    value: Vec<u8>,
    /// Deadline at or after which the entry counts as expired; `None` means
    /// the entry never expires.
    expires_at: Option<Instant>,
    /// Key of the neighbouring entry on the `previous` side of the LRU
    /// ordering, if any. Presumably maintained by `super::lru_list` — which
    /// end is "most recent" is not visible from this file.
    pub(super) previous: Option<String>,
    /// Key of the neighbouring entry on the `next` side of the LRU ordering,
    /// if any. Presumably maintained by `super::lru_list`.
    pub(super) next: Option<String>,
}

/// Mutable contents of the cache: the entries, their recency ordering, and
/// the removal counters reported by snapshots.
#[derive(Debug, Default)]
struct State {
    /// Cached entries indexed by rendered cache key.
    entries: HashMap<String, Entry>,
    /// Recency ordering over the keys of `entries`.
    lru: super::lru_list::LruList,
    /// Number of entries removed because capacity was exceeded.
    evictions: u64,
    /// Number of entries removed because their deadline had passed.
    expired_removals: u64,
}

impl State {
    /// Asks the LRU list to remove `key`, returning the entry it reports as
    /// removed (if any).
    fn remove_entry(&mut self, key: &str) -> Option<Entry> {
        let Self { entries, lru, .. } = self;
        lru.remove_entry(entries, key)
    }

    /// Asks the LRU list to mark `key` as the most recently used entry.
    fn move_to_most_recent(&mut self, key: &str) {
        let Self { entries, lru, .. } = self;
        lru.move_to_most_recent(entries, key);
    }

    /// Asks the LRU list to insert `entry` under `key` as the most recently
    /// used entry.
    fn insert_most_recent(&mut self, key: String, entry: Entry) {
        let Self { entries, lru, .. } = self;
        lru.insert_most_recent(entries, key, entry);
    }
}

/// Point-in-time statistics for an [`LruCacheStore`].
///
/// Produced by [`LruCacheStore::snapshot`]. The two counters only ever grow;
/// they start at zero when the store is created.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct LruCacheSnapshot {
    /// Configured maximum number of entries.
    pub capacity: usize,
    /// Number of currently retained entries, counted after expired entries
    /// have been removed.
    pub entries: usize,
    /// Number of entries removed because capacity was exceeded.
    pub evictions: u64,
    /// Number of expired entries removed during reads or cleanup.
    pub expired_removals: u64,
}

/// Bounded in-process cache store using least-recently-used eviction.
///
/// Cloning is cheap: the state lives behind an [`Arc`], so every clone reads
/// and writes the same entries and counters.
#[derive(Debug, Clone)]
pub struct LruCacheStore {
    /// Maximum number of entries retained; validated as non-zero by `new`.
    capacity: usize,
    /// Shared entries, recency ordering, and counters.
    state: Arc<RwLock<State>>,
    /// Optional registry that cache events are reported to.
    #[cfg(feature = "observability")]
    metrics: Option<crate::observability::MetricsRegistry>,
}

impl LruCacheStore {
    /// Creates a bounded LRU store.
    ///
    /// # Errors
    ///
    /// Returns [`CacheError::Backend`] when `capacity` is zero.
    pub fn new(capacity: usize) -> CacheResult<Self> {
        if capacity == 0 {
            return Err(CacheError::Backend(
                "lru cache capacity must be greater than zero".to_string(),
            ));
        }
        Ok(Self {
            capacity,
            state: Arc::new(RwLock::new(State::default())),
            #[cfg(feature = "observability")]
            metrics: None,
        })
    }

    /// Returns the configured capacity.
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Attaches a metrics registry to this L1 cache store.
    #[cfg(feature = "observability")]
    pub fn with_metrics(mut self, metrics: crate::observability::MetricsRegistry) -> Self {
        self.metrics = Some(metrics);
        self
    }

    /// Removes expired entries and returns the current snapshot.
    ///
    /// Takes the write lock because expired entries are purged first.
    pub async fn snapshot(&self) -> LruCacheSnapshot {
        let mut state = self.state.write().await;
        Self::remove_expired(&mut state);
        LruCacheSnapshot {
            capacity: self.capacity,
            entries: state.entries.len(),
            evictions: state.evictions,
            expired_removals: state.expired_removals,
        }
    }

    /// Removes every entry whose deadline is at or before now and adds the
    /// number of such entries to `expired_removals`.
    fn remove_expired(state: &mut State) {
        let now = Instant::now();
        // Keys are collected first: entries cannot be removed while the map
        // is being iterated.
        let expired = state
            .entries
            .iter()
            .filter(|(_, entry)| entry.expires_at.is_some_and(|deadline| deadline <= now))
            .map(|(key, _)| key.clone())
            .collect::<Vec<_>>();
        for key in &expired {
            state.remove_entry(key);
        }
        state.expired_removals += expired.len() as u64;
    }

    /// Evicts least-recently-used entries until the map fits within
    /// `capacity`, counting and reporting each eviction that actually
    /// removed an entry.
    fn evict_if_needed(&self, state: &mut State) {
        while state.entries.len() > self.capacity {
            let Some(key) = state.lru.least_recent.clone() else {
                return;
            };
            // If nothing was removed, stop instead of counting a phantom
            // eviction; retrying could otherwise spin while the write lock
            // is held, should the list and the map ever disagree.
            if state.remove_entry(&key).is_none() {
                return;
            }
            state.evictions += 1;
            self.record_event("evict", "capacity");
        }
    }

    /// Reports a cache event under the `"lru"` label when the
    /// `observability` feature is enabled; otherwise does nothing.
    fn record_event(&self, operation: &str, result: &str) {
        #[cfg(feature = "observability")]
        crate::observability::cache::record_cache_event(
            self.metrics.as_ref(),
            "lru",
            operation,
            result,
        );

        #[cfg(not(feature = "observability"))]
        {
            // Keeps the parameters "used" when metrics are compiled out.
            let _ = (operation, result);
        }
    }
}

#[async_trait]
impl CacheStore for LruCacheStore {
    /// Looks up `key`, returning a copy of its bytes on a hit.
    ///
    /// An entry whose deadline has passed is removed, counted in
    /// `expired_removals`, and reported as absent. A hit marks the entry as
    /// most recently used, which is why the write lock is taken even for
    /// reads.
    async fn get_raw(&self, key: &CacheKey) -> CacheResult<Option<Vec<u8>>> {
        let rendered = key.render();
        let mut state = self.state.write().await;

        let Some(entry) = state.entries.get(&rendered) else {
            self.record_event("get", "miss");
            return Ok(None);
        };
        if entry
            .expires_at
            .is_some_and(|deadline| deadline <= Instant::now())
        {
            state.remove_entry(&rendered);
            state.expired_removals += 1;
            self.record_event("get", "expired");
            return Ok(None);
        }

        let value = entry.value.clone();
        state.move_to_most_recent(&rendered);
        self.record_event("get", "hit");
        Ok(Some(value))
    }

    /// Stores `value` under `key`, replacing any existing entry and marking
    /// it as most recently used.
    ///
    /// Expired entries are purged before the write, and least-recently-used
    /// entries are evicted afterwards if capacity is exceeded. With
    /// `ttl == None` the entry never expires.
    async fn set_raw(
        &self,
        key: &CacheKey,
        value: Vec<u8>,
        ttl: Option<Duration>,
    ) -> CacheResult<()> {
        let mut state = self.state.write().await;
        Self::remove_expired(&mut state);
        let rendered = key.render();
        // `Instant + Duration` panics on overflow; a TTL too large to
        // represent is treated as "never expires" instead.
        let expires_at = ttl.and_then(|ttl| Instant::now().checked_add(ttl));
        if let Some(entry) = state.entries.get_mut(&rendered) {
            entry.value = value;
            entry.expires_at = expires_at;
            state.move_to_most_recent(&rendered);
        } else {
            state.insert_most_recent(
                rendered,
                Entry {
                    value,
                    expires_at,
                    previous: None,
                    next: None,
                },
            );
        }
        self.evict_if_needed(&mut state);
        self.record_event("set", "success");
        Ok(())
    }

    /// Removes `key` if present; succeeds whether or not it existed.
    async fn delete(&self, key: &CacheKey) -> CacheResult<()> {
        let mut state = self.state.write().await;
        state.remove_entry(&key.render());
        self.record_event("delete", "success");
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::cache::{CacheKey, CacheStore, LruCacheStore};

    /// Writes `bytes` under `key`, panicking with "set" on failure.
    async fn put(store: &LruCacheStore, key: &CacheKey, bytes: &[u8], ttl: Option<Duration>) {
        store.set_raw(key, bytes.to_vec(), ttl).await.expect("set");
    }

    /// Reads `key`, panicking with "get" on failure.
    async fn fetch(store: &LruCacheStore, key: &CacheKey) -> Option<Vec<u8>> {
        store.get_raw(key).await.expect("get")
    }

    #[tokio::test]
    async fn lru_cache_evicts_least_recently_used_entry() {
        let store = LruCacheStore::new(2).expect("store");
        let first = CacheKey::new("app", ["first"]);
        let second = CacheKey::new("app", ["second"]);
        let third = CacheKey::new("app", ["third"]);

        put(&store, &first, b"1", None).await;
        put(&store, &second, b"2", None).await;
        // Reading `first` leaves `second` as the least recently used entry.
        assert_eq!(fetch(&store, &first).await, Some(b"1".to_vec()));
        put(&store, &third, b"3", None).await;

        assert_eq!(fetch(&store, &first).await, Some(b"1".to_vec()));
        assert!(fetch(&store, &second).await.is_none());
        assert_eq!(store.snapshot().await.evictions, 1);
    }

    #[tokio::test]
    async fn lru_cache_removes_expired_entries_and_deletes() {
        let store = LruCacheStore::new(2).expect("store");
        let key = CacheKey::new("app", ["ttl"]);

        // A short TTL followed by a longer sleep: the read must see it expired.
        put(&store, &key, b"value", Some(Duration::from_millis(5))).await;
        tokio::time::sleep(Duration::from_millis(10)).await;
        assert!(fetch(&store, &key).await.is_none());

        // An entry without a TTL disappears only through an explicit delete.
        put(&store, &key, b"value", None).await;
        store.delete(&key).await.expect("delete");
        assert!(fetch(&store, &key).await.is_none());
    }
}