Skip to main content

ash_rpc/observability/
prometheus.rs

1//! Prometheus metrics collection for JSON-RPC
2
3use prometheus::{CounterVec, Encoder, HistogramOpts, HistogramVec, IntGauge, Opts, Registry};
4use std::sync::Arc;
5use std::time::Duration;
6
7/// Prometheus metrics collector for JSON-RPC
8pub struct PrometheusMetrics {
9    registry: Registry,
10    request_counter: CounterVec,
11    request_duration: HistogramVec,
12    error_counter: CounterVec,
13    active_connections: IntGauge,
14}
15
16impl PrometheusMetrics {
17    /// Create a new Prometheus metrics collector
18    ///
19    /// # Errors
20    /// Returns error if registry or metric creation fails
21    pub fn new() -> Result<Self, prometheus::Error> {
22        Self::with_prefix("jsonrpc")
23    }
24
25    /// Create a new metrics collector with custom prefix
26    ///
27    /// # Errors
28    /// Returns error if registry or metric creation fails
29    pub fn with_prefix(prefix: &str) -> Result<Self, prometheus::Error> {
30        let registry = Registry::new();
31
32        let request_counter = CounterVec::new(
33            Opts::new(
34                format!("{prefix}_requests_total"),
35                "Total number of JSON-RPC requests",
36            ),
37            &["method"],
38        )?;
39
40        let request_duration = HistogramVec::new(
41            HistogramOpts::new(
42                format!("{prefix}_request_duration_seconds"),
43                "JSON-RPC request duration in seconds",
44            )
45            .buckets(vec![
46                0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
47            ]),
48            &["method"],
49        )?;
50
51        let error_counter = CounterVec::new(
52            Opts::new(
53                format!("{prefix}_errors_total"),
54                "Total number of JSON-RPC errors",
55            ),
56            &["method"],
57        )?;
58
59        let active_connections = IntGauge::new(
60            format!("{prefix}_active_connections"),
61            "Number of active connections",
62        )?;
63
64        registry.register(Box::new(request_counter.clone()))?;
65        registry.register(Box::new(request_duration.clone()))?;
66        registry.register(Box::new(error_counter.clone()))?;
67        registry.register(Box::new(active_connections.clone()))?;
68
69        Ok(Self {
70            registry,
71            request_counter,
72            request_duration,
73            error_counter,
74            active_connections,
75        })
76    }
77
78    /// Record a request with method, duration, and success status
79    pub fn record_request(&self, method: &str, duration: Duration, success: bool) {
80        // Limit cardinality by using a normalized method name
81        let normalized_method = Self::normalize_method(method);
82
83        self.request_counter
84            .with_label_values(&[normalized_method])
85            .inc();
86
87        self.request_duration
88            .with_label_values(&[normalized_method])
89            .observe(duration.as_secs_f64());
90
91        if !success {
92            self.error_counter
93                .with_label_values(&[normalized_method])
94                .inc();
95        }
96    }
97
98    /// Increment active connections count
99    pub fn connection_opened(&self) {
100        self.active_connections.inc();
101    }
102
103    /// Decrement active connections count
104    pub fn connection_closed(&self) {
105        self.active_connections.dec();
106    }
107
108    /// Get the Prometheus registry
109    #[must_use]
110    pub fn registry(&self) -> &Registry {
111        &self.registry
112    }
113
114    /// Gather metrics in Prometheus text format
115    ///
116    /// # Errors
117    /// Returns error if metric encoding fails
118    pub fn gather_text(&self) -> Result<String, prometheus::Error> {
119        use prometheus::TextEncoder;
120        let encoder = TextEncoder::new();
121        let metric_families = self.registry.gather();
122        let mut buffer = Vec::new();
123        encoder.encode(&metric_families, &mut buffer)?;
124        Ok(String::from_utf8_lossy(&buffer).to_string())
125    }
126
127    /// Normalize method name to prevent cardinality explosion
128    /// Keeps known methods as-is, groups unknown methods as "other"
129    fn normalize_method(method: &str) -> &str {
130        // Common RPC methods - extend as needed
131        const KNOWN_METHODS: &[&str] = &[
132            "ping",
133            "echo",
134            "add",
135            "subtract",
136            "multiply",
137            "divide",
138            "healthcheck",
139            "get_metrics",
140            "get_health",
141        ];
142
143        if KNOWN_METHODS.contains(&method) {
144            method
145        } else {
146            "other"
147        }
148    }
149}
150
151impl Default for PrometheusMetrics {
152    #[allow(clippy::panic)] // Fatal configuration error, should panic
153    fn default() -> Self {
154        Self::new().unwrap_or_else(|e| panic!("Failed to create default PrometheusMetrics: {e}"))
155    }
156}
157
158/// Builder for creating Prometheus metrics with custom configuration
159pub struct PrometheusMetricsBuilder {
160    prefix: String,
161    known_methods: Vec<String>,
162}
163
164impl PrometheusMetricsBuilder {
165    /// Create a new builder with default prefix
166    #[must_use]
167    pub fn new() -> Self {
168        Self {
169            prefix: "jsonrpc".to_owned(),
170            known_methods: vec![
171                "ping".to_owned(),
172                "echo".to_owned(),
173                "healthcheck".to_owned(),
174            ],
175        }
176    }
177
178    /// Set custom metric prefix
179    #[must_use]
180    pub fn prefix(mut self, prefix: impl Into<String>) -> Self {
181        self.prefix = prefix.into();
182        self
183    }
184
185    /// Add known method names for cardinality control
186    #[must_use]
187    pub fn add_known_method(mut self, method: impl Into<String>) -> Self {
188        self.known_methods.push(method.into());
189        self
190    }
191
192    /// Build the metrics collector
193    ///
194    /// # Errors
195    /// Returns error if metrics collector creation fails
196    pub fn build(self) -> Result<PrometheusMetrics, prometheus::Error> {
197        PrometheusMetrics::with_prefix(&self.prefix)
198    }
199}
200
201impl Default for PrometheusMetricsBuilder {
202    fn default() -> Self {
203        Self::new()
204    }
205}
206
207/// RPC method handler that exposes metrics in Prometheus format
208pub fn get_metrics_method(
209    metrics: Arc<PrometheusMetrics>,
210) -> impl Fn(Option<serde_json::Value>, Option<crate::RequestId>) -> crate::Response {
211    move |_params, id| match metrics.gather_text() {
212        Ok(text) => crate::rpc_success!(text, id),
213        Err(e) => crate::rpc_error!(
214            crate::error_codes::INTERNAL_ERROR,
215            format!("Failed to gather metrics: {}", e),
216            id
217        ),
218    }
219}
220
221/// Enhanced health check that includes basic metrics
222pub fn get_health_method(
223    metrics: Arc<PrometheusMetrics>,
224) -> impl Fn(Option<serde_json::Value>, Option<crate::RequestId>) -> crate::Response {
225    move |_params, id| {
226        let health = serde_json::json!({
227            "status": "ok",
228            "active_connections": metrics.active_connections.get(),
229        });
230        crate::rpc_success!(health, id)
231    }
232}
233
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built collector reports at least one metric family.
    #[test]
    fn test_prometheus_metrics_creation() {
        let collector = PrometheusMetrics::new().unwrap();
        let families = collector.registry().gather();
        assert!(!families.is_empty());
    }

    /// Recording one successful and one failed request makes all three
    /// labelled metric families show up in the text output.
    #[test]
    fn test_record_request() {
        let collector = PrometheusMetrics::new().unwrap();
        for (method, millis, succeeded) in [("ping", 10, true), ("echo", 20, false)] {
            collector.record_request(method, Duration::from_millis(millis), succeeded);
        }

        let rendered = collector.gather_text().unwrap();
        assert!(rendered.contains("jsonrpc_requests_total"));
        assert!(rendered.contains("jsonrpc_request_duration_seconds"));
        assert!(rendered.contains("jsonrpc_errors_total"));
    }

    /// The gauge follows open/close calls one step at a time.
    #[test]
    fn test_connection_tracking() {
        let collector = PrometheusMetrics::new().unwrap();
        let open_now = || collector.active_connections.get();
        assert_eq!(open_now(), 0);

        collector.connection_opened();
        assert_eq!(open_now(), 1);

        collector.connection_opened();
        assert_eq!(open_now(), 2);

        collector.connection_closed();
        assert_eq!(open_now(), 1);
    }

    /// A custom prefix replaces `jsonrpc` in the exported metric names.
    #[test]
    fn test_custom_prefix() {
        let collector = PrometheusMetrics::with_prefix("custom").unwrap();
        // Record a request to ensure metrics appear in output
        collector.record_request("ping", Duration::from_millis(10), true);
        let rendered = collector.gather_text().unwrap();
        assert!(rendered.contains("custom_requests_total"));
    }
}