Skip to main content

vyre_runtime/pipeline_cache/
metrics.rs

1//! Pipeline-cache instrumentation: the public snapshot type and the
2//! internal atomic counter struct shared by every concrete backend.
3
4use std::sync::atomic::{AtomicU64, Ordering};
5
6use vyre_driver::accounting::{checked_add_u64_lazy, checked_mul_u64_lazy};
7
/// Point-in-time snapshot of pipeline-cache instrumentation.
///
/// Every field is a plain `u64`, so a snapshot is cheap to copy and compare.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct PipelineCacheMetrics {
    /// Number of lookup attempts.
    pub lookups: u64,
    /// Number of lookups that succeeded.
    pub hits: u64,
    /// Number of lookups that failed.
    pub misses: u64,
    /// Number of put attempts that were accepted.
    pub puts: u64,
    /// Number of put attempts that were rejected, usually because a blob
    /// exceeds the byte budget.
    pub rejected_puts: u64,
    /// Number of entries evicted under capacity or byte-budget pressure.
    pub evictions: u64,
    /// Total bytes removed by eviction.
    pub evicted_bytes: u64,
    /// Number of explicit flush attempts.
    pub flushes: u64,
    /// Number of explicit flushes that failed.
    pub flush_errors: u64,
    /// Bytes currently retained, when the backend can report them cheaply.
    pub cached_bytes: u64,
    /// Entries currently retained, when the backend can report them cheaply.
    pub entries: u64,
}
34
35impl PipelineCacheMetrics {
36    /// Cache-hit rate in parts per million.
37    #[must_use]
38    pub fn hit_rate_ppm(&self) -> u32 {
39        if self.lookups == 0 {
40            return 0;
41        }
42        let numerator = checked_mul_u64_lazy(self.hits, 1_000_000, || {
43            "pipeline cache hit-rate numerator overflowed u64. Fix: reset cache metrics before counters wrap."
44        })
45        .unwrap_or_else(|message| panic!("{message}"));
46        let value = numerator / self.lookups;
47        if value > u64::from(u32::MAX) {
48            panic!("pipeline cache hit-rate ppm cannot fit u32. Fix: reset cache metrics before counters wrap.");
49        }
50        u32::try_from(value).unwrap_or_else(|source| {
51            panic!(
52                "pipeline cache hit-rate ppm cannot fit u32 after range check: {source}. Fix: reset cache metrics before counters wrap."
53            )
54        })
55    }
56
57    pub(super) fn checked_add(self, rhs: Self) -> Self {
58        Self {
59            lookups: checked_metric_add(self.lookups, rhs.lookups, "lookups"),
60            hits: checked_metric_add(self.hits, rhs.hits, "hits"),
61            misses: checked_metric_add(self.misses, rhs.misses, "misses"),
62            puts: checked_metric_add(self.puts, rhs.puts, "puts"),
63            rejected_puts: checked_metric_add(
64                self.rejected_puts,
65                rhs.rejected_puts,
66                "rejected puts",
67            ),
68            evictions: checked_metric_add(self.evictions, rhs.evictions, "evictions"),
69            evicted_bytes: checked_metric_add(
70                self.evicted_bytes,
71                rhs.evicted_bytes,
72                "evicted bytes",
73            ),
74            flushes: checked_metric_add(self.flushes, rhs.flushes, "flushes"),
75            flush_errors: checked_metric_add(self.flush_errors, rhs.flush_errors, "flush errors"),
76            cached_bytes: checked_metric_add(self.cached_bytes, rhs.cached_bytes, "cached bytes"),
77            entries: checked_metric_add(self.entries, rhs.entries, "entries"),
78        }
79    }
80}
81
82fn checked_metric_add(lhs: u64, rhs: u64, label: &'static str) -> u64 {
83    checked_add_u64_lazy(lhs, rhs, || {
84        format!(
85            "pipeline cache metric {label} overflowed u64. Fix: reset or shard pipeline cache metrics before aggregation."
86        )
87    })
88    .unwrap_or_else(|message| panic!("{message}"))
89}
90
91#[derive(Debug, Default)]
92pub(super) struct PipelineCacheCounters {
93    pub(super) lookups: AtomicU64,
94    pub(super) hits: AtomicU64,
95    pub(super) misses: AtomicU64,
96    pub(super) puts: AtomicU64,
97    pub(super) rejected_puts: AtomicU64,
98    pub(super) evictions: AtomicU64,
99    pub(super) evicted_bytes: AtomicU64,
100    pub(super) flushes: AtomicU64,
101    pub(super) flush_errors: AtomicU64,
102}
103
104impl PipelineCacheCounters {
105    pub(super) fn increment(counter: &AtomicU64, label: &'static str) {
106        Self::add(counter, 1, label);
107    }
108
109    pub(super) fn add(counter: &AtomicU64, value: u64, label: &'static str) {
110        vyre_driver::accounting::checked_atomic_add_u64_with_order(
111            counter,
112            value,
113            Ordering::Relaxed,
114            Ordering::Relaxed,
115            Ordering::Relaxed,
116            |_, _| {
117                format!(
118                    "pipeline cache counter {label} overflowed u64. Fix: reset cache metrics before counters wrap."
119                )
120            },
121        )
122        .unwrap_or_else(|message| panic!("{message}"));
123    }
124
125    pub(super) fn snapshot(&self, cached_bytes: u64, entries: u64) -> PipelineCacheMetrics {
126        PipelineCacheMetrics {
127            lookups: self.lookups.load(Ordering::Relaxed),
128            hits: self.hits.load(Ordering::Relaxed),
129            misses: self.misses.load(Ordering::Relaxed),
130            puts: self.puts.load(Ordering::Relaxed),
131            rejected_puts: self.rejected_puts.load(Ordering::Relaxed),
132            evictions: self.evictions.load(Ordering::Relaxed),
133            evicted_bytes: self.evicted_bytes.load(Ordering::Relaxed),
134            flushes: self.flushes.load(Ordering::Relaxed),
135            flush_errors: self.flush_errors.load(Ordering::Relaxed),
136            cached_bytes,
137            entries,
138        }
139    }
140}
141
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicU64, Ordering};

    use super::{PipelineCacheCounters, PipelineCacheMetrics};

    /// Sweeps hit counts against a fixed lookup total and checks each ppm
    /// value against the same integer formula computed in `u64`.
    #[test]
    fn pipeline_cache_metrics_generated_hit_rates_are_exact_ppm() {
        const LOOKUPS: u64 = 2048;

        for hits in 0..=1024_u64 {
            let snapshot = PipelineCacheMetrics {
                lookups: LOOKUPS,
                hits,
                ..PipelineCacheMetrics::default()
            };
            let expected_ppm = (hits * 1_000_000) / LOOKUPS;
            // Widening the result keeps the comparison free of narrowing casts.
            assert_eq!(u64::from(snapshot.hit_rate_ppm()), expected_ppm);
        }
    }

    /// Aggregating a saturated `cached_bytes` with one more byte must panic.
    #[test]
    #[should_panic(expected = "pipeline cache metric cached bytes overflowed u64")]
    fn pipeline_cache_metric_aggregation_rejects_overflow() {
        let saturated = PipelineCacheMetrics {
            cached_bytes: u64::MAX,
            ..PipelineCacheMetrics::default()
        };
        let one_more = PipelineCacheMetrics {
            cached_bytes: 1,
            ..PipelineCacheMetrics::default()
        };

        let _ = saturated.checked_add(one_more);
    }

    /// A plain in-range add goes through the counter helper and lands in the atomic.
    #[test]
    fn pipeline_cache_counter_add_uses_checked_shared_arithmetic() {
        let counter = AtomicU64::new(41);

        PipelineCacheCounters::add(&counter, 1, "generated counter");

        let observed = counter.load(Ordering::Relaxed);
        assert_eq!(observed, 42);
    }
}