turbomcp_server/
metrics.rs

//! Production-grade metrics collection and monitoring system
//!
//! This module provides a comprehensive metrics collection system built on lock-free
//! atomic counters, designed for high-performance production environments with
//! zero-allocation hot paths.
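//!
//! # Example
//!
//! A minimal end-to-end sketch. The `turbomcp_server::metrics` path and the exact
//! re-export layout are assumptions here and may differ from the real crate wiring:
//!
//! ```
//! use std::sync::Arc;
//! use std::time::Duration;
//! use turbomcp_server::metrics::{ComprehensiveMetricsCollector, MetricsCollector, ServerMetrics};
//!
//! let metrics = Arc::new(ServerMetrics::new());
//!
//! // Record one successful request that took 12ms.
//! metrics.record_request_start();
//! metrics.record_request_success(Duration::from_millis(12));
//!
//! // Export a flat name -> value snapshot for a monitoring backend.
//! let collector = ComprehensiveMetricsCollector::new(Arc::clone(&metrics));
//! let snapshot = collector.collect();
//! assert_eq!(snapshot["requests_total"], 1.0);
//! ```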

use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};

/// Production-grade server metrics collector with lock-free atomic operations
#[derive(Debug)]
pub struct ServerMetrics {
    /// Total number of requests received since server start
    pub requests_total: AtomicU64,
    /// Number of requests that completed successfully
    pub requests_successful: AtomicU64,
    /// Number of requests that failed with errors
    pub requests_failed: AtomicU64,
    /// Number of requests currently being processed
    pub requests_in_flight: AtomicU64,

    /// Total number of errors across all categories
    pub errors_total: AtomicU64,
    /// Number of validation/schema errors
    pub errors_validation: AtomicU64,
    /// Number of authentication/authorization errors
    pub errors_auth: AtomicU64,
    /// Number of network-related errors
    pub errors_network: AtomicU64,
    /// Number of timeout errors
    pub errors_timeout: AtomicU64,

    // Security and timeout-specific metrics for audit and DoS detection
    /// Number of tool executions that exceeded timeout
    pub tool_timeouts_total: AtomicU64,
    /// Number of tool executions cancelled cooperatively
    pub tool_cancellations_total: AtomicU64,
    /// Number of tool executions that completed within timeout
    pub tool_executions_successful: AtomicU64,
    /// Total time spent in timed-out operations (microseconds)
    pub timeout_wasted_time_us: AtomicU64,
    /// Number of active tool executions currently running
    pub tool_executions_active: AtomicU64,

    /// Sum of all response times in microseconds
    pub total_response_time_us: AtomicU64,
    /// Minimum response time observed (microseconds)
    pub min_response_time_us: AtomicU64,
    /// Maximum response time observed (microseconds)
    pub max_response_time_us: AtomicU64,

    /// Total number of tool calls initiated
    pub tool_calls_total: AtomicU64,
    /// Number of tool calls that completed successfully
    pub tool_calls_successful: AtomicU64,
    /// Number of tool calls that failed
    pub tool_calls_failed: AtomicU64,

    /// Number of currently active connections
    pub connections_active: AtomicU64,
    /// Total connections accepted since server start
    pub connections_total: AtomicU64,
    /// Number of connections rejected (rate limiting, etc.)
    pub connections_rejected: AtomicU64,

    /// Current memory usage in bytes
    pub memory_usage_bytes: AtomicU64,
    /// Current CPU usage as percentage × 100 (fixed-point; `std` has no `AtomicF64`)
    pub cpu_usage_percent_x100: AtomicU64,

    /// Custom application-specific metrics (rare updates, RwLock acceptable)
    pub custom: RwLock<HashMap<String, f64>>,

    /// Response time histogram for latency distribution analysis
    pub response_time_buckets: ResponseTimeHistogram,

    /// Server start time for uptime calculation
    pub start_time: Instant,
}

/// High-performance histogram for response time distribution
#[derive(Debug)]
pub struct ResponseTimeHistogram {
    /// Requests completed in under 1 millisecond
    pub bucket_1ms: AtomicU64,
    /// Requests completed in 1-5 milliseconds
    pub bucket_5ms: AtomicU64,
    /// Requests completed in 5-10 milliseconds
    pub bucket_10ms: AtomicU64,
    /// Requests completed in 10-25 milliseconds
    pub bucket_25ms: AtomicU64,
    /// Requests completed in 25-50 milliseconds
    pub bucket_50ms: AtomicU64,
    /// Requests completed in 50-100 milliseconds
    pub bucket_100ms: AtomicU64,
    /// Requests completed in 100-250 milliseconds
    pub bucket_250ms: AtomicU64,
    /// Requests completed in 250-500 milliseconds
    pub bucket_500ms: AtomicU64,
    /// Requests completed in 500ms-1 second
    pub bucket_1s: AtomicU64,
    /// Requests completed in 1-2.5 seconds
    pub bucket_2_5s: AtomicU64,
    /// Requests completed in 2.5-5 seconds
    pub bucket_5s: AtomicU64,
    /// Requests completed in 5-10 seconds
    pub bucket_10s: AtomicU64,
    /// Requests completed in over 10 seconds
    pub bucket_inf: AtomicU64,
}

impl Default for ResponseTimeHistogram {
    fn default() -> Self {
        Self::new()
    }
}

impl ResponseTimeHistogram {
    /// Create a new histogram with all buckets initialized to zero
    pub fn new() -> Self {
        Self {
            bucket_1ms: AtomicU64::new(0),
            bucket_5ms: AtomicU64::new(0),
            bucket_10ms: AtomicU64::new(0),
            bucket_25ms: AtomicU64::new(0),
            bucket_50ms: AtomicU64::new(0),
            bucket_100ms: AtomicU64::new(0),
            bucket_250ms: AtomicU64::new(0),
            bucket_500ms: AtomicU64::new(0),
            bucket_1s: AtomicU64::new(0),
            bucket_2_5s: AtomicU64::new(0),
            bucket_5s: AtomicU64::new(0),
            bucket_10s: AtomicU64::new(0),
            bucket_inf: AtomicU64::new(0),
        }
    }

    /// Record response time with proper bucket assignment
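    ///
    /// # Example
    ///
    /// A small sketch of how durations (in microseconds) map onto buckets; the
    /// `turbomcp_server::metrics` path is an assumption:
    ///
    /// ```
    /// use std::sync::atomic::Ordering;
    /// use turbomcp_server::metrics::ResponseTimeHistogram;
    ///
    /// let histogram = ResponseTimeHistogram::new();
    /// histogram.record(750);          // 0.75ms -> under-1ms bucket
    /// histogram.record(3_200);        // 3.2ms  -> 1-5ms bucket
    /// histogram.record(12_000_000);   // 12s    -> overflow bucket
    ///
    /// assert_eq!(histogram.bucket_1ms.load(Ordering::Relaxed), 1);
    /// assert_eq!(histogram.bucket_5ms.load(Ordering::Relaxed), 1);
    /// assert_eq!(histogram.bucket_inf.load(Ordering::Relaxed), 1);
    /// ```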
    #[inline]
    pub fn record(&self, duration_us: u64) {
        let duration_ms = duration_us / 1000;

        if duration_ms < 1 {
            self.bucket_1ms.fetch_add(1, Ordering::Relaxed);
        } else if duration_ms < 5 {
            self.bucket_5ms.fetch_add(1, Ordering::Relaxed);
        } else if duration_ms < 10 {
            self.bucket_10ms.fetch_add(1, Ordering::Relaxed);
        } else if duration_ms < 25 {
            self.bucket_25ms.fetch_add(1, Ordering::Relaxed);
        } else if duration_ms < 50 {
            self.bucket_50ms.fetch_add(1, Ordering::Relaxed);
        } else if duration_ms < 100 {
            self.bucket_100ms.fetch_add(1, Ordering::Relaxed);
        } else if duration_ms < 250 {
            self.bucket_250ms.fetch_add(1, Ordering::Relaxed);
        } else if duration_ms < 500 {
            self.bucket_500ms.fetch_add(1, Ordering::Relaxed);
        } else if duration_ms < 1000 {
            self.bucket_1s.fetch_add(1, Ordering::Relaxed);
        } else if duration_ms < 2500 {
            self.bucket_2_5s.fetch_add(1, Ordering::Relaxed);
        } else if duration_ms < 5000 {
            self.bucket_5s.fetch_add(1, Ordering::Relaxed);
        } else if duration_ms < 10000 {
            self.bucket_10s.fetch_add(1, Ordering::Relaxed);
        } else {
            self.bucket_inf.fetch_add(1, Ordering::Relaxed);
        }
    }
}

impl ServerMetrics {
    /// Create a new metrics collector with all counters fully initialized
    pub fn new() -> Self {
        Self {
            requests_total: AtomicU64::new(0),
            requests_successful: AtomicU64::new(0),
            requests_failed: AtomicU64::new(0),
            requests_in_flight: AtomicU64::new(0),

            errors_total: AtomicU64::new(0),
            errors_validation: AtomicU64::new(0),
            errors_auth: AtomicU64::new(0),
            errors_network: AtomicU64::new(0),
            errors_timeout: AtomicU64::new(0),

            // Initialize timeout-specific metrics
            tool_timeouts_total: AtomicU64::new(0),
            tool_cancellations_total: AtomicU64::new(0),
            tool_executions_successful: AtomicU64::new(0),
            timeout_wasted_time_us: AtomicU64::new(0),
            tool_executions_active: AtomicU64::new(0),

            total_response_time_us: AtomicU64::new(0),
            // Start at u64::MAX so the first observed sample becomes the minimum
            min_response_time_us: AtomicU64::new(u64::MAX),
            max_response_time_us: AtomicU64::new(0),

            tool_calls_total: AtomicU64::new(0),
            tool_calls_successful: AtomicU64::new(0),
            tool_calls_failed: AtomicU64::new(0),

            connections_active: AtomicU64::new(0),
            connections_total: AtomicU64::new(0),
            connections_rejected: AtomicU64::new(0),

            memory_usage_bytes: AtomicU64::new(0),
            cpu_usage_percent_x100: AtomicU64::new(0),

            custom: RwLock::new(HashMap::new()),
            response_time_buckets: ResponseTimeHistogram::new(),
            start_time: Instant::now(),
        }
    }

    /// Record request start with zero-allocation tracking
    #[inline]
    pub fn record_request_start(&self) {
        self.requests_total.fetch_add(1, Ordering::Relaxed);
        self.requests_in_flight.fetch_add(1, Ordering::Relaxed);
    }

    /// Record successful request completion with timing
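    ///
    /// # Example
    ///
    /// A hypothetical request lifecycle; the module path is assumed:
    ///
    /// ```
    /// use std::time::Duration;
    /// use turbomcp_server::metrics::ServerMetrics;
    ///
    /// let metrics = ServerMetrics::new();
    /// metrics.record_request_start();
    /// metrics.record_request_success(Duration::from_millis(8));
    ///
    /// // The 8ms response is tracked in microseconds.
    /// assert_eq!(metrics.avg_response_time_us(), 8_000.0);
    /// ```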
    #[inline]
    pub fn record_request_success(&self, duration: Duration) {
        self.requests_successful.fetch_add(1, Ordering::Relaxed);
        self.requests_in_flight.fetch_sub(1, Ordering::Relaxed);

        let duration_us = duration.as_micros() as u64;
        self.total_response_time_us
            .fetch_add(duration_us, Ordering::Relaxed);
        self.response_time_buckets.record(duration_us);

        // Update min/max with compare-and-swap
        self.update_min_response_time(duration_us);
        self.update_max_response_time(duration_us);
    }

    /// Record failed request with error categorization
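    ///
    /// # Example
    ///
    /// Recognized `error_type` values are `"validation"`, `"auth"`, `"network"`, and
    /// `"timeout"`; any other string only increments the aggregate counters. A sketch
    /// (module path assumed):
    ///
    /// ```
    /// use std::sync::atomic::Ordering;
    /// use std::time::Duration;
    /// use turbomcp_server::metrics::ServerMetrics;
    ///
    /// let metrics = ServerMetrics::new();
    /// metrics.record_request_start();
    /// metrics.record_request_failure("validation", Duration::from_millis(2));
    ///
    /// assert_eq!(metrics.errors_total.load(Ordering::Relaxed), 1);
    /// assert_eq!(metrics.errors_validation.load(Ordering::Relaxed), 1);
    /// ```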
    #[inline]
    pub fn record_request_failure(&self, error_type: &str, duration: Duration) {
        self.requests_failed.fetch_add(1, Ordering::Relaxed);
        self.requests_in_flight.fetch_sub(1, Ordering::Relaxed);
        self.errors_total.fetch_add(1, Ordering::Relaxed);

        // Categorize errors for comprehensive tracking
        match error_type {
            "validation" => self.errors_validation.fetch_add(1, Ordering::Relaxed),
            "auth" => self.errors_auth.fetch_add(1, Ordering::Relaxed),
            "network" => self.errors_network.fetch_add(1, Ordering::Relaxed),
            "timeout" => self.errors_timeout.fetch_add(1, Ordering::Relaxed),
            _ => 0, // Unknown error types don't increment specific counters
        };

        let duration_us = duration.as_micros() as u64;
        self.response_time_buckets.record(duration_us);
    }

    /// Record tool call metrics with comprehensive tracking
    #[inline]
    pub fn record_tool_call(&self, success: bool) {
        self.tool_calls_total.fetch_add(1, Ordering::Relaxed);
        if success {
            self.tool_calls_successful.fetch_add(1, Ordering::Relaxed);
        } else {
            self.tool_calls_failed.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Update connection metrics with proper lifecycle tracking
    #[inline]
    pub fn record_connection_established(&self) {
        self.connections_total.fetch_add(1, Ordering::Relaxed);
        self.connections_active.fetch_add(1, Ordering::Relaxed);
    }

    /// Record when a connection is closed/terminated
    #[inline]
    pub fn record_connection_closed(&self) {
        self.connections_active.fetch_sub(1, Ordering::Relaxed);
    }

    /// Record when a connection is rejected due to rate limiting or other policies
    #[inline]
    pub fn record_connection_rejected(&self) {
        self.connections_rejected.fetch_add(1, Ordering::Relaxed);
    }

    /// Update resource metrics (called periodically by monitoring task)
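    ///
    /// # Example
    ///
    /// CPU usage is stored as a fixed-point value (percent × 100), so 42.5% is kept
    /// as 4250; the module path is assumed:
    ///
    /// ```
    /// use std::sync::atomic::Ordering;
    /// use turbomcp_server::metrics::ServerMetrics;
    ///
    /// let metrics = ServerMetrics::new();
    /// metrics.update_resource_metrics(512 * 1024 * 1024, 42.5);
    ///
    /// assert_eq!(metrics.memory_usage_bytes.load(Ordering::Relaxed), 512 * 1024 * 1024);
    /// assert_eq!(metrics.cpu_usage_percent_x100.load(Ordering::Relaxed), 4250);
    /// ```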
    pub fn update_resource_metrics(&self, memory_bytes: u64, cpu_percent: f64) {
        self.memory_usage_bytes
            .store(memory_bytes, Ordering::Relaxed);
        // Store CPU percentage as fixed-point (multiply by 100 to preserve 2 decimal places)
        self.cpu_usage_percent_x100
            .store((cpu_percent * 100.0) as u64, Ordering::Relaxed);
    }

    /// Record custom metric (infrequent operation, lock acceptable)
    pub fn record_custom(&self, name: &str, value: f64) {
        let mut custom = self.custom.write();
        custom.insert(name.to_string(), value);
    }

    /// Calculate uptime in seconds
    pub fn uptime_seconds(&self) -> u64 {
        self.start_time.elapsed().as_secs()
    }

    /// Calculate average response time in microseconds
    pub fn avg_response_time_us(&self) -> f64 {
        let total_time = self.total_response_time_us.load(Ordering::Relaxed);
        let successful_requests = self.requests_successful.load(Ordering::Relaxed);

        if successful_requests > 0 {
            total_time as f64 / successful_requests as f64
        } else {
            0.0
        }
    }

    /// Calculate request rate (requests per second)
    pub fn request_rate(&self) -> f64 {
        let total_requests = self.requests_total.load(Ordering::Relaxed);
        let uptime = self.uptime_seconds();

        if uptime > 0 {
            total_requests as f64 / uptime as f64
        } else {
            0.0
        }
    }

    /// Calculate error rate as percentage
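    ///
    /// # Example
    ///
    /// One failure out of four requests yields 25%; the module path is assumed:
    ///
    /// ```
    /// use std::time::Duration;
    /// use turbomcp_server::metrics::ServerMetrics;
    ///
    /// let metrics = ServerMetrics::new();
    /// for _ in 0..3 {
    ///     metrics.record_request_start();
    ///     metrics.record_request_success(Duration::from_millis(1));
    /// }
    /// metrics.record_request_start();
    /// metrics.record_request_failure("network", Duration::from_millis(1));
    ///
    /// assert_eq!(metrics.error_rate_percent(), 25.0);
    /// ```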
    pub fn error_rate_percent(&self) -> f64 {
        let total_requests = self.requests_total.load(Ordering::Relaxed);
        let failed_requests = self.requests_failed.load(Ordering::Relaxed);

        if total_requests > 0 {
            (failed_requests as f64 / total_requests as f64) * 100.0
        } else {
            0.0
        }
    }

    /// Lock-free atomic update of minimum response time
    fn update_min_response_time(&self, new_value: u64) {
        loop {
            let current = self.min_response_time_us.load(Ordering::Relaxed);
            if new_value >= current {
                break; // New value is not smaller
            }
            match self.min_response_time_us.compare_exchange_weak(
                current,
                new_value,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,     // Successfully updated
                Err(_) => continue, // Retry with new current value
            }
        }
    }

    /// Lock-free atomic update of maximum response time
    fn update_max_response_time(&self, new_value: u64) {
        loop {
            let current = self.max_response_time_us.load(Ordering::Relaxed);
            if new_value <= current {
                break; // New value is not larger
            }
            match self.max_response_time_us.compare_exchange_weak(
                current,
                new_value,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,     // Successfully updated
                Err(_) => continue, // Retry with new current value
            }
        }
    }
}

impl Default for ServerMetrics {
    fn default() -> Self {
        Self::new()
    }
}

/// Metrics collector trait for extensible metric collection systems
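///
/// # Example
///
/// A hypothetical custom collector; the struct name is illustrative and the module
/// path is assumed:
///
/// ```
/// use std::collections::HashMap;
/// use turbomcp_server::metrics::MetricsCollector;
///
/// struct StaticCollector;
///
/// impl MetricsCollector for StaticCollector {
///     fn collect(&self) -> HashMap<String, f64> {
///         let mut out = HashMap::new();
///         out.insert("build_info".to_string(), 1.0);
///         out
///     }
/// }
///
/// let exported = StaticCollector.collect();
/// assert_eq!(exported["build_info"], 1.0);
/// ```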
pub trait MetricsCollector: Send + Sync {
    /// Collect metrics into a HashMap for export to monitoring systems
    fn collect(&self) -> HashMap<String, f64>;
}

/// Production-grade comprehensive metrics collector implementation
#[derive(Debug)]
pub struct ComprehensiveMetricsCollector {
    /// Server metrics reference
    metrics: Arc<ServerMetrics>,
}

impl ComprehensiveMetricsCollector {
    /// Create a new comprehensive metrics collector
    #[must_use]
    pub const fn new(metrics: Arc<ServerMetrics>) -> Self {
        Self { metrics }
    }
}

impl MetricsCollector for ComprehensiveMetricsCollector {
    fn collect(&self) -> HashMap<String, f64> {
        let mut metrics = HashMap::new();

        // Request metrics with comprehensive tracking
        metrics.insert(
            "requests_total".to_string(),
            self.metrics.requests_total.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "requests_successful".to_string(),
            self.metrics.requests_successful.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "requests_failed".to_string(),
            self.metrics.requests_failed.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "requests_in_flight".to_string(),
            self.metrics.requests_in_flight.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "requests_rate_per_second".to_string(),
            self.metrics.request_rate(),
        );

        // Error metrics with detailed breakdown
        metrics.insert(
            "errors_total".to_string(),
            self.metrics.errors_total.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "errors_validation".to_string(),
            self.metrics.errors_validation.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "errors_auth".to_string(),
            self.metrics.errors_auth.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "errors_network".to_string(),
            self.metrics.errors_network.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "errors_timeout".to_string(),
            self.metrics.errors_timeout.load(Ordering::Relaxed) as f64,
        );

        // Security-focused timeout metrics for audit and DoS detection
        metrics.insert(
            "tool_timeouts_total".to_string(),
            self.metrics.tool_timeouts_total.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "tool_cancellations_total".to_string(),
            self.metrics
                .tool_cancellations_total
                .load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "tool_executions_successful".to_string(),
            self.metrics
                .tool_executions_successful
                .load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "timeout_wasted_time_us".to_string(),
            self.metrics.timeout_wasted_time_us.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "tool_executions_active".to_string(),
            self.metrics.tool_executions_active.load(Ordering::Relaxed) as f64,
        );

        metrics.insert(
            "error_rate_percent".to_string(),
            self.metrics.error_rate_percent(),
        );

        // Performance metrics with high-resolution timing
        metrics.insert(
            "response_time_avg_us".to_string(),
            self.metrics.avg_response_time_us(),
        );
        let min_time = self.metrics.min_response_time_us.load(Ordering::Relaxed);
        metrics.insert(
            "response_time_min_us".to_string(),
            if min_time == u64::MAX {
                0.0
            } else {
                min_time as f64
            },
        );
        metrics.insert(
            "response_time_max_us".to_string(),
            self.metrics.max_response_time_us.load(Ordering::Relaxed) as f64,
        );

        // Tool-specific metrics
        metrics.insert(
            "tool_calls_total".to_string(),
            self.metrics.tool_calls_total.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "tool_calls_successful".to_string(),
            self.metrics.tool_calls_successful.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "tool_calls_failed".to_string(),
            self.metrics.tool_calls_failed.load(Ordering::Relaxed) as f64,
        );

        // Connection metrics
        metrics.insert(
            "connections_active".to_string(),
            self.metrics.connections_active.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "connections_total".to_string(),
            self.metrics.connections_total.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "connections_rejected".to_string(),
            self.metrics.connections_rejected.load(Ordering::Relaxed) as f64,
        );

        // Resource metrics
        metrics.insert(
            "memory_usage_bytes".to_string(),
            self.metrics.memory_usage_bytes.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "cpu_usage_percent".to_string(),
            self.metrics.cpu_usage_percent_x100.load(Ordering::Relaxed) as f64 / 100.0,
        );

        // Server uptime
        metrics.insert(
            "uptime_seconds".to_string(),
            self.metrics.uptime_seconds() as f64,
        );

        // Response time histogram buckets (per-range counts; sum into cumulative
        // `le`-style buckets when exporting to Prometheus)
        let buckets = &self.metrics.response_time_buckets;
        metrics.insert(
            "response_time_bucket_1ms".to_string(),
            buckets.bucket_1ms.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "response_time_bucket_5ms".to_string(),
            buckets.bucket_5ms.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "response_time_bucket_10ms".to_string(),
            buckets.bucket_10ms.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "response_time_bucket_25ms".to_string(),
            buckets.bucket_25ms.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "response_time_bucket_50ms".to_string(),
            buckets.bucket_50ms.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "response_time_bucket_100ms".to_string(),
            buckets.bucket_100ms.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "response_time_bucket_250ms".to_string(),
            buckets.bucket_250ms.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "response_time_bucket_500ms".to_string(),
            buckets.bucket_500ms.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "response_time_bucket_1s".to_string(),
            buckets.bucket_1s.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "response_time_bucket_2_5s".to_string(),
            buckets.bucket_2_5s.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "response_time_bucket_5s".to_string(),
            buckets.bucket_5s.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "response_time_bucket_10s".to_string(),
            buckets.bucket_10s.load(Ordering::Relaxed) as f64,
        );
        metrics.insert(
            "response_time_bucket_inf".to_string(),
            buckets.bucket_inf.load(Ordering::Relaxed) as f64,
        );

        // Custom metrics (infrequent read lock acceptable)
        if let Some(custom_metrics) = self.metrics.custom.try_read() {
            for (key, value) in custom_metrics.iter() {
                metrics.insert(format!("custom_{key}"), *value);
            }
        }

        metrics
    }
}

// Comprehensive tests live in a separate file (tokio/axum pattern)
#[cfg(test)]
mod tests;