Skip to main content

vyre_runtime/pipeline_cache/
in_memory.rs

1//! [`InMemoryPipelineCache`]  -  sharded zero-persistence cache. The hot
2//! path for in-process pipeline reuse.
3
4use std::sync::{Arc, Mutex, MutexGuard};
5
6use rustc_hash::FxHashMap;
7use vyre_driver::accounting::{
8    checked_add_u64_lazy, checked_add_usize_lazy, checked_sub_usize_lazy,
9    checked_usize_to_u64_lazy,
10};
11
12use super::fingerprint::PipelineFingerprint;
13use super::metrics::{PipelineCacheCounters, PipelineCacheMetrics};
14use super::store::PipelineCacheStore;
15
/// In-memory pipeline cache  -  zero-persistence, zero-network, sharded
/// `FxHashMap`s behind mutexes so concurrent `get`/`put` on different
/// fingerprints rarely contend (VYRE_RUNTIME / PERF hot-cache audit).
///
/// Each shard enforces its own entry and byte budgets; when a `put` pushes a
/// shard over either budget, the entries with the smallest `last_used` tick
/// in that shard are evicted first.
#[derive(Debug)]
pub struct InMemoryPipelineCache {
    // One independently locked map per shard; a fingerprint's shard is chosen
    // from its first byte (see `shard_index`).
    shards: [Mutex<InMemoryCacheShard>; Self::SHARD_COUNT],
    // Maximum number of entries a single shard may retain; zero disables caching.
    max_entries_per_shard: usize,
    // Maximum number of artifact bytes a single shard may retain; zero disables caching.
    max_bytes_per_shard: usize,
    // Counters for lookups, hits, misses, puts, rejected puts and evictions.
    metrics: PipelineCacheCounters,
}
26
27impl InMemoryPipelineCache {
28    pub(super) const SHARD_COUNT: usize = 256;
29    pub(super) const MAX_ENTRIES_PER_SHARD: usize = 256;
30    pub(super) const MAX_BYTES_PER_SHARD: usize = 16 * 1024 * 1024;
31
32    #[inline]
33    fn shard_index(fp: &PipelineFingerprint) -> usize {
34        usize::from(fp.0[0]) % Self::SHARD_COUNT
35    }
36
37    fn lock_shard(shard: &Mutex<InMemoryCacheShard>) -> MutexGuard<'_, InMemoryCacheShard> {
38        shard.lock().unwrap_or_else(|error| {
39            panic!(
40                "Vyre in-memory pipeline cache shard lock was poisoned: {error}. Fix: discard this cache instance after a panic; continuing could publish corrupted pipeline artifacts."
41            )
42        })
43    }
44
45    /// Construct an empty cache.
46    #[must_use]
47    pub fn new() -> Self {
48        Self::default()
49    }
50
51    /// Construct an empty cache with explicit per-shard entry and byte budgets.
52    ///
53    /// A zero entry budget or zero byte budget creates a disabled cache that
54    /// accepts `put` calls but retains no artifacts.
55    #[must_use]
56    pub fn with_limits(max_entries_per_shard: usize, max_bytes_per_shard: usize) -> Self {
57        Self {
58            shards: std::array::from_fn(|_| Mutex::new(InMemoryCacheShard::default())),
59            max_entries_per_shard,
60            max_bytes_per_shard,
61            metrics: PipelineCacheCounters::default(),
62        }
63    }
64
65    /// Current entry count. Thread-safe snapshot.
66    pub fn len(&self) -> usize {
67        self.shards
68            .iter()
69            .map(|s| Self::lock_shard(s).entries.len())
70            .fold(0usize, |acc, value| {
71                cache_usize_add(
72                    acc,
73                    value,
74                    "entry count",
75                    "shard cache metrics before snapshotting",
76                )
77            })
78    }
79
80    /// Current cached artifact bytes. Thread-safe snapshot.
81    pub fn cached_bytes(&self) -> usize {
82        self.shards
83            .iter()
84            .map(|s| Self::lock_shard(s).bytes)
85            .fold(0usize, |acc, value| {
86                cache_usize_add(
87                    acc,
88                    value,
89                    "byte count",
90                    "shard cache metrics before snapshotting",
91                )
92            })
93    }
94
95    /// Whether the cache is empty.
96    pub fn is_empty(&self) -> bool {
97        self.shards
98            .iter()
99            .all(|s| Self::lock_shard(s).entries.is_empty())
100    }
101}
102
103impl Default for InMemoryPipelineCache {
104    fn default() -> Self {
105        Self::with_limits(Self::MAX_ENTRIES_PER_SHARD, Self::MAX_BYTES_PER_SHARD)
106    }
107}
108
109fn cache_usize_add(lhs: usize, rhs: usize, label: &'static str, fix: &'static str) -> usize {
110    checked_add_usize_lazy(lhs, rhs, || {
111        format!("Vyre in-memory pipeline cache {label} overflowed usize. Fix: {fix}.")
112    })
113    .unwrap_or_else(|message| panic!("{message}"))
114}
115
116fn cache_usize_sub(lhs: usize, rhs: usize, label: &'static str, fix: &'static str) -> usize {
117    checked_sub_usize_lazy(lhs, rhs, || {
118        format!("Vyre in-memory pipeline cache {label} underflowed usize. Fix: {fix}.")
119    })
120    .unwrap_or_else(|message| panic!("{message}"))
121}
122
123fn cache_u64_add(lhs: u64, rhs: u64, label: &'static str, fix: &'static str) -> u64 {
124    checked_add_u64_lazy(lhs, rhs, || {
125        format!("Vyre in-memory pipeline cache {label} overflowed u64. Fix: {fix}.")
126    })
127    .unwrap_or_else(|message| panic!("{message}"))
128}
129
130fn cache_usize_to_u64(value: usize, label: &'static str, fix: &'static str) -> u64 {
131    checked_usize_to_u64_lazy(value, || {
132        format!("Vyre in-memory pipeline cache {label} cannot fit u64. Fix: {fix}.")
133    })
134    .unwrap_or_else(|message| panic!("{message}"))
135}
136
/// State of a single cache shard; always accessed through the shard's mutex.
#[derive(Debug, Default)]
struct InMemoryCacheShard {
    // Cached artifacts keyed by pipeline fingerprint.
    entries: FxHashMap<PipelineFingerprint, InMemoryCacheEntry>,
    // Running sum of the `bytes` field of every entry currently in `entries`.
    bytes: usize,
    // Shard-local logical clock; advanced on every lookup and every accepted put,
    // and stamped onto entries as their `last_used` value.
    clock: u64,
}
143
144impl InMemoryCacheShard {
145    fn next_tick(&mut self) -> u64 {
146        self.clock = cache_u64_add(
147            self.clock,
148            1,
149            "shard clock",
150            "recreate the cache before LRU timestamps wrap",
151        );
152        self.clock
153    }
154
155    fn evict_to_limits(&mut self, max_entries: usize, max_bytes: usize) -> (u64, u64) {
156        let mut evictions = 0_u64;
157        let mut evicted_bytes = 0_u64;
158        while self.entries.len() > max_entries || self.bytes > max_bytes {
159            let Some(victim) = self
160                .entries
161                .iter()
162                .min_by_key(|(_, entry)| entry.last_used)
163                .map(|(fp, _)| *fp)
164            else {
165                self.bytes = 0;
166                return (evictions, evicted_bytes);
167            };
168            if let Some(removed) = self.entries.remove(&victim) {
169                self.bytes = cache_usize_sub(
170                    self.bytes,
171                    removed.bytes,
172                    "byte accounting during eviction",
173                    "rebuild the cache",
174                );
175                evictions = cache_u64_add(
176                    evictions,
177                    1,
178                    "eviction count",
179                    "shard cache eviction work",
180                );
181                evicted_bytes = cache_u64_add(
182                    evicted_bytes,
183                    cache_usize_to_u64(
184                        removed.bytes,
185                        "evicted byte count",
186                        "shard cache artifacts before eviction",
187                    ),
188                    "evicted byte count",
189                    "shard cache eviction work",
190                );
191            }
192        }
193        (evictions, evicted_bytes)
194    }
195}
196
/// One cached artifact together with its bookkeeping data.
#[derive(Debug)]
struct InMemoryCacheEntry {
    // Shared payload; `get_arc` hands out clones of this `Arc`.
    artifact: Arc<Vec<u8>>,
    // Artifact length in bytes, recorded at insertion for shard byte accounting.
    bytes: usize,
    // Shard clock tick of the most recent put or hit; smallest value is evicted first.
    last_used: u64,
}
203
204impl PipelineCacheStore for InMemoryPipelineCache {
205    fn get(&self, fp: &PipelineFingerprint) -> Option<Vec<u8>> {
206        self.get_arc(fp).map(|artifact| (*artifact).clone())
207    }
208
209    /// V7-PERF-009: zero-clone hot-path lookup. The cache already stores
210    /// payloads behind `Arc<Vec<u8>>`, so a hit is one refcount bump.
211    fn get_arc(&self, fp: &PipelineFingerprint) -> Option<Arc<Vec<u8>>> {
212        PipelineCacheCounters::increment(&self.metrics.lookups, "lookups");
213        let i = Self::shard_index(fp);
214        let mut shard = Self::lock_shard(&self.shards[i]);
215        let tick = shard.next_tick();
216        let Some(entry) = shard.entries.get_mut(fp) else {
217            PipelineCacheCounters::increment(&self.metrics.misses, "misses");
218            return None;
219        };
220        entry.last_used = tick;
221        PipelineCacheCounters::increment(&self.metrics.hits, "hits");
222        Some(Arc::clone(&entry.artifact))
223    }
224
225    fn put(&self, fp: PipelineFingerprint, artifact: Vec<u8>) {
226        let i = Self::shard_index(&fp);
227        let mut shard = Self::lock_shard(&self.shards[i]);
228        let bytes = artifact.len();
229        if self.max_entries_per_shard == 0
230            || self.max_bytes_per_shard == 0
231            || bytes > self.max_bytes_per_shard
232        {
233            PipelineCacheCounters::increment(&self.metrics.rejected_puts, "rejected puts");
234            if let Some(removed) = shard.entries.remove(&fp) {
235                shard.bytes = cache_usize_sub(
236                    shard.bytes,
237                    removed.bytes,
238                    "byte accounting while rejecting put",
239                    "rebuild the cache",
240                );
241                PipelineCacheCounters::increment(&self.metrics.evictions, "evictions");
242                PipelineCacheCounters::add(
243                    &self.metrics.evicted_bytes,
244                    cache_usize_to_u64(
245                        removed.bytes,
246                        "evicted byte count",
247                        "shard cache artifacts before eviction",
248                    ),
249                    "evicted bytes",
250                );
251            }
252            return;
253        }
254
255        if let Some(existing) = shard.entries.remove(&fp) {
256            shard.bytes = cache_usize_sub(
257                shard.bytes,
258                existing.bytes,
259                "byte accounting while replacing entry",
260                "rebuild the cache",
261            );
262        }
263        let tick = shard.next_tick();
264        shard.bytes = cache_usize_add(
265            shard.bytes,
266            bytes,
267            "byte accounting while inserting entry",
268            "lower per-shard cache byte budget",
269        );
270        shard.entries.insert(
271            fp,
272            InMemoryCacheEntry {
273                artifact: Arc::new(artifact),
274                bytes,
275                last_used: tick,
276            },
277        );
278        PipelineCacheCounters::increment(&self.metrics.puts, "puts");
279        let (evictions, evicted_bytes) =
280            shard.evict_to_limits(self.max_entries_per_shard, self.max_bytes_per_shard);
281        PipelineCacheCounters::add(&self.metrics.evictions, evictions, "evictions");
282        PipelineCacheCounters::add(&self.metrics.evicted_bytes, evicted_bytes, "evicted bytes");
283    }
284
285    fn metrics(&self) -> PipelineCacheMetrics {
286        self.metrics
287            .snapshot(
288                u64::try_from(self.cached_bytes()).unwrap_or_else(|error| {
289                    panic!(
290                        "Vyre in-memory pipeline cache retained bytes cannot fit u64: {error}. Fix: shard cache metrics before snapshotting."
291                    )
292                }),
293                u64::try_from(self.len()).unwrap_or_else(|error| {
294                    panic!(
295                        "Vyre in-memory pipeline cache entry count cannot fit u64: {error}. Fix: shard cache metrics before snapshotting."
296                    )
297                }),
298            )
299    }
300}
301
// Unit tests for the in-memory cache. Hand-built fingerprints keep byte 0 at
// zero so every key lands in shard 0 and per-shard limits can be exercised.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pipeline_cache::test_helpers::tiny_program;

    // A miss before the put, a hit with identical bytes after it, and one entry retained.
    #[test]
    fn in_memory_cache_roundtrip() {
        let cache = InMemoryPipelineCache::new();
        let fp = PipelineFingerprint::of(&tiny_program());
        assert!(cache.get(&fp).is_none());
        cache.put(fp, b"target-bytes".to_vec());
        assert_eq!(cache.get(&fp).unwrap(), b"target-bytes".to_vec());
        assert_eq!(cache.len(), 1);
    }

    // Overfilling a single shard must leave exactly the per-shard entry budget behind.
    #[test]
    fn in_memory_cache_caps_each_shard() {
        let cache = InMemoryPipelineCache::new();
        for i in 0..(InMemoryPipelineCache::MAX_ENTRIES_PER_SHARD + 17) {
            let mut bytes = [0_u8; 32];
            // Byte 0 stays zero (same shard); bytes 1..9 make each key unique.
            bytes[1..9].copy_from_slice(&(i as u64).to_le_bytes());
            cache.put(PipelineFingerprint(bytes), vec![i as u8]);
        }
        assert_eq!(cache.len(), InMemoryPipelineCache::MAX_ENTRIES_PER_SHARD);
    }

    // With room for two entries, the entry that was not read most recently is evicted.
    #[test]
    fn in_memory_cache_evicts_least_recently_used_entry() {
        let cache = InMemoryPipelineCache::with_limits(2, 1024);
        let a = PipelineFingerprint([0; 32]);
        let mut b_bytes = [0; 32];
        b_bytes[1] = 1;
        let b = PipelineFingerprint(b_bytes);
        let mut c_bytes = [0; 32];
        c_bytes[1] = 2;
        let c = PipelineFingerprint(c_bytes);

        cache.put(a, b"a".to_vec());
        cache.put(b, b"b".to_vec());
        // Reading `a` refreshes it, leaving `b` as the least recently used entry.
        assert_eq!(cache.get(&a).unwrap(), b"a".to_vec());
        cache.put(c, b"c".to_vec());

        assert_eq!(cache.get(&a).unwrap(), b"a".to_vec());
        assert!(cache.get(&b).is_none());
        assert_eq!(cache.get(&c).unwrap(), b"c".to_vec());
    }

    // The byte budget evicts older entries and rejects artifacts larger than the budget.
    #[test]
    fn in_memory_cache_enforces_byte_budget() {
        let cache = InMemoryPipelineCache::with_limits(8, 10);
        let a = PipelineFingerprint([0; 32]);
        let mut b_bytes = [0; 32];
        b_bytes[1] = 1;
        let b = PipelineFingerprint(b_bytes);
        let mut too_large_bytes = [0; 32];
        too_large_bytes[1] = 2;
        let too_large = PipelineFingerprint(too_large_bytes);

        cache.put(a, vec![1; 6]);
        // 6 + 6 bytes exceeds the 10-byte budget, so `a` has to go.
        cache.put(b, vec![2; 6]);
        assert!(cache.get(&a).is_none());
        assert_eq!(cache.get(&b).unwrap(), vec![2; 6]);
        assert_eq!(cache.cached_bytes(), 6);

        // An 11-byte artifact can never fit; it is rejected and `b` is untouched.
        cache.put(too_large, vec![3; 11]);
        assert!(cache.get(&too_large).is_none());
        assert_eq!(cache.cached_bytes(), 6);
    }

    // One miss, one hit, two puts and one eviction must all show up in the snapshot.
    #[test]
    fn in_memory_cache_metrics_track_hits_misses_and_evictions() {
        let cache = InMemoryPipelineCache::with_limits(1, 8);
        let a = PipelineFingerprint([0; 32]);
        let mut b_bytes = [0; 32];
        b_bytes[1] = 1;
        let b = PipelineFingerprint(b_bytes);

        assert!(cache.get(&a).is_none());
        cache.put(a, vec![1; 4]);
        assert!(cache.get(&a).is_some());
        // The entry budget is one, so inserting `b` evicts `a`.
        cache.put(b, vec![2; 4]);

        let metrics = cache.metrics();
        assert_eq!(metrics.lookups, 2);
        assert_eq!(metrics.hits, 1);
        assert_eq!(metrics.misses, 1);
        assert_eq!(metrics.puts, 2);
        assert_eq!(metrics.evictions, 1);
        assert_eq!(metrics.cached_bytes, 4);
        assert_eq!(metrics.entries, 1);
        assert_eq!(metrics.hit_rate_ppm(), 500_000);
    }

    // A shard poisoned by a panicking thread must make later access panic too.
    #[test]
    fn poisoned_cache_shard_is_not_silently_recovered() {
        let cache = Arc::new(InMemoryPipelineCache::new());
        let poisoned = Arc::clone(&cache);
        // Panic while holding shard 0's guard to poison its mutex.
        let _ = std::thread::spawn(move || {
            let _guard = InMemoryPipelineCache::lock_shard(&poisoned.shards[0]);
            panic!("poison in-memory pipeline cache shard");
        })
        .join();

        let panic = std::panic::catch_unwind(|| {
            let _ = cache.len();
        })
        .expect_err("poisoned pipeline cache shard must panic instead of recovering");
        // The panic payload may be a `String` or a `&'static str`; accept either.
        let message = panic
            .downcast_ref::<String>()
            .map(String::as_str)
            .or_else(|| panic.downcast_ref::<&'static str>().copied())
            .unwrap_or("<non-string panic>");
        assert!(
            message.contains("pipeline cache shard lock was poisoned"),
            "{message}"
        );
    }
}
419}