codeprism_core/observability.rs

//! Observability and monitoring for CodePrism
//!
//! This module provides comprehensive monitoring, metrics collection, health checks,
//! and structured logging for production deployments.
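//!
//! A minimal wiring sketch (marked `ignore` so it is not compiled as a doctest;
//! the `codeprism_core::observability` path and the `parse_file` operation name
//! are illustrative assumptions):
//!
//! ```ignore
//! use codeprism_core::observability::{HealthMonitor, MetricsCollector};
//! use std::time::Duration;
//!
//! // One collector shared by the monitors (clones share state via `Arc`).
//! let metrics = MetricsCollector::new();
//! let health = HealthMonitor::new(metrics.clone());
//!
//! metrics.record_success("parse_file", Duration::from_millis(42));
//! let snapshot = metrics.get_metrics_snapshot();
//! let report = health.health_check();
//! ```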

use crate::error::{Error, ErrorSeverity, Result};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tracing::{debug, error, info, warn};

/// Metrics collector for error rates and performance monitoring
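///
/// Cloning a `MetricsCollector` is cheap: clones share the same underlying
/// metrics through an `Arc<Mutex<..>>`, so a clone can be handed to each
/// subsystem that needs to report.
///
/// A minimal usage sketch (marked `ignore`; the `index_repo` operation name is
/// illustrative):
///
/// ```ignore
/// use std::time::Duration;
///
/// let collector = MetricsCollector::new();
/// collector.record_success("index_repo", Duration::from_millis(120));
/// assert_eq!(collector.get_error_rate("index_repo"), 0.0);
/// ```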
#[derive(Debug, Clone)]
pub struct MetricsCollector {
    metrics: Arc<Mutex<Metrics>>,
}

#[derive(Debug, Clone)]
struct Metrics {
    /// Error counts by type
    error_counts: HashMap<String, u64>,
    /// Error counts by severity
    error_severity_counts: HashMap<ErrorSeverity, u64>,
    /// Operation latencies
    operation_latencies: HashMap<String, Vec<Duration>>,
    /// Success/failure rates
    operation_success_rates: HashMap<String, (u64, u64)>, // (success, total)
    /// Resource usage tracking
    resource_usage: HashMap<String, u64>,
    /// Start time for uptime calculation
    start_time: Instant,
}

impl Default for Metrics {
    fn default() -> Self {
        Self {
            error_counts: HashMap::new(),
            error_severity_counts: HashMap::new(),
            operation_latencies: HashMap::new(),
            operation_success_rates: HashMap::new(),
            resource_usage: HashMap::new(),
            start_time: Instant::now(),
        }
    }
}

impl Default for MetricsCollector {
    fn default() -> Self {
        Self::new()
    }
}

impl MetricsCollector {
    /// Create a new metrics collector
    pub fn new() -> Self {
        Self {
            metrics: Arc::new(Mutex::new(Metrics::default())),
        }
    }

    /// Record an error occurrence
    pub fn record_error(&self, error: &Error, operation: Option<&str>) {
        let mut metrics = self.metrics.lock().unwrap();

        // Count error by type (the key is the Debug form of the error's
        // discriminant rather than a human-readable variant name)
        let error_type = format!("{:?}", std::mem::discriminant(error));
        *metrics.error_counts.entry(error_type.clone()).or_insert(0) += 1;

        // Count error by severity
        *metrics
            .error_severity_counts
            .entry(error.severity())
            .or_insert(0) += 1;

        // Update operation failure rate
        if let Some(op) = operation {
            let (_success, total) = metrics
                .operation_success_rates
                .entry(op.to_string())
                .or_insert((0, 0));
            *total += 1;
        }

        // Log structured error
        error!(
            error = %error,
            error_type = error_type,
            severity = ?error.severity(),
            operation = operation,
            error_code = error.error_code(),
            "Error recorded"
        );
    }

    /// Record a successful operation
    pub fn record_success(&self, operation: &str, duration: Duration) {
        let mut metrics = self.metrics.lock().unwrap();

        // Record latency
        metrics
            .operation_latencies
            .entry(operation.to_string())
            .or_default()
            .push(duration);

        // Update success rate
        let (success, total) = metrics
            .operation_success_rates
            .entry(operation.to_string())
            .or_insert((0, 0));
        *success += 1;
        *total += 1;

        debug!(
            operation = operation,
            duration_ms = duration.as_millis(),
            "Operation completed successfully"
        );
    }

    /// Record resource usage
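    ///
    /// The resource-usage health check below applies thresholds to the keys
    /// `memory_mb`, `cpu_percent`, and `disk_usage_percent`; other keys are
    /// recorded but not thresholded. A minimal sketch (marked `ignore`):
    ///
    /// ```ignore
    /// let collector = MetricsCollector::new();
    /// collector.record_resource_usage("memory_mb", 512);
    /// ```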
    pub fn record_resource_usage(&self, resource: &str, usage: u64) {
        let mut metrics = self.metrics.lock().unwrap();
        metrics.resource_usage.insert(resource.to_string(), usage);
    }

    /// Get error rate for an operation
    pub fn get_error_rate(&self, operation: &str) -> f64 {
        let metrics = self.metrics.lock().unwrap();
        if let Some((success, total)) = metrics.operation_success_rates.get(operation) {
            if *total == 0 {
                0.0
            } else {
                1.0 - (*success as f64 / *total as f64)
            }
        } else {
            0.0
        }
    }

    /// Get average latency for an operation
    pub fn get_average_latency(&self, operation: &str) -> Option<Duration> {
        let metrics = self.metrics.lock().unwrap();
        if let Some(latencies) = metrics.operation_latencies.get(operation) {
            if latencies.is_empty() {
                None
            } else {
                let total_ms: u64 = latencies.iter().map(|d| d.as_millis() as u64).sum();
                Some(Duration::from_millis(total_ms / latencies.len() as u64))
            }
        } else {
            None
        }
    }

    /// Get system uptime
    pub fn uptime(&self) -> Duration {
        let metrics = self.metrics.lock().unwrap();
        Instant::now().duration_since(metrics.start_time)
    }

    /// Get all metrics as a JSON-serializable structure
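    ///
    /// A serialization sketch (marked `ignore`; it assumes `serde_json` is a
    /// dependency, as it already is for the snapshot types in this module):
    ///
    /// ```ignore
    /// let collector = MetricsCollector::new();
    /// let snapshot = collector.get_metrics_snapshot();
    /// println!("{}", serde_json::to_string_pretty(&snapshot).unwrap());
    /// ```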
    pub fn get_metrics_snapshot(&self) -> MetricsSnapshot {
        let metrics = self.metrics.lock().unwrap();

        let mut operation_metrics = HashMap::new();
        for (operation, (success, total)) in &metrics.operation_success_rates {
            let error_rate = if *total == 0 {
                0.0
            } else {
                1.0 - (*success as f64 / *total as f64)
            };
            let avg_latency = metrics
                .operation_latencies
                .get(operation)
                .and_then(|latencies| {
                    if latencies.is_empty() {
                        None
                    } else {
                        let total_ms: u64 = latencies.iter().map(|d| d.as_millis() as u64).sum();
                        Some(Duration::from_millis(total_ms / latencies.len() as u64))
                    }
                });

            operation_metrics.insert(
                operation.clone(),
                OperationMetrics {
                    success_count: *success,
                    total_count: *total,
                    error_rate,
                    average_latency_ms: avg_latency.map(|d| d.as_millis() as u64),
                },
            );
        }

        MetricsSnapshot {
            uptime_seconds: Instant::now().duration_since(metrics.start_time).as_secs(),
            error_counts: metrics.error_counts.clone(),
            error_severity_distribution: metrics
                .error_severity_counts
                .iter()
                .map(|(k, v)| (format!("{:?}", k), *v))
                .collect(),
            operation_metrics,
            resource_usage: metrics.resource_usage.clone(),
        }
    }
}

/// Snapshot of metrics at a point in time
#[derive(Debug, Clone, serde::Serialize)]
pub struct MetricsSnapshot {
    /// System uptime in seconds
    pub uptime_seconds: u64,
    /// Error counts by error type
    pub error_counts: HashMap<String, u64>,
    /// Distribution of errors by severity level
    pub error_severity_distribution: HashMap<String, u64>,
    /// Metrics for each tracked operation
    pub operation_metrics: HashMap<String, OperationMetrics>,
    /// Resource usage statistics
    pub resource_usage: HashMap<String, u64>,
}

/// Metrics for a specific operation
#[derive(Debug, Clone, serde::Serialize)]
pub struct OperationMetrics {
    /// Number of successful executions
    pub success_count: u64,
    /// Total number of executions
    pub total_count: u64,
    /// Error rate as a decimal (0.0 to 1.0)
    pub error_rate: f64,
    /// Average latency in milliseconds
    pub average_latency_ms: Option<u64>,
}

/// Health check status
#[derive(Debug, Clone, PartialEq, serde::Serialize)]
pub enum HealthStatus {
    /// All systems functioning normally
    Healthy,
    /// Some issues detected but system is still operational
    Degraded,
    /// Critical issues preventing normal operation
    Unhealthy,
}

/// Health check result
#[derive(Debug, Clone, serde::Serialize)]
pub struct HealthCheckResult {
    /// Overall system health status
    pub status: HealthStatus,
    /// Individual component health checks
    pub checks: HashMap<String, ComponentHealth>,
    /// Human-readable status message
    pub overall_message: String,
    /// When this health check was performed
    pub timestamp: chrono::DateTime<chrono::Utc>,
}

/// Health status of a specific system component
#[derive(Debug, Clone, serde::Serialize)]
pub struct ComponentHealth {
    /// Health status of this component
    pub status: HealthStatus,
    /// Descriptive message about the component's health
    pub message: String,
    /// Additional metrics relevant to this component
    pub metrics: Option<HashMap<String, serde_json::Value>>,
}

/// Health monitor for system components
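///
/// Health is derived from three component checks: operation error rates,
/// circuit breaker states, and reported resource usage. A usage sketch
/// (marked `ignore`):
///
/// ```ignore
/// let monitor = HealthMonitor::new(MetricsCollector::new());
/// let result = monitor.health_check();
/// assert_eq!(result.status, HealthStatus::Healthy);
/// ```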
pub struct HealthMonitor {
    metrics_collector: MetricsCollector,
    circuit_states: Arc<Mutex<HashMap<String, crate::resilience::CircuitState>>>,
}

impl HealthMonitor {
    /// Create a new health monitor
    pub fn new(metrics_collector: MetricsCollector) -> Self {
        Self {
            metrics_collector,
            circuit_states: Arc::new(Mutex::new(HashMap::new())),
        }
    }

    /// Update circuit breaker state for a component
    pub fn update_circuit_state(&self, component: &str, state: crate::resilience::CircuitState) {
        let mut states = self.circuit_states.lock().unwrap();
        states.insert(component.to_string(), state);
    }

    /// Perform comprehensive health check
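    ///
    /// The overall status is the worst status reported by any individual check:
    /// a single `Unhealthy` component makes the whole result `Unhealthy`, and any
    /// `Degraded` component (with none unhealthy) yields `Degraded`.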
    pub fn health_check(&self) -> HealthCheckResult {
        let mut checks = HashMap::new();
        let mut overall_status = HealthStatus::Healthy;

        // Check error rates
        let error_rate_health = self.check_error_rates();
        if error_rate_health.status != HealthStatus::Healthy {
            overall_status = match overall_status {
                HealthStatus::Healthy => error_rate_health.status.clone(),
                HealthStatus::Degraded => {
                    if error_rate_health.status == HealthStatus::Unhealthy {
                        HealthStatus::Unhealthy
                    } else {
                        HealthStatus::Degraded
                    }
                }
                HealthStatus::Unhealthy => HealthStatus::Unhealthy,
            };
        }
        checks.insert("error_rates".to_string(), error_rate_health);

        // Check circuit breakers
        let circuit_health = self.check_circuit_breakers();
        if circuit_health.status != HealthStatus::Healthy {
            overall_status = match overall_status {
                HealthStatus::Healthy => circuit_health.status.clone(),
                HealthStatus::Degraded => {
                    if circuit_health.status == HealthStatus::Unhealthy {
                        HealthStatus::Unhealthy
                    } else {
                        HealthStatus::Degraded
                    }
                }
                HealthStatus::Unhealthy => HealthStatus::Unhealthy,
            };
        }
        checks.insert("circuit_breakers".to_string(), circuit_health);

        // Check resource usage
        let resource_health = self.check_resource_usage();
        if resource_health.status != HealthStatus::Healthy {
            overall_status = match overall_status {
                HealthStatus::Healthy => resource_health.status.clone(),
                HealthStatus::Degraded => {
                    if resource_health.status == HealthStatus::Unhealthy {
                        HealthStatus::Unhealthy
                    } else {
                        HealthStatus::Degraded
                    }
                }
                HealthStatus::Unhealthy => HealthStatus::Unhealthy,
            };
        }
        checks.insert("resource_usage".to_string(), resource_health);

        let overall_message = match overall_status {
            HealthStatus::Healthy => "All systems operational".to_string(),
            HealthStatus::Degraded => "Some systems experiencing issues".to_string(),
            HealthStatus::Unhealthy => "Critical systems failing".to_string(),
        };

        HealthCheckResult {
            status: overall_status,
            checks,
            overall_message,
            timestamp: chrono::Utc::now(),
        }
    }

    fn check_error_rates(&self) -> ComponentHealth {
        let metrics = self.metrics_collector.get_metrics_snapshot();

        let mut high_error_operations = Vec::new();
        let mut warning_operations = Vec::new();

        for (operation, metrics) in &metrics.operation_metrics {
            if metrics.error_rate > 0.1 {
                // 10% error rate threshold
                high_error_operations.push(operation.clone());
            } else if metrics.error_rate > 0.05 {
                // 5% warning threshold
                warning_operations.push(operation.clone());
            }
        }

        let status = if !high_error_operations.is_empty() {
            HealthStatus::Unhealthy
        } else if !warning_operations.is_empty() {
            HealthStatus::Degraded
        } else {
            HealthStatus::Healthy
        };

        let message = match status {
            HealthStatus::Healthy => "Error rates within acceptable limits".to_string(),
            HealthStatus::Degraded => format!(
                "Warning: High error rates in operations: {:?}",
                warning_operations
            ),
            HealthStatus::Unhealthy => format!(
                "Critical: Very high error rates in operations: {:?}",
                high_error_operations
            ),
        };

        ComponentHealth {
            status,
            message,
            // Round-trip through serde_json to expose the typed operation
            // metrics as a generic `Value` map on the component health.
            metrics: Some(
                serde_json::to_value(&metrics.operation_metrics)
                    .and_then(serde_json::from_value)
                    .unwrap_or_default(),
            ),
        }
    }

    fn check_circuit_breakers(&self) -> ComponentHealth {
        let states = self.circuit_states.lock().unwrap();

        let mut open_circuits = Vec::new();
        let mut half_open_circuits = Vec::new();

        for (component, state) in states.iter() {
            match state {
                crate::resilience::CircuitState::Open => open_circuits.push(component.clone()),
                crate::resilience::CircuitState::HalfOpen => {
                    half_open_circuits.push(component.clone())
                }
                crate::resilience::CircuitState::Closed => {}
            }
        }

        let status = if !open_circuits.is_empty() {
            HealthStatus::Unhealthy
        } else if !half_open_circuits.is_empty() {
            HealthStatus::Degraded
        } else {
            HealthStatus::Healthy
        };

        let message = match status {
            HealthStatus::Healthy => "All circuit breakers closed".to_string(),
            HealthStatus::Degraded => {
                format!("Circuit breakers in recovery: {:?}", half_open_circuits)
            }
            HealthStatus::Unhealthy => format!("Open circuit breakers: {:?}", open_circuits),
        };

        let circuit_metrics = states
            .iter()
            .map(|(k, v)| (k.clone(), serde_json::Value::String(format!("{:?}", v))))
            .collect();

        ComponentHealth {
            status,
            message,
            metrics: Some(circuit_metrics),
        }
    }

    fn check_resource_usage(&self) -> ComponentHealth {
        let metrics = self.metrics_collector.get_metrics_snapshot();

        // Check if resource usage is being tracked
        if metrics.resource_usage.is_empty() {
            return ComponentHealth {
                status: HealthStatus::Healthy,
                message: "Resource usage monitoring not configured".to_string(),
                metrics: None,
            };
        }

        // Simple resource usage check (would be more sophisticated in production)
        let mut high_usage_resources = Vec::new();

        for (resource, usage) in &metrics.resource_usage {
            // Example thresholds (would be configurable)
            let threshold = match resource.as_str() {
                "memory_mb" => 1024, // 1GB threshold
                "cpu_percent" => 80,
                "disk_usage_percent" => 85,
                _ => continue,
            };

            if *usage > threshold {
                high_usage_resources.push(format!("{}: {}", resource, usage));
            }
        }

        let status = if high_usage_resources.len() > 1 {
            HealthStatus::Unhealthy
        } else if !high_usage_resources.is_empty() {
            HealthStatus::Degraded
        } else {
            HealthStatus::Healthy
        };

        let message = match status {
            HealthStatus::Healthy => "Resource usage normal".to_string(),
            HealthStatus::Degraded => format!("High resource usage: {:?}", high_usage_resources),
            HealthStatus::Unhealthy => {
                format!("Critical resource usage: {:?}", high_usage_resources)
            }
        };

        ComponentHealth {
            status,
            message,
            metrics: Some(
                serde_json::to_value(&metrics.resource_usage)
                    .and_then(serde_json::from_value)
                    .unwrap_or_default(),
            ),
        }
    }
}

/// Performance monitor for tracking operation latencies
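///
/// A usage sketch for [`PerformanceMonitor::time_operation`] (marked `ignore`;
/// it assumes an async context such as a Tokio runtime, mirroring the tests
/// below, and `load_config` is an illustrative operation name):
///
/// ```ignore
/// let monitor = PerformanceMonitor::new(MetricsCollector::new());
/// let value = monitor
///     .time_operation("load_config", || async { Ok(42) })
///     .await?;
/// ```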
pub struct PerformanceMonitor {
    metrics_collector: MetricsCollector,
}

impl PerformanceMonitor {
    /// Create a new performance monitor
    pub fn new(metrics_collector: MetricsCollector) -> Self {
        Self { metrics_collector }
    }

    /// Time an operation and record its performance
    pub async fn time_operation<F, Fut, T>(&self, operation_name: &str, operation: F) -> Result<T>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = Result<T>>,
    {
        let start = Instant::now();
        let result = operation().await;
        let duration = start.elapsed();

        match &result {
            Ok(_) => {
                self.metrics_collector
                    .record_success(operation_name, duration);
                info!(
                    operation = operation_name,
                    duration_ms = duration.as_millis(),
                    "Operation completed successfully"
                );
            }
            Err(error) => {
                self.metrics_collector
                    .record_error(error, Some(operation_name));
                warn!(
                    operation = operation_name,
                    duration_ms = duration.as_millis(),
                    error = %error,
                    "Operation failed"
                );
            }
        }

        result
    }

    /// Get performance metrics for an operation
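    ///
    /// Returns `None` when no latency samples have been recorded for the
    /// operation yet.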
    pub fn get_operation_performance(&self, operation: &str) -> Option<OperationPerformance> {
        let error_rate = self.metrics_collector.get_error_rate(operation);
        let avg_latency = self.metrics_collector.get_average_latency(operation)?;

        Some(OperationPerformance {
            operation: operation.to_string(),
            error_rate,
            average_latency: avg_latency,
        })
    }
}

/// Performance metrics for a specific operation
#[derive(Debug, Clone)]
pub struct OperationPerformance {
    /// Name of the operation
    pub operation: String,
    /// Error rate as a decimal (0.0 to 1.0)
    pub error_rate: f64,
    /// Average execution time
    pub average_latency: Duration,
}

#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::{sleep, Duration};

    #[test]
    fn test_metrics_collector() {
        let collector = MetricsCollector::new();

        // Record some metrics
        collector.record_success("parse_file", Duration::from_millis(100));
        collector.record_success("parse_file", Duration::from_millis(150));

        let error = Error::storage("test error");
        collector.record_error(&error, Some("parse_file"));

        // Check metrics
        let error_rate = collector.get_error_rate("parse_file");
        assert!((error_rate - 0.333).abs() < 0.01); // Approximately 1/3

        let avg_latency = collector.get_average_latency("parse_file").unwrap();
        assert_eq!(avg_latency, Duration::from_millis(125));
    }

    #[test]
    fn test_health_monitor() {
        let metrics = MetricsCollector::new();
        let monitor = HealthMonitor::new(metrics);

        // Initial health check should be healthy
        let health = monitor.health_check();
        assert_eq!(health.status, HealthStatus::Healthy);
        assert!(health.checks.contains_key("error_rates"));
        assert!(health.checks.contains_key("circuit_breakers"));
        assert!(health.checks.contains_key("resource_usage"));
    }

    #[tokio::test]
    async fn test_performance_monitor() {
        let metrics = MetricsCollector::new();
        let monitor = PerformanceMonitor::new(metrics);

        // Time a successful operation
        let result = monitor
            .time_operation("test_op", || async {
                sleep(Duration::from_millis(10)).await;
                Ok("success")
            })
            .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), "success");

        // Check performance metrics
        let perf = monitor.get_operation_performance("test_op");
        assert!(perf.is_some());
        let perf = perf.unwrap();
        assert_eq!(perf.error_rate, 0.0);
        assert!(perf.average_latency >= Duration::from_millis(10));
    }

    #[test]
    fn test_metrics_snapshot() {
        let collector = MetricsCollector::new();

        collector.record_success("op1", Duration::from_millis(100));
        let error = Error::validation("test_field", "test error");
        collector.record_error(&error, Some("op1"));
        collector.record_resource_usage("memory_mb", 512);

        let snapshot = collector.get_metrics_snapshot();

        // Uptime for a freshly created collector should be a sane, small value
        assert!(snapshot.uptime_seconds < 365 * 24 * 3600); // Less than a year
        assert!(snapshot.operation_metrics.contains_key("op1"));
        assert_eq!(snapshot.resource_usage.get("memory_mb"), Some(&512));
        assert!(!snapshot.error_counts.is_empty());
    }
}