// codeprism_core/observability.rs

//! Observability and monitoring for CodePrism
//!
//! This module provides comprehensive monitoring, metrics collection, health checks,
//! and structured logging for production deployments.
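//!
//! A minimal usage sketch (assuming this module is exposed as
//! `codeprism_core::observability`; adjust the paths to the actual crate layout):
//!
//! ```ignore
//! use codeprism_core::observability::{HealthMonitor, MetricsCollector, PerformanceMonitor};
//!
//! let collector = MetricsCollector::new();
//! let health = HealthMonitor::new(collector.clone());
//! let _perf = PerformanceMonitor::new(collector.clone());
//!
//! // Record data as work happens, then expose snapshots and health reports.
//! collector.record_resource_usage("memory_mb", 512);
//! let snapshot = collector.get_metrics_snapshot();
//! let report = health.health_check();
//! println!("{} ({:?})", report.overall_message, report.status);
//! ```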

use crate::error::{Error, ErrorSeverity, Result};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tracing::{debug, error, info, warn};

/// Metrics collector for error rates and performance monitoring
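///
/// Illustrative call pattern (a sketch, not a doctest; `Error::storage` is the
/// same constructor used in this file's tests):
///
/// ```ignore
/// let collector = MetricsCollector::new();
/// collector.record_success("parse_file", std::time::Duration::from_millis(42));
/// collector.record_error(&Error::storage("disk full"), Some("parse_file"));
/// assert!(collector.get_error_rate("parse_file") > 0.0);
/// ```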
#[derive(Debug, Clone)]
pub struct MetricsCollector {
    metrics: Arc<Mutex<Metrics>>,
}

#[derive(Debug, Clone)]
struct Metrics {
    /// Error counts by type
    error_counts: HashMap<String, u64>,
    /// Error counts by severity
    error_severity_counts: HashMap<ErrorSeverity, u64>,
    /// Operation latencies
    operation_latencies: HashMap<String, Vec<Duration>>,
    /// Success/failure rates
    operation_success_rates: HashMap<String, (u64, u64)>, // (success, total)
    /// Resource usage tracking
    resource_usage: HashMap<String, u64>,
    /// Start time for uptime calculation
    start_time: Instant,
}

impl Default for Metrics {
    fn default() -> Self {
        Self {
            error_counts: HashMap::new(),
            error_severity_counts: HashMap::new(),
            operation_latencies: HashMap::new(),
            operation_success_rates: HashMap::new(),
            resource_usage: HashMap::new(),
            start_time: Instant::now(),
        }
    }
}

impl Default for MetricsCollector {
    fn default() -> Self {
        Self::new()
    }
}

impl MetricsCollector {
    /// Create a new metrics collector
    pub fn new() -> Self {
        Self {
            metrics: Arc::new(Mutex::new(Metrics::default())),
        }
    }

    /// Record an error occurrence
    pub fn record_error(&self, error: &Error, operation: Option<&str>) {
        let mut metrics = self.metrics.lock().unwrap();

        // Count error by type
        let error_type = format!("{:?}", std::mem::discriminant(error));
        *metrics.error_counts.entry(error_type.clone()).or_insert(0) += 1;

        // Count error by severity
        *metrics
            .error_severity_counts
            .entry(error.severity())
            .or_insert(0) += 1;

        // Update operation failure rate
        if let Some(op) = operation {
            let (_success, total) = metrics
                .operation_success_rates
                .entry(op.to_string())
                .or_insert((0, 0));
            *total += 1;
        }

        // Log structured error
        error!(
            error = %error,
            error_type = error_type,
            severity = ?error.severity(),
            operation = operation,
            error_code = error.error_code(),
            "Error recorded"
        );
    }

    /// Record a successful operation
    pub fn record_success(&self, operation: &str, duration: Duration) {
        let mut metrics = self.metrics.lock().unwrap();

        // Record latency
        metrics
            .operation_latencies
            .entry(operation.to_string())
            .or_default()
            .push(duration);

        // Update success rate
        let (success, total) = metrics
            .operation_success_rates
            .entry(operation.to_string())
            .or_insert((0, 0));
        *success += 1;
        *total += 1;

        debug!(
            operation = operation,
            duration_ms = duration.as_millis(),
            "Operation completed successfully"
        );
    }

    /// Record resource usage
    pub fn record_resource_usage(&self, resource: &str, usage: u64) {
        let mut metrics = self.metrics.lock().unwrap();
        metrics.resource_usage.insert(resource.to_string(), usage);
    }

    /// Get error rate for an operation
    pub fn get_error_rate(&self, operation: &str) -> f64 {
        let metrics = self.metrics.lock().unwrap();
        if let Some((success, total)) = metrics.operation_success_rates.get(operation) {
            if *total == 0 {
                0.0
            } else {
                1.0 - (*success as f64 / *total as f64)
            }
        } else {
            0.0
        }
    }

    /// Get average latency for an operation
    pub fn get_average_latency(&self, operation: &str) -> Option<Duration> {
        let metrics = self.metrics.lock().unwrap();
        if let Some(latencies) = metrics.operation_latencies.get(operation) {
            if latencies.is_empty() {
                None
            } else {
                let total_ms: u64 = latencies.iter().map(|d| d.as_millis() as u64).sum();
                Some(Duration::from_millis(total_ms / latencies.len() as u64))
            }
        } else {
            None
        }
    }

    /// Get system uptime
    pub fn uptime(&self) -> Duration {
        let metrics = self.metrics.lock().unwrap();
        Instant::now().duration_since(metrics.start_time)
    }

    /// Get all metrics as a JSON-serializable structure
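    ///
    /// The snapshot is plain data and serializes directly, e.g. for a metrics
    /// endpoint (sketch; `serde_json` is already used by this module):
    ///
    /// ```ignore
    /// let snapshot = collector.get_metrics_snapshot();
    /// let body = serde_json::to_string_pretty(&snapshot)?;
    /// ```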
    pub fn get_metrics_snapshot(&self) -> MetricsSnapshot {
        let metrics = self.metrics.lock().unwrap();

        let mut operation_metrics = HashMap::new();
        for (operation, (success, total)) in &metrics.operation_success_rates {
            let error_rate = if *total == 0 {
                0.0
            } else {
                1.0 - (*success as f64 / *total as f64)
            };
            let avg_latency = metrics
                .operation_latencies
                .get(operation)
                .and_then(|latencies| {
                    if latencies.is_empty() {
                        None
                    } else {
                        let total_ms: u64 = latencies.iter().map(|d| d.as_millis() as u64).sum();
                        Some(Duration::from_millis(total_ms / latencies.len() as u64))
                    }
                });

            operation_metrics.insert(
                operation.clone(),
                OperationMetrics {
                    success_count: *success,
                    total_count: *total,
                    error_rate,
                    average_latency_ms: avg_latency.map(|d| d.as_millis() as u64),
                },
            );
        }

        MetricsSnapshot {
            uptime_seconds: Instant::now().duration_since(metrics.start_time).as_secs(),
            error_counts: metrics.error_counts.clone(),
            error_severity_distribution: metrics
                .error_severity_counts
                .iter()
                .map(|(k, v)| (format!("{k:?}"), *v))
                .collect(),
            operation_metrics,
            resource_usage: metrics.resource_usage.clone(),
        }
    }
}

/// Snapshot of metrics at a point in time
#[derive(Debug, Clone, serde::Serialize)]
pub struct MetricsSnapshot {
    /// System uptime in seconds
    pub uptime_seconds: u64,
    /// Error counts by error type
    pub error_counts: HashMap<String, u64>,
    /// Distribution of errors by severity level
    pub error_severity_distribution: HashMap<String, u64>,
    /// Metrics for each tracked operation
    pub operation_metrics: HashMap<String, OperationMetrics>,
    /// Resource usage statistics
    pub resource_usage: HashMap<String, u64>,
}

/// Metrics for a specific operation
#[derive(Debug, Clone, serde::Serialize)]
pub struct OperationMetrics {
    /// Number of successful executions
    pub success_count: u64,
    /// Total number of executions
    pub total_count: u64,
    /// Error rate as a decimal (0.0 to 1.0)
    pub error_rate: f64,
    /// Average latency in milliseconds
    pub average_latency_ms: Option<u64>,
}

/// Health check status
#[derive(Debug, Clone, PartialEq, serde::Serialize)]
pub enum HealthStatus {
    /// All systems functioning normally
    Healthy,
    /// Some issues detected but system is still operational
    Degraded,
    /// Critical issues preventing normal operation
    Unhealthy,
}

/// Health check result
#[derive(Debug, Clone, serde::Serialize)]
pub struct HealthCheckResult {
    /// Overall system health status
    pub status: HealthStatus,
    /// Individual component health checks
    pub checks: HashMap<String, ComponentHealth>,
    /// Human-readable status message
    pub overall_message: String,
    /// When this health check was performed
    pub timestamp: chrono::DateTime<chrono::Utc>,
}

/// Health status of a specific system component
#[derive(Debug, Clone, serde::Serialize)]
pub struct ComponentHealth {
    /// Health status of this component
    pub status: HealthStatus,
    /// Descriptive message about the component's health
    pub message: String,
    /// Additional metrics relevant to this component
    pub metrics: Option<HashMap<String, serde_json::Value>>,
}

/// Health monitor for system components
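///
/// Typical wiring (sketch; the component name "indexer" is illustrative):
///
/// ```ignore
/// let monitor = HealthMonitor::new(MetricsCollector::new());
/// monitor.update_circuit_state("indexer", crate::resilience::CircuitState::Open);
/// assert_eq!(monitor.health_check().status, HealthStatus::Unhealthy);
/// ```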
pub struct HealthMonitor {
    metrics_collector: MetricsCollector,
    circuit_states: Arc<Mutex<HashMap<String, crate::resilience::CircuitState>>>,
}

impl HealthMonitor {
    /// Create a new health monitor
    pub fn new(metrics_collector: MetricsCollector) -> Self {
        Self {
            metrics_collector,
            circuit_states: Arc::new(Mutex::new(HashMap::new())),
        }
    }

    /// Update circuit breaker state for a component
    pub fn update_circuit_state(&self, component: &str, state: crate::resilience::CircuitState) {
        let mut states = self.circuit_states.lock().unwrap();
        states.insert(component.to_string(), state);
    }

    /// Perform comprehensive health check
    pub fn health_check(&self) -> HealthCheckResult {
        let mut checks = HashMap::new();
        let mut overall_status = HealthStatus::Healthy;

        // Check error rates
        let error_rate_health = self.check_error_rates();
        if error_rate_health.status != HealthStatus::Healthy {
            overall_status = match overall_status {
                HealthStatus::Healthy => error_rate_health.status.clone(),
                HealthStatus::Degraded => {
                    if error_rate_health.status == HealthStatus::Unhealthy {
                        HealthStatus::Unhealthy
                    } else {
                        HealthStatus::Degraded
                    }
                }
                HealthStatus::Unhealthy => HealthStatus::Unhealthy,
            };
        }
        checks.insert("error_rates".to_string(), error_rate_health);

        // Check circuit breakers
        let circuit_health = self.check_circuit_breakers();
        if circuit_health.status != HealthStatus::Healthy {
            overall_status = match overall_status {
                HealthStatus::Healthy => circuit_health.status.clone(),
                HealthStatus::Degraded => {
                    if circuit_health.status == HealthStatus::Unhealthy {
                        HealthStatus::Unhealthy
                    } else {
                        HealthStatus::Degraded
                    }
                }
                HealthStatus::Unhealthy => HealthStatus::Unhealthy,
            };
        }
        checks.insert("circuit_breakers".to_string(), circuit_health);

        // Check resource usage
        let resource_health = self.check_resource_usage();
        if resource_health.status != HealthStatus::Healthy {
            overall_status = match overall_status {
                HealthStatus::Healthy => resource_health.status.clone(),
                HealthStatus::Degraded => {
                    if resource_health.status == HealthStatus::Unhealthy {
                        HealthStatus::Unhealthy
                    } else {
                        HealthStatus::Degraded
                    }
                }
                HealthStatus::Unhealthy => HealthStatus::Unhealthy,
            };
        }
        checks.insert("resource_usage".to_string(), resource_health);

        let overall_message = match overall_status {
            HealthStatus::Healthy => "All systems operational".to_string(),
            HealthStatus::Degraded => "Some systems experiencing issues".to_string(),
            HealthStatus::Unhealthy => "Critical systems failing".to_string(),
        };

        HealthCheckResult {
            status: overall_status,
            checks,
            overall_message,
            timestamp: chrono::Utc::now(),
        }
    }

    fn check_error_rates(&self) -> ComponentHealth {
        let metrics = self.metrics_collector.get_metrics_snapshot();

        let mut high_error_operations = Vec::new();
        let mut warning_operations = Vec::new();

        for (operation, metrics) in &metrics.operation_metrics {
            if metrics.error_rate > 0.1 {
                // 10% error rate threshold
                high_error_operations.push(operation.clone());
            } else if metrics.error_rate > 0.05 {
                // 5% warning threshold
                warning_operations.push(operation.clone());
            }
        }

        let status = if !high_error_operations.is_empty() {
            HealthStatus::Unhealthy
        } else if !warning_operations.is_empty() {
            HealthStatus::Degraded
        } else {
            HealthStatus::Healthy
        };

        let message = match status {
            HealthStatus::Healthy => "Error rates within acceptable limits".to_string(),
            HealthStatus::Degraded => {
                format!("Warning: High error rates in operations: {warning_operations:?}")
            }
            HealthStatus::Unhealthy => {
                format!("Critical: Very high error rates in operations: {high_error_operations:?}")
            }
        };

        ComponentHealth {
            status,
            message,
            metrics: Some(
                serde_json::to_value(&metrics.operation_metrics)
                    .and_then(serde_json::from_value)
                    .unwrap_or_default(),
            ),
        }
    }

    fn check_circuit_breakers(&self) -> ComponentHealth {
        let states = self.circuit_states.lock().unwrap();

        let mut open_circuits = Vec::new();
        let mut half_open_circuits = Vec::new();

        for (component, state) in states.iter() {
            match state {
                crate::resilience::CircuitState::Open => open_circuits.push(component.clone()),
                crate::resilience::CircuitState::HalfOpen => {
                    half_open_circuits.push(component.clone())
                }
                crate::resilience::CircuitState::Closed => {}
            }
        }

        let status = if !open_circuits.is_empty() {
            HealthStatus::Unhealthy
        } else if !half_open_circuits.is_empty() {
            HealthStatus::Degraded
        } else {
            HealthStatus::Healthy
        };

        let message = match status {
            HealthStatus::Healthy => "All circuit breakers closed".to_string(),
            HealthStatus::Degraded => {
                format!("Circuit breakers in recovery: {half_open_circuits:?}")
            }
            HealthStatus::Unhealthy => format!("Open circuit breakers: {open_circuits:?}"),
        };

        let circuit_metrics = states
            .iter()
            .map(|(k, v)| (k.clone(), serde_json::Value::String(format!("{v:?}"))))
            .collect();

        ComponentHealth {
            status,
            message,
            metrics: Some(circuit_metrics),
        }
    }

    fn check_resource_usage(&self) -> ComponentHealth {
        let metrics = self.metrics_collector.get_metrics_snapshot();

        // Check if resource usage is being tracked
        if metrics.resource_usage.is_empty() {
            return ComponentHealth {
                status: HealthStatus::Healthy,
                message: "Resource usage monitoring not configured".to_string(),
                metrics: None,
            };
        }

        // Simple resource usage check (would be more sophisticated in production)
        let mut high_usage_resources = Vec::new();

        for (resource, usage) in &metrics.resource_usage {
            // Example thresholds (would be configurable)
            let threshold = match resource.as_str() {
                "memory_mb" => 1024, // 1GB threshold
                "cpu_percent" => 80,
                "disk_usage_percent" => 85,
                _ => continue,
            };

            if *usage > threshold {
                high_usage_resources.push(format!("{resource}: {usage}"));
            }
        }

        let status = if high_usage_resources.len() > 1 {
            HealthStatus::Unhealthy
        } else if !high_usage_resources.is_empty() {
            HealthStatus::Degraded
        } else {
            HealthStatus::Healthy
        };

        let message = match status {
            HealthStatus::Healthy => "Resource usage normal".to_string(),
            HealthStatus::Degraded => format!("High resource usage: {high_usage_resources:?}"),
            HealthStatus::Unhealthy => {
                format!("Critical resource usage: {high_usage_resources:?}")
            }
        };

        ComponentHealth {
            status,
            message,
            metrics: Some(
                serde_json::to_value(&metrics.resource_usage)
                    .and_then(serde_json::from_value)
                    .unwrap_or_default(),
            ),
        }
    }
}

/// Performance monitor for tracking operation latencies
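///
/// Wraps async operations so that latency and success/failure are recorded
/// automatically (sketch, assuming a Tokio runtime is available):
///
/// ```ignore
/// let monitor = PerformanceMonitor::new(MetricsCollector::new());
/// let value = monitor
///     .time_operation("parse_file", || async { Ok(42) })
///     .await?;
/// ```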
pub struct PerformanceMonitor {
    metrics_collector: MetricsCollector,
}

impl PerformanceMonitor {
    /// Create a new performance monitor
    pub fn new(metrics_collector: MetricsCollector) -> Self {
        Self { metrics_collector }
    }

    /// Time an operation and record its performance
    pub async fn time_operation<F, Fut, T>(&self, operation_name: &str, operation: F) -> Result<T>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = Result<T>>,
    {
        let start = Instant::now();
        let result = operation().await;
        let duration = start.elapsed();

        match &result {
            Ok(_) => {
                self.metrics_collector
                    .record_success(operation_name, duration);
                info!(
                    operation = operation_name,
                    duration_ms = duration.as_millis(),
                    "Operation completed successfully"
                );
            }
            Err(error) => {
                self.metrics_collector
                    .record_error(error, Some(operation_name));
                warn!(
                    operation = operation_name,
                    duration_ms = duration.as_millis(),
                    error = %error,
                    "Operation failed"
                );
            }
        }

        result
    }

    /// Get performance metrics for an operation
    pub fn get_operation_performance(&self, operation: &str) -> Option<OperationPerformance> {
        let error_rate = self.metrics_collector.get_error_rate(operation);
        let avg_latency = self.metrics_collector.get_average_latency(operation)?;

        Some(OperationPerformance {
            operation: operation.to_string(),
            error_rate,
            average_latency: avg_latency,
        })
    }
}

/// Performance metrics for a specific operation
#[derive(Debug, Clone)]
pub struct OperationPerformance {
    /// Name of the operation
    pub operation: String,
    /// Error rate as a decimal (0.0 to 1.0)
    pub error_rate: f64,
    /// Average execution time
    pub average_latency: Duration,
}

#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::{sleep, Duration};

    #[test]
    fn test_metrics_collector() {
        let collector = MetricsCollector::new();

        // Record some metrics
        collector.record_success("parse_file", Duration::from_millis(100));
        collector.record_success("parse_file", Duration::from_millis(150));

        let error = Error::storage("test error");
        collector.record_error(&error, Some("parse_file"));

        // Check metrics
        let error_rate = collector.get_error_rate("parse_file");
        assert!((error_rate - 0.333).abs() < 0.01); // Approximately 1/3

        let avg_latency = collector.get_average_latency("parse_file").unwrap();
        assert_eq!(avg_latency, Duration::from_millis(125));
    }

    #[test]
    fn test_health_monitor() {
        let metrics = MetricsCollector::new();
        let monitor = HealthMonitor::new(metrics);

        // Initial health check should be healthy
        let health = monitor.health_check();
        assert_eq!(health.status, HealthStatus::Healthy);
        assert!(health.checks.contains_key("error_rates"));
        assert!(health.checks.contains_key("circuit_breakers"));
        assert!(health.checks.contains_key("resource_usage"));
    }
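
    // Added illustrative test (a sketch based on `check_circuit_breakers` above):
    // an open circuit breaker should surface as an unhealthy component and mark
    // the whole system unhealthy. The component name "storage" is arbitrary.
    #[test]
    fn test_health_monitor_reports_open_circuit() {
        let monitor = HealthMonitor::new(MetricsCollector::new());
        monitor.update_circuit_state("storage", crate::resilience::CircuitState::Open);

        let health = monitor.health_check();
        assert_eq!(health.status, HealthStatus::Unhealthy);
        assert_eq!(
            health.checks["circuit_breakers"].status,
            HealthStatus::Unhealthy
        );
    }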

    #[tokio::test]
    async fn test_performance_monitor() {
        let metrics = MetricsCollector::new();
        let monitor = PerformanceMonitor::new(metrics);

        // Time a successful operation
        let result = monitor
            .time_operation("test_op", || async {
                sleep(Duration::from_millis(10)).await;
                Ok("success")
            })
            .await;

        assert!(result.is_ok(), "Observability operation should succeed");
        assert_eq!(result.unwrap(), "success");

        // Check performance metrics
        let perf = monitor.get_operation_performance("test_op");
        assert!(perf.is_some(), "Should have value");
        let perf = perf.unwrap();
        assert_eq!(perf.error_rate, 0.0);
        assert!(perf.average_latency >= Duration::from_millis(10));
    }

    #[test]
    fn test_metrics_snapshot() {
        let collector = MetricsCollector::new();

        collector.record_success("op1", Duration::from_millis(100));
        let error = Error::validation("test_field", "test error");
        collector.record_error(&error, Some("op1"));
        collector.record_resource_usage("memory_mb", 512);

        let snapshot = collector.get_metrics_snapshot();

        // Uptime should be a valid positive number - check it's reasonable
        assert!(snapshot.uptime_seconds < 365 * 24 * 3600); // Less than a year
        assert!(snapshot.operation_metrics.contains_key("op1"));
        assert_eq!(snapshot.resource_usage.get("memory_mb"), Some(&512));
        assert!(
            !snapshot.error_counts.is_empty(),
            "Should have error count after recording error"
        );
    }
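
    // Added illustrative test (a sketch based on `check_resource_usage` above):
    // usage above the example "memory_mb" threshold should degrade overall health.
    #[test]
    fn test_health_monitor_flags_high_resource_usage() {
        let collector = MetricsCollector::new();
        collector.record_resource_usage("memory_mb", 2048); // above the 1024 MB example threshold
        let monitor = HealthMonitor::new(collector);

        let health = monitor.health_check();
        assert_eq!(health.status, HealthStatus::Degraded);
        assert_eq!(
            health.checks["resource_usage"].status,
            HealthStatus::Degraded
        );
    }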
}