use std::fmt::Write;
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use dashmap::DashMap;
/// Inclusive upper bounds, in seconds, of the finite buckets shared by the
/// call-latency and handler-duration histograms (rendered as the `le` label).
pub const DEFAULT_LATENCY_BUCKETS_SECS: &[f64] = &[
    0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
];

/// Bucket slots per histogram: one per finite bound above plus a trailing `+Inf`
/// slot. Derived from the bounds so the two can never drift apart; a hand-written
/// count would turn an edit to the bounds into an out-of-bounds index panic (or
/// silently unused slots) at record time.
const N_BUCKETS: usize = DEFAULT_LATENCY_BUCKETS_SECS.len() + 1;
/// Atomic metric cells for a single service, shared through an `Arc`.
///
/// Caller-side fields (`calls_total`, `errors_*`, `in_flight`, `latency_*`) and
/// handler-side fields (`handler_*`, `streaming_*`, `capability_denied_total`)
/// live side by side; this module updates and reads them with
/// `Ordering::Relaxed`. Each histogram is a nanosecond sum, an observation
/// count and `N_BUCKETS` cumulative counters whose finite bounds come from
/// `DEFAULT_LATENCY_BUCKETS_SECS`; the last slot is the `+Inf` bucket.
#[derive(Debug, Default)]
pub struct ServiceMetricsAtomic {
    /// Calls that resolved with a recorded outcome (success or error).
    pub calls_total: AtomicU64,
    /// Failed calls, exported as `nrpc_errors_total{kind="no_route"}`.
    pub errors_no_route: AtomicU64,
    /// Failed calls, exported as `nrpc_errors_total{kind="timeout"}`.
    pub errors_timeout: AtomicU64,
    /// Failed calls, exported as `nrpc_errors_total{kind="server"}`.
    pub errors_server: AtomicU64,
    /// Failed calls, exported as `nrpc_errors_total{kind="transport"}`.
    pub errors_transport: AtomicU64,
    /// Currently in-flight calls. Signed; the Prometheus exporter clamps
    /// negative values to 0.
    pub in_flight: AtomicI64,
    /// Sum of recorded call latencies, in nanoseconds.
    pub latency_sum_ns: AtomicU64,
    /// Number of recorded call latencies.
    pub latency_count: AtomicU64,
    /// Cumulative call-latency bucket counts; the last slot is `+Inf`.
    pub latency_buckets: [AtomicU64; N_BUCKETS],
    /// Handler invocations on this node.
    pub handler_invocations_total: AtomicU64,
    /// Handler panics caught by the fold's `catch_unwind`.
    pub handler_panics_total: AtomicU64,
    /// Currently-running handler tasks. Signed; exported clamped at 0.
    pub handler_in_flight: AtomicI64,
    /// Sum of recorded server-side handler durations, in nanoseconds.
    pub handler_duration_sum_ns: AtomicU64,
    /// Number of recorded handler durations.
    pub handler_duration_count: AtomicU64,
    /// Cumulative handler-duration bucket counts; the last slot is `+Inf`.
    pub handler_duration_buckets: [AtomicU64; N_BUCKETS],
    /// Chunks emitted by streaming handlers via `sink.send()`.
    pub streaming_chunks_emitted_total: AtomicU64,
    /// Streaming chunks dropped because the per-call pump mpsc was full.
    pub streaming_chunks_dropped_total: AtomicU64,
    /// Inbound calls rejected by the callee-side capability-auth gate before
    /// handler invocation.
    pub capability_denied_total: AtomicU64,
}

impl ServiceMetricsAtomic {
    /// Returns a metric set with every counter, gauge and bucket at zero.
    ///
    /// Delegates to the derived `Default`, so newly added atomic fields are
    /// zero-initialised without touching this constructor.
    fn new() -> Self {
        Self::default()
    }

    /// Records one caller-side call-latency observation.
    pub(super) fn record_latency(&self, elapsed: Duration) {
        record_into_histogram(
            elapsed,
            &self.latency_sum_ns,
            &self.latency_count,
            &self.latency_buckets,
        );
    }

