Skip to main content

oxirs_stream/
monitoring.rs

1//! # Monitoring and Observability
2//!
3//! Comprehensive monitoring, metrics collection, and observability features
4//! for the OxiRS streaming platform.
5
6use anyhow::Result;
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12use sysinfo::{Pid, ProcessesToUpdate, System};
13use tokio::sync::RwLock;
14use tracing::{debug, error, info, warn};
15
/// Monitoring configuration
///
/// Controls which observability features are enabled and how often the
/// background collection tasks run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitoringConfig {
    /// Enable metrics collection.
    pub enable_metrics: bool,
    /// Enable distributed tracing.
    pub enable_tracing: bool,
    /// How often the derived-metrics task recalculates rates.
    pub metrics_interval: Duration,
    /// How often component health checks run.
    pub health_check_interval: Duration,
    /// Enable the performance profiler (created with 10% sampling).
    pub enable_profiling: bool,
    /// Optional Prometheus endpoint — presumably a URL; not read in this chunk.
    pub prometheus_endpoint: Option<String>,
    /// Optional Jaeger endpoint — presumably a URL; not read in this chunk.
    pub jaeger_endpoint: Option<String>,
    /// Log level name (e.g. "info"). NOTE(review): not applied anywhere in this chunk.
    pub log_level: String,
}
28
/// Metrics collector for streaming operations
///
/// Owns the shared [`StreamingMetrics`] state and spawns the background
/// tasks (see [`MetricsCollector::start`]) that keep it up to date.
pub struct MetricsCollector {
    // Intervals and feature toggles for the background tasks.
    config: MonitoringConfig,
    // Shared mutable metrics snapshot, written by callers and background tasks.
    metrics: Arc<RwLock<StreamingMetrics>>,
    // Aggregates per-component health into an overall system status.
    health_checker: Arc<HealthChecker>,
    // Present only when `config.enable_profiling` is set; not read in this chunk.
    profiler: Option<Profiler>,
    // sysinfo handle used to sample CPU, memory, and process data.
    system: Arc<RwLock<System>>,
}
37
/// Comprehensive streaming metrics
///
/// Shared snapshot held behind `Arc<RwLock<..>>` by `MetricsCollector`.
/// The `u64` count fields are cumulative counters; `*_eps`, `*_rate`,
/// `*_percent`, and average fields are point-in-time gauges.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingMetrics {
    // Producer metrics
    pub producer_events_published: u64,
    pub producer_events_failed: u64,
    pub producer_bytes_sent: u64,
    pub producer_batches_sent: u64,
    /// Equal-weight running average of reported latencies, in milliseconds.
    pub producer_average_latency_ms: f64,
    /// Events/second as last reported by a producer update (gauge).
    pub producer_throughput_eps: f64,

    // Consumer metrics
    pub consumer_events_consumed: u64,
    pub consumer_events_processed: u64,
    pub consumer_events_filtered: u64,
    pub consumer_events_failed: u64,
    pub consumer_bytes_received: u64,
    pub consumer_batches_received: u64,
    /// Equal-weight running average of processing times, in milliseconds.
    pub consumer_average_processing_time_ms: f64,
    /// Events/second as last reported by a consumer update (gauge).
    pub consumer_throughput_eps: f64,
    /// Consumer lag gauge; `None` when no lag value was reported.
    pub consumer_lag_ms: Option<f64>,

    // System metrics (sampled every 10s by the system-metrics task)
    pub system_memory_usage_bytes: u64,
    pub system_cpu_usage_percent: f64,
    /// NOTE(review): stays 0 until `get_network_metrics` is implemented
    /// (it currently returns placeholder zeros).
    pub system_network_bytes_in: u64,
    /// NOTE(review): stays 0 — see `system_network_bytes_in`.
    pub system_network_bytes_out: u64,
    /// NOTE(review): GC fields are never written in this chunk — confirm their writer.
    pub system_gc_collections: u64,
    pub system_gc_time_ms: u64,

    // Backend metrics
    /// Gauge: replaced on each backend update.
    pub backend_connections_active: u32,
    /// Gauge: replaced on each backend update.
    pub backend_connections_idle: u32,
    pub backend_connection_errors: u64,
    pub backend_circuit_breaker_trips: u64,
    pub backend_retry_attempts: u64,

    // Stream processing metrics
    pub window_operations_count: u64,
    pub aggregation_operations_count: u64,
    pub pattern_matches_found: u64,
    pub state_store_operations: u64,
    pub subscriptions_active: u32,

    // Quality metrics
    pub message_loss_rate: f64,
    pub duplicate_rate: f64,
    pub out_of_order_rate: f64,
    /// Fraction in [0, 1], derived from producer published/failed counts.
    pub error_rate: f64,
    /// NOTE(review): default is 100.0 (percent); verify all writers use the same unit.
    pub success_rate: f64,
    /// Percentage; simplified model derived from `error_rate`.
    pub availability: f64,

    // Dead Letter Queue metrics
    pub dlq_messages_count: u64,
    pub dlq_messages_per_second: f64,
    pub dlq_processing_rate: f64,
    pub dlq_oldest_message_age_ms: u64,
    /// Percentage; starts at 100.0.
    pub dlq_replay_success_rate: f64,
    pub dlq_total_replayed: u64,
    pub dlq_size_bytes: u64,
    /// Presumably counts keyed by error-category name — TODO confirm writer.
    pub dlq_error_categories: HashMap<String, u64>,

    // Timestamps
    /// Set on every metrics mutation.
    pub last_updated: DateTime<Utc>,
    /// Fixed at construction; used to compute elapsed collection time.
    pub collection_start_time: DateTime<Utc>,
}
104
105impl Default for StreamingMetrics {
106    fn default() -> Self {
107        let now = Utc::now();
108        Self {
109            // Producer metrics
110            producer_events_published: 0,
111            producer_events_failed: 0,
112            producer_bytes_sent: 0,
113            producer_batches_sent: 0,
114            producer_average_latency_ms: 0.0,
115            producer_throughput_eps: 0.0,
116
117            // Consumer metrics
118            consumer_events_consumed: 0,
119            consumer_events_processed: 0,
120            consumer_events_filtered: 0,
121            consumer_events_failed: 0,
122            consumer_bytes_received: 0,
123            consumer_batches_received: 0,
124            consumer_average_processing_time_ms: 0.0,
125            consumer_throughput_eps: 0.0,
126            consumer_lag_ms: None,
127
128            // System metrics
129            system_memory_usage_bytes: 0,
130            system_cpu_usage_percent: 0.0,
131            system_network_bytes_in: 0,
132            system_network_bytes_out: 0,
133            system_gc_collections: 0,
134            system_gc_time_ms: 0,
135
136            // Backend metrics
137            backend_connections_active: 0,
138            backend_connections_idle: 0,
139            backend_connection_errors: 0,
140            backend_circuit_breaker_trips: 0,
141            backend_retry_attempts: 0,
142
143            // Stream processing metrics
144            window_operations_count: 0,
145            aggregation_operations_count: 0,
146            pattern_matches_found: 0,
147            state_store_operations: 0,
148            subscriptions_active: 0,
149
150            // Quality metrics
151            message_loss_rate: 0.0,
152            duplicate_rate: 0.0,
153            out_of_order_rate: 0.0,
154            error_rate: 0.0,
155            success_rate: 100.0, // Start with 100% success rate
156            availability: 100.0, // Start with 100% availability
157
158            // Dead Letter Queue metrics
159            dlq_messages_count: 0,
160            dlq_messages_per_second: 0.0,
161            dlq_processing_rate: 0.0,
162            dlq_oldest_message_age_ms: 0,
163            dlq_replay_success_rate: 100.0, // Start with 100% replay success rate
164            dlq_total_replayed: 0,
165            dlq_size_bytes: 0,
166            dlq_error_categories: HashMap::new(),
167
168            // Timestamps
169            last_updated: now,
170            collection_start_time: now,
171        }
172    }
173}
174
/// Health checker for system health monitoring
///
/// Runs registered [`ComponentHealthChecker`]s and folds their results into
/// a shared [`SystemHealth`] snapshot.
pub struct HealthChecker {
    // Retained configuration; not read in this chunk of the file.
    config: MonitoringConfig,
    // Latest aggregated health snapshot, shared with readers.
    health_status: Arc<RwLock<SystemHealth>>,
    // Per-component checkers registered via `add_component_checker`.
    component_checkers: Vec<Box<dyn ComponentHealthChecker>>,
}
181
/// System health status
///
/// Aggregated snapshot produced by `HealthChecker::check_all_components`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemHealth {
    /// Worst status across all components (Critical dominates Warning).
    pub overall_status: HealthStatus,
    /// Per-component health, keyed by `ComponentHealthChecker::component_name`.
    pub component_health: HashMap<String, ComponentHealth>,
    /// When the last full check completed.
    pub last_check: DateTime<Utc>,
    /// Uptime value. NOTE(review): never updated in this chunk — confirm where it is set.
    pub uptime: Duration,
    /// Accumulated alerts; extended on every check and never pruned here.
    pub alerts: Vec<HealthAlert>,
}
191
/// Health status levels
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum HealthStatus {
    /// Operating normally; raises no alert.
    Healthy,
    /// Degraded but functional; raises a `Warning` alert.
    Warning,
    /// Severely degraded; raises a `Critical` alert and dominates the overall status.
    Critical,
    /// No health information available; treated like `Healthy` when aggregating.
    Unknown,
}
200
201impl std::fmt::Display for HealthStatus {
202    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203        match self {
204            HealthStatus::Healthy => write!(f, "healthy"),
205            HealthStatus::Warning => write!(f, "warning"),
206            HealthStatus::Critical => write!(f, "critical"),
207            HealthStatus::Unknown => write!(f, "unknown"),
208        }
209    }
210}
211
/// Component health information
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComponentHealth {
    /// Current status of the component.
    pub status: HealthStatus,
    /// Human-readable detail; copied into alert messages on Warning/Critical.
    pub message: String,
    /// When this component was last checked.
    pub last_check: DateTime<Utc>,
    /// Named measurements reported by the checker.
    pub metrics: HashMap<String, f64>,
    /// Presumably names of components this one depends on — TODO confirm.
    pub dependencies: Vec<String>,
}
221
/// Health alert
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthAlert {
    /// Unique id (a UUID or a "<kind>_<n>" string, depending on the producer).
    pub id: String,
    /// Component the alert refers to (e.g. "system", "producer", "consumer").
    pub component: String,
    /// Severity of the alert.
    pub severity: AlertSeverity,
    /// Human-readable description.
    pub message: String,
    /// When the alert was raised.
    pub timestamp: DateTime<Utc>,
    /// Always created `false`; nothing in this chunk ever resolves alerts.
    pub resolved: bool,
}
232
/// Alert severity levels
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum AlertSeverity {
    /// Informational only; not produced by the checks in this chunk.
    Info,
    /// Mirrors `HealthStatus::Warning`.
    Warning,
    /// Mirrors `HealthStatus::Critical`.
    Critical,
}
240
/// Profiler for performance analysis
///
/// NOTE(review): constructed by `MetricsCollector::new` (with 0.1 sampling),
/// but its fields are never read in this chunk; `Profiler::new` is defined
/// elsewhere in the file.
pub struct Profiler {
    // Whether trace capture is active.
    enabled: bool,
    // Captured performance traces.
    traces: Arc<RwLock<Vec<PerformanceTrace>>>,
    // Fraction of operations to sample (0.1 = 10% when created by the collector).
    sampling_rate: f64,
}
247
/// Performance trace data
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceTrace {
    /// Name of the traced operation.
    pub operation: String,
    /// Wall-clock time at which the operation began.
    pub start_time: DateTime<Utc>,
    /// How long the operation took.
    pub duration: Duration,
    /// Free-form key/value context attached to the trace.
    pub metadata: HashMap<String, String>,
    /// Presumably frame descriptions captured at trace time — TODO confirm format.
    pub call_stack: Vec<String>,
}
257
/// Load average information
///
/// Serializable mirror of the values returned by `System::load_average()`:
/// the standard 1-, 5-, and 15-minute system load averages.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LoadAverage {
    /// 1-minute load average.
    pub one: f64,
    /// 5-minute load average.
    pub five: f64,
    /// 15-minute load average.
    pub fifteen: f64,
}
265
/// System information for detailed monitoring
///
/// Snapshot assembled from `sysinfo` by `MetricsCollector::get_system_info`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemInfo {
    /// Total physical memory — bytes in recent sysinfo versions; confirm crate version.
    pub total_memory: u64,
    /// Memory currently in use (same unit as `total_memory`).
    pub used_memory: u64,
    /// Total swap space.
    pub total_swap: u64,
    /// Swap currently in use.
    pub used_swap: u64,
    /// Number of logical CPUs reported by `System::cpus()`.
    pub cpu_count: usize,
    /// 1/5/15-minute load averages.
    pub load_average: LoadAverage,
    /// Value of `System::boot_time()`.
    pub boot_time: u64,
    /// Value of `System::uptime()`.
    pub uptime: u64,
}
278
/// Component health checker trait
///
/// Implemented by components that can report their own health. The manual
/// boxed-future return type keeps the trait object-safe (it is stored as
/// `Box<dyn ComponentHealthChecker>`) without an async-trait dependency.
pub trait ComponentHealthChecker: Send + Sync {
    /// Stable name used as the key in `SystemHealth::component_health`.
    fn component_name(&self) -> &str;
    /// Asynchronously produce the component's current health.
    fn check_health(
        &self,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ComponentHealth> + Send + '_>>;
}
286
287impl MetricsCollector {
288    /// Create a new metrics collector
289    pub fn new(config: MonitoringConfig) -> Self {
290        let health_checker = Arc::new(HealthChecker::new(config.clone()));
291        let profiler = if config.enable_profiling {
292            Some(Profiler::new(0.1)) // 10% sampling rate
293        } else {
294            None
295        };
296        let mut sys = System::new_all();
297        sys.refresh_all();
298
299        Self {
300            config,
301            metrics: Arc::new(RwLock::new(StreamingMetrics::default())),
302            health_checker,
303            profiler,
304            system: Arc::new(RwLock::new(sys)),
305        }
306    }
307
308    /// Start metrics collection
309    pub async fn start(&self) -> Result<()> {
310        info!(
311            "Starting metrics collection with interval: {:?}",
312            self.config.metrics_interval
313        );
314
315        // Start metrics collection task
316        self.start_metrics_collection().await;
317
318        // Start health checking task
319        self.start_health_checking().await;
320
321        // Start system metrics collection
322        self.start_system_metrics_collection().await;
323
324        Ok(())
325    }
326
327    /// Update producer metrics
328    pub async fn update_producer_metrics(&self, metrics: ProducerMetricsUpdate) {
329        let mut current_metrics = self.metrics.write().await;
330
331        current_metrics.producer_events_published += metrics.events_published;
332        current_metrics.producer_events_failed += metrics.events_failed;
333        current_metrics.producer_bytes_sent += metrics.bytes_sent;
334        current_metrics.producer_batches_sent += metrics.batches_sent;
335
336        if metrics.latency_ms > 0.0 {
337            current_metrics.producer_average_latency_ms =
338                (current_metrics.producer_average_latency_ms + metrics.latency_ms) / 2.0;
339        }
340
341        current_metrics.producer_throughput_eps = metrics.throughput_eps;
342        current_metrics.last_updated = Utc::now();
343    }
344
345    /// Update consumer metrics
346    pub async fn update_consumer_metrics(&self, metrics: ConsumerMetricsUpdate) {
347        let mut current_metrics = self.metrics.write().await;
348
349        current_metrics.consumer_events_consumed += metrics.events_consumed;
350        current_metrics.consumer_events_processed += metrics.events_processed;
351        current_metrics.consumer_events_filtered += metrics.events_filtered;
352        current_metrics.consumer_events_failed += metrics.events_failed;
353        current_metrics.consumer_bytes_received += metrics.bytes_received;
354        current_metrics.consumer_batches_received += metrics.batches_received;
355
356        // Enhanced health assessment based on metrics trends
357        let _health = self.health_checker.get_health().await;
358
359        if metrics.processing_time_ms > 0.0 {
360            current_metrics.consumer_average_processing_time_ms =
361                (current_metrics.consumer_average_processing_time_ms + metrics.processing_time_ms)
362                    / 2.0;
363        }
364
365        current_metrics.consumer_throughput_eps = metrics.throughput_eps;
366        current_metrics.consumer_lag_ms = metrics.lag_ms;
367        current_metrics.last_updated = Utc::now();
368    }
369
370    /// Update backend metrics
371    pub async fn update_backend_metrics(&self, metrics: BackendMetricsUpdate) {
372        let mut current_metrics = self.metrics.write().await;
373
374        current_metrics.backend_connections_active = metrics.connections_active;
375        current_metrics.backend_connections_idle = metrics.connections_idle;
376        current_metrics.backend_connection_errors += metrics.connection_errors;
377        current_metrics.backend_circuit_breaker_trips += metrics.circuit_breaker_trips;
378        current_metrics.backend_retry_attempts += metrics.retry_attempts;
379        current_metrics.last_updated = Utc::now();
380    }
381
382    /// Get current metrics snapshot
383    pub async fn get_metrics(&self) -> StreamingMetrics {
384        self.metrics.read().await.clone()
385    }
386
387    /// Start metrics collection task
388    async fn start_metrics_collection(&self) {
389        let metrics = self.metrics.clone();
390        let interval = self.config.metrics_interval;
391
392        tokio::spawn(async move {
393            let mut interval_timer = tokio::time::interval(interval);
394
395            loop {
396                interval_timer.tick().await;
397
398                // Collect system metrics
399                let mut current_metrics = metrics.write().await;
400
401                // Calculate rates and derived metrics
402                let elapsed = current_metrics
403                    .last_updated
404                    .signed_duration_since(current_metrics.collection_start_time)
405                    .num_seconds() as f64;
406
407                if elapsed > 0.0 {
408                    // Calculate error rate
409                    let total_events = current_metrics.producer_events_published
410                        + current_metrics.producer_events_failed;
411                    if total_events > 0 {
412                        current_metrics.error_rate =
413                            current_metrics.producer_events_failed as f64 / total_events as f64;
414                        current_metrics.success_rate = 1.0 - current_metrics.error_rate;
415                    }
416
417                    // Calculate availability (simplified)
418                    current_metrics.availability = if current_metrics.error_rate < 0.01 {
419                        99.9 + (1.0 - current_metrics.error_rate) * 0.1
420                    } else {
421                        100.0 - (current_metrics.error_rate * 100.0)
422                    };
423                }
424
425                debug!(
426                    "Updated metrics: throughput={:.2} eps, error_rate={:.4}",
427                    current_metrics.producer_throughput_eps, current_metrics.error_rate
428                );
429            }
430        });
431    }
432
433    /// Start health checking task
434    async fn start_health_checking(&self) {
435        let health_checker = self.health_checker.clone();
436        let interval = self.config.health_check_interval;
437
438        tokio::spawn(async move {
439            let mut interval_timer = tokio::time::interval(interval);
440
441            loop {
442                interval_timer.tick().await;
443
444                if let Err(e) = health_checker.check_all_components().await {
445                    error!("Health check failed: {}", e);
446                }
447            }
448        });
449    }
450
451    /// Start system metrics collection
452    async fn start_system_metrics_collection(&self) {
453        let metrics = self.metrics.clone();
454        let system = self.system.clone();
455
456        tokio::spawn(async move {
457            let mut interval = tokio::time::interval(Duration::from_secs(10));
458            let mut previous_network_in = 0u64;
459            let mut previous_network_out = 0u64;
460
461            loop {
462                interval.tick().await;
463
464                // Refresh system information
465                {
466                    let mut sys = system.write().await;
467                    sys.refresh_cpu_all(); // sysinfo 0.33: refresh_cpu() → refresh_cpu_all()
468                    sys.refresh_memory();
469                    // Network refresh is handled separately if available
470                    sys.refresh_processes(ProcessesToUpdate::All, true); // sysinfo 0.33: now requires 2 args
471                }
472
473                let mut current_metrics = metrics.write().await;
474                let sys = system.read().await;
475
476                // Real system metrics collection
477                current_metrics.system_memory_usage_bytes = sys.used_memory();
478                current_metrics.system_cpu_usage_percent = sys.global_cpu_usage() as f64; // sysinfo 0.33: global_cpu_info().cpu_usage() → global_cpu_usage()
479
480                // Network metrics (cumulative)
481                let (network_in, network_out) = Self::get_network_metrics(&sys);
482                current_metrics.system_network_bytes_in = previous_network_in + network_in;
483                current_metrics.system_network_bytes_out = previous_network_out + network_out;
484                previous_network_in = current_metrics.system_network_bytes_in;
485                previous_network_out = current_metrics.system_network_bytes_out;
486
487                // Process-specific metrics
488                if let Some(process) = sys.process(Pid::from_u32(std::process::id())) {
489                    // Add process-specific metrics here if needed
490                    debug!("Process memory: {} bytes", process.memory());
491                }
492            }
493        });
494    }
495
496    /// Get network metrics from system information
497    fn get_network_metrics(_sys: &System) -> (u64, u64) {
498        // Basic network metrics implementation
499        // In sysinfo 0.32, network API access is different and may require
500        // a separate Networks struct. For now, we provide a placeholder
501        // that can be enhanced with proper implementation later.
502
503        // Future improvement: Use std::fs to read /proc/net/dev on Linux
504        // or implement platform-specific network metric collection
505
506        // Return placeholder values for now to ensure compilation
507        // This maintains functionality while allowing for future enhancement
508        (0, 0)
509    }
510
511    /// Get detailed system information for health assessment
512    pub async fn get_system_info(&self) -> SystemInfo {
513        let sys = self.system.read().await;
514
515        SystemInfo {
516            total_memory: sys.total_memory(),
517            used_memory: sys.used_memory(),
518            total_swap: sys.total_swap(),
519            used_swap: sys.used_swap(),
520            cpu_count: sys.cpus().len(),
521            load_average: {
522                let load_avg = System::load_average();
523                LoadAverage {
524                    one: load_avg.one,
525                    five: load_avg.five,
526                    fifteen: load_avg.fifteen,
527                }
528            },
529            boot_time: System::boot_time(),
530            uptime: System::uptime(),
531        }
532    }
533
534    /// Export metrics in Prometheus format
535    pub async fn export_prometheus(&self) -> String {
536        let metrics = self.metrics.read().await;
537
538        format!(
539            r#"# HELP oxirs_producer_events_published_total Total number of events published by producers
540# TYPE oxirs_producer_events_published_total counter
541oxirs_producer_events_published_total {}
542
543# HELP oxirs_producer_events_failed_total Total number of failed events in producers
544# TYPE oxirs_producer_events_failed_total counter
545oxirs_producer_events_failed_total {}
546
547# HELP oxirs_producer_throughput_eps Current producer throughput in events per second
548# TYPE oxirs_producer_throughput_eps gauge
549oxirs_producer_throughput_eps {}
550
551# HELP oxirs_consumer_events_consumed_total Total number of events consumed
552# TYPE oxirs_consumer_events_consumed_total counter
553oxirs_consumer_events_consumed_total {}
554
555# HELP oxirs_consumer_throughput_eps Current consumer throughput in events per second
556# TYPE oxirs_consumer_throughput_eps gauge
557oxirs_consumer_throughput_eps {}
558
559# HELP oxirs_error_rate Current error rate
560# TYPE oxirs_error_rate gauge
561oxirs_error_rate {}
562
563# HELP oxirs_availability Current system availability percentage
564# TYPE oxirs_availability gauge
565oxirs_availability {}
566"#,
567            metrics.producer_events_published,
568            metrics.producer_events_failed,
569            metrics.producer_throughput_eps,
570            metrics.consumer_events_consumed,
571            metrics.consumer_throughput_eps,
572            metrics.error_rate,
573            metrics.availability
574        )
575    }
576
577    /// Get health status
578    pub async fn get_health(&self) -> SystemHealth {
579        self.health_checker.get_health().await
580    }
581}
582
583impl HealthChecker {
584    pub fn new(config: MonitoringConfig) -> Self {
585        Self {
586            config,
587            health_status: Arc::new(RwLock::new(SystemHealth::default())),
588            component_checkers: Vec::new(),
589        }
590    }
591
    /// Add a component health checker
    ///
    /// Registers `checker` so that `check_all_components` includes it in every
    /// subsequent health pass.
    /// NOTE(review): takes `&mut self`, while `MetricsCollector::new` wraps the
    /// checker in an `Arc` immediately — checkers must be registered before that.
    pub fn add_component_checker(&mut self, checker: Box<dyn ComponentHealthChecker>) {
        self.component_checkers.push(checker);
    }
596
597    /// Check health of all components
598    pub async fn check_all_components(&self) -> Result<()> {
599        let mut component_health = HashMap::new();
600        let mut overall_status = HealthStatus::Healthy;
601        let mut alerts = Vec::new();
602
603        for checker in &self.component_checkers {
604            let health = checker.check_health().await;
605            let component_name = checker.component_name().to_string();
606
607            match health.status {
608                HealthStatus::Warning => {
609                    if overall_status == HealthStatus::Healthy {
610                        overall_status = HealthStatus::Warning;
611                    }
612                    alerts.push(HealthAlert {
613                        id: uuid::Uuid::new_v4().to_string(),
614                        component: component_name.clone(),
615                        severity: AlertSeverity::Warning,
616                        message: health.message.clone(),
617                        timestamp: Utc::now(),
618                        resolved: false,
619                    });
620                }
621                HealthStatus::Critical => {
622                    overall_status = HealthStatus::Critical;
623                    alerts.push(HealthAlert {
624                        id: uuid::Uuid::new_v4().to_string(),
625                        component: component_name.clone(),
626                        severity: AlertSeverity::Critical,
627                        message: health.message.clone(),
628                        timestamp: Utc::now(),
629                        resolved: false,
630                    });
631                }
632                _ => {}
633            }
634
635            component_health.insert(component_name, health);
636        }
637
638        let mut health_status = self.health_status.write().await;
639        health_status.overall_status = overall_status;
640        health_status.component_health = component_health;
641        health_status.last_check = Utc::now();
642        health_status.alerts.extend(alerts);
643
644        Ok(())
645    }
646
647    /// Get current health status
648    pub async fn get_health(&self) -> SystemHealth {
649        self.health_status.read().await.clone()
650    }
651
652    /// Assess system health based on current metrics trends and system resources
653    pub async fn assess_system_health(
654        &self,
655        metrics: &StreamingMetrics,
656        system_info: &SystemInfo,
657    ) -> Result<()> {
658        let mut health_alerts = Vec::new();
659        let now = Utc::now();
660        let mut alert_id = 1;
661
662        // Memory health assessment
663        let memory_usage_percent =
664            (system_info.used_memory as f64 / system_info.total_memory as f64) * 100.0;
665        if memory_usage_percent > 90.0 {
666            health_alerts.push(HealthAlert {
667                id: format!("memory_critical_{alert_id}"),
668                component: "system".to_string(),
669                severity: AlertSeverity::Critical,
670                message: format!("Critical memory usage: {memory_usage_percent:.1}%"),
671                timestamp: now,
672                resolved: false,
673            });
674            alert_id += 1;
675        } else if memory_usage_percent > 80.0 {
676            health_alerts.push(HealthAlert {
677                id: format!("memory_warning_{alert_id}"),
678                component: "system".to_string(),
679                severity: AlertSeverity::Warning,
680                message: format!("High memory usage: {memory_usage_percent:.1}%"),
681                timestamp: now,
682                resolved: false,
683            });
684            alert_id += 1;
685        }
686
687        // CPU health assessment
688        if metrics.system_cpu_usage_percent > 95.0 {
689            health_alerts.push(HealthAlert {
690                id: format!("cpu_critical_{alert_id}"),
691                component: "system".to_string(),
692                severity: AlertSeverity::Critical,
693                message: format!(
694                    "Critical CPU usage: {:.1}%",
695                    metrics.system_cpu_usage_percent
696                ),
697                timestamp: now,
698                resolved: false,
699            });
700            alert_id += 1;
701        } else if metrics.system_cpu_usage_percent > 85.0 {
702            health_alerts.push(HealthAlert {
703                id: format!("cpu_warning_{alert_id}"),
704                component: "system".to_string(),
705                severity: AlertSeverity::Warning,
706                message: format!("High CPU usage: {:.1}%", metrics.system_cpu_usage_percent),
707                timestamp: now,
708                resolved: false,
709            });
710            alert_id += 1;
711        }
712
713        // Producer health assessment
714        if metrics.producer_events_failed > 0 {
715            let total_producer_events =
716                metrics.producer_events_published + metrics.producer_events_failed;
717            if total_producer_events > 0 {
718                let failure_rate =
719                    metrics.producer_events_failed as f64 / total_producer_events as f64;
720                if failure_rate > 0.10 {
721                    health_alerts.push(HealthAlert {
722                        id: format!("producer_failure_{alert_id}"),
723                        component: "producer".to_string(),
724                        severity: AlertSeverity::Critical,
725                        message: format!(
726                            "High producer failure rate: {:.2}%",
727                            failure_rate * 100.0
728                        ),
729                        timestamp: now,
730                        resolved: false,
731                    });
732                    alert_id += 1;
733                } else if failure_rate > 0.05 {
734                    health_alerts.push(HealthAlert {
735                        id: format!("producer_failure_{alert_id}"),
736                        component: "producer".to_string(),
737                        severity: AlertSeverity::Warning,
738                        message: format!(
739                            "Elevated producer failure rate: {:.2}%",
740                            failure_rate * 100.0
741                        ),
742                        timestamp: now,
743                        resolved: false,
744                    });
745                    alert_id += 1;
746                }
747            }
748        }
749
750        // Consumer health assessment
751        if metrics.consumer_events_consumed > 0 && metrics.consumer_events_failed > 0 {
752            let failure_rate =
753                metrics.consumer_events_failed as f64 / metrics.consumer_events_consumed as f64;
754            if failure_rate > 0.10 {
755                health_alerts.push(HealthAlert {
756                    id: format!("consumer_failure_{alert_id}"),
757                    component: "consumer".to_string(),
758                    severity: AlertSeverity::Critical,
759                    message: format!("High consumer failure rate: {:.2}%", failure_rate * 100.0),
760                    timestamp: now,
761                    resolved: false,
762                });
763                alert_id += 1;
764            } else if failure_rate > 0.05 {
765                health_alerts.push(HealthAlert {
766                    id: format!("consumer_failure_{alert_id}"),
767                    component: "consumer".to_string(),
768                    severity: AlertSeverity::Warning,
769                    message: format!(
770                        "Elevated consumer failure rate: {:.2}%",
771                        failure_rate * 100.0
772                    ),
773                    timestamp: now,
774                    resolved: false,
775                });
776                alert_id += 1;
777            }
778        }
779
780        // Performance health assessment
781        if metrics.producer_average_latency_ms > 2000.0 {
782            health_alerts.push(HealthAlert {
783                id: format!("producer_latency_{alert_id}"),
784                component: "producer".to_string(),
785                severity: AlertSeverity::Critical,
786                message: format!(
787                    "Critical producer latency: {:.2}ms",
788                    metrics.producer_average_latency_ms
789                ),
790                timestamp: now,
791                resolved: false,
792            });
793            alert_id += 1;
794        } else if metrics.producer_average_latency_ms > 1000.0 {
795            health_alerts.push(HealthAlert {
796                id: format!("producer_latency_{alert_id}"),
797                component: "producer".to_string(),
798                severity: AlertSeverity::Warning,
799                message: format!(
800                    "High producer latency: {:.2}ms",
801                    metrics.producer_average_latency_ms
802                ),
803                timestamp: now,
804                resolved: false,
805            });
806            alert_id += 1;
807        }
808
809        if metrics.consumer_average_processing_time_ms > 1000.0 {
810            health_alerts.push(HealthAlert {
811                id: format!("consumer_processing_{alert_id}"),
812                component: "consumer".to_string(),
813                severity: AlertSeverity::Critical,
814                message: format!(
815                    "Critical consumer processing time: {:.2}ms",
816                    metrics.consumer_average_processing_time_ms
817                ),
818                timestamp: now,
819                resolved: false,
820            });
821            alert_id += 1;
822        } else if metrics.consumer_average_processing_time_ms > 500.0 {
823            health_alerts.push(HealthAlert {
824                id: format!("consumer_processing_{alert_id}"),
825                component: "consumer".to_string(),
826                severity: AlertSeverity::Warning,
827                message: format!(
828                    "High consumer processing time: {:.2}ms",
829                    metrics.consumer_average_processing_time_ms
830                ),
831                timestamp: now,
832                resolved: false,
833            });
834            alert_id += 1;
835        }
836
837        // Connection health assessment
838        if metrics.backend_connection_errors > 0 {
839            let total_connections =
840                metrics.backend_connections_active + metrics.backend_connections_idle;
841            if total_connections > 0 {
842                let error_rate =
843                    metrics.backend_connection_errors as f64 / total_connections as f64;
844                if error_rate > 0.20 {
845                    health_alerts.push(HealthAlert {
846                        id: format!("connection_errors_{alert_id}"),
847                        component: "backend".to_string(),
848                        severity: AlertSeverity::Critical,
849                        message: format!("High connection error rate: {:.2}%", error_rate * 100.0),
850                        timestamp: now,
851                        resolved: false,
852                    });
853                }
854            }
855        }
856
857        // Update health status based on assessments
858        let health_status = if health_alerts.is_empty() {
859            HealthStatus::Healthy
860        } else {
861            let critical_alerts = health_alerts
862                .iter()
863                .filter(|a| matches!(a.severity, AlertSeverity::Critical))
864                .count();
865            if critical_alerts > 0 {
866                HealthStatus::Critical
867            } else {
868                HealthStatus::Warning
869            }
870        };
871
872        if !health_alerts.is_empty() {
873            warn!(
874                "System health alerts detected: {} total, {} critical",
875                health_alerts.len(),
876                health_alerts
877                    .iter()
878                    .filter(|a| matches!(a.severity, AlertSeverity::Critical))
879                    .count()
880            );
881        }
882
883        // Update health status with system uptime
884        let system_health = SystemHealth {
885            overall_status: health_status,
886            component_health: HashMap::new(),
887            last_check: now,
888            uptime: Duration::from_secs(system_info.uptime),
889            alerts: health_alerts,
890        };
891
892        *self.health_status.write().await = system_health;
893        Ok(())
894    }
895}
896
897impl Profiler {
898    fn new(sampling_rate: f64) -> Self {
899        Self {
900            enabled: true,
901            traces: Arc::new(RwLock::new(Vec::new())),
902            sampling_rate,
903        }
904    }
905
906    /// Start a performance trace
907    pub async fn start_trace(&self, operation: String) -> Option<TraceHandle> {
908        if !self.enabled || fastrand::f64() > self.sampling_rate {
909            return None;
910        }
911
912        Some(TraceHandle {
913            operation,
914            start_time: Instant::now(),
915            timestamp: Utc::now(),
916            traces: self.traces.clone(),
917        })
918    }
919}
920
/// Handle for performance tracing.
///
/// Obtained from `Profiler::start_trace`; the trace is recorded into the
/// shared trace buffer when this handle is dropped (RAII-style timing).
pub struct TraceHandle {
    // Name of the operation being traced.
    operation: String,
    // Monotonic start instant, used to compute the trace duration on drop.
    start_time: Instant,
    // Wall-clock time at which the trace started (for reporting).
    timestamp: DateTime<Utc>,
    // Shared buffer the completed trace is pushed into on drop.
    traces: Arc<RwLock<Vec<PerformanceTrace>>>,
}
928
929impl Drop for TraceHandle {
930    fn drop(&mut self) {
931        let duration = self.start_time.elapsed();
932        let trace = PerformanceTrace {
933            operation: self.operation.clone(),
934            start_time: self.timestamp,
935            duration,
936            metadata: HashMap::new(),
937            call_stack: Vec::new(), // Would be populated with actual call stack
938        };
939
940        let traces = self.traces.clone();
941        tokio::spawn(async move {
942            traces.write().await.push(trace);
943        });
944    }
945}
946
947impl Default for SystemHealth {
948    fn default() -> Self {
949        Self {
950            overall_status: HealthStatus::Unknown,
951            component_health: HashMap::new(),
952            last_check: Utc::now(),
953            uptime: Duration::from_secs(0),
954            alerts: Vec::new(),
955        }
956    }
957}
958
/// Incremental producer-side metrics delta applied to the collector's
/// aggregate `StreamingMetrics`.
#[derive(Debug, Clone)]
pub struct ProducerMetricsUpdate {
    /// Events successfully published in this update window.
    pub events_published: u64,
    /// Events that failed to publish in this update window.
    pub events_failed: u64,
    /// Payload bytes sent in this update window.
    pub bytes_sent: u64,
    /// Batches sent in this update window.
    pub batches_sent: u64,
    /// Observed publish latency sample, in milliseconds.
    pub latency_ms: f64,
    /// Observed throughput sample, in events per second.
    pub throughput_eps: f64,
}
969
/// Incremental consumer-side metrics delta applied to the collector's
/// aggregate `StreamingMetrics`.
#[derive(Debug, Clone)]
pub struct ConsumerMetricsUpdate {
    /// Events pulled from the stream in this update window.
    pub events_consumed: u64,
    /// Events successfully processed in this update window.
    pub events_processed: u64,
    /// Events dropped by filters in this update window.
    pub events_filtered: u64,
    /// Events whose processing failed in this update window.
    pub events_failed: u64,
    /// Payload bytes received in this update window.
    pub bytes_received: u64,
    /// Batches received in this update window.
    pub batches_received: u64,
    /// Observed per-event processing time sample, in milliseconds.
    pub processing_time_ms: f64,
    /// Observed throughput sample, in events per second.
    pub throughput_eps: f64,
    /// Consumer lag behind the head of the stream, in milliseconds,
    /// if the backend reports it.
    pub lag_ms: Option<f64>,
}
982
/// Incremental backend-connection metrics delta applied to the collector's
/// aggregate `StreamingMetrics`.
#[derive(Debug, Clone)]
pub struct BackendMetricsUpdate {
    /// Connections currently serving traffic.
    pub connections_active: u32,
    /// Connections currently idle in the pool.
    pub connections_idle: u32,
    /// Connection errors observed in this update window.
    pub connection_errors: u64,
    /// Circuit-breaker trips observed in this update window.
    pub circuit_breaker_trips: u64,
    /// Retry attempts observed in this update window.
    pub retry_attempts: u64,
}
991
#[cfg(test)]
mod tests {
    use super::*;

