//! `rs_zero/observability/metrics.rs`: in-process metrics registry with
//! Prometheus text export.

1use std::{
2    collections::BTreeMap,
3    fmt::Write,
4    sync::{Arc, Mutex},
5    time::Duration,
6};
7
8pub mod cache;
9pub mod http;
10pub mod redis;
11pub mod resilience;
12pub mod rpc;
13pub mod sql;
14
15pub use cache::CacheMetricLabels;
16pub use http::HttpMetricLabels;
17pub use redis::{RedisDegradationLabels, RedisEventLabels, RedisMetricLabels};
18pub use resilience::ResilienceMetricLabels;
19pub use rpc::RpcMetricLabels;
20pub use sql::SqlMetricLabels;
21
/// Upper bounds, in seconds, of the finite latency histogram buckets, in
/// ascending order. The catch-all `+Inf` bucket is not listed here.
pub(crate) const DURATION_BUCKETS: [f64; 9] = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5];

/// Latency histogram state for a single label set.
#[derive(Debug, Clone, Default)]
pub(crate) struct DurationMetricValue {
    /// Number of observations recorded so far.
    pub(crate) count: u64,
    /// Sum of all observed durations, in seconds.
    pub(crate) duration_seconds_sum: f64,
    /// Cumulative counts: slot `i` counts observations `<= DURATION_BUCKETS[i]`.
    pub(crate) buckets: [u64; DURATION_BUCKETS.len()],
}

