Skip to main content

chainrpc_core/
metrics.rs

1//! Metrics and observability for RPC providers.
2//!
3//! Provides lock-free, atomic counters for request success/failure rates,
4//! latency tracking, rate-limit hits, and circuit-breaker events.
5//!
6//! # Design
7//!
8//! - [`ProviderMetrics`] tracks per-provider counters using `AtomicU64`.
9//! - [`MetricsSnapshot`] is an immutable, serializable point-in-time snapshot.
10//! - [`RpcMetrics`] aggregates metrics across multiple providers.
11
12use std::sync::atomic::{AtomicU64, Ordering};
13use std::time::Duration;
14
15use serde::Serialize;
16
17// ---------------------------------------------------------------------------
18// Per-provider metrics
19// ---------------------------------------------------------------------------
20
21/// Atomic counters for a single RPC provider endpoint.
22pub struct ProviderMetrics {
23    /// Provider URL or identifier.
24    url: String,
25    /// Total requests sent (success + failure).
26    total_requests: AtomicU64,
27    /// Requests that completed successfully.
28    successful_requests: AtomicU64,
29    /// Requests that failed (transport error, timeout, etc.).
30    failed_requests: AtomicU64,
31    /// Cumulative latency in microseconds (for averaging).
32    total_latency_us: AtomicU64,
33    /// Minimum observed latency in microseconds.
34    min_latency_us: AtomicU64,
35    /// Maximum observed latency in microseconds.
36    max_latency_us: AtomicU64,
37    /// Number of times a request was rejected by the rate limiter.
38    rate_limit_hits: AtomicU64,
39    /// Number of times the circuit breaker opened.
40    circuit_open_count: AtomicU64,
41}
42
43impl ProviderMetrics {
44    /// Create a new metrics instance for the given provider URL.
45    pub fn new(url: impl Into<String>) -> Self {
46        Self {
47            url: url.into(),
48            total_requests: AtomicU64::new(0),
49            successful_requests: AtomicU64::new(0),
50            failed_requests: AtomicU64::new(0),
51            total_latency_us: AtomicU64::new(0),
52            min_latency_us: AtomicU64::new(u64::MAX),
53            max_latency_us: AtomicU64::new(0),
54            rate_limit_hits: AtomicU64::new(0),
55            circuit_open_count: AtomicU64::new(0),
56        }
57    }
58
59    /// Record a successful request with the given latency.
60    pub fn record_success(&self, latency: Duration) {
61        self.total_requests.fetch_add(1, Ordering::Relaxed);
62        self.successful_requests.fetch_add(1, Ordering::Relaxed);
63
64        let us = latency.as_micros() as u64;
65        self.total_latency_us.fetch_add(us, Ordering::Relaxed);
66        self.update_min_latency(us);
67        self.update_max_latency(us);
68    }
69
70    /// Record a failed request.
71    pub fn record_failure(&self) {
72        self.total_requests.fetch_add(1, Ordering::Relaxed);
73        self.failed_requests.fetch_add(1, Ordering::Relaxed);
74    }
75
76    /// Record a rate-limit rejection.
77    pub fn record_rate_limit(&self) {
78        self.rate_limit_hits.fetch_add(1, Ordering::Relaxed);
79    }
80
81    /// Record the circuit breaker opening.
82    pub fn record_circuit_open(&self) {
83        self.circuit_open_count.fetch_add(1, Ordering::Relaxed);
84    }
85
86    /// Compute the average latency across all successful requests.
87    ///
88    /// Returns `Duration::ZERO` if no successful requests have been recorded.
89    pub fn avg_latency(&self) -> Duration {
90        let total = self.total_latency_us.load(Ordering::Relaxed);
91        let count = self.successful_requests.load(Ordering::Relaxed);
92        if count == 0 {
93            return Duration::ZERO;
94        }
95        Duration::from_micros(total / count)
96    }
97
98    /// Compute the success rate as a fraction in `[0.0, 1.0]`.
99    ///
100    /// Returns `1.0` if no requests have been made.
101    pub fn success_rate(&self) -> f64 {
102        let total = self.total_requests.load(Ordering::Relaxed);
103        if total == 0 {
104            return 1.0;
105        }
106        let successes = self.successful_requests.load(Ordering::Relaxed);
107        successes as f64 / total as f64
108    }
109
110    /// Return the provider URL.
111    pub fn url(&self) -> &str {
112        &self.url
113    }
114
115    /// Produce an immutable snapshot for reporting / serialization.
116    pub fn snapshot(&self) -> MetricsSnapshot {
117        let total = self.total_requests.load(Ordering::Relaxed);
118        let successful = self.successful_requests.load(Ordering::Relaxed);
119        let failed = self.failed_requests.load(Ordering::Relaxed);
120        let total_latency = self.total_latency_us.load(Ordering::Relaxed);
121        let min_us = self.min_latency_us.load(Ordering::Relaxed);
122        let max_us = self.max_latency_us.load(Ordering::Relaxed);
123
124        let avg_latency_ms = if successful > 0 {
125            (total_latency as f64 / successful as f64) / 1000.0
126        } else {
127            0.0
128        };
129
130        let min_latency_ms = if min_us == u64::MAX {
131            0.0
132        } else {
133            min_us as f64 / 1000.0
134        };
135
136        let max_latency_ms = max_us as f64 / 1000.0;
137
138        let success_rate = if total > 0 {
139            successful as f64 / total as f64
140        } else {
141            1.0
142        };
143
144        MetricsSnapshot {
145            url: self.url.clone(),
146            total_requests: total,
147            successful_requests: successful,
148            failed_requests: failed,
149            avg_latency_ms,
150            min_latency_ms,
151            max_latency_ms,
152            rate_limit_hits: self.rate_limit_hits.load(Ordering::Relaxed),
153            circuit_open_count: self.circuit_open_count.load(Ordering::Relaxed),
154            success_rate,
155        }
156    }
157
158    // -- internal helpers ---------------------------------------------------
159
160    /// Atomically update `min_latency_us` if `us` is smaller.
161    fn update_min_latency(&self, us: u64) {
162        let mut current = self.min_latency_us.load(Ordering::Relaxed);
163        while us < current {
164            match self.min_latency_us.compare_exchange_weak(
165                current,
166                us,
167                Ordering::Relaxed,
168                Ordering::Relaxed,
169            ) {
170                Ok(_) => break,
171                Err(actual) => current = actual,
172            }
173        }
174    }
175
176    /// Atomically update `max_latency_us` if `us` is larger.
177    fn update_max_latency(&self, us: u64) {
178        let mut current = self.max_latency_us.load(Ordering::Relaxed);
179        while us > current {
180            match self.max_latency_us.compare_exchange_weak(
181                current,
182                us,
183                Ordering::Relaxed,
184                Ordering::Relaxed,
185            ) {
186                Ok(_) => break,
187                Err(actual) => current = actual,
188            }
189        }
190    }
191}
192
193impl std::fmt::Debug for ProviderMetrics {
194    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
195        f.debug_struct("ProviderMetrics")
196            .field("url", &self.url)
197            .field(
198                "total_requests",
199                &self.total_requests.load(Ordering::Relaxed),
200            )
201            .field("success_rate", &self.success_rate())
202            .field("avg_latency", &self.avg_latency())
203            .finish()
204    }
205}
206
207// ---------------------------------------------------------------------------
208// Snapshot
209// ---------------------------------------------------------------------------
210
211/// An immutable, serializable point-in-time snapshot of provider metrics.
212#[derive(Debug, Clone, Serialize)]
213pub struct MetricsSnapshot {
214    /// Provider URL or identifier.
215    pub url: String,
216    /// Total requests sent.
217    pub total_requests: u64,
218    /// Number of successful requests.
219    pub successful_requests: u64,
220    /// Number of failed requests.
221    pub failed_requests: u64,
222    /// Average latency in milliseconds.
223    pub avg_latency_ms: f64,
224    /// Minimum observed latency in milliseconds.
225    pub min_latency_ms: f64,
226    /// Maximum observed latency in milliseconds.
227    pub max_latency_ms: f64,
228    /// Number of rate-limit rejections.
229    pub rate_limit_hits: u64,
230    /// Number of circuit-breaker opens.
231    pub circuit_open_count: u64,
232    /// Success rate as a fraction in [0.0, 1.0].
233    pub success_rate: f64,
234}
235
236impl MetricsSnapshot {
237    /// Format metrics in Prometheus exposition format.
238    ///
239    /// Each metric is prefixed with `chainrpc_` and labeled with `provider="<url>"`.
240    pub fn to_prometheus(&self) -> String {
241        let label = format!("provider=\"{}\"", self.url.replace('"', "\\\""));
242        let mut out = String::new();
243
244        out.push_str(&format!(
245            "chainrpc_requests_total{{{label}}} {}\n",
246            self.total_requests
247        ));
248        out.push_str(&format!(
249            "chainrpc_requests_successful_total{{{label}}} {}\n",
250            self.successful_requests
251        ));
252        out.push_str(&format!(
253            "chainrpc_requests_failed_total{{{label}}} {}\n",
254            self.failed_requests
255        ));
256        out.push_str(&format!(
257            "chainrpc_latency_avg_ms{{{label}}} {:.3}\n",
258            self.avg_latency_ms
259        ));
260        out.push_str(&format!(
261            "chainrpc_latency_min_ms{{{label}}} {:.3}\n",
262            self.min_latency_ms
263        ));
264        out.push_str(&format!(
265            "chainrpc_latency_max_ms{{{label}}} {:.3}\n",
266            self.max_latency_ms
267        ));
268        out.push_str(&format!(
269            "chainrpc_rate_limit_hits_total{{{label}}} {}\n",
270            self.rate_limit_hits
271        ));
272        out.push_str(&format!(
273            "chainrpc_circuit_open_total{{{label}}} {}\n",
274            self.circuit_open_count
275        ));
276        out.push_str(&format!(
277            "chainrpc_success_rate{{{label}}} {:.4}\n",
278            self.success_rate
279        ));
280
281        out
282    }
283}
284
285// ---------------------------------------------------------------------------
286// Aggregated metrics
287// ---------------------------------------------------------------------------
288
289/// Aggregated metrics across all RPC providers.
290pub struct RpcMetrics {
291    providers: Vec<ProviderMetrics>,
292}
293
294impl RpcMetrics {
295    /// Create a new (empty) metrics aggregator.
296    pub fn new() -> Self {
297        Self {
298            providers: Vec::new(),
299        }
300    }
301
302    /// Register a provider and return a reference to its metrics.
303    pub fn add_provider(&mut self, url: impl Into<String>) -> &ProviderMetrics {
304        self.providers.push(ProviderMetrics::new(url));
305        self.providers.last().unwrap()
306    }
307
308    /// Produce snapshots for all registered providers.
309    pub fn snapshot_all(&self) -> Vec<MetricsSnapshot> {
310        self.providers.iter().map(|p| p.snapshot()).collect()
311    }
312
313    /// Total requests across all providers.
314    pub fn total_requests(&self) -> u64 {
315        self.providers
316            .iter()
317            .map(|p| p.total_requests.load(Ordering::Relaxed))
318            .sum()
319    }
320
321    /// Number of registered providers.
322    pub fn provider_count(&self) -> usize {
323        self.providers.len()
324    }
325
326    /// Format all provider metrics in Prometheus exposition format.
327    pub fn to_prometheus(&self) -> String {
328        let mut out = String::with_capacity(512);
329        out.push_str("# HELP chainrpc_requests_total Total RPC requests per provider.\n");
330        out.push_str("# TYPE chainrpc_requests_total counter\n");
331        out.push_str("# HELP chainrpc_latency_avg_ms Average request latency in milliseconds.\n");
332        out.push_str("# TYPE chainrpc_latency_avg_ms gauge\n");
333        for snap in self.snapshot_all() {
334            out.push_str(&snap.to_prometheus());
335        }
336        out
337    }
338}
339
340impl Default for RpcMetrics {
341    fn default() -> Self {
342        Self::new()
343    }
344}
345
346impl std::fmt::Debug for RpcMetrics {
347    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
348        f.debug_struct("RpcMetrics")
349            .field("provider_count", &self.providers.len())
350            .field("total_requests", &self.total_requests())
351            .finish()
352    }
353}
354
355// ---------------------------------------------------------------------------
356// Tests
357// ---------------------------------------------------------------------------
358
#[cfg(test)]
mod tests {
    use super::*;

