// vyre_runtime/pipeline_cache/in_memory.rs

//! [`InMemoryPipelineCache`] — sharded, zero-persistence cache. The hot
//! path for in-process pipeline reuse.
3
4use std::sync::{Arc, Mutex, MutexGuard};
5
6use rustc_hash::FxHashMap;
7use vyre_driver::accounting::{
8    checked_add_u64_lazy, checked_add_usize_lazy, checked_sub_usize_lazy, checked_usize_to_u64_lazy,
9};
10
11use super::fingerprint::PipelineFingerprint;
12use super::metrics::{PipelineCacheCounters, PipelineCacheMetrics};
13use super::store::PipelineCacheStore;
14
15/// In-memory pipeline cache  -  zero-persistence, zero-network, sharded
16/// `FxHashMap`s behind mutexes so concurrent `get`/`put` on different
17/// fingerprints rarely contend (VYRE_RUNTIME / PERF hot-cache audit).
18#[derive(Debug)]
19pub struct InMemoryPipelineCache {
20    shards: [Mutex<InMemoryCacheShard>; Self::SHARD_COUNT],
21    max_entries_per_shard: usize,
22    max_bytes_per_shard: usize,
23    metrics: PipelineCacheCounters,
24}
25
26impl InMemoryPipelineCache {
27    pub(super) const SHARD_COUNT: usize = 256;
28    pub(super) const MAX_ENTRIES_PER_SHARD: usize = 256;
29    pub(super) const MAX_BYTES_PER_SHARD: usize = 16 * 1024 * 1024;
30
31    #[inline]
32    fn shard_index(fp: &PipelineFingerprint) -> usize {
33        usize::from(fp.0[0]) % Self::SHARD_COUNT
34    }
35
36    fn lock_shard(shard: &Mutex<InMemoryCacheShard>) -> MutexGuard<'_, InMemoryCacheShard> {
37        shard.lock().unwrap_or_else(|error| {
38            panic!(
39                "Vyre in-memory pipeline cache shard lock was poisoned: {error}. Fix: discard this cache instance after a panic; continuing could publish corrupted pipeline artifacts."
40            )
41        })
42    }
43
44    /// Construct an empty cache.
45    #[must_use]
46    pub fn new() -> Self {
47        Self::default()
48    }
49
50    /// Construct an empty cache with explicit per-shard entry and byte budgets.
51    ///
52    /// A zero entry budget or zero byte budget creates a disabled cache that
53    /// accepts `put` calls but retains no artifacts.
54    #[must_use]
55    pub fn with_limits(max_entries_per_shard: usize, max_bytes_per_shard: usize) -> Self {
56        Self {
57            shards: std::array::from_fn(|_| Mutex::new(InMemoryCacheShard::default())),
58            max_entries_per_shard,
59            max_bytes_per_shard,
60            metrics: PipelineCacheCounters::default(),
61        }
62    }
63
64    /// Current entry count. Thread-safe snapshot.
65    pub fn len(&self) -> usize {
66        self.shards
67            .iter()
68            .map(|s| Self::lock_shard(s).entries.len())
69            .fold(0usize, |acc, value| {
70                cache_usize_add(
71                    acc,
72                    value,
73                    "entry count",
74                    "shard cache metrics before snapshotting",
75                )
76            })
77    }
78
79    /// Current cached artifact bytes. Thread-safe snapshot.
80    pub fn cached_bytes(&self) -> usize {
81        self.shards
82            .iter()
83            .map(|s| Self::lock_shard(s).bytes)
84            .fold(0usize, |acc, value| {
85                cache_usize_add(
86                    acc,
87                    value,
88                    "byte count",
89                    "shard cache metrics before snapshotting",
90                )
91            })
92    }
93
94    /// Whether the cache is empty.
95    pub fn is_empty(&self) -> bool {
96        self.shards
97            .iter()
98            .all(|s| Self::lock_shard(s).entries.is_empty())
99    }
100}
101
102impl Default for InMemoryPipelineCache {
103    fn default() -> Self {
104        Self::with_limits(Self::MAX_ENTRIES_PER_SHARD, Self::MAX_BYTES_PER_SHARD)
105    }
106}
107
108fn cache_usize_add(lhs: usize, rhs: usize, label: &'static str, fix: &'static str) -> usize {
109    checked_add_usize_lazy(lhs, rhs, || {
110        format!("Vyre in-memory pipeline cache {label} overflowed usize. Fix: {fix}.")
111    })
112    .unwrap_or_else(|message| panic!("{message}"))
113}
114
115fn cache_usize_sub(lhs: usize, rhs: usize, label: &'static str, fix: &'static str) -> usize {
116    checked_sub_usize_lazy(lhs, rhs, || {
117        format!("Vyre in-memory pipeline cache {label} underflowed usize. Fix: {fix}.")
118    })
119    .unwrap_or_else(|message| panic!("{message}"))
120}
121
122fn cache_u64_add(lhs: u64, rhs: u64, label: &'static str, fix: &'static str) -> u64 {
123    checked_add_u64_lazy(lhs, rhs, || {
124        format!("Vyre in-memory pipeline cache {label} overflowed u64. Fix: {fix}.")
125    })
126    .unwrap_or_else(|message| panic!("{message}"))
127}
128
129fn cache_usize_to_u64(value: usize, label: &'static str, fix: &'static str) -> u64 {
130    checked_usize_to_u64_lazy(value, || {
131        format!("Vyre in-memory pipeline cache {label} cannot fit u64. Fix: {fix}.")
132    })
133    .unwrap_or_else(|message| panic!("{message}"))
134}
135
136#[derive(Debug, Default)]
137struct InMemoryCacheShard {
138    entries: FxHashMap<PipelineFingerprint, InMemoryCacheEntry>,
139    bytes: usize,
140    clock: u64,
141}
142
143impl InMemoryCacheShard {
144    fn next_tick(&mut self) -> u64 {
145        self.clock = cache_u64_add(
146            self.clock,
147            1,
148            "shard clock",
149            "recreate the cache before LRU timestamps wrap",
150        );
151        self.clock
152    }
153
154    fn evict_to_limits(&mut self, max_entries: usize, max_bytes: usize) -> (u64, u64) {
155        let mut evictions = 0_u64;
156        let mut evicted_bytes = 0_u64;
157        while self.entries.len() > max_entries || self.bytes > max_bytes {
158            let Some(victim) = self
159                .entries
160                .iter()
161                .min_by_key(|(_, entry)| entry.last_used)
162                .map(|(fp, _)| *fp)
163            else {
164                self.bytes = 0;
165                return (evictions, evicted_bytes);
166            };
167            if let Some(removed) = self.entries.remove(&victim) {
168                self.bytes = cache_usize_sub(
169                    self.bytes,
170                    removed.bytes,
171                    "byte accounting during eviction",
172                    "rebuild the cache",
173                );
174                evictions =
175                    cache_u64_add(evictions, 1, "eviction count", "shard cache eviction work");
176                evicted_bytes = cache_u64_add(
177                    evicted_bytes,
178                    cache_usize_to_u64(
179                        removed.bytes,
180                        "evicted byte count",
181                        "shard cache artifacts before eviction",
182                    ),
183                    "evicted byte count",
184                    "shard cache eviction work",
185                );
186            }
187        }
188        (evictions, evicted_bytes)
189    }
190}
191
/// A single cached artifact plus the bookkeeping the shard keeps for it.
#[derive(Debug)]
struct InMemoryCacheEntry {
    /// Shared artifact payload; lookups hand out clones of this `Arc`.
    artifact: Arc<Vec<u8>>,
    /// Byte size charged against the shard's byte total for this entry.
    bytes: usize,
    /// Shard clock value recorded at the most recent insert or hit.
    last_used: u64,
}
198
199impl PipelineCacheStore for InMemoryPipelineCache {
200    fn get(&self, fp: &PipelineFingerprint) -> Option<Vec<u8>> {
201        self.get_arc(fp).map(|artifact| (*artifact).clone())
202    }
203
204    /// V7-PERF-009: zero-clone hot-path lookup. The cache already stores
205    /// payloads behind `Arc<Vec<u8>>`, so a hit is one refcount bump.
206    fn get_arc(&self, fp: &PipelineFingerprint) -> Option<Arc<Vec<u8>>> {
207        PipelineCacheCounters::increment(&self.metrics.lookups, "lookups");
208        let i = Self::shard_index(fp);
209        let mut shard = Self::lock_shard(&self.shards[i]);
210        let tick = shard.next_tick();
211        let Some(entry) = shard.entries.get_mut(fp) else {
212            PipelineCacheCounters::increment(&self.metrics.misses, "misses");
213            return None;
214        };
215        entry.last_used = tick;
216        PipelineCacheCounters::increment(&self.metrics.hits, "hits");
217        Some(Arc::clone(&entry.artifact))
218    }
219
220    fn put(&self, fp: PipelineFingerprint, artifact: Vec<u8>) {
221        let i = Self::shard_index(&fp);
222        let mut shard = Self::lock_shard(&self.shards[i]);
223        let bytes = artifact.len();
224        if self.max_entries_per_shard == 0
225            || self.max_bytes_per_shard == 0
226            || bytes > self.max_bytes_per_shard
227        {
228            PipelineCacheCounters::increment(&self.metrics.rejected_puts, "rejected puts");
229            if let Some(removed) = shard.entries.remove(&fp) {
230                shard.bytes = cache_usize_sub(
231                    shard.bytes,
232                    removed.bytes,
233                    "byte accounting while rejecting put",
234                    "rebuild the cache",
235                );
236                PipelineCacheCounters::increment(&self.metrics.evictions, "evictions");
237                PipelineCacheCounters::add(
238                    &self.metrics.evicted_bytes,
239                    cache_usize_to_u64(
240                        removed.bytes,
241                        "evicted byte count",
242                        "shard cache artifacts before eviction",
243                    ),
244                    "evicted bytes",
245                );
246            }
247            return;
248        }
249
250        if let Some(existing) = shard.entries.remove(&fp) {
251            shard.bytes = cache_usize_sub(
252                shard.bytes,
253                existing.bytes,
254                "byte accounting while replacing entry",
255                "rebuild the cache",
256            );
257        }
258        let tick = shard.next_tick();
259        shard.bytes = cache_usize_add(
260            shard.bytes,
261            bytes,
262            "byte accounting while inserting entry",
263            "lower per-shard cache byte budget",
264        );
265        shard.entries.insert(
266            fp,
267            InMemoryCacheEntry {
268                artifact: Arc::new(artifact),
269                bytes,
270                last_used: tick,
271            },
272        );
273        PipelineCacheCounters::increment(&self.metrics.puts, "puts");
274        let (evictions, evicted_bytes) =
275            shard.evict_to_limits(self.max_entries_per_shard, self.max_bytes_per_shard);
276        PipelineCacheCounters::add(&self.metrics.evictions, evictions, "evictions");
277        PipelineCacheCounters::add(&self.metrics.evicted_bytes, evicted_bytes, "evicted bytes");
278    }
279
280    fn metrics(&self) -> PipelineCacheMetrics {
281        self.metrics
282            .snapshot(
283                u64::try_from(self.cached_bytes()).unwrap_or_else(|error| {
284                    panic!(
285                        "Vyre in-memory pipeline cache retained bytes cannot fit u64: {error}. Fix: shard cache metrics before snapshotting."
286                    )
287                }),
288                u64::try_from(self.len()).unwrap_or_else(|error| {
289                    panic!(
290                        "Vyre in-memory pipeline cache entry count cannot fit u64: {error}. Fix: shard cache metrics before snapshotting."
291                    )
292                }),
293            )
294    }
295}
296
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pipeline_cache::test_helpers::tiny_program;

    /// Fingerprint whose first byte is zero, so every value lands in shard 0,
    /// and whose second byte is `tag` to keep fingerprints distinct.
    fn shard_zero_fingerprint(tag: u8) -> PipelineFingerprint {
        let mut raw = [0_u8; 32];
        raw[1] = tag;
        PipelineFingerprint(raw)
    }

    /// A stored artifact is returned unchanged and counted once.
    #[test]
    fn in_memory_cache_roundtrip() {
        let cache = InMemoryPipelineCache::new();
        let fp = PipelineFingerprint::of(&tiny_program());
        assert!(cache.get(&fp).is_none());

        cache.put(fp, b"target-bytes".to_vec());

        assert_eq!(cache.get(&fp), Some(b"target-bytes".to_vec()));
        assert_eq!(cache.len(), 1);
    }

    /// Overfilling a single shard leaves exactly the per-shard entry budget.
    #[test]
    fn in_memory_cache_caps_each_shard() {
        let cache = InMemoryPipelineCache::new();
        let total_puts = InMemoryPipelineCache::MAX_ENTRIES_PER_SHARD + 17;
        for i in 0..total_puts {
            // Byte 0 stays zero so every fingerprint maps to the same shard.
            let mut raw = [0_u8; 32];
            raw[1..9].copy_from_slice(&(i as u64).to_le_bytes());
            cache.put(PipelineFingerprint(raw), vec![i as u8]);
        }
        assert_eq!(cache.len(), InMemoryPipelineCache::MAX_ENTRIES_PER_SHARD);
    }

    /// With room for two entries, the untouched one is evicted first.
    #[test]
    fn in_memory_cache_evicts_least_recently_used_entry() {
        let cache = InMemoryPipelineCache::with_limits(2, 1024);
        let a = shard_zero_fingerprint(0);
        let b = shard_zero_fingerprint(1);
        let c = shard_zero_fingerprint(2);

        cache.put(a, b"a".to_vec());
        cache.put(b, b"b".to_vec());
        // Touch `a` so `b` becomes the least recently used entry.
        assert_eq!(cache.get(&a), Some(b"a".to_vec()));
        cache.put(c, b"c".to_vec());

        assert_eq!(cache.get(&a), Some(b"a".to_vec()));
        assert!(cache.get(&b).is_none());
        assert_eq!(cache.get(&c), Some(b"c".to_vec()));
    }

    /// The byte budget evicts older entries and rejects oversized artifacts.
    #[test]
    fn in_memory_cache_enforces_byte_budget() {
        let cache = InMemoryPipelineCache::with_limits(8, 10);
        let a = shard_zero_fingerprint(0);
        let b = shard_zero_fingerprint(1);
        let too_large = shard_zero_fingerprint(2);

        cache.put(a, vec![1; 6]);
        cache.put(b, vec![2; 6]);
        assert!(cache.get(&a).is_none());
        assert_eq!(cache.get(&b), Some(vec![2; 6]));
        assert_eq!(cache.cached_bytes(), 6);

        cache.put(too_large, vec![3; 11]);
        assert!(cache.get(&too_large).is_none());
        assert_eq!(cache.cached_bytes(), 6);
    }

    /// Counters reflect one miss, one hit, two puts and one eviction.
    #[test]
    fn in_memory_cache_metrics_track_hits_misses_and_evictions() {
        let cache = InMemoryPipelineCache::with_limits(1, 8);
        let a = shard_zero_fingerprint(0);
        let b = shard_zero_fingerprint(1);

        assert!(cache.get(&a).is_none());
        cache.put(a, vec![1; 4]);
        assert!(cache.get(&a).is_some());
        cache.put(b, vec![2; 4]);

        let snapshot = cache.metrics();
        assert_eq!(snapshot.lookups, 2);
        assert_eq!(snapshot.hits, 1);
        assert_eq!(snapshot.misses, 1);
        assert_eq!(snapshot.puts, 2);
        assert_eq!(snapshot.evictions, 1);
        assert_eq!(snapshot.cached_bytes, 4);
        assert_eq!(snapshot.entries, 1);
        assert_eq!(snapshot.hit_rate_ppm(), 500_000);
    }

    /// A poisoned shard must surface as a panic rather than be recovered.
    #[test]
    fn poisoned_cache_shard_is_not_silently_recovered() {
        let cache = Arc::new(InMemoryPipelineCache::new());
        let poisoner = Arc::clone(&cache);
        // Panic while holding shard 0's guard to poison its mutex.
        let _ = std::thread::spawn(move || {
            let _guard = InMemoryPipelineCache::lock_shard(&poisoner.shards[0]);
            panic!("poison in-memory pipeline cache shard");
        })
        .join();

        let payload = std::panic::catch_unwind(|| {
            let _ = cache.len();
        })
        .expect_err("poisoned pipeline cache shard must panic instead of recovering");
        let message = if let Some(text) = payload.downcast_ref::<String>() {
            text.as_str()
        } else if let Some(text) = payload.downcast_ref::<&'static str>() {
            *text
        } else {
            "<non-string panic>"
        };
        assert!(
            message.contains("pipeline cache shard lock was poisoned"),
            "{message}"
        );
    }
}