    /// Baseline `MonitoringConfig` shared by the tests below; individual
    /// tests override only the fields they care about via struct-update
    /// syntax. Extracted to remove five near-identical 10-line literals.
    fn test_config() -> MonitoringConfig {
        MonitoringConfig {
            enable_metrics: true,
            enable_tracing: false,
            metrics_interval: Duration::from_secs(60),
            health_check_interval: Duration::from_secs(30),
            enable_profiling: false,
            prometheus_endpoint: None,
            jaeger_endpoint: None,
            log_level: "info".to_string(),
        }
    }

    #[tokio::test]
    async fn test_metrics_collection() {
        let config = MonitoringConfig {
            enable_tracing: true,
            metrics_interval: Duration::from_millis(100),
            health_check_interval: Duration::from_millis(100),
            ..test_config()
        };

        let collector = MetricsCollector::new(config);

        // Update some metrics
        collector
            .update_producer_metrics(ProducerMetricsUpdate {
                events_published: 100,
                events_failed: 5,
                bytes_sent: 1024,
                batches_sent: 10,
                latency_ms: 5.0,
                throughput_eps: 1000.0,
            })
            .await;

        let metrics = collector.get_metrics().await;
        assert_eq!(metrics.producer_events_published, 100);
        assert_eq!(metrics.producer_events_failed, 5);
        assert_eq!(metrics.producer_throughput_eps, 1000.0);
    }

