use crate::error::{Error, ErrorSeverity, Result};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tracing::{debug, error, info, warn};

#[derive(Debug, Clone)]
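/// Collects error counts, per-operation latencies and success rates, and
/// resource-usage gauges; clones share the same store via `Arc<Mutex<_>>`.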
pub struct MetricsCollector {
    metrics: Arc<Mutex<Metrics>>,
}

#[derive(Debug, Clone)]
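/// Aggregated metric state guarded by the collector's mutex.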
struct Metrics {
    error_counts: HashMap<String, u64>,
    error_severity_counts: HashMap<ErrorSeverity, u64>,
    operation_latencies: HashMap<String, Vec<Duration>>,
    operation_success_rates: HashMap<String, (u64, u64)>,
    resource_usage: HashMap<String, u64>,
    start_time: Instant,
}

impl Default for Metrics {
    fn default() -> Self {
        Self {
            error_counts: HashMap::new(),
            error_severity_counts: HashMap::new(),
            operation_latencies: HashMap::new(),
            operation_success_rates: HashMap::new(),
            resource_usage: HashMap::new(),
            start_time: Instant::now(),
        }
    }
}

impl Default for MetricsCollector {
    fn default() -> Self {
        Self::new()
    }
}

impl MetricsCollector {
    pub fn new() -> Self {
        Self {
            metrics: Arc::new(Mutex::new(Metrics::default())),
        }
    }

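    /// Records an error, counting it by error type and severity and, if an
    /// operation name is given, against that operation's total call count.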
    pub fn record_error(&self, error: &Error, operation: Option<&str>) {
        let mut metrics = self.metrics.lock().unwrap();

        let error_type = format!("{:?}", std::mem::discriminant(error));
        *metrics.error_counts.entry(error_type.clone()).or_insert(0) += 1;

        *metrics
            .error_severity_counts
            .entry(error.severity())
            .or_insert(0) += 1;

        if let Some(op) = operation {
            let (_success, total) = metrics
                .operation_success_rates
                .entry(op.to_string())
                .or_insert((0, 0));
            *total += 1;
        }

        error!(
            error = %error,
            error_type = error_type,
            severity = ?error.severity(),
            operation = operation,
            error_code = error.error_code(),
            "Error recorded"
        );
    }

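    /// Records a successful operation together with its observed latency.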
    pub fn record_success(&self, operation: &str, duration: Duration) {
        let mut metrics = self.metrics.lock().unwrap();

        metrics
            .operation_latencies
            .entry(operation.to_string())
            .or_default()
            .push(duration);

        let (success, total) = metrics
            .operation_success_rates
            .entry(operation.to_string())
            .or_insert((0, 0));
        *success += 1;
        *total += 1;

        debug!(
            operation = operation,
            duration_ms = duration.as_millis(),
            "Operation completed successfully"
        );
    }

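    /// Stores the latest usage value reported for a named resource (e.g. `memory_mb`).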
    pub fn record_resource_usage(&self, resource: &str, usage: u64) {
        let mut metrics = self.metrics.lock().unwrap();
        metrics.resource_usage.insert(resource.to_string(), usage);
    }

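    /// Returns the error rate for an operation as a fraction in `[0.0, 1.0]`,
    /// or `0.0` if the operation has never been recorded.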
    pub fn get_error_rate(&self, operation: &str) -> f64 {
        let metrics = self.metrics.lock().unwrap();
        if let Some((success, total)) = metrics.operation_success_rates.get(operation) {
            if *total == 0 {
                0.0
            } else {
                1.0 - (*success as f64 / *total as f64)
            }
        } else {
            0.0
        }
    }

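    /// Returns the mean recorded latency for an operation, or `None` if no samples exist.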
    pub fn get_average_latency(&self, operation: &str) -> Option<Duration> {
        let metrics = self.metrics.lock().unwrap();
        if let Some(latencies) = metrics.operation_latencies.get(operation) {
            if latencies.is_empty() {
                None
            } else {
                let total_ms: u64 = latencies.iter().map(|d| d.as_millis() as u64).sum();
                Some(Duration::from_millis(total_ms / latencies.len() as u64))
            }
        } else {
            None
        }
    }

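    /// Returns how long this collector has been alive.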
    pub fn uptime(&self) -> Duration {
        let metrics = self.metrics.lock().unwrap();
        Instant::now().duration_since(metrics.start_time)
    }

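    /// Builds a serializable snapshot of everything collected so far.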
    pub fn get_metrics_snapshot(&self) -> MetricsSnapshot {
        let metrics = self.metrics.lock().unwrap();

        let mut operation_metrics = HashMap::new();
        for (operation, (success, total)) in &metrics.operation_success_rates {
            let error_rate = if *total == 0 {
                0.0
            } else {
                1.0 - (*success as f64 / *total as f64)
            };
            let avg_latency = metrics
                .operation_latencies
                .get(operation)
                .and_then(|latencies| {
                    if latencies.is_empty() {
                        None
                    } else {
                        let total_ms: u64 = latencies.iter().map(|d| d.as_millis() as u64).sum();
                        Some(Duration::from_millis(total_ms / latencies.len() as u64))
                    }
                });

            operation_metrics.insert(
                operation.clone(),
                OperationMetrics {
                    success_count: *success,
                    total_count: *total,
                    error_rate,
                    average_latency_ms: avg_latency.map(|d| d.as_millis() as u64),
                },
            );
        }

        MetricsSnapshot {
            uptime_seconds: Instant::now().duration_since(metrics.start_time).as_secs(),
            error_counts: metrics.error_counts.clone(),
            error_severity_distribution: metrics
                .error_severity_counts
                .iter()
                .map(|(k, v)| (format!("{k:?}"), *v))
                .collect(),
            operation_metrics,
            resource_usage: metrics.resource_usage.clone(),
        }
    }
}

#[derive(Debug, Clone, serde::Serialize)]
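/// Point-in-time, serializable view of the collected metrics.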
pub struct MetricsSnapshot {
    pub uptime_seconds: u64,
    pub error_counts: HashMap<String, u64>,
    pub error_severity_distribution: HashMap<String, u64>,
    pub operation_metrics: HashMap<String, OperationMetrics>,
    pub resource_usage: HashMap<String, u64>,
}

#[derive(Debug, Clone, serde::Serialize)]
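/// Per-operation success counts, error rate, and average latency.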
pub struct OperationMetrics {
    pub success_count: u64,
    pub total_count: u64,
    pub error_rate: f64,
    pub average_latency_ms: Option<u64>,
}

#[derive(Debug, Clone, PartialEq, serde::Serialize)]
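/// Coarse health classification used for individual checks and the overall result.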
pub enum HealthStatus {
    Healthy,
    Degraded,
    Unhealthy,
}

#[derive(Debug, Clone, serde::Serialize)]
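/// Outcome of a full health check across all monitored components.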
pub struct HealthCheckResult {
    pub status: HealthStatus,
    pub checks: HashMap<String, ComponentHealth>,
    pub overall_message: String,
    pub timestamp: chrono::DateTime<chrono::Utc>,
}

#[derive(Debug, Clone, serde::Serialize)]
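/// Health details reported by a single component check.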
pub struct ComponentHealth {
    pub status: HealthStatus,
    pub message: String,
    pub metrics: Option<HashMap<String, serde_json::Value>>,
}

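/// Derives system health from collected metrics and reported circuit-breaker states.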
pub struct HealthMonitor {
    metrics_collector: MetricsCollector,
    circuit_states: Arc<Mutex<HashMap<String, crate::resilience::CircuitState>>>,
}

impl HealthMonitor {
    pub fn new(metrics_collector: MetricsCollector) -> Self {
        Self {
            metrics_collector,
            circuit_states: Arc::new(Mutex::new(HashMap::new())),
        }
    }

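    /// Records the most recently observed circuit-breaker state for a component.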
    pub fn update_circuit_state(&self, component: &str, state: crate::resilience::CircuitState) {
        let mut states = self.circuit_states.lock().unwrap();
        states.insert(component.to_string(), state);
    }

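    /// Runs all component checks and folds them into an overall status: any
    /// `Unhealthy` check wins, and any `Degraded` check downgrades a `Healthy` result.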
    pub fn health_check(&self) -> HealthCheckResult {
        let mut checks = HashMap::new();
        let mut overall_status = HealthStatus::Healthy;

        let error_rate_health = self.check_error_rates();
        if error_rate_health.status != HealthStatus::Healthy {
            overall_status = match overall_status {
                HealthStatus::Healthy => error_rate_health.status.clone(),
                HealthStatus::Degraded => {
                    if error_rate_health.status == HealthStatus::Unhealthy {
                        HealthStatus::Unhealthy
                    } else {
                        HealthStatus::Degraded
                    }
                }
                HealthStatus::Unhealthy => HealthStatus::Unhealthy,
            };
        }
        checks.insert("error_rates".to_string(), error_rate_health);

        let circuit_health = self.check_circuit_breakers();
        if circuit_health.status != HealthStatus::Healthy {
            overall_status = match overall_status {
                HealthStatus::Healthy => circuit_health.status.clone(),
                HealthStatus::Degraded => {
                    if circuit_health.status == HealthStatus::Unhealthy {
                        HealthStatus::Unhealthy
                    } else {
                        HealthStatus::Degraded
                    }
                }
                HealthStatus::Unhealthy => HealthStatus::Unhealthy,
            };
        }
        checks.insert("circuit_breakers".to_string(), circuit_health);

        let resource_health = self.check_resource_usage();
        if resource_health.status != HealthStatus::Healthy {
            overall_status = match overall_status {
                HealthStatus::Healthy => resource_health.status.clone(),
                HealthStatus::Degraded => {
                    if resource_health.status == HealthStatus::Unhealthy {
                        HealthStatus::Unhealthy
                    } else {
                        HealthStatus::Degraded
                    }
                }
                HealthStatus::Unhealthy => HealthStatus::Unhealthy,
            };
        }
        checks.insert("resource_usage".to_string(), resource_health);

        let overall_message = match overall_status {
            HealthStatus::Healthy => "All systems operational".to_string(),
            HealthStatus::Degraded => "Some systems experiencing issues".to_string(),
            HealthStatus::Unhealthy => "Critical systems failing".to_string(),
        };

        HealthCheckResult {
            status: overall_status,
            checks,
            overall_message,
            timestamp: chrono::Utc::now(),
        }
    }

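    /// Flags operations with error rates above 10% as unhealthy and above 5% as degraded.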
    fn check_error_rates(&self) -> ComponentHealth {
        let metrics = self.metrics_collector.get_metrics_snapshot();

        let mut high_error_operations = Vec::new();
        let mut warning_operations = Vec::new();

        for (operation, metrics) in &metrics.operation_metrics {
            if metrics.error_rate > 0.1 {
                high_error_operations.push(operation.clone());
            } else if metrics.error_rate > 0.05 {
                warning_operations.push(operation.clone());
            }
        }

        let status = if !high_error_operations.is_empty() {
            HealthStatus::Unhealthy
        } else if !warning_operations.is_empty() {
            HealthStatus::Degraded
        } else {
            HealthStatus::Healthy
        };

        let message = match status {
            HealthStatus::Healthy => "Error rates within acceptable limits".to_string(),
            HealthStatus::Degraded => {
                format!("Warning: High error rates in operations: {warning_operations:?}")
            }
            HealthStatus::Unhealthy => {
                format!("Critical: Very high error rates in operations: {high_error_operations:?}")
            }
        };

        ComponentHealth {
            status,
            message,
            metrics: Some(
                serde_json::to_value(&metrics.operation_metrics)
                    .and_then(serde_json::from_value)
                    .unwrap_or_default(),
            ),
        }
    }

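    /// Reports unhealthy if any circuit breaker is open and degraded if any is half-open.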
    fn check_circuit_breakers(&self) -> ComponentHealth {
        let states = self.circuit_states.lock().unwrap();

        let mut open_circuits = Vec::new();
        let mut half_open_circuits = Vec::new();

        for (component, state) in states.iter() {
            match state {
                crate::resilience::CircuitState::Open => open_circuits.push(component.clone()),
                crate::resilience::CircuitState::HalfOpen => {
                    half_open_circuits.push(component.clone())
                }
                crate::resilience::CircuitState::Closed => {}
            }
        }

        let status = if !open_circuits.is_empty() {
            HealthStatus::Unhealthy
        } else if !half_open_circuits.is_empty() {
            HealthStatus::Degraded
        } else {
            HealthStatus::Healthy
        };

        let message = match status {
            HealthStatus::Healthy => "All circuit breakers closed".to_string(),
            HealthStatus::Degraded => {
                format!("Circuit breakers in recovery: {half_open_circuits:?}")
            }
            HealthStatus::Unhealthy => format!("Open circuit breakers: {open_circuits:?}"),
        };

        let circuit_metrics = states
            .iter()
            .map(|(k, v)| (k.clone(), serde_json::Value::String(format!("{v:?}"))))
            .collect();

        ComponentHealth {
            status,
            message,
            metrics: Some(circuit_metrics),
        }
    }

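    /// Compares recorded resource usage against fixed thresholds
    /// (`memory_mb` > 1024, `cpu_percent` > 80, `disk_usage_percent` > 85).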
    fn check_resource_usage(&self) -> ComponentHealth {
        let metrics = self.metrics_collector.get_metrics_snapshot();

        if metrics.resource_usage.is_empty() {
            return ComponentHealth {
                status: HealthStatus::Healthy,
                message: "Resource usage monitoring not configured".to_string(),
                metrics: None,
            };
        }

        let mut high_usage_resources = Vec::new();

        for (resource, usage) in &metrics.resource_usage {
            let threshold = match resource.as_str() {
                "memory_mb" => 1024,
                "cpu_percent" => 80,
                "disk_usage_percent" => 85,
                _ => continue,
            };

            if *usage > threshold {
                high_usage_resources.push(format!("{resource}: {usage}"));
            }
        }

        let status = if high_usage_resources.len() > 1 {
            HealthStatus::Unhealthy
        } else if !high_usage_resources.is_empty() {
            HealthStatus::Degraded
        } else {
            HealthStatus::Healthy
        };

        let message = match status {
            HealthStatus::Healthy => "Resource usage normal".to_string(),
            HealthStatus::Degraded => format!("High resource usage: {high_usage_resources:?}"),
            HealthStatus::Unhealthy => {
                format!("Critical resource usage: {high_usage_resources:?}")
            }
        };

        ComponentHealth {
            status,
            message,
            metrics: Some(
                serde_json::to_value(&metrics.resource_usage)
                    .and_then(serde_json::from_value)
                    .unwrap_or_default(),
            ),
        }
    }
}

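/// Times async operations and records their outcomes through a `MetricsCollector`.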
pub struct PerformanceMonitor {
    metrics_collector: MetricsCollector,
}

impl PerformanceMonitor {
    pub fn new(metrics_collector: MetricsCollector) -> Self {
        Self { metrics_collector }
    }

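    /// Runs `operation`, measuring its duration and recording either a success or an
    /// error against `operation_name`. Illustrative usage (a sketch; the surrounding
    /// module path and the "load_config" label are placeholders):
    ///
    /// ```ignore
    /// let monitor = PerformanceMonitor::new(MetricsCollector::new());
    /// let value = monitor
    ///     .time_operation("load_config", || async { Ok(42u32) })
    ///     .await;
    /// assert!(value.is_ok());
    /// ```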
    pub async fn time_operation<F, Fut, T>(&self, operation_name: &str, operation: F) -> Result<T>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = Result<T>>,
    {
        let start = Instant::now();
        let result = operation().await;
        let duration = start.elapsed();

        match &result {
            Ok(_) => {
                self.metrics_collector
                    .record_success(operation_name, duration);
                info!(
                    operation = operation_name,
                    duration_ms = duration.as_millis(),
                    "Operation completed successfully"
                );
            }
            Err(error) => {
                self.metrics_collector
                    .record_error(error, Some(operation_name));
                warn!(
                    operation = operation_name,
                    duration_ms = duration.as_millis(),
                    error = %error,
                    "Operation failed"
                );
            }
        }

        result
    }

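    /// Returns the error rate and average latency for an operation, or `None` if no
    /// latency samples have been recorded for it.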
    pub fn get_operation_performance(&self, operation: &str) -> Option<OperationPerformance> {
        let error_rate = self.metrics_collector.get_error_rate(operation);
        let avg_latency = self.metrics_collector.get_average_latency(operation)?;

        Some(OperationPerformance {
            operation: operation.to_string(),
            error_rate,
            average_latency: avg_latency,
        })
    }
}

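/// Error rate and average latency observed for a single operation.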
#[derive(Debug, Clone)]
pub struct OperationPerformance {
    pub operation: String,
    pub error_rate: f64,
    pub average_latency: Duration,
}

#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::{sleep, Duration};

    #[test]
    fn test_metrics_collector() {
        let collector = MetricsCollector::new();

        collector.record_success("parse_file", Duration::from_millis(100));
        collector.record_success("parse_file", Duration::from_millis(150));

        let error = Error::storage("test error");
        collector.record_error(&error, Some("parse_file"));

        let error_rate = collector.get_error_rate("parse_file");
        // Two successes and one failure => error rate of 1/3.
        assert!((error_rate - 0.333).abs() < 0.01);

        let avg_latency = collector.get_average_latency("parse_file").unwrap();
        assert_eq!(avg_latency, Duration::from_millis(125));
    }

    #[test]
    fn test_health_monitor() {
        let metrics = MetricsCollector::new();
        let monitor = HealthMonitor::new(metrics);

        let health = monitor.health_check();
        assert_eq!(health.status, HealthStatus::Healthy);
        assert!(health.checks.contains_key("error_rates"));
        assert!(health.checks.contains_key("circuit_breakers"));
        assert!(health.checks.contains_key("resource_usage"));
    }

    #[tokio::test]
    async fn test_performance_monitor() {
        let metrics = MetricsCollector::new();
        let monitor = PerformanceMonitor::new(metrics);

        let result = monitor
            .time_operation("test_op", || async {
                sleep(Duration::from_millis(10)).await;
                Ok("success")
            })
            .await;

        assert!(result.is_ok(), "Observability operation should succeed");
        assert_eq!(result.unwrap(), "success");

        let perf = monitor.get_operation_performance("test_op");
        assert!(perf.is_some(), "Should have value");
        let perf = perf.unwrap();
        assert_eq!(perf.error_rate, 0.0);
        assert!(perf.average_latency >= Duration::from_millis(10));
    }

    #[test]
    fn test_metrics_snapshot() {
        let collector = MetricsCollector::new();

        collector.record_success("op1", Duration::from_millis(100));
        let error = Error::validation("test_field", "test error");
        collector.record_error(&error, Some("op1"));
        collector.record_resource_usage("memory_mb", 512);

        let snapshot = collector.get_metrics_snapshot();

        // Uptime should be tiny here; just sanity-check it is a plausible value.
        assert!(snapshot.uptime_seconds < 365 * 24 * 3600);
        assert!(snapshot.operation_metrics.contains_key("op1"));
        assert_eq!(snapshot.resource_usage.get("memory_mb"), Some(&512));
        assert!(
            !snapshot.error_counts.is_empty(),
            "Should have error count after recording error"
        );
    }
}