rmcp_server_kit/
metrics.rs1use std::sync::Arc;
16
17use prometheus::{
18 Encoder, HistogramOpts, HistogramVec, IntCounterVec, Registry, TextEncoder, opts,
19};
20
21use crate::error::McpxError;
22
/// Bucket boundaries used for the HTTP request-duration histogram.
///
/// The values are passed to `HistogramOpts::buckets` for the
/// `..._http_request_duration_seconds` metric, so they are in seconds.
const HTTP_DURATION_BUCKETS: &[f64] = &[
    // Milliseconds.
    0.001, 0.005,
    // Tens of milliseconds.
    0.01, 0.025, 0.05,
    // Hundreds of milliseconds.
    0.1, 0.25, 0.5,
    // Whole seconds.
    1.0, 2.5, 5.0,
];
32
33#[derive(Clone, Debug)]
35#[non_exhaustive]
36pub struct McpMetrics {
37 pub registry: Registry,
39 pub http_requests_total: IntCounterVec,
41 pub http_request_duration_seconds: HistogramVec,
43 pub rate_limited_total: IntCounterVec,
48}
49
50impl McpMetrics {
51 pub fn new() -> Result<Self, McpxError> {
58 let registry = Registry::new();
59
60 let http_requests_total = IntCounterVec::new(
61 opts!("rmcp_server_kit_http_requests_total", "Total HTTP requests"),
62 &["method", "path", "status"],
63 )
64 .map_err(|e| McpxError::Metrics(e.to_string()))?;
65 registry
66 .register(Box::new(http_requests_total.clone()))
67 .map_err(|e| McpxError::Metrics(e.to_string()))?;
68
69 let http_request_duration_seconds = HistogramVec::new(
70 HistogramOpts::new(
71 "rmcp_server_kit_http_request_duration_seconds",
72 "HTTP request duration in seconds",
73 )
74 .buckets(HTTP_DURATION_BUCKETS.to_vec()),
75 &["method", "path"],
76 )
77 .map_err(|e| McpxError::Metrics(e.to_string()))?;
78 registry
79 .register(Box::new(http_request_duration_seconds.clone()))
80 .map_err(|e| McpxError::Metrics(e.to_string()))?;
81
82 let rate_limited_total = IntCounterVec::new(
83 opts!(
84 "rmcp_server_kit_rate_limited_total",
85 "Rate-limiter denials by limiter"
86 ),
87 &["limiter"],
88 )
89 .map_err(|e| McpxError::Metrics(e.to_string()))?;
90 registry
91 .register(Box::new(rate_limited_total.clone()))
92 .map_err(|e| McpxError::Metrics(e.to_string()))?;
93
94 Ok(Self {
95 registry,
96 http_requests_total,
97 http_request_duration_seconds,
98 rate_limited_total,
99 })
100 }
101
102 #[must_use]
104 pub fn encode(&self) -> String {
105 let encoder = TextEncoder::new();
106 let metric_families = self.registry.gather();
107 let mut buf = Vec::new();
108 if let Err(e) = encoder.encode(&metric_families, &mut buf) {
109 tracing::warn!(error = %e, "prometheus encode failed");
110 return String::new();
111 }
112 String::from_utf8(buf).unwrap_or_default()
115 }
116}
117
118pub(crate) fn record_rate_limit_deny(ext: &axum::http::Extensions, limiter: &str) {
127 if let Some(m) = ext.get::<Arc<McpMetrics>>() {
128 m.rate_limited_total.with_label_values(&[limiter]).inc();
129 }
130}
131
132pub async fn serve_metrics(
143 bind: String,
144 metrics: Arc<McpMetrics>,
145 shutdown: tokio_util::sync::CancellationToken,
146) -> Result<(), McpxError> {
147 let app = axum::Router::new().route(
148 "/metrics",
149 axum::routing::get(move || {
150 let m = Arc::clone(&metrics);
151 async move { m.encode() }
152 }),
153 );
154
155 let listener = tokio::net::TcpListener::bind(&bind)
156 .await
157 .map_err(|e| McpxError::Startup(format!("metrics bind {bind}: {e}")))?;
158 tracing::info!("metrics endpoint listening on http://{bind}/metrics");
159 axum::serve(listener, app)
160 .with_graceful_shutdown(async move { shutdown.cancelled().await })
161 .await
162 .map_err(|e| McpxError::Startup(format!("metrics serve: {e}")))?;
163 Ok(())
164}
165
#[cfg(test)]
mod tests {
    #![allow(
        clippy::unwrap_used,
        clippy::expect_used,
        clippy::panic,
        clippy::indexing_slicing,
        clippy::unwrap_in_result,
        clippy::print_stdout,
        clippy::print_stderr,
        reason = "test-only relaxations; production code uses ? and tracing"
    )]
    use std::time::{Duration, Instant};

    use super::*;

    /// Builds a fresh metrics bundle, panicking if construction fails.
    fn fresh_metrics() -> McpMetrics {
        McpMetrics::new().unwrap()
    }

    #[test]
    fn new_creates_registry_with_counters() {
        let metrics = fresh_metrics();
        let requests = metrics
            .http_requests_total
            .with_label_values(&["GET", "/test", "200"]);
        requests.inc();
        let latency = metrics
            .http_request_duration_seconds
            .with_label_values(&["GET", "/test"]);
        latency.observe(0.1);
        // Only two of the three collectors were touched, and exactly two
        // families are expected back from the registry.
        let families = metrics.registry.gather();
        assert_eq!(families.len(), 2);
    }

    #[test]
    fn encode_empty_registry() {
        let rendered = fresh_metrics().encode();
        // Accept either no output at all or output that only names our metrics.
        assert!(rendered.is_empty() || rendered.contains("rmcp_server_kit_"));
    }

    #[test]
    fn counter_increment_shows_in_encode() {
        let metrics = fresh_metrics();
        let healthz = metrics
            .http_requests_total
            .with_label_values(&["GET", "/healthz", "200"]);
        healthz.inc();
        let rendered = metrics.encode();
        assert!(rendered.contains("rmcp_server_kit_http_requests_total"));
        assert!(rendered.contains("method=\"GET\""));
        assert!(rendered.contains("path=\"/healthz\""));
        assert!(rendered.contains("status=\"200\""));
        // The sample value follows a space in the rendered line.
        assert!(rendered.contains(" 1"));
    }

    #[test]
    fn histogram_observe_shows_in_encode() {
        let metrics = fresh_metrics();
        let latency = metrics
            .http_request_duration_seconds
            .with_label_values(&["POST", "/mcp"]);
        latency.observe(0.042);
        let rendered = metrics.encode();
        assert!(rendered.contains("rmcp_server_kit_http_request_duration_seconds"));
        assert!(rendered.contains("method=\"POST\""));
        assert!(rendered.contains("path=\"/mcp\""));
    }

    #[test]
    fn multiple_increments_accumulate() {
        let metrics = fresh_metrics();
        let mcp_posts = metrics
            .http_requests_total
            .with_label_values(&["POST", "/mcp", "200"]);
        for _ in 0..3 {
            mcp_posts.inc();
        }
        let rendered = metrics.encode();
        assert!(rendered.contains(" 3"));
    }

    #[test]
    fn clone_shares_registry() {
        let original = fresh_metrics();
        let copy = original.clone();
        original
            .http_requests_total
            .with_label_values(&["GET", "/test", "200"])
            .inc();
        // The increment went through `original`; it must be visible via `copy`.
        let rendered = copy.encode();
        assert!(rendered.contains(" 1"));
    }

    #[test]
    fn rate_limited_counter_registers_and_encodes() {
        let metrics = fresh_metrics();
        let tool_denials = metrics.rate_limited_total.with_label_values(&["tool"]);
        tool_denials.inc();
        let rendered = metrics.encode();
        assert!(rendered.contains("rmcp_server_kit_rate_limited_total"));
        assert!(rendered.contains("limiter=\"tool\""));
        assert!(rendered.contains(" 1"));
    }

    #[test]
    fn record_rate_limit_deny_increments_via_extension() {
        let metrics = Arc::new(fresh_metrics());
        let denials = || {
            metrics
                .rate_limited_total
                .with_label_values(&["auth_pre"])
                .get()
        };

        let mut with_metrics = axum::http::Extensions::new();
        with_metrics.insert(Arc::clone(&metrics));
        record_rate_limit_deny(&with_metrics, "auth_pre");
        record_rate_limit_deny(&with_metrics, "auth_pre");
        assert_eq!(denials(), 2);

        // Without a metrics handle in the extensions the call must not count.
        let without_metrics = axum::http::Extensions::new();
        record_rate_limit_deny(&without_metrics, "auth_pre");
        assert_eq!(denials(), 2);
    }

    #[tokio::test]
    async fn serve_metrics_releases_port_on_shutdown() {
        // Let the OS pick a free port, then release it so the server can bind it.
        let addr = {
            let probe = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
            probe.local_addr().unwrap()
        };

        let metrics = Arc::new(fresh_metrics());
        let shutdown = tokio_util::sync::CancellationToken::new();
        let server = tokio::spawn(serve_metrics(
            addr.to_string(),
            Arc::clone(&metrics),
            shutdown.clone(),
        ));

        // Poll until the listener accepts a connection, for at most two seconds.
        let deadline = Instant::now() + Duration::from_secs(2);
        while tokio::net::TcpStream::connect(addr).await.is_err() {
            assert!(
                Instant::now() < deadline,
                "metrics listener never accepted on {addr}"
            );
            tokio::time::sleep(Duration::from_millis(20)).await;
        }

        shutdown.cancel();
        // The server task must finish promptly and cleanly after cancellation.
        let joined = tokio::time::timeout(Duration::from_secs(5), server)
            .await
            .expect("serve_metrics did not return within timeout");
        joined
            .expect("join error")
            .expect("serve_metrics returned Err");

        // Binding the same address again shows the port was released.
        let rebind = tokio::net::TcpListener::bind(addr)
            .await
            .expect("port not released after shutdown");
        drop(rebind);
    }
}