Skip to main content

vyre_runtime/pipeline_cache/
in_memory.rs

1//! [`InMemoryPipelineCache`]  -  sharded zero-persistence cache. The hot
2//! path for in-process pipeline reuse.
3
4use std::sync::{Arc, Mutex, MutexGuard};
5
6use rustc_hash::FxHashMap;
7
8use super::fingerprint::PipelineFingerprint;
9use super::metrics::{PipelineCacheCounters, PipelineCacheMetrics};
10use super::store::PipelineCacheStore;
11
/// In-memory pipeline cache  -  zero-persistence, zero-network, sharded
/// `FxHashMap`s behind mutexes so concurrent `get`/`put` on different
/// fingerprints rarely contend (VYRE_RUNTIME / PERF hot-cache audit).
///
/// Budgets are enforced per shard, so the cache-wide ceiling is `SHARD_COUNT`
/// times each per-shard budget.
#[derive(Debug)]
pub struct InMemoryPipelineCache {
    /// One independently locked map per shard; a fingerprint's first byte selects its shard.
    shards: [Mutex<InMemoryCacheShard>; Self::SHARD_COUNT],
    /// Maximum number of entries a single shard may retain.
    max_entries_per_shard: usize,
    /// Maximum total artifact bytes a single shard may retain.
    max_bytes_per_shard: usize,
    /// Lookup/hit/miss/put/rejection/eviction counters reported through `metrics()`.
    metrics: PipelineCacheCounters,
}
22
23impl InMemoryPipelineCache {
24    pub(super) const SHARD_COUNT: usize = 256;
25    pub(super) const MAX_ENTRIES_PER_SHARD: usize = 256;
26    pub(super) const MAX_BYTES_PER_SHARD: usize = 16 * 1024 * 1024;
27
28    #[inline]
29    fn shard_index(fp: &PipelineFingerprint) -> usize {
30        usize::from(fp.0[0]) % Self::SHARD_COUNT
31    }
32
33    fn lock_shard(shard: &Mutex<InMemoryCacheShard>) -> MutexGuard<'_, InMemoryCacheShard> {
34        // Fail closed on poison. `PoisonError::into_inner` would silently hand
35        // back a guard over shard state left half-mutated by a panicking writer
36        // — every subsequent cache read/write would then trust corrupt data with
37        // no signal. A poisoned pipeline-cache shard is unrecoverable; surface it
38        // loudly instead of laundering it.
39        shard
40            .lock()
41            .unwrap_or_else(|_| panic!("pipeline cache shard lock was poisoned"))
42    }
43
44    /// Construct an empty cache.
45    #[must_use]
46    pub fn new() -> Self {
47        Self::default()
48    }
49
50    /// Construct an empty cache with explicit per-shard entry and byte budgets.
51    ///
52    /// A zero entry budget or zero byte budget creates a disabled cache that
53    /// accepts `put` calls but retains no artifacts.
54    #[must_use]
55    pub fn with_limits(max_entries_per_shard: usize, max_bytes_per_shard: usize) -> Self {
56        Self {
57            shards: std::array::from_fn(|_| Mutex::new(InMemoryCacheShard::default())),
58            max_entries_per_shard,
59            max_bytes_per_shard,
60            metrics: PipelineCacheCounters::default(),
61        }
62    }
63
64    /// Current entry count. Thread-safe snapshot.
65    pub fn len(&self) -> usize {
66        self.shards
67            .iter()
68            .map(|s| Self::lock_shard(s).entries.len())
69            .fold(0usize, |acc, value| {
70                cache_usize_add(
71                    acc,
72                    value,
73                    "entry count",
74                    "shard cache metrics before snapshotting",
75                )
76            })
77    }
78
79    /// Current cached artifact bytes. Thread-safe snapshot.
80    pub fn cached_bytes(&self) -> usize {
81        self.shards
82            .iter()
83            .map(|s| Self::lock_shard(s).bytes)
84            .fold(0usize, |acc, value| {
85                cache_usize_add(
86                    acc,
87                    value,
88                    "byte count",
89                    "shard cache metrics before snapshotting",
90                )
91            })
92    }
93
94    /// Whether the cache is empty.
95    pub fn is_empty(&self) -> bool {
96        self.shards
97            .iter()
98            .all(|s| Self::lock_shard(s).entries.is_empty())
99    }
100
101    /// Snapshot the most recent eviction report from every shard that has
102    /// evicted at least one artifact.
103    #[must_use]
104    pub fn eviction_reports(&self) -> Vec<InMemoryEvictionReport> {
105        let mut reports = Vec::new();
106        for shard in &self.shards {
107            if let Some(report) = Self::lock_shard(shard).last_eviction {
108                reports.push(report);
109            }
110        }
111        reports
112    }
113}
114
115impl Default for InMemoryPipelineCache {
116    fn default() -> Self {
117        Self::with_limits(Self::MAX_ENTRIES_PER_SHARD, Self::MAX_BYTES_PER_SHARD)
118    }
119}
120
/// Add two `usize` cache counters, clamping at `usize::MAX` instead of overflowing.
///
/// `_label` names the counter and `_fix` carries a remediation hint; both are
/// accepted for call-site documentation but are not currently used.
fn cache_usize_add(lhs: usize, rhs: usize, _label: &'static str, _fix: &'static str) -> usize {
    usize::saturating_add(lhs, rhs)
}
124
/// Subtract `rhs` from a `usize` cache counter, clamping at zero instead of underflowing.
///
/// `_label` / `_fix` are descriptive call-site metadata and are not currently used.
fn cache_usize_sub(lhs: usize, rhs: usize, _label: &'static str, _fix: &'static str) -> usize {
    usize::saturating_sub(lhs, rhs)
}
128
/// Add two `u64` cache counters, clamping at `u64::MAX` instead of overflowing.
///
/// `_label` / `_fix` are descriptive call-site metadata and are not currently used.
fn cache_u64_add(lhs: u64, rhs: u64, _label: &'static str, _fix: &'static str) -> u64 {
    u64::saturating_add(lhs, rhs)
}
132
/// Subtract `rhs` from a `u64` cache counter, clamping at zero instead of underflowing.
///
/// `_label` / `_fix` are descriptive call-site metadata and are not currently used.
fn cache_u64_sub(lhs: u64, rhs: u64, _label: &'static str, _fix: &'static str) -> u64 {
    u64::saturating_sub(lhs, rhs)
}
136
/// Widen a `usize` cache counter to `u64`, clamping to `u64::MAX` if it cannot fit.
///
/// `_label` / `_fix` are descriptive call-site metadata and are not currently used.
fn cache_usize_to_u64(value: usize, _label: &'static str, _fix: &'static str) -> u64 {
    u64::try_from(value).unwrap_or(u64::MAX)
}
143
/// State owned by a single cache shard: its entries plus LRU and byte bookkeeping.
#[derive(Debug, Default)]
struct InMemoryCacheShard {
    /// Retained artifacts keyed by fingerprint.
    entries: FxHashMap<PipelineFingerprint, InMemoryCacheEntry>,
    /// Running sum of `bytes` over `entries`, maintained by `put` and eviction.
    bytes: usize,
    /// Shard-local logical clock used as the LRU timestamp; advanced by lookups,
    /// inserts, and rejected puts that remove an entry.
    clock: u64,
    /// Most recent eviction report recorded for this shard, if any.
    last_eviction: Option<InMemoryEvictionReport>,
}
151
152impl InMemoryCacheShard {
153    fn next_tick(&mut self) -> u64 {
154        self.clock = cache_u64_add(
155            self.clock,
156            1,
157            "shard clock",
158            "recreate the cache before LRU timestamps wrap",
159        );
160        self.clock
161    }
162
163    fn evict_to_limits(
164        &mut self,
165        max_entries: usize,
166        max_bytes: usize,
167    ) -> Option<InMemoryEvictionReport> {
168        let mut report = None;
169        while self.entries.len() > max_entries || self.bytes > max_bytes {
170            let reason = InMemoryEvictionReason::from_limits(
171                self.entries.len(),
172                self.bytes,
173                max_entries,
174                max_bytes,
175            );
176            let Some(victim) = self
177                .entries
178                .iter()
179                .min_by_key(|(_, entry)| entry.last_used)
180                .map(|(fp, _)| *fp)
181            else {
182                self.bytes = 0;
183                self.last_eviction = report;
184                return report;
185            };
186            if let Some(removed) = self.entries.remove(&victim) {
187                self.bytes = cache_usize_sub(
188                    self.bytes,
189                    removed.bytes,
190                    "byte accounting during eviction",
191                    "rebuild the cache",
192                );
193                record_eviction_report(&mut report, reason, self.clock, &removed);
194            }
195        }
196        if report.is_some() {
197            self.last_eviction = report;
198        }
199        report
200    }
201}
202
/// One retained artifact plus the metadata used for budget accounting and LRU.
#[derive(Debug)]
struct InMemoryCacheEntry {
    /// Shared payload; `get_arc` hands out clones of this `Arc`.
    artifact: Arc<Vec<u8>>,
    /// `artifact.len()` captured at insert time; counted against the byte budget.
    bytes: usize,
    /// Shard clock tick of the last insert or hit; the smallest value is evicted first.
    last_used: u64,
}
209
/// Reason the in-memory pipeline cache evicted retained artifacts.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InMemoryEvictionReason {
    /// Entry-count budget was exceeded (the per-shard entry limit).
    EntryLimit,
    /// Byte budget was exceeded (the per-shard artifact-byte limit).
    ByteLimit,
    /// Entry-count and byte budgets were both exceeded.
    EntryAndByteLimit,
    /// A rejected put removed an existing artifact for the same key. A put is
    /// rejected when the cache is disabled (a zero budget) or the new artifact
    /// alone exceeds the per-shard byte budget.
    RejectedPut,
}
222
223impl InMemoryEvictionReason {
224    fn from_limits(entries: usize, bytes: usize, max_entries: usize, max_bytes: usize) -> Self {
225        match (entries > max_entries, bytes > max_bytes) {
226            (true, true) => Self::EntryAndByteLimit,
227            (true, false) => Self::EntryLimit,
228            (false, true) => Self::ByteLimit,
229            (false, false) => Self::EntryLimit,
230        }
231    }
232}
233
/// Structured eviction report for one in-memory cache shard.
///
/// A single report aggregates every entry removed by one eviction decision.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct InMemoryEvictionReport {
    /// Number of entries removed by the eviction decision.
    pub entries: u64,
    /// Bytes removed by the eviction decision.
    pub bytes: u64,
    /// Maximum age of an evicted entry in shard clock ticks, where age is the
    /// shard clock at eviction minus the entry's last-used tick (clamped at zero).
    pub max_age_ticks: u64,
    /// Why eviction happened; records the reason observed for the first entry
    /// removed in this decision.
    pub reason: InMemoryEvictionReason,
}
246
247fn record_eviction_report(
248    report: &mut Option<InMemoryEvictionReport>,
249    reason: InMemoryEvictionReason,
250    now: u64,
251    removed: &InMemoryCacheEntry,
252) {
253    let removed_bytes = cache_usize_to_u64(
254        removed.bytes,
255        "evicted byte count",
256        "shard cache artifacts before eviction",
257    );
258    let age = cache_u64_sub(
259        now,
260        removed.last_used,
261        "evicted entry age",
262        "rebuild the cache LRU clock",
263    );
264    match report {
265        Some(report) => {
266            report.entries = cache_u64_add(
267                report.entries,
268                1,
269                "eviction count",
270                "shard cache eviction work",
271            );
272            report.bytes = cache_u64_add(
273                report.bytes,
274                removed_bytes,
275                "evicted byte count",
276                "shard cache eviction work",
277            );
278            report.max_age_ticks = report.max_age_ticks.max(age);
279        }
280        None => {
281            *report = Some(InMemoryEvictionReport {
282                entries: 1,
283                bytes: removed_bytes,
284                max_age_ticks: age,
285                reason,
286            });
287        }
288    }
289}
290
291impl PipelineCacheStore for InMemoryPipelineCache {
292    fn get(&self, fp: &PipelineFingerprint) -> Option<Vec<u8>> {
293        self.get_arc(fp).map(|artifact| (*artifact).clone())
294    }
295
296    /// V7-PERF-009: zero-clone hot-path lookup. The cache already stores
297    /// payloads behind `Arc<Vec<u8>>`, so a hit is one refcount bump.
298    fn get_arc(&self, fp: &PipelineFingerprint) -> Option<Arc<Vec<u8>>> {
299        PipelineCacheCounters::increment(&self.metrics.lookups, "lookups");
300        let i = Self::shard_index(fp);
301        let mut shard = Self::lock_shard(&self.shards[i]);
302        let tick = shard.next_tick();
303        let Some(entry) = shard.entries.get_mut(fp) else {
304            PipelineCacheCounters::increment(&self.metrics.misses, "misses");
305            return None;
306        };
307        entry.last_used = tick;
308        PipelineCacheCounters::increment(&self.metrics.hits, "hits");
309        Some(Arc::clone(&entry.artifact))
310    }
311
312    fn put(&self, fp: PipelineFingerprint, artifact: Vec<u8>) {
313        let i = Self::shard_index(&fp);
314        let mut shard = Self::lock_shard(&self.shards[i]);
315        let bytes = artifact.len();
316        if self.max_entries_per_shard == 0
317            || self.max_bytes_per_shard == 0
318            || bytes > self.max_bytes_per_shard
319        {
320            PipelineCacheCounters::increment(&self.metrics.rejected_puts, "rejected puts");
321            if let Some(removed) = shard.entries.remove(&fp) {
322                let tick = shard.next_tick();
323                shard.bytes = cache_usize_sub(
324                    shard.bytes,
325                    removed.bytes,
326                    "byte accounting while rejecting put",
327                    "rebuild the cache",
328                );
329                let mut report = None;
330                record_eviction_report(
331                    &mut report,
332                    InMemoryEvictionReason::RejectedPut,
333                    tick,
334                    &removed,
335                );
336                shard.last_eviction = report;
337                PipelineCacheCounters::increment(&self.metrics.evictions, "evictions");
338                PipelineCacheCounters::add(
339                    &self.metrics.evicted_bytes,
340                    cache_usize_to_u64(
341                        removed.bytes,
342                        "evicted byte count",
343                        "shard cache artifacts before eviction",
344                    ),
345                    "evicted bytes",
346                );
347            }
348            return;
349        }
350
351        if let Some(existing) = shard.entries.remove(&fp) {
352            shard.bytes = cache_usize_sub(
353                shard.bytes,
354                existing.bytes,
355                "byte accounting while replacing entry",
356                "rebuild the cache",
357            );
358        }
359        let tick = shard.next_tick();
360        shard.bytes = cache_usize_add(
361            shard.bytes,
362            bytes,
363            "byte accounting while inserting entry",
364            "lower per-shard cache byte budget",
365        );
366        shard.entries.insert(
367            fp,
368            InMemoryCacheEntry {
369                artifact: Arc::new(artifact),
370                bytes,
371                last_used: tick,
372            },
373        );
374        PipelineCacheCounters::increment(&self.metrics.puts, "puts");
375        if let Some(report) =
376            shard.evict_to_limits(self.max_entries_per_shard, self.max_bytes_per_shard)
377        {
378            PipelineCacheCounters::add(&self.metrics.evictions, report.entries, "evictions");
379            PipelineCacheCounters::add(&self.metrics.evicted_bytes, report.bytes, "evicted bytes");
380        }
381    }
382
383    fn metrics(&self) -> PipelineCacheMetrics {
384        self.metrics
385            .snapshot(
386                cache_usize_to_u64(
387                    self.cached_bytes(),
388                    "retained byte snapshot",
389                    "shard cache metrics before snapshotting",
390                ),
391                cache_usize_to_u64(
392                    self.len(),
393                    "entry count snapshot",
394                    "shard cache metrics before snapshotting",
395                ),
396            )
397    }
398}
399
// Unit tests for the sharded in-memory pipeline cache.
//
// Tests that build fingerprints by hand leave byte 0 at zero, so `shard_index`
// routes all of those keys to shard 0 and the per-shard budgets apply to them
// collectively.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pipeline_cache::test_helpers::tiny_program;

    /// Miss before `put`, identical bytes after, and exactly one retained entry.
    #[test]
    fn in_memory_cache_roundtrip() {
        let cache = InMemoryPipelineCache::new();
        let fp = PipelineFingerprint::of(&tiny_program());
        assert!(cache.get(&fp).is_none());
        cache.put(fp, b"target-bytes".to_vec());
        assert_eq!(cache.get(&fp).unwrap(), b"target-bytes".to_vec());
        assert_eq!(cache.len(), 1);
    }

    /// All keys share shard 0 (byte 0 is zero) and differ only in bytes 1..9, so
    /// `MAX_ENTRIES_PER_SHARD + 17` one-byte puts must leave that shard at its
    /// entry budget.
    #[test]
    fn in_memory_cache_caps_each_shard() {
        let cache = InMemoryPipelineCache::new();
        for i in 0..(InMemoryPipelineCache::MAX_ENTRIES_PER_SHARD + 17) {
            let mut bytes = [0_u8; 32];
            bytes[1..9].copy_from_slice(&(i as u64).to_le_bytes());
            cache.put(PipelineFingerprint(bytes), vec![i as u8]);
        }
        assert_eq!(cache.len(), InMemoryPipelineCache::MAX_ENTRIES_PER_SHARD);
    }

    /// With a 2-entry budget, reading `a` after inserting `b` makes `b` the least
    /// recently used entry, so inserting `c` must evict `b` rather than `a`.
    #[test]
    fn in_memory_cache_evicts_least_recently_used_entry() {
        let cache = InMemoryPipelineCache::with_limits(2, 1024);
        let a = PipelineFingerprint([0; 32]);
        let mut b_bytes = [0; 32];
        b_bytes[1] = 1;
        let b = PipelineFingerprint(b_bytes);
        let mut c_bytes = [0; 32];
        c_bytes[1] = 2;
        let c = PipelineFingerprint(c_bytes);

        cache.put(a, b"a".to_vec());
        cache.put(b, b"b".to_vec());
        // Refresh `a`'s LRU timestamp so `b` becomes the eviction victim.
        assert_eq!(cache.get(&a).unwrap(), b"a".to_vec());
        cache.put(c, b"c".to_vec());

        assert_eq!(cache.get(&a).unwrap(), b"a".to_vec());
        assert!(cache.get(&b).is_none());
        assert_eq!(cache.get(&c).unwrap(), b"c".to_vec());
    }

    /// Two 6-byte artifacts exceed the 10-byte budget, so the older one (`a`) is
    /// evicted. An 11-byte artifact can never fit; it is rejected and leaves the
    /// retained bytes unchanged.
    #[test]
    fn in_memory_cache_enforces_byte_budget() {
        let cache = InMemoryPipelineCache::with_limits(8, 10);
        let a = PipelineFingerprint([0; 32]);
        let mut b_bytes = [0; 32];
        b_bytes[1] = 1;
        let b = PipelineFingerprint(b_bytes);
        let mut too_large_bytes = [0; 32];
        too_large_bytes[1] = 2;
        let too_large = PipelineFingerprint(too_large_bytes);

        cache.put(a, vec![1; 6]);
        cache.put(b, vec![2; 6]);
        assert!(cache.get(&a).is_none());
        assert_eq!(cache.get(&b).unwrap(), vec![2; 6]);
        assert_eq!(cache.cached_bytes(), 6);

        cache.put(too_large, vec![3; 11]);
        assert!(cache.get(&too_large).is_none());
        assert_eq!(cache.cached_bytes(), 6);
    }

    /// One miss plus one hit gives 2 lookups and a 50% hit rate. The second put
    /// overflows the 1-entry budget and evicts `a`, leaving only `b`'s 4 bytes.
    #[test]
    fn in_memory_cache_metrics_track_hits_misses_and_evictions() {
        let cache = InMemoryPipelineCache::with_limits(1, 8);
        let a = PipelineFingerprint([0; 32]);
        let mut b_bytes = [0; 32];
        b_bytes[1] = 1;
        let b = PipelineFingerprint(b_bytes);

        assert!(cache.get(&a).is_none());
        cache.put(a, vec![1; 4]);
        assert!(cache.get(&a).is_some());
        cache.put(b, vec![2; 4]);

        let metrics = cache.metrics();
        assert_eq!(metrics.lookups, 2);
        assert_eq!(metrics.hits, 1);
        assert_eq!(metrics.misses, 1);
        assert_eq!(metrics.puts, 2);
        assert_eq!(metrics.evictions, 1);
        assert_eq!(metrics.cached_bytes, 4);
        assert_eq!(metrics.entries, 1);
        assert_eq!(metrics.hit_rate_ppm(), 500_000);
    }

    /// The entry-limit eviction of `a` is reported with its reason, count and
    /// bytes. Its age is positive because `a` was last touched by the `get`
    /// that precedes `b`'s insert tick.
    #[test]
    fn in_memory_cache_eviction_report_records_reason_entries_bytes_and_age() {
        let cache = InMemoryPipelineCache::with_limits(1, 8);
        let a = PipelineFingerprint([0; 32]);
        let mut b_bytes = [0; 32];
        b_bytes[1] = 1;
        let b = PipelineFingerprint(b_bytes);

        cache.put(a, vec![1; 4]);
        assert!(cache.get(&a).is_some());
        cache.put(b, vec![2; 4]);

        let reports = cache.eviction_reports();
        assert_eq!(reports.len(), 1);
        let report = reports[0];
        assert_eq!(report.reason, InMemoryEvictionReason::EntryLimit);
        assert_eq!(report.entries, 1);
        assert_eq!(report.bytes, 4);
        assert!(
            report.max_age_ticks > 0,
            "Fix: eviction reports must expose LRU age, got {report:?}"
        );
    }

    /// Replacing a 4-byte entry with a 9-byte artifact under an 8-byte budget is
    /// rejected. The rejection drops the existing entry and reports it as
    /// `RejectedPut`.
    #[test]
    fn rejected_oversize_put_records_eviction_reason_for_replaced_entry() {
        let cache = InMemoryPipelineCache::with_limits(8, 8);
        let fp = PipelineFingerprint([0; 32]);

        cache.put(fp, vec![1; 4]);
        cache.put(fp, vec![2; 9]);

        assert!(cache.get(&fp).is_none());
        let reports = cache.eviction_reports();
        assert_eq!(reports.len(), 1);
        let report = reports[0];
        assert_eq!(report.reason, InMemoryEvictionReason::RejectedPut);
        assert_eq!(report.entries, 1);
        assert_eq!(report.bytes, 4);
    }

    /// A thread panics while holding shard 0's lock. Any later call that locks
    /// that shard (here `len`, which visits every shard) must panic with the
    /// poison message instead of recovering the guard.
    #[test]
    fn poisoned_cache_shard_is_not_silently_recovered() {
        let cache = Arc::new(InMemoryPipelineCache::new());
        let poisoned = Arc::clone(&cache);
        let _ = std::thread::spawn(move || {
            let _guard = InMemoryPipelineCache::lock_shard(&poisoned.shards[0]);
            panic!("poison in-memory pipeline cache shard");
        })
        .join();

        let panic = std::panic::catch_unwind(|| {
            let _ = cache.len();
        })
        .expect_err("poisoned pipeline cache shard must panic instead of recovering");
        // The payload is a `String` for formatted panics and a `&'static str` for literals.
        let message = panic
            .downcast_ref::<String>()
            .map(String::as_str)
            .or_else(|| panic.downcast_ref::<&'static str>().copied())
            .unwrap_or("<non-string panic>");
        assert!(
            message.contains("pipeline cache shard lock was poisoned"),
            "{message}"
        );
    }
}