Skip to main content

fakecloud_persistence/
cache.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use bytes::Bytes;
5use parking_lot::Mutex;
6
7#[derive(Clone, Debug, Hash, PartialEq, Eq)]
8pub struct BodyKey {
9    pub bucket: String,
10    pub key: String,
11    pub version: Option<String>,
12}
13
14impl BodyKey {
15    pub fn new(bucket: impl Into<String>, key: impl Into<String>, version: Option<String>) -> Self {
16        Self {
17            bucket: bucket.into(),
18            key: key.into(),
19            version,
20        }
21    }
22}
23
24/// Per-entry accounting overhead charged to each cache entry in addition to
25/// the raw body size. Without this, zero-length bodies accrue zero bytes
26/// against the capacity and a pathological flood of empty objects would grow
27/// the index unboundedly.
28const PER_ENTRY_OVERHEAD: u64 = 128;
29
30#[derive(Debug)]
31struct Node {
32    key: BodyKey,
33    bytes: Bytes,
34    charged: u64,
35    prev: Option<usize>,
36    next: Option<usize>,
37}
38
39struct Inner {
40    capacity_bytes: u64,
41    single_object_cap: u64,
42    used_bytes: u64,
43    nodes: Vec<Option<Node>>,
44    free: Vec<usize>,
45    index: HashMap<BodyKey, usize>,
46    head: Option<usize>, // MRU
47    tail: Option<usize>, // LRU
48}
49
50impl Inner {
51    fn new(capacity_bytes: u64) -> Self {
52        Self {
53            capacity_bytes,
54            single_object_cap: capacity_bytes / 2,
55            used_bytes: 0,
56            nodes: Vec::new(),
57            free: Vec::new(),
58            index: HashMap::new(),
59            head: None,
60            tail: None,
61        }
62    }
63
64    fn alloc_node(&mut self, node: Node) -> usize {
65        if let Some(idx) = self.free.pop() {
66            self.nodes[idx] = Some(node);
67            idx
68        } else {
69            self.nodes.push(Some(node));
70            self.nodes.len() - 1
71        }
72    }
73
74    fn detach(&mut self, idx: usize) {
75        let (prev, next) = {
76            let n = self.nodes[idx].as_ref().unwrap();
77            (n.prev, n.next)
78        };
79        match prev {
80            Some(p) => self.nodes[p].as_mut().unwrap().next = next,
81            None => self.head = next,
82        }
83        match next {
84            Some(n) => self.nodes[n].as_mut().unwrap().prev = prev,
85            None => self.tail = prev,
86        }
87        let n = self.nodes[idx].as_mut().unwrap();
88        n.prev = None;
89        n.next = None;
90    }
91
92    fn push_front(&mut self, idx: usize) {
93        let old_head = self.head;
94        {
95            let n = self.nodes[idx].as_mut().unwrap();
96            n.prev = None;
97            n.next = old_head;
98        }
99        if let Some(h) = old_head {
100            self.nodes[h].as_mut().unwrap().prev = Some(idx);
101        }
102        self.head = Some(idx);
103        if self.tail.is_none() {
104            self.tail = Some(idx);
105        }
106    }
107
108    fn remove(&mut self, idx: usize) -> Node {
109        self.detach(idx);
110        let node = self.nodes[idx].take().unwrap();
111        self.free.push(idx);
112        self.used_bytes = self.used_bytes.saturating_sub(node.charged);
113        self.index.remove(&node.key);
114        node
115    }
116
117    fn evict_until_fits(&mut self, incoming: u64) {
118        while self.used_bytes + incoming > self.capacity_bytes {
119            let Some(tail) = self.tail else {
120                break;
121            };
122            self.remove(tail);
123        }
124    }
125
126    fn get(&mut self, key: &BodyKey) -> Option<Bytes> {
127        let idx = *self.index.get(key)?;
128        self.detach(idx);
129        self.push_front(idx);
130        Some(self.nodes[idx].as_ref().unwrap().bytes.clone())
131    }
132
133    fn insert(&mut self, key: BodyKey, bytes: Bytes) {
134        // Always drop any existing entry for this key, even if the new body
135        // is oversized and will bypass the cache — otherwise a stale older
136        // body could be served after an overwrite.
137        if let Some(&idx) = self.index.get(&key) {
138            self.remove(idx);
139        }
140        let size = bytes.len() as u64;
141        let charged = size.saturating_add(PER_ENTRY_OVERHEAD);
142        if size > self.single_object_cap || charged > self.capacity_bytes {
143            return;
144        }
145        self.evict_until_fits(charged);
146        self.used_bytes += charged;
147        let node = Node {
148            key: key.clone(),
149            bytes,
150            charged,
151            prev: None,
152            next: None,
153        };
154        let idx = self.alloc_node(node);
155        self.index.insert(key, idx);
156        self.push_front(idx);
157    }
158
159    fn invalidate(&mut self, key: &BodyKey) {
160        if let Some(&idx) = self.index.get(key) {
161            self.remove(idx);
162        }
163    }
164}
165
166#[derive(Clone)]
167pub struct BodyCache {
168    inner: Arc<Mutex<Inner>>,
169    capacity_bytes: u64,
170}
171
172impl BodyCache {
173    pub fn new(capacity_bytes: u64) -> Self {
174        Self {
175            inner: Arc::new(Mutex::new(Inner::new(capacity_bytes))),
176            capacity_bytes,
177        }
178    }
179
180    pub fn capacity_bytes(&self) -> u64 {
181        self.capacity_bytes
182    }
183
184    pub fn single_object_cap(&self) -> u64 {
185        self.capacity_bytes / 2
186    }
187
188    pub fn get(&self, key: &BodyKey) -> Option<Bytes> {
189        self.inner.lock().get(key)
190    }
191
192    pub fn insert(&self, key: BodyKey, bytes: Bytes) {
193        self.inner.lock().insert(key, bytes);
194    }
195
196    pub fn invalidate(&self, key: &BodyKey) {
197        self.inner.lock().invalidate(key);
198    }
199
200    #[cfg(test)]
201    fn used_bytes(&self) -> u64 {
202        self.inner.lock().used_bytes
203    }
204
205    #[cfg(test)]
206    fn len(&self) -> usize {
207        self.inner.lock().index.len()
208    }
209}
210
211#[cfg(test)]
212mod tests {
213    use super::*;
214
215    fn k(name: &str) -> BodyKey {
216        BodyKey::new("b", name, None)
217    }
218
219    fn mk(n: usize) -> Bytes {
220        Bytes::from(vec![0u8; n])
221    }
222
223    #[test]
224    fn insert_and_get() {
225        let c = BodyCache::new(1024);
226        c.insert(k("a"), mk(100));
227        assert_eq!(c.get(&k("a")).unwrap().len(), 100);
228        assert_eq!(c.used_bytes(), 100 + PER_ENTRY_OVERHEAD);
229    }
230
231    #[test]
232    fn byte_accounting_on_overwrite() {
233        let c = BodyCache::new(1024);
234        c.insert(k("a"), mk(100));
235        c.insert(k("a"), mk(50));
236        assert_eq!(c.used_bytes(), 50 + PER_ENTRY_OVERHEAD);
237        assert_eq!(c.len(), 1);
238    }
239
240    #[test]
241    fn lru_eviction_on_capacity_pressure() {
242        // Capacity 3 * (100 + overhead) so exactly three entries fit.
243        let c = BodyCache::new(3 * (100 + PER_ENTRY_OVERHEAD));
244        c.insert(k("a"), mk(100));
245        c.insert(k("b"), mk(100));
246        c.insert(k("c"), mk(100));
247        // Access a to make it MRU; b is now LRU.
248        let _ = c.get(&k("a"));
249        c.insert(k("d"), mk(100));
250        assert!(c.get(&k("b")).is_none());
251        assert!(c.get(&k("a")).is_some());
252        assert!(c.get(&k("c")).is_some());
253        assert!(c.get(&k("d")).is_some());
254    }
255
256    #[test]
257    fn empty_bodies_still_evict_under_entry_overhead() {
258        // 1 MiB capacity: with 128 bytes of overhead per entry, the cache can
259        // hold at most ~4096 zero-length entries before eviction kicks in.
260        // Insert 10_000 distinct empty keys and assert the index is bounded.
261        let c = BodyCache::new(1024 * 1024);
262        for i in 0..10_000 {
263            c.insert(k(&format!("empty-{i}")), Bytes::new());
264        }
265        let max_entries = (1024 * 1024 / PER_ENTRY_OVERHEAD) as usize;
266        assert!(
267            c.len() <= max_entries,
268            "cache must evict empty bodies: len={} max={}",
269            c.len(),
270            max_entries
271        );
272        assert!(c.used_bytes() <= 1024 * 1024);
273    }
274
275    #[test]
276    fn single_object_cap_bypass_leaves_cache_untouched() {
277        // 64 MiB capacity, 32 MiB single-object cap
278        let cap = 64 * 1024 * 1024;
279        let c = BodyCache::new(cap);
280        c.insert(k("a"), mk(1024));
281        let before_used = c.used_bytes();
282        let before_len = c.len();
283        c.insert(k("big"), mk(40 * 1024 * 1024));
284        assert_eq!(c.used_bytes(), before_used);
285        assert_eq!(c.len(), before_len);
286        assert!(c.get(&k("big")).is_none());
287        assert!(c.get(&k("a")).is_some());
288    }
289
290    #[test]
291    fn get_promotes_to_mru() {
292        let c = BodyCache::new(3 * (100 + PER_ENTRY_OVERHEAD));
293        c.insert(k("a"), mk(100));
294        c.insert(k("b"), mk(100));
295        c.insert(k("c"), mk(100));
296        let _ = c.get(&k("a"));
297        c.insert(k("d"), mk(100));
298        // b should be evicted, a should remain.
299        assert!(c.get(&k("a")).is_some());
300        assert!(c.get(&k("b")).is_none());
301    }
302
303    #[test]
304    fn oversized_overwrite_invalidates_previous_entry() {
305        // 64 MiB capacity, 32 MiB single-object cap
306        let cap = 64 * 1024 * 1024;
307        let c = BodyCache::new(cap);
308        c.insert(k("a"), mk(1024));
309        assert!(c.get(&k("a")).is_some());
310        // Overwrite with an oversized body: the new body bypasses, and the
311        // previous entry must be invalidated so stale bytes are not served.
312        c.insert(k("a"), mk(40 * 1024 * 1024));
313        assert!(c.get(&k("a")).is_none());
314        assert_eq!(c.used_bytes(), 0);
315        assert_eq!(c.len(), 0);
316    }
317
318    #[test]
319    fn charged_size_cannot_exceed_total_capacity() {
320        // capacity=200, single_object_cap=100. A 100-byte body charged at
321        // 100+128=228 > 200, so it must be rejected even though it fits the
322        // single-object cap.
323        let c = BodyCache::new(200);
324        c.insert(k("a"), mk(100));
325        assert!(c.get(&k("a")).is_none());
326        assert_eq!(c.used_bytes(), 0);
327        assert_eq!(c.len(), 0);
328
329        // capacity=600, single_object_cap=300. A 300-byte body charged at
330        // 300+128=428 <= 600, so it succeeds.
331        let c = BodyCache::new(600);
332        c.insert(k("b"), mk(300));
333        assert!(c.get(&k("b")).is_some());
334        assert_eq!(c.used_bytes(), 300 + PER_ENTRY_OVERHEAD);
335
336        // capacity=600, single_object_cap=300. A 500-byte body exceeds the
337        // single-object cap and is rejected.
338        let c = BodyCache::new(600);
339        c.insert(k("c"), mk(500));
340        assert!(c.get(&k("c")).is_none());
341    }
342
343    #[test]
344    fn invalidate_removes_entry() {
345        let c = BodyCache::new(1024);
346        c.insert(k("a"), mk(100));
347        c.invalidate(&k("a"));
348        assert!(c.get(&k("a")).is_none());
349        assert_eq!(c.used_bytes(), 0);
350    }
351}