pulseengine_mcp_logging/
metrics.rs

1//! Metrics collection for observability and monitoring
2//!
3//! This module provides comprehensive metrics collection for:
4//! - Request performance monitoring
5//! - System health metrics
6//! - Business logic metrics
7//! - Error tracking and classification
8
9use crate::persistence::{MetricsPersistence, PersistenceConfig};
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::sync::Arc;
13use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
14use tokio::sync::RwLock;
15
/// Collector for request, health, business, and error metrics.
///
/// This is the collector type; the process-wide instance is obtained through `get_metrics()`.
/// Each metric group sits behind its own `tokio::sync::RwLock`, so writers of one group do not
/// block readers of another.
pub struct MetricsCollector {
    /// Request metrics (counts, latency statistics, per-tool breakdowns)
    request_metrics: Arc<RwLock<RequestMetrics>>,

    /// System health metrics
    health_metrics: Arc<RwLock<HealthMetrics>>,

    /// Business metrics
    business_metrics: Arc<RwLock<BusinessMetrics>>,

    /// Error metrics
    error_metrics: Arc<RwLock<ErrorMetrics>>,

    /// Monotonic start time used for uptime calculation
    start_time: Instant,

    /// Persistence layer for metrics; `None` unless the collector was built via `with_persistence`
    persistence: Option<Arc<MetricsPersistence>>,
}
36
/// Request performance metrics
///
/// Field order is kept stable because this type is serialized (and may be persisted).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RequestMetrics {
    /// Total number of requests (incremented when a request *starts*)
    pub total_requests: u64,

    /// Successful requests (incremented when a request ends successfully)
    pub successful_requests: u64,

    /// Failed requests (incremented when a request ends unsuccessfully)
    pub failed_requests: u64,

    /// Average response time in milliseconds, over the retained samples of all tools
    pub avg_response_time_ms: f64,

    /// 95th percentile response time in milliseconds, over the retained samples of all tools
    pub p95_response_time_ms: f64,

    /// 99th percentile response time in milliseconds, over the retained samples of all tools
    pub p99_response_time_ms: f64,

    /// Current in-flight requests (started but not yet ended)
    pub active_requests: u64,

    /// Requests per tool (counted at request start)
    pub requests_by_tool: HashMap<String, u64>,

    /// Response-time samples in milliseconds, per tool; the collector keeps only the most
    /// recent 1000 samples per tool
    pub response_times_by_tool: HashMap<String, Vec<f64>>,

    /// Rate limiting hits
    pub rate_limit_hits: u64,

    /// Request throughput (requests per second)
    // NOTE(review): not written anywhere in this file; stays at its default of 0.0 unless
    // updated elsewhere — confirm.
    pub requests_per_second: f64,

    /// Last update timestamp, in seconds since the Unix epoch
    pub last_updated: u64,
}
76
/// System health metrics
///
/// Field order is kept stable because this type is serialized (and may be persisted).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthMetrics {
    /// CPU usage percentage
    pub cpu_usage_percent: Option<f64>,

    /// Memory usage in MB
    pub memory_usage_mb: Option<f64>,

    /// Memory usage percentage
    pub memory_usage_percent: Option<f64>,

    /// Disk usage percentage
    pub disk_usage_percent: Option<f64>,

    /// Network latency to Loxone server, in milliseconds
    pub loxone_latency_ms: Option<f64>,

    /// Connection pool statistics: connections currently in use
    pub connection_pool_active: Option<u32>,
    /// Connection pool statistics: idle connections
    pub connection_pool_idle: Option<u32>,
    /// Connection pool statistics: configured maximum pool size
    pub connection_pool_max: Option<u32>,

    /// System uptime in seconds, measured from collector creation
    pub uptime_seconds: u64,

    /// Garbage collection metrics (if available): number of collections
    pub gc_collections: Option<u64>,
    /// Garbage collection metrics (if available): total time spent, in milliseconds
    pub gc_time_ms: Option<f64>,

    /// Whether the last health check succeeded (`false` until the first check is recorded)
    pub last_health_check_success: bool,
    /// Time of the last health check, in seconds since the Unix epoch
    /// (initialized to the creation time by `Default`)
    pub last_health_check_time: u64,
}
111
/// Business logic metrics specific to Loxone MCP
///
/// Field order is kept stable because this type is serialized (and may be persisted).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BusinessMetrics {
    /// Device operations: total, successful and failed counts
    pub device_operations_total: u64,
    pub device_operations_success: u64,
    pub device_operations_failed: u64,

    /// Operations by device type (only counted when a device type is supplied)
    pub operations_by_device_type: HashMap<String, u64>,

    /// Operations by room (only counted when a room name is supplied)
    pub operations_by_room: HashMap<String, u64>,

    /// Loxone API calls: total, successful and failed counts
    pub loxone_api_calls_total: u64,
    pub loxone_api_calls_success: u64,
    pub loxone_api_calls_failed: u64,

    /// Structure refresh operations; `last_structure_refresh` is presumably a Unix timestamp
    /// in seconds — TODO confirm with the writer of this field
    pub structure_refreshes: u64,
    pub last_structure_refresh: u64,

    /// Authentication operations (`auth_failures` is also bumped by `record_error` for
    /// errors classified as auth errors)
    pub auth_attempts: u64,
    pub auth_successes: u64,
    pub auth_failures: u64,

    /// Cache hit/miss counters, from which a hit ratio can be derived
    pub cache_hits: u64,
    pub cache_misses: u64,

    /// Schema validation metrics
    pub schema_validations_total: u64,
    pub schema_validations_failed: u64,

    /// Request coalescing effectiveness
    pub coalesced_requests: u64,
    pub coalescing_time_saved_ms: f64,
}
152
/// Error classification and tracking
///
/// Field order is kept stable because this type is serialized (and may be persisted).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorMetrics {
    /// Total errors
    pub total_errors: u64,

    /// Errors by classification (each recorded error lands in exactly one of these buckets
    /// when recorded via `MetricsCollector::record_error`)
    pub client_errors: u64,
    pub server_errors: u64,
    pub network_errors: u64,
    pub auth_errors: u64,
    // NOTE(review): `business_errors` is not written anywhere in this file — confirm.
    pub business_errors: u64,

    /// Errors by tool
    pub errors_by_tool: HashMap<String, u64>,

    /// Error rates over rolling windows
    // NOTE(review): these rates are not computed anywhere in this file — confirm.
    pub error_rate_5min: f64,
    pub error_rate_1hour: f64,
    pub error_rate_24hour: f64,

    /// Most recent errors, oldest first; `MetricsCollector::record_error` retains at most 100
    pub recent_errors: Vec<ErrorRecord>,

    /// Error patterns (`validation_errors` and `device_control_errors` are not written in this
    /// file — NOTE(review): confirm where they are updated)
    pub timeout_errors: u64,
    pub connection_errors: u64,
    pub validation_errors: u64,
    pub device_control_errors: u64,
}
183
/// Individual error record for tracking
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorRecord {
    /// When the error was recorded, in seconds since the Unix epoch
    pub timestamp: u64,
    /// Classification label reported by the error's `ErrorClassification::error_type`
    pub error_type: String,
    /// The error's `Display` text
    pub error_message: String,
    /// Tool that produced the error
    pub tool_name: String,
    /// Identifier of the failing request
    pub request_id: String,
    /// Request duration in milliseconds (saturates at `u64::MAX`)
    pub duration_ms: u64,
}
194
195impl Default for RequestMetrics {
196    fn default() -> Self {
197        Self {
198            total_requests: 0,
199            successful_requests: 0,
200            failed_requests: 0,
201            avg_response_time_ms: 0.0,
202            p95_response_time_ms: 0.0,
203            p99_response_time_ms: 0.0,
204            active_requests: 0,
205            requests_by_tool: HashMap::new(),
206            response_times_by_tool: HashMap::new(),
207            rate_limit_hits: 0,
208            requests_per_second: 0.0,
209            last_updated: current_timestamp(),
210        }
211    }
212}
213
214impl Default for HealthMetrics {
215    fn default() -> Self {
216        Self {
217            cpu_usage_percent: None,
218            memory_usage_mb: None,
219            memory_usage_percent: None,
220            disk_usage_percent: None,
221            loxone_latency_ms: None,
222            connection_pool_active: None,
223            connection_pool_idle: None,
224            connection_pool_max: None,
225            uptime_seconds: 0,
226            gc_collections: None,
227            gc_time_ms: None,
228            last_health_check_success: false,
229            last_health_check_time: current_timestamp(),
230        }
231    }
232}
233
234impl Default for BusinessMetrics {
235    fn default() -> Self {
236        Self {
237            device_operations_total: 0,
238            device_operations_success: 0,
239            device_operations_failed: 0,
240            operations_by_device_type: HashMap::new(),
241            operations_by_room: HashMap::new(),
242            loxone_api_calls_total: 0,
243            loxone_api_calls_success: 0,
244            loxone_api_calls_failed: 0,
245            structure_refreshes: 0,
246            last_structure_refresh: 0,
247            auth_attempts: 0,
248            auth_successes: 0,
249            auth_failures: 0,
250            cache_hits: 0,
251            cache_misses: 0,
252            schema_validations_total: 0,
253            schema_validations_failed: 0,
254            coalesced_requests: 0,
255            coalescing_time_saved_ms: 0.0,
256        }
257    }
258}
259
260impl Default for ErrorMetrics {
261    fn default() -> Self {
262        Self {
263            total_errors: 0,
264            client_errors: 0,
265            server_errors: 0,
266            network_errors: 0,
267            auth_errors: 0,
268            business_errors: 0,
269            errors_by_tool: HashMap::new(),
270            error_rate_5min: 0.0,
271            error_rate_1hour: 0.0,
272            error_rate_24hour: 0.0,
273            recent_errors: Vec::new(),
274            timeout_errors: 0,
275            connection_errors: 0,
276            validation_errors: 0,
277            device_control_errors: 0,
278        }
279    }
280}
281
282impl Default for MetricsCollector {
283    fn default() -> Self {
284        Self::new()
285    }
286}
287
288impl MetricsCollector {
289    /// Create a new metrics collector
290    pub fn new() -> Self {
291        Self {
292            request_metrics: Arc::new(RwLock::new(RequestMetrics::default())),
293            health_metrics: Arc::new(RwLock::new(HealthMetrics::default())),
294            business_metrics: Arc::new(RwLock::new(BusinessMetrics::default())),
295            error_metrics: Arc::new(RwLock::new(ErrorMetrics::default())),
296            start_time: Instant::now(),
297            persistence: None,
298        }
299    }
300
301    /// Create a new metrics collector with persistence
302    pub fn with_persistence(persistence_config: PersistenceConfig) -> Result<Self, std::io::Error> {
303        let persistence = Arc::new(MetricsPersistence::new(persistence_config)?);
304        Ok(Self {
305            request_metrics: Arc::new(RwLock::new(RequestMetrics::default())),
306            health_metrics: Arc::new(RwLock::new(HealthMetrics::default())),
307            business_metrics: Arc::new(RwLock::new(BusinessMetrics::default())),
308            error_metrics: Arc::new(RwLock::new(ErrorMetrics::default())),
309            start_time: Instant::now(),
310            persistence: Some(persistence),
311        })
312    }
313
314    /// Enable persistence for this metrics collector
315    pub async fn enable_persistence(
316        &self,
317        _persistence_config: PersistenceConfig,
318    ) -> Result<(), std::io::Error> {
319        // Note: For simplicity, we can't change persistence after creation
320        // This method is here for API compatibility
321        Ok(())
322    }
323
324    /// Save current metrics snapshot to persistence
325    pub async fn save_snapshot(&self) -> Result<(), std::io::Error> {
326        if let Some(persistence) = &self.persistence {
327            let snapshot = self.get_metrics_snapshot().await;
328            persistence.save_snapshot(snapshot).await?;
329        }
330        Ok(())
331    }
332
333    /// Record a request start
334    pub async fn record_request_start(&self, tool_name: &str) {
335        let mut metrics = self.request_metrics.write().await;
336        metrics.total_requests += 1;
337        metrics.active_requests += 1;
338        *metrics
339            .requests_by_tool
340            .entry(tool_name.to_string())
341            .or_insert(0) += 1;
342        metrics.last_updated = current_timestamp();
343    }
344
345    /// Record a request completion
346    pub async fn record_request_end(&self, tool_name: &str, duration: Duration, success: bool) {
347        #[allow(clippy::cast_precision_loss)]
348        let duration_ms = duration.as_millis() as f64;
349        let mut metrics = self.request_metrics.write().await;
350
351        metrics.active_requests = metrics.active_requests.saturating_sub(1);
352
353        if success {
354            metrics.successful_requests += 1;
355        } else {
356            metrics.failed_requests += 1;
357        }
358
359        // Update response times
360        metrics
361            .response_times_by_tool
362            .entry(tool_name.to_string())
363            .or_insert_with(Vec::new)
364            .push(duration_ms);
365
366        // Keep only last 1000 response times per tool for memory efficiency
367        if let Some(times) = metrics.response_times_by_tool.get_mut(tool_name) {
368            if times.len() > 1000 {
369                times.drain(..times.len() - 1000);
370            }
371        }
372
373        // Recalculate averages and percentiles
374        Self::update_response_time_statistics(&mut metrics);
375        metrics.last_updated = current_timestamp();
376    }
377
378    /// Record a rate limit hit
379    pub async fn record_rate_limit_hit(&self) {
380        let mut metrics = self.request_metrics.write().await;
381        metrics.rate_limit_hits += 1;
382        metrics.last_updated = current_timestamp();
383    }
384
385    /// Record an error
386    pub async fn record_error<E: crate::ErrorClassification>(
387        &self,
388        tool_name: &str,
389        request_id: &str,
390        error: &E,
391        duration: Duration,
392    ) {
393        let mut metrics = self.error_metrics.write().await;
394        metrics.total_errors += 1;
395
396        // Classify error using the trait
397        if error.is_auth_error() {
398            metrics.auth_errors += 1;
399            let mut business = self.business_metrics.write().await;
400            business.auth_failures += 1;
401        } else if error.is_connection_error() {
402            metrics.network_errors += 1;
403            metrics.connection_errors += 1;
404        } else if error.is_timeout() {
405            metrics.network_errors += 1;
406            metrics.timeout_errors += 1;
407        } else if error.is_retryable() {
408            metrics.server_errors += 1;
409        } else {
410            metrics.client_errors += 1;
411        }
412
413        // Track by tool
414        *metrics
415            .errors_by_tool
416            .entry(tool_name.to_string())
417            .or_insert(0) += 1;
418
419        // Add to recent errors (keep last 10)
420        let error_record = ErrorRecord {
421            timestamp: current_timestamp(),
422            error_type: error.error_type().to_string(),
423            error_message: error.to_string(),
424            tool_name: tool_name.to_string(),
425            request_id: request_id.to_string(),
426            duration_ms: duration.as_millis().try_into().unwrap_or(u64::MAX),
427        };
428
429        metrics.recent_errors.push(error_record);
430        if metrics.recent_errors.len() > 100 {
431            metrics.recent_errors.remove(0);
432        }
433    }
434
435    /// Record a device operation
436    pub async fn record_device_operation(
437        &self,
438        device_type: Option<&str>,
439        room_name: Option<&str>,
440        success: bool,
441    ) {
442        let mut metrics = self.business_metrics.write().await;
443        metrics.device_operations_total += 1;
444
445        if success {
446            metrics.device_operations_success += 1;
447        } else {
448            metrics.device_operations_failed += 1;
449        }
450
451        if let Some(dev_type) = device_type {
452            *metrics
453                .operations_by_device_type
454                .entry(dev_type.to_string())
455                .or_insert(0) += 1;
456        }
457
458        if let Some(room) = room_name {
459            *metrics
460                .operations_by_room
461                .entry(room.to_string())
462                .or_insert(0) += 1;
463        }
464    }
465
466    /// Record a Loxone API call
467    pub async fn record_loxone_api_call(&self, success: bool) {
468        let mut metrics = self.business_metrics.write().await;
469        metrics.loxone_api_calls_total += 1;
470
471        if success {
472            metrics.loxone_api_calls_success += 1;
473        } else {
474            metrics.loxone_api_calls_failed += 1;
475        }
476    }
477
478    /// Record schema validation
479    pub async fn record_schema_validation(&self, success: bool) {
480        let mut metrics = self.business_metrics.write().await;
481        metrics.schema_validations_total += 1;
482
483        if !success {
484            metrics.schema_validations_failed += 1;
485        }
486    }
487
488    /// Update health metrics
489    pub async fn update_health_metrics(
490        &self,
491        cpu_usage: Option<f64>,
492        memory_usage_mb: Option<f64>,
493        loxone_latency_ms: Option<f64>,
494        health_check_success: bool,
495    ) {
496        let mut metrics = self.health_metrics.write().await;
497        metrics.cpu_usage_percent = cpu_usage;
498        metrics.memory_usage_mb = memory_usage_mb;
499        metrics.loxone_latency_ms = loxone_latency_ms;
500        metrics.uptime_seconds = self.start_time.elapsed().as_secs();
501        metrics.last_health_check_success = health_check_success;
502        metrics.last_health_check_time = current_timestamp();
503    }
504
505    /// Get comprehensive metrics snapshot
506    pub async fn get_metrics_snapshot(&self) -> MetricsSnapshot {
507        let request_metrics = self.request_metrics.read().await.clone();
508        let health_metrics = self.health_metrics.read().await.clone();
509        let business_metrics = self.business_metrics.read().await.clone();
510        let error_metrics = self.error_metrics.read().await.clone();
511
512        MetricsSnapshot {
513            request_metrics,
514            health_metrics,
515            business_metrics,
516            error_metrics,
517            snapshot_timestamp: current_timestamp(),
518        }
519    }
520
521    /// Update response time statistics
522    fn update_response_time_statistics(metrics: &mut RequestMetrics) {
523        let mut all_times = Vec::new();
524        for times in metrics.response_times_by_tool.values() {
525            all_times.extend(times);
526        }
527
528        if !all_times.is_empty() {
529            all_times
530                .sort_by(|a: &f64, b: &f64| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
531
532            // Calculate average
533            #[allow(clippy::cast_precision_loss)]
534            {
535                metrics.avg_response_time_ms =
536                    all_times.iter().sum::<f64>() / all_times.len() as f64;
537            }
538
539            // Calculate percentiles
540            if all_times.len() >= 2 {
541                #[allow(
542                    clippy::cast_precision_loss,
543                    clippy::cast_possible_truncation,
544                    clippy::cast_sign_loss
545                )]
546                let p95_idx = (all_times.len() as f64 * 0.95) as usize;
547                #[allow(
548                    clippy::cast_precision_loss,
549                    clippy::cast_possible_truncation,
550                    clippy::cast_sign_loss
551                )]
552                let p99_idx = (all_times.len() as f64 * 0.99) as usize;
553                metrics.p95_response_time_ms = all_times[p95_idx.min(all_times.len() - 1)];
554                metrics.p99_response_time_ms = all_times[p99_idx.min(all_times.len() - 1)];
555            } else if !all_times.is_empty() {
556                // For very small sample sizes, use the max value for percentiles
557                metrics.p95_response_time_ms = *all_times.last().unwrap();
558                metrics.p99_response_time_ms = *all_times.last().unwrap();
559            }
560        }
561    }
562}
563
/// Complete point-in-time copy of every metric group
///
/// Field order is kept stable because this type is serialized (and may be persisted).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricsSnapshot {
    pub request_metrics: RequestMetrics,
    pub health_metrics: HealthMetrics,
    pub business_metrics: BusinessMetrics,
    pub error_metrics: ErrorMetrics,
    /// When the snapshot was taken, in seconds since the Unix epoch
    pub snapshot_timestamp: u64,
}
573
574impl MetricsSnapshot {
575    /// Calculate error rate
576    pub fn error_rate(&self) -> f64 {
577        if self.request_metrics.total_requests == 0 {
578            0.0
579        } else {
580            #[allow(clippy::cast_precision_loss)]
581            {
582                self.request_metrics.failed_requests as f64
583                    / self.request_metrics.total_requests as f64
584            }
585        }
586    }
587
588    /// Calculate success rate
589    pub fn success_rate(&self) -> f64 {
590        1.0 - self.error_rate()
591    }
592
593    /// Get availability percentage
594    pub fn availability_percentage(&self) -> f64 {
595        if self.health_metrics.last_health_check_success {
596            99.9 // Assume high availability if last check was successful
597        } else {
598            95.0 // Degraded if health check failed
599        }
600    }
601}
602
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, `0` is returned instead of failing.
pub fn current_timestamp() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
610
611/// Global metrics collector instance
612static METRICS: once_cell::sync::Lazy<MetricsCollector> =
613    once_cell::sync::Lazy::new(MetricsCollector::new);
614
615/// Get the global metrics collector
616pub fn get_metrics() -> &'static MetricsCollector {
617    &METRICS
618}
619
// Additional tests for this module live in a sibling file, `metrics_tests.rs`.
#[cfg(test)]
#[path = "metrics_tests.rs"]
mod metrics_tests;
623
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Error double that classifies itself purely as an authentication failure.
    ///
    /// Defined at module scope so no items follow statements inside a test body.
    #[derive(Debug)]
    struct MockAuthError;

    impl std::fmt::Display for MockAuthError {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "Test auth error")
        }
    }

    impl std::error::Error for MockAuthError {}

    impl crate::ErrorClassification for MockAuthError {
        fn error_type(&self) -> &'static str {
            "auth_error"
        }
        fn is_retryable(&self) -> bool {
            false
        }
        fn is_timeout(&self) -> bool {
            false
        }
        fn is_auth_error(&self) -> bool {
            true
        }
        fn is_connection_error(&self) -> bool {
            false
        }
    }

    #[tokio::test]
    async fn test_metrics_collection() {
        let collector = MetricsCollector::new();

        // The duration is passed explicitly, so no real sleep is needed here.
        collector.record_request_start("test_tool").await;
        collector
            .record_request_end("test_tool", Duration::from_millis(10), true)
            .await;

        let snapshot = collector.get_metrics_snapshot().await;
        assert_eq!(snapshot.request_metrics.total_requests, 1);
        assert_eq!(snapshot.request_metrics.successful_requests, 1);
        assert_eq!(snapshot.request_metrics.active_requests, 0);
        assert!(snapshot.request_metrics.avg_response_time_ms > 0.0);
    }

    #[tokio::test]
    async fn test_error_recording() {
        let collector = MetricsCollector::new();

        collector
            .record_error(
                "test_tool",
                "req-123",
                &MockAuthError,
                Duration::from_millis(100),
            )
            .await;

        let snapshot = collector.get_metrics_snapshot().await;
        assert_eq!(snapshot.error_metrics.total_errors, 1);
        assert_eq!(snapshot.error_metrics.auth_errors, 1);
        assert_eq!(snapshot.error_metrics.errors_by_tool.get("test_tool"), Some(&1));
        assert_eq!(snapshot.business_metrics.auth_failures, 1);
        assert_eq!(snapshot.error_metrics.recent_errors.len(), 1);
        assert_eq!(snapshot.error_metrics.recent_errors[0].request_id, "req-123");
        assert_eq!(snapshot.error_metrics.recent_errors[0].duration_ms, 100);
    }
}
689        assert_eq!(snapshot.error_metrics.recent_errors.len(), 1);
690    }
691}