rmcp_server_kit/
metrics.rs1use std::sync::Arc;
16
17use prometheus::{
18 Encoder, HistogramOpts, HistogramVec, IntCounterVec, Registry, TextEncoder, opts,
19};
20
21use crate::error::McpxError;
22
/// Upper bounds, in seconds, of the buckets used by the HTTP
/// request-duration histogram.
const HTTP_DURATION_BUCKETS: &[f64] = &[
    // single-digit milliseconds
    0.001, 0.005,
    // tens of milliseconds
    0.01, 0.025, 0.05,
    // hundreds of milliseconds
    0.1, 0.25, 0.5,
    // whole seconds
    1.0, 2.5, 5.0,
];
32
33#[derive(Clone, Debug)]
35#[non_exhaustive]
36pub struct McpMetrics {
37 pub registry: Registry,
39 pub http_requests_total: IntCounterVec,
41 pub http_request_duration_seconds: HistogramVec,
43}
44
45impl McpMetrics {
46 pub fn new() -> Result<Self, McpxError> {
53 let registry = Registry::new();
54
55 let http_requests_total = IntCounterVec::new(
56 opts!("rmcp_server_kit_http_requests_total", "Total HTTP requests"),
57 &["method", "path", "status"],
58 )
59 .map_err(|e| McpxError::Metrics(e.to_string()))?;
60 registry
61 .register(Box::new(http_requests_total.clone()))
62 .map_err(|e| McpxError::Metrics(e.to_string()))?;
63
64 let http_request_duration_seconds = HistogramVec::new(
65 HistogramOpts::new(
66 "rmcp_server_kit_http_request_duration_seconds",
67 "HTTP request duration in seconds",
68 )
69 .buckets(HTTP_DURATION_BUCKETS.to_vec()),
70 &["method", "path"],
71 )
72 .map_err(|e| McpxError::Metrics(e.to_string()))?;
73 registry
74 .register(Box::new(http_request_duration_seconds.clone()))
75 .map_err(|e| McpxError::Metrics(e.to_string()))?;
76
77 Ok(Self {
78 registry,
79 http_requests_total,
80 http_request_duration_seconds,
81 })
82 }
83
84 #[must_use]
86 pub fn encode(&self) -> String {
87 let encoder = TextEncoder::new();
88 let metric_families = self.registry.gather();
89 let mut buf = Vec::new();
90 if let Err(e) = encoder.encode(&metric_families, &mut buf) {
91 tracing::warn!(error = %e, "prometheus encode failed");
92 return String::new();
93 }
94 String::from_utf8(buf).unwrap_or_default()
97 }
98}
99
100pub async fn serve_metrics(
111 bind: String,
112 metrics: Arc<McpMetrics>,
113 shutdown: tokio_util::sync::CancellationToken,
114) -> Result<(), McpxError> {
115 let app = axum::Router::new().route(
116 "/metrics",
117 axum::routing::get(move || {
118 let m = Arc::clone(&metrics);
119 async move { m.encode() }
120 }),
121 );
122
123 let listener = tokio::net::TcpListener::bind(&bind)
124 .await
125 .map_err(|e| McpxError::Startup(format!("metrics bind {bind}: {e}")))?;
126 tracing::info!("metrics endpoint listening on http://{bind}/metrics");
127 axum::serve(listener, app)
128 .with_graceful_shutdown(async move { shutdown.cancelled().await })
129 .await
130 .map_err(|e| McpxError::Startup(format!("metrics serve: {e}")))?;
131 Ok(())
132}
133
#[cfg(test)]
mod tests {
    #![allow(
        clippy::unwrap_used,
        clippy::expect_used,
        clippy::panic,
        clippy::indexing_slicing,
        clippy::unwrap_in_result,
        clippy::print_stdout,
        clippy::print_stderr,
        reason = "test-only relaxations; production code uses ? and tracing"
    )]
    use super::*;

    /// Name of the request counter as it appears in the encoded output.
    const REQUESTS_TOTAL: &str = "rmcp_server_kit_http_requests_total";

    /// Name of the `_count` series of the request-duration histogram.
    const DURATION_COUNT: &str = "rmcp_server_kit_http_request_duration_seconds_count";

    /// Returns `true` when `output` contains a labelled sample line for
    /// `metric` whose value is exactly `value`.
    ///
    /// A bare `output.contains(" 1")` also matches `" 10"` or `" 1.5"`;
    /// anchoring on `metric{` at the start of the line and on ` value` at the
    /// end of the line pins the exact series and the exact value.
    fn has_sample(output: &str, metric: &str, value: &str) -> bool {
        let prefix = format!("{metric}{{");
        let suffix = format!(" {value}");
        output
            .lines()
            .any(|line| line.starts_with(&prefix) && line.ends_with(&suffix))
    }

    /// Both collectors show up in the registry once they have a child series.
    #[test]
    fn new_creates_registry_with_counters() {
        let m = McpMetrics::new().unwrap();
        m.http_requests_total
            .with_label_values(&["GET", "/test", "200"])
            .inc();
        m.http_request_duration_seconds
            .with_label_values(&["GET", "/test"])
            .observe(0.1);
        assert_eq!(m.registry.gather().len(), 2);
    }

    /// Encoding before any observation yields either nothing or only this
    /// crate's metric families.
    #[test]
    fn encode_empty_registry() {
        let m = McpMetrics::new().unwrap();
        let output = m.encode();
        assert!(output.is_empty() || output.contains("rmcp_server_kit_"));
    }

    /// A single increment is rendered with its labels and the value `1`.
    #[test]
    fn counter_increment_shows_in_encode() {
        let m = McpMetrics::new().unwrap();
        m.http_requests_total
            .with_label_values(&["GET", "/healthz", "200"])
            .inc();
        let output = m.encode();
        assert!(output.contains(REQUESTS_TOTAL));
        assert!(output.contains("method=\"GET\""));
        assert!(output.contains("path=\"/healthz\""));
        assert!(output.contains("status=\"200\""));
        assert!(
            has_sample(&output, REQUESTS_TOTAL, "1"),
            "expected counter sample with value 1 in:\n{output}"
        );
    }

    /// A single observation is rendered with its labels and a count of `1`.
    #[test]
    fn histogram_observe_shows_in_encode() {
        let m = McpMetrics::new().unwrap();
        m.http_request_duration_seconds
            .with_label_values(&["POST", "/mcp"])
            .observe(0.042);
        let output = m.encode();
        assert!(output.contains("rmcp_server_kit_http_request_duration_seconds"));
        assert!(output.contains("method=\"POST\""));
        assert!(output.contains("path=\"/mcp\""));
        assert!(
            has_sample(&output, DURATION_COUNT, "1"),
            "expected histogram count of 1 in:\n{output}"
        );
    }

    /// Repeated increments on the same label set add up.
    #[test]
    fn multiple_increments_accumulate() {
        let m = McpMetrics::new().unwrap();
        let counter = m
            .http_requests_total
            .with_label_values(&["POST", "/mcp", "200"]);
        counter.inc();
        counter.inc();
        counter.inc();
        let output = m.encode();
        assert!(
            has_sample(&output, REQUESTS_TOTAL, "3"),
            "expected counter sample with value 3 in:\n{output}"
        );
    }

    /// An increment made through one handle is visible when encoding through
    /// a clone of it.
    #[test]
    fn clone_shares_registry() {
        let m = McpMetrics::new().unwrap();
        let m2 = m.clone();
        m.http_requests_total
            .with_label_values(&["GET", "/test", "200"])
            .inc();
        let output = m2.encode();
        assert!(
            has_sample(&output, REQUESTS_TOTAL, "1"),
            "expected counter sample with value 1 in:\n{output}"
        );
    }

    /// After cancellation `serve_metrics` returns `Ok` and the address can be
    /// bound again.
    #[tokio::test]
    async fn serve_metrics_releases_port_on_shutdown() {
        use std::time::{Duration, Instant};

        // Bind port 0 to learn a currently free address, then release it so
        // `serve_metrics` can bind that same address.
        let probe = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = probe.local_addr().unwrap();
        drop(probe);

        let metrics = Arc::new(McpMetrics::new().unwrap());
        let shutdown = tokio_util::sync::CancellationToken::new();
        let handle = tokio::spawn(serve_metrics(
            addr.to_string(),
            Arc::clone(&metrics),
            shutdown.clone(),
        ));

        // Poll until the spawned server accepts a connection, for at most 2s.
        let deadline = Instant::now() + Duration::from_secs(2);
        loop {
            if tokio::net::TcpStream::connect(addr).await.is_ok() {
                break;
            }
            assert!(
                Instant::now() < deadline,
                "metrics listener never accepted on {addr}"
            );
            tokio::time::sleep(Duration::from_millis(20)).await;
        }

        shutdown.cancel();
        let join = tokio::time::timeout(Duration::from_secs(5), handle)
            .await
            .expect("serve_metrics did not return within timeout");
        join.expect("join error")
            .expect("serve_metrics returned Err");

        // Binding the same address again only succeeds once the server's
        // listener has been dropped.
        let rebind = tokio::net::TcpListener::bind(addr)
            .await
            .expect("port not released after shutdown");
        drop(rebind);
    }
}