// rs-zero 0.2.6
//
// Rust-first microservice framework inspired by go-zero engineering practices.
// Documentation
use std::{
    collections::BTreeMap,
    fmt::Write,
    sync::{Arc, Mutex},
    time::Duration,
};

pub mod cache;
pub mod http;
pub mod redis;
pub mod resilience;
pub mod rpc;
pub mod sql;

pub use cache::CacheMetricLabels;
pub use http::HttpMetricLabels;
pub use redis::{RedisDegradationLabels, RedisEventLabels, RedisMetricLabels};
pub use resilience::ResilienceMetricLabels;
pub use rpc::RpcMetricLabels;
pub use sql::SqlMetricLabels;

/// Upper bounds, in seconds, of the finite histogram buckets used for duration metrics.
///
/// The bounds are listed in ascending order. The `+Inf` bucket is not part of this array; it is
/// emitted separately by `write_duration_metric` using the total observation count.
pub(crate) const DURATION_BUCKETS: [f64; 9] = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5];

/// Accumulated histogram state for one label set of a duration metric.
#[derive(Debug, Clone, Default)]
pub(crate) struct DurationMetricValue {
    /// Number of observations recorded so far.
    pub(crate) count: u64,
    /// Sum of all observed durations, in seconds.
    pub(crate) duration_seconds_sum: f64,
    /// Cumulative bucket counters, index-aligned with `DURATION_BUCKETS`: entry `i` counts the
    /// observations whose duration was at most `DURATION_BUCKETS[i]` seconds.
    pub(crate) buckets: [u64; DURATION_BUCKETS.len()],
}

impl DurationMetricValue {
    /// Folds a single observation into the histogram.
    ///
    /// The observation count goes up by one, the duration (converted to seconds) is added to
    /// the running sum, and every bucket whose upper bound is at least the observed value is
    /// incremented, which keeps the bucket counters cumulative.
    fn record(&mut self, duration: Duration) {
        let observed = duration.as_secs_f64();
        self.count += 1;
        self.duration_seconds_sum += observed;

        // `buckets` and `DURATION_BUCKETS` have the same length, so zipping visits every bucket.
        for (slot, upper_bound) in self.buckets.iter_mut().zip(DURATION_BUCKETS.iter()) {
            if observed <= *upper_bound {
                *slot += 1;
            }
        }
    }
}

/// In-process metrics registry with Prometheus text export.
///
/// Every field sits behind an `Arc<Mutex<..>>`, so clones made through the derived `Clone`
/// share the same underlying state as the registry they were cloned from.
#[derive(Debug, Clone, Default)]
pub struct MetricsRegistry {
    /// HTTP request duration histograms, one per label set.
    http_requests: Arc<Mutex<BTreeMap<HttpMetricLabels, DurationMetricValue>>>,
    /// Gauge of HTTP requests currently in flight.
    http_in_flight: Arc<Mutex<i64>>,
    /// RPC request duration histograms, one per label set.
    rpc_requests: Arc<Mutex<BTreeMap<RpcMetricLabels, DurationMetricValue>>>,
    /// SQL query duration histograms, one per label set.
    sql_queries: Arc<Mutex<BTreeMap<SqlMetricLabels, DurationMetricValue>>>,
    /// Redis command duration histograms, one per label set.
    redis_commands: Arc<Mutex<BTreeMap<RedisMetricLabels, DurationMetricValue>>>,
    /// Redis event counters, one per label set.
    redis_events: Arc<Mutex<BTreeMap<RedisEventLabels, u64>>>,
    /// Redis degradation counters, one per label set.
    redis_degradations: Arc<Mutex<BTreeMap<RedisDegradationLabels, u64>>>,
    /// Cache event counters, one per label set.
    cache_events: Arc<Mutex<BTreeMap<CacheMetricLabels, u64>>>,
    /// Resilience event counters, one per label set.
    resilience_events: Arc<Mutex<BTreeMap<ResilienceMetricLabels, u64>>>,
}

impl MetricsRegistry {
    /// Builds a registry that holds no samples yet.
    pub fn new() -> Self {
        Default::default()
    }

    /// Adds one observation to the HTTP request duration histogram for `labels`.
    pub fn record_http_request(&self, labels: HttpMetricLabels, duration: Duration) {
        record_duration(&self.http_requests, labels, duration);
    }

    /// Raises the in-flight HTTP request gauge by one.
    pub fn increment_http_in_flight(&self) {
        let mut gauge = self.http_in_flight.lock().expect("metrics mutex");
        *gauge += 1;
    }

    /// Lowers the in-flight HTTP request gauge by one, never letting it drop below zero.
    pub fn decrement_http_in_flight(&self) {
        let mut gauge = self.http_in_flight.lock().expect("metrics mutex");
        let lowered = *gauge - 1;
        *gauge = if lowered < 0 { 0 } else { lowered };
    }

