// vyre_runtime/pipeline_cache/metrics.rs
use std::sync::atomic::{AtomicU64, Ordering};
5
/// Point-in-time view of pipeline cache activity, held as plain integers.
///
/// Every field is a `u64`, so a value is `Copy` and can be compared, stored, or
/// aggregated without touching the live atomic counters. `Default` yields a
/// snapshot with every field set to zero.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct PipelineCacheMetrics {
    /// Number of cache lookups; the denominator of the hit rate.
    pub lookups: u64,
    /// Number of cache hits; the numerator of the hit rate.
    pub hits: u64,
    /// Number of cache misses.
    pub misses: u64,
    /// Number of put operations recorded.
    pub puts: u64,
    /// Number of put operations recorded as rejected.
    pub rejected_puts: u64,
    /// Number of evictions recorded.
    pub evictions: u64,
    /// Running total of evicted bytes.
    pub evicted_bytes: u64,
    /// Number of flushes recorded.
    pub flushes: u64,
    /// Number of flush errors recorded.
    pub flush_errors: u64,
    /// Cached byte total. Supplied by the caller when a snapshot is taken rather
    /// than read from an atomic counter.
    pub cached_bytes: u64,
    /// Entry count. Supplied by the caller when a snapshot is taken rather than
    /// read from an atomic counter.
    pub entries: u64,
}
32
/// Error produced when a pipeline cache metric cannot be computed or aggregated.
///
/// Carries a static label naming the metric involved together with a
/// human-readable diagnostic message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PipelineCacheMetricError {
    /// Label of the metric or counter the error refers to.
    field: &'static str,
    /// Full diagnostic text; this is what the `Display` impl prints.
    message: String,
}
39
40impl PipelineCacheMetricError {
41 fn new(field: &'static str, message: impl Into<String>) -> Self {
42 Self {
43 field,
44 message: message.into(),
45 }
46 }
47
48 #[must_use]
50 pub const fn field(&self) -> &'static str {
51 self.field
52 }
53
54 #[must_use]
56 pub fn message(&self) -> &str {
57 &self.message
58 }
59}
60
61impl std::fmt::Display for PipelineCacheMetricError {
62 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63 write!(formatter, "{}", self.message)
64 }
65}
66
67impl std::error::Error for PipelineCacheMetricError {}
68
69impl PipelineCacheMetrics {
70 #[must_use]
72 pub fn hit_rate_ppm(&self) -> u32 {
73 self.try_hit_rate_ppm().unwrap_or(u32::MAX)
74 }
75
76 pub fn try_hit_rate_ppm(&self) -> Result<u32, PipelineCacheMetricError> {
83 if self.lookups == 0 {
84 return Ok(0);
85 }
86 let numerator =
87 self.hits
88 .checked_mul(1_000_000)
89 .ok_or_else(|| PipelineCacheMetricError::new(
90 "hit_rate_ppm",
91 "pipeline cache hit-rate numerator overflowed u64. Fix: snapshot and reset cache metrics before counters saturate.",
92 ))?;
93 let value = numerator / self.lookups;
94 u32::try_from(value).map_err(|source| {
95 PipelineCacheMetricError::new(
96 "hit_rate_ppm",
97 format!(
98 "pipeline cache hit-rate ppm cannot fit u32: {source}. Fix: snapshot and reset cache metrics before counters saturate."
99 ),
100 )
101 })
102 }
103
104 pub(super) fn checked_add(self, rhs: Self) -> Self {
105 self.saturating_add(rhs)
106 }
107
108 pub(super) fn try_checked_add(self, rhs: Self) -> Result<Self, PipelineCacheMetricError> {
109 Ok(Self {
110 lookups: try_metric_add(self.lookups, rhs.lookups, "lookups")?,
111 hits: try_metric_add(self.hits, rhs.hits, "hits")?,
112 misses: try_metric_add(self.misses, rhs.misses, "misses")?,
113 puts: try_metric_add(self.puts, rhs.puts, "puts")?,
114 rejected_puts: try_metric_add(self.rejected_puts, rhs.rejected_puts, "rejected puts")?,
115 evictions: try_metric_add(self.evictions, rhs.evictions, "evictions")?,
116 evicted_bytes: try_metric_add(self.evicted_bytes, rhs.evicted_bytes, "evicted bytes")?,
117 flushes: try_metric_add(self.flushes, rhs.flushes, "flushes")?,
118 flush_errors: try_metric_add(self.flush_errors, rhs.flush_errors, "flush errors")?,
119 cached_bytes: try_metric_add(self.cached_bytes, rhs.cached_bytes, "cached bytes")?,
120 entries: try_metric_add(self.entries, rhs.entries, "entries")?,
121 })
122 }
123
124 fn saturating_add(self, rhs: Self) -> Self {
125 Self {
126 lookups: self.lookups.saturating_add(rhs.lookups),
127 hits: self.hits.saturating_add(rhs.hits),
128 misses: self.misses.saturating_add(rhs.misses),
129 puts: self.puts.saturating_add(rhs.puts),
130 rejected_puts: self.rejected_puts.saturating_add(rhs.rejected_puts),
131 evictions: self.evictions.saturating_add(rhs.evictions),
132 evicted_bytes: self.evicted_bytes.saturating_add(rhs.evicted_bytes),
133 flushes: self.flushes.saturating_add(rhs.flushes),
134 flush_errors: self.flush_errors.saturating_add(rhs.flush_errors),
135 cached_bytes: self.cached_bytes.saturating_add(rhs.cached_bytes),
136 entries: self.entries.saturating_add(rhs.entries),
137 }
138 }
139}
140
141fn try_metric_add(
142 lhs: u64,
143 rhs: u64,
144 label: &'static str,
145) -> Result<u64, PipelineCacheMetricError> {
146 lhs.checked_add(rhs).ok_or_else(|| {
147 PipelineCacheMetricError::new(
148 label,
149 format!(
150 "pipeline cache metric {label} overflowed u64. Fix: reset or shard pipeline cache metrics before aggregation."
151 ),
152 )
153 })
154}
155
156#[derive(Debug, Default)]
157pub(super) struct PipelineCacheCounters {
158 pub(super) lookups: AtomicU64,
159 pub(super) hits: AtomicU64,
160 pub(super) misses: AtomicU64,
161 pub(super) puts: AtomicU64,
162 pub(super) rejected_puts: AtomicU64,
163 pub(super) evictions: AtomicU64,
164 pub(super) evicted_bytes: AtomicU64,
165 pub(super) flushes: AtomicU64,
166 pub(super) flush_errors: AtomicU64,
167}
168
169impl PipelineCacheCounters {
170 pub(super) fn increment(counter: &AtomicU64, label: &'static str) {
171 Self::add(counter, 1, label);
172 }
173
174 pub(super) fn add(counter: &AtomicU64, value: u64, label: &'static str) {
175 if let Err(error) = Self::try_add(counter, value, label) {
176 tracing::warn!(error = %error, label, "pipeline cache counter saturated");
177 counter.store(u64::MAX, Ordering::Relaxed);
178 }
179 }
180
181 pub(super) fn try_add(
182 counter: &AtomicU64,
183 value: u64,
184 label: &'static str,
185 ) -> Result<(), PipelineCacheMetricError> {
186 let mut current = counter.load(Ordering::Relaxed);
187 loop {
188 let Some(next) = current.checked_add(value) else {
189 return Err(PipelineCacheMetricError::new(
190 label,
191 format!(
192 "pipeline cache counter {label} overflowed u64. Fix: snapshot and reset cache metrics before counters saturate."
193 ),
194 ));
195 };
196 match counter.compare_exchange_weak(current, next, Ordering::Relaxed, Ordering::Relaxed)
197 {
198 Ok(_) => return Ok(()),
199 Err(observed) => current = observed,
200 }
201 }
202 }
203
204 pub(super) fn snapshot(&self, cached_bytes: u64, entries: u64) -> PipelineCacheMetrics {
205 PipelineCacheMetrics {
206 lookups: self.lookups.load(Ordering::Relaxed),
207 hits: self.hits.load(Ordering::Relaxed),
208 misses: self.misses.load(Ordering::Relaxed),
209 puts: self.puts.load(Ordering::Relaxed),
210 rejected_puts: self.rejected_puts.load(Ordering::Relaxed),
211 evictions: self.evictions.load(Ordering::Relaxed),
212 evicted_bytes: self.evicted_bytes.load(Ordering::Relaxed),
213 flushes: self.flushes.load(Ordering::Relaxed),
214 flush_errors: self.flush_errors.load(Ordering::Relaxed),
215 cached_bytes,
216 entries,
217 }
218 }
219}
220
/// Unit tests for metric arithmetic: exact hit rates, fallible aggregation, and the
/// saturating compatibility paths.
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicU64, Ordering};

    use super::{PipelineCacheCounters, PipelineCacheMetrics};

    #[test]
    fn pipeline_cache_metrics_generated_hit_rates_are_exact_ppm() {
        const LOOKUPS: u64 = 2048;
        for hits in 0..=1024_u64 {
            let metrics = PipelineCacheMetrics {
                lookups: LOOKUPS,
                hits,
                ..PipelineCacheMetrics::default()
            };
            // Checked conversion instead of `as u32`, so a bad expectation fails
            // loudly rather than being silently truncated.
            let expected = u32::try_from(hits * 1_000_000 / LOOKUPS)
                .expect("Fix: generated hit rates must fit u32");
            assert_eq!(metrics.hit_rate_ppm(), expected);
        }
    }

    #[test]
    fn pipeline_cache_metric_try_aggregation_rejects_overflow_without_panic() {
        let lhs = PipelineCacheMetrics {
            cached_bytes: u64::MAX,
            ..PipelineCacheMetrics::default()
        };
        let rhs = PipelineCacheMetrics {
            cached_bytes: 1,
            ..PipelineCacheMetrics::default()
        };

        let error = lhs
            .try_checked_add(rhs)
            .expect_err("Fix: fallible pipeline cache metric aggregation must reject overflow");
        assert_eq!(error.field(), "cached bytes");
        assert!(error.message().contains("Fix:"));
    }

    #[test]
    fn pipeline_cache_metric_compat_aggregation_saturates_on_overflow() {
        let lhs = PipelineCacheMetrics {
            cached_bytes: u64::MAX,
            hits: 41,
            ..PipelineCacheMetrics::default()
        };
        let rhs = PipelineCacheMetrics {
            cached_bytes: 1,
            hits: 1,
            ..PipelineCacheMetrics::default()
        };

        let metrics = lhs.checked_add(rhs);

        assert_eq!(metrics.cached_bytes, u64::MAX);
        assert_eq!(metrics.hits, 42);
    }

    #[test]
    fn pipeline_cache_counter_add_uses_checked_shared_arithmetic() {
        let counter = AtomicU64::new(41);

        PipelineCacheCounters::add(&counter, 1, "generated counter");

        assert_eq!(counter.load(Ordering::Relaxed), 42);
    }

    #[test]
    fn pipeline_cache_counter_try_add_rejects_overflow_without_panic() {
        let counter = AtomicU64::new(u64::MAX);

        let error = PipelineCacheCounters::try_add(&counter, 1, "generated counter")
            .expect_err("Fix: fallible counter add must reject overflow");

        assert_eq!(error.field(), "generated counter");
        assert!(error.message().contains("Fix:"));
        assert_eq!(counter.load(Ordering::Relaxed), u64::MAX);
    }

    #[test]
    fn pipeline_cache_counter_compat_add_saturates_on_overflow() {
        let counter = AtomicU64::new(u64::MAX);

        PipelineCacheCounters::add(&counter, 1, "generated counter");

        assert_eq!(counter.load(Ordering::Relaxed), u64::MAX);
    }

    #[test]
    fn pipeline_cache_hit_rate_try_path_rejects_overflow_without_panic() {
        let metrics = PipelineCacheMetrics {
            lookups: 1,
            hits: u64::MAX,
            ..PipelineCacheMetrics::default()
        };

        let error = metrics
            .try_hit_rate_ppm()
            .expect_err("Fix: fallible hit-rate path must reject numerator overflow");

        assert_eq!(error.field(), "hit_rate_ppm");
        assert!(error.message().contains("Fix:"));
        assert_eq!(metrics.hit_rate_ppm(), u32::MAX);
    }
}