1use std::sync::{Arc, Mutex, MutexGuard};
5
6use rustc_hash::FxHashMap;
7
8use super::fingerprint::PipelineFingerprint;
9use super::metrics::{PipelineCacheCounters, PipelineCacheMetrics};
10use super::store::PipelineCacheStore;
11
/// Bounded, sharded, in-process LRU cache of pipeline artifacts.
///
/// Artifacts are opaque byte vectors keyed by [`PipelineFingerprint`]. Each
/// fingerprint is routed to one of `SHARD_COUNT` independently locked shards
/// (by the fingerprint's first byte). Every shard enforces its own entry and
/// byte budget and evicts least-recently-used entries when a `put` pushes it
/// over either budget. A `put` whose artifact exceeds the byte budget, or made
/// while either budget is zero, is rejected instead of stored.
#[derive(Debug)]
pub struct InMemoryPipelineCache {
    /// Independently locked shards; `shard_index` decides which one owns a key.
    shards: [Mutex<InMemoryCacheShard>; Self::SHARD_COUNT],
    /// Maximum number of entries a single shard may retain.
    max_entries_per_shard: usize,
    /// Maximum total artifact bytes a single shard may retain.
    max_bytes_per_shard: usize,
    /// Lookup / hit / miss / put / rejected-put / eviction counters that are
    /// snapshotted by `metrics()`.
    metrics: PipelineCacheCounters,
}
22
23impl InMemoryPipelineCache {
24 pub(super) const SHARD_COUNT: usize = 256;
25 pub(super) const MAX_ENTRIES_PER_SHARD: usize = 256;
26 pub(super) const MAX_BYTES_PER_SHARD: usize = 16 * 1024 * 1024;
27
28 #[inline]
29 fn shard_index(fp: &PipelineFingerprint) -> usize {
30 usize::from(fp.0[0]) % Self::SHARD_COUNT
31 }
32
33 fn lock_shard(shard: &Mutex<InMemoryCacheShard>) -> MutexGuard<'_, InMemoryCacheShard> {
34 shard
40 .lock()
41 .unwrap_or_else(|_| panic!("pipeline cache shard lock was poisoned"))
42 }
43
44 #[must_use]
46 pub fn new() -> Self {
47 Self::default()
48 }
49
50 #[must_use]
55 pub fn with_limits(max_entries_per_shard: usize, max_bytes_per_shard: usize) -> Self {
56 Self {
57 shards: std::array::from_fn(|_| Mutex::new(InMemoryCacheShard::default())),
58 max_entries_per_shard,
59 max_bytes_per_shard,
60 metrics: PipelineCacheCounters::default(),
61 }
62 }
63
64 pub fn len(&self) -> usize {
66 self.shards
67 .iter()
68 .map(|s| Self::lock_shard(s).entries.len())
69 .fold(0usize, |acc, value| {
70 cache_usize_add(
71 acc,
72 value,
73 "entry count",
74 "shard cache metrics before snapshotting",
75 )
76 })
77 }
78
79 pub fn cached_bytes(&self) -> usize {
81 self.shards
82 .iter()
83 .map(|s| Self::lock_shard(s).bytes)
84 .fold(0usize, |acc, value| {
85 cache_usize_add(
86 acc,
87 value,
88 "byte count",
89 "shard cache metrics before snapshotting",
90 )
91 })
92 }
93
94 pub fn is_empty(&self) -> bool {
96 self.shards
97 .iter()
98 .all(|s| Self::lock_shard(s).entries.is_empty())
99 }
100
101 #[must_use]
104 pub fn eviction_reports(&self) -> Vec<InMemoryEvictionReport> {
105 let mut reports = Vec::new();
106 for shard in &self.shards {
107 if let Some(report) = Self::lock_shard(shard).last_eviction {
108 reports.push(report);
109 }
110 }
111 reports
112 }
113}
114
115impl Default for InMemoryPipelineCache {
116 fn default() -> Self {
117 Self::with_limits(Self::MAX_ENTRIES_PER_SHARD, Self::MAX_BYTES_PER_SHARD)
118 }
119}
120
/// Adds two `usize` counters, clamping to `usize::MAX` instead of overflowing.
///
/// `_label` and `_fix` carry descriptive context from call sites and are
/// currently unused.
fn cache_usize_add(lhs: usize, rhs: usize, _label: &'static str, _fix: &'static str) -> usize {
    let (sum, overflowed) = lhs.overflowing_add(rhs);
    if overflowed {
        usize::MAX
    } else {
        sum
    }
}
124
/// Subtracts `rhs` from `lhs`, clamping to zero instead of underflowing.
///
/// `_label` and `_fix` carry descriptive context from call sites and are
/// currently unused.
fn cache_usize_sub(lhs: usize, rhs: usize, _label: &'static str, _fix: &'static str) -> usize {
    let (difference, underflowed) = lhs.overflowing_sub(rhs);
    if underflowed {
        0
    } else {
        difference
    }
}
128
/// Adds two `u64` counters, clamping to `u64::MAX` instead of overflowing.
///
/// `_label` and `_fix` carry descriptive context from call sites and are
/// currently unused.
fn cache_u64_add(lhs: u64, rhs: u64, _label: &'static str, _fix: &'static str) -> u64 {
    let (sum, overflowed) = lhs.overflowing_add(rhs);
    if overflowed {
        u64::MAX
    } else {
        sum
    }
}
132
/// Subtracts `rhs` from `lhs`, clamping to zero instead of underflowing.
///
/// `_label` and `_fix` carry descriptive context from call sites and are
/// currently unused.
fn cache_u64_sub(lhs: u64, rhs: u64, _label: &'static str, _fix: &'static str) -> u64 {
    let (difference, underflowed) = lhs.overflowing_sub(rhs);
    if underflowed {
        0
    } else {
        difference
    }
}
136
/// Widens a `usize` to `u64`, clamping to `u64::MAX` if it does not fit.
///
/// `_label` and `_fix` carry descriptive context from call sites and are
/// currently unused.
fn cache_usize_to_u64(value: usize, _label: &'static str, _fix: &'static str) -> u64 {
    u64::try_from(value).unwrap_or(u64::MAX)
}
143
/// One independently locked partition of [`InMemoryPipelineCache`].
#[derive(Debug, Default)]
struct InMemoryCacheShard {
    /// Cached artifacts owned by this shard, keyed by fingerprint.
    entries: FxHashMap<PipelineFingerprint, InMemoryCacheEntry>,
    /// Running sum of `InMemoryCacheEntry::bytes` over `entries`, maintained
    /// with saturating arithmetic on every insert/replace/evict.
    bytes: usize,
    /// Logical LRU clock; advanced (saturating) by `next_tick` on every
    /// lookup and on every put that modifies the shard.
    clock: u64,
    /// Report from the most recent eviction pass (or rejected put) that
    /// actually removed something from this shard.
    last_eviction: Option<InMemoryEvictionReport>,
}
151
152impl InMemoryCacheShard {
153 fn next_tick(&mut self) -> u64 {
154 self.clock = cache_u64_add(
155 self.clock,
156 1,
157 "shard clock",
158 "recreate the cache before LRU timestamps wrap",
159 );
160 self.clock
161 }
162
163 fn evict_to_limits(
164 &mut self,
165 max_entries: usize,
166 max_bytes: usize,
167 ) -> Option<InMemoryEvictionReport> {
168 let mut report = None;
169 while self.entries.len() > max_entries || self.bytes > max_bytes {
170 let reason = InMemoryEvictionReason::from_limits(
171 self.entries.len(),
172 self.bytes,
173 max_entries,
174 max_bytes,
175 );
176 let Some(victim) = self
177 .entries
178 .iter()
179 .min_by_key(|(_, entry)| entry.last_used)
180 .map(|(fp, _)| *fp)
181 else {
182 self.bytes = 0;
183 self.last_eviction = report;
184 return report;
185 };
186 if let Some(removed) = self.entries.remove(&victim) {
187 self.bytes = cache_usize_sub(
188 self.bytes,
189 removed.bytes,
190 "byte accounting during eviction",
191 "rebuild the cache",
192 );
193 record_eviction_report(&mut report, reason, self.clock, &removed);
194 }
195 }
196 if report.is_some() {
197 self.last_eviction = report;
198 }
199 report
200 }
201}
202
/// A single cached artifact plus its LRU bookkeeping.
#[derive(Debug)]
struct InMemoryCacheEntry {
    /// The artifact, shared via `Arc` so `get_arc` can hand it out without
    /// copying the bytes.
    artifact: Arc<Vec<u8>>,
    /// Length of `artifact` at insertion time; used for byte-budget accounting.
    bytes: usize,
    /// Shard clock tick of the most recent put or successful lookup.
    last_used: u64,
}
209
/// Why entries were removed from a shard.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InMemoryEvictionReason {
    /// The shard held more entries than its entry budget. Also used as the
    /// fallback classification when neither budget is exceeded.
    EntryLimit,
    /// The shard held more artifact bytes than its byte budget.
    ByteLimit,
    /// The shard exceeded both its entry and byte budgets at the same time.
    EntryAndByteLimit,
    /// An existing entry was dropped because a `put` for the same fingerprint
    /// was rejected (a zero budget, or an artifact larger than the byte budget).
    RejectedPut,
}
222
223impl InMemoryEvictionReason {
224 fn from_limits(entries: usize, bytes: usize, max_entries: usize, max_bytes: usize) -> Self {
225 match (entries > max_entries, bytes > max_bytes) {
226 (true, true) => Self::EntryAndByteLimit,
227 (true, false) => Self::EntryLimit,
228 (false, true) => Self::ByteLimit,
229 (false, false) => Self::EntryLimit,
230 }
231 }
232}
233
/// Summary of one eviction pass (or one rejected put) on a single shard.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct InMemoryEvictionReport {
    /// Number of entries removed in this pass.
    pub entries: u64,
    /// Total artifact bytes removed in this pass (saturating).
    pub bytes: u64,
    /// Largest age, in shard clock ticks (`now - last_used`), among the
    /// removed entries.
    pub max_age_ticks: u64,
    /// Reason recorded for the first entry removed in this pass; later
    /// removals in the same pass do not change it.
    pub reason: InMemoryEvictionReason,
}
246
247fn record_eviction_report(
248 report: &mut Option<InMemoryEvictionReport>,
249 reason: InMemoryEvictionReason,
250 now: u64,
251 removed: &InMemoryCacheEntry,
252) {
253 let removed_bytes = cache_usize_to_u64(
254 removed.bytes,
255 "evicted byte count",
256 "shard cache artifacts before eviction",
257 );
258 let age = cache_u64_sub(
259 now,
260 removed.last_used,
261 "evicted entry age",
262 "rebuild the cache LRU clock",
263 );
264 match report {
265 Some(report) => {
266 report.entries = cache_u64_add(
267 report.entries,
268 1,
269 "eviction count",
270 "shard cache eviction work",
271 );
272 report.bytes = cache_u64_add(
273 report.bytes,
274 removed_bytes,
275 "evicted byte count",
276 "shard cache eviction work",
277 );
278 report.max_age_ticks = report.max_age_ticks.max(age);
279 }
280 None => {
281 *report = Some(InMemoryEvictionReport {
282 entries: 1,
283 bytes: removed_bytes,
284 max_age_ticks: age,
285 reason,
286 });
287 }
288 }
289}
290
291impl PipelineCacheStore for InMemoryPipelineCache {
292 fn get(&self, fp: &PipelineFingerprint) -> Option<Vec<u8>> {
293 self.get_arc(fp).map(|artifact| (*artifact).clone())
294 }
295
296 fn get_arc(&self, fp: &PipelineFingerprint) -> Option<Arc<Vec<u8>>> {
299 PipelineCacheCounters::increment(&self.metrics.lookups, "lookups");
300 let i = Self::shard_index(fp);
301 let mut shard = Self::lock_shard(&self.shards[i]);
302 let tick = shard.next_tick();
303 let Some(entry) = shard.entries.get_mut(fp) else {
304 PipelineCacheCounters::increment(&self.metrics.misses, "misses");
305 return None;
306 };
307 entry.last_used = tick;
308 PipelineCacheCounters::increment(&self.metrics.hits, "hits");
309 Some(Arc::clone(&entry.artifact))
310 }
311
312 fn put(&self, fp: PipelineFingerprint, artifact: Vec<u8>) {
313 let i = Self::shard_index(&fp);
314 let mut shard = Self::lock_shard(&self.shards[i]);
315 let bytes = artifact.len();
316 if self.max_entries_per_shard == 0
317 || self.max_bytes_per_shard == 0
318 || bytes > self.max_bytes_per_shard
319 {
320 PipelineCacheCounters::increment(&self.metrics.rejected_puts, "rejected puts");
321 if let Some(removed) = shard.entries.remove(&fp) {
322 let tick = shard.next_tick();
323 shard.bytes = cache_usize_sub(
324 shard.bytes,
325 removed.bytes,
326 "byte accounting while rejecting put",
327 "rebuild the cache",
328 );
329 let mut report = None;
330 record_eviction_report(
331 &mut report,
332 InMemoryEvictionReason::RejectedPut,
333 tick,
334 &removed,
335 );
336 shard.last_eviction = report;
337 PipelineCacheCounters::increment(&self.metrics.evictions, "evictions");
338 PipelineCacheCounters::add(
339 &self.metrics.evicted_bytes,
340 cache_usize_to_u64(
341 removed.bytes,
342 "evicted byte count",
343 "shard cache artifacts before eviction",
344 ),
345 "evicted bytes",
346 );
347 }
348 return;
349 }
350
351 if let Some(existing) = shard.entries.remove(&fp) {
352 shard.bytes = cache_usize_sub(
353 shard.bytes,
354 existing.bytes,
355 "byte accounting while replacing entry",
356 "rebuild the cache",
357 );
358 }
359 let tick = shard.next_tick();
360 shard.bytes = cache_usize_add(
361 shard.bytes,
362 bytes,
363 "byte accounting while inserting entry",
364 "lower per-shard cache byte budget",
365 );
366 shard.entries.insert(
367 fp,
368 InMemoryCacheEntry {
369 artifact: Arc::new(artifact),
370 bytes,
371 last_used: tick,
372 },
373 );
374 PipelineCacheCounters::increment(&self.metrics.puts, "puts");
375 if let Some(report) =
376 shard.evict_to_limits(self.max_entries_per_shard, self.max_bytes_per_shard)
377 {
378 PipelineCacheCounters::add(&self.metrics.evictions, report.entries, "evictions");
379 PipelineCacheCounters::add(&self.metrics.evicted_bytes, report.bytes, "evicted bytes");
380 }
381 }
382
383 fn metrics(&self) -> PipelineCacheMetrics {
384 self.metrics
385 .snapshot(
386 cache_usize_to_u64(
387 self.cached_bytes(),
388 "retained byte snapshot",
389 "shard cache metrics before snapshotting",
390 ),
391 cache_usize_to_u64(
392 self.len(),
393 "entry count snapshot",
394 "shard cache metrics before snapshotting",
395 ),
396 )
397 }
398}
399
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pipeline_cache::test_helpers::tiny_program;

    /// Builds a fingerprint whose first byte is zero, so it always routes to
    /// shard 0, and whose second byte is `tag`, so each tag is a distinct key.
    fn shard0_fingerprint(tag: u8) -> PipelineFingerprint {
        let mut raw = [0_u8; 32];
        raw[1] = tag;
        PipelineFingerprint(raw)
    }

    #[test]
    fn in_memory_cache_roundtrip() {
        let cache = InMemoryPipelineCache::new();
        let key = PipelineFingerprint::of(&tiny_program());
        assert!(cache.get(&key).is_none());
        cache.put(key, b"target-bytes".to_vec());
        assert_eq!(cache.get(&key).unwrap(), b"target-bytes".to_vec());
        assert_eq!(cache.len(), 1);
    }

    #[test]
    fn in_memory_cache_caps_each_shard() {
        let cache = InMemoryPipelineCache::new();
        let attempts = InMemoryPipelineCache::MAX_ENTRIES_PER_SHARD + 17;
        for i in 0..attempts {
            // First byte stays zero, so every key lands in the same shard.
            let mut raw = [0_u8; 32];
            raw[1..9].copy_from_slice(&(i as u64).to_le_bytes());
            cache.put(PipelineFingerprint(raw), vec![i as u8]);
        }
        assert_eq!(cache.len(), InMemoryPipelineCache::MAX_ENTRIES_PER_SHARD);
    }

    #[test]
    fn in_memory_cache_evicts_least_recently_used_entry() {
        let cache = InMemoryPipelineCache::with_limits(2, 1024);
        let (a, b, c) = (
            shard0_fingerprint(0),
            shard0_fingerprint(1),
            shard0_fingerprint(2),
        );

        cache.put(a, b"a".to_vec());
        cache.put(b, b"b".to_vec());
        // Touch `a` so that `b` becomes the least recently used entry.
        assert_eq!(cache.get(&a).unwrap(), b"a".to_vec());
        cache.put(c, b"c".to_vec());

        assert_eq!(cache.get(&a).unwrap(), b"a".to_vec());
        assert!(cache.get(&b).is_none());
        assert_eq!(cache.get(&c).unwrap(), b"c".to_vec());
    }

    #[test]
    fn in_memory_cache_enforces_byte_budget() {
        let cache = InMemoryPipelineCache::with_limits(8, 10);
        let (a, b, too_large) = (
            shard0_fingerprint(0),
            shard0_fingerprint(1),
            shard0_fingerprint(2),
        );

        cache.put(a, vec![1; 6]);
        cache.put(b, vec![2; 6]);
        assert!(cache.get(&a).is_none());
        assert_eq!(cache.get(&b).unwrap(), vec![2; 6]);
        assert_eq!(cache.cached_bytes(), 6);

        cache.put(too_large, vec![3; 11]);
        assert!(cache.get(&too_large).is_none());
        assert_eq!(cache.cached_bytes(), 6);
    }

    #[test]
    fn in_memory_cache_metrics_track_hits_misses_and_evictions() {
        let cache = InMemoryPipelineCache::with_limits(1, 8);
        let (a, b) = (shard0_fingerprint(0), shard0_fingerprint(1));

        assert!(cache.get(&a).is_none());
        cache.put(a, vec![1; 4]);
        assert!(cache.get(&a).is_some());
        cache.put(b, vec![2; 4]);

        let snapshot = cache.metrics();
        assert_eq!(snapshot.lookups, 2);
        assert_eq!(snapshot.hits, 1);
        assert_eq!(snapshot.misses, 1);
        assert_eq!(snapshot.puts, 2);
        assert_eq!(snapshot.evictions, 1);
        assert_eq!(snapshot.cached_bytes, 4);
        assert_eq!(snapshot.entries, 1);
        assert_eq!(snapshot.hit_rate_ppm(), 500_000);
    }

    #[test]
    fn in_memory_cache_eviction_report_records_reason_entries_bytes_and_age() {
        let cache = InMemoryPipelineCache::with_limits(1, 8);
        let (a, b) = (shard0_fingerprint(0), shard0_fingerprint(1));

        cache.put(a, vec![1; 4]);
        assert!(cache.get(&a).is_some());
        cache.put(b, vec![2; 4]);

        let reports = cache.eviction_reports();
        assert_eq!(reports.len(), 1);
        let report = reports[0];
        assert_eq!(report.reason, InMemoryEvictionReason::EntryLimit);
        assert_eq!(report.entries, 1);
        assert_eq!(report.bytes, 4);
        assert!(
            report.max_age_ticks > 0,
            "Fix: eviction reports must expose LRU age, got {report:?}"
        );
    }

    #[test]
    fn rejected_oversize_put_records_eviction_reason_for_replaced_entry() {
        let cache = InMemoryPipelineCache::with_limits(8, 8);
        let key = shard0_fingerprint(0);

        cache.put(key, vec![1; 4]);
        cache.put(key, vec![2; 9]);

        assert!(cache.get(&key).is_none());
        let reports = cache.eviction_reports();
        assert_eq!(reports.len(), 1);
        let report = reports[0];
        assert_eq!(report.reason, InMemoryEvictionReason::RejectedPut);
        assert_eq!(report.entries, 1);
        assert_eq!(report.bytes, 4);
    }

    #[test]
    fn poisoned_cache_shard_is_not_silently_recovered() {
        let cache = Arc::new(InMemoryPipelineCache::new());
        let poisoner = Arc::clone(&cache);
        let _ = std::thread::spawn(move || {
            let _guard = InMemoryPipelineCache::lock_shard(&poisoner.shards[0]);
            panic!("poison in-memory pipeline cache shard");
        })
        .join();

        let payload = std::panic::catch_unwind(|| {
            let _ = cache.len();
        })
        .expect_err("poisoned pipeline cache shard must panic instead of recovering");
        let message = payload
            .downcast_ref::<String>()
            .map(String::as_str)
            .or_else(|| payload.downcast_ref::<&'static str>().copied())
            .unwrap_or("<non-string panic>");
        assert!(
            message.contains("pipeline cache shard lock was poisoned"),
            "{message}"
        );
    }
}
558}