    /// Adds one observation to the gRPC request duration histogram for `labels`.
    pub fn record_rpc_request(&self, labels: RpcMetricLabels, duration: Duration) {
        record_duration(&self.rpc_requests, labels, duration);
    }

    /// Adds one observation to the SQL query duration histogram for `labels`.
    pub fn record_sql_query(&self, labels: SqlMetricLabels, duration: Duration) {
        record_duration(&self.sql_queries, labels, duration);
    }

    /// Adds one observation to the Redis command duration histogram for `labels`.
    pub fn record_redis_command(&self, labels: RedisMetricLabels, duration: Duration) {
        record_duration(&self.redis_commands, labels, duration);
    }

    /// Counts one Redis event (pool, redirect, script or degradation) for `labels`.
    pub fn record_redis_event(&self, labels: RedisEventLabels) {
        increment_map(&self.redis_events, labels);
    }

    /// Counts one explicit Redis degradation for `labels`.
    pub fn record_redis_degradation(&self, labels: RedisDegradationLabels) {
        increment_map(&self.redis_degradations, labels);
    }

    /// Counts one cache event (for example a hit, miss or loader error) under the default
    /// `"cache"` component.
    pub fn record_cache_event(&self, operation: impl Into<String>, result: impl Into<String>) {
        self.record_cache_event_with_component("cache", operation, result);
    }

    /// Counts one cache event attributed to the given cache component.
    pub fn record_cache_event_with_component(
        &self,
        component: impl Into<String>,
        operation: impl Into<String>,
        result: impl Into<String>,
    ) {
        let labels = CacheMetricLabels::new(component, operation, result);
        increment_map(&self.cache_events, labels);
    }

    /// Counts one resilience event that is not tied to a transport; it is recorded under the
    /// `"unknown"` transport.
    pub fn record_resilience_event(
        &self,
        component: impl Into<String>,
        outcome: impl Into<String>,
    ) {
        self.record_resilience_event_for_transport("unknown", component, outcome);
    }

    /// Counts one resilience event for the given transport.
    pub fn record_resilience_event_for_transport(
        &self,
        transport: impl Into<String>,
        component: impl Into<String>,
        outcome: impl Into<String>,
    ) {
        let labels = ResilienceMetricLabels::new(transport, component, outcome);
        increment_map(&self.resilience_events, labels);
    }

    /// Produces the Prometheus text exposition of every metric held by the registry.
    ///
    /// Sections are written in a fixed order: HTTP, RPC, SQL, Redis, cache, resilience. The
    /// locks needed by a section are held only while that section is being rendered.
    pub fn render_prometheus(&self) -> String {
        let mut rendered = String::new();

        {
            let requests = self.http_requests.lock().expect("metrics mutex");
            let in_flight = self.http_in_flight.lock().expect("metrics mutex");
            http::render(&mut rendered, &requests, *in_flight);
        }
        {
            let requests = self.rpc_requests.lock().expect("metrics mutex");
            rpc::render(&mut rendered, &requests);
        }
        {
            let queries = self.sql_queries.lock().expect("metrics mutex");
            sql::render(&mut rendered, &queries);
        }
        {
            let commands = self.redis_commands.lock().expect("metrics mutex");
            let events = self.redis_events.lock().expect("metrics mutex");
            let degradations = self.redis_degradations.lock().expect("metrics mutex");
            redis::render(&mut rendered, &commands, &events, &degradations);
        }
        {
            let events = self.cache_events.lock().expect("metrics mutex");
            cache::render(&mut rendered, &events);
        }
        {
            let events = self.resilience_events.lock().expect("metrics mutex");
            resilience::render(&mut rendered, &events);
        }

        rendered
    }
}

fn record_duration<K>(map: &Mutex<BTreeMap<K, DurationMetricValue>>, key: K, duration: Duration)
where
    K: Ord,
{
    let mut metrics = map.lock().expect("metrics mutex");
    metrics.entry(key).or_default().record(duration);
}

/// Adds one to the counter stored under `key`, starting from zero when the key is new.
///
/// # Panics
///
/// Panics if the mutex is poisoned.
pub(crate) fn increment_map<K>(map: &Mutex<BTreeMap<K, u64>>, key: K)
where
    K: Ord,
{
    let mut counters = map.lock().expect("metrics mutex");
    let counter = counters.entry(key).or_insert(0);
    *counter += 1;
}

/// Escapes a label value for the Prometheus text exposition format.
///
/// The format requires three characters to be escaped inside a quoted label value: backslash
/// becomes `\\`, double quote becomes `\"`, and line feed becomes `\n`. A raw line feed would
/// otherwise end the sample line early and corrupt the scrape output. All other characters are
/// copied through unchanged.
pub(crate) fn escape_label(value: &str) -> String {
    // Single pass; the capacity is a lower bound that is exact when nothing needs escaping.
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}