    /// Two successes bump the total and success counters, not the failure one.
    #[test]
    fn record_success_updates_counters() {
        let m = ProviderMetrics::new("https://rpc.example.com");
        m.record_success(Duration::from_millis(50));
        m.record_success(Duration::from_millis(150));

        assert_eq!(m.total_requests.load(Ordering::Relaxed), 2);
        assert_eq!(m.successful_requests.load(Ordering::Relaxed), 2);
        assert_eq!(m.failed_requests.load(Ordering::Relaxed), 0);
    }

    /// Failures count toward the total alongside successes.
    #[test]
    fn record_failure_updates_counters() {
        let m = ProviderMetrics::new("https://rpc.example.com");
        m.record_success(Duration::from_millis(10));
        m.record_failure();
        m.record_failure();

        assert_eq!(m.total_requests.load(Ordering::Relaxed), 3);
        assert_eq!(m.successful_requests.load(Ordering::Relaxed), 1);
        assert_eq!(m.failed_requests.load(Ordering::Relaxed), 2);
    }

    #[test]
    fn avg_latency_calculation() {
        let m = ProviderMetrics::new("https://rpc.example.com");
        m.record_success(Duration::from_millis(100));
        m.record_success(Duration::from_millis(200));

        let avg = m.avg_latency();
        // Average should be 150ms.
        assert!(
            avg >= Duration::from_millis(140) && avg <= Duration::from_millis(160),
            "unexpected avg latency: {avg:?}"
        );
    }