impl DurationMetricValue {
    /// Adds one observation: bumps the count, extends the running sum, and
    /// increments every bucket whose upper bound is at least `duration`
    /// (bounds are inclusive).
    fn record(&mut self, duration: Duration) {
        let seconds = duration.as_secs_f64();
        self.count += 1;
        self.duration_seconds_sum += seconds;
        self.buckets
            .iter_mut()
            .zip(DURATION_BUCKETS.iter())
            .filter(|(_, bound)| seconds <= **bound)
            .for_each(|(slot, _)| *slot += 1);
    }
}
43
44/// In-process metrics registry with Prometheus text export.
45#[derive(Debug, Clone, Default)]
46pub struct MetricsRegistry {
47    http_requests: Arc<Mutex<BTreeMap<HttpMetricLabels, DurationMetricValue>>>,
48    http_in_flight: Arc<Mutex<i64>>,
49    rpc_requests: Arc<Mutex<BTreeMap<RpcMetricLabels, DurationMetricValue>>>,
50    sql_queries: Arc<Mutex<BTreeMap<SqlMetricLabels, DurationMetricValue>>>,
51    redis_commands: Arc<Mutex<BTreeMap<RedisMetricLabels, DurationMetricValue>>>,
52    redis_events: Arc<Mutex<BTreeMap<RedisEventLabels, u64>>>,
53    redis_degradations: Arc<Mutex<BTreeMap<RedisDegradationLabels, u64>>>,
54    cache_events: Arc<Mutex<BTreeMap<CacheMetricLabels, u64>>>,
55    resilience_events: Arc<Mutex<BTreeMap<ResilienceMetricLabels, u64>>>,
56}
57
58impl MetricsRegistry {
59    /// Creates an empty registry.
60    pub fn new() -> Self {
61        Self::default()
62    }
63
64    /// Records an HTTP request duration.
65    pub fn record_http_request(&self, labels: HttpMetricLabels, duration: Duration) {
66        record_duration(&self.http_requests, labels, duration);
67    }
68
69    /// Increments in-flight HTTP request gauge.
70    pub fn increment_http_in_flight(&self) {
71        *self.http_in_flight.lock().expect("metrics mutex") += 1;
72    }
73
74    /// Decrements in-flight HTTP request gauge.
75    pub fn decrement_http_in_flight(&self) {
76        let mut value = self.http_in_flight.lock().expect("metrics mutex");
77        *value = (*value - 1).max(0);
78    }
79
80    /// Records a gRPC request duration.
81    pub fn record_rpc_request(&self, labels: RpcMetricLabels, duration: Duration) {
82        record_duration(&self.rpc_requests, labels, duration);
83    }
84
85    /// Records a SQL query duration.
86    pub fn record_sql_query(&self, labels: SqlMetricLabels, duration: Duration) {
87        record_duration(&self.sql_queries, labels, duration);
88    }
89
90    /// Records a Redis command duration.
91    pub fn record_redis_command(&self, labels: RedisMetricLabels, duration: Duration) {
92        record_duration(&self.redis_commands, labels, duration);
93    }
94
95    /// Records a Redis pool, redirect, script or degradation event.
96    pub fn record_redis_event(&self, labels: RedisEventLabels) {
97        increment_map(&self.redis_events, labels);
98    }
99
100    /// Records an explicit Redis degradation event.
101    pub fn record_redis_degradation(&self, labels: RedisDegradationLabels) {
102        increment_map(&self.redis_degradations, labels);
103    }
104
105    /// Records a cache event such as hit, miss or loader error.
106    pub fn record_cache_event(&self, operation: impl Into<String>, result: impl Into<String>) {
107        self.record_cache_event_with_component("cache", operation, result);
108    }
109
110    /// Records a cache event for a specific cache component.
111    pub fn record_cache_event_with_component(
112        &self,
113        component: impl Into<String>,
114        operation: impl Into<String>,
115        result: impl Into<String>,
116    ) {
117        increment_map(
118            &self.cache_events,
119            CacheMetricLabels::new(component, operation, result),
120        );
121    }
122
123    /// Records a generic resilience event.
124    pub fn record_resilience_event(
125        &self,
126        component: impl Into<String>,
127        outcome: impl Into<String>,
128    ) {
129        self.record_resilience_event_for_transport("unknown", component, outcome);
130    }
131
132    /// Records a transport-specific resilience event.
133    pub fn record_resilience_event_for_transport(
134        &self,
135        transport: impl Into<String>,
136        component: impl Into<String>,
137        outcome: impl Into<String>,
138    ) {
139        increment_map(
140            &self.resilience_events,
141            ResilienceMetricLabels::new(transport, component, outcome),
142        );
143    }
144
145    /// Renders all metrics in Prometheus text exposition format.
146    pub fn render_prometheus(&self) -> String {
147        let mut output = String::new();
148        http::render(
149            &mut output,
150            &self.http_requests.lock().expect("metrics mutex"),
151            *self.http_in_flight.lock().expect("metrics mutex"),
152        );
153        rpc::render(
154            &mut output,
155            &self.rpc_requests.lock().expect("metrics mutex"),
156        );
157        sql::render(
158            &mut output,
159            &self.sql_queries.lock().expect("metrics mutex"),
160        );
161        redis::render(
162            &mut output,
163            &self.redis_commands.lock().expect("metrics mutex"),
164            &self.redis_events.lock().expect("metrics mutex"),
165            &self.redis_degradations.lock().expect("metrics mutex"),
166        );
167        cache::render(
168            &mut output,
169            &self.cache_events.lock().expect("metrics mutex"),
170        );
171        resilience::render(
172            &mut output,
173            &self.resilience_events.lock().expect("metrics mutex"),
174        );
175        output
176    }
177}
178
179fn record_duration<K>(map: &Mutex<BTreeMap<K, DurationMetricValue>>, key: K, duration: Duration)
180where
181    K: Ord,
182{
183    let mut metrics = map.lock().expect("metrics mutex");
184    metrics.entry(key).or_default().record(duration);
185}
186
/// Bumps the counter stored under `key` by one, starting from zero when the
/// key has not been recorded before.
pub(crate) fn increment_map<K: Ord>(map: &Mutex<BTreeMap<K, u64>>, key: K) {
    let mut counters = map.lock().expect("metrics mutex");
    counters
        .entry(key)
        .and_modify(|count| *count += 1)
        .or_insert(1);
}
194
/// Escapes a label value for the Prometheus text exposition format.
///
/// The format requires backslash, double-quote and line feed inside label
/// values to be written as `\\`, `\"` and `\n` respectively; every other
/// character is emitted verbatim. A raw newline would otherwise terminate the
/// sample line early and corrupt the whole exposition.
pub(crate) fn escape_label(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}
198
199pub(crate) fn write_duration_metric<L>(
200    output: &mut String,
201    base_name: &str,
202    labels: &L,
203    value: &DurationMetricValue,
204    write_labels: impl Fn(&mut String, &L, Option<&str>),
205) {
206    for (index, bucket) in DURATION_BUCKETS.iter().enumerate() {
207        write!(output, "{base_name}_bucket{{").ok();
208        write_labels(output, labels, Some(&bucket_label(*bucket)));
209        writeln!(output, "}} {}", value.buckets[index]).ok();
210    }
211    write!(output, "{base_name}_bucket{{").ok();
212    write_labels(output, labels, Some("+Inf"));
213    writeln!(output, "}} {}", value.count).ok();
214
215    write!(output, "{base_name}_count{{").ok();
216    write_labels(output, labels, None);
217    writeln!(output, "}} {}", value.count).ok();
218
219    write!(output, "{base_name}_sum{{").ok();
220    write_labels(output, labels, None);
221    writeln!(output, "}} {:.6}", value.duration_seconds_sum).ok();
222}
223
/// Formats a histogram upper bound as an `le` label value with exactly three
/// fractional digits (e.g. `0.005`, `1.000`).
fn bucket_label(value: f64) -> String {
    const FRACTION_DIGITS: usize = 3;
    format!("{:.*}", FRACTION_DIGITS, value)
}
227
#[cfg(test)]
mod tests {
    use std::{fmt::Write, time::Duration};

