//! ultrafast_mcp_monitoring/metrics.rs — metrics collection and export.
use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::{Duration, Instant, SystemTime};
9use tokio::sync::RwLock;
10use tracing::{debug, info, warn};
11
/// Aggregated snapshot of every metric tracked by the collector.
///
/// Returned by `MetricsCollector::get_metrics` as an owned clone, so readers
/// never hold the collector's lock while inspecting values.
#[derive(Debug, Clone, serde::Serialize)]
pub struct Metrics {
    /// Request counters, per-method breakdowns, and response-time samples.
    pub request: RequestMetrics,
    /// Transport-layer byte, connection, and error counters.
    pub transport: TransportMetrics,
    /// Process-level resource usage and uptime.
    pub system: SystemMetrics,
}
19
/// Request-level metrics: counters, per-method breakdowns, and the retained
/// response-time samples used to derive the average.
#[derive(Debug, Clone, serde::Serialize)]
pub struct RequestMetrics {
    /// Total requests observed (successful + failed).
    pub total_requests: u64,
    /// Requests recorded with `success == true`.
    pub successful_requests: u64,
    /// Requests recorded with `success == false`.
    pub failed_requests: u64,
    /// Mean response time in milliseconds over the retained samples.
    pub average_response_time: f64,
    /// Number of requests seen, keyed by method name.
    pub method_counts: HashMap<String, u64>,
    /// Retained response-time samples per method; each vector is bounded by
    /// the collector's `max_histogram_size` (oldest samples dropped first).
    pub response_time_histogram: HashMap<String, Vec<Duration>>,
    /// Wall-clock time of the most recently recorded request, if any.
    pub last_request_time: Option<SystemTime>,
}
31
/// Transport-layer metrics: traffic volume, connection counts, and errors.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Default)]
pub struct TransportMetrics {
    /// Total bytes written to the transport.
    pub bytes_sent: u64,
    /// Total bytes read from the transport.
    pub bytes_received: u64,
    /// Connection count as last reported via `update_connection_count`.
    pub connection_count: u32,
    /// Total transport errors of all types.
    pub error_count: u64,
    /// Currently active connections (kept in sync with `connection_count`).
    pub active_connections: u32,
    /// Error occurrences keyed by caller-supplied error-type string.
    pub connection_errors: HashMap<String, u64>,
    /// Wall-clock time of the last send or receive, if any.
    pub last_activity: Option<SystemTime>,
}
43
/// Process-level metrics: resource usage and uptime.
#[derive(Debug, Clone, serde::Serialize)]
pub struct SystemMetrics {
    /// Memory usage in bytes, as supplied by the caller of
    /// `update_system_metrics` (measurement method is up to the caller).
    pub memory_usage: u64,
    /// CPU usage percentage, as supplied by the caller.
    pub cpu_usage: f64,
    /// Active connection count, as supplied by the caller.
    pub active_connections: u32,
    /// Time elapsed since `start_time`, refreshed on each update.
    pub uptime: Duration,
    /// Wall-clock time this metrics object was created.
    pub start_time: SystemTime,
    /// Wall-clock time of the most recent `update_system_metrics` call.
    pub last_update: SystemTime,
}
54
/// Thread-safe metrics collector shared across async tasks.
///
/// All recording methods take `&self`; interior mutability is provided by the
/// tokio `RwLock`, so the collector is typically wrapped in an `Arc`.
pub struct MetricsCollector {
    /// Shared mutable metrics state guarded by an async RwLock.
    metrics: Arc<RwLock<Metrics>>,
    /// Suggested interval between periodic collections (read via
    /// `collection_interval()`; not acted upon inside this type).
    collection_interval: Duration,
    /// Maximum response-time samples retained per method.
    max_histogram_size: usize,
}
61
62impl MetricsCollector {
63 pub fn new() -> Self {
65 Self {
66 metrics: Arc::new(RwLock::new(Metrics {
67 request: RequestMetrics::default(),
68 transport: TransportMetrics::default(),
69 system: SystemMetrics::default(),
70 })),
71 collection_interval: Duration::from_secs(30),
72 max_histogram_size: 1000,
73 }
74 }
75
76 pub fn with_config(collection_interval: Duration, max_histogram_size: usize) -> Self {
78 Self {
79 metrics: Arc::new(RwLock::new(Metrics {
80 request: RequestMetrics::default(),
81 transport: TransportMetrics::default(),
82 system: SystemMetrics::default(),
83 })),
84 collection_interval,
85 max_histogram_size,
86 }
87 }
88
89 pub async fn record_request(&self, method: &str, response_time: Duration, success: bool) {
91 let mut metrics = self.metrics.write().await;
92
93 metrics.request.total_requests += 1;
95 if success {
96 metrics.request.successful_requests += 1;
97 } else {
98 metrics.request.failed_requests += 1;
99 }
100
101 *metrics
103 .request
104 .method_counts
105 .entry(method.to_string())
106 .or_insert(0) += 1;
107
108 let method_histogram = metrics
110 .request
111 .response_time_histogram
112 .entry(method.to_string())
113 .or_insert_with(Vec::new);
114
115 method_histogram.push(response_time);
116
117 if method_histogram.len() > self.max_histogram_size {
119 method_histogram.remove(0);
120 }
121
122 let total_time: Duration = method_histogram.iter().sum();
124 metrics.request.average_response_time =
125 total_time.as_millis() as f64 / method_histogram.len() as f64;
126
127 metrics.request.last_request_time = Some(SystemTime::now());
129
130 debug!(
131 "Recorded request: method={}, response_time={:?}, success={}",
132 method, response_time, success
133 );
134 }
135
136 pub async fn record_transport_send(&self, bytes: u64) {
138 let mut metrics = self.metrics.write().await;
139 metrics.transport.bytes_sent += bytes;
140 metrics.transport.last_activity = Some(SystemTime::now());
141
142 debug!("Recorded transport send: {} bytes", bytes);
143 }
144
145 pub async fn record_transport_receive(&self, bytes: u64) {
147 let mut metrics = self.metrics.write().await;
148 metrics.transport.bytes_received += bytes;
149 metrics.transport.last_activity = Some(SystemTime::now());
150
151 debug!("Recorded transport receive: {} bytes", bytes);
152 }
153
154 pub async fn record_transport_error(&self, error_type: &str) {
156 let mut metrics = self.metrics.write().await;
157 metrics.transport.error_count += 1;
158 *metrics
159 .transport
160 .connection_errors
161 .entry(error_type.to_string())
162 .or_insert(0) += 1;
163
164 warn!("Recorded transport error: {}", error_type);
165 }
166
167 pub async fn update_connection_count(&self, count: u32) {
169 let mut metrics = self.metrics.write().await;
170 metrics.transport.connection_count = count;
171 metrics.transport.active_connections = count;
172
173 debug!("Updated connection count: {}", count);
174 }
175
176 pub async fn update_system_metrics(
178 &self,
179 active_connections: u32,
180 memory_usage: u64,
181 cpu_usage: f64,
182 ) {
183 let mut metrics = self.metrics.write().await;
184 metrics.system.active_connections = active_connections;
185 metrics.system.memory_usage = memory_usage;
186 metrics.system.cpu_usage = cpu_usage;
187 metrics.system.last_update = SystemTime::now();
188
189 if let Ok(elapsed) = metrics.system.start_time.elapsed() {
191 metrics.system.uptime = elapsed;
192 }
193
194 debug!(
195 "Updated system metrics: connections={}, memory={}, cpu={}%",
196 active_connections, memory_usage, cpu_usage
197 );
198 }
199
200 pub async fn get_metrics(&self) -> Metrics {
202 self.metrics.read().await.clone()
203 }
204
205 pub async fn export_json(&self) -> serde_json::Result<String> {
207 let metrics = self.get_metrics().await;
208 serde_json::to_string_pretty(&metrics)
209 }
210
211 pub async fn export_prometheus(&self) -> String {
213 let metrics = self.get_metrics().await;
214 let mut prometheus_output = String::new();
215
216 prometheus_output.push_str("# HELP mcp_requests_total Total number of requests\n");
218 prometheus_output.push_str("# TYPE mcp_requests_total counter\n");
219 prometheus_output.push_str(&format!(
220 "mcp_requests_total {}\n",
221 metrics.request.total_requests
222 ));
223
224 prometheus_output
225 .push_str("# HELP mcp_requests_successful Total number of successful requests\n");
226 prometheus_output.push_str("# TYPE mcp_requests_successful counter\n");
227 prometheus_output.push_str(&format!(
228 "mcp_requests_successful {}\n",
229 metrics.request.successful_requests
230 ));
231
232 prometheus_output.push_str("# HELP mcp_requests_failed Total number of failed requests\n");
233 prometheus_output.push_str("# TYPE mcp_requests_failed counter\n");
234 prometheus_output.push_str(&format!(
235 "mcp_requests_failed {}\n",
236 metrics.request.failed_requests
237 ));
238
239 prometheus_output.push_str(
240 "# HELP mcp_request_duration_average Average request duration in milliseconds\n",
241 );
242 prometheus_output.push_str("# TYPE mcp_request_duration_average gauge\n");
243 prometheus_output.push_str(&format!(
244 "mcp_request_duration_average {}\n",
245 metrics.request.average_response_time
246 ));
247
248 for (method, count) in &metrics.request.method_counts {
250 prometheus_output.push_str(
251 "# HELP mcp_requests_by_method_total Total requests by method\n"
252 .to_string()
253 .as_str(),
254 );
255 prometheus_output.push_str(
256 "# TYPE mcp_requests_by_method_total counter\n"
257 .to_string()
258 .as_str(),
259 );
260 prometheus_output.push_str(&format!(
261 "mcp_requests_by_method_total{{method=\"{method}\"}} {count}\n"
262 ));
263 }
264
265 prometheus_output.push_str("# HELP mcp_transport_bytes_sent Total bytes sent\n");
267 prometheus_output.push_str("# TYPE mcp_transport_bytes_sent counter\n");
268 prometheus_output.push_str(&format!(
269 "mcp_transport_bytes_sent {}\n",
270 metrics.transport.bytes_sent
271 ));
272
273 prometheus_output.push_str("# HELP mcp_transport_bytes_received Total bytes received\n");
274 prometheus_output.push_str("# TYPE mcp_transport_bytes_received counter\n");
275 prometheus_output.push_str(&format!(
276 "mcp_transport_bytes_received {}\n",
277 metrics.transport.bytes_received
278 ));
279
280 prometheus_output
281 .push_str("# HELP mcp_transport_connections_active Current active connections\n");
282 prometheus_output.push_str("# TYPE mcp_transport_connections_active gauge\n");
283 prometheus_output.push_str(&format!(
284 "mcp_transport_connections_active {}\n",
285 metrics.transport.active_connections
286 ));
287
288 prometheus_output.push_str("# HELP mcp_transport_errors_total Total transport errors\n");
289 prometheus_output.push_str("# TYPE mcp_transport_errors_total counter\n");
290 prometheus_output.push_str(&format!(
291 "mcp_transport_errors_total {}\n",
292 metrics.transport.error_count
293 ));
294
295 prometheus_output
297 .push_str("# HELP mcp_system_memory_usage_bytes Current memory usage in bytes\n");
298 prometheus_output.push_str("# TYPE mcp_system_memory_usage_bytes gauge\n");
299 prometheus_output.push_str(&format!(
300 "mcp_system_memory_usage_bytes {}\n",
301 metrics.system.memory_usage
302 ));
303
304 prometheus_output
305 .push_str("# HELP mcp_system_cpu_usage_percent Current CPU usage percentage\n");
306 prometheus_output.push_str("# TYPE mcp_system_cpu_usage_percent gauge\n");
307 prometheus_output.push_str(&format!(
308 "mcp_system_cpu_usage_percent {}\n",
309 metrics.system.cpu_usage
310 ));
311
312 prometheus_output.push_str("# HELP mcp_system_uptime_seconds System uptime in seconds\n");
313 prometheus_output.push_str("# TYPE mcp_system_uptime_seconds gauge\n");
314 prometheus_output.push_str(&format!(
315 "mcp_system_uptime_seconds {}\n",
316 metrics.system.uptime.as_secs()
317 ));
318
319 prometheus_output
320 }
321
322 pub async fn reset(&self) {
324 let mut metrics = self.metrics.write().await;
325 *metrics = Metrics {
326 request: RequestMetrics::default(),
327 transport: TransportMetrics::default(),
328 system: SystemMetrics::default(),
329 };
330
331 info!("Metrics reset completed");
332 }
333
334 pub fn collection_interval(&self) -> Duration {
336 self.collection_interval
337 }
338
339 pub fn set_collection_interval(&mut self, interval: Duration) {
341 self.collection_interval = interval;
342 }
343}
344
impl Default for MetricsCollector {
    /// Equivalent to [`MetricsCollector::new`].
    fn default() -> Self {
        Self::new()
    }
}
350
351impl Default for RequestMetrics {
352 fn default() -> Self {
353 Self {
354 total_requests: 0,
355 successful_requests: 0,
356 failed_requests: 0,
357 average_response_time: 0.0,
358 method_counts: HashMap::new(),
359 response_time_histogram: HashMap::new(),
360 last_request_time: None,
361 }
362 }
363}
364
365impl Default for SystemMetrics {
366 fn default() -> Self {
367 Self {
368 memory_usage: 0,
369 cpu_usage: 0.0,
370 active_connections: 0,
371 uptime: Duration::ZERO,
372 start_time: SystemTime::now(),
373 last_update: SystemTime::now(),
374 }
375 }
376}
377
/// Measures the wall-clock duration of a single request and reports it to a
/// [`MetricsCollector`] when [`RequestTimer::finish`] is called.
pub struct RequestTimer {
    /// Instant the timer was started.
    start: Instant,
    /// Method name the measurement will be recorded under.
    method: String,
    /// Collector that receives the measurement on `finish`.
    metrics: Arc<MetricsCollector>,
}
384
385impl RequestTimer {
386 pub fn start(method: impl Into<String>, metrics: Arc<MetricsCollector>) -> Self {
388 Self {
389 start: Instant::now(),
390 method: method.into(),
391 metrics,
392 }
393 }
394
395 pub async fn finish(self, success: bool) {
397 let duration = self.start.elapsed();
398 self.metrics
399 .record_request(&self.method, duration, success)
400 .await;
401
402 debug!(
403 "Request completed: method={}, duration={:?}, success={}",
404 self.method, duration, success
405 );
406 }
407}
408
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::sleep;

    // A fresh collector starts with every counter zeroed.
    #[tokio::test]
    async fn test_metrics_collector_creation() {
        let collector = MetricsCollector::new();
        let metrics = collector.get_metrics().await;

        assert_eq!(metrics.request.total_requests, 0);
        assert_eq!(metrics.transport.bytes_sent, 0);
        assert_eq!(metrics.system.memory_usage, 0);
    }

    // Recording one successful request bumps the total/success counters and
    // the per-method count, leaving the failure counter untouched.
    #[tokio::test]
    async fn test_request_recording() {
        let collector = Arc::new(MetricsCollector::new());

        collector
            .record_request("test_method", Duration::from_millis(100), true)
            .await;

        let metrics = collector.get_metrics().await;
        assert_eq!(metrics.request.total_requests, 1);
        assert_eq!(metrics.request.successful_requests, 1);
        assert_eq!(metrics.request.failed_requests, 0);
        assert_eq!(metrics.request.method_counts["test_method"], 1);
    }

    // Byte counters accumulate and each error type gets its own counter.
    #[tokio::test]
    async fn test_transport_metrics() {
        let collector = Arc::new(MetricsCollector::new());

        collector.record_transport_send(1024).await;
        collector.record_transport_receive(2048).await;
        collector.record_transport_error("connection_failed").await;

        let metrics = collector.get_metrics().await;
        assert_eq!(metrics.transport.bytes_sent, 1024);
        assert_eq!(metrics.transport.bytes_received, 2048);
        assert_eq!(metrics.transport.error_count, 1);
        assert_eq!(metrics.transport.connection_errors["connection_failed"], 1);
    }

    // RequestTimer measures elapsed wall time and records it on finish().
    #[tokio::test]
    async fn test_request_timer() {
        let collector = Arc::new(MetricsCollector::new());

        let timer = RequestTimer::start("timer_test", collector.clone());
        sleep(Duration::from_millis(10)).await;
        timer.finish(true).await;

        let metrics = collector.get_metrics().await;
        assert_eq!(metrics.request.total_requests, 1);
        assert_eq!(metrics.request.successful_requests, 1);
        // The 10 ms sleep guarantees a non-zero average.
        assert!(metrics.request.average_response_time > 0.0);
    }

    // The Prometheus exposition output contains the recorded sample values.
    #[tokio::test]
    async fn test_prometheus_export() {
        let collector = Arc::new(MetricsCollector::new());

        collector
            .record_request("test", Duration::from_millis(50), true)
            .await;
        collector.record_transport_send(100).await;

        let prometheus_output = collector.export_prometheus().await;

        assert!(prometheus_output.contains("mcp_requests_total 1"));
        assert!(prometheus_output.contains("mcp_transport_bytes_sent 100"));
        assert!(prometheus_output.contains("mcp_request_duration_average"));
    }

    // reset() returns every metric to its default value.
    #[tokio::test]
    async fn test_metrics_reset() {
        let collector = Arc::new(MetricsCollector::new());

        collector
            .record_request("test", Duration::from_millis(50), true)
            .await;
        collector.record_transport_send(100).await;

        // Sanity-check that some activity was recorded before resetting.
        let metrics = collector.get_metrics().await;
        assert_eq!(metrics.request.total_requests, 1);
        assert_eq!(metrics.transport.bytes_sent, 100);

        collector.reset().await;

        let metrics = collector.get_metrics().await;
        assert_eq!(metrics.request.total_requests, 0);
        assert_eq!(metrics.transport.bytes_sent, 0);
    }
}