    #[test]
    fn avg_latency_zero_when_no_requests() {
        let m = ProviderMetrics::new("https://rpc.example.com");
        assert_eq!(m.avg_latency(), Duration::ZERO);
    }

    #[test]
    fn success_rate_calculation() {
        let m = ProviderMetrics::new("https://rpc.example.com");
        m.record_success(Duration::from_millis(10));
        m.record_success(Duration::from_millis(10));
        m.record_failure();

        let rate = m.success_rate();
        // 2 out of 3 = 0.6667
        assert!(
            (rate - 2.0 / 3.0).abs() < 0.001,
            "unexpected success rate: {rate}"
        );
    }

    /// With no traffic at all a provider is reported as fully healthy.
    #[test]
    fn success_rate_defaults_to_one() {
        let m = ProviderMetrics::new("https://rpc.example.com");
        assert_eq!(m.success_rate(), 1.0);
    }

    #[test]
    fn min_max_latency_tracked() {
        let m = ProviderMetrics::new("https://rpc.example.com");
        m.record_success(Duration::from_millis(50));
        m.record_success(Duration::from_millis(200));
        m.record_success(Duration::from_millis(10));

        let snap = m.snapshot();
        assert!(
            snap.min_latency_ms >= 9.0 && snap.min_latency_ms <= 11.0,
            "unexpected min: {}",
            snap.min_latency_ms
        );
        assert!(
            snap.max_latency_ms >= 199.0 && snap.max_latency_ms <= 201.0,
            "unexpected max: {}",
            snap.max_latency_ms
        );
    }

