use crate::error::{Error, ErrorSeverity, Result};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tracing::{debug, error, info, warn};

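/// Thread-safe collector for error counts, operation latencies, success rates,
/// and resource usage. Cheap to clone; clones share the same underlying metrics.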
#[derive(Debug, Clone)]
pub struct MetricsCollector {
    metrics: Arc<Mutex<Metrics>>,
}

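/// Internal metric storage guarded by the collector's mutex.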
#[derive(Debug, Clone)]
struct Metrics {
    error_counts: HashMap<String, u64>,
    error_severity_counts: HashMap<ErrorSeverity, u64>,
    operation_latencies: HashMap<String, Vec<Duration>>,
    operation_success_rates: HashMap<String, (u64, u64)>,
    resource_usage: HashMap<String, u64>,
    start_time: Instant,
}

impl Default for Metrics {
    fn default() -> Self {
        Self {
            error_counts: HashMap::new(),
            error_severity_counts: HashMap::new(),
            operation_latencies: HashMap::new(),
            operation_success_rates: HashMap::new(),
            resource_usage: HashMap::new(),
            start_time: Instant::now(),
        }
    }
}

impl Default for MetricsCollector {
    fn default() -> Self {
        Self::new()
    }
}

impl MetricsCollector {
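    /// Creates a collector with empty metrics and the uptime clock started now.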
    pub fn new() -> Self {
        Self {
            metrics: Arc::new(Mutex::new(Metrics::default())),
        }
    }

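    /// Records an error, bumping its per-type and per-severity counters and,
    /// if an operation name is given, that operation's total call count.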
    pub fn record_error(&self, error: &Error, operation: Option<&str>) {
        let mut metrics = self.metrics.lock().unwrap();

        let error_type = format!("{:?}", std::mem::discriminant(error));
        *metrics.error_counts.entry(error_type.clone()).or_insert(0) += 1;

        *metrics
            .error_severity_counts
            .entry(error.severity())
            .or_insert(0) += 1;

        if let Some(op) = operation {
            let (_success, total) = metrics
                .operation_success_rates
                .entry(op.to_string())
                .or_insert((0, 0));
            *total += 1;
        }

        error!(
            error = %error,
            error_type = error_type,
            severity = ?error.severity(),
            operation = operation,
            error_code = error.error_code(),
            "Error recorded"
        );
    }

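    /// Records a successful operation along with its latency.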
    pub fn record_success(&self, operation: &str, duration: Duration) {
        let mut metrics = self.metrics.lock().unwrap();

        metrics
            .operation_latencies
            .entry(operation.to_string())
            .or_default()
            .push(duration);

        let (success, total) = metrics
            .operation_success_rates
            .entry(operation.to_string())
            .or_insert((0, 0));
        *success += 1;
        *total += 1;

        debug!(
            operation = operation,
            duration_ms = duration.as_millis(),
            "Operation completed successfully"
        );
    }

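    /// Records the latest usage value for a named resource (e.g. "memory_mb").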
    pub fn record_resource_usage(&self, resource: &str, usage: u64) {
        let mut metrics = self.metrics.lock().unwrap();
        metrics.resource_usage.insert(resource.to_string(), usage);
    }

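    /// Returns the error rate for an operation in the range [0.0, 1.0].
    /// Unknown operations report 0.0.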
    pub fn get_error_rate(&self, operation: &str) -> f64 {
        let metrics = self.metrics.lock().unwrap();
        if let Some((success, total)) = metrics.operation_success_rates.get(operation) {
            if *total == 0 {
                0.0
            } else {
                1.0 - (*success as f64 / *total as f64)
            }
        } else {
            0.0
        }
    }

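    /// Returns the average recorded latency for an operation, or `None` if no
    /// successful calls have been recorded for it.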
    pub fn get_average_latency(&self, operation: &str) -> Option<Duration> {
        let metrics = self.metrics.lock().unwrap();
        if let Some(latencies) = metrics.operation_latencies.get(operation) {
            if latencies.is_empty() {
                None
            } else {
                let total_ms: u64 = latencies.iter().map(|d| d.as_millis() as u64).sum();
                Some(Duration::from_millis(total_ms / latencies.len() as u64))
            }
        } else {
            None
        }
    }

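    /// Returns how long this collector has been running.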
    pub fn uptime(&self) -> Duration {
        let metrics = self.metrics.lock().unwrap();
        Instant::now().duration_since(metrics.start_time)
    }

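    /// Builds a serializable snapshot of all metrics collected so far.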
    pub fn get_metrics_snapshot(&self) -> MetricsSnapshot {
        let metrics = self.metrics.lock().unwrap();

        let mut operation_metrics = HashMap::new();
        for (operation, (success, total)) in &metrics.operation_success_rates {
            let error_rate = if *total == 0 {
                0.0
            } else {
                1.0 - (*success as f64 / *total as f64)
            };
            let avg_latency = metrics
                .operation_latencies
                .get(operation)
                .and_then(|latencies| {
                    if latencies.is_empty() {
                        None
                    } else {
                        let total_ms: u64 = latencies.iter().map(|d| d.as_millis() as u64).sum();
                        Some(Duration::from_millis(total_ms / latencies.len() as u64))
                    }
                });

            operation_metrics.insert(
                operation.clone(),
                OperationMetrics {
                    success_count: *success,
                    total_count: *total,
                    error_rate,
                    average_latency_ms: avg_latency.map(|d| d.as_millis() as u64),
                },
            );
        }

        MetricsSnapshot {
            uptime_seconds: Instant::now().duration_since(metrics.start_time).as_secs(),
            error_counts: metrics.error_counts.clone(),
            error_severity_distribution: metrics
                .error_severity_counts
                .iter()
                .map(|(k, v)| (format!("{:?}", k), *v))
                .collect(),
            operation_metrics,
            resource_usage: metrics.resource_usage.clone(),
        }
    }
}

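/// Point-in-time, serializable view of the collected metrics.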
#[derive(Debug, Clone, serde::Serialize)]
pub struct MetricsSnapshot {
    pub uptime_seconds: u64,
    pub error_counts: HashMap<String, u64>,
    pub error_severity_distribution: HashMap<String, u64>,
    pub operation_metrics: HashMap<String, OperationMetrics>,
    pub resource_usage: HashMap<String, u64>,
}

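/// Per-operation success counts, error rate, and average latency.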
#[derive(Debug, Clone, serde::Serialize)]
pub struct OperationMetrics {
    pub success_count: u64,
    pub total_count: u64,
    pub error_rate: f64,
    pub average_latency_ms: Option<u64>,
}

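/// Overall or per-component health state.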
#[derive(Debug, Clone, PartialEq, serde::Serialize)]
pub enum HealthStatus {
    Healthy,
    Degraded,
    Unhealthy,
}

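/// Result of a full health check across all monitored components.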
#[derive(Debug, Clone, serde::Serialize)]
pub struct HealthCheckResult {
    pub status: HealthStatus,
    pub checks: HashMap<String, ComponentHealth>,
    pub overall_message: String,
    pub timestamp: chrono::DateTime<chrono::Utc>,
}

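/// Health of a single component, with an optional bag of supporting metrics.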
#[derive(Debug, Clone, serde::Serialize)]
pub struct ComponentHealth {
    pub status: HealthStatus,
    pub message: String,
    pub metrics: Option<HashMap<String, serde_json::Value>>,
}

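/// Aggregates collected metrics and circuit-breaker states into an overall health report.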
pub struct HealthMonitor {
    metrics_collector: MetricsCollector,
    circuit_states: Arc<Mutex<HashMap<String, crate::resilience::CircuitState>>>,
}

impl HealthMonitor {
    pub fn new(metrics_collector: MetricsCollector) -> Self {
        Self {
            metrics_collector,
            circuit_states: Arc::new(Mutex::new(HashMap::new())),
        }
    }

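    /// Records the latest circuit-breaker state reported for a component.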
    pub fn update_circuit_state(&self, component: &str, state: crate::resilience::CircuitState) {
        let mut states = self.circuit_states.lock().unwrap();
        states.insert(component.to_string(), state);
    }

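    /// Runs the error-rate, circuit-breaker, and resource-usage checks and
    /// combines them into an overall status (the worst individual status wins).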
    pub fn health_check(&self) -> HealthCheckResult {
        let mut checks = HashMap::new();
        let mut overall_status = HealthStatus::Healthy;

        let error_rate_health = self.check_error_rates();
        if error_rate_health.status != HealthStatus::Healthy {
            overall_status = match overall_status {
                HealthStatus::Healthy => error_rate_health.status.clone(),
                HealthStatus::Degraded => {
                    if error_rate_health.status == HealthStatus::Unhealthy {
                        HealthStatus::Unhealthy
                    } else {
                        HealthStatus::Degraded
                    }
                }
                HealthStatus::Unhealthy => HealthStatus::Unhealthy,
            };
        }
        checks.insert("error_rates".to_string(), error_rate_health);

        let circuit_health = self.check_circuit_breakers();
        if circuit_health.status != HealthStatus::Healthy {
            overall_status = match overall_status {
                HealthStatus::Healthy => circuit_health.status.clone(),
                HealthStatus::Degraded => {
                    if circuit_health.status == HealthStatus::Unhealthy {
                        HealthStatus::Unhealthy
                    } else {
                        HealthStatus::Degraded
                    }
                }
                HealthStatus::Unhealthy => HealthStatus::Unhealthy,
            };
        }
        checks.insert("circuit_breakers".to_string(), circuit_health);

        let resource_health = self.check_resource_usage();
        if resource_health.status != HealthStatus::Healthy {
            overall_status = match overall_status {
                HealthStatus::Healthy => resource_health.status.clone(),
                HealthStatus::Degraded => {
                    if resource_health.status == HealthStatus::Unhealthy {
                        HealthStatus::Unhealthy
                    } else {
                        HealthStatus::Degraded
                    }
                }
                HealthStatus::Unhealthy => HealthStatus::Unhealthy,
            };
        }
        checks.insert("resource_usage".to_string(), resource_health);

        let overall_message = match overall_status {
            HealthStatus::Healthy => "All systems operational".to_string(),
            HealthStatus::Degraded => "Some systems experiencing issues".to_string(),
            HealthStatus::Unhealthy => "Critical systems failing".to_string(),
        };

        HealthCheckResult {
            status: overall_status,
            checks,
            overall_message,
            timestamp: chrono::Utc::now(),
        }
    }

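    /// Flags operations with error rates above 10% as unhealthy and above 5% as degraded.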
    fn check_error_rates(&self) -> ComponentHealth {
        let metrics = self.metrics_collector.get_metrics_snapshot();

        let mut high_error_operations = Vec::new();
        let mut warning_operations = Vec::new();

        for (operation, op_metrics) in &metrics.operation_metrics {
            if op_metrics.error_rate > 0.1 {
                high_error_operations.push(operation.clone());
            } else if op_metrics.error_rate > 0.05 {
                warning_operations.push(operation.clone());
            }
        }

        let status = if !high_error_operations.is_empty() {
            HealthStatus::Unhealthy
        } else if !warning_operations.is_empty() {
            HealthStatus::Degraded
        } else {
            HealthStatus::Healthy
        };

        let message = match status {
            HealthStatus::Healthy => "Error rates within acceptable limits".to_string(),
            HealthStatus::Degraded => format!(
                "Warning: High error rates in operations: {:?}",
                warning_operations
            ),
            HealthStatus::Unhealthy => format!(
                "Critical: Very high error rates in operations: {:?}",
                high_error_operations
            ),
        };

        ComponentHealth {
            status,
            message,
            metrics: Some(
                serde_json::to_value(&metrics.operation_metrics)
                    .and_then(serde_json::from_value)
                    .unwrap_or_default(),
            ),
        }
    }

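    /// Reports unhealthy if any circuit is open, degraded if any is half-open.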
    fn check_circuit_breakers(&self) -> ComponentHealth {
        let states = self.circuit_states.lock().unwrap();

        let mut open_circuits = Vec::new();
        let mut half_open_circuits = Vec::new();

        for (component, state) in states.iter() {
            match state {
                crate::resilience::CircuitState::Open => open_circuits.push(component.clone()),
                crate::resilience::CircuitState::HalfOpen => {
                    half_open_circuits.push(component.clone())
                }
                crate::resilience::CircuitState::Closed => {}
            }
        }

        let status = if !open_circuits.is_empty() {
            HealthStatus::Unhealthy
        } else if !half_open_circuits.is_empty() {
            HealthStatus::Degraded
        } else {
            HealthStatus::Healthy
        };

        let message = match status {
            HealthStatus::Healthy => "All circuit breakers closed".to_string(),
            HealthStatus::Degraded => {
                format!("Circuit breakers in recovery: {:?}", half_open_circuits)
            }
            HealthStatus::Unhealthy => format!("Open circuit breakers: {:?}", open_circuits),
        };

        let circuit_metrics = states
            .iter()
            .map(|(k, v)| (k.clone(), serde_json::Value::String(format!("{:?}", v))))
            .collect();

        ComponentHealth {
            status,
            message,
            metrics: Some(circuit_metrics),
        }
    }

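    /// Compares known resources against fixed thresholds (memory_mb > 1024,
    /// cpu_percent > 80, disk_usage_percent > 85); one breach is degraded,
    /// more than one is unhealthy.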
    fn check_resource_usage(&self) -> ComponentHealth {
        let metrics = self.metrics_collector.get_metrics_snapshot();

        if metrics.resource_usage.is_empty() {
            return ComponentHealth {
                status: HealthStatus::Healthy,
                message: "Resource usage monitoring not configured".to_string(),
                metrics: None,
            };
        }

        let mut high_usage_resources = Vec::new();

        for (resource, usage) in &metrics.resource_usage {
            let threshold = match resource.as_str() {
                "memory_mb" => 1024,
                "cpu_percent" => 80,
                "disk_usage_percent" => 85,
                _ => continue,
            };

            if *usage > threshold {
                high_usage_resources.push(format!("{}: {}", resource, usage));
            }
        }

        let status = if high_usage_resources.len() > 1 {
            HealthStatus::Unhealthy
        } else if !high_usage_resources.is_empty() {
            HealthStatus::Degraded
        } else {
            HealthStatus::Healthy
        };

        let message = match status {
            HealthStatus::Healthy => "Resource usage normal".to_string(),
            HealthStatus::Degraded => format!("High resource usage: {:?}", high_usage_resources),
            HealthStatus::Unhealthy => {
                format!("Critical resource usage: {:?}", high_usage_resources)
            }
        };

        ComponentHealth {
            status,
            message,
            metrics: Some(
                serde_json::to_value(&metrics.resource_usage)
                    .and_then(serde_json::from_value)
                    .unwrap_or_default(),
            ),
        }
    }
}

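/// Times operations and feeds their outcomes into a `MetricsCollector`.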
pub struct PerformanceMonitor {
    metrics_collector: MetricsCollector,
}

impl PerformanceMonitor {
    pub fn new(metrics_collector: MetricsCollector) -> Self {
        Self { metrics_collector }
    }

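    /// Runs an async operation, records its duration and outcome, and returns
    /// the operation's result unchanged.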
    pub async fn time_operation<F, Fut, T>(&self, operation_name: &str, operation: F) -> Result<T>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = Result<T>>,
    {
        let start = Instant::now();
        let result = operation().await;
        let duration = start.elapsed();

        match &result {
            Ok(_) => {
                self.metrics_collector
                    .record_success(operation_name, duration);
                info!(
                    operation = operation_name,
                    duration_ms = duration.as_millis(),
                    "Operation completed successfully"
                );
            }
            Err(error) => {
                self.metrics_collector
                    .record_error(error, Some(operation_name));
                warn!(
                    operation = operation_name,
                    duration_ms = duration.as_millis(),
                    error = %error,
                    "Operation failed"
                );
            }
        }

        result
    }

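    /// Returns combined error-rate and latency stats for an operation, or `None`
    /// if no latency data has been recorded for it.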
    pub fn get_operation_performance(&self, operation: &str) -> Option<OperationPerformance> {
        let error_rate = self.metrics_collector.get_error_rate(operation);
        let avg_latency = self.metrics_collector.get_average_latency(operation)?;

        Some(OperationPerformance {
            operation: operation.to_string(),
            error_rate,
            average_latency: avg_latency,
        })
    }
}

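/// Combined error rate and average latency for a single operation.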
#[derive(Debug, Clone)]
pub struct OperationPerformance {
    pub operation: String,
    pub error_rate: f64,
    pub average_latency: Duration,
}

#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::{sleep, Duration};

    #[test]
    fn test_metrics_collector() {
        let collector = MetricsCollector::new();

        collector.record_success("parse_file", Duration::from_millis(100));
        collector.record_success("parse_file", Duration::from_millis(150));

        let error = Error::storage("test error");
        collector.record_error(&error, Some("parse_file"));

        // Two successes and one failure give an error rate of 1/3.
        let error_rate = collector.get_error_rate("parse_file");
        assert!((error_rate - 0.333).abs() < 0.01);

        let avg_latency = collector.get_average_latency("parse_file").unwrap();
        assert_eq!(avg_latency, Duration::from_millis(125));
    }

    #[test]
    fn test_health_monitor() {
        let metrics = MetricsCollector::new();
        let monitor = HealthMonitor::new(metrics);

        let health = monitor.health_check();
        assert_eq!(health.status, HealthStatus::Healthy);
        assert!(health.checks.contains_key("error_rates"));
        assert!(health.checks.contains_key("circuit_breakers"));
        assert!(health.checks.contains_key("resource_usage"));
    }

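
    // A sketch of exercising the circuit-breaker check. `CircuitState::Open` is
    // the crate's own variant (used above); the component name "storage" is
    // purely illustrative.
    #[test]
    fn test_health_monitor_open_circuit() {
        let metrics = MetricsCollector::new();
        let monitor = HealthMonitor::new(metrics);

        monitor.update_circuit_state("storage", crate::resilience::CircuitState::Open);

        let health = monitor.health_check();
        assert_eq!(health.status, HealthStatus::Unhealthy);
    }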

    #[tokio::test]
    async fn test_performance_monitor() {
        let metrics = MetricsCollector::new();
        let monitor = PerformanceMonitor::new(metrics);

        let result = monitor
            .time_operation("test_op", || async {
                sleep(Duration::from_millis(10)).await;
                Ok("success")
            })
            .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), "success");

        let perf = monitor.get_operation_performance("test_op");
        assert!(perf.is_some());
        let perf = perf.unwrap();
        assert_eq!(perf.error_rate, 0.0);
        assert!(perf.average_latency >= Duration::from_millis(10));
    }

    #[test]
    fn test_metrics_snapshot() {
        let collector = MetricsCollector::new();

        collector.record_success("op1", Duration::from_millis(100));
        let error = Error::validation("test_field", "test error");
        collector.record_error(&error, Some("op1"));
        collector.record_resource_usage("memory_mb", 512);

        let snapshot = collector.get_metrics_snapshot();

        // Sanity check: uptime should be far less than a year.
        assert!(snapshot.uptime_seconds < 365 * 24 * 3600);
        assert!(snapshot.operation_metrics.contains_key("op1"));
        assert_eq!(snapshot.resource_usage.get("memory_mb"), Some(&512));
        assert!(!snapshot.error_counts.is_empty());
    }
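
    // Further sketches grounded in the behavior above: "memory_mb" and its 1024
    // threshold come from check_resource_usage, and unknown operations fall back
    // to 0.0 / None in the collector's getters.
    #[test]
    fn test_resource_usage_degrades_health() {
        let collector = MetricsCollector::new();
        // 2048 MB exceeds the 1024 MB threshold, so resource health is degraded.
        collector.record_resource_usage("memory_mb", 2048);

        let monitor = HealthMonitor::new(collector);
        let health = monitor.health_check();
        assert_eq!(health.status, HealthStatus::Degraded);
    }

    #[test]
    fn test_unknown_operation_defaults() {
        let collector = MetricsCollector::new();

        assert_eq!(collector.get_error_rate("missing_op"), 0.0);
        assert!(collector.get_average_latency("missing_op").is_none());
    }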
}