Skip to main content

vyre_runtime/pipeline_cache/
metrics.rs

1//! Pipeline-cache instrumentation: the public snapshot type and the
2//! internal atomic counter struct shared by every concrete backend.
3
4use std::sync::atomic::{AtomicU64, Ordering};
5
/// Plain-value view of pipeline-cache instrumentation counters.
///
/// Every field is an independent `u64`; [`Default`] yields an all-zero value.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct PipelineCacheMetrics {
    /// Number of lookup attempts.
    pub lookups: u64,
    /// Number of lookups that succeeded.
    pub hits: u64,
    /// Number of lookups that failed.
    pub misses: u64,
    /// Number of put attempts that were accepted.
    pub puts: u64,
    /// Number of put attempts that were rejected, usually because a blob exceeds the byte budget.
    pub rejected_puts: u64,
    /// Number of entries evicted under capacity or byte-budget pressure.
    pub evictions: u64,
    /// Total bytes released by eviction.
    pub evicted_bytes: u64,
    /// Number of explicit flush attempts.
    pub flushes: u64,
    /// Number of explicit flush attempts that failed.
    pub flush_errors: u64,
    /// Bytes currently retained, when the backend can report them cheaply.
    pub cached_bytes: u64,
    /// Entries currently retained, when the backend can report them cheaply.
    pub entries: u64,
}
32
/// Error produced when pipeline-cache metric arithmetic overflows or a derived
/// metric does not fit its public type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PipelineCacheMetricError {
    /// Label of the metric whose arithmetic failed.
    field: &'static str,
    /// Human-readable, actionable explanation.
    message: String,
}

impl PipelineCacheMetricError {
    /// Build an error for the metric labelled `field`.
    fn new(field: &'static str, message: impl Into<String>) -> Self {
        let message = message.into();
        Self { field, message }
    }

    /// Label of the metric whose arithmetic failed.
    #[must_use]
    pub const fn field(&self) -> &'static str {
        self.field
    }

    /// Actionable description of the failure.
    #[must_use]
    pub fn message(&self) -> &str {
        self.message.as_str()
    }
}

// Display renders only the message; the label is exposed through `field()`.
impl std::fmt::Display for PipelineCacheMetricError {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        formatter.write_str(&self.message)
    }
}