    /// Records one server-side handler-duration observation.
    pub fn record_handler_duration(&self, elapsed: Duration) {
        record_into_histogram(
            elapsed,
            &self.handler_duration_sum_ns,
            &self.handler_duration_count,
            &self.handler_duration_buckets,
        );
    }
}
/// Adds one observation of `elapsed` to a cumulative histogram.
///
/// Bumps `sum_ns` by the duration in nanoseconds and `count` by one. It also
/// increments every finite bucket whose bound in `DEFAULT_LATENCY_BUCKETS_SECS`
/// is `>=` the observation (cumulative, Prometheus-style), plus the trailing
/// `+Inf` slot, which therefore always equals the number of observations.
/// All updates use `Ordering::Relaxed`.
fn record_into_histogram(
    elapsed: Duration,
    sum_ns: &AtomicU64,
    count: &AtomicU64,
    buckets: &[AtomicU64; N_BUCKETS],
) {
    // Saturate rather than truncate: `as u64` on the u128 nanosecond count wraps
    // modulo 2^64, which could drop an absurdly long duration into the *smallest*
    // buckets instead of only `+Inf`.
    let ns = elapsed.as_nanos().min(u128::from(u64::MAX)) as u64;
    sum_ns.fetch_add(ns, Ordering::Relaxed);
    count.fetch_add(1, Ordering::Relaxed);
    let secs = ns as f64 / 1.0e9_f64;
    // Pair each finite bound with its slot; the extra trailing slot is `+Inf`.
    for (bucket, le) in buckets.iter().zip(DEFAULT_LATENCY_BUCKETS_SECS) {
        if secs <= *le {
            bucket.fetch_add(1, Ordering::Relaxed);
        }
    }
    buckets[N_BUCKETS - 1].fetch_add(1, Ordering::Relaxed);
}
/// How a call tracked by `CallMetricsGuard` resolved.
///
/// When the guard is dropped with an outcome recorded, every variant bumps
/// `calls_total` and the latency histogram; each non-`Ok` variant additionally
/// bumps one `errors_*` counter (exported under the `kind` label shown below).
#[derive(Debug, Clone, Copy)]
pub(super) enum CallOutcome {
/// Success: no error counter is touched.
Ok,
/// Counted in `errors_no_route` (`kind="no_route"`).
NoRoute,
/// Counted in `errors_timeout` (`kind="timeout"`).
Timeout,
/// Counted in `errors_server` (`kind="server"`).
ServerError,
/// Counted in `errors_transport` (`kind="transport"`).
Transport,
}
/// Soft cap on distinct per-service entries in `RpcMetricsRegistry`; services
/// first seen after the cap is reached share a single overflow entry.
pub const MAX_TRACKED_SERVICES: usize = 4_096;

/// `service` label value under which over-the-cap services are aggregated.
pub const OVERFLOW_SERVICE_LABEL: &str = "__overflow__";
/// Registry of per-service metric cells keyed by service name.
///
/// Backed by a concurrent `DashMap`, so lookups and first-time registrations
/// happen through `&self`.
pub struct RpcMetricsRegistry {
    services: DashMap<String, Arc<ServiceMetricsAtomic>>,
}

