vyre_runtime/pipeline_cache/
metrics.rs1use std::sync::atomic::{AtomicU64, Ordering};
5
6use vyre_driver::accounting::{checked_add_u64_lazy, checked_mul_u64_lazy};
7
/// Plain-value copy of the pipeline cache statistics.
///
/// Built by `PipelineCacheCounters::snapshot`, which loads each atomic counter
/// into the same-named field and takes `cached_bytes` and `entries` from its
/// caller. Every field defaults to zero.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct PipelineCacheMetrics {
    /// Lookup count; the denominator of [`Self::hit_rate_ppm`].
    pub lookups: u64,
    /// Hit count; the numerator of [`Self::hit_rate_ppm`].
    pub hits: u64,
    /// Miss count.
    pub misses: u64,
    /// Put count.
    pub puts: u64,
    /// Rejected-put count.
    pub rejected_puts: u64,
    /// Eviction count.
    pub evictions: u64,
    /// Evicted-bytes total.
    pub evicted_bytes: u64,
    /// Flush count.
    pub flushes: u64,
    /// Flush-error count.
    pub flush_errors: u64,
    /// Cached-bytes value handed to `PipelineCacheCounters::snapshot` by its caller.
    pub cached_bytes: u64,
    /// Entry count handed to `PipelineCacheCounters::snapshot` by its caller.
    pub entries: u64,
}
34
35impl PipelineCacheMetrics {
36 #[must_use]
38 pub fn hit_rate_ppm(&self) -> u32 {
39 if self.lookups == 0 {
40 return 0;
41 }
42 let numerator = checked_mul_u64_lazy(self.hits, 1_000_000, || {
43 "pipeline cache hit-rate numerator overflowed u64. Fix: reset cache metrics before counters wrap."
44 })
45 .unwrap_or_else(|message| panic!("{message}"));
46 let value = numerator / self.lookups;
47 if value > u64::from(u32::MAX) {
48 panic!("pipeline cache hit-rate ppm cannot fit u32. Fix: reset cache metrics before counters wrap.");
49 }
50 u32::try_from(value).unwrap_or_else(|source| {
51 panic!(
52 "pipeline cache hit-rate ppm cannot fit u32 after range check: {source}. Fix: reset cache metrics before counters wrap."
53 )
54 })
55 }
56
57 pub(super) fn checked_add(self, rhs: Self) -> Self {
58 Self {
59 lookups: checked_metric_add(self.lookups, rhs.lookups, "lookups"),
60 hits: checked_metric_add(self.hits, rhs.hits, "hits"),
61 misses: checked_metric_add(self.misses, rhs.misses, "misses"),
62 puts: checked_metric_add(self.puts, rhs.puts, "puts"),
63 rejected_puts: checked_metric_add(
64 self.rejected_puts,
65 rhs.rejected_puts,
66 "rejected puts",
67 ),
68 evictions: checked_metric_add(self.evictions, rhs.evictions, "evictions"),
69 evicted_bytes: checked_metric_add(
70 self.evicted_bytes,
71 rhs.evicted_bytes,
72 "evicted bytes",
73 ),
74 flushes: checked_metric_add(self.flushes, rhs.flushes, "flushes"),
75 flush_errors: checked_metric_add(self.flush_errors, rhs.flush_errors, "flush errors"),
76 cached_bytes: checked_metric_add(self.cached_bytes, rhs.cached_bytes, "cached bytes"),
77 entries: checked_metric_add(self.entries, rhs.entries, "entries"),
78 }
79 }
80}
81
82fn checked_metric_add(lhs: u64, rhs: u64, label: &'static str) -> u64 {
83 checked_add_u64_lazy(lhs, rhs, || {
84 format!(
85 "pipeline cache metric {label} overflowed u64. Fix: reset or shard pipeline cache metrics before aggregation."
86 )
87 })
88 .unwrap_or_else(|message| panic!("{message}"))
89}
90
91#[derive(Debug, Default)]
92pub(super) struct PipelineCacheCounters {
93 pub(super) lookups: AtomicU64,
94 pub(super) hits: AtomicU64,
95 pub(super) misses: AtomicU64,
96 pub(super) puts: AtomicU64,
97 pub(super) rejected_puts: AtomicU64,
98 pub(super) evictions: AtomicU64,
99 pub(super) evicted_bytes: AtomicU64,
100 pub(super) flushes: AtomicU64,
101 pub(super) flush_errors: AtomicU64,
102}
103
104impl PipelineCacheCounters {
105 pub(super) fn increment(counter: &AtomicU64, label: &'static str) {
106 Self::add(counter, 1, label);
107 }
108
109 pub(super) fn add(counter: &AtomicU64, value: u64, label: &'static str) {
110 vyre_driver::accounting::checked_atomic_add_u64_with_order(
111 counter,
112 value,
113 Ordering::Relaxed,
114 Ordering::Relaxed,
115 Ordering::Relaxed,
116 |_, _| {
117 format!(
118 "pipeline cache counter {label} overflowed u64. Fix: reset cache metrics before counters wrap."
119 )
120 },
121 )
122 .unwrap_or_else(|message| panic!("{message}"));
123 }
124
125 pub(super) fn snapshot(&self, cached_bytes: u64, entries: u64) -> PipelineCacheMetrics {
126 PipelineCacheMetrics {
127 lookups: self.lookups.load(Ordering::Relaxed),
128 hits: self.hits.load(Ordering::Relaxed),
129 misses: self.misses.load(Ordering::Relaxed),
130 puts: self.puts.load(Ordering::Relaxed),
131 rejected_puts: self.rejected_puts.load(Ordering::Relaxed),
132 evictions: self.evictions.load(Ordering::Relaxed),
133 evicted_bytes: self.evicted_bytes.load(Ordering::Relaxed),
134 flushes: self.flushes.load(Ordering::Relaxed),
135 flush_errors: self.flush_errors.load(Ordering::Relaxed),
136 cached_bytes,
137 entries,
138 }
139 }
140}
141
#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicU64;

    use super::{PipelineCacheCounters, PipelineCacheMetrics};

    /// Sweeps `hits` from 0 to 1024 against 2048 lookups and compares the
    /// reported ppm with the same truncating division done in `u32`.
    #[test]
    fn pipeline_cache_metrics_generated_hit_rates_are_exact_ppm() {
        const LOOKUPS: u32 = 2048;
        for hits in 0..=1024_u32 {
            let metrics = PipelineCacheMetrics {
                lookups: u64::from(LOOKUPS),
                hits: u64::from(hits),
                ..PipelineCacheMetrics::default()
            };
            // 1024 * 1_000_000 still fits in u32, so no narrowing cast is needed.
            let expected = hits * 1_000_000 / LOOKUPS;
            assert_eq!(metrics.hit_rate_ppm(), expected);
        }
    }

    /// Summing `u64::MAX` and `1` in `cached_bytes` must panic with the
    /// metric-specific overflow message.
    #[test]
    #[should_panic(expected = "pipeline cache metric cached bytes overflowed u64")]
    fn pipeline_cache_metric_aggregation_rejects_overflow() {
        let saturated = PipelineCacheMetrics {
            cached_bytes: u64::MAX,
            ..Default::default()
        };
        let one_more = PipelineCacheMetrics {
            cached_bytes: 1,
            ..Default::default()
        };

        let _ = saturated.checked_add(one_more);
    }

    /// A non-overflowing `add` stores the plain sum in the counter.
    #[test]
    fn pipeline_cache_counter_add_uses_checked_shared_arithmetic() {
        let counter = AtomicU64::new(41);

        PipelineCacheCounters::add(&counter, 1, "generated counter");

        assert_eq!(counter.into_inner(), 42);
    }
}