    #[tokio::test]
    async fn test_prometheus_export() {
        let collector = MetricsCollector::new(test_config());

        collector
            .update_producer_metrics(ProducerMetricsUpdate {
                events_published: 500,
                events_failed: 10,
                bytes_sent: 2048,
                batches_sent: 50,
                latency_ms: 3.0,
                throughput_eps: 2000.0,
            })
            .await;

        let prometheus_output = collector.export_prometheus().await;
        assert!(prometheus_output.contains("oxirs_producer_events_published_total 500"));
        assert!(prometheus_output.contains("oxirs_producer_events_failed_total 10"));
    }

    #[tokio::test]
    async fn test_health_checking() {
        let mut health_checker = HealthChecker::new(test_config());

        // Add a mock component checker
        struct MockChecker;

        impl ComponentHealthChecker for MockChecker {
            fn component_name(&self) -> &str {
                "mock_component"
            }

            fn check_health(
                &self,
            ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ComponentHealth> + Send + '_>>
            {
                Box::pin(async move {
                    ComponentHealth {
                        status: HealthStatus::Healthy,
                        message: "Component is healthy".to_string(),
                        last_check: Utc::now(),
                        metrics: HashMap::new(),
                        dependencies: vec!["database".to_string()],
                    }
                })
            }
        }

        health_checker.add_component_checker(Box::new(MockChecker));
        health_checker.check_all_components().await.unwrap();

        let health = health_checker.get_health().await;
        assert_eq!(health.overall_status, HealthStatus::Healthy);
        assert!(health.component_health.contains_key("mock_component"));
    }

    #[tokio::test]
    async fn test_consumer_metrics_update() {
        let collector = MetricsCollector::new(test_config());

        collector
            .update_consumer_metrics(ConsumerMetricsUpdate {
                events_consumed: 1000,
                events_processed: 950,
                events_filtered: 50,
                events_failed: 10,
                bytes_received: 4096,
                batches_received: 100,
                processing_time_ms: 2.5,
                throughput_eps: 1500.0,
                lag_ms: Some(100.0),
            })
            .await;

        let metrics = collector.get_metrics().await;
        assert_eq!(metrics.consumer_events_consumed, 1000);
        assert_eq!(metrics.consumer_events_processed, 950);
        assert_eq!(metrics.consumer_throughput_eps, 1500.0);
        assert_eq!(metrics.consumer_lag_ms, Some(100.0));
    }

    #[tokio::test]
    async fn test_backend_metrics_update() {
        let collector = MetricsCollector::new(test_config());

        collector
            .update_backend_metrics(BackendMetricsUpdate {
                connections_active: 5,
                connections_idle: 3,
                connection_errors: 2,
                circuit_breaker_trips: 1,
                retry_attempts: 5,
            })
            .await;

        let metrics = collector.get_metrics().await;
        assert_eq!(metrics.backend_connections_active, 5);
        assert_eq!(metrics.backend_connections_idle, 3);
        assert_eq!(metrics.backend_connection_errors, 2);
    }

    #[test]
    fn test_health_status_serialization() {
        let health = SystemHealth {
            overall_status: HealthStatus::Warning,
            component_health: {
                let mut health_map = HashMap::new();
                health_map.insert(
                    "database".to_string(),
                    ComponentHealth {
                        status: HealthStatus::Warning,
                        message: "High latency detected".to_string(),
                        last_check: Utc::now(),
                        metrics: {
                            let mut metrics = HashMap::new();
                            metrics.insert("latency_ms".to_string(), 150.0);
                            metrics
                        },
                        dependencies: vec!["network".to_string()],
                    },
                );
                health_map
            },
            last_check: Utc::now(),
            uptime: Duration::from_secs(3600),
            alerts: vec![HealthAlert {
                id: "alert-1".to_string(),
                component: "database".to_string(),
                severity: AlertSeverity::Warning,
                message: "High latency detected".to_string(),
                timestamp: Utc::now(),
                resolved: false,
            }],
        };

        let serialized = serde_json::to_string(&health).unwrap();
        let deserialized: SystemHealth = serde_json::from_str(&serialized).unwrap();

        assert_eq!(deserialized.overall_status, HealthStatus::Warning);
        assert_eq!(deserialized.component_health.len(), 1);
        assert_eq!(deserialized.alerts.len(), 1);
    }

    #[tokio::test]
    async fn test_profiler() {
        let profiler = Profiler::new(1.0); // 100% sampling for testing

        {
            let _trace = profiler.start_trace("test_operation".to_string()).await;
            // Simulate some work
            tokio::time::sleep(Duration::from_millis(10)).await;
        } // TraceHandle dropped here, trace should be recorded

        // Give some time for async trace recording
        tokio::time::sleep(Duration::from_millis(50)).await;

        let traces = profiler.traces.read().await;
        assert_eq!(traces.len(), 1);
        assert_eq!(traces[0].operation, "test_operation");
        assert!(traces[0].duration >= Duration::from_millis(10));
    }

    #[test]
    fn test_metrics_update_structures() {
        let producer_update = ProducerMetricsUpdate {
            events_published: 100,
            events_failed: 2,
            bytes_sent: 1024,
            batches_sent: 10,
            latency_ms: 5.5,
            throughput_eps: 200.0,
        };

        assert_eq!(producer_update.events_published, 100);
        assert_eq!(producer_update.latency_ms, 5.5);

        let consumer_update = ConsumerMetricsUpdate {
            events_consumed: 95,
            events_processed: 90,
            events_filtered: 5,
            events_failed: 1,
            bytes_received: 950,
            batches_received: 9,
            processing_time_ms: 2.0,
            throughput_eps: 190.0,
            lag_ms: Some(50.0),
        };

        assert_eq!(consumer_update.events_consumed, 95);
        assert_eq!(consumer_update.lag_ms, Some(50.0));

        let backend_update = BackendMetricsUpdate {
            connections_active: 3,
            connections_idle: 2,
            connection_errors: 1,
            circuit_breaker_trips: 0,
            retry_attempts: 2,
        };

        assert_eq!(backend_update.connections_active, 3);
        assert_eq!(backend_update.retry_attempts, 2);
    }
}