//! Monitoring and observability: metrics collection, health checks, alerting,
//! and request tracing.

use crate::{
    config::MonitoringConfig,
    error::Result,
    types::{Request, Response},
};
use serde::{Deserialize, Serialize};
use std::{
    collections::HashMap,
    sync::{Arc, Mutex, RwLock},
    time::{Duration, Instant, SystemTime},
};
use tracing::{debug, error, info, warn};

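/// Central monitoring facade that aggregates a metrics registry, registered
/// health checks, recent alerts, and collected trace spans behind thread-safe locks.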
pub struct MonitoringSystem {
    #[allow(dead_code)]
    config: MonitoringConfig,
    metrics: Arc<RwLock<MetricsRegistry>>,
    health_checks: Arc<RwLock<Vec<HealthCheck>>>,
    alerts: Arc<Mutex<Vec<Alert>>>,
    traces: Arc<Mutex<Vec<TraceSpan>>>,
}

impl MonitoringSystem {
    pub fn new(config: MonitoringConfig) -> Self {
        Self {
            config,
            metrics: Arc::new(RwLock::new(MetricsRegistry::new())),
            health_checks: Arc::new(RwLock::new(Vec::new())),
            alerts: Arc::new(Mutex::new(Vec::new())),
            traces: Arc::new(Mutex::new(Vec::new())),
        }
    }

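    /// Records request/response metrics: total requests, a duration histogram,
    /// per-status counters, and an error counter for 4xx/5xx responses.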
    pub fn record_request(&self, request: &Request, response: &Response, duration: Duration) {
        let mut metrics = self.metrics.write().unwrap();

        metrics.increment_counter(
            "requests_total",
            &[
                ("method", request.method.to_string()),
                ("status", response.status.as_u16().to_string()),
            ],
        );

        metrics.record_histogram(
            "request_duration_seconds",
            duration.as_secs_f64(),
            &[
                ("method", request.method.to_string()),
                ("endpoint", request.uri.path().to_string()),
            ],
        );

        metrics.increment_counter(
            "responses_by_status",
            &[
                ("status_code", response.status.as_u16().to_string()),
                (
                    "status_class",
                    format!("{}xx", response.status.as_u16() / 100),
                ),
            ],
        );

        if response.status.as_u16() >= 400 {
            metrics.increment_counter(
                "errors_total",
                &[
                    ("status", response.status.as_u16().to_string()),
                    ("endpoint", request.uri.path().to_string()),
                ],
            );
        }
    }

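    /// Records an arbitrary gauge value under `name` with the given labels.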
    pub fn record_metric(&self, name: &str, value: f64, labels: &[(&str, String)]) {
        let mut metrics = self.metrics.write().unwrap();
        metrics.record_gauge(name, value, labels);
    }

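    /// Starts a new root trace span for `operation`, tags it with basic HTTP
    /// metadata from the request, stores the span, and returns a `TraceContext`
    /// for creating child spans and attaching tags or log events.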
    pub fn start_trace(&self, operation: &str, request: &Request) -> TraceContext {
        let trace_id = generate_trace_id();
        let span_id = generate_span_id();

        let span = TraceSpan {
            trace_id: trace_id.clone(),
            span_id: span_id.clone(),
            parent_span_id: None,
            operation_name: operation.to_string(),
            start_time: Instant::now(),
            end_time: None,
            duration: None,
            tags: {
                let mut tags = HashMap::new();
                tags.insert("http.method".to_string(), request.method.to_string());
                tags.insert("http.url".to_string(), request.uri.to_string());
                if let Some(user_agent) = request.headers.get("user-agent") {
                    tags.insert("http.user_agent".to_string(), user_agent.clone());
                }
                tags
            },
            logs: Vec::new(),
        };

        {
            let mut traces = self.traces.lock().unwrap();
            traces.push(span);
        }

        TraceContext {
            trace_id,
            span_id,
            monitoring: Arc::downgrade(&self.metrics),
        }
    }

    pub fn add_health_check(&self, check: HealthCheck) {
        let mut checks = self.health_checks.write().unwrap();
        checks.push(check);
    }

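    /// Runs all registered health checks sequentially and returns an aggregate
    /// `HealthStatus`; the overall level is the worst level reported by any check.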
    pub async fn run_health_checks(&self) -> HealthStatus {
        let checks = self.health_checks.read().unwrap().clone();
        let mut results = Vec::new();
        let mut overall_status = HealthLevel::Healthy;

        for check in checks {
            let start_time = Instant::now();
            let result = (check.check_fn)().await;
            let duration = start_time.elapsed();

            if result.status == HealthLevel::Critical {
                overall_status = HealthLevel::Critical;
            } else if result.status == HealthLevel::Warning
                && overall_status == HealthLevel::Healthy
            {
                overall_status = HealthLevel::Warning;
            }

            let check_name = check.name.clone();
            let result_status = result.status;
            let result_message = result.message.clone();

            let check_result = HealthCheckResult {
                name: check.name,
                status: result.status,
                message: result.message,
                duration,
                timestamp: SystemTime::now(),
            };

            results.push(check_result);

            match result_status {
                HealthLevel::Healthy => {
                    debug!("Health check '{}' passed: {}", check_name, result_message)
                }
                HealthLevel::Warning => {
                    warn!("Health check '{}' warning: {}", check_name, result_message)
                }
                HealthLevel::Critical => {
                    error!("Health check '{}' failed: {}", check_name, result_message)
                }
            }
        }

        HealthStatus {
            overall_status,
            checks: results,
            timestamp: SystemTime::now(),
        }
    }

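    /// Records a new alert and logs it; only the 100 most recent alerts are retained.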
    pub fn create_alert(&self, alert: Alert) {
        info!("Alert created: {} - {}", alert.severity, alert.message);

        let mut alerts = self.alerts.lock().unwrap();
        alerts.push(alert);

        // Cap the in-memory alert history at the 100 most recent entries.
        if alerts.len() > 100 {
            let excess = alerts.len() - 100;
            alerts.drain(0..excess);
        }
    }

    pub fn get_metrics(&self) -> MetricsSnapshot {
        let metrics = self.metrics.read().unwrap();
        metrics.snapshot()
    }

    pub fn get_recent_alerts(&self, since: SystemTime) -> Vec<Alert> {
        let alerts = self.alerts.lock().unwrap();
        alerts
            .iter()
            .filter(|alert| alert.timestamp > since)
            .cloned()
            .collect()
    }

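    /// Derives summary statistics (request count, error rate, average and p95
    /// latency) from the counters and histograms recorded so far.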
    pub fn get_performance_stats(&self) -> PerformanceStats {
        let metrics = self.metrics.read().unwrap();

        let total_requests = metrics.get_counter("requests_total").unwrap_or(0.0);
        let error_requests = metrics.get_counter("errors_total").unwrap_or(0.0);
        let error_rate = if total_requests > 0.0 {
            error_requests / total_requests
        } else {
            0.0
        };

        let avg_response_time = metrics
            .get_histogram_avg("request_duration_seconds")
            .unwrap_or(0.0);
        let p95_response_time = metrics
            .get_histogram_percentile("request_duration_seconds", 95.0)
            .unwrap_or(0.0);

        PerformanceStats {
            total_requests: total_requests as u64,
            error_rate,
            avg_response_time,
            p95_response_time,
            timestamp: SystemTime::now(),
        }
    }
}

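/// In-memory registry of counters, gauges, and histograms, keyed by metric
/// name plus serialized label set.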
#[derive(Debug)]
pub struct MetricsRegistry {
    counters: HashMap<String, CounterMetric>,
    gauges: HashMap<String, GaugeMetric>,
    histograms: HashMap<String, HistogramMetric>,
}

impl MetricsRegistry {
    pub fn new() -> Self {
        Self {
            counters: HashMap::new(),
            gauges: HashMap::new(),
            histograms: HashMap::new(),
        }
    }

    pub fn increment_counter(&mut self, name: &str, labels: &[(&str, String)]) {
        let key = format!("{}:{}", name, serialize_labels(labels));
        let counter = self
            .counters
            .entry(key)
            .or_insert_with(|| CounterMetric::new(name, labels));
        counter.increment();
    }

    pub fn record_gauge(&mut self, name: &str, value: f64, labels: &[(&str, String)]) {
        let key = format!("{}:{}", name, serialize_labels(labels));
        let gauge = self
            .gauges
            .entry(key)
            .or_insert_with(|| GaugeMetric::new(name, labels));
        gauge.set(value);
    }

    pub fn record_histogram(&mut self, name: &str, value: f64, labels: &[(&str, String)]) {
        let key = format!("{}:{}", name, serialize_labels(labels));
        let histogram = self
            .histograms
            .entry(key)
            .or_insert_with(|| HistogramMetric::new(name, labels));
        histogram.record(value);
    }

    pub fn get_counter(&self, name: &str) -> Option<f64> {
        // Sums the counter across all label sets; always yields Some, even when
        // no matching counters exist (in which case the sum is 0.0).
        self.counters
            .values()
            .filter(|c| c.name == name)
            .map(|c| c.value)
            .sum::<f64>()
            .into()
    }

    pub fn get_histogram_avg(&self, name: &str) -> Option<f64> {
        let histograms: Vec<_> = self
            .histograms
            .values()
            .filter(|h| h.name == name)
            .collect();

        if histograms.is_empty() {
            return None;
        }

        let total_sum: f64 = histograms.iter().map(|h| h.sum).sum();
        let total_count: u64 = histograms.iter().map(|h| h.count).sum();

        if total_count > 0 {
            Some(total_sum / total_count as f64)
        } else {
            None
        }
    }

    pub fn get_histogram_percentile(&self, name: &str, percentile: f64) -> Option<f64> {
        let histograms: Vec<_> = self
            .histograms
            .values()
            .filter(|h| h.name == name)
            .collect();

        if histograms.is_empty() {
            return None;
        }

        let mut all_values = Vec::new();
        for histogram in histograms {
            for &value in &histogram.values {
                all_values.push(value);
            }
        }

        if all_values.is_empty() {
            return None;
        }

        // Sort all retained samples and index at the requested percentile
        // (floor of the fractional rank, no interpolation).
        all_values.sort_by(|a, b| a.partial_cmp(b).unwrap());
        let index = ((percentile / 100.0) * (all_values.len() - 1) as f64) as usize;
        Some(all_values[index])
    }

    pub fn snapshot(&self) -> MetricsSnapshot {
        MetricsSnapshot {
            counters: self.counters.clone(),
            gauges: self.gauges.clone(),
            histograms: self.histograms.clone(),
            timestamp: SystemTime::now(),
        }
    }
}

impl Default for MetricsRegistry {
    fn default() -> Self {
        Self::new()
    }
}

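/// Monotonically increasing counter with an attached label set.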
#[derive(Debug, Clone)]
pub struct CounterMetric {
    pub name: String,
    pub labels: HashMap<String, String>,
    pub value: f64,
}

impl CounterMetric {
    pub fn new(name: &str, labels: &[(&str, String)]) -> Self {
        Self {
            name: name.to_string(),
            labels: labels
                .iter()
                .map(|(k, v)| (k.to_string(), v.clone()))
                .collect(),
            value: 0.0,
        }
    }

    pub fn increment(&mut self) {
        self.value += 1.0;
    }

    pub fn add(&mut self, value: f64) {
        self.value += value;
    }
}

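/// Gauge metric holding the most recently set value for a label set.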
#[derive(Debug, Clone)]
pub struct GaugeMetric {
    pub name: String,
    pub labels: HashMap<String, String>,
    pub value: f64,
}

impl GaugeMetric {
    pub fn new(name: &str, labels: &[(&str, String)]) -> Self {
        Self {
            name: name.to_string(),
            labels: labels
                .iter()
                .map(|(k, v)| (k.to_string(), v.clone()))
                .collect(),
            value: 0.0,
        }
    }

    pub fn set(&mut self, value: f64) {
        self.value = value;
    }
}

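/// Histogram metric that keeps a bounded window of raw samples (at most 1000)
/// together with a running sum and count.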
#[derive(Debug, Clone)]
pub struct HistogramMetric {
    pub name: String,
    pub labels: HashMap<String, String>,
    pub values: Vec<f64>,
    pub sum: f64,
    pub count: u64,
}

impl HistogramMetric {
    pub fn new(name: &str, labels: &[(&str, String)]) -> Self {
        Self {
            name: name.to_string(),
            labels: labels
                .iter()
                .map(|(k, v)| (k.to_string(), v.clone()))
                .collect(),
            values: Vec::new(),
            sum: 0.0,
            count: 0,
        }
    }

    pub fn record(&mut self, value: f64) {
        self.values.push(value);
        self.sum += value;
        self.count += 1;

        // Keep only the most recent 1000 samples so memory stays bounded.
        if self.values.len() > 1000 {
            let removed = self.values.remove(0);
            self.sum -= removed;
            self.count -= 1;
        }
    }
}

/// Point-in-time copy of every registered metric.
#[derive(Debug, Clone)]
pub struct MetricsSnapshot {
    pub counters: HashMap<String, CounterMetric>,
    pub gauges: HashMap<String, GaugeMetric>,
    pub histograms: HashMap<String, HistogramMetric>,
    pub timestamp: SystemTime,
}

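/// Handle to an active trace span. Carries the trace and span identifiers plus
/// a weak reference back to the metrics registry, so a context does not keep
/// the monitoring system alive.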
pub struct TraceContext {
    pub trace_id: String,
    pub span_id: String,
    monitoring: std::sync::Weak<RwLock<MetricsRegistry>>,
}

impl TraceContext {
    pub fn create_child(&self, _operation: &str) -> TraceContext {
        TraceContext {
            trace_id: self.trace_id.clone(),
            span_id: generate_span_id(),
            monitoring: self.monitoring.clone(),
        }
    }

    pub fn set_tag(&self, key: &str, value: &str) {
        debug!(
            "Trace {} span {}: {} = {}",
            self.trace_id, self.span_id, key, value
        );
    }

    pub fn log_event(&self, message: &str) {
        info!("Trace {} span {}: {}", self.trace_id, self.span_id, message);
    }
}

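/// A recorded span of work within a trace, including timing, tags, and log entries.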
#[derive(Debug, Clone)]
pub struct TraceSpan {
    pub trace_id: String,
    pub span_id: String,
    pub parent_span_id: Option<String>,
    pub operation_name: String,
    pub start_time: Instant,
    pub end_time: Option<Instant>,
    pub duration: Option<Duration>,
    pub tags: HashMap<String, String>,
    pub logs: Vec<TraceLog>,
}

#[derive(Debug, Clone)]
pub struct TraceLog {
    pub timestamp: Instant,
    pub message: String,
    pub level: LogLevel,
}

#[derive(Debug, Clone)]
pub enum LogLevel {
    Debug,
    Info,
    Warn,
    Error,
}

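/// Boxed async health-check callback: a `Fn` returning a pinned future that
/// resolves to a `HealthCheckResult`.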
pub type HealthCheckFn = Box<
    dyn Fn() -> std::pin::Pin<Box<dyn std::future::Future<Output = HealthCheckResult> + Send>>
        + Send
        + Sync,
>;

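/// A named health check; the callback is reference-counted so the check itself
/// can be cloned cheaply.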
#[derive(Clone)]
pub struct HealthCheck {
    pub name: String,
    pub check_fn: Arc<HealthCheckFn>,
}

impl std::fmt::Debug for HealthCheck {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HealthCheck")
            .field("name", &self.name)
            .finish()
    }
}

#[derive(Debug, Clone)]
pub struct HealthCheckResult {
    pub name: String,
    pub status: HealthLevel,
    pub message: String,
    pub duration: Duration,
    pub timestamp: SystemTime,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HealthLevel {
    Healthy,
    Warning,
    Critical,
}

#[derive(Debug, Clone)]
pub struct HealthStatus {
    pub overall_status: HealthLevel,
    pub checks: Vec<HealthCheckResult>,
    pub timestamp: SystemTime,
}

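/// An alert raised by the monitoring system, with severity, source, and free-form metadata.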
#[derive(Debug, Clone)]
pub struct Alert {
    pub id: String,
    pub severity: AlertSeverity,
    pub message: String,
    pub source: String,
    pub timestamp: SystemTime,
    pub metadata: HashMap<String, String>,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AlertSeverity {
    Info,
    Warning,
    Critical,
}

impl std::fmt::Display for AlertSeverity {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            AlertSeverity::Info => write!(f, "INFO"),
            AlertSeverity::Warning => write!(f, "WARNING"),
            AlertSeverity::Critical => write!(f, "CRITICAL"),
        }
    }
}

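/// Summary request statistics derived from recorded metrics.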
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceStats {
    pub total_requests: u64,
    pub error_rate: f64,
    pub avg_response_time: f64,
    pub p95_response_time: f64,
    pub timestamp: SystemTime,
}

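/// Serializes a label set as comma-joined `key=value` pairs, used to build metric map keys.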
fn serialize_labels(labels: &[(&str, String)]) -> String {
    labels
        .iter()
        .map(|(k, v)| format!("{}={}", k, v))
        .collect::<Vec<_>>()
        .join(",")
}

fn generate_trace_id() -> String {
    format!("{:016x}", rand::random::<u64>())
}

fn generate_span_id() -> String {
    format!("{:08x}", rand::random::<u32>())
}

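/// Ready-made health checks for memory usage, disk space, and database connectivity.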
pub mod health_checks {
    use super::*;

    pub fn memory_usage(threshold_mb: u64) -> HealthCheck {
        HealthCheck {
            name: "memory_usage".to_string(),
            check_fn: Arc::new(Box::new(move || {
                Box::pin(async move {
                    let memory_mb = get_memory_usage_mb();

                    if memory_mb > threshold_mb {
                        HealthCheckResult {
                            name: "memory_usage".to_string(),
                            status: HealthLevel::Critical,
                            message: format!(
                                "Memory usage {}MB exceeds threshold {}MB",
                                memory_mb, threshold_mb
                            ),
                            duration: Duration::from_millis(1),
                            timestamp: SystemTime::now(),
                        }
                    } else if memory_mb > threshold_mb * 80 / 100 {
                        HealthCheckResult {
                            name: "memory_usage".to_string(),
                            status: HealthLevel::Warning,
                            message: format!(
                                "Memory usage {}MB approaching threshold {}MB",
                                memory_mb, threshold_mb
                            ),
                            duration: Duration::from_millis(1),
                            timestamp: SystemTime::now(),
                        }
                    } else {
                        HealthCheckResult {
                            name: "memory_usage".to_string(),
                            status: HealthLevel::Healthy,
                            message: format!("Memory usage {}MB is normal", memory_mb),
                            duration: Duration::from_millis(1),
                            timestamp: SystemTime::now(),
                        }
                    }
                })
            })),
        }
    }

    pub fn disk_space(path: String, threshold_gb: u64) -> HealthCheck {
        HealthCheck {
            name: format!("disk_space_{}", path),
            check_fn: Arc::new(Box::new(move || {
                let path = path.clone();
                Box::pin(async move {
                    let free_gb = get_free_disk_space_gb(&path);

                    if free_gb < threshold_gb {
                        HealthCheckResult {
                            name: format!("disk_space_{}", path),
                            status: HealthLevel::Critical,
                            message: format!(
                                "Free disk space {}GB below threshold {}GB",
                                free_gb, threshold_gb
                            ),
                            duration: Duration::from_millis(1),
                            timestamp: SystemTime::now(),
                        }
                    } else if free_gb < threshold_gb * 2 {
                        HealthCheckResult {
                            name: format!("disk_space_{}", path),
                            status: HealthLevel::Warning,
                            message: format!("Free disk space {}GB low", free_gb),
                            duration: Duration::from_millis(1),
                            timestamp: SystemTime::now(),
                        }
                    } else {
                        HealthCheckResult {
                            name: format!("disk_space_{}", path),
                            status: HealthLevel::Healthy,
                            message: format!("Free disk space {}GB is sufficient", free_gb),
                            duration: Duration::from_millis(1),
                            timestamp: SystemTime::now(),
                        }
                    }
                })
            })),
        }
    }

    pub fn database_connection(connection_string: String) -> HealthCheck {
        HealthCheck {
            name: "database_connection".to_string(),
            check_fn: Arc::new(Box::new(move || {
                let _connection_string = connection_string.clone();
                Box::pin(async move {
                    let start = Instant::now();
                    let connected = test_database_connection().await;
                    let duration = start.elapsed();

                    if connected {
                        HealthCheckResult {
                            name: "database_connection".to_string(),
                            status: HealthLevel::Healthy,
                            message: "Database connection successful".to_string(),
                            duration,
                            timestamp: SystemTime::now(),
                        }
                    } else {
                        HealthCheckResult {
                            name: "database_connection".to_string(),
                            status: HealthLevel::Critical,
                            message: "Database connection failed".to_string(),
                            duration,
                            timestamp: SystemTime::now(),
                        }
                    }
                })
            })),
        }
    }

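    // NOTE: the helpers below are placeholders that return fixed values; a real
    // implementation would query the OS, filesystem, and database here.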
    fn get_memory_usage_mb() -> u64 {
        100
    }

    fn get_free_disk_space_gb(_path: &str) -> u64 {
        10
    }

    async fn test_database_connection() -> bool {
        true
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_monitoring_system() {
        let config = MonitoringConfig::default();
        let monitoring = MonitoringSystem::new(config);

        monitoring.record_metric("test_metric", 42.0, &[("label", "value".to_string())]);

        let metrics = monitoring.get_metrics();
        assert!(!metrics.gauges.is_empty());
    }

    #[tokio::test]
    async fn test_health_checks() {
        let config = MonitoringConfig::default();
        let monitoring = MonitoringSystem::new(config);

        monitoring.add_health_check(health_checks::memory_usage(1000));

        let health_status = monitoring.run_health_checks().await;
        assert_eq!(health_status.checks.len(), 1);
    }

    #[test]
    fn test_metrics_registry() {
        let mut registry = MetricsRegistry::new();

        registry.increment_counter("test_counter", &[("method", "GET".to_string())]);
        registry.record_gauge("test_gauge", 42.0, &[]);
        registry.record_histogram("test_histogram", 1.5, &[]);

        assert_eq!(registry.get_counter("test_counter"), Some(1.0));
        assert_eq!(registry.get_histogram_avg("test_histogram"), Some(1.5));
    }

    #[test]
    fn test_trace_context() {
        let trace = TraceContext {
            trace_id: "test_trace".to_string(),
            span_id: "test_span".to_string(),
            monitoring: std::sync::Weak::new(),
        };

        let child = trace.create_child("child_operation");
        assert_eq!(child.trace_id, "test_trace");
        assert_ne!(child.span_id, "test_span");
    }

    #[test]
    fn test_alert_creation() {
        let config = MonitoringConfig::default();
        let monitoring = MonitoringSystem::new(config);

        let alert = Alert {
            id: "test_alert".to_string(),
            severity: AlertSeverity::Warning,
            message: "Test alert message".to_string(),
            source: "test_source".to_string(),
            timestamp: SystemTime::now(),
            metadata: HashMap::new(),
        };

        monitoring.create_alert(alert);

        let recent_alerts =
            monitoring.get_recent_alerts(SystemTime::now() - Duration::from_secs(60));
        assert_eq!(recent_alerts.len(), 1);
    }
}