use std::{
collections::BTreeMap,
fmt::Write,
sync::{Arc, Mutex},
time::Duration,
};
pub mod cache;
pub mod http;
pub mod redis;
pub mod resilience;
pub mod rpc;
pub mod sql;
pub use cache::CacheMetricLabels;
pub use http::HttpMetricLabels;
pub use redis::{RedisDegradationLabels, RedisEventLabels, RedisMetricLabels};
pub use resilience::ResilienceMetricLabels;
pub use rpc::RpcMetricLabels;
pub use sql::SqlMetricLabels;
/// Histogram bucket upper bounds, in seconds, shared by every duration metric.
pub(crate) const DURATION_BUCKETS: [f64; 9] = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5];

/// Accumulated state for a single labelled duration histogram: the total
/// observation count, the sum of observed seconds, and the cumulative
/// per-bucket counts (one slot per entry in `DURATION_BUCKETS`).
#[derive(Debug, Clone, Default)]
pub(crate) struct DurationMetricValue {
    pub(crate) count: u64,
    pub(crate) duration_seconds_sum: f64,
    pub(crate) buckets: [u64; DURATION_BUCKETS.len()],
}

impl DurationMetricValue {
    /// Folds one observation into the histogram.
    ///
    /// Buckets are cumulative, Prometheus-style: every bucket whose upper
    /// bound is at least the observed duration is incremented; an
    /// observation above the largest bound touches no bucket and is only
    /// reflected in `count` (the implicit `+Inf` bucket) and the sum.
    fn record(&mut self, duration: Duration) {
        let seconds = duration.as_secs_f64();
        self.count += 1;
        self.duration_seconds_sum += seconds;
        for (slot, bound) in self.buckets.iter_mut().zip(DURATION_BUCKETS.iter()) {
            if seconds <= *bound {
                *slot += 1;
            }
        }
    }
}
/// Process-wide metrics store shared across subsystems.
///
/// Cloning is cheap: every field is an `Arc`, so clones observe and mutate
/// the same underlying counters. Maps are `BTreeMap` keyed by label structs,
/// which gives the rendered output a stable, deterministic ordering.
#[derive(Debug, Clone, Default)]
pub struct MetricsRegistry {
    http_requests: Arc<Mutex<BTreeMap<HttpMetricLabels, DurationMetricValue>>>,
    http_in_flight: Arc<Mutex<i64>>,
    rpc_requests: Arc<Mutex<BTreeMap<RpcMetricLabels, DurationMetricValue>>>,
    sql_queries: Arc<Mutex<BTreeMap<SqlMetricLabels, DurationMetricValue>>>,
    redis_commands: Arc<Mutex<BTreeMap<RedisMetricLabels, DurationMetricValue>>>,
    redis_events: Arc<Mutex<BTreeMap<RedisEventLabels, u64>>>,
    redis_degradations: Arc<Mutex<BTreeMap<RedisDegradationLabels, u64>>>,
    cache_events: Arc<Mutex<BTreeMap<CacheMetricLabels, u64>>>,
    resilience_events: Arc<Mutex<BTreeMap<ResilienceMetricLabels, u64>>>,
}

impl MetricsRegistry {
    /// Creates an empty registry.
    pub fn new() -> Self {
        MetricsRegistry::default()
    }

    /// Records one finished HTTP request under `labels`.
    pub fn record_http_request(&self, labels: HttpMetricLabels, duration: Duration) {
        record_duration(&self.http_requests, labels, duration);
    }

    /// Bumps the in-flight HTTP request gauge by one.
    pub fn increment_http_in_flight(&self) {
        let mut in_flight = self.http_in_flight.lock().expect("metrics mutex");
        *in_flight += 1;
    }

    /// Drops the in-flight HTTP request gauge by one, clamped at zero so an
    /// unbalanced decrement can never drive the gauge negative.
    pub fn decrement_http_in_flight(&self) {
        let mut in_flight = self.http_in_flight.lock().expect("metrics mutex");
        if *in_flight > 0 {
            *in_flight -= 1;
        }
    }

    /// Records one finished RPC request under `labels`.
    pub fn record_rpc_request(&self, labels: RpcMetricLabels, duration: Duration) {
        record_duration(&self.rpc_requests, labels, duration);
    }

    /// Records one finished SQL query under `labels`.
    pub fn record_sql_query(&self, labels: SqlMetricLabels, duration: Duration) {
        record_duration(&self.sql_queries, labels, duration);
    }