    #[test]
    fn snapshot_serialization() {
        let m = ProviderMetrics::new("https://rpc.example.com");
        m.record_success(Duration::from_millis(100));
        m.record_failure();
        m.record_rate_limit();
        m.record_circuit_open();

        let snap = m.snapshot();
        let json = serde_json::to_string(&snap).unwrap();

        assert!(json.contains("\"url\":\"https://rpc.example.com\""));
        assert!(json.contains("\"total_requests\":2"));
        assert!(json.contains("\"successful_requests\":1"));
        assert!(json.contains("\"failed_requests\":1"));
        assert!(json.contains("\"rate_limit_hits\":1"));
        assert!(json.contains("\"circuit_open_count\":1"));
        assert!(json.contains("\"success_rate\":0.5"));
    }

    #[test]
    fn rate_limit_and_circuit_open_counts() {
        let m = ProviderMetrics::new("https://rpc.example.com");
        m.record_rate_limit();
        m.record_rate_limit();
        m.record_rate_limit();
        m.record_circuit_open();

        assert_eq!(m.rate_limit_hits.load(Ordering::Relaxed), 3);
        assert_eq!(m.circuit_open_count.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn rpc_metrics_aggregated() {
        let mut metrics = RpcMetrics::new();
        metrics.add_provider("https://a.com");
        metrics.add_provider("https://b.com");

        // Borrow the providers only after registration is finished. A pointer
        // taken from the first `add_provider` would be invalidated by the
        // second one (the `Vec` is mutated and may reallocate), so raw
        // pointers and `unsafe` are neither needed nor sound here.
        let first = &metrics.providers[0];
        let second = &metrics.providers[1];
        first.record_success(Duration::from_millis(10));
        first.record_success(Duration::from_millis(20));
        second.record_failure();

        assert_eq!(metrics.total_requests(), 3);
        assert_eq!(metrics.provider_count(), 2);

        let snaps = metrics.snapshot_all();
        assert_eq!(snaps.len(), 2);
        assert_eq!(snaps[0].url, "https://a.com");
        assert_eq!(snaps[0].successful_requests, 2);
        assert_eq!(snaps[1].url, "https://b.com");
        assert_eq!(snaps[1].failed_requests, 1);
    }

    #[test]
    fn prometheus_export() {
        let m = ProviderMetrics::new("https://rpc.example.com");
        m.record_success(Duration::from_millis(100));
        m.record_failure();
        let snap = m.snapshot();
        let prom = snap.to_prometheus();
        assert!(prom.contains("chainrpc_requests_total{provider=\"https://rpc.example.com\"} 2"));
        assert!(prom.contains("chainrpc_requests_successful_total"));
        assert!(prom.contains("chainrpc_requests_failed_total"));
        assert!(prom.contains("chainrpc_latency_avg_ms"));
        assert!(prom.contains("chainrpc_success_rate"));
    }

    #[test]
    fn rpc_metrics_default() {
        let metrics = RpcMetrics::default();
        assert_eq!(metrics.provider_count(), 0);
        assert_eq!(metrics.total_requests(), 0);
    }
}