turbomcp_server/metrics.rs

//! Production-grade metrics collection and monitoring system
//!
//! This module provides a comprehensive metrics collection system built on lock-free
//! atomic counters, designed for high-performance production environments with
//! zero-allocation hot paths. Only the rarely-updated custom metrics map takes a lock.
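//!
//! # Example
//!
//! A minimal usage sketch (illustrative only; crate paths and imports are omitted, and
//! the `MetricsCollector` trait must be in scope for `collect`):
//!
//! ```ignore
//! use std::sync::Arc;
//! use std::time::Duration;
//!
//! // One shared, atomically updated metrics handle for the whole server.
//! let metrics = Arc::new(ServerMetrics::new());
//!
//! // Hot path: count a request and record its latency.
//! metrics.record_request_start();
//! metrics.record_request_success(Duration::from_millis(12));
//!
//! // Export a flat snapshot for a monitoring backend.
//! let collector = ComprehensiveMetricsCollector::new(Arc::clone(&metrics));
//! let snapshot = collector.collect();
//! assert!(snapshot.contains_key("requests_total"));
//! ```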

use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
/// Production-grade server metrics collector with lock-free atomic operations
#[derive(Debug)]
pub struct ServerMetrics {
    /// Total number of requests received since server start
    pub requests_total: AtomicU64,
    /// Number of requests that completed successfully
    pub requests_successful: AtomicU64,
    /// Number of requests that failed with errors
    pub requests_failed: AtomicU64,
    /// Number of requests currently being processed
    pub requests_in_flight: AtomicU64,

    /// Total number of errors across all categories
    pub errors_total: AtomicU64,
    /// Number of validation/schema errors
    pub errors_validation: AtomicU64,
    /// Number of authentication/authorization errors
    pub errors_auth: AtomicU64,
    /// Number of network-related errors
    pub errors_network: AtomicU64,
    /// Number of timeout errors
    pub errors_timeout: AtomicU64,

    /// Sum of all response times in microseconds
    pub total_response_time_us: AtomicU64,
    /// Minimum response time observed (microseconds)
    pub min_response_time_us: AtomicU64,
    /// Maximum response time observed (microseconds)
    pub max_response_time_us: AtomicU64,

    /// Total number of tool calls initiated
    pub tool_calls_total: AtomicU64,
    /// Number of tool calls that completed successfully
    pub tool_calls_successful: AtomicU64,
    /// Number of tool calls that failed
    pub tool_calls_failed: AtomicU64,

    /// Number of currently active connections
    pub connections_active: AtomicU64,
    /// Total connections accepted since server start
    pub connections_total: AtomicU64,
    /// Number of connections rejected (rate limiting, etc.)
    pub connections_rejected: AtomicU64,

    /// Current memory usage in bytes
    pub memory_usage_bytes: AtomicU64,
    /// Current CPU usage as a percentage scaled by 100 (fixed-point, since there is no `AtomicF64`)
    pub cpu_usage_percent_x100: AtomicU64,

    /// Custom application-specific metrics (rare updates, RwLock acceptable)
    pub custom: RwLock<HashMap<String, f64>>,

    /// Response time histogram for latency distribution analysis
    pub response_time_buckets: ResponseTimeHistogram,

    /// Server start time for uptime calculation
    pub start_time: Instant,
}

/// High-performance histogram for response time distribution
#[derive(Debug)]
pub struct ResponseTimeHistogram {
    /// Requests completed in under 1 millisecond
    pub bucket_1ms: AtomicU64,
    /// Requests completed in 1-5 milliseconds
    pub bucket_5ms: AtomicU64,
    /// Requests completed in 5-10 milliseconds
    pub bucket_10ms: AtomicU64,
    /// Requests completed in 10-25 milliseconds
    pub bucket_25ms: AtomicU64,
    /// Requests completed in 25-50 milliseconds
    pub bucket_50ms: AtomicU64,
    /// Requests completed in 50-100 milliseconds
    pub bucket_100ms: AtomicU64,
    /// Requests completed in 100-250 milliseconds
    pub bucket_250ms: AtomicU64,
    /// Requests completed in 250-500 milliseconds
    pub bucket_500ms: AtomicU64,
    /// Requests completed in 500ms-1 second
    pub bucket_1s: AtomicU64,
    /// Requests completed in 1-2.5 seconds
    pub bucket_2_5s: AtomicU64,
    /// Requests completed in 2.5-5 seconds
    pub bucket_5s: AtomicU64,
    /// Requests completed in 5-10 seconds
    pub bucket_10s: AtomicU64,
    /// Requests completed in over 10 seconds
    pub bucket_inf: AtomicU64,
}

impl Default for ResponseTimeHistogram {
    fn default() -> Self {
        Self::new()
    }
}

impl ResponseTimeHistogram {
    /// Create a new histogram with every bucket initialized to zero
    pub fn new() -> Self {
        Self {
            bucket_1ms: AtomicU64::new(0),
            bucket_5ms: AtomicU64::new(0),
            bucket_10ms: AtomicU64::new(0),
            bucket_25ms: AtomicU64::new(0),
            bucket_50ms: AtomicU64::new(0),
            bucket_100ms: AtomicU64::new(0),
            bucket_250ms: AtomicU64::new(0),
            bucket_500ms: AtomicU64::new(0),
            bucket_1s: AtomicU64::new(0),
            bucket_2_5s: AtomicU64::new(0),
            bucket_5s: AtomicU64::new(0),
            bucket_10s: AtomicU64::new(0),
            bucket_inf: AtomicU64::new(0),
        }
    }

    /// Record response time with proper bucket assignment
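    ///
    /// Durations are supplied in microseconds and each observation increments exactly
    /// one bucket. A small illustrative sketch:
    ///
    /// ```ignore
    /// let hist = ResponseTimeHistogram::new();
    /// hist.record(3_000);      // 3 ms  -> bucket_5ms
    /// hist.record(12_000_000); // 12 s  -> bucket_inf
    /// ```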
    #[inline]
    pub fn record(&self, duration_us: u64) {
        let duration_ms = duration_us / 1000;

        if duration_ms < 1 {
            self.bucket_1ms.fetch_add(1, Ordering::Relaxed);
        } else if duration_ms < 5 {
            self.bucket_5ms.fetch_add(1, Ordering::Relaxed);
        } else if duration_ms < 10 {
            self.bucket_10ms.fetch_add(1, Ordering::Relaxed);
        } else if duration_ms < 25 {
            self.bucket_25ms.fetch_add(1, Ordering::Relaxed);
        } else if duration_ms < 50 {
            self.bucket_50ms.fetch_add(1, Ordering::Relaxed);
        } else if duration_ms < 100 {
            self.bucket_100ms.fetch_add(1, Ordering::Relaxed);
        } else if duration_ms < 250 {
            self.bucket_250ms.fetch_add(1, Ordering::Relaxed);
        } else if duration_ms < 500 {
            self.bucket_500ms.fetch_add(1, Ordering::Relaxed);
        } else if duration_ms < 1000 {
            self.bucket_1s.fetch_add(1, Ordering::Relaxed);
        } else if duration_ms < 2500 {
            self.bucket_2_5s.fetch_add(1, Ordering::Relaxed);
        } else if duration_ms < 5000 {
            self.bucket_5s.fetch_add(1, Ordering::Relaxed);
        } else if duration_ms < 10000 {
            self.bucket_10s.fetch_add(1, Ordering::Relaxed);
        } else {
            self.bucket_inf.fetch_add(1, Ordering::Relaxed);
        }
    }
}

impl ServerMetrics {
    /// Create a new metrics collector; counters start at zero and the minimum response time at `u64::MAX`
    pub fn new() -> Self {
        Self {
            requests_total: AtomicU64::new(0),
            requests_successful: AtomicU64::new(0),
            requests_failed: AtomicU64::new(0),
            requests_in_flight: AtomicU64::new(0),

            errors_total: AtomicU64::new(0),
            errors_validation: AtomicU64::new(0),
            errors_auth: AtomicU64::new(0),
            errors_network: AtomicU64::new(0),
            errors_timeout: AtomicU64::new(0),

            total_response_time_us: AtomicU64::new(0),
            min_response_time_us: AtomicU64::new(u64::MAX),
            max_response_time_us: AtomicU64::new(0),

            tool_calls_total: AtomicU64::new(0),
            tool_calls_successful: AtomicU64::new(0),
            tool_calls_failed: AtomicU64::new(0),

            connections_active: AtomicU64::new(0),
            connections_total: AtomicU64::new(0),
            connections_rejected: AtomicU64::new(0),

            memory_usage_bytes: AtomicU64::new(0),
            cpu_usage_percent_x100: AtomicU64::new(0),

            custom: RwLock::new(HashMap::new()),
            response_time_buckets: ResponseTimeHistogram::new(),
            start_time: Instant::now(),
        }
    }

    /// Record request start with zero-allocation tracking
    #[inline]
    pub fn record_request_start(&self) {
        self.requests_total.fetch_add(1, Ordering::Relaxed);
        self.requests_in_flight.fetch_add(1, Ordering::Relaxed);
    }

    /// Record successful request completion with timing
    #[inline]
    pub fn record_request_success(&self, duration: Duration) {
        self.requests_successful.fetch_add(1, Ordering::Relaxed);
        self.requests_in_flight.fetch_sub(1, Ordering::Relaxed);

        let duration_us = duration.as_micros() as u64;
        self.total_response_time_us
            .fetch_add(duration_us, Ordering::Relaxed);
        self.response_time_buckets.record(duration_us);

        // Update min/max with compare-and-swap
        self.update_min_response_time(duration_us);
        self.update_max_response_time(duration_us);
    }

    /// Record failed request with error categorization
    ///
    /// Recognized `error_type` values are `"validation"`, `"auth"`, `"network"`, and
    /// `"timeout"`; any other value only increments the aggregate error counter.
    #[inline]
    pub fn record_request_failure(&self, error_type: &str, duration: Duration) {
        self.requests_failed.fetch_add(1, Ordering::Relaxed);
        self.requests_in_flight.fetch_sub(1, Ordering::Relaxed);
        self.errors_total.fetch_add(1, Ordering::Relaxed);

        // Categorize errors for comprehensive tracking
        match error_type {
            "validation" => self.errors_validation.fetch_add(1, Ordering::Relaxed),
            "auth" => self.errors_auth.fetch_add(1, Ordering::Relaxed),
            "network" => self.errors_network.fetch_add(1, Ordering::Relaxed),
            "timeout" => self.errors_timeout.fetch_add(1, Ordering::Relaxed),
            _ => 0, // Unknown error types don't increment a specific counter
        };

        let duration_us = duration.as_micros() as u64;
        self.response_time_buckets.record(duration_us);
    }

    /// Record tool call metrics with comprehensive tracking
    #[inline]
    pub fn record_tool_call(&self, success: bool) {
        self.tool_calls_total.fetch_add(1, Ordering::Relaxed);
        if success {
            self.tool_calls_successful.fetch_add(1, Ordering::Relaxed);
        } else {
            self.tool_calls_failed.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Record a newly established connection
    #[inline]
    pub fn record_connection_established(&self) {
        self.connections_total.fetch_add(1, Ordering::Relaxed);
        self.connections_active.fetch_add(1, Ordering::Relaxed);
    }

    /// Record when a connection is closed/terminated
    #[inline]
    pub fn record_connection_closed(&self) {
        self.connections_active.fetch_sub(1, Ordering::Relaxed);
    }

    /// Record when a connection is rejected due to rate limiting or other policies
    #[inline]
    pub fn record_connection_rejected(&self) {
        self.connections_rejected.fetch_add(1, Ordering::Relaxed);
    }

    /// Update resource metrics (called periodically by a monitoring task)
    pub fn update_resource_metrics(&self, memory_bytes: u64, cpu_percent: f64) {
        self.memory_usage_bytes
            .store(memory_bytes, Ordering::Relaxed);
        // Store CPU percentage as fixed-point, multiplied by 100 to preserve two
        // decimal places (e.g. 37.25% is stored as 3725).
        self.cpu_usage_percent_x100
            .store((cpu_percent * 100.0) as u64, Ordering::Relaxed);
    }

    /// Record custom metric (infrequent operation, lock acceptable)
    pub fn record_custom(&self, name: &str, value: f64) {
        let mut custom = self.custom.write();
        custom.insert(name.to_string(), value);
    }

    /// Calculate uptime in seconds
    pub fn uptime_seconds(&self) -> u64 {
        self.start_time.elapsed().as_secs()
    }

    /// Calculate average response time in microseconds
    pub fn avg_response_time_us(&self) -> f64 {
        let total_time = self.total_response_time_us.load(Ordering::Relaxed);
        let successful_requests = self.requests_successful.load(Ordering::Relaxed);

        if successful_requests > 0 {
            total_time as f64 / successful_requests as f64
        } else {
            0.0
        }
    }

    /// Calculate request rate (requests per second)
    pub fn request_rate(&self) -> f64 {
        let total_requests = self.requests_total.load(Ordering::Relaxed);
        let uptime = self.uptime_seconds();

        if uptime > 0 {
            total_requests as f64 / uptime as f64
        } else {
            0.0
        }
    }

    /// Calculate error rate as percentage
    pub fn error_rate_percent(&self) -> f64 {
        let total_requests = self.requests_total.load(Ordering::Relaxed);
        let failed_requests = self.requests_failed.load(Ordering::Relaxed);

        if total_requests > 0 {
            (failed_requests as f64 / total_requests as f64) * 100.0
        } else {
            0.0
        }
    }

    /// Lock-free atomic update of minimum response time
    fn update_min_response_time(&self, new_value: u64) {
        loop {
            let current = self.min_response_time_us.load(Ordering::Relaxed);
            if new_value >= current {
                break; // New value is not smaller
            }
            match self.min_response_time_us.compare_exchange_weak(
                current,
                new_value,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,     // Successfully updated
                Err(_) => continue, // Retry with new current value
            }
        }
    }

    /// Lock-free atomic update of maximum response time
    fn update_max_response_time(&self, new_value: u64) {
        loop {
            let current = self.max_response_time_us.load(Ordering::Relaxed);
            if new_value <= current {
                break; // New value is not larger
            }
            match self.max_response_time_us.compare_exchange_weak(
                current,
                new_value,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,     // Successfully updated
                Err(_) => continue, // Retry with new current value
            }
        }
    }
}

impl Default for ServerMetrics {
    fn default() -> Self {
        Self::new()
    }
}

/// Metrics collector trait for extensible metric collection systems
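///
/// # Example
///
/// An illustrative sketch of a custom collector (the `QueueDepthCollector` name and
/// the `queue_depth` metric are hypothetical, not part of this crate):
///
/// ```ignore
/// struct QueueDepthCollector {
///     depth: std::sync::atomic::AtomicU64,
/// }
///
/// impl MetricsCollector for QueueDepthCollector {
///     fn collect(&self) -> std::collections::HashMap<String, f64> {
///         let mut out = std::collections::HashMap::new();
///         out.insert(
///             "queue_depth".to_string(),
///             self.depth.load(std::sync::atomic::Ordering::Relaxed) as f64,
///         );
///         out
///     }
/// }
/// ```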
pub trait MetricsCollector: Send + Sync {
    /// Collect metrics into a HashMap for export to monitoring systems
    fn collect(&self) -> HashMap<String, f64>;
}

/// Production-grade comprehensive metrics collector implementation
#[derive(Debug)]
pub struct ComprehensiveMetricsCollector {
    /// Server metrics reference
    metrics: Arc<ServerMetrics>,
}

impl ComprehensiveMetricsCollector {
    /// Create a new comprehensive metrics collector
    #[must_use]
    pub const fn new(metrics: Arc<ServerMetrics>) -> Self {
        Self { metrics }
    }
}

impl MetricsCollector for ComprehensiveMetricsCollector {
    fn collect(&self) -> HashMap<String, f64> {
        let mut metrics = HashMap::new();

        // Request metrics with comprehensive tracking
        metrics.insert(
            "requests_total".to_string(),
            self.metrics.requests_total.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "requests_successful".to_string(),
            self.metrics.requests_successful.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "requests_failed".to_string(),
            self.metrics.requests_failed.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "requests_in_flight".to_string(),
            self.metrics.requests_in_flight.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "requests_rate_per_second".to_string(),
            self.metrics.request_rate(),
        );

        // Error metrics with detailed breakdown
        metrics.insert(
            "errors_total".to_string(),
            self.metrics.errors_total.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "errors_validation".to_string(),
            self.metrics.errors_validation.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "errors_auth".to_string(),
            self.metrics.errors_auth.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "errors_network".to_string(),
            self.metrics.errors_network.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "errors_timeout".to_string(),
            self.metrics.errors_timeout.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "error_rate_percent".to_string(),
            self.metrics.error_rate_percent(),
        );

        // Performance metrics with high-resolution timing
        metrics.insert(
            "response_time_avg_us".to_string(),
            self.metrics.avg_response_time_us(),
        );
        let min_time = self.metrics.min_response_time_us.load(Ordering::Relaxed);
        metrics.insert(
            "response_time_min_us".to_string(),
            if min_time == u64::MAX {
                0.0
            } else {
                min_time as f64
            },
        );
        metrics.insert(
            "response_time_max_us".to_string(),
            self.metrics.max_response_time_us.load(Ordering::Relaxed) as f64,
        );

        // Tool-specific metrics
        metrics.insert(
            "tool_calls_total".to_string(),
            self.metrics.tool_calls_total.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "tool_calls_successful".to_string(),
            self.metrics.tool_calls_successful.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "tool_calls_failed".to_string(),
            self.metrics.tool_calls_failed.load(Ordering::Relaxed) as f64,
        );

        // Connection metrics
        metrics.insert(
            "connections_active".to_string(),
            self.metrics.connections_active.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "connections_total".to_string(),
            self.metrics.connections_total.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "connections_rejected".to_string(),
            self.metrics.connections_rejected.load(Ordering::Relaxed) as f64,
        );

        // Resource metrics
        metrics.insert(
            "memory_usage_bytes".to_string(),
            self.metrics.memory_usage_bytes.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "cpu_usage_percent".to_string(),
            self.metrics.cpu_usage_percent_x100.load(Ordering::Relaxed) as f64 / 100.0,
        );

        // Server uptime
        metrics.insert(
            "uptime_seconds".to_string(),
            self.metrics.uptime_seconds() as f64,
        );

        // Response time histogram buckets (per-range counts for latency distribution analysis)
        let buckets = &self.metrics.response_time_buckets;
        metrics.insert(
            "response_time_bucket_1ms".to_string(),
            buckets.bucket_1ms.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "response_time_bucket_5ms".to_string(),
            buckets.bucket_5ms.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "response_time_bucket_10ms".to_string(),
            buckets.bucket_10ms.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "response_time_bucket_25ms".to_string(),
            buckets.bucket_25ms.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "response_time_bucket_50ms".to_string(),
            buckets.bucket_50ms.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "response_time_bucket_100ms".to_string(),
            buckets.bucket_100ms.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "response_time_bucket_250ms".to_string(),
            buckets.bucket_250ms.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "response_time_bucket_500ms".to_string(),
            buckets.bucket_500ms.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "response_time_bucket_1s".to_string(),
            buckets.bucket_1s.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "response_time_bucket_2_5s".to_string(),
            buckets.bucket_2_5s.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "response_time_bucket_5s".to_string(),
            buckets.bucket_5s.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "response_time_bucket_10s".to_string(),
            buckets.bucket_10s.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "response_time_bucket_inf".to_string(),
            buckets.bucket_inf.load(Ordering::Relaxed) as f64,
        );

        // Custom metrics (infrequent read lock; if the lock is contended, skip them for this pass)
        if let Some(custom_metrics) = self.metrics.custom.try_read() {
            for (key, value) in custom_metrics.iter() {
                metrics.insert(format!("custom_{key}"), *value);
            }
        }

        metrics
    }
}
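
// Illustrative smoke tests: a minimal sketch of the expected counter and export
// behavior, not the crate's actual test suite.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::Ordering;
    use std::time::Duration;

    #[test]
    fn request_counters_and_histogram_update() {
        let metrics = ServerMetrics::new();
        metrics.record_request_start();
        metrics.record_request_success(Duration::from_millis(3));

        assert_eq!(metrics.requests_total.load(Ordering::Relaxed), 1);
        assert_eq!(metrics.requests_successful.load(Ordering::Relaxed), 1);
        assert_eq!(metrics.requests_in_flight.load(Ordering::Relaxed), 0);
        // A 3 ms response lands in the 1-5 ms bucket.
        assert_eq!(
            metrics
                .response_time_buckets
                .bucket_5ms
                .load(Ordering::Relaxed),
            1
        );
    }

    #[test]
    fn collector_exports_core_keys() {
        let metrics = Arc::new(ServerMetrics::new());
        metrics.record_request_start();
        metrics.record_request_failure("timeout", Duration::from_millis(50));

        let collector = ComprehensiveMetricsCollector::new(Arc::clone(&metrics));
        let snapshot = collector.collect();

        assert_eq!(snapshot["requests_failed"], 1.0);
        assert_eq!(snapshot["errors_timeout"], 1.0);
        assert!(snapshot.contains_key("uptime_seconds"));
    }
}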