    /// Records one finished Redis command under `labels`.
    pub fn record_redis_command(&self, labels: RedisMetricLabels, duration: Duration) {
        record_duration(&self.redis_commands, labels, duration);
    }

    /// Counts one Redis event occurrence.
    pub fn record_redis_event(&self, labels: RedisEventLabels) {
        increment_map(&self.redis_events, labels);
    }

    /// Counts one Redis degradation occurrence.
    pub fn record_redis_degradation(&self, labels: RedisDegradationLabels) {
        increment_map(&self.redis_degradations, labels);
    }

    /// Counts one cache event under the default `"cache"` component.
    pub fn record_cache_event(&self, operation: impl Into<String>, result: impl Into<String>) {
        self.record_cache_event_with_component("cache", operation, result);
    }

    /// Counts one cache event attributed to an explicit component.
    pub fn record_cache_event_with_component(
        &self,
        component: impl Into<String>,
        operation: impl Into<String>,
        result: impl Into<String>,
    ) {
        let labels = CacheMetricLabels::new(component, operation, result);
        increment_map(&self.cache_events, labels);
    }

    /// Counts one resilience event under the default `"unknown"` transport.
    pub fn record_resilience_event(
        &self,
        component: impl Into<String>,
        outcome: impl Into<String>,
    ) {
        self.record_resilience_event_for_transport("unknown", component, outcome);
    }

    /// Counts one resilience event attributed to an explicit transport.
    pub fn record_resilience_event_for_transport(
        &self,
        transport: impl Into<String>,
        component: impl Into<String>,
        outcome: impl Into<String>,
    ) {
        let labels = ResilienceMetricLabels::new(transport, component, outcome);
        increment_map(&self.resilience_events, labels);
    }

