vyre_runtime/pipeline_cache/
in_memory.rs1use std::sync::{Arc, Mutex, MutexGuard};
5
6use rustc_hash::FxHashMap;
7use vyre_driver::accounting::{
8 checked_add_u64_lazy, checked_add_usize_lazy, checked_sub_usize_lazy, checked_usize_to_u64_lazy,
9};
10
11use super::fingerprint::PipelineFingerprint;
12use super::metrics::{PipelineCacheCounters, PipelineCacheMetrics};
13use super::store::PipelineCacheStore;
14
15#[derive(Debug)]
19pub struct InMemoryPipelineCache {
20 shards: [Mutex<InMemoryCacheShard>; Self::SHARD_COUNT],
21 max_entries_per_shard: usize,
22 max_bytes_per_shard: usize,
23 metrics: PipelineCacheCounters,
24}
25
26impl InMemoryPipelineCache {
27 pub(super) const SHARD_COUNT: usize = 256;
28 pub(super) const MAX_ENTRIES_PER_SHARD: usize = 256;
29 pub(super) const MAX_BYTES_PER_SHARD: usize = 16 * 1024 * 1024;
30
31 #[inline]
32 fn shard_index(fp: &PipelineFingerprint) -> usize {
33 usize::from(fp.0[0]) % Self::SHARD_COUNT
34 }
35
36 fn lock_shard(shard: &Mutex<InMemoryCacheShard>) -> MutexGuard<'_, InMemoryCacheShard> {
37 shard.lock().unwrap_or_else(|error| {
38 panic!(
39 "Vyre in-memory pipeline cache shard lock was poisoned: {error}. Fix: discard this cache instance after a panic; continuing could publish corrupted pipeline artifacts."
40 )
41 })
42 }
43
44 #[must_use]
46 pub fn new() -> Self {
47 Self::default()
48 }
49
50 #[must_use]
55 pub fn with_limits(max_entries_per_shard: usize, max_bytes_per_shard: usize) -> Self {
56 Self {
57 shards: std::array::from_fn(|_| Mutex::new(InMemoryCacheShard::default())),
58 max_entries_per_shard,
59 max_bytes_per_shard,
60 metrics: PipelineCacheCounters::default(),
61 }
62 }
63
64 pub fn len(&self) -> usize {
66 self.shards
67 .iter()
68 .map(|s| Self::lock_shard(s).entries.len())
69 .fold(0usize, |acc, value| {
70 cache_usize_add(
71 acc,
72 value,
73 "entry count",
74 "shard cache metrics before snapshotting",
75 )
76 })
77 }
78
79 pub fn cached_bytes(&self) -> usize {
81 self.shards
82 .iter()
83 .map(|s| Self::lock_shard(s).bytes)
84 .fold(0usize, |acc, value| {
85 cache_usize_add(
86 acc,
87 value,
88 "byte count",
89 "shard cache metrics before snapshotting",
90 )
91 })
92 }
93
94 pub fn is_empty(&self) -> bool {
96 self.shards
97 .iter()
98 .all(|s| Self::lock_shard(s).entries.is_empty())
99 }
100}
101
102impl Default for InMemoryPipelineCache {
103 fn default() -> Self {
104 Self::with_limits(Self::MAX_ENTRIES_PER_SHARD, Self::MAX_BYTES_PER_SHARD)
105 }
106}
107
108fn cache_usize_add(lhs: usize, rhs: usize, label: &'static str, fix: &'static str) -> usize {
109 checked_add_usize_lazy(lhs, rhs, || {
110 format!("Vyre in-memory pipeline cache {label} overflowed usize. Fix: {fix}.")
111 })
112 .unwrap_or_else(|message| panic!("{message}"))
113}
114
115fn cache_usize_sub(lhs: usize, rhs: usize, label: &'static str, fix: &'static str) -> usize {
116 checked_sub_usize_lazy(lhs, rhs, || {
117 format!("Vyre in-memory pipeline cache {label} underflowed usize. Fix: {fix}.")
118 })
119 .unwrap_or_else(|message| panic!("{message}"))
120}
121
122fn cache_u64_add(lhs: u64, rhs: u64, label: &'static str, fix: &'static str) -> u64 {
123 checked_add_u64_lazy(lhs, rhs, || {
124 format!("Vyre in-memory pipeline cache {label} overflowed u64. Fix: {fix}.")
125 })
126 .unwrap_or_else(|message| panic!("{message}"))
127}
128
129fn cache_usize_to_u64(value: usize, label: &'static str, fix: &'static str) -> u64 {
130 checked_usize_to_u64_lazy(value, || {
131 format!("Vyre in-memory pipeline cache {label} cannot fit u64. Fix: {fix}.")
132 })
133 .unwrap_or_else(|message| panic!("{message}"))
134}
135
136#[derive(Debug, Default)]
137struct InMemoryCacheShard {
138 entries: FxHashMap<PipelineFingerprint, InMemoryCacheEntry>,
139 bytes: usize,
140 clock: u64,
141}
142
143impl InMemoryCacheShard {
144 fn next_tick(&mut self) -> u64 {
145 self.clock = cache_u64_add(
146 self.clock,
147 1,
148 "shard clock",
149 "recreate the cache before LRU timestamps wrap",
150 );
151 self.clock
152 }
153
154 fn evict_to_limits(&mut self, max_entries: usize, max_bytes: usize) -> (u64, u64) {
155 let mut evictions = 0_u64;
156 let mut evicted_bytes = 0_u64;
157 while self.entries.len() > max_entries || self.bytes > max_bytes {
158 let Some(victim) = self
159 .entries
160 .iter()
161 .min_by_key(|(_, entry)| entry.last_used)
162 .map(|(fp, _)| *fp)
163 else {
164 self.bytes = 0;
165 return (evictions, evicted_bytes);
166 };
167 if let Some(removed) = self.entries.remove(&victim) {
168 self.bytes = cache_usize_sub(
169 self.bytes,
170 removed.bytes,
171 "byte accounting during eviction",
172 "rebuild the cache",
173 );
174 evictions =
175 cache_u64_add(evictions, 1, "eviction count", "shard cache eviction work");
176 evicted_bytes = cache_u64_add(
177 evicted_bytes,
178 cache_usize_to_u64(
179 removed.bytes,
180 "evicted byte count",
181 "shard cache artifacts before eviction",
182 ),
183 "evicted byte count",
184 "shard cache eviction work",
185 );
186 }
187 }
188 (evictions, evicted_bytes)
189 }
190}
191
/// One cached artifact together with its accounting data.
#[derive(Debug)]
struct InMemoryCacheEntry {
    /// Shared artifact bytes; handed out to callers by cloning the `Arc`.
    artifact: Arc<Vec<u8>>,
    /// Artifact length recorded at insertion, used for shard byte accounting.
    bytes: usize,
    /// Shard clock value of the most recent insert or successful lookup.
    last_used: u64,
}
198
199impl PipelineCacheStore for InMemoryPipelineCache {
200 fn get(&self, fp: &PipelineFingerprint) -> Option<Vec<u8>> {
201 self.get_arc(fp).map(|artifact| (*artifact).clone())
202 }
203
204 fn get_arc(&self, fp: &PipelineFingerprint) -> Option<Arc<Vec<u8>>> {
207 PipelineCacheCounters::increment(&self.metrics.lookups, "lookups");
208 let i = Self::shard_index(fp);
209 let mut shard = Self::lock_shard(&self.shards[i]);
210 let tick = shard.next_tick();
211 let Some(entry) = shard.entries.get_mut(fp) else {
212 PipelineCacheCounters::increment(&self.metrics.misses, "misses");
213 return None;
214 };
215 entry.last_used = tick;
216 PipelineCacheCounters::increment(&self.metrics.hits, "hits");
217 Some(Arc::clone(&entry.artifact))
218 }
219
220 fn put(&self, fp: PipelineFingerprint, artifact: Vec<u8>) {
221 let i = Self::shard_index(&fp);
222 let mut shard = Self::lock_shard(&self.shards[i]);
223 let bytes = artifact.len();
224 if self.max_entries_per_shard == 0
225 || self.max_bytes_per_shard == 0
226 || bytes > self.max_bytes_per_shard
227 {
228 PipelineCacheCounters::increment(&self.metrics.rejected_puts, "rejected puts");
229 if let Some(removed) = shard.entries.remove(&fp) {
230 shard.bytes = cache_usize_sub(
231 shard.bytes,
232 removed.bytes,
233 "byte accounting while rejecting put",
234 "rebuild the cache",
235 );
236 PipelineCacheCounters::increment(&self.metrics.evictions, "evictions");
237 PipelineCacheCounters::add(
238 &self.metrics.evicted_bytes,
239 cache_usize_to_u64(
240 removed.bytes,
241 "evicted byte count",
242 "shard cache artifacts before eviction",
243 ),
244 "evicted bytes",
245 );
246 }
247 return;
248 }
249
250 if let Some(existing) = shard.entries.remove(&fp) {
251 shard.bytes = cache_usize_sub(
252 shard.bytes,
253 existing.bytes,
254 "byte accounting while replacing entry",
255 "rebuild the cache",
256 );
257 }
258 let tick = shard.next_tick();
259 shard.bytes = cache_usize_add(
260 shard.bytes,
261 bytes,
262 "byte accounting while inserting entry",
263 "lower per-shard cache byte budget",
264 );
265 shard.entries.insert(
266 fp,
267 InMemoryCacheEntry {
268 artifact: Arc::new(artifact),
269 bytes,
270 last_used: tick,
271 },
272 );
273 PipelineCacheCounters::increment(&self.metrics.puts, "puts");
274 let (evictions, evicted_bytes) =
275 shard.evict_to_limits(self.max_entries_per_shard, self.max_bytes_per_shard);
276 PipelineCacheCounters::add(&self.metrics.evictions, evictions, "evictions");
277 PipelineCacheCounters::add(&self.metrics.evicted_bytes, evicted_bytes, "evicted bytes");
278 }
279
280 fn metrics(&self) -> PipelineCacheMetrics {
281 self.metrics
282 .snapshot(
283 u64::try_from(self.cached_bytes()).unwrap_or_else(|error| {
284 panic!(
285 "Vyre in-memory pipeline cache retained bytes cannot fit u64: {error}. Fix: shard cache metrics before snapshotting."
286 )
287 }),
288 u64::try_from(self.len()).unwrap_or_else(|error| {
289 panic!(
290 "Vyre in-memory pipeline cache entry count cannot fit u64: {error}. Fix: shard cache metrics before snapshotting."
291 )
292 }),
293 )
294 }
295}
296
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pipeline_cache::test_helpers::tiny_program;

    /// A stored artifact can be read back and is counted once.
    #[test]
    fn in_memory_cache_roundtrip() {
        let cache = InMemoryPipelineCache::new();
        let fp = PipelineFingerprint::of(&tiny_program());
        assert!(cache.get(&fp).is_none());
        cache.put(fp, b"target-bytes".to_vec());
        assert_eq!(cache.get(&fp).unwrap(), b"target-bytes".to_vec());
        assert_eq!(cache.len(), 1);
    }

    /// All fingerprints share first byte 0, so they land in one shard and the
    /// per-shard entry cap bounds the total.
    #[test]
    fn in_memory_cache_caps_each_shard() {
        let cache = InMemoryPipelineCache::new();
        for i in 0..(InMemoryPipelineCache::MAX_ENTRIES_PER_SHARD + 17) {
            let mut bytes = [0_u8; 32];
            bytes[1..9].copy_from_slice(&(i as u64).to_le_bytes());
            cache.put(PipelineFingerprint(bytes), vec![i as u8]);
        }
        assert_eq!(cache.len(), InMemoryPipelineCache::MAX_ENTRIES_PER_SHARD);
    }

    /// Reading `a` makes `b` the oldest entry, so inserting `c` evicts `b`.
    #[test]
    fn in_memory_cache_evicts_least_recently_used_entry() {
        let cache = InMemoryPipelineCache::with_limits(2, 1024);
        let a = PipelineFingerprint([0; 32]);
        let mut b_bytes = [0; 32];
        b_bytes[1] = 1;
        let b = PipelineFingerprint(b_bytes);
        let mut c_bytes = [0; 32];
        c_bytes[1] = 2;
        let c = PipelineFingerprint(c_bytes);

        cache.put(a, b"a".to_vec());
        cache.put(b, b"b".to_vec());
        assert_eq!(cache.get(&a).unwrap(), b"a".to_vec());
        cache.put(c, b"c".to_vec());

        assert_eq!(cache.get(&a).unwrap(), b"a".to_vec());
        assert!(cache.get(&b).is_none());
        assert_eq!(cache.get(&c).unwrap(), b"c".to_vec());
    }

    /// The byte budget evicts older entries and rejects oversized artifacts.
    #[test]
    fn in_memory_cache_enforces_byte_budget() {
        let cache = InMemoryPipelineCache::with_limits(8, 10);
        let a = PipelineFingerprint([0; 32]);
        let mut b_bytes = [0; 32];
        b_bytes[1] = 1;
        let b = PipelineFingerprint(b_bytes);
        let mut too_large_bytes = [0; 32];
        too_large_bytes[1] = 2;
        let too_large = PipelineFingerprint(too_large_bytes);

        cache.put(a, vec![1; 6]);
        cache.put(b, vec![2; 6]);
        assert!(cache.get(&a).is_none());
        assert_eq!(cache.get(&b).unwrap(), vec![2; 6]);
        assert_eq!(cache.cached_bytes(), 6);

        cache.put(too_large, vec![3; 11]);
        assert!(cache.get(&too_large).is_none());
        assert_eq!(cache.cached_bytes(), 6);
    }

    /// Overwriting a fingerprint must swap the byte accounting, not add to it.
    #[test]
    fn in_memory_cache_replaces_entry_and_rebalances_bytes() {
        let cache = InMemoryPipelineCache::with_limits(4, 64);
        let fp = PipelineFingerprint([0; 32]);

        cache.put(fp, vec![1; 10]);
        assert_eq!(cache.cached_bytes(), 10);
        cache.put(fp, vec![2; 3]);

        assert_eq!(cache.len(), 1);
        assert_eq!(cache.cached_bytes(), 3);
        assert_eq!(cache.get(&fp).unwrap(), vec![2; 3]);
    }

    /// A rejected (oversized) put removes the previous artifact for the same
    /// fingerprint instead of leaving a stale one behind.
    #[test]
    fn in_memory_cache_rejected_put_drops_stale_entry() {
        let cache = InMemoryPipelineCache::with_limits(4, 8);
        let fp = PipelineFingerprint([0; 32]);
        assert!(cache.is_empty());

        cache.put(fp, vec![1; 4]);
        assert!(!cache.is_empty());
        assert_eq!(cache.cached_bytes(), 4);

        cache.put(fp, vec![2; 9]);
        assert!(cache.get(&fp).is_none());
        assert!(cache.is_empty());
        assert_eq!(cache.len(), 0);
        assert_eq!(cache.cached_bytes(), 0);
    }

    /// A zero entry or byte limit turns every put into a rejection.
    #[test]
    fn in_memory_cache_with_zero_limits_stores_nothing() {
        let fp = PipelineFingerprint([0; 32]);

        let no_entries = InMemoryPipelineCache::with_limits(0, 1024);
        no_entries.put(fp, vec![1; 4]);
        assert!(no_entries.is_empty());
        assert_eq!(no_entries.cached_bytes(), 0);

        let no_bytes = InMemoryPipelineCache::with_limits(8, 0);
        no_bytes.put(fp, Vec::new());
        assert!(no_bytes.is_empty());
        assert_eq!(no_bytes.cached_bytes(), 0);
    }

    /// Counters reflect one miss, one hit, two puts and one eviction.
    #[test]
    fn in_memory_cache_metrics_track_hits_misses_and_evictions() {
        let cache = InMemoryPipelineCache::with_limits(1, 8);
        let a = PipelineFingerprint([0; 32]);
        let mut b_bytes = [0; 32];
        b_bytes[1] = 1;
        let b = PipelineFingerprint(b_bytes);

        assert!(cache.get(&a).is_none());
        cache.put(a, vec![1; 4]);
        assert!(cache.get(&a).is_some());
        cache.put(b, vec![2; 4]);

        let metrics = cache.metrics();
        assert_eq!(metrics.lookups, 2);
        assert_eq!(metrics.hits, 1);
        assert_eq!(metrics.misses, 1);
        assert_eq!(metrics.puts, 2);
        assert_eq!(metrics.evictions, 1);
        assert_eq!(metrics.cached_bytes, 4);
        assert_eq!(metrics.entries, 1);
        assert_eq!(metrics.hit_rate_ppm(), 500_000);
    }

    /// A shard poisoned by a panicking thread must make later accesses panic
    /// with the cache's own diagnostic instead of being silently recovered.
    #[test]
    fn poisoned_cache_shard_is_not_silently_recovered() {
        let cache = Arc::new(InMemoryPipelineCache::new());
        let poisoned = Arc::clone(&cache);
        // Panic while holding shard 0's guard to poison its mutex.
        let _ = std::thread::spawn(move || {
            let _guard = InMemoryPipelineCache::lock_shard(&poisoned.shards[0]);
            panic!("poison in-memory pipeline cache shard");
        })
        .join();

        let panic = std::panic::catch_unwind(|| {
            let _ = cache.len();
        })
        .expect_err("poisoned pipeline cache shard must panic instead of recovering");
        // Panic payloads are either `String` (formatted) or `&'static str`.
        let message = panic
            .downcast_ref::<String>()
            .map(String::as_str)
            .or_else(|| panic.downcast_ref::<&'static str>().copied())
            .unwrap_or("<non-string panic>");
        assert!(
            message.contains("pipeline cache shard lock was poisoned"),
            "{message}"
        );
    }
}