impl Default for RpcMetricsRegistry {
    /// An empty registry; identical to [`RpcMetricsRegistry::new`].
    fn default() -> Self {
        RpcMetricsRegistry::new()
    }
}
impl RpcMetricsRegistry {
pub fn new() -> Self {
Self {
services: DashMap::new(),
}
}
pub(crate) fn for_service(&self, service: &str) -> Arc<ServiceMetricsAtomic> {
if let Some(m) = self.services.get(service) {
return m.clone();
}
if self.services.len() >= MAX_TRACKED_SERVICES && !self.services.contains_key(service) {
return self
.services
.entry(OVERFLOW_SERVICE_LABEL.to_string())
.or_insert_with(|| Arc::new(ServiceMetricsAtomic::new()))
.clone();
}
self.services
.entry(service.to_string())
.or_insert_with(|| Arc::new(ServiceMetricsAtomic::new()))
.clone()
}
pub fn snapshot(&self) -> RpcMetricsSnapshot {
let mut services = Vec::with_capacity(self.services.len());
for entry in self.services.iter() {
let m = entry.value();
let mut buckets = Vec::with_capacity(N_BUCKETS);
for b in &m.latency_buckets {
buckets.push(b.load(Ordering::Relaxed));
}
let mut handler_buckets = Vec::with_capacity(N_BUCKETS);
for b in &m.handler_duration_buckets {
handler_buckets.push(b.load(Ordering::Relaxed));
}
services.push(ServiceMetrics {
service: entry.key().clone(),
calls_total: m.calls_total.load(Ordering::Relaxed),
errors_no_route: m.errors_no_route.load(Ordering::Relaxed),
errors_timeout: m.errors_timeout.load(Ordering::Relaxed),
errors_server: m.errors_server.load(Ordering::Relaxed),
errors_transport: m.errors_transport.load(Ordering::Relaxed),
in_flight: m.in_flight.load(Ordering::Relaxed),
latency_sum_ns: m.latency_sum_ns.load(Ordering::Relaxed),
latency_count: m.latency_count.load(Ordering::Relaxed),
latency_buckets: buckets,
handler_invocations_total: m.handler_invocations_total.load(Ordering::Relaxed),
handler_panics_total: m.handler_panics_total.load(Ordering::Relaxed),
handler_in_flight: m.handler_in_flight.load(Ordering::Relaxed),
handler_duration_sum_ns: m.handler_duration_sum_ns.load(Ordering::Relaxed),
handler_duration_count: m.handler_duration_count.load(Ordering::Relaxed),
handler_duration_buckets: handler_buckets,
streaming_chunks_emitted_total: m
.streaming_chunks_emitted_total
.load(Ordering::Relaxed),
streaming_chunks_dropped_total: m
.streaming_chunks_dropped_total
.load(Ordering::Relaxed),
capability_denied_total: m.capability_denied_total.load(Ordering::Relaxed),
});
}
services.sort_by(|a, b| a.service.cmp(&b.service));
RpcMetricsSnapshot { services }
}
}
/// Point-in-time copy of every tracked service's metrics, as produced by
/// `RpcMetricsRegistry::snapshot` (which sorts the entries by service name).
#[derive(Debug, Clone)]
pub struct RpcMetricsSnapshot {
/// One entry per registry key, including the overflow entry if it exists.
pub services: Vec<ServiceMetrics>,
}
/// Plain-value copy of one service's `ServiceMetricsAtomic` cells.
#[derive(Debug, Clone)]
pub struct ServiceMetrics {
/// Registry key, emitted as the `service` label.
pub service: String,
/// Calls that resolved with a recorded outcome (success or error).
pub calls_total: u64,
/// Error counters, emitted as `nrpc_errors_total` with `kind` = no_route / timeout / server / transport.
pub errors_no_route: u64,
pub errors_timeout: u64,
pub errors_server: u64,
pub errors_transport: u64,
/// Raw in-flight gauge; may be negative here, the exporter clamps it at 0.
pub in_flight: i64,
/// Call-latency histogram: nanosecond sum, observation count, and cumulative
/// bucket counts (last element is `+Inf`).
pub latency_sum_ns: u64,
pub latency_count: u64,
pub latency_buckets: Vec<u64>,
/// Handler-side counters and gauge (gauge clamped at 0 on export).
pub handler_invocations_total: u64,
pub handler_panics_total: u64,
pub handler_in_flight: i64,
/// Handler-duration histogram: nanosecond sum, observation count, and
/// cumulative bucket counts (last element is `+Inf`).
pub handler_duration_sum_ns: u64,
pub handler_duration_count: u64,
pub handler_duration_buckets: Vec<u64>,
/// Streaming chunk counters (emitted via `sink.send()` / dropped on a full pump).
pub streaming_chunks_emitted_total: u64,
pub streaming_chunks_dropped_total: u64,
/// Inbound calls rejected by the capability-auth gate.
pub capability_denied_total: u64,
}
impl RpcMetricsSnapshot {
pub fn prometheus_text(&self) -> String {
let mut out = String::with_capacity(2048);
out.push_str(
"# HELP nrpc_calls_total Total nRPC calls that resolved (success or error).\n",
);
out.push_str("# TYPE nrpc_calls_total counter\n");
for s in &self.services {
let _ = writeln!(
out,
"nrpc_calls_total{{service=\"{}\"}} {}",
escape_label(&s.service),
s.calls_total
);
}
out.push_str("# HELP nrpc_errors_total nRPC call failures, partitioned by error kind.\n");
out.push_str("# TYPE nrpc_errors_total counter\n");
for s in &self.services {
let svc = escape_label(&s.service);
let _ = writeln!(
out,
"nrpc_errors_total{{service=\"{svc}\",kind=\"no_route\"}} {}",
s.errors_no_route
);
let _ = writeln!(
out,
"nrpc_errors_total{{service=\"{svc}\",kind=\"timeout\"}} {}",
s.errors_timeout
);
let _ = writeln!(
out,
"nrpc_errors_total{{service=\"{svc}\",kind=\"server\"}} {}",
s.errors_server
);
let _ = writeln!(
out,
"nrpc_errors_total{{service=\"{svc}\",kind=\"transport\"}} {}",
s.errors_transport
);
}
out.push_str("# HELP nrpc_in_flight_calls Currently-in-flight nRPC calls.\n");
out.push_str("# TYPE nrpc_in_flight_calls gauge\n");
for s in &self.services {
let _ = writeln!(
out,
"nrpc_in_flight_calls{{service=\"{}\"}} {}",
escape_label(&s.service),
s.in_flight.max(0),
);
}
out.push_str("# HELP nrpc_call_latency_seconds Wall-clock nRPC call latency in seconds.\n");
out.push_str("# TYPE nrpc_call_latency_seconds histogram\n");
for s in &self.services {
let svc = escape_label(&s.service);
for (i, le) in DEFAULT_LATENCY_BUCKETS_SECS.iter().enumerate() {
let _ = writeln!(
out,
"nrpc_call_latency_seconds_bucket{{service=\"{svc}\",le=\"{le}\"}} {}",
s.latency_buckets.get(i).copied().unwrap_or(0)
);
}
let _ = writeln!(
out,
"nrpc_call_latency_seconds_bucket{{service=\"{svc}\",le=\"+Inf\"}} {}",
s.latency_buckets.last().copied().unwrap_or(0)
);
let _ = writeln!(
out,
"nrpc_call_latency_seconds_sum{{service=\"{svc}\"}} {}",
s.latency_sum_ns as f64 / 1.0e9_f64
);
let _ = writeln!(
out,
"nrpc_call_latency_seconds_count{{service=\"{svc}\"}} {}",
s.latency_count
);
}
out.push_str(
"# HELP nrpc_handler_invocations_total Total nRPC handler invocations on this node.\n",
);
out.push_str("# TYPE nrpc_handler_invocations_total counter\n");
for s in &self.services {
let _ = writeln!(
out,
"nrpc_handler_invocations_total{{service=\"{}\"}} {}",
escape_label(&s.service),
s.handler_invocations_total
);
}
out.push_str(
"# HELP nrpc_handler_panics_total Handler panics caught by the fold's catch_unwind.\n",
);
out.push_str("# TYPE nrpc_handler_panics_total counter\n");
for s in &self.services {
let _ = writeln!(
out,
"nrpc_handler_panics_total{{service=\"{}\"}} {}",
escape_label(&s.service),
s.handler_panics_total
);
}
out.push_str(
"# HELP nrpc_handler_in_flight Currently-running handler tasks for this service.\n",
);
out.push_str("# TYPE nrpc_handler_in_flight gauge\n");
for s in &self.services {
let _ = writeln!(
out,
"nrpc_handler_in_flight{{service=\"{}\"}} {}",
escape_label(&s.service),
s.handler_in_flight.max(0),
);
}
out.push_str(
"# HELP nrpc_handler_duration_seconds Server-side handler wall-clock duration (excludes network).\n",
);
out.push_str("# TYPE nrpc_handler_duration_seconds histogram\n");
for s in &self.services {
let svc = escape_label(&s.service);
for (i, le) in DEFAULT_LATENCY_BUCKETS_SECS.iter().enumerate() {
let _ = writeln!(
out,
"nrpc_handler_duration_seconds_bucket{{service=\"{svc}\",le=\"{le}\"}} {}",
s.handler_duration_buckets.get(i).copied().unwrap_or(0)
);
}
let _ = writeln!(
out,
"nrpc_handler_duration_seconds_bucket{{service=\"{svc}\",le=\"+Inf\"}} {}",
s.handler_duration_buckets.last().copied().unwrap_or(0)
);
let _ = writeln!(
out,
"nrpc_handler_duration_seconds_sum{{service=\"{svc}\"}} {}",
s.handler_duration_sum_ns as f64 / 1.0e9_f64
);
let _ = writeln!(
out,
"nrpc_handler_duration_seconds_count{{service=\"{svc}\"}} {}",
s.handler_duration_count
);
}
out.push_str(
"# HELP nrpc_streaming_chunks_emitted_total Total chunks emitted by streaming handlers via sink.send().\n",
);
out.push_str("# TYPE nrpc_streaming_chunks_emitted_total counter\n");
for s in &self.services {
let _ = writeln!(
out,
"nrpc_streaming_chunks_emitted_total{{service=\"{}\"}} {}",
escape_label(&s.service),
s.streaming_chunks_emitted_total
);
}
out.push_str(
"# HELP nrpc_streaming_chunks_dropped_total Streaming chunks dropped because the per-call pump mpsc was full (handler outpaced the publish path).\n",
);
out.push_str("# TYPE nrpc_streaming_chunks_dropped_total counter\n");
for s in &self.services {
let _ = writeln!(
out,
"nrpc_streaming_chunks_dropped_total{{service=\"{}\"}} {}",
escape_label(&s.service),
s.streaming_chunks_dropped_total
);
}
out.push_str(
"# HELP nrpc_capability_denied_total Inbound nRPC calls rejected by the callee-side capability-auth gate before handler invocation.\n",
);
out.push_str("# TYPE nrpc_capability_denied_total counter\n");
for s in &self.services {
let _ = writeln!(
out,
"nrpc_capability_denied_total{{service=\"{}\"}} {}",
escape_label(&s.service),
s.capability_denied_total
);
}
out
}
}
/// Escapes a string for use inside a double-quoted Prometheus label value.
///
/// Backslash, double quote, line feed and carriage return are rewritten as
/// `\\`, `\"`, `\n` and `\r`; every other character passes through unchanged.
///
/// NOTE(review): the Prometheus text format only defines the `\\`, `\"` and `\n`
/// escapes for label values. The `\r` escape is kept (the `label_escaping` test
/// pins it), but some parsers may reject it or leave it un-unescaped. Confirm
/// against the scraper in use.
fn escape_label(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        let replacement = match ch {
            '\\' => "\\\\",
            '"' => "\\\"",
            '\n' => "\\n",
            '\r' => "\\r",
            literal => {
                escaped.push(literal);
                continue;
            }
        };
        escaped.push_str(replacement);
    }
    escaped
}
/// RAII guard for one outbound call's metrics.
///
/// Creating it increments `in_flight`, and dropping it decrements `in_flight` again.
/// If an outcome was recorded, the drop also bumps `calls_total`, records the
/// latency since creation, and bumps the matching `errors_*` counter.
/// Latency is measured up to the drop, not up to the `record` call.
#[must_use = "the guard measures the call until it is dropped; bind it to a named variable"]
pub(super) struct CallMetricsGuard {
    /// Cells of the service being called.
    metrics: Arc<ServiceMetricsAtomic>,
    /// When the guard was created; the latency origin.
    started: std::time::Instant,
    /// Outcome to record on drop; `None` means only `in_flight` is rebalanced.
    outcome: Option<CallOutcome>,
}

