1use anyhow::Result;
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12use sysinfo::{Pid, ProcessesToUpdate, System};
13use tokio::sync::RwLock;
14use tracing::{debug, error, info, warn};
15
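/// Configuration for the monitoring subsystem: metric and health-check
/// intervals, optional profiling, and optional Prometheus/Jaeger endpoints.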
16#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct MonitoringConfig {
19 pub enable_metrics: bool,
20 pub enable_tracing: bool,
21 pub metrics_interval: Duration,
22 pub health_check_interval: Duration,
23 pub enable_profiling: bool,
24 pub prometheus_endpoint: Option<String>,
25 pub jaeger_endpoint: Option<String>,
26 pub log_level: String,
27}
28
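/// Central collector that aggregates producer, consumer, backend, and system
/// metrics, drives periodic health checks, and renders Prometheus output.
///
/// A minimal usage sketch (assumes a `MonitoringConfig` built elsewhere and a
/// Tokio runtime; error handling elided):
///
/// ```ignore
/// let collector = MetricsCollector::new(config);
/// collector.start().await?;
/// let snapshot = collector.get_metrics().await;
/// let exposition = collector.export_prometheus().await;
/// ```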
29pub struct MetricsCollector {
31 config: MonitoringConfig,
32 metrics: Arc<RwLock<StreamingMetrics>>,
33 health_checker: Arc<HealthChecker>,
34 profiler: Option<Profiler>,
35 system: Arc<RwLock<System>>,
36}
37
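/// Point-in-time snapshot of all streaming metrics: producer, consumer, system,
/// backend, stream-processing, data-quality, and dead-letter-queue counters.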
38#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct StreamingMetrics {
41 pub producer_events_published: u64,
43 pub producer_events_failed: u64,
44 pub producer_bytes_sent: u64,
45 pub producer_batches_sent: u64,
46 pub producer_average_latency_ms: f64,
47 pub producer_throughput_eps: f64,
48
49 pub consumer_events_consumed: u64,
51 pub consumer_events_processed: u64,
52 pub consumer_events_filtered: u64,
53 pub consumer_events_failed: u64,
54 pub consumer_bytes_received: u64,
55 pub consumer_batches_received: u64,
56 pub consumer_average_processing_time_ms: f64,
57 pub consumer_throughput_eps: f64,
58 pub consumer_lag_ms: Option<f64>,
59
60 pub system_memory_usage_bytes: u64,
62 pub system_cpu_usage_percent: f64,
63 pub system_network_bytes_in: u64,
64 pub system_network_bytes_out: u64,
65 pub system_gc_collections: u64,
66 pub system_gc_time_ms: u64,
67
68 pub backend_connections_active: u32,
70 pub backend_connections_idle: u32,
71 pub backend_connection_errors: u64,
72 pub backend_circuit_breaker_trips: u64,
73 pub backend_retry_attempts: u64,
74
75 pub window_operations_count: u64,
77 pub aggregation_operations_count: u64,
78 pub pattern_matches_found: u64,
79 pub state_store_operations: u64,
80 pub subscriptions_active: u32,
81
82 pub message_loss_rate: f64,
84 pub duplicate_rate: f64,
85 pub out_of_order_rate: f64,
86 pub error_rate: f64,
87 pub success_rate: f64,
88 pub availability: f64,
89
90 pub dlq_messages_count: u64,
92 pub dlq_messages_per_second: f64,
93 pub dlq_processing_rate: f64,
94 pub dlq_oldest_message_age_ms: u64,
95 pub dlq_replay_success_rate: f64,
96 pub dlq_total_replayed: u64,
97 pub dlq_size_bytes: u64,
98 pub dlq_error_categories: HashMap<String, u64>,
99
100 pub last_updated: DateTime<Utc>,
102 pub collection_start_time: DateTime<Utc>,
103}
104
105impl Default for StreamingMetrics {
106 fn default() -> Self {
107 let now = Utc::now();
108 Self {
109 producer_events_published: 0,
111 producer_events_failed: 0,
112 producer_bytes_sent: 0,
113 producer_batches_sent: 0,
114 producer_average_latency_ms: 0.0,
115 producer_throughput_eps: 0.0,
116
117 consumer_events_consumed: 0,
119 consumer_events_processed: 0,
120 consumer_events_filtered: 0,
121 consumer_events_failed: 0,
122 consumer_bytes_received: 0,
123 consumer_batches_received: 0,
124 consumer_average_processing_time_ms: 0.0,
125 consumer_throughput_eps: 0.0,
126 consumer_lag_ms: None,
127
128 system_memory_usage_bytes: 0,
130 system_cpu_usage_percent: 0.0,
131 system_network_bytes_in: 0,
132 system_network_bytes_out: 0,
133 system_gc_collections: 0,
134 system_gc_time_ms: 0,
135
136 backend_connections_active: 0,
138 backend_connections_idle: 0,
139 backend_connection_errors: 0,
140 backend_circuit_breaker_trips: 0,
141 backend_retry_attempts: 0,
142
143 window_operations_count: 0,
145 aggregation_operations_count: 0,
146 pattern_matches_found: 0,
147 state_store_operations: 0,
148 subscriptions_active: 0,
149
150 message_loss_rate: 0.0,
152 duplicate_rate: 0.0,
153 out_of_order_rate: 0.0,
154 error_rate: 0.0,
success_rate: 100.0,
availability: 100.0,
dlq_messages_count: 0,
160 dlq_messages_per_second: 0.0,
161 dlq_processing_rate: 0.0,
162 dlq_oldest_message_age_ms: 0,
dlq_replay_success_rate: 100.0,
dlq_total_replayed: 0,
165 dlq_size_bytes: 0,
166 dlq_error_categories: HashMap::new(),
167
168 last_updated: now,
170 collection_start_time: now,
171 }
172 }
173}
174
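/// Aggregates the results of registered `ComponentHealthChecker`s into a single
/// `SystemHealth`, raising alerts for warning and critical conditions.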
175pub struct HealthChecker {
177 config: MonitoringConfig,
178 health_status: Arc<RwLock<SystemHealth>>,
179 component_checkers: Vec<Box<dyn ComponentHealthChecker>>,
180}
181
182#[derive(Debug, Clone, Serialize, Deserialize)]
184pub struct SystemHealth {
185 pub overall_status: HealthStatus,
186 pub component_health: HashMap<String, ComponentHealth>,
187 pub last_check: DateTime<Utc>,
188 pub uptime: Duration,
189 pub alerts: Vec<HealthAlert>,
190}
191
192#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
194pub enum HealthStatus {
195 Healthy,
196 Warning,
197 Critical,
198 Unknown,
199}
200
201impl std::fmt::Display for HealthStatus {
202 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203 match self {
204 HealthStatus::Healthy => write!(f, "healthy"),
205 HealthStatus::Warning => write!(f, "warning"),
206 HealthStatus::Critical => write!(f, "critical"),
207 HealthStatus::Unknown => write!(f, "unknown"),
208 }
209 }
210}
211
212#[derive(Debug, Clone, Serialize, Deserialize)]
214pub struct ComponentHealth {
215 pub status: HealthStatus,
216 pub message: String,
217 pub last_check: DateTime<Utc>,
218 pub metrics: HashMap<String, f64>,
219 pub dependencies: Vec<String>,
220}
221
222#[derive(Debug, Clone, Serialize, Deserialize)]
224pub struct HealthAlert {
225 pub id: String,
226 pub component: String,
227 pub severity: AlertSeverity,
228 pub message: String,
229 pub timestamp: DateTime<Utc>,
230 pub resolved: bool,
231}
232
233#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
235pub enum AlertSeverity {
236 Info,
237 Warning,
238 Critical,
239}
240
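/// Sampling profiler that records a `PerformanceTrace` for a configurable
/// fraction of operations.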
241pub struct Profiler {
243 enabled: bool,
244 traces: Arc<RwLock<Vec<PerformanceTrace>>>,
245 sampling_rate: f64,
246}
247
248#[derive(Debug, Clone, Serialize, Deserialize)]
250pub struct PerformanceTrace {
251 pub operation: String,
252 pub start_time: DateTime<Utc>,
253 pub duration: Duration,
254 pub metadata: HashMap<String, String>,
255 pub call_stack: Vec<String>,
256}
257
258#[derive(Debug, Clone, Serialize, Deserialize)]
260pub struct LoadAverage {
261 pub one: f64,
262 pub five: f64,
263 pub fifteen: f64,
264}
265
266#[derive(Debug, Clone, Serialize, Deserialize)]
268pub struct SystemInfo {
269 pub total_memory: u64,
270 pub used_memory: u64,
271 pub total_swap: u64,
272 pub used_swap: u64,
273 pub cpu_count: usize,
274 pub load_average: LoadAverage,
275 pub boot_time: u64,
276 pub uptime: u64,
277}
278
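/// Implemented by components that can report their own health; `check_health`
/// returns a boxed future so the trait stays object-safe.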
279pub trait ComponentHealthChecker: Send + Sync {
281 fn component_name(&self) -> &str;
282 fn check_health(
283 &self,
284 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ComponentHealth> + Send + '_>>;
285}
286
287impl MetricsCollector {
288 pub fn new(config: MonitoringConfig) -> Self {
290 let health_checker = Arc::new(HealthChecker::new(config.clone()));
291 let profiler = if config.enable_profiling {
// Sample roughly 10% of operations when profiling is enabled.
Some(Profiler::new(0.1))
} else {
294 None
295 };
296 let mut sys = System::new_all();
297 sys.refresh_all();
298
299 Self {
300 config,
301 metrics: Arc::new(RwLock::new(StreamingMetrics::default())),
302 health_checker,
303 profiler,
304 system: Arc::new(RwLock::new(sys)),
305 }
306 }
307
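/// Spawns the background tasks for metrics aggregation, health checking, and
/// system metrics sampling, then returns immediately.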
308 pub async fn start(&self) -> Result<()> {
310 info!(
311 "Starting metrics collection with interval: {:?}",
312 self.config.metrics_interval
313 );
314
315 self.start_metrics_collection().await;
317
318 self.start_health_checking().await;
320
321 self.start_system_metrics_collection().await;
323
324 Ok(())
325 }
326
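/// Folds a producer-side delta into the shared snapshot: counters accumulate,
/// latency is smoothed with the previous average, and throughput takes the
/// latest reported value.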
327 pub async fn update_producer_metrics(&self, metrics: ProducerMetricsUpdate) {
329 let mut current_metrics = self.metrics.write().await;
330
331 current_metrics.producer_events_published += metrics.events_published;
332 current_metrics.producer_events_failed += metrics.events_failed;
333 current_metrics.producer_bytes_sent += metrics.bytes_sent;
334 current_metrics.producer_batches_sent += metrics.batches_sent;
335
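// Simple two-point smoothing: average the stored latency with the new sample
// instead of keeping a full history.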
336 if metrics.latency_ms > 0.0 {
337 current_metrics.producer_average_latency_ms =
338 (current_metrics.producer_average_latency_ms + metrics.latency_ms) / 2.0;
339 }
340
341 current_metrics.producer_throughput_eps = metrics.throughput_eps;
342 current_metrics.last_updated = Utc::now();
343 }
344
345 pub async fn update_consumer_metrics(&self, metrics: ConsumerMetricsUpdate) {
347 let mut current_metrics = self.metrics.write().await;
348
349 current_metrics.consumer_events_consumed += metrics.events_consumed;
350 current_metrics.consumer_events_processed += metrics.events_processed;
351 current_metrics.consumer_events_filtered += metrics.events_filtered;
352 current_metrics.consumer_events_failed += metrics.events_failed;
353 current_metrics.consumer_bytes_received += metrics.bytes_received;
354 current_metrics.consumer_batches_received += metrics.batches_received;
355
356 let _health = self.health_checker.get_health().await;
358
359 if metrics.processing_time_ms > 0.0 {
360 current_metrics.consumer_average_processing_time_ms =
361 (current_metrics.consumer_average_processing_time_ms + metrics.processing_time_ms)
362 / 2.0;
363 }
364
365 current_metrics.consumer_throughput_eps = metrics.throughput_eps;
366 current_metrics.consumer_lag_ms = metrics.lag_ms;
367 current_metrics.last_updated = Utc::now();
368 }
369
370 pub async fn update_backend_metrics(&self, metrics: BackendMetricsUpdate) {
372 let mut current_metrics = self.metrics.write().await;
373
374 current_metrics.backend_connections_active = metrics.connections_active;
375 current_metrics.backend_connections_idle = metrics.connections_idle;
376 current_metrics.backend_connection_errors += metrics.connection_errors;
377 current_metrics.backend_circuit_breaker_trips += metrics.circuit_breaker_trips;
378 current_metrics.backend_retry_attempts += metrics.retry_attempts;
379 current_metrics.last_updated = Utc::now();
380 }
381
382 pub async fn get_metrics(&self) -> StreamingMetrics {
384 self.metrics.read().await.clone()
385 }
386
387 async fn start_metrics_collection(&self) {
389 let metrics = self.metrics.clone();
390 let interval = self.config.metrics_interval;
391
392 tokio::spawn(async move {
393 let mut interval_timer = tokio::time::interval(interval);
394
395 loop {
396 interval_timer.tick().await;
397
398 let mut current_metrics = metrics.write().await;
400
401 let elapsed = current_metrics
403 .last_updated
404 .signed_duration_since(current_metrics.collection_start_time)
405 .num_seconds() as f64;
406
407 if elapsed > 0.0 {
408 let total_events = current_metrics.producer_events_published
410 + current_metrics.producer_events_failed;
411 if total_events > 0 {
412 current_metrics.error_rate =
413 current_metrics.producer_events_failed as f64 / total_events as f64;
// Keep success_rate in percent, matching its 100.0 default and the availability gauge.
current_metrics.success_rate = (1.0 - current_metrics.error_rate) * 100.0;
415 }
416
417 current_metrics.availability = if current_metrics.error_rate < 0.01 {
419 99.9 + (1.0 - current_metrics.error_rate) * 0.1
420 } else {
421 100.0 - (current_metrics.error_rate * 100.0)
422 };
423 }
424
425 debug!(
426 "Updated metrics: throughput={:.2} eps, error_rate={:.4}",
427 current_metrics.producer_throughput_eps, current_metrics.error_rate
428 );
429 }
430 });
431 }
432
433 async fn start_health_checking(&self) {
435 let health_checker = self.health_checker.clone();
436 let interval = self.config.health_check_interval;
437
438 tokio::spawn(async move {
439 let mut interval_timer = tokio::time::interval(interval);
440
441 loop {
442 interval_timer.tick().await;
443
444 if let Err(e) = health_checker.check_all_components().await {
445 error!("Health check failed: {}", e);
446 }
447 }
448 });
449 }
450
451 async fn start_system_metrics_collection(&self) {
453 let metrics = self.metrics.clone();
454 let system = self.system.clone();
455
456 tokio::spawn(async move {
457 let mut interval = tokio::time::interval(Duration::from_secs(10));
458 let mut previous_network_in = 0u64;
459 let mut previous_network_out = 0u64;
460
461 loop {
462 interval.tick().await;
463
464 {
466 let mut sys = system.write().await;
sys.refresh_cpu_all();
sys.refresh_memory();
sys.refresh_processes(ProcessesToUpdate::All, true);
}
472
473 let mut current_metrics = metrics.write().await;
474 let sys = system.read().await;
475
476 current_metrics.system_memory_usage_bytes = sys.used_memory();
current_metrics.system_cpu_usage_percent = sys.global_cpu_usage() as f64;
let (network_in, network_out) = Self::get_network_metrics(&sys);
482 current_metrics.system_network_bytes_in = previous_network_in + network_in;
483 current_metrics.system_network_bytes_out = previous_network_out + network_out;
484 previous_network_in = current_metrics.system_network_bytes_in;
485 previous_network_out = current_metrics.system_network_bytes_out;
486
487 if let Some(process) = sys.process(Pid::from_u32(std::process::id())) {
489 debug!("Process memory: {} bytes", process.memory());
491 }
492 }
493 });
494 }
495
fn get_network_metrics(_sys: &System) -> (u64, u64) {
    // Placeholder: per-interface counters are not wired up yet, so report zero
    // bytes in/out until a real implementation (e.g. via sysinfo's Networks) lands.
    (0, 0)
}
510
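/// Returns a host-level snapshot (memory, swap, CPU count, load average,
/// boot time, uptime) from the shared `sysinfo::System`.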
511 pub async fn get_system_info(&self) -> SystemInfo {
513 let sys = self.system.read().await;
514
515 SystemInfo {
516 total_memory: sys.total_memory(),
517 used_memory: sys.used_memory(),
518 total_swap: sys.total_swap(),
519 used_swap: sys.used_swap(),
520 cpu_count: sys.cpus().len(),
521 load_average: {
522 let load_avg = System::load_average();
523 LoadAverage {
524 one: load_avg.one,
525 five: load_avg.five,
526 fifteen: load_avg.fifteen,
527 }
528 },
529 boot_time: System::boot_time(),
530 uptime: System::uptime(),
531 }
532 }
533
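/// Renders a subset of the collected metrics in Prometheus text exposition format.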
534 pub async fn export_prometheus(&self) -> String {
536 let metrics = self.metrics.read().await;
537
538 format!(
539 r#"# HELP oxirs_producer_events_published_total Total number of events published by producers
540# TYPE oxirs_producer_events_published_total counter
541oxirs_producer_events_published_total {}
542
543# HELP oxirs_producer_events_failed_total Total number of failed events in producers
544# TYPE oxirs_producer_events_failed_total counter
545oxirs_producer_events_failed_total {}
546
547# HELP oxirs_producer_throughput_eps Current producer throughput in events per second
548# TYPE oxirs_producer_throughput_eps gauge
549oxirs_producer_throughput_eps {}
550
551# HELP oxirs_consumer_events_consumed_total Total number of events consumed
552# TYPE oxirs_consumer_events_consumed_total counter
553oxirs_consumer_events_consumed_total {}
554
555# HELP oxirs_consumer_throughput_eps Current consumer throughput in events per second
556# TYPE oxirs_consumer_throughput_eps gauge
557oxirs_consumer_throughput_eps {}
558
559# HELP oxirs_error_rate Current error rate
560# TYPE oxirs_error_rate gauge
561oxirs_error_rate {}
562
563# HELP oxirs_availability Current system availability percentage
564# TYPE oxirs_availability gauge
565oxirs_availability {}
566"#,
567 metrics.producer_events_published,
568 metrics.producer_events_failed,
569 metrics.producer_throughput_eps,
570 metrics.consumer_events_consumed,
571 metrics.consumer_throughput_eps,
572 metrics.error_rate,
573 metrics.availability
574 )
575 }
576
577 pub async fn get_health(&self) -> SystemHealth {
579 self.health_checker.get_health().await
580 }
581}
582
583impl HealthChecker {
584 pub fn new(config: MonitoringConfig) -> Self {
585 Self {
586 config,
587 health_status: Arc::new(RwLock::new(SystemHealth::default())),
588 component_checkers: Vec::new(),
589 }
590 }
591
592 pub fn add_component_checker(&mut self, checker: Box<dyn ComponentHealthChecker>) {
594 self.component_checkers.push(checker);
595 }
596
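/// Runs every registered component checker, derives the overall status
/// (critical outranks warning), and appends any newly raised alerts.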
597 pub async fn check_all_components(&self) -> Result<()> {
599 let mut component_health = HashMap::new();
600 let mut overall_status = HealthStatus::Healthy;
601 let mut alerts = Vec::new();
602
603 for checker in &self.component_checkers {
604 let health = checker.check_health().await;
605 let component_name = checker.component_name().to_string();
606
607 match health.status {
608 HealthStatus::Warning => {
609 if overall_status == HealthStatus::Healthy {
610 overall_status = HealthStatus::Warning;
611 }
612 alerts.push(HealthAlert {
613 id: uuid::Uuid::new_v4().to_string(),
614 component: component_name.clone(),
615 severity: AlertSeverity::Warning,
616 message: health.message.clone(),
617 timestamp: Utc::now(),
618 resolved: false,
619 });
620 }
621 HealthStatus::Critical => {
622 overall_status = HealthStatus::Critical;
623 alerts.push(HealthAlert {
624 id: uuid::Uuid::new_v4().to_string(),
625 component: component_name.clone(),
626 severity: AlertSeverity::Critical,
627 message: health.message.clone(),
628 timestamp: Utc::now(),
629 resolved: false,
630 });
631 }
632 _ => {}
633 }
634
635 component_health.insert(component_name, health);
636 }
637
638 let mut health_status = self.health_status.write().await;
639 health_status.overall_status = overall_status;
640 health_status.component_health = component_health;
641 health_status.last_check = Utc::now();
642 health_status.alerts.extend(alerts);
643
644 Ok(())
645 }
646
647 pub async fn get_health(&self) -> SystemHealth {
649 self.health_status.read().await.clone()
650 }
651
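/// Evaluates metrics and system info against fixed thresholds (memory, CPU,
/// producer/consumer failure rates, latency, connection errors) and replaces
/// the stored health status and alerts accordingly.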
652 pub async fn assess_system_health(
654 &self,
655 metrics: &StreamingMetrics,
656 system_info: &SystemInfo,
657 ) -> Result<()> {
658 let mut health_alerts = Vec::new();
659 let now = Utc::now();
660 let mut alert_id = 1;
661
662 let memory_usage_percent =
664 (system_info.used_memory as f64 / system_info.total_memory as f64) * 100.0;
665 if memory_usage_percent > 90.0 {
666 health_alerts.push(HealthAlert {
667 id: format!("memory_critical_{alert_id}"),
668 component: "system".to_string(),
669 severity: AlertSeverity::Critical,
670 message: format!("Critical memory usage: {memory_usage_percent:.1}%"),
671 timestamp: now,
672 resolved: false,
673 });
674 alert_id += 1;
675 } else if memory_usage_percent > 80.0 {
676 health_alerts.push(HealthAlert {
677 id: format!("memory_warning_{alert_id}"),
678 component: "system".to_string(),
679 severity: AlertSeverity::Warning,
680 message: format!("High memory usage: {memory_usage_percent:.1}%"),
681 timestamp: now,
682 resolved: false,
683 });
684 alert_id += 1;
685 }
686
687 if metrics.system_cpu_usage_percent > 95.0 {
689 health_alerts.push(HealthAlert {
690 id: format!("cpu_critical_{alert_id}"),
691 component: "system".to_string(),
692 severity: AlertSeverity::Critical,
693 message: format!(
694 "Critical CPU usage: {:.1}%",
695 metrics.system_cpu_usage_percent
696 ),
697 timestamp: now,
698 resolved: false,
699 });
700 alert_id += 1;
701 } else if metrics.system_cpu_usage_percent > 85.0 {
702 health_alerts.push(HealthAlert {
703 id: format!("cpu_warning_{alert_id}"),
704 component: "system".to_string(),
705 severity: AlertSeverity::Warning,
706 message: format!("High CPU usage: {:.1}%", metrics.system_cpu_usage_percent),
707 timestamp: now,
708 resolved: false,
709 });
710 alert_id += 1;
711 }
712
713 if metrics.producer_events_failed > 0 {
715 let total_producer_events =
716 metrics.producer_events_published + metrics.producer_events_failed;
717 if total_producer_events > 0 {
718 let failure_rate =
719 metrics.producer_events_failed as f64 / total_producer_events as f64;
720 if failure_rate > 0.10 {
721 health_alerts.push(HealthAlert {
722 id: format!("producer_failure_{alert_id}"),
723 component: "producer".to_string(),
724 severity: AlertSeverity::Critical,
725 message: format!(
726 "High producer failure rate: {:.2}%",
727 failure_rate * 100.0
728 ),
729 timestamp: now,
730 resolved: false,
731 });
732 alert_id += 1;
733 } else if failure_rate > 0.05 {
734 health_alerts.push(HealthAlert {
735 id: format!("producer_failure_{alert_id}"),
736 component: "producer".to_string(),
737 severity: AlertSeverity::Warning,
738 message: format!(
739 "Elevated producer failure rate: {:.2}%",
740 failure_rate * 100.0
741 ),
742 timestamp: now,
743 resolved: false,
744 });
745 alert_id += 1;
746 }
747 }
748 }
749
750 if metrics.consumer_events_consumed > 0 && metrics.consumer_events_failed > 0 {
752 let failure_rate =
753 metrics.consumer_events_failed as f64 / metrics.consumer_events_consumed as f64;
754 if failure_rate > 0.10 {
755 health_alerts.push(HealthAlert {
756 id: format!("consumer_failure_{alert_id}"),
757 component: "consumer".to_string(),
758 severity: AlertSeverity::Critical,
759 message: format!("High consumer failure rate: {:.2}%", failure_rate * 100.0),
760 timestamp: now,
761 resolved: false,
762 });
763 alert_id += 1;
764 } else if failure_rate > 0.05 {
765 health_alerts.push(HealthAlert {
766 id: format!("consumer_failure_{alert_id}"),
767 component: "consumer".to_string(),
768 severity: AlertSeverity::Warning,
769 message: format!(
770 "Elevated consumer failure rate: {:.2}%",
771 failure_rate * 100.0
772 ),
773 timestamp: now,
774 resolved: false,
775 });
776 alert_id += 1;
777 }
778 }
779
780 if metrics.producer_average_latency_ms > 2000.0 {
782 health_alerts.push(HealthAlert {
783 id: format!("producer_latency_{alert_id}"),
784 component: "producer".to_string(),
785 severity: AlertSeverity::Critical,
786 message: format!(
787 "Critical producer latency: {:.2}ms",
788 metrics.producer_average_latency_ms
789 ),
790 timestamp: now,
791 resolved: false,
792 });
793 alert_id += 1;
794 } else if metrics.producer_average_latency_ms > 1000.0 {
795 health_alerts.push(HealthAlert {
796 id: format!("producer_latency_{alert_id}"),
797 component: "producer".to_string(),
798 severity: AlertSeverity::Warning,
799 message: format!(
800 "High producer latency: {:.2}ms",
801 metrics.producer_average_latency_ms
802 ),
803 timestamp: now,
804 resolved: false,
805 });
806 alert_id += 1;
807 }
808
809 if metrics.consumer_average_processing_time_ms > 1000.0 {
810 health_alerts.push(HealthAlert {
811 id: format!("consumer_processing_{alert_id}"),
812 component: "consumer".to_string(),
813 severity: AlertSeverity::Critical,
814 message: format!(
815 "Critical consumer processing time: {:.2}ms",
816 metrics.consumer_average_processing_time_ms
817 ),
818 timestamp: now,
819 resolved: false,
820 });
821 alert_id += 1;
822 } else if metrics.consumer_average_processing_time_ms > 500.0 {
823 health_alerts.push(HealthAlert {
824 id: format!("consumer_processing_{alert_id}"),
825 component: "consumer".to_string(),
826 severity: AlertSeverity::Warning,
827 message: format!(
828 "High consumer processing time: {:.2}ms",
829 metrics.consumer_average_processing_time_ms
830 ),
831 timestamp: now,
832 resolved: false,
833 });
834 alert_id += 1;
835 }
836
837 if metrics.backend_connection_errors > 0 {
839 let total_connections =
840 metrics.backend_connections_active + metrics.backend_connections_idle;
841 if total_connections > 0 {
842 let error_rate =
843 metrics.backend_connection_errors as f64 / total_connections as f64;
844 if error_rate > 0.20 {
845 health_alerts.push(HealthAlert {
846 id: format!("connection_errors_{alert_id}"),
847 component: "backend".to_string(),
848 severity: AlertSeverity::Critical,
849 message: format!("High connection error rate: {:.2}%", error_rate * 100.0),
850 timestamp: now,
851 resolved: false,
852 });
853 }
854 }
855 }
856
857 let health_status = if health_alerts.is_empty() {
859 HealthStatus::Healthy
860 } else {
861 let critical_alerts = health_alerts
862 .iter()
863 .filter(|a| matches!(a.severity, AlertSeverity::Critical))
864 .count();
865 if critical_alerts > 0 {
866 HealthStatus::Critical
867 } else {
868 HealthStatus::Warning
869 }
870 };
871
872 if !health_alerts.is_empty() {
873 warn!(
874 "System health alerts detected: {} total, {} critical",
875 health_alerts.len(),
876 health_alerts
877 .iter()
878 .filter(|a| matches!(a.severity, AlertSeverity::Critical))
879 .count()
880 );
881 }
882
883 let system_health = SystemHealth {
885 overall_status: health_status,
886 component_health: HashMap::new(),
887 last_check: now,
888 uptime: Duration::from_secs(system_info.uptime),
889 alerts: health_alerts,
890 };
891
892 *self.health_status.write().await = system_health;
893 Ok(())
894 }
895}
896
897impl Profiler {
898 fn new(sampling_rate: f64) -> Self {
899 Self {
900 enabled: true,
901 traces: Arc::new(RwLock::new(Vec::new())),
902 sampling_rate,
903 }
904 }
905
906 pub async fn start_trace(&self, operation: String) -> Option<TraceHandle> {
908 if !self.enabled || fastrand::f64() > self.sampling_rate {
909 return None;
910 }
911
912 Some(TraceHandle {
913 operation,
914 start_time: Instant::now(),
915 timestamp: Utc::now(),
916 traces: self.traces.clone(),
917 })
918 }
919}
920
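/// RAII guard returned by `Profiler::start_trace`; on drop it records a
/// `PerformanceTrace` with the elapsed duration via a spawned task.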
921pub struct TraceHandle {
923 operation: String,
924 start_time: Instant,
925 timestamp: DateTime<Utc>,
926 traces: Arc<RwLock<Vec<PerformanceTrace>>>,
927}
928
929impl Drop for TraceHandle {
930 fn drop(&mut self) {
931 let duration = self.start_time.elapsed();
932 let trace = PerformanceTrace {
933 operation: self.operation.clone(),
934 start_time: self.timestamp,
935 duration,
936 metadata: HashMap::new(),
call_stack: Vec::new(),
};
939
940 let traces = self.traces.clone();
941 tokio::spawn(async move {
942 traces.write().await.push(trace);
943 });
944 }
945}
946
947impl Default for SystemHealth {
948 fn default() -> Self {
949 Self {
950 overall_status: HealthStatus::Unknown,
951 component_health: HashMap::new(),
952 last_check: Utc::now(),
953 uptime: Duration::from_secs(0),
954 alerts: Vec::new(),
955 }
956 }
957}
958
959#[derive(Debug, Clone)]
961pub struct ProducerMetricsUpdate {
962 pub events_published: u64,
963 pub events_failed: u64,
964 pub bytes_sent: u64,
965 pub batches_sent: u64,
966 pub latency_ms: f64,
967 pub throughput_eps: f64,
968}
969
970#[derive(Debug, Clone)]
971pub struct ConsumerMetricsUpdate {
972 pub events_consumed: u64,
973 pub events_processed: u64,
974 pub events_filtered: u64,
975 pub events_failed: u64,
976 pub bytes_received: u64,
977 pub batches_received: u64,
978 pub processing_time_ms: f64,
979 pub throughput_eps: f64,
980 pub lag_ms: Option<f64>,
981}
982
983#[derive(Debug, Clone)]
984pub struct BackendMetricsUpdate {
985 pub connections_active: u32,
986 pub connections_idle: u32,
987 pub connection_errors: u64,
988 pub circuit_breaker_trips: u64,
989 pub retry_attempts: u64,
990}
991
992#[cfg(test)]
993mod tests {
994 use super::*;
995
996 #[tokio::test]
997 async fn test_metrics_collection() {
998 let config = MonitoringConfig {
999 enable_metrics: true,
1000 enable_tracing: true,
1001 metrics_interval: Duration::from_millis(100),
1002 health_check_interval: Duration::from_millis(100),
1003 enable_profiling: false,
1004 prometheus_endpoint: None,
1005 jaeger_endpoint: None,
1006 log_level: "info".to_string(),
1007 };
1008
1009 let collector = MetricsCollector::new(config);
1010
1011 collector
1013 .update_producer_metrics(ProducerMetricsUpdate {
1014 events_published: 100,
1015 events_failed: 5,
1016 bytes_sent: 1024,
1017 batches_sent: 10,
1018 latency_ms: 5.0,
1019 throughput_eps: 1000.0,
1020 })
1021 .await;
1022
1023 let metrics = collector.get_metrics().await;
1024 assert_eq!(metrics.producer_events_published, 100);
1025 assert_eq!(metrics.producer_events_failed, 5);
1026 assert_eq!(metrics.producer_throughput_eps, 1000.0);
1027 }
1028
1029 #[tokio::test]
1030 async fn test_prometheus_export() {
1031 let config = MonitoringConfig {
1032 enable_metrics: true,
1033 enable_tracing: false,
1034 metrics_interval: Duration::from_secs(60),
1035 health_check_interval: Duration::from_secs(30),
1036 enable_profiling: false,
1037 prometheus_endpoint: None,
1038 jaeger_endpoint: None,
1039 log_level: "info".to_string(),
1040 };
1041
1042 let collector = MetricsCollector::new(config);
1043
1044 collector
1045 .update_producer_metrics(ProducerMetricsUpdate {
1046 events_published: 500,
1047 events_failed: 10,
1048 bytes_sent: 2048,
1049 batches_sent: 50,
1050 latency_ms: 3.0,
1051 throughput_eps: 2000.0,
1052 })
1053 .await;
1054
1055 let prometheus_output = collector.export_prometheus().await;
1056 assert!(prometheus_output.contains("oxirs_producer_events_published_total 500"));
1057 assert!(prometheus_output.contains("oxirs_producer_events_failed_total 10"));
1058 }
1059
1060 #[tokio::test]
1061 async fn test_health_checking() {
1062 let config = MonitoringConfig {
1063 enable_metrics: true,
1064 enable_tracing: false,
1065 metrics_interval: Duration::from_secs(60),
1066 health_check_interval: Duration::from_secs(30),
1067 enable_profiling: false,
1068 prometheus_endpoint: None,
1069 jaeger_endpoint: None,
1070 log_level: "info".to_string(),
1071 };
1072
1073 let mut health_checker = HealthChecker::new(config);
1074
1075 struct MockChecker;
1077
1078 impl ComponentHealthChecker for MockChecker {
1079 fn component_name(&self) -> &str {
1080 "mock_component"
1081 }
1082
1083 fn check_health(
1084 &self,
1085 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ComponentHealth> + Send + '_>>
1086 {
1087 Box::pin(async move {
1088 ComponentHealth {
1089 status: HealthStatus::Healthy,
1090 message: "Component is healthy".to_string(),
1091 last_check: Utc::now(),
1092 metrics: HashMap::new(),
1093 dependencies: vec!["database".to_string()],
1094 }
1095 })
1096 }
1097 }
1098
1099 health_checker.add_component_checker(Box::new(MockChecker));
1100 health_checker.check_all_components().await.unwrap();
1101
1102 let health = health_checker.get_health().await;
1103 assert_eq!(health.overall_status, HealthStatus::Healthy);
1104 assert!(health.component_health.contains_key("mock_component"));
1105 }
1106
1107 #[tokio::test]
1108 async fn test_consumer_metrics_update() {
1109 let config = MonitoringConfig {
1110 enable_metrics: true,
1111 enable_tracing: false,
1112 metrics_interval: Duration::from_secs(60),
1113 health_check_interval: Duration::from_secs(30),
1114 enable_profiling: false,
1115 prometheus_endpoint: None,
1116 jaeger_endpoint: None,
1117 log_level: "info".to_string(),
1118 };
1119
1120 let collector = MetricsCollector::new(config);
1121
1122 collector
1123 .update_consumer_metrics(ConsumerMetricsUpdate {
1124 events_consumed: 1000,
1125 events_processed: 950,
1126 events_filtered: 50,
1127 events_failed: 10,
1128 bytes_received: 4096,
1129 batches_received: 100,
1130 processing_time_ms: 2.5,
1131 throughput_eps: 1500.0,
1132 lag_ms: Some(100.0),
1133 })
1134 .await;
1135
1136 let metrics = collector.get_metrics().await;
1137 assert_eq!(metrics.consumer_events_consumed, 1000);
1138 assert_eq!(metrics.consumer_events_processed, 950);
1139 assert_eq!(metrics.consumer_throughput_eps, 1500.0);
1140 assert_eq!(metrics.consumer_lag_ms, Some(100.0));
1141 }
1142
1143 #[tokio::test]
1144 async fn test_backend_metrics_update() {
1145 let config = MonitoringConfig {
1146 enable_metrics: true,
1147 enable_tracing: false,
1148 metrics_interval: Duration::from_secs(60),
1149 health_check_interval: Duration::from_secs(30),
1150 enable_profiling: false,
1151 prometheus_endpoint: None,
1152 jaeger_endpoint: None,
1153 log_level: "info".to_string(),
1154 };
1155
1156 let collector = MetricsCollector::new(config);
1157
1158 collector
1159 .update_backend_metrics(BackendMetricsUpdate {
1160 connections_active: 5,
1161 connections_idle: 3,
1162 connection_errors: 2,
1163 circuit_breaker_trips: 1,
1164 retry_attempts: 5,
1165 })
1166 .await;
1167
1168 let metrics = collector.get_metrics().await;
1169 assert_eq!(metrics.backend_connections_active, 5);
1170 assert_eq!(metrics.backend_connections_idle, 3);
1171 assert_eq!(metrics.backend_connection_errors, 2);
1172 }
1173
1174 #[test]
1175 fn test_health_status_serialization() {
1176 let health = SystemHealth {
1177 overall_status: HealthStatus::Warning,
1178 component_health: {
1179 let mut health_map = HashMap::new();
1180 health_map.insert(
1181 "database".to_string(),
1182 ComponentHealth {
1183 status: HealthStatus::Warning,
1184 message: "High latency detected".to_string(),
1185 last_check: Utc::now(),
1186 metrics: {
1187 let mut metrics = HashMap::new();
1188 metrics.insert("latency_ms".to_string(), 150.0);
1189 metrics
1190 },
1191 dependencies: vec!["network".to_string()],
1192 },
1193 );
1194 health_map
1195 },
1196 last_check: Utc::now(),
1197 uptime: Duration::from_secs(3600),
1198 alerts: vec![HealthAlert {
1199 id: "alert-1".to_string(),
1200 component: "database".to_string(),
1201 severity: AlertSeverity::Warning,
1202 message: "High latency detected".to_string(),
1203 timestamp: Utc::now(),
1204 resolved: false,
1205 }],
1206 };
1207
1208 let serialized = serde_json::to_string(&health).unwrap();
1209 let deserialized: SystemHealth = serde_json::from_str(&serialized).unwrap();
1210
1211 assert_eq!(deserialized.overall_status, HealthStatus::Warning);
1212 assert_eq!(deserialized.component_health.len(), 1);
1213 assert_eq!(deserialized.alerts.len(), 1);
1214 }
1215
1216 #[tokio::test]
1217 async fn test_profiler() {
// Sample every operation so the test trace is always recorded.
let profiler = Profiler::new(1.0);
{
1221 let _trace = profiler.start_trace("test_operation".to_string()).await;
1222 tokio::time::sleep(Duration::from_millis(10)).await;
}
// Give the task spawned in TraceHandle::drop time to record the trace.
tokio::time::sleep(Duration::from_millis(50)).await;
1228
1229 let traces = profiler.traces.read().await;
1230 assert_eq!(traces.len(), 1);
1231 assert_eq!(traces[0].operation, "test_operation");
1232 assert!(traces[0].duration >= Duration::from_millis(10));
1233 }
1234
1235 #[test]
1236 fn test_metrics_update_structures() {
1237 let producer_update = ProducerMetricsUpdate {
1238 events_published: 100,
1239 events_failed: 2,
1240 bytes_sent: 1024,
1241 batches_sent: 10,
1242 latency_ms: 5.5,
1243 throughput_eps: 200.0,
1244 };
1245
1246 assert_eq!(producer_update.events_published, 100);
1247 assert_eq!(producer_update.latency_ms, 5.5);
1248
1249 let consumer_update = ConsumerMetricsUpdate {
1250 events_consumed: 95,
1251 events_processed: 90,
1252 events_filtered: 5,
1253 events_failed: 1,
1254 bytes_received: 950,
1255 batches_received: 9,
1256 processing_time_ms: 2.0,
1257 throughput_eps: 190.0,
1258 lag_ms: Some(50.0),
1259 };
1260
1261 assert_eq!(consumer_update.events_consumed, 95);
1262 assert_eq!(consumer_update.lag_ms, Some(50.0));
1263
1264 let backend_update = BackendMetricsUpdate {
1265 connections_active: 3,
1266 connections_idle: 2,
1267 connection_errors: 1,
1268 circuit_breaker_trips: 0,
1269 retry_attempts: 2,
1270 };
1271
1272 assert_eq!(backend_update.connections_active, 3);
1273 assert_eq!(backend_update.retry_attempts, 2);
1274 }
1275}