Skip to main content

rmcp_server_kit/
metrics.rs

1//! Prometheus metrics for MCP servers.
2//!
3//! Provides a shared [`crate::metrics::McpMetrics`] registry with standard HTTP counters.
4//! The transport layer exposes these via a `/metrics` endpoint on a
5//! dedicated listener when `metrics_enabled` is true.
6//!
7//! # Public surface and the `prometheus` crate
8//!
9//! [`crate::metrics::McpMetrics::registry`] and the `IntCounterVec` / `HistogramVec` fields are
10//! intentionally exposed so downstream crates can register additional custom
11//! collectors against the same registry. This re-exports the [`prometheus`]
12//! crate types as part of `rmcp-server-kit`'s public API; pin the same major version to
13//! avoid type-identity mismatches when registering custom metrics.
14
15use std::sync::Arc;
16
17use prometheus::{
18    Encoder, HistogramOpts, HistogramVec, IntCounterVec, Registry, TextEncoder, opts,
19};
20
21use crate::error::McpxError;
22
/// Default latency buckets, in seconds, for the built-in HTTP request
/// duration histogram.
///
/// The upper bounds run from one millisecond to five seconds, so both
/// fast paths (health checks) and slow outbound dependencies land in a
/// meaningful bucket. Operators that need a different layout can
/// register their own histogram against [`McpMetrics::registry`].
const HTTP_DURATION_BUCKETS: &[f64] = &[
    0.001, // 1 ms
    0.005, // 5 ms
    0.01,  // 10 ms
    0.025, // 25 ms
    0.05,  // 50 ms
    0.1,   // 100 ms
    0.25,  // 250 ms
    0.5,   // 500 ms
    1.0,   // 1 s
    2.5,   // 2.5 s
    5.0,   // 5 s
];
32
/// Collected Prometheus metrics for an MCP server.
///
/// Bundles a [`Registry`] with the built-in collectors that
/// [`McpMetrics::new`] registers against it. The fields are public so
/// callers can record samples directly and attach additional collectors
/// to the same registry. A clone observes samples recorded through the
/// original handle.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct McpMetrics {
    /// Prometheus registry holding all counters and histograms.
    pub registry: Registry,
    /// Total HTTP requests, labelled by `method`, `path`, and `status`.
    pub http_requests_total: IntCounterVec,
    /// HTTP request duration in seconds, labelled by `method` and `path`.
    pub http_request_duration_seconds: HistogramVec,
    /// Rate-limiter denials, labelled by `limiter`. The label is one of
    /// `tool`, `auth_pre`, `auth_post`, `extra_route` — matching the
    /// four built-in per-IP limiters. Incremented at each deny site
    /// alongside the existing warn-level log.
    pub rate_limited_total: IntCounterVec,
}
49
50impl McpMetrics {
51    /// Create a new metrics registry with default MCP counters.
52    ///
53    /// # Errors
54    ///
55    /// Returns [`McpxError::Metrics`] if counter registration fails (should
56    /// not happen unless duplicate registrations occur).
57    pub fn new() -> Result<Self, McpxError> {
58        let registry = Registry::new();
59
60        let http_requests_total = IntCounterVec::new(
61            opts!("rmcp_server_kit_http_requests_total", "Total HTTP requests"),
62            &["method", "path", "status"],
63        )
64        .map_err(|e| McpxError::Metrics(e.to_string()))?;
65        registry
66            .register(Box::new(http_requests_total.clone()))
67            .map_err(|e| McpxError::Metrics(e.to_string()))?;
68
69        let http_request_duration_seconds = HistogramVec::new(
70            HistogramOpts::new(
71                "rmcp_server_kit_http_request_duration_seconds",
72                "HTTP request duration in seconds",
73            )
74            .buckets(HTTP_DURATION_BUCKETS.to_vec()),
75            &["method", "path"],
76        )
77        .map_err(|e| McpxError::Metrics(e.to_string()))?;
78        registry
79            .register(Box::new(http_request_duration_seconds.clone()))
80            .map_err(|e| McpxError::Metrics(e.to_string()))?;
81
82        let rate_limited_total = IntCounterVec::new(
83            opts!(
84                "rmcp_server_kit_rate_limited_total",
85                "Rate-limiter denials by limiter"
86            ),
87            &["limiter"],
88        )
89        .map_err(|e| McpxError::Metrics(e.to_string()))?;
90        registry
91            .register(Box::new(rate_limited_total.clone()))
92            .map_err(|e| McpxError::Metrics(e.to_string()))?;
93
94        Ok(Self {
95            registry,
96            http_requests_total,
97            http_request_duration_seconds,
98            rate_limited_total,
99        })
100    }
101
102    /// Encode all collected metrics as Prometheus text format.
103    #[must_use]
104    pub fn encode(&self) -> String {
105        let encoder = TextEncoder::new();
106        let metric_families = self.registry.gather();
107        let mut buf = Vec::new();
108        if let Err(e) = encoder.encode(&metric_families, &mut buf) {
109            tracing::warn!(error = %e, "prometheus encode failed");
110            return String::new();
111        }
112        // TextEncoder always produces valid UTF-8; fall back to empty on
113        // the near-impossible chance it doesn't.
114        String::from_utf8(buf).unwrap_or_default()
115    }
116}
117
118/// Increment the rate-limiter deny counter for `limiter`, if the shared
119/// [`McpMetrics`] handle is present in the request extensions.
120///
121/// The handle is inserted by the transport's metrics middleware (the
122/// outermost layer on the merged router) only when `metrics_enabled` is
123/// true; absent the extension this is a no-op, so deny sites behave
124/// identically with metrics disabled. `limiter` is one of `tool`,
125/// `auth_pre`, `auth_post`, `extra_route`.
126pub(crate) fn record_rate_limit_deny(ext: &axum::http::Extensions, limiter: &str) {
127    if let Some(m) = ext.get::<Arc<McpMetrics>>() {
128        m.rate_limited_total.with_label_values(&[limiter]).inc();
129    }
130}
131
132/// Spawn a dedicated HTTP listener that serves Prometheus metrics on `/metrics`.
133///
134/// The listener exits and releases the bound port when `shutdown` is
135/// cancelled, keeping the metrics endpoint tied to the parent server's
136/// graceful-shutdown lifecycle (M7).
137///
138/// # Errors
139///
140/// Returns [`McpxError::Startup`] if the TCP listener cannot bind or the
141/// underlying axum server fails.
142pub async fn serve_metrics(
143    bind: String,
144    metrics: Arc<McpMetrics>,
145    shutdown: tokio_util::sync::CancellationToken,
146) -> Result<(), McpxError> {
147    let app = axum::Router::new().route(
148        "/metrics",
149        axum::routing::get(move || {
150            let m = Arc::clone(&metrics);
151            async move { m.encode() }
152        }),
153    );
154
155    let listener = tokio::net::TcpListener::bind(&bind)
156        .await
157        .map_err(|e| McpxError::Startup(format!("metrics bind {bind}: {e}")))?;
158    tracing::info!("metrics endpoint listening on http://{bind}/metrics");
159    axum::serve(listener, app)
160        .with_graceful_shutdown(async move { shutdown.cancelled().await })
161        .await
162        .map_err(|e| McpxError::Startup(format!("metrics serve: {e}")))?;
163    Ok(())
164}
165
#[cfg(test)]
mod tests {
    #![allow(
        clippy::unwrap_used,
        clippy::expect_used,
        clippy::panic,
        clippy::indexing_slicing,
        clippy::unwrap_in_result,
        clippy::print_stdout,
        clippy::print_stderr,
        reason = "test-only relaxations; production code uses ? and tracing"
    )]
    use super::*;

    /// True when `output` contains a sample line for `metric` whose value
    /// is exactly `value`.
    ///
    /// A sample line in the text exposition format reads
    /// `name{labels} value`, so comparing the final space-separated token
    /// rejects the false positives a bare `contains(" 1")` would accept
    /// (for example a count of `10`). `# HELP` / `# TYPE` lines never
    /// match because they start with `#`.
    fn has_sample(output: &str, metric: &str, value: &str) -> bool {
        output.lines().any(|line| {
            line.starts_with(metric) && line.rsplit_once(' ').is_some_and(|(_, v)| v == value)
        })
    }

    #[test]
    fn new_creates_registry_with_counters() {
        let m = McpMetrics::new().unwrap();
        // Incrementing a counter should make it appear in gather output.
        m.http_requests_total
            .with_label_values(&["GET", "/test", "200"])
            .inc();
        m.http_request_duration_seconds
            .with_label_values(&["GET", "/test"])
            .observe(0.1);
        // Only the two touched families are expected; the rate-limit
        // counter has no labelled children yet.
        assert_eq!(m.registry.gather().len(), 2);
    }

    #[test]
    fn encode_empty_registry() {
        let m = McpMetrics::new().unwrap();
        let output = m.encode();
        // Empty counters/histograms produce no samples but the output is valid.
        assert!(output.is_empty() || output.contains("rmcp_server_kit_"));
    }

    #[test]
    fn counter_increment_shows_in_encode() {
        let m = McpMetrics::new().unwrap();
        m.http_requests_total
            .with_label_values(&["GET", "/healthz", "200"])
            .inc();
        let output = m.encode();
        assert!(output.contains("rmcp_server_kit_http_requests_total"));
        assert!(output.contains("method=\"GET\""));
        assert!(output.contains("path=\"/healthz\""));
        assert!(output.contains("status=\"200\""));
        // count = 1, matched as the exact sample value.
        assert!(has_sample(
            &output,
            "rmcp_server_kit_http_requests_total",
            "1"
        ));
    }

    #[test]
    fn histogram_observe_shows_in_encode() {
        let m = McpMetrics::new().unwrap();
        m.http_request_duration_seconds
            .with_label_values(&["POST", "/mcp"])
            .observe(0.042);
        let output = m.encode();
        assert!(output.contains("rmcp_server_kit_http_request_duration_seconds"));
        assert!(output.contains("method=\"POST\""));
        assert!(output.contains("path=\"/mcp\""));
        // Exactly one observation was recorded.
        assert!(has_sample(
            &output,
            "rmcp_server_kit_http_request_duration_seconds_count",
            "1"
        ));
    }

    #[test]
    fn multiple_increments_accumulate() {
        let m = McpMetrics::new().unwrap();
        let counter = m
            .http_requests_total
            .with_label_values(&["POST", "/mcp", "200"]);
        counter.inc();
        counter.inc();
        counter.inc();
        assert_eq!(counter.get(), 3);
        let output = m.encode();
        // count = 3, matched as the exact sample value.
        assert!(has_sample(
            &output,
            "rmcp_server_kit_http_requests_total",
            "3"
        ));
    }

    #[test]
    fn clone_shares_registry() {
        let m = McpMetrics::new().unwrap();
        let m2 = m.clone();
        m.http_requests_total
            .with_label_values(&["GET", "/test", "200"])
            .inc();
        // The clone should see the same counter value, both through its
        // own counter handle and through its registry.
        assert_eq!(
            m2.http_requests_total
                .with_label_values(&["GET", "/test", "200"])
                .get(),
            1
        );
        let output = m2.encode();
        assert!(has_sample(
            &output,
            "rmcp_server_kit_http_requests_total",
            "1"
        ));
    }

    #[test]
    fn rate_limited_counter_registers_and_encodes() {
        let m = McpMetrics::new().unwrap();
        m.rate_limited_total.with_label_values(&["tool"]).inc();
        let output = m.encode();
        assert!(output.contains("rmcp_server_kit_rate_limited_total"));
        assert!(output.contains("limiter=\"tool\""));
        assert!(has_sample(
            &output,
            "rmcp_server_kit_rate_limited_total",
            "1"
        ));
    }

    #[test]
    fn record_rate_limit_deny_increments_via_extension() {
        let m = Arc::new(McpMetrics::new().unwrap());
        let mut ext = axum::http::Extensions::new();
        ext.insert(Arc::clone(&m));
        record_rate_limit_deny(&ext, "auth_pre");
        record_rate_limit_deny(&ext, "auth_pre");
        assert_eq!(
            m.rate_limited_total.with_label_values(&["auth_pre"]).get(),
            2
        );
        // Absent handle: silent no-op (metrics disabled path).
        let empty = axum::http::Extensions::new();
        record_rate_limit_deny(&empty, "auth_pre");
        assert_eq!(
            m.rate_limited_total.with_label_values(&["auth_pre"]).get(),
            2
        );
    }

    // M7 regression: cancelling the shutdown token must release the
    // metrics listener's bound port so a subsequent bind to the same
    // address succeeds. Prior to M7 the metrics endpoint ran without
    // graceful_shutdown wiring and would leak the port until process
    // exit.
    #[tokio::test]
    async fn serve_metrics_releases_port_on_shutdown() {
        // Pick an ephemeral port, then drop the probe so serve_metrics
        // can claim it.
        let probe = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = probe.local_addr().unwrap();
        drop(probe);

        let metrics = Arc::new(McpMetrics::new().unwrap());
        let shutdown = tokio_util::sync::CancellationToken::new();
        let handle = tokio::spawn(serve_metrics(
            addr.to_string(),
            Arc::clone(&metrics),
            shutdown.clone(),
        ));

        // Wait until the listener is actually accepting connections.
        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
        loop {
            if tokio::net::TcpStream::connect(addr).await.is_ok() {
                break;
            }
            assert!(
                std::time::Instant::now() < deadline,
                "metrics listener never accepted on {addr}"
            );
            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
        }

        // Cancel and await graceful shutdown.
        shutdown.cancel();
        let join = tokio::time::timeout(std::time::Duration::from_secs(5), handle)
            .await
            .expect("serve_metrics did not return within timeout");
        join.expect("join error")
            .expect("serve_metrics returned Err");

        // Port must be immediately rebindable.
        let rebind = tokio::net::TcpListener::bind(addr)
            .await
            .expect("port not released after shutdown");
        drop(rebind);
    }
}