Skip to main content

rs_zero/cache/
lru.rs

1use std::{
2    collections::HashMap,
3    sync::Arc,
4    time::{Duration, Instant},
5};
6
7use async_trait::async_trait;
8use tokio::sync::RwLock;
9
10use crate::cache::{CacheError, CacheKey, CacheResult, CacheStore};
11
12#[derive(Debug, Clone)]
13pub(super) struct Entry {
14    value: Vec<u8>,
15    expires_at: Option<Instant>,
16    pub(super) previous: Option<String>,
17    pub(super) next: Option<String>,
18}
19
20#[derive(Debug, Default)]
21struct State {
22    entries: HashMap<String, Entry>,
23    lru: super::lru_list::LruList,
24    evictions: u64,
25    expired_removals: u64,
26}
27
28impl State {
29    fn remove_entry(&mut self, key: &str) -> Option<Entry> {
30        self.lru.remove_entry(&mut self.entries, key)
31    }
32
33    fn move_to_most_recent(&mut self, key: &str) {
34        self.lru.move_to_most_recent(&mut self.entries, key);
35    }
36
37    fn insert_most_recent(&mut self, key: String, entry: Entry) {
38        self.lru.insert_most_recent(&mut self.entries, key, entry);
39    }
40}
41
42/// Point-in-time statistics for an [`LruCacheStore`].
43#[derive(Debug, Clone, Copy, PartialEq, Eq)]
44pub struct LruCacheSnapshot {
45    /// Configured maximum number of entries.
46    pub capacity: usize,
47    /// Number of currently retained entries.
48    pub entries: usize,
49    /// Number of entries removed because capacity was exceeded.
50    pub evictions: u64,
51    /// Number of expired entries removed during reads or cleanup.
52    pub expired_removals: u64,
53}
54
55/// Bounded in-process cache store using least-recently-used eviction.
56#[derive(Debug, Clone)]
57pub struct LruCacheStore {
58    capacity: usize,
59    state: Arc<RwLock<State>>,
60    #[cfg(feature = "observability")]
61    metrics: Option<crate::observability::MetricsRegistry>,
62}
63
64impl LruCacheStore {
65    /// Creates a bounded LRU store.
66    pub fn new(capacity: usize) -> CacheResult<Self> {
67        if capacity == 0 {
68            return Err(CacheError::Backend(
69                "lru cache capacity must be greater than zero".to_string(),
70            ));
71        }
72        Ok(Self {
73            capacity,
74            state: Arc::new(RwLock::new(State::default())),
75            #[cfg(feature = "observability")]
76            metrics: None,
77        })
78    }
79
80    /// Returns the configured capacity.
81    pub fn capacity(&self) -> usize {
82        self.capacity
83    }
84
85    /// Attaches a metrics registry to this L1 cache store.
86    #[cfg(feature = "observability")]
87    pub fn with_metrics(mut self, metrics: crate::observability::MetricsRegistry) -> Self {
88        self.metrics = Some(metrics);
89        self
90    }
91
92    /// Removes expired entries and returns the current snapshot.
93    pub async fn snapshot(&self) -> LruCacheSnapshot {
94        let mut state = self.state.write().await;
95        Self::remove_expired(&mut state);
96        LruCacheSnapshot {
97            capacity: self.capacity,
98            entries: state.entries.len(),
99            evictions: state.evictions,
100            expired_removals: state.expired_removals,
101        }
102    }
103
104    fn remove_expired(state: &mut State) {
105        let now = Instant::now();
106        let expired = state
107            .entries
108            .iter()
109            .filter(|(_, entry)| entry.expires_at.is_some_and(|deadline| deadline <= now))
110            .map(|(key, _)| key.clone())
111            .collect::<Vec<_>>();
112        for key in &expired {
113            state.remove_entry(key);
114        }
115        state.expired_removals += expired.len() as u64;
116    }
117
118    fn evict_if_needed(&self, state: &mut State) {
119        while state.entries.len() > self.capacity {
120            let Some(key) = state.lru.least_recent.clone() else {
121                return;
122            };
123            state.remove_entry(&key);
124            state.evictions += 1;
125            self.record_event("evict", "capacity");
126        }
127    }
128
129    fn record_event(&self, operation: &str, result: &str) {
130        #[cfg(feature = "observability")]
131        crate::observability::cache::record_cache_event(
132            self.metrics.as_ref(),
133            "lru",
134            operation,
135            result,
136        );
137
138        #[cfg(not(feature = "observability"))]
139        {
140            let _ = (operation, result);
141        }
142    }
143}
144
145#[async_trait]
146impl CacheStore for LruCacheStore {
147    async fn get_raw(&self, key: &CacheKey) -> CacheResult<Option<Vec<u8>>> {
148        let rendered = key.render();
149        let mut state = self.state.write().await;
150        let expired = state
151            .entries
152            .get(&rendered)
153            .and_then(|entry| entry.expires_at)
154            .is_some_and(|deadline| deadline <= Instant::now());
155        if expired {
156            state.remove_entry(&rendered);
157            state.expired_removals += 1;
158            self.record_event("get", "expired");
159            return Ok(None);
160        }
161
162        let value = state
163            .entries
164            .get(&rendered)
165            .map(|entry| entry.value.clone());
166        if value.is_some() {
167            state.move_to_most_recent(&rendered);
168        }
169        self.record_event("get", if value.is_some() { "hit" } else { "miss" });
170        Ok(value)
171    }
172
173    async fn set_raw(
174        &self,
175        key: &CacheKey,
176        value: Vec<u8>,
177        ttl: Option<Duration>,
178    ) -> CacheResult<()> {
179        let mut state = self.state.write().await;
180        Self::remove_expired(&mut state);
181        let rendered = key.render();
182        let expires_at = ttl.map(|ttl| Instant::now() + ttl);
183        if let Some(entry) = state.entries.get_mut(&rendered) {
184            entry.value = value;
185            entry.expires_at = expires_at;
186            state.move_to_most_recent(&rendered);
187        } else {
188            state.insert_most_recent(
189                rendered,
190                Entry {
191                    value,
192                    expires_at,
193                    previous: None,
194                    next: None,
195                },
196            );
197        }
198        self.evict_if_needed(&mut state);
199        self.record_event("set", "success");
200        Ok(())
201    }
202
203    async fn delete(&self, key: &CacheKey) -> CacheResult<()> {
204        let mut state = self.state.write().await;
205        state.remove_entry(&key.render());
206        self.record_event("delete", "success");
207        Ok(())
208    }
209}
210
211#[cfg(test)]
212mod tests {
213    use std::time::Duration;
214
215    use crate::cache::{CacheKey, CacheStore, LruCacheStore};
216
217    #[tokio::test]
218    async fn lru_cache_evicts_least_recently_used_entry() {
219        let store = LruCacheStore::new(2).expect("store");
220        let first = CacheKey::new("app", ["first"]);
221        let second = CacheKey::new("app", ["second"]);
222        let third = CacheKey::new("app", ["third"]);
223
224        store
225            .set_raw(&first, b"1".to_vec(), None)
226            .await
227            .expect("set");
228        store
229            .set_raw(&second, b"2".to_vec(), None)
230            .await
231            .expect("set");
232        assert_eq!(
233            store.get_raw(&first).await.expect("get"),
234            Some(b"1".to_vec())
235        );
236        store
237            .set_raw(&third, b"3".to_vec(), None)
238            .await
239            .expect("set");
240
241        assert_eq!(
242            store.get_raw(&first).await.expect("get"),
243            Some(b"1".to_vec())
244        );
245        assert!(store.get_raw(&second).await.expect("get").is_none());
246        assert_eq!(store.snapshot().await.evictions, 1);
247    }
248
249    #[tokio::test]
250    async fn lru_cache_removes_expired_entries_and_deletes() {
251        let store = LruCacheStore::new(2).expect("store");
252        let key = CacheKey::new("app", ["ttl"]);
253        store
254            .set_raw(&key, b"value".to_vec(), Some(Duration::from_millis(5)))
255            .await
256            .expect("set");
257        tokio::time::sleep(Duration::from_millis(10)).await;
258        assert!(store.get_raw(&key).await.expect("get").is_none());
259
260        store
261            .set_raw(&key, b"value".to_vec(), None)
262            .await
263            .expect("set");
264        store.delete(&key).await.expect("delete");
265        assert!(store.get_raw(&key).await.expect("get").is_none());
266    }
267}