use std::{
    collections::BTreeMap,
    fmt::Write,
    sync::{Arc, Mutex, MutexGuard, PoisonError},
    time::Duration,
};
7
8pub mod cache;
9pub mod http;
10pub mod redis;
11pub mod resilience;
12pub mod rpc;
13pub mod sql;
14
15pub use cache::CacheMetricLabels;
16pub use http::HttpMetricLabels;
17pub use redis::{RedisDegradationLabels, RedisEventLabels, RedisMetricLabels};
18pub use resilience::ResilienceMetricLabels;
19pub use rpc::RpcMetricLabels;
20pub use sql::SqlMetricLabels;
21
/// Upper bounds, in seconds, of the latency histogram buckets used by every duration metric.
///
/// Buckets are cumulative: an observation is counted in each bucket whose bound it does not
/// exceed.
pub(crate) const DURATION_BUCKETS: [f64; 9] = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5];

/// Aggregated state of one latency histogram series.
#[derive(Debug, Clone, Default)]
pub(crate) struct DurationMetricValue {
    /// Number of observations recorded so far.
    pub(crate) count: u64,
    /// Sum of all observed durations, in seconds.
    pub(crate) duration_seconds_sum: f64,
    /// Cumulative bucket counts, aligned index-for-index with [`DURATION_BUCKETS`].
    pub(crate) buckets: [u64; DURATION_BUCKETS.len()],
}