// No underlying cause is tracked, so the default `source()` (None) applies.
impl std::error::Error for PipelineCacheMetricError {}
68
69impl PipelineCacheMetrics {
70    /// Cache-hit rate in parts per million.
71    #[must_use]
72    pub fn hit_rate_ppm(&self) -> u32 {
73        self.try_hit_rate_ppm().unwrap_or(u32::MAX)
74    }
75
76    /// Fallibly compute cache-hit rate in parts per million.
77    ///
78    /// # Errors
79    ///
80    /// Returns [`PipelineCacheMetricError`] when the numerator or final value
81    /// cannot fit the public metric ABI.
82    pub fn try_hit_rate_ppm(&self) -> Result<u32, PipelineCacheMetricError> {
83        if self.lookups == 0 {
84            return Ok(0);
85        }
86        let numerator =
87            self.hits
88                .checked_mul(1_000_000)
89                .ok_or_else(|| PipelineCacheMetricError::new(
90                    "hit_rate_ppm",
91                    "pipeline cache hit-rate numerator overflowed u64. Fix: snapshot and reset cache metrics before counters saturate.",
92                ))?;
93        let value = numerator / self.lookups;
94        u32::try_from(value).map_err(|source| {
95            PipelineCacheMetricError::new(
96                "hit_rate_ppm",
97                format!(
98                    "pipeline cache hit-rate ppm cannot fit u32: {source}. Fix: snapshot and reset cache metrics before counters saturate."
99                ),
100            )
101        })
102    }
103
104    pub(super) fn checked_add(self, rhs: Self) -> Self {
105        self.saturating_add(rhs)
106    }
107
108    pub(super) fn try_checked_add(self, rhs: Self) -> Result<Self, PipelineCacheMetricError> {
109        Ok(Self {
110            lookups: try_metric_add(self.lookups, rhs.lookups, "lookups")?,
111            hits: try_metric_add(self.hits, rhs.hits, "hits")?,
112            misses: try_metric_add(self.misses, rhs.misses, "misses")?,
113            puts: try_metric_add(self.puts, rhs.puts, "puts")?,
114            rejected_puts: try_metric_add(self.rejected_puts, rhs.rejected_puts, "rejected puts")?,
115            evictions: try_metric_add(self.evictions, rhs.evictions, "evictions")?,
116            evicted_bytes: try_metric_add(self.evicted_bytes, rhs.evicted_bytes, "evicted bytes")?,
117            flushes: try_metric_add(self.flushes, rhs.flushes, "flushes")?,
118            flush_errors: try_metric_add(self.flush_errors, rhs.flush_errors, "flush errors")?,
119            cached_bytes: try_metric_add(self.cached_bytes, rhs.cached_bytes, "cached bytes")?,
120            entries: try_metric_add(self.entries, rhs.entries, "entries")?,
121        })
122    }
123
124    fn saturating_add(self, rhs: Self) -> Self {
125        Self {
126            lookups: self.lookups.saturating_add(rhs.lookups),
127            hits: self.hits.saturating_add(rhs.hits),
128            misses: self.misses.saturating_add(rhs.misses),
129            puts: self.puts.saturating_add(rhs.puts),
130            rejected_puts: self.rejected_puts.saturating_add(rhs.rejected_puts),
131            evictions: self.evictions.saturating_add(rhs.evictions),
132            evicted_bytes: self.evicted_bytes.saturating_add(rhs.evicted_bytes),
133            flushes: self.flushes.saturating_add(rhs.flushes),
134            flush_errors: self.flush_errors.saturating_add(rhs.flush_errors),
135            cached_bytes: self.cached_bytes.saturating_add(rhs.cached_bytes),
136            entries: self.entries.saturating_add(rhs.entries),
137        }
138    }
139}
140
141fn try_metric_add(
142    lhs: u64,
143    rhs: u64,
144    label: &'static str,
145) -> Result<u64, PipelineCacheMetricError> {
146    lhs.checked_add(rhs).ok_or_else(|| {
147        PipelineCacheMetricError::new(
148            label,
149            format!(
150                "pipeline cache metric {label} overflowed u64. Fix: reset or shard pipeline cache metrics before aggregation."
151            ),
152        )
153    })
154}
155
156#[derive(Debug, Default)]
157pub(super) struct PipelineCacheCounters {
158    pub(super) lookups: AtomicU64,
159    pub(super) hits: AtomicU64,
160    pub(super) misses: AtomicU64,
161    pub(super) puts: AtomicU64,
162    pub(super) rejected_puts: AtomicU64,
163    pub(super) evictions: AtomicU64,
164    pub(super) evicted_bytes: AtomicU64,
165    pub(super) flushes: AtomicU64,
166    pub(super) flush_errors: AtomicU64,
167}
168
169impl PipelineCacheCounters {
170    pub(super) fn increment(counter: &AtomicU64, label: &'static str) {
171        Self::add(counter, 1, label);
172    }
173
174    pub(super) fn add(counter: &AtomicU64, value: u64, label: &'static str) {
175        if let Err(error) = Self::try_add(counter, value, label) {
176            tracing::warn!(error = %error, label, "pipeline cache counter saturated");
177            counter.store(u64::MAX, Ordering::Relaxed);
178        }
179    }
180
181    pub(super) fn try_add(
182        counter: &AtomicU64,
183        value: u64,
184        label: &'static str,
185    ) -> Result<(), PipelineCacheMetricError> {
186        let mut current = counter.load(Ordering::Relaxed);
187        loop {
188            let Some(next) = current.checked_add(value) else {
189                return Err(PipelineCacheMetricError::new(
190                    label,
191                    format!(
192                        "pipeline cache counter {label} overflowed u64. Fix: snapshot and reset cache metrics before counters saturate."
193                    ),
194                ));
195            };
196            match counter.compare_exchange_weak(current, next, Ordering::Relaxed, Ordering::Relaxed)
197            {
198                Ok(_) => return Ok(()),
199                Err(observed) => current = observed,
200            }
201        }
202    }
203
204    pub(super) fn snapshot(&self, cached_bytes: u64, entries: u64) -> PipelineCacheMetrics {
205        PipelineCacheMetrics {
206            lookups: self.lookups.load(Ordering::Relaxed),
207            hits: self.hits.load(Ordering::Relaxed),
208            misses: self.misses.load(Ordering::Relaxed),
209            puts: self.puts.load(Ordering::Relaxed),
210            rejected_puts: self.rejected_puts.load(Ordering::Relaxed),
211            evictions: self.evictions.load(Ordering::Relaxed),
212            evicted_bytes: self.evicted_bytes.load(Ordering::Relaxed),
213            flushes: self.flushes.load(Ordering::Relaxed),
214            flush_errors: self.flush_errors.load(Ordering::Relaxed),
215            cached_bytes,
216            entries,
217        }
218    }
219}
220
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicU64, Ordering};

    use super::{PipelineCacheCounters, PipelineCacheMetrics};

    /// Label passed to the counter helpers; it must surface verbatim in errors.
    const LABEL: &str = "generated counter";

    /// Metrics value with only `cached_bytes` and `hits` populated.
    fn bytes_and_hits(cached_bytes: u64, hits: u64) -> PipelineCacheMetrics {
        PipelineCacheMetrics {
            cached_bytes,
            hits,
            ..PipelineCacheMetrics::default()
        }
    }

    #[test]
    fn pipeline_cache_metrics_generated_hit_rates_are_exact_ppm() {
        const LOOKUPS: u64 = 2048;
        for hit_count in 0..=1024_u64 {
            let snapshot = PipelineCacheMetrics {
                lookups: LOOKUPS,
                hits: hit_count,
                ..PipelineCacheMetrics::default()
            };
            let expected = (hit_count * 1_000_000 / LOOKUPS) as u32;
            assert_eq!(snapshot.hit_rate_ppm(), expected);
        }
    }

    #[test]
    fn pipeline_cache_metric_try_aggregation_rejects_overflow_without_panic() {
        let saturated = bytes_and_hits(u64::MAX, 0);
        let one_more = bytes_and_hits(1, 0);

        let Err(failure) = saturated.try_checked_add(one_more) else {
            panic!("Fix: fallible pipeline cache metric aggregation must reject overflow");
        };
        assert_eq!(failure.field(), "cached bytes");
        assert!(failure.message().contains("Fix:"));
    }

    #[test]
    fn pipeline_cache_metric_compat_aggregation_saturates_on_overflow() {
        let combined = bytes_and_hits(u64::MAX, 41).checked_add(bytes_and_hits(1, 1));

        assert_eq!(combined.cached_bytes, u64::MAX);
        assert_eq!(combined.hits, 42);
    }

    #[test]
    fn pipeline_cache_counter_add_uses_checked_shared_arithmetic() {
        let atomic = AtomicU64::new(41);

        PipelineCacheCounters::add(&atomic, 1, LABEL);

        assert_eq!(atomic.load(Ordering::Relaxed), 42);
    }

    #[test]
    fn pipeline_cache_counter_try_add_rejects_overflow_without_panic() {
        let atomic = AtomicU64::new(u64::MAX);

        let failure = PipelineCacheCounters::try_add(&atomic, 1, LABEL)
            .expect_err("Fix: fallible counter add must reject overflow");

        assert_eq!(failure.field(), LABEL);
        assert!(failure.message().contains("Fix:"));
        assert_eq!(atomic.load(Ordering::Relaxed), u64::MAX);
    }

    #[test]
    fn pipeline_cache_counter_compat_add_saturates_on_overflow() {
        let atomic = AtomicU64::new(u64::MAX);

        PipelineCacheCounters::add(&atomic, 1, LABEL);

        assert_eq!(atomic.load(Ordering::Relaxed), u64::MAX);
    }

    #[test]
    fn pipeline_cache_hit_rate_try_path_rejects_overflow_without_panic() {
        let lopsided = PipelineCacheMetrics {
            lookups: 1,
            hits: u64::MAX,
            ..PipelineCacheMetrics::default()
        };

        let failure = lopsided
            .try_hit_rate_ppm()
            .expect_err("Fix: fallible hit-rate path must reject numerator overflow");

        assert_eq!(failure.field(), "hit_rate_ppm");
        assert!(failure.message().contains("Fix:"));
        assert_eq!(lopsided.hit_rate_ppm(), u32::MAX);
    }
}