vyre_runtime/pipeline_cache/
in_memory.rs1use std::sync::{Arc, Mutex, MutexGuard};
5
6use rustc_hash::FxHashMap;
7use vyre_driver::accounting::{
8 checked_add_u64_lazy, checked_add_usize_lazy, checked_sub_usize_lazy,
9 checked_usize_to_u64_lazy,
10};
11
12use super::fingerprint::PipelineFingerprint;
13use super::metrics::{PipelineCacheCounters, PipelineCacheMetrics};
14use super::store::PipelineCacheStore;
15
16#[derive(Debug)]
20pub struct InMemoryPipelineCache {
21 shards: [Mutex<InMemoryCacheShard>; Self::SHARD_COUNT],
22 max_entries_per_shard: usize,
23 max_bytes_per_shard: usize,
24 metrics: PipelineCacheCounters,
25}
26
27impl InMemoryPipelineCache {
28 pub(super) const SHARD_COUNT: usize = 256;
29 pub(super) const MAX_ENTRIES_PER_SHARD: usize = 256;
30 pub(super) const MAX_BYTES_PER_SHARD: usize = 16 * 1024 * 1024;
31
32 #[inline]
33 fn shard_index(fp: &PipelineFingerprint) -> usize {
34 usize::from(fp.0[0]) % Self::SHARD_COUNT
35 }
36
37 fn lock_shard(shard: &Mutex<InMemoryCacheShard>) -> MutexGuard<'_, InMemoryCacheShard> {
38 shard.lock().unwrap_or_else(|error| {
39 panic!(
40 "Vyre in-memory pipeline cache shard lock was poisoned: {error}. Fix: discard this cache instance after a panic; continuing could publish corrupted pipeline artifacts."
41 )
42 })
43 }
44
45 #[must_use]
47 pub fn new() -> Self {
48 Self::default()
49 }
50
51 #[must_use]
56 pub fn with_limits(max_entries_per_shard: usize, max_bytes_per_shard: usize) -> Self {
57 Self {
58 shards: std::array::from_fn(|_| Mutex::new(InMemoryCacheShard::default())),
59 max_entries_per_shard,
60 max_bytes_per_shard,
61 metrics: PipelineCacheCounters::default(),
62 }
63 }
64
65 pub fn len(&self) -> usize {
67 self.shards
68 .iter()
69 .map(|s| Self::lock_shard(s).entries.len())
70 .fold(0usize, |acc, value| {
71 cache_usize_add(
72 acc,
73 value,
74 "entry count",
75 "shard cache metrics before snapshotting",
76 )
77 })
78 }
79
80 pub fn cached_bytes(&self) -> usize {
82 self.shards
83 .iter()
84 .map(|s| Self::lock_shard(s).bytes)
85 .fold(0usize, |acc, value| {
86 cache_usize_add(
87 acc,
88 value,
89 "byte count",
90 "shard cache metrics before snapshotting",
91 )
92 })
93 }
94
95 pub fn is_empty(&self) -> bool {
97 self.shards
98 .iter()
99 .all(|s| Self::lock_shard(s).entries.is_empty())
100 }
101}
102
103impl Default for InMemoryPipelineCache {
104 fn default() -> Self {
105 Self::with_limits(Self::MAX_ENTRIES_PER_SHARD, Self::MAX_BYTES_PER_SHARD)
106 }
107}
108
109fn cache_usize_add(lhs: usize, rhs: usize, label: &'static str, fix: &'static str) -> usize {
110 checked_add_usize_lazy(lhs, rhs, || {
111 format!("Vyre in-memory pipeline cache {label} overflowed usize. Fix: {fix}.")
112 })
113 .unwrap_or_else(|message| panic!("{message}"))
114}
115
116fn cache_usize_sub(lhs: usize, rhs: usize, label: &'static str, fix: &'static str) -> usize {
117 checked_sub_usize_lazy(lhs, rhs, || {
118 format!("Vyre in-memory pipeline cache {label} underflowed usize. Fix: {fix}.")
119 })
120 .unwrap_or_else(|message| panic!("{message}"))
121}
122
123fn cache_u64_add(lhs: u64, rhs: u64, label: &'static str, fix: &'static str) -> u64 {
124 checked_add_u64_lazy(lhs, rhs, || {
125 format!("Vyre in-memory pipeline cache {label} overflowed u64. Fix: {fix}.")
126 })
127 .unwrap_or_else(|message| panic!("{message}"))
128}
129
130fn cache_usize_to_u64(value: usize, label: &'static str, fix: &'static str) -> u64 {
131 checked_usize_to_u64_lazy(value, || {
132 format!("Vyre in-memory pipeline cache {label} cannot fit u64. Fix: {fix}.")
133 })
134 .unwrap_or_else(|message| panic!("{message}"))
135}
136
137#[derive(Debug, Default)]
138struct InMemoryCacheShard {
139 entries: FxHashMap<PipelineFingerprint, InMemoryCacheEntry>,
140 bytes: usize,
141 clock: u64,
142}
143
144impl InMemoryCacheShard {
145 fn next_tick(&mut self) -> u64 {
146 self.clock = cache_u64_add(
147 self.clock,
148 1,
149 "shard clock",
150 "recreate the cache before LRU timestamps wrap",
151 );
152 self.clock
153 }
154
155 fn evict_to_limits(&mut self, max_entries: usize, max_bytes: usize) -> (u64, u64) {
156 let mut evictions = 0_u64;
157 let mut evicted_bytes = 0_u64;
158 while self.entries.len() > max_entries || self.bytes > max_bytes {
159 let Some(victim) = self
160 .entries
161 .iter()
162 .min_by_key(|(_, entry)| entry.last_used)
163 .map(|(fp, _)| *fp)
164 else {
165 self.bytes = 0;
166 return (evictions, evicted_bytes);
167 };
168 if let Some(removed) = self.entries.remove(&victim) {
169 self.bytes = cache_usize_sub(
170 self.bytes,
171 removed.bytes,
172 "byte accounting during eviction",
173 "rebuild the cache",
174 );
175 evictions = cache_u64_add(
176 evictions,
177 1,
178 "eviction count",
179 "shard cache eviction work",
180 );
181 evicted_bytes = cache_u64_add(
182 evicted_bytes,
183 cache_usize_to_u64(
184 removed.bytes,
185 "evicted byte count",
186 "shard cache artifacts before eviction",
187 ),
188 "evicted byte count",
189 "shard cache eviction work",
190 );
191 }
192 }
193 (evictions, evicted_bytes)
194 }
195}
196
/// One cached artifact together with its accounting data.
#[derive(Debug)]
struct InMemoryCacheEntry {
    /// Shared handle to the artifact bytes; lookups hand out clones of this `Arc`.
    artifact: Arc<Vec<u8>>,
    /// Artifact length recorded at insertion, used for the shard's byte accounting.
    bytes: usize,
    /// Shard clock value of the most recent insert or hit; eviction removes the smallest first.
    last_used: u64,
}
203
204impl PipelineCacheStore for InMemoryPipelineCache {
205 fn get(&self, fp: &PipelineFingerprint) -> Option<Vec<u8>> {
206 self.get_arc(fp).map(|artifact| (*artifact).clone())
207 }
208
209 fn get_arc(&self, fp: &PipelineFingerprint) -> Option<Arc<Vec<u8>>> {
212 PipelineCacheCounters::increment(&self.metrics.lookups, "lookups");
213 let i = Self::shard_index(fp);
214 let mut shard = Self::lock_shard(&self.shards[i]);
215 let tick = shard.next_tick();
216 let Some(entry) = shard.entries.get_mut(fp) else {
217 PipelineCacheCounters::increment(&self.metrics.misses, "misses");
218 return None;
219 };
220 entry.last_used = tick;
221 PipelineCacheCounters::increment(&self.metrics.hits, "hits");
222 Some(Arc::clone(&entry.artifact))
223 }
224
225 fn put(&self, fp: PipelineFingerprint, artifact: Vec<u8>) {
226 let i = Self::shard_index(&fp);
227 let mut shard = Self::lock_shard(&self.shards[i]);
228 let bytes = artifact.len();
229 if self.max_entries_per_shard == 0
230 || self.max_bytes_per_shard == 0
231 || bytes > self.max_bytes_per_shard
232 {
233 PipelineCacheCounters::increment(&self.metrics.rejected_puts, "rejected puts");
234 if let Some(removed) = shard.entries.remove(&fp) {
235 shard.bytes = cache_usize_sub(
236 shard.bytes,
237 removed.bytes,
238 "byte accounting while rejecting put",
239 "rebuild the cache",
240 );
241 PipelineCacheCounters::increment(&self.metrics.evictions, "evictions");
242 PipelineCacheCounters::add(
243 &self.metrics.evicted_bytes,
244 cache_usize_to_u64(
245 removed.bytes,
246 "evicted byte count",
247 "shard cache artifacts before eviction",
248 ),
249 "evicted bytes",
250 );
251 }
252 return;
253 }
254
255 if let Some(existing) = shard.entries.remove(&fp) {
256 shard.bytes = cache_usize_sub(
257 shard.bytes,
258 existing.bytes,
259 "byte accounting while replacing entry",
260 "rebuild the cache",
261 );
262 }
263 let tick = shard.next_tick();
264 shard.bytes = cache_usize_add(
265 shard.bytes,
266 bytes,
267 "byte accounting while inserting entry",
268 "lower per-shard cache byte budget",
269 );
270 shard.entries.insert(
271 fp,
272 InMemoryCacheEntry {
273 artifact: Arc::new(artifact),
274 bytes,
275 last_used: tick,
276 },
277 );
278 PipelineCacheCounters::increment(&self.metrics.puts, "puts");
279 let (evictions, evicted_bytes) =
280 shard.evict_to_limits(self.max_entries_per_shard, self.max_bytes_per_shard);
281 PipelineCacheCounters::add(&self.metrics.evictions, evictions, "evictions");
282 PipelineCacheCounters::add(&self.metrics.evicted_bytes, evicted_bytes, "evicted bytes");
283 }
284
285 fn metrics(&self) -> PipelineCacheMetrics {
286 self.metrics
287 .snapshot(
288 u64::try_from(self.cached_bytes()).unwrap_or_else(|error| {
289 panic!(
290 "Vyre in-memory pipeline cache retained bytes cannot fit u64: {error}. Fix: shard cache metrics before snapshotting."
291 )
292 }),
293 u64::try_from(self.len()).unwrap_or_else(|error| {
294 panic!(
295 "Vyre in-memory pipeline cache entry count cannot fit u64: {error}. Fix: shard cache metrics before snapshotting."
296 )
297 }),
298 )
299 }
300}
301
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pipeline_cache::test_helpers::tiny_program;

    /// Builds a fingerprint whose first byte is zero and whose second byte is `tag`, so that
    /// distinct tags give distinct keys mapped to the same shard.
    fn same_shard_fingerprint(tag: u8) -> PipelineFingerprint {
        let mut raw = [0_u8; 32];
        raw[1] = tag;
        PipelineFingerprint(raw)
    }

    /// A stored artifact can be read back and is counted once.
    #[test]
    fn in_memory_cache_roundtrip() {
        let cache = InMemoryPipelineCache::new();
        let fp = PipelineFingerprint::of(&tiny_program());
        let payload = b"target-bytes".to_vec();

        assert_eq!(cache.get(&fp), None);
        cache.put(fp, payload.clone());
        assert_eq!(cache.get(&fp), Some(payload));
        assert_eq!(cache.len(), 1);
    }

    /// Overfilling one shard leaves exactly the per-shard entry limit behind.
    #[test]
    fn in_memory_cache_caps_each_shard() {
        let cache = InMemoryPipelineCache::new();
        let total_puts = InMemoryPipelineCache::MAX_ENTRIES_PER_SHARD + 17;
        for i in 0..total_puts {
            // First byte stays zero so every key maps to the same shard.
            let mut raw = [0_u8; 32];
            raw[1..9].copy_from_slice(&(i as u64).to_le_bytes());
            cache.put(PipelineFingerprint(raw), vec![i as u8]);
        }
        assert_eq!(cache.len(), InMemoryPipelineCache::MAX_ENTRIES_PER_SHARD);
    }

    /// With room for two entries, the entry that was not touched most recently is evicted.
    #[test]
    fn in_memory_cache_evicts_least_recently_used_entry() {
        let cache = InMemoryPipelineCache::with_limits(2, 1024);
        let a = same_shard_fingerprint(0);
        let b = same_shard_fingerprint(1);
        let c = same_shard_fingerprint(2);

        cache.put(a, b"a".to_vec());
        cache.put(b, b"b".to_vec());
        // Touch `a` so that `b` becomes the oldest entry.
        assert_eq!(cache.get(&a), Some(b"a".to_vec()));
        cache.put(c, b"c".to_vec());

        assert_eq!(cache.get(&a), Some(b"a".to_vec()));
        assert_eq!(cache.get(&b), None);
        assert_eq!(cache.get(&c), Some(b"c".to_vec()));
    }

    /// The byte budget evicts older entries and rejects artifacts larger than the budget.
    #[test]
    fn in_memory_cache_enforces_byte_budget() {
        let cache = InMemoryPipelineCache::with_limits(8, 10);
        let a = same_shard_fingerprint(0);
        let b = same_shard_fingerprint(1);
        let too_large = same_shard_fingerprint(2);

        cache.put(a, vec![1; 6]);
        cache.put(b, vec![2; 6]);
        assert_eq!(cache.get(&a), None);
        assert_eq!(cache.get(&b), Some(vec![2; 6]));
        assert_eq!(cache.cached_bytes(), 6);

        cache.put(too_large, vec![3; 11]);
        assert_eq!(cache.get(&too_large), None);
        assert_eq!(cache.cached_bytes(), 6);
    }

    /// Counters reflect one miss, one hit, two puts and one eviction.
    #[test]
    fn in_memory_cache_metrics_track_hits_misses_and_evictions() {
        let cache = InMemoryPipelineCache::with_limits(1, 8);
        let a = same_shard_fingerprint(0);
        let b = same_shard_fingerprint(1);

        assert!(cache.get(&a).is_none());
        cache.put(a, vec![1; 4]);
        assert!(cache.get(&a).is_some());
        cache.put(b, vec![2; 4]);

        let snapshot = cache.metrics();
        assert_eq!(snapshot.lookups, 2);
        assert_eq!(snapshot.hits, 1);
        assert_eq!(snapshot.misses, 1);
        assert_eq!(snapshot.puts, 2);
        assert_eq!(snapshot.evictions, 1);
        assert_eq!(snapshot.cached_bytes, 4);
        assert_eq!(snapshot.entries, 1);
        assert_eq!(snapshot.hit_rate_ppm(), 500_000);
    }

    /// A shard poisoned by a panicking thread makes later access panic with the cache's own
    /// message.
    #[test]
    fn poisoned_cache_shard_is_not_silently_recovered() {
        let cache = Arc::new(InMemoryPipelineCache::new());
        let poisoned = Arc::clone(&cache);
        // Panic while holding shard 0's guard; the join error is expected and ignored.
        let _ = std::thread::spawn(move || {
            let _guard = InMemoryPipelineCache::lock_shard(&poisoned.shards[0]);
            panic!("poison in-memory pipeline cache shard");
        })
        .join();

        let payload = std::panic::catch_unwind(|| {
            let _ = cache.len();
        })
        .expect_err("poisoned pipeline cache shard must panic instead of recovering");
        let message: &str = if let Some(text) = payload.downcast_ref::<String>() {
            text.as_str()
        } else if let Some(text) = payload.downcast_ref::<&'static str>() {
            *text
        } else {
            "<non-string panic>"
        };
        assert!(
            message.contains("pipeline cache shard lock was poisoned"),
            "{message}"
        );
    }
}