impl CallMetricsGuard {
    /// Starts tracking a call: increments `in_flight` and starts the latency clock.
    pub(super) fn new(metrics: Arc<ServiceMetricsAtomic>) -> Self {
        metrics.in_flight.fetch_add(1, Ordering::Relaxed);
        Self {
            metrics,
            started: std::time::Instant::now(),
            outcome: None,
        }
    }

    /// Sets the outcome to record on drop; a later call overwrites an earlier one.
    pub(super) fn record(&mut self, outcome: CallOutcome) {
        self.outcome = Some(outcome);
    }
}

impl Drop for CallMetricsGuard {
    fn drop(&mut self) {
        let m = &self.metrics;
        m.in_flight.fetch_sub(1, Ordering::Relaxed);
        // Without a recorded outcome, only the in-flight gauge is rebalanced.
        let outcome = match self.outcome {
            Some(outcome) => outcome,
            None => return,
        };
        m.calls_total.fetch_add(1, Ordering::Relaxed);
        m.record_latency(self.started.elapsed());
        let error_counter = match outcome {
            CallOutcome::Ok => None,
            CallOutcome::NoRoute => Some(&m.errors_no_route),
            CallOutcome::Timeout => Some(&m.errors_timeout),
            CallOutcome::ServerError => Some(&m.errors_server),
            CallOutcome::Transport => Some(&m.errors_transport),
        };
        if let Some(counter) = error_counter {
            counter.fetch_add(1, Ordering::Relaxed);
        }
    }
}
#[cfg(test)]
mod tests {
// Unit tests covering snapshot round-trips, histogram bucketing, Prometheus
// rendering, label escaping, the service-count cap, and CallMetricsGuard.
use super::*;
// Counters and three latency observations survive `snapshot`; buckets are
// cumulative (bucket i counts observations <= bound i) and the last is +Inf.
#[test]
fn registry_snapshot_round_trip() {
let r = RpcMetricsRegistry::new();
let m = r.for_service("echo");
m.calls_total.fetch_add(3, Ordering::Relaxed);
m.errors_timeout.fetch_add(1, Ordering::Relaxed);
m.record_latency(Duration::from_millis(7));
m.record_latency(Duration::from_millis(150));
m.record_latency(Duration::from_secs(3));
let snap = r.snapshot();
assert_eq!(snap.services.len(), 1);
let s = &snap.services[0];
assert_eq!(s.service, "echo");
assert_eq!(s.calls_total, 3);
assert_eq!(s.errors_timeout, 1);
assert_eq!(s.latency_count, 3);
assert_eq!(s.latency_buckets[0], 0, "no obs ≤ 5ms");
assert_eq!(s.latency_buckets[1], 1, "7ms ≤ 10ms");
assert_eq!(s.latency_buckets[5], 2, "7ms + 150ms ≤ 0.25s");
assert_eq!(s.latency_buckets[N_BUCKETS - 1], 3, "+Inf == count");
}
// Smoke test: every exported metric name appears in the rendered text.
#[test]
fn prometheus_text_emits_canonical_metric_names() {
let r = RpcMetricsRegistry::new();
let m = r.for_service("echo");
m.calls_total.fetch_add(1, Ordering::Relaxed);
m.record_latency(Duration::from_millis(5));
m.handler_invocations_total.fetch_add(2, Ordering::Relaxed);
m.record_handler_duration(Duration::from_millis(3));
let text = r.snapshot().prometheus_text();
assert!(text.contains("nrpc_calls_total"));
assert!(text.contains("nrpc_errors_total"));
assert!(text.contains("nrpc_in_flight_calls"));
assert!(text.contains("nrpc_call_latency_seconds_bucket"));
assert!(text.contains("nrpc_call_latency_seconds_sum"));
assert!(text.contains("nrpc_call_latency_seconds_count"));
assert!(text.contains("nrpc_handler_invocations_total"));
assert!(text.contains("nrpc_handler_panics_total"));
assert!(text.contains("nrpc_handler_in_flight"));
assert!(text.contains("nrpc_handler_duration_seconds_bucket"));
assert!(text.contains("nrpc_streaming_chunks_emitted_total"));
assert!(text.contains("nrpc_capability_denied_total"));
assert!(text.contains("le=\"+Inf\""));
}
// The capability-denied counter reaches both the snapshot and an exact sample line.
#[test]
fn capability_denied_counter_surfaces_through_snapshot_and_prometheus() {
let r = RpcMetricsRegistry::new();
let m = r.for_service("locked");
m.capability_denied_total.fetch_add(2, Ordering::Relaxed);
let snap = r.snapshot();
assert_eq!(snap.services[0].capability_denied_total, 2);
let txt = snap.prometheus_text();
assert!(
txt.contains("nrpc_capability_denied_total{service=\"locked\"} 2"),
"prometheus output must surface the counter; got:\n{txt}",
);
}
// Handler durations use the same cumulative bucketing as call latency.
#[test]
fn record_handler_duration_lands_in_buckets() {
let r = RpcMetricsRegistry::new();
let m = r.for_service("svc");
m.record_handler_duration(Duration::from_millis(7));
m.record_handler_duration(Duration::from_secs(3));
let snap = r.snapshot();
let s = &snap.services[0];
assert_eq!(s.handler_duration_count, 2);
assert_eq!(
*s.handler_duration_buckets.last().unwrap(),
2,
"+Inf bucket equals count",
);
assert_eq!(s.handler_duration_buckets[1], 1, "7ms ≤ 10ms");
assert_eq!(s.handler_duration_buckets[9], 2, "7ms + 3s both ≤ 5s");
}
// Pins the exact escape output, including the `\r` -> `\\r` rewrite.
#[test]
fn label_escaping() {
assert_eq!(escape_label("simple"), "simple");
assert_eq!(escape_label(r#"has"quote"#), r#"has\"quote"#);
assert_eq!(escape_label("has\\bs"), "has\\\\bs");
assert_eq!(escape_label("line1\nline2"), "line1\\nline2");
assert_eq!(escape_label("dos\r\nstyle"), "dos\\r\\nstyle");
}
// After MAX_TRACKED_SERVICES distinct names, new names share the overflow
// entry (one extra map key) while known names keep their own cells.
#[test]
fn registry_caps_service_count_at_max_tracked_services() {
let reg = RpcMetricsRegistry::new();
for i in 0..MAX_TRACKED_SERVICES {
let _ = reg.for_service(&format!("svc-{i}"));
}
assert_eq!(reg.services.len(), MAX_TRACKED_SERVICES);
let m1 = reg.for_service("overflow-1");
let m2 = reg.for_service("overflow-2");
let m3 = reg.for_service("overflow-3");
assert!(
Arc::ptr_eq(&m1, &m2) && Arc::ptr_eq(&m2, &m3),
"overflow services must share the __overflow__ counter set",
);
assert_eq!(
reg.services.len(),
MAX_TRACKED_SERVICES + 1,
"registry size must never exceed MAX_TRACKED_SERVICES + 1",
);
let known = reg.for_service("svc-0");
assert!(
!Arc::ptr_eq(&known, &m1),
"known services keep their dedicated counters",
);
}
// Negative in-flight gauges render as 0; positive values pass through.
#[test]
fn prometheus_text_clamps_negative_gauge() {
let r = RpcMetricsRegistry::new();
let m = r.for_service("clamp");
m.in_flight.store(-3, Ordering::Relaxed);
m.handler_in_flight.store(-7, Ordering::Relaxed);
let snap = r.snapshot();
let txt = snap.prometheus_text();
assert!(
txt.contains("nrpc_in_flight_calls{service=\"clamp\"} 0"),
"must clamp negative caller-side gauge to 0; got:\n{txt}",
);
assert!(
txt.contains("nrpc_handler_in_flight{service=\"clamp\"} 0"),
"must clamp negative server-side gauge to 0; got:\n{txt}",
);
m.in_flight.store(5, Ordering::Relaxed);
let snap = r.snapshot();
let txt = snap.prometheus_text();
assert!(txt.contains("nrpc_in_flight_calls{service=\"clamp\"} 5"));
}
// A guard with a recorded outcome bumps in_flight while alive and records
// calls_total plus one latency observation on drop.
#[test]
fn guard_records_in_flight_and_outcome() {
let r = RpcMetricsRegistry::new();
let m = r.for_service("svc");
{
let mut g = CallMetricsGuard::new(m.clone());
assert_eq!(m.in_flight.load(Ordering::Relaxed), 1);
g.record(CallOutcome::Ok);
}
assert_eq!(m.in_flight.load(Ordering::Relaxed), 0);
assert_eq!(m.calls_total.load(Ordering::Relaxed), 1);
assert_eq!(m.latency_count.load(Ordering::Relaxed), 1);
}
// A guard dropped without an outcome only rebalances in_flight.
#[test]
fn guard_dropped_without_outcome_balances_in_flight_only() {
let r = RpcMetricsRegistry::new();
let m = r.for_service("dropped");
{
let _g = CallMetricsGuard::new(m.clone());
assert_eq!(m.in_flight.load(Ordering::Relaxed), 1);
}
assert_eq!(m.in_flight.load(Ordering::Relaxed), 0, "in_flight balanced");
assert_eq!(
m.calls_total.load(Ordering::Relaxed),
0,
"no outcome recorded"
);
assert_eq!(m.latency_count.load(Ordering::Relaxed), 0);
}
}