Skip to main content

mount/
cache.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Shared blob cache for `ContentAddressedMount`.
3//!
4//! The cache holds materialised file bytes keyed by `ContentHash` so a
5//! hot kernel `read` syscall is a pointer-bump + slice copy rather than
6//! a full `get_blob` + `decompress` chain. It's the difference between
7//! mount reads beating `std::fs::read` (warm) and being ~10× slower
8//! (cold) — see `benches/mount_read_paths.rs`.
9//!
10//! Why a separate, shared pool instead of owning the cache inline?
11//!
12//! Heddle's content-addressed model means two threads forked from the
13//! same parent share *every* blob hash on the parts of the tree they
14//! haven't diverged on yet. If each mount carries its own LRU, every
15//! freshly-opened mount starts cold even when a sibling mount in the
16//! same process just decompressed the exact same bytes a millisecond
17//! ago. By making the cache an `Arc<BlobCachePool>` the daemon can
18//! attach one pool to itself, hand it to every new mount, and every
19//! cache-hot blob anywhere in the process is hot for the new mount
20//! too. Cap stays the same; hit rate goes up.
21//!
22//! The pool is byte-bounded, not entry-bounded — a 256 MiB cap holds
23//! roughly 25 × 10 MiB blobs or 250 000 × 1 KiB blobs, whichever the
24//! workload happens to be. Eviction is LRU. A single blob larger than
25//! the cap bypasses the cache entirely so one giant file can't
26//! evict the rest of the working set.
27
28use std::sync::{
29    Mutex,
30    atomic::{AtomicU64, Ordering},
31};
32
33use bytes::Bytes;
34use lru::LruCache;
35use objects::{object::ContentHash, sync::LockExt};
36
/// Default byte cap for a [`BlobCachePool`]: 256 MiB.
///
/// Chosen so a typical agent workspace fits in memory without the
/// cache dominating RSS. Daemon deployments are advised to size the
/// pool as `min(4 GiB, 25% of physical RAM)` instead and pass that
/// figure to [`BlobCachePool::with_capacity`].
pub const DEFAULT_BLOB_CACHE_BYTES: usize = 256 << 20;
42
43/// Process-shared blob cache. Construct once per `Repository` (or once
44/// per daemon process) and hand the same `Arc<BlobCachePool>` to every
45/// [`crate::ContentAddressedMount`] that wants to share its warm
46/// state with sibling mounts.
47///
48/// Clone-cheap: it's just `Arc<Inner>` under the hood.
49pub struct BlobCachePool {
50    inner: Mutex<BlobCacheInner>,
51    cap_bytes: usize,
52    hits: AtomicU64,
53    misses: AtomicU64,
54    inserts: AtomicU64,
55}
56
57struct BlobCacheInner {
58    lru: LruCache<ContentHash, Bytes>,
59    /// Running sum of `Bytes.len()` over every live entry. Kept
60    /// in sync with `lru` mutations so eviction can stop the moment
61    /// the working total drops back under `cap_bytes`.
62    bytes: usize,
63}
64
65impl BlobCachePool {
66    /// Construct a pool with [`DEFAULT_BLOB_CACHE_BYTES`] cap. Suitable
67    /// for tests, CLI one-shots, and any caller that doesn't have a
68    /// sizing strategy.
69    pub fn with_default_capacity() -> Self {
70        Self::with_capacity(DEFAULT_BLOB_CACHE_BYTES)
71    }
72
73    /// Construct a pool with an explicit byte cap. Daemon callers
74    /// should size this from physical RAM (see module docs).
75    pub fn with_capacity(cap_bytes: usize) -> Self {
76        Self {
77            inner: Mutex::new(BlobCacheInner {
78                lru: LruCache::unbounded(),
79                bytes: 0,
80            }),
81            cap_bytes,
82            hits: AtomicU64::new(0),
83            misses: AtomicU64::new(0),
84            inserts: AtomicU64::new(0),
85        }
86    }
87
88    /// Look up a blob. Returns `Some(Arc::clone)` on hit, `None` on
89    /// miss. Bumps the entry to MRU.
90    pub(crate) fn get(&self, hash: &ContentHash) -> Option<Bytes> {
91        let mut guard = self.inner.lock_or_poisoned();
92        match guard.lru.get(hash).cloned() {
93            Some(bytes) => {
94                self.hits.fetch_add(1, Ordering::Relaxed);
95                Some(bytes)
96            }
97            None => {
98                self.misses.fetch_add(1, Ordering::Relaxed);
99                None
100            }
101        }
102    }
103
104    /// Insert a freshly-loaded blob. Evicts LRU entries until the
105    /// total falls back below cap. A blob larger than cap bypasses
106    /// the cache entirely so one giant file can't displace the rest
107    /// of the working set.
108    pub(crate) fn insert(&self, hash: ContentHash, bytes: Bytes) {
109        let size = bytes.len();
110        if size > self.cap_bytes {
111            return;
112        }
113        let mut guard = self.inner.lock_or_poisoned();
114        if let Some(prev) = guard.lru.put(hash, bytes) {
115            guard.bytes = guard.bytes.saturating_sub(prev.len());
116        }
117        guard.bytes += size;
118        while guard.bytes > self.cap_bytes {
119            let Some((_evicted_hash, evicted)) = guard.lru.pop_lru() else {
120                break;
121            };
122            guard.bytes = guard.bytes.saturating_sub(evicted.len());
123        }
124        self.inserts.fetch_add(1, Ordering::Relaxed);
125    }
126
127    /// Drop every cached entry. Used by benchmarks that need to
128    /// measure the true cold path.
129    pub fn clear(&self) {
130        let mut guard = self.inner.lock_or_poisoned();
131        guard.lru.clear();
132        guard.bytes = 0;
133    }
134
135    /// Byte cap configured at construction time.
136    pub fn cap_bytes(&self) -> usize {
137        self.cap_bytes
138    }
139
140    /// Current resident bytes across all entries. Cheap (single lock
141    /// acquire) but not free — meant for diagnostics, not hot-path
142    /// budgeting.
143    pub fn resident_bytes(&self) -> usize {
144        self.inner.lock_or_poisoned().bytes
145    }
146
147    /// Number of distinct blobs currently cached. Same caveat as
148    /// `resident_bytes`.
149    pub fn entry_count(&self) -> usize {
150        self.inner.lock_or_poisoned().lru.len()
151    }
152
153    /// Cumulative hit / miss / insert counters since the pool was
154    /// constructed. Useful for sizing decisions: a pool with
155    /// `inserts >> hits` is undersized, a pool with `hits >> inserts`
156    /// is right-sized for its workload.
157    pub fn stats(&self) -> BlobCacheStats {
158        BlobCacheStats {
159            hits: self.hits.load(Ordering::Relaxed),
160            misses: self.misses.load(Ordering::Relaxed),
161            inserts: self.inserts.load(Ordering::Relaxed),
162        }
163    }
164}
165
/// Copy of a [`BlobCachePool`]'s cumulative counters, as returned by
/// [`BlobCachePool::stats`].
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct BlobCacheStats {
    /// Lookups that found their blob in the cache.
    pub hits: u64,
    /// Lookups that did not find their blob.
    pub misses: u64,
    /// Blobs admitted into the cache; oversized blobs that bypass the
    /// cache are not included.
    pub inserts: u64,
}
172
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;

    /// Build a hash whose 32 bytes are all `byte`.
    fn hash_of(byte: u8) -> ContentHash {
        ContentHash::from_bytes([byte; 32])
    }

    /// Build a payload of `len` copies of `byte`.
    fn blob(byte: u8, len: usize) -> Bytes {
        vec![byte; len].into()
    }

    #[test]
    fn round_trip_get_hit_miss() {
        let pool = BlobCachePool::with_capacity(1024);

        // Cold lookup: one miss.
        assert!(pool.get(&hash_of(1)).is_none());

        // Insert, then a warm lookup: one insert, one hit.
        pool.insert(hash_of(1), blob(0xAA, 64));
        let hit = pool.get(&hash_of(1)).expect("should hit");
        assert_eq!(&hit[..], &[0xAA_u8; 64][..]);

        assert_eq!(
            pool.stats(),
            BlobCacheStats {
                hits: 1,
                misses: 1,
                inserts: 1,
            }
        );
    }

    #[test]
    fn byte_bound_evicts_lru_until_under_cap() {
        let pool = BlobCachePool::with_capacity(300);
        // Three 100-byte blobs fill the pool exactly to its cap.
        for tag in 1..=3u8 {
            pool.insert(hash_of(tag), blob(tag, 100));
        }
        assert_eq!(pool.resident_bytes(), 300);

        // A fourth blob pushes the total over cap; the oldest entry,
        // hash_of(1), is the one that has to go.
        pool.insert(hash_of(4), blob(0x04, 100));
        assert!(pool.get(&hash_of(1)).is_none());
        assert!(pool.get(&hash_of(4)).is_some());
        assert_eq!(pool.resident_bytes(), 300);
    }

    #[test]
    fn oversized_blob_bypasses_cache() {
        let pool = BlobCachePool::with_capacity(256);
        // This entry must still be there afterwards.
        pool.insert(hash_of(1), blob(0x01, 200));
        // 500 bytes exceeds the cap, so the blob is skipped and
        // nothing gets evicted to make room for it.
        pool.insert(hash_of(2), blob(0x02, 500));

        assert!(pool.get(&hash_of(1)).is_some());
        assert!(pool.get(&hash_of(2)).is_none());
        assert_eq!(pool.resident_bytes(), 200);
    }

    #[test]
    fn shared_pool_visible_across_clones() {
        let pool = Arc::new(BlobCachePool::with_capacity(1024));
        let writer = Arc::clone(&pool);
        let reader = Arc::clone(&pool);

        writer.insert(hash_of(1), blob(0xAA, 64));
        assert!(reader.get(&hash_of(1)).is_some());
    }
}