    /// Renders every metric family in Prometheus text exposition format.
    /// Each lock is held only for the duration of that family's `render`
    /// call, so recording on other families is never blocked for long.
    pub fn render_prometheus(&self) -> String {
        let mut output = String::new();
        {
            let requests = self.http_requests.lock().expect("metrics mutex");
            let in_flight = *self.http_in_flight.lock().expect("metrics mutex");
            http::render(&mut output, &requests, in_flight);
        }
        rpc::render(&mut output, &self.rpc_requests.lock().expect("metrics mutex"));
        sql::render(&mut output, &self.sql_queries.lock().expect("metrics mutex"));
        redis::render(
            &mut output,
            &self.redis_commands.lock().expect("metrics mutex"),
            &self.redis_events.lock().expect("metrics mutex"),
            &self.redis_degradations.lock().expect("metrics mutex"),
        );
        cache::render(&mut output, &self.cache_events.lock().expect("metrics mutex"));
        resilience::render(
            &mut output,
            &self.resilience_events.lock().expect("metrics mutex"),
        );
        output
    }
}
/// Locks `map` and folds `duration` into the histogram stored under `key`,
/// inserting a fresh zeroed histogram the first time a key is seen.
fn record_duration<K: Ord>(map: &Mutex<BTreeMap<K, DurationMetricValue>>, key: K, duration: Duration) {
    map.lock()
        .expect("metrics mutex")
        .entry(key)
        .or_default()
        .record(duration);
}
/// Locks `map` and bumps the counter stored under `key`, starting a new
/// counter at one the first time a key is seen.
pub(crate) fn increment_map<K: Ord>(map: &Mutex<BTreeMap<K, u64>>, key: K) {
    let mut counters = map.lock().expect("metrics mutex");
    counters
        .entry(key)
        .and_modify(|count| *count += 1)
        .or_insert(1);
}
/// Escapes a label value for the Prometheus text exposition format.
///
/// The format requires `\`, `"`, and line feed to be escaped inside quoted
/// label values. The previous version only handled backslash and quote, so
/// a label value containing a newline would split the sample across lines
/// and corrupt the rendered output. Backslashes are escaped first so the
/// later replacements cannot double-escape their own output.
pub(crate) fn escape_label(value: &str) -> String {
    value
        .replace('\\', "\\\\")
        .replace('"', "\\\"")
        .replace('\n', "\\n")
}
/// Writes the full Prometheus histogram series for one label set:
/// `{base_name}_bucket` (one line per bound, plus `+Inf`), then
/// `{base_name}_count` and `{base_name}_sum`.
///
/// `write_labels` emits the label pairs between the braces; it receives the
/// `le` bound text when a bucket line is being rendered and `None` for the
/// count and sum lines. Formatting errors on a `String` cannot occur, so
/// write results are deliberately ignored.
pub(crate) fn write_duration_metric<L>(
    output: &mut String,
    base_name: &str,
    labels: &L,
    value: &DurationMetricValue,
    write_labels: impl Fn(&mut String, &L, Option<&str>),
) {
    let mut write_line = |suffix: &str, le: Option<&str>, sample: &str| {
        write!(output, "{base_name}{suffix}{{").ok();
        write_labels(output, labels, le);
        writeln!(output, "}} {sample}").ok();
    };
    for (bound, hits) in DURATION_BUCKETS.iter().zip(value.buckets.iter()) {
        write_line("_bucket", Some(&bucket_label(*bound)), &hits.to_string());
    }
    // The implicit +Inf bucket always equals the total observation count.
    write_line("_bucket", Some("+Inf"), &value.count.to_string());
    write_line("_count", None, &value.count.to_string());
    write_line("_sum", None, &format!("{:.6}", value.duration_seconds_sum));
}
/// Formats a bucket upper bound as its `le` label text, fixed at three
/// decimal places so bounds render consistently (e.g. `0.005`, `2.500`).
fn bucket_label(value: f64) -> String {
    format!("{:.3}", value)
}
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::{
        HttpMetricLabels, MetricsRegistry, RedisDegradationLabels, RedisEventLabels,
        RedisMetricLabels, RpcMetricLabels, SqlMetricLabels,
    };

    /// Smoke test: after recording one sample of every metric kind, the
    /// rendered Prometheus text contains each expected metric family name,
    /// and HTTP samples are keyed by route template rather than raw path.
    #[test]
    fn metrics_render_prometheus_text() {
        let registry = MetricsRegistry::new();
        registry.record_http_request(
            HttpMetricLabels::new("GET", "/users/:id", 200),
            Duration::from_millis(20),
        );
        registry.record_rpc_request(
            RpcMetricLabels::new("hello.Hello", "Say", "Ok"),
            Duration::from_millis(15),
        );
        registry.record_sql_query(
            SqlMetricLabels::new("sqlite", "users", "find_by_id", "select", "success"),
            Duration::from_millis(2),
        );
        registry.record_redis_command(
            RedisMetricLabels::new("GET", "primary", "success"),
            Duration::from_millis(1),
        );
        registry.record_redis_event(RedisEventLabels::new("pool", "primary", "success"));
        registry.record_redis_degradation(RedisDegradationLabels::new(
            "get",
            "return_miss",
            "primary",
        ));
        registry.record_cache_event_with_component("cache_aside", "get", "hit");
        registry.record_resilience_event_for_transport("http", "breaker", "allowed");
        let text = registry.render_prometheus();
        assert!(text.contains("rs_zero_http_requests_total"));
        assert!(text.contains("rs_zero_rpc_requests_total"));
        assert!(text.contains("rs_zero_sql_queries_total"));
        assert!(text.contains("rs_zero_redis_commands_total"));
        assert!(text.contains("rs_zero_redis_events_total"));
        assert!(text.contains("rs_zero_redis_degradations_total"));
        assert!(text.contains("rs_zero_cache_events_total"));
        assert!(text.contains("rs_zero_resilience_events_total"));
        // Labels must carry the route template, never a concrete path —
        // concrete paths would explode metric cardinality.
        assert!(text.contains("route=\"/users/:id\""));
        assert!(!text.contains("/users/42"));
    }

    /// The legacy `record_resilience_event` entry point (no transport arg)
    /// must keep `component` and `outcome` adjacent in the rendered label
    /// set, i.e. the defaulted transport label must not sit between them.
    #[test]
    fn legacy_resilience_labels_keep_component_and_outcome_adjacent() {
        let registry = MetricsRegistry::new();
        registry.record_resilience_event("breaker", "allowed");
        let text = registry.render_prometheus();
        assert!(text.contains("component=\"breaker\",outcome=\"allowed\""));
    }

    /// Double quotes inside label values must be rendered backslash-escaped
    /// per the Prometheus text exposition format.
    #[test]
    fn label_values_are_escaped() {
        let registry = MetricsRegistry::new();
        registry.record_resilience_event("bad\"x", "ok");
        let text = registry.render_prometheus();
        assert!(text.contains("bad\\\"x"));
    }
}