/// Appends the histogram samples of one label set to `output`.
///
/// The lines are written in this order: one `{base_name}_bucket` line per entry of
/// `DURATION_BUCKETS`, a `{base_name}_bucket` line for `+Inf` carrying the total count, then
/// `{base_name}_count` and `{base_name}_sum` (the sum with six decimal places).
///
/// `write_labels` writes the label pairs between the braces. Its third argument is the bucket
/// bound to render for `_bucket` lines and `None` for the `_count` and `_sum` lines.
pub(crate) fn write_duration_metric<L>(
    output: &mut String,
    base_name: &str,
    labels: &L,
    value: &DurationMetricValue,
    write_labels: impl Fn(&mut String, &L, Option<&str>),
) {
    // Finite buckets: bounds and counters are index-aligned, so walk them in lockstep.
    for (upper_bound, cumulative) in DURATION_BUCKETS.iter().zip(value.buckets.iter()) {
        output.push_str(base_name);
        output.push_str("_bucket{");
        let bound = bucket_label(*upper_bound);
        write_labels(output, labels, Some(bound.as_str()));
        writeln!(output, "}} {cumulative}").ok();
    }

    // The catch-all bucket always equals the total number of observations.
    output.push_str(base_name);
    output.push_str("_bucket{");
    write_labels(output, labels, Some("+Inf"));
    writeln!(output, "}} {}", value.count).ok();

    output.push_str(base_name);
    output.push_str("_count{");
    write_labels(output, labels, None);
    writeln!(output, "}} {}", value.count).ok();

    output.push_str(base_name);
    output.push_str("_sum{");
    write_labels(output, labels, None);
    writeln!(output, "}} {:.6}", value.duration_seconds_sum).ok();
}

/// Renders a bucket upper bound with exactly three digits after the decimal point.
fn bucket_label(value: f64) -> String {
    format!("{:.3}", value)
}

// Unit tests for the registry's Prometheus text rendering.
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::{
        HttpMetricLabels, MetricsRegistry, RedisDegradationLabels, RedisEventLabels,
        RedisMetricLabels, RpcMetricLabels, SqlMetricLabels,
    };

    // Records one sample in every metric family and checks that each family name appears in
    // the rendered text.
    #[test]
    fn metrics_render_prometheus_text() {
        let registry = MetricsRegistry::new();
        registry.record_http_request(
            HttpMetricLabels::new("GET", "/users/:id", 200),
            Duration::from_millis(20),
        );
        registry.record_rpc_request(
            RpcMetricLabels::new("hello.Hello", "Say", "Ok"),
            Duration::from_millis(15),
        );
        registry.record_sql_query(
            SqlMetricLabels::new("sqlite", "users", "find_by_id", "select", "success"),
            Duration::from_millis(2),
        );
        registry.record_redis_command(
            RedisMetricLabels::new("GET", "primary", "success"),
            Duration::from_millis(1),
        );
        registry.record_redis_event(RedisEventLabels::new("pool", "primary", "success"));
        registry.record_redis_degradation(RedisDegradationLabels::new(
            "get",
            "return_miss",
            "primary",
        ));
        registry.record_cache_event_with_component("cache_aside", "get", "hit");
        registry.record_resilience_event_for_transport("http", "breaker", "allowed");

        let text = registry.render_prometheus();

        assert!(text.contains("rs_zero_http_requests_total"));
        assert!(text.contains("rs_zero_rpc_requests_total"));
        assert!(text.contains("rs_zero_sql_queries_total"));
        assert!(text.contains("rs_zero_redis_commands_total"));
        assert!(text.contains("rs_zero_redis_events_total"));
        assert!(text.contains("rs_zero_redis_degradations_total"));
        assert!(text.contains("rs_zero_cache_events_total"));
        assert!(text.contains("rs_zero_resilience_events_total"));
        // The route label carries the template that was recorded, never a concrete path.
        assert!(text.contains("route=\"/users/:id\""));
        assert!(!text.contains("/users/42"));
    }

    // The transport-less helper must still render `component` and `outcome` as adjacent labels.
    #[test]
    fn legacy_resilience_labels_keep_component_and_outcome_adjacent() {
        let registry = MetricsRegistry::new();
        registry.record_resilience_event("breaker", "allowed");

        let text = registry.render_prometheus();

        assert!(text.contains("component=\"breaker\",outcome=\"allowed\""));
    }

    // A double quote inside a label value must come out backslash-escaped.
    #[test]
    fn label_values_are_escaped() {
        let registry = MetricsRegistry::new();
        registry.record_resilience_event("bad\"x", "ok");

        let text = registry.render_prometheus();

        assert!(text.contains("bad\\\"x"));
    }
}