impl DurationMetricValue {
    /// Adds one observation.
    ///
    /// This bumps the count, accumulates the seconds into the running sum, and increments
    /// every bucket whose upper bound is at least the observed value.
    fn record(&mut self, duration: Duration) {
        let observed = duration.as_secs_f64();
        self.count += 1;
        self.duration_seconds_sum += observed;
        self.buckets
            .iter_mut()
            .zip(DURATION_BUCKETS.iter())
            .filter(|(_, upper_bound)| observed <= **upper_bound)
            .for_each(|(slot, _)| *slot += 1);
    }
}
43
44#[derive(Debug, Clone, Default)]
46pub struct MetricsRegistry {
47 http_requests: Arc<Mutex<BTreeMap<HttpMetricLabels, DurationMetricValue>>>,
48 http_in_flight: Arc<Mutex<i64>>,
49 rpc_requests: Arc<Mutex<BTreeMap<RpcMetricLabels, DurationMetricValue>>>,
50 sql_queries: Arc<Mutex<BTreeMap<SqlMetricLabels, DurationMetricValue>>>,
51 redis_commands: Arc<Mutex<BTreeMap<RedisMetricLabels, DurationMetricValue>>>,
52 redis_events: Arc<Mutex<BTreeMap<RedisEventLabels, u64>>>,
53 redis_degradations: Arc<Mutex<BTreeMap<RedisDegradationLabels, u64>>>,
54 cache_events: Arc<Mutex<BTreeMap<CacheMetricLabels, u64>>>,
55 resilience_events: Arc<Mutex<BTreeMap<ResilienceMetricLabels, u64>>>,
56}
57
58impl MetricsRegistry {
59 pub fn new() -> Self {
61 Self::default()
62 }
63
64 pub fn record_http_request(&self, labels: HttpMetricLabels, duration: Duration) {
66 record_duration(&self.http_requests, labels, duration);
67 }
68
69 pub fn increment_http_in_flight(&self) {
71 *self.http_in_flight.lock().expect("metrics mutex") += 1;
72 }
73
74 pub fn decrement_http_in_flight(&self) {
76 let mut value = self.http_in_flight.lock().expect("metrics mutex");
77 *value = (*value - 1).max(0);
78 }
79
80 pub fn record_rpc_request(&self, labels: RpcMetricLabels, duration: Duration) {
82 record_duration(&self.rpc_requests, labels, duration);
83 }
84
85 pub fn record_sql_query(&self, labels: SqlMetricLabels, duration: Duration) {
87 record_duration(&self.sql_queries, labels, duration);
88 }
89
90 pub fn record_redis_command(&self, labels: RedisMetricLabels, duration: Duration) {
92 record_duration(&self.redis_commands, labels, duration);
93 }
94
95 pub fn record_redis_event(&self, labels: RedisEventLabels) {
97 increment_map(&self.redis_events, labels);
98 }
99
100 pub fn record_redis_degradation(&self, labels: RedisDegradationLabels) {
102 increment_map(&self.redis_degradations, labels);
103 }
104
105 pub fn record_cache_event(&self, operation: impl Into<String>, result: impl Into<String>) {
107 self.record_cache_event_with_component("cache", operation, result);
108 }
109
110 pub fn record_cache_event_with_component(
112 &self,
113 component: impl Into<String>,
114 operation: impl Into<String>,
115 result: impl Into<String>,
116 ) {
117 increment_map(
118 &self.cache_events,
119 CacheMetricLabels::new(component, operation, result),
120 );
121 }
122
123 pub fn record_resilience_event(
125 &self,
126 component: impl Into<String>,
127 outcome: impl Into<String>,
128 ) {
129 self.record_resilience_event_for_transport("unknown", component, outcome);
130 }
131
132 pub fn record_resilience_event_for_transport(
134 &self,
135 transport: impl Into<String>,
136 component: impl Into<String>,
137 outcome: impl Into<String>,
138 ) {
139 increment_map(
140 &self.resilience_events,
141 ResilienceMetricLabels::new(transport, component, outcome),
142 );
143 }
144
145 pub fn render_prometheus(&self) -> String {
147 let mut output = String::new();
148 http::render(
149 &mut output,
150 &self.http_requests.lock().expect("metrics mutex"),
151 *self.http_in_flight.lock().expect("metrics mutex"),
152 );
153 rpc::render(
154 &mut output,
155 &self.rpc_requests.lock().expect("metrics mutex"),
156 );
157 sql::render(
158 &mut output,
159 &self.sql_queries.lock().expect("metrics mutex"),
160 );
161 redis::render(
162 &mut output,
163 &self.redis_commands.lock().expect("metrics mutex"),
164 &self.redis_events.lock().expect("metrics mutex"),
165 &self.redis_degradations.lock().expect("metrics mutex"),
166 );
167 cache::render(
168 &mut output,
169 &self.cache_events.lock().expect("metrics mutex"),
170 );
171 resilience::render(
172 &mut output,
173 &self.resilience_events.lock().expect("metrics mutex"),
174 );
175 output
176 }
177}
178
179fn record_duration<K>(map: &Mutex<BTreeMap<K, DurationMetricValue>>, key: K, duration: Duration)
180where
181 K: Ord,
182{
183 let mut metrics = map.lock().expect("metrics mutex");
184 metrics.entry(key).or_default().record(duration);
185}
186
/// Increments the counter stored under `key`, starting from zero on first use.
///
/// A poisoned lock is recovered instead of propagated. The map holds only counters, so a
/// panic that happened while it was locked must not make every later increment panic as well.
pub(crate) fn increment_map<K>(map: &Mutex<BTreeMap<K, u64>>, key: K)
where
    K: Ord,
{
    let mut counters = map.lock().unwrap_or_else(PoisonError::into_inner);
    *counters.entry(key).or_default() += 1;
}
194
/// Escapes a label value for the Prometheus text exposition format.
///
/// The format requires three characters to be escaped inside a quoted label value:
/// backslash (`\` -> `\\`), double quote (`"` -> `\"`) and line feed (newline -> `\n`).
/// Every other character is copied unchanged. A single pass avoids double-escaping the
/// backslashes introduced by the other substitutions.
pub(crate) fn escape_label(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            // A raw newline would terminate the sample line and corrupt the exposition output.
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}
198
199pub(crate) fn write_duration_metric<L>(
200 output: &mut String,
201 base_name: &str,
202 labels: &L,
203 value: &DurationMetricValue,
204 write_labels: impl Fn(&mut String, &L, Option<&str>),
205) {
206 for (index, bucket) in DURATION_BUCKETS.iter().enumerate() {
207 write!(output, "{base_name}_bucket{{").ok();
208 write_labels(output, labels, Some(&bucket_label(*bucket)));
209 writeln!(output, "}} {}", value.buckets[index]).ok();
210 }
211 write!(output, "{base_name}_bucket{{").ok();
212 write_labels(output, labels, Some("+Inf"));
213 writeln!(output, "}} {}", value.count).ok();
214
215 write!(output, "{base_name}_count{{").ok();
216 write_labels(output, labels, None);
217 writeln!(output, "}} {}", value.count).ok();
218
219 write!(output, "{base_name}_sum{{").ok();
220 write_labels(output, labels, None);
221 writeln!(output, "}} {:.6}", value.duration_seconds_sum).ok();
222}
223
/// Formats a bucket upper bound for the `le` label with exactly three decimal places.
///
/// For example, `0.01` becomes `0.010` and `2.5` becomes `2.500`.
fn bucket_label(value: f64) -> String {
    format!("{:.3}", value)
}
227
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::{
        HttpMetricLabels, MetricsRegistry, RedisDegradationLabels, RedisEventLabels,
        RedisMetricLabels, RpcMetricLabels, SqlMetricLabels,
    };

    /// Metric family names that must appear once every family has at least one sample.
    const EXPECTED_FAMILIES: [&str; 8] = [
        "rs_zero_http_requests_total",
        "rs_zero_rpc_requests_total",
        "rs_zero_sql_queries_total",
        "rs_zero_redis_commands_total",
        "rs_zero_redis_events_total",
        "rs_zero_redis_degradations_total",
        "rs_zero_cache_events_total",
        "rs_zero_resilience_events_total",
    ];

    #[test]
    fn metrics_render_prometheus_text() {
        let registry = MetricsRegistry::new();

        // Feed one sample into every family so each renderer produces output.
        let http = HttpMetricLabels::new("GET", "/users/:id", 200);
        registry.record_http_request(http, Duration::from_millis(20));

        let rpc = RpcMetricLabels::new("hello.Hello", "Say", "Ok");
        registry.record_rpc_request(rpc, Duration::from_millis(15));

        let sql = SqlMetricLabels::new("sqlite", "users", "find_by_id", "select", "success");
        registry.record_sql_query(sql, Duration::from_millis(2));

        let redis = RedisMetricLabels::new("GET", "primary", "success");
        registry.record_redis_command(redis, Duration::from_millis(1));

        registry.record_redis_event(RedisEventLabels::new("pool", "primary", "success"));
        registry.record_redis_degradation(RedisDegradationLabels::new(
            "get",
            "return_miss",
            "primary",
        ));
        registry.record_cache_event_with_component("cache_aside", "get", "hit");
        registry.record_resilience_event_for_transport("http", "breaker", "allowed");

        let text = registry.render_prometheus();

        for family in EXPECTED_FAMILIES.iter() {
            assert!(text.contains(*family), "missing metric family {}", family);
        }
        // The route label carries the recorded template, not a concrete request path.
        assert!(text.contains(r#"route="/users/:id""#));
        assert!(!text.contains("/users/42"));
    }

    #[test]
    fn legacy_resilience_labels_keep_component_and_outcome_adjacent() {
        let registry = MetricsRegistry::new();
        registry.record_resilience_event("breaker", "allowed");

        let rendered = registry.render_prometheus();

        assert!(rendered.contains(r#"component="breaker",outcome="allowed""#));
    }

    #[test]
    fn label_values_are_escaped() {
        let registry = MetricsRegistry::new();
        registry.record_resilience_event(r#"bad"x"#, "ok");

        let rendered = registry.render_prometheus();

        // The embedded quote must come out backslash-escaped.
        assert!(rendered.contains(r#"bad\"x"#));
    }
}
298}