    use super::{
        write_duration_metric, DurationMetricValue, HttpMetricLabels, MetricsRegistry,
        RedisDegradationLabels, RedisEventLabels, RedisMetricLabels, RpcMetricLabels,
        SqlMetricLabels,
    };

    #[test]
    fn metrics_render_prometheus_text() {
        let registry = MetricsRegistry::new();
        registry.record_http_request(
            HttpMetricLabels::new("GET", "/users/:id", 200),
            Duration::from_millis(20),
        );
        registry.record_rpc_request(
            RpcMetricLabels::new("hello.Hello", "Say", "Ok"),
            Duration::from_millis(15),
        );
        registry.record_sql_query(
            SqlMetricLabels::new("sqlite", "users", "find_by_id", "select", "success"),
            Duration::from_millis(2),
        );
        registry.record_redis_command(
            RedisMetricLabels::new("GET", "primary", "success"),
            Duration::from_millis(1),
        );
        registry.record_redis_event(RedisEventLabels::new("pool", "primary", "success"));
        registry.record_redis_degradation(RedisDegradationLabels::new(
            "get",
            "return_miss",
            "primary",
        ));
        registry.record_cache_event_with_component("cache_aside", "get", "hit");
        registry.record_resilience_event_for_transport("http", "breaker", "allowed");

        let text = registry.render_prometheus();

        assert!(text.contains("rs_zero_http_requests_total"));
        assert!(text.contains("rs_zero_rpc_requests_total"));
        assert!(text.contains("rs_zero_sql_queries_total"));
        assert!(text.contains("rs_zero_redis_commands_total"));
        assert!(text.contains("rs_zero_redis_events_total"));
        assert!(text.contains("rs_zero_redis_degradations_total"));
        assert!(text.contains("rs_zero_cache_events_total"));
        assert!(text.contains("rs_zero_resilience_events_total"));
        assert!(text.contains("route=\"/users/:id\""));
        assert!(!text.contains("/users/42"));
    }

    #[test]
    fn legacy_resilience_labels_keep_component_and_outcome_adjacent() {
        let registry = MetricsRegistry::new();
        registry.record_resilience_event("breaker", "allowed");

        let text = registry.render_prometheus();

        assert!(text.contains("component=\"breaker\",outcome=\"allowed\""));
    }

    #[test]
    fn label_values_are_escaped() {
        let registry = MetricsRegistry::new();
        registry.record_resilience_event("bad\"x", "ok");

        let text = registry.render_prometheus();

        assert!(text.contains("bad\\\"x"));
    }

    #[test]
    fn duration_buckets_are_cumulative_and_inclusive() {
        let mut value = DurationMetricValue::default();
        value.record(Duration::from_millis(5));
        value.record(Duration::from_millis(20));
        value.record(Duration::from_secs(5));

        assert_eq!(value.count, 3);
        // 5ms hits every bucket (bounds are inclusive), 20ms every bucket from
        // 0.025 upward, and 5s exceeds every finite bound.
        assert_eq!(value.buckets, [1, 1, 2, 2, 2, 2, 2, 2, 2]);
    }

    #[test]
    fn duration_metric_writes_buckets_count_and_sum() {
        let mut value = DurationMetricValue::default();
        value.record(Duration::from_millis(20));

        let mut text = String::new();
        write_duration_metric(&mut text, "demo", &"x", &value, |out, label, le| {
            write!(out, "name=\"{label}\"").ok();
            if let Some(le) = le {
                write!(out, ",le=\"{le}\"").ok();
            }
        });

        assert!(text.contains("demo_bucket{name=\"x\",le=\"0.010\"} 0\n"));
        assert!(text.contains("demo_bucket{name=\"x\",le=\"0.025\"} 1\n"));
        assert!(text.contains("demo_bucket{name=\"x\",le=\"+Inf\"} 1\n"));
        assert!(text.contains("demo_count{name=\"x\"} 1\n"));
        assert!(text.contains("demo_sum{name=\"x\"} 0.020000\n"));
    }

    #[test]
    fn http_in_flight_gauge_never_goes_negative() {
        let registry = MetricsRegistry::new();
        registry.decrement_http_in_flight();
        registry.increment_http_in_flight();
        registry.increment_http_in_flight();
        registry.decrement_http_in_flight();

        assert_eq!(*registry.http_in_flight.lock().unwrap(), 1);
    }
}