ultrafast_mcp_monitoring/metrics.rs

//! Metrics collection and management for UltraFast MCP
//!
//! This module provides comprehensive metrics collection for MCP servers and clients,
//! including request metrics, transport metrics, and system metrics.
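//!
//! A minimal usage sketch (hedged: it assumes this module is reachable as
//! `ultrafast_mcp_monitoring::metrics` and that a Tokio runtime is available;
//! the `"tools/list"` method name is purely illustrative):
//!
//! ```rust,ignore
//! use std::time::Duration;
//! use ultrafast_mcp_monitoring::metrics::MetricsCollector;
//!
//! #[tokio::main]
//! async fn main() {
//!     let collector = MetricsCollector::new();
//!
//!     // Record one successful request and some transport activity.
//!     collector
//!         .record_request("tools/list", Duration::from_millis(12), true)
//!         .await;
//!     collector.record_transport_send(256).await;
//!
//!     // Take a snapshot and export it.
//!     let snapshot = collector.get_metrics().await;
//!     assert_eq!(snapshot.request.total_requests, 1);
//!     println!("{}", collector.export_prometheus().await);
//! }
//! ```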

use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use tokio::sync::RwLock;
use tracing::{debug, info, warn};

/// Core metrics structure containing all collected metrics
#[derive(Debug, Clone, serde::Serialize)]
pub struct Metrics {
    pub request: RequestMetrics,
    pub transport: TransportMetrics,
    pub system: SystemMetrics,
}

/// Request-related metrics
#[derive(Debug, Clone, serde::Serialize)]
pub struct RequestMetrics {
    pub total_requests: u64,
    pub successful_requests: u64,
    pub failed_requests: u64,
    pub average_response_time: f64,
    pub method_counts: HashMap<String, u64>,
    pub response_time_histogram: HashMap<String, Vec<Duration>>,
    pub last_request_time: Option<SystemTime>,
}

/// Transport-related metrics
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Default)]
pub struct TransportMetrics {
    pub bytes_sent: u64,
    pub bytes_received: u64,
    pub connection_count: u32,
    pub error_count: u64,
    pub active_connections: u32,
    pub connection_errors: HashMap<String, u64>,
    pub last_activity: Option<SystemTime>,
}

/// System-related metrics
#[derive(Debug, Clone, serde::Serialize)]
pub struct SystemMetrics {
    pub memory_usage: u64,
    pub cpu_usage: f64,
    pub active_connections: u32,
    pub uptime: Duration,
    pub start_time: SystemTime,
    pub last_update: SystemTime,
}

/// Metrics collector for gathering and managing metrics
pub struct MetricsCollector {
    metrics: Arc<RwLock<Metrics>>,
    collection_interval: Duration,
    max_histogram_size: usize,
}

impl MetricsCollector {
    /// Create a new metrics collector
    pub fn new() -> Self {
        Self {
            metrics: Arc::new(RwLock::new(Metrics {
                request: RequestMetrics::default(),
                transport: TransportMetrics::default(),
                system: SystemMetrics::default(),
            })),
            collection_interval: Duration::from_secs(30),
            max_histogram_size: 1000,
        }
    }

    /// Create a new metrics collector with custom configuration
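    ///
    /// A small configuration sketch (the values are illustrative, not
    /// recommended defaults; the crate path is assumed):
    ///
    /// ```rust,ignore
    /// use std::time::Duration;
    /// use ultrafast_mcp_monitoring::metrics::MetricsCollector;
    ///
    /// // Collect every 10 seconds and keep at most 500 samples per method.
    /// let collector = MetricsCollector::with_config(Duration::from_secs(10), 500);
    /// assert_eq!(collector.collection_interval(), Duration::from_secs(10));
    /// ```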
    pub fn with_config(collection_interval: Duration, max_histogram_size: usize) -> Self {
        Self {
            metrics: Arc::new(RwLock::new(Metrics {
                request: RequestMetrics::default(),
                transport: TransportMetrics::default(),
                system: SystemMetrics::default(),
            })),
            collection_interval,
            max_histogram_size,
        }
    }

    /// Record a request with timing and success status
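    ///
    /// For example (sketch; `"initialize"` is an illustrative method name and
    /// the crate path is assumed):
    ///
    /// ```rust,ignore
    /// use std::time::Duration;
    /// use ultrafast_mcp_monitoring::metrics::MetricsCollector;
    ///
    /// async fn record_failure(collector: &MetricsCollector) {
    ///     // A failed request still increments `total_requests` and the
    ///     // per-method count, but is tallied under `failed_requests`.
    ///     collector
    ///         .record_request("initialize", Duration::from_millis(40), false)
    ///         .await;
    /// }
    /// ```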
    pub async fn record_request(&self, method: &str, response_time: Duration, success: bool) {
        let mut metrics = self.metrics.write().await;

        // Update request counts
        metrics.request.total_requests += 1;
        if success {
            metrics.request.successful_requests += 1;
        } else {
            metrics.request.failed_requests += 1;
        }

        // Update method counts
        *metrics
            .request
            .method_counts
            .entry(method.to_string())
            .or_insert(0) += 1;

        // Update response time histogram
        let method_histogram = metrics
            .request
            .response_time_histogram
            .entry(method.to_string())
            .or_insert_with(Vec::new);

        method_histogram.push(response_time);

        // Keep histogram size manageable
        if method_histogram.len() > self.max_histogram_size {
            method_histogram.remove(0);
        }

        // Update average response time (in milliseconds), computed over the
        // samples currently retained in this method's histogram
        let total_time: Duration = method_histogram.iter().sum();
        metrics.request.average_response_time =
            total_time.as_millis() as f64 / method_histogram.len() as f64;

        // Update last request time
        metrics.request.last_request_time = Some(SystemTime::now());

        debug!(
            "Recorded request: method={}, response_time={:?}, success={}",
            method, response_time, success
        );
    }

    /// Record transport send operation
    pub async fn record_transport_send(&self, bytes: u64) {
        let mut metrics = self.metrics.write().await;
        metrics.transport.bytes_sent += bytes;
        metrics.transport.last_activity = Some(SystemTime::now());

        debug!("Recorded transport send: {} bytes", bytes);
    }

    /// Record transport receive operation
    pub async fn record_transport_receive(&self, bytes: u64) {
        let mut metrics = self.metrics.write().await;
        metrics.transport.bytes_received += bytes;
        metrics.transport.last_activity = Some(SystemTime::now());

        debug!("Recorded transport receive: {} bytes", bytes);
    }

    /// Record transport error
    pub async fn record_transport_error(&self, error_type: &str) {
        let mut metrics = self.metrics.write().await;
        metrics.transport.error_count += 1;
        *metrics
            .transport
            .connection_errors
            .entry(error_type.to_string())
            .or_insert(0) += 1;

        warn!("Recorded transport error: {}", error_type);
    }

    /// Update connection count
    pub async fn update_connection_count(&self, count: u32) {
        let mut metrics = self.metrics.write().await;
        metrics.transport.connection_count = count;
        metrics.transport.active_connections = count;

        debug!("Updated connection count: {}", count);
    }

    /// Update system metrics
    pub async fn update_system_metrics(
        &self,
        active_connections: u32,
        memory_usage: u64,
        cpu_usage: f64,
    ) {
        let mut metrics = self.metrics.write().await;
        metrics.system.active_connections = active_connections;
        metrics.system.memory_usage = memory_usage;
        metrics.system.cpu_usage = cpu_usage;
        metrics.system.last_update = SystemTime::now();

        // Update uptime
        if let Ok(elapsed) = metrics.system.start_time.elapsed() {
            metrics.system.uptime = elapsed;
        }

        debug!(
            "Updated system metrics: connections={}, memory={}, cpu={}%",
            active_connections, memory_usage, cpu_usage
        );
    }

    /// Get current metrics snapshot
    pub async fn get_metrics(&self) -> Metrics {
        self.metrics.read().await.clone()
    }

    /// Export metrics as JSON
    pub async fn export_json(&self) -> serde_json::Result<String> {
        let metrics = self.get_metrics().await;
        serde_json::to_string_pretty(&metrics)
    }

    /// Export metrics in Prometheus format
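    ///
    /// The output uses the Prometheus text exposition format; a trimmed,
    /// illustrative excerpt (values are made up) looks like:
    ///
    /// ```text
    /// # HELP mcp_requests_total Total number of requests
    /// # TYPE mcp_requests_total counter
    /// mcp_requests_total 42
    /// mcp_requests_by_method_total{method="tools/list"} 17
    /// ```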
    pub async fn export_prometheus(&self) -> String {
        let metrics = self.get_metrics().await;
        let mut prometheus_output = String::new();

        // Request metrics
        prometheus_output.push_str("# HELP mcp_requests_total Total number of requests\n");
        prometheus_output.push_str("# TYPE mcp_requests_total counter\n");
        prometheus_output.push_str(&format!(
            "mcp_requests_total {}\n",
            metrics.request.total_requests
        ));

        prometheus_output
            .push_str("# HELP mcp_requests_successful Total number of successful requests\n");
        prometheus_output.push_str("# TYPE mcp_requests_successful counter\n");
        prometheus_output.push_str(&format!(
            "mcp_requests_successful {}\n",
            metrics.request.successful_requests
        ));

        prometheus_output.push_str("# HELP mcp_requests_failed Total number of failed requests\n");
        prometheus_output.push_str("# TYPE mcp_requests_failed counter\n");
        prometheus_output.push_str(&format!(
            "mcp_requests_failed {}\n",
            metrics.request.failed_requests
        ));

        prometheus_output.push_str(
            "# HELP mcp_request_duration_average Average request duration in milliseconds\n",
        );
        prometheus_output.push_str("# TYPE mcp_request_duration_average gauge\n");
        prometheus_output.push_str(&format!(
            "mcp_request_duration_average {}\n",
            metrics.request.average_response_time
        ));

        // Method-specific metrics (HELP/TYPE lines are emitted once, followed by
        // one sample per method, as the Prometheus text format expects)
        if !metrics.request.method_counts.is_empty() {
            prometheus_output
                .push_str("# HELP mcp_requests_by_method_total Total requests by method\n");
            prometheus_output.push_str("# TYPE mcp_requests_by_method_total counter\n");
            for (method, count) in &metrics.request.method_counts {
                prometheus_output.push_str(&format!(
                    "mcp_requests_by_method_total{{method=\"{method}\"}} {count}\n"
                ));
            }
        }

        // Transport metrics
        prometheus_output.push_str("# HELP mcp_transport_bytes_sent Total bytes sent\n");
        prometheus_output.push_str("# TYPE mcp_transport_bytes_sent counter\n");
        prometheus_output.push_str(&format!(
            "mcp_transport_bytes_sent {}\n",
            metrics.transport.bytes_sent
        ));

        prometheus_output.push_str("# HELP mcp_transport_bytes_received Total bytes received\n");
        prometheus_output.push_str("# TYPE mcp_transport_bytes_received counter\n");
        prometheus_output.push_str(&format!(
            "mcp_transport_bytes_received {}\n",
            metrics.transport.bytes_received
        ));

        prometheus_output
            .push_str("# HELP mcp_transport_connections_active Current active connections\n");
        prometheus_output.push_str("# TYPE mcp_transport_connections_active gauge\n");
        prometheus_output.push_str(&format!(
            "mcp_transport_connections_active {}\n",
            metrics.transport.active_connections
        ));

        prometheus_output.push_str("# HELP mcp_transport_errors_total Total transport errors\n");
        prometheus_output.push_str("# TYPE mcp_transport_errors_total counter\n");
        prometheus_output.push_str(&format!(
            "mcp_transport_errors_total {}\n",
            metrics.transport.error_count
        ));

        // System metrics
        prometheus_output
            .push_str("# HELP mcp_system_memory_usage_bytes Current memory usage in bytes\n");
        prometheus_output.push_str("# TYPE mcp_system_memory_usage_bytes gauge\n");
        prometheus_output.push_str(&format!(
            "mcp_system_memory_usage_bytes {}\n",
            metrics.system.memory_usage
        ));

        prometheus_output
            .push_str("# HELP mcp_system_cpu_usage_percent Current CPU usage percentage\n");
        prometheus_output.push_str("# TYPE mcp_system_cpu_usage_percent gauge\n");
        prometheus_output.push_str(&format!(
            "mcp_system_cpu_usage_percent {}\n",
            metrics.system.cpu_usage
        ));

        prometheus_output.push_str("# HELP mcp_system_uptime_seconds System uptime in seconds\n");
        prometheus_output.push_str("# TYPE mcp_system_uptime_seconds gauge\n");
        prometheus_output.push_str(&format!(
            "mcp_system_uptime_seconds {}\n",
            metrics.system.uptime.as_secs()
        ));

        prometheus_output
    }

    /// Reset all metrics
    pub async fn reset(&self) {
        let mut metrics = self.metrics.write().await;
        *metrics = Metrics {
            request: RequestMetrics::default(),
            transport: TransportMetrics::default(),
            system: SystemMetrics::default(),
        };

        info!("Metrics reset completed");
    }

    /// Get metrics collection interval
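    ///
    /// Nothing in this module drives collection automatically; a caller can use
    /// this interval to schedule periodic updates, for example (sketch, assuming
    /// a Tokio runtime; `sample_memory` and `sample_cpu` are hypothetical helpers
    /// supplied by the embedding application):
    ///
    /// ```rust,ignore
    /// use std::sync::Arc;
    /// use ultrafast_mcp_monitoring::metrics::MetricsCollector;
    ///
    /// // Hypothetical sampling helpers.
    /// fn sample_memory() -> u64 { 0 }
    /// fn sample_cpu() -> f64 { 0.0 }
    ///
    /// fn spawn_system_collection(collector: Arc<MetricsCollector>) {
    ///     tokio::spawn(async move {
    ///         let mut tick = tokio::time::interval(collector.collection_interval());
    ///         loop {
    ///             tick.tick().await;
    ///             collector
    ///                 .update_system_metrics(0, sample_memory(), sample_cpu())
    ///                 .await;
    ///         }
    ///     });
    /// }
    /// ```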
    pub fn collection_interval(&self) -> Duration {
        self.collection_interval
    }

    /// Set metrics collection interval
    pub fn set_collection_interval(&mut self, interval: Duration) {
        self.collection_interval = interval;
    }
}

impl Default for MetricsCollector {
    fn default() -> Self {
        Self::new()
    }
}

impl Default for RequestMetrics {
    fn default() -> Self {
        Self {
            total_requests: 0,
            successful_requests: 0,
            failed_requests: 0,
            average_response_time: 0.0,
            method_counts: HashMap::new(),
            response_time_histogram: HashMap::new(),
            last_request_time: None,
        }
    }
}

impl Default for SystemMetrics {
    fn default() -> Self {
        Self {
            memory_usage: 0,
            cpu_usage: 0.0,
            active_connections: 0,
            uptime: Duration::ZERO,
            start_time: SystemTime::now(),
            last_update: SystemTime::now(),
        }
    }
}

/// Timer for measuring request duration
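///
/// A typical usage sketch (assuming the collector is shared behind an `Arc`
/// and the crate path is as shown; `handle_request` is a hypothetical handler):
///
/// ```rust,ignore
/// use std::sync::Arc;
/// use ultrafast_mcp_monitoring::metrics::{MetricsCollector, RequestTimer};
///
/// async fn handle_request() -> Result<(), ()> { Ok(()) }
///
/// async fn timed_call(collector: Arc<MetricsCollector>) {
///     let timer = RequestTimer::start("tools/call", collector.clone());
///     let result = handle_request().await;
///     // Recording happens when the timer is finished, not when it is dropped.
///     timer.finish(result.is_ok()).await;
/// }
/// ```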
pub struct RequestTimer {
    start: Instant,
    method: String,
    metrics: Arc<MetricsCollector>,
}

impl RequestTimer {
    /// Start a new request timer
    pub fn start(method: impl Into<String>, metrics: Arc<MetricsCollector>) -> Self {
        Self {
            start: Instant::now(),
            method: method.into(),
            metrics,
        }
    }

    /// Finish the timer and record the metrics
    pub async fn finish(self, success: bool) {
        let duration = self.start.elapsed();
        self.metrics
            .record_request(&self.method, duration, success)
            .await;

        debug!(
            "Request completed: method={}, duration={:?}, success={}",
            self.method, duration, success
        );
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::sleep;

    #[tokio::test]
    async fn test_metrics_collector_creation() {
        let collector = MetricsCollector::new();
        let metrics = collector.get_metrics().await;

        assert_eq!(metrics.request.total_requests, 0);
        assert_eq!(metrics.transport.bytes_sent, 0);
        assert_eq!(metrics.system.memory_usage, 0);
    }

    #[tokio::test]
    async fn test_request_recording() {
        let collector = Arc::new(MetricsCollector::new());

        // Record a successful request
        collector
            .record_request("test_method", Duration::from_millis(100), true)
            .await;

        let metrics = collector.get_metrics().await;
        assert_eq!(metrics.request.total_requests, 1);
        assert_eq!(metrics.request.successful_requests, 1);
        assert_eq!(metrics.request.failed_requests, 0);
        assert_eq!(metrics.request.method_counts["test_method"], 1);
    }

    #[tokio::test]
    async fn test_transport_metrics() {
        let collector = Arc::new(MetricsCollector::new());

        collector.record_transport_send(1024).await;
        collector.record_transport_receive(2048).await;
        collector.record_transport_error("connection_failed").await;

        let metrics = collector.get_metrics().await;
        assert_eq!(metrics.transport.bytes_sent, 1024);
        assert_eq!(metrics.transport.bytes_received, 2048);
        assert_eq!(metrics.transport.error_count, 1);
        assert_eq!(metrics.transport.connection_errors["connection_failed"], 1);
    }

    #[tokio::test]
    async fn test_request_timer() {
        let collector = Arc::new(MetricsCollector::new());

        let timer = RequestTimer::start("timer_test", collector.clone());
        sleep(Duration::from_millis(10)).await;
        timer.finish(true).await;

        let metrics = collector.get_metrics().await;
        assert_eq!(metrics.request.total_requests, 1);
        assert_eq!(metrics.request.successful_requests, 1);
        assert!(metrics.request.average_response_time > 0.0);
    }

    #[tokio::test]
    async fn test_prometheus_export() {
        let collector = Arc::new(MetricsCollector::new());

        collector
            .record_request("test", Duration::from_millis(50), true)
            .await;
        collector.record_transport_send(100).await;

        let prometheus_output = collector.export_prometheus().await;

        assert!(prometheus_output.contains("mcp_requests_total 1"));
        assert!(prometheus_output.contains("mcp_transport_bytes_sent 100"));
        assert!(prometheus_output.contains("mcp_request_duration_average"));
    }

    #[tokio::test]
    async fn test_metrics_reset() {
        let collector = Arc::new(MetricsCollector::new());

        collector
            .record_request("test", Duration::from_millis(50), true)
            .await;
        collector.record_transport_send(100).await;

        // Verify metrics were recorded
        let metrics = collector.get_metrics().await;
        assert_eq!(metrics.request.total_requests, 1);
        assert_eq!(metrics.transport.bytes_sent, 100);

        // Reset metrics
        collector.reset().await;

        // Verify metrics were reset
        let metrics = collector.get_metrics().await;
        assert_eq!(metrics.request.total_requests, 0);
        assert_eq!(metrics.transport.bytes_sent, 0);
    }
}