// kiteticker_async_manager/manager/health_monitor.rs

1use crate::manager::{ConnectionStats, ManagerStats};
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4use tokio::sync::RwLock;
5use tokio::task::JoinHandle;
6use tokio::time::sleep;
7
8/// Health monitor for tracking connection and system health
9#[derive(Debug)]
10pub struct HealthMonitor {
11    pub connection_stats: Vec<Arc<RwLock<ConnectionStats>>>,
12    pub manager_start_time: Instant,
13    pub monitoring_task: Option<JoinHandle<()>>,
14    pub health_check_interval: Duration,
15}
16
17impl HealthMonitor {
18    /// Create a new health monitor
19    pub fn new(
20        connection_stats: Vec<Arc<RwLock<ConnectionStats>>>,
21        health_check_interval: Duration,
22    ) -> Self {
23        Self {
24            connection_stats,
25            manager_start_time: Instant::now(),
26            monitoring_task: None,
27            health_check_interval,
28        }
29    }
30    
31    /// Start the health monitoring task
32    pub fn start(&mut self) {
33        let connection_stats = self.connection_stats.clone();
34        let health_check_interval = self.health_check_interval;
35        let manager_start_time = self.manager_start_time;
36        
37        let handle = tokio::spawn(async move {
38            Self::monitoring_loop(
39                connection_stats,
40                health_check_interval,
41                manager_start_time,
42            ).await;
43        });
44        
45        self.monitoring_task = Some(handle);
46    }
47    
48    /// Health monitoring loop
49    async fn monitoring_loop(
50        connection_stats: Vec<Arc<RwLock<ConnectionStats>>>,
51        health_check_interval: Duration,
52        manager_start_time: Instant,
53    ) {
54        log::info!("Health monitor started with interval: {:?}", health_check_interval);
55        
56        loop {
57            sleep(health_check_interval).await;
58            
59            // Collect health information
60            let mut healthy_connections = 0;
61            let mut total_symbols = 0;
62            let mut total_messages = 0;
63            let mut total_errors = 0;
64            
65            for (i, stats_arc) in connection_stats.iter().enumerate() {
66                let mut stats = stats_arc.write().await;
67                
68                // Update connection uptime
69                if stats.is_connected {
70                    stats.connection_uptime = manager_start_time.elapsed();
71                    healthy_connections += 1;
72                }
73                
74                total_symbols += stats.symbol_count;
75                total_messages += stats.messages_received;
76                total_errors += stats.errors_count;
77                
78                // Log individual connection health
79                log::debug!(
80                    "Connection {}: healthy={}, symbols={}, messages={}, errors={}",
81                    i,
82                    stats.is_connected,
83                    stats.symbol_count,
84                    stats.messages_received,
85                    stats.errors_count
86                );
87            }
88            
89            // Log overall health
90            log::info!(
91                "Health Check: {}/{} connections healthy, {} total symbols, {} messages, {} errors",
92                healthy_connections,
93                connection_stats.len(),
94                total_symbols,
95                total_messages,
96                total_errors
97            );
98            
99            // Alert on issues
100            if healthy_connections == 0 {
101                log::error!("CRITICAL: All connections are unhealthy!");
102            } else if healthy_connections < connection_stats.len() {
103                log::warn!(
104                    "WARNING: {}/{} connections are unhealthy",
105                    connection_stats.len() - healthy_connections,
106                    connection_stats.len()
107                );
108            }
109        }
110    }
111    
112    /// Get comprehensive manager statistics
113    pub async fn get_manager_stats(&self) -> ManagerStats {
114        let mut manager_stats = ManagerStats::default();
115        manager_stats.uptime = self.manager_start_time.elapsed();
116        
117        let mut active_connections = 0;
118        
119        for stats_arc in &self.connection_stats {
120            let stats = stats_arc.read().await;
121            
122            if stats.is_connected {
123                active_connections += 1;
124            }
125            
126            manager_stats.total_symbols += stats.symbol_count;
127            manager_stats.total_messages_received += stats.messages_received;
128            manager_stats.total_errors += stats.errors_count;
129            manager_stats.connection_stats.push(stats.clone());
130        }
131        
132        manager_stats.active_connections = active_connections;
133        manager_stats
134    }
135    
136    /// Get health summary
137    pub async fn get_health_summary(&self) -> HealthSummary {
138        let mut summary = HealthSummary::default();
139        
140        for (i, stats_arc) in self.connection_stats.iter().enumerate() {
141            let stats = stats_arc.read().await;
142            
143            if stats.is_connected {
144                summary.healthy_connections += 1;
145            } else {
146                summary.unhealthy_connections.push(i);
147            }
148            
149            summary.total_symbols += stats.symbol_count;
150            summary.total_messages += stats.messages_received;
151            summary.total_errors += stats.errors_count;
152            
153            // Calculate message rate (messages per second over last minute)
154            if let Some(last_msg_time) = stats.last_message_time {
155                if last_msg_time.elapsed() < Duration::from_secs(60) {
156                    summary.active_message_flows += 1;
157                }
158            }
159        }
160        
161        summary.uptime = self.manager_start_time.elapsed();
162        summary
163    }
164    
165    /// Stop the health monitor
166    pub async fn stop(&mut self) {
167        if let Some(handle) = self.monitoring_task.take() {
168            handle.abort();
169            let _ = handle.await;
170        }
171    }
172}
173
/// Health summary for quick status checks.
///
/// As populated by `HealthMonitor::get_health_summary`, a connection counts
/// as healthy when its `is_connected` flag is set.
#[derive(Debug, Clone, Default)]
pub struct HealthSummary {
    /// Number of connections that are connected.
    pub healthy_connections: usize,
    /// Indices (into the monitor's connection list) of disconnected connections.
    pub unhealthy_connections: Vec<usize>,
    /// Sum of subscribed symbols across all connections.
    pub total_symbols: usize,
    /// Sum of messages received across all connections.
    pub total_messages: u64,
    /// Sum of errors recorded across all connections.
    pub total_errors: u64,
    /// Number of connections that received a message within the last 60 seconds.
    pub active_message_flows: usize,
    /// Time elapsed since the health monitor was created.
    pub uptime: Duration,
}

impl HealthSummary {
    /// Check if the system is healthy.
    ///
    /// Requires that no connection is disconnected and that no errors were
    /// recorded. A summary with no connections counts as healthy.
    pub fn is_healthy(&self) -> bool {
        self.unhealthy_connections.is_empty() && self.total_errors == 0
    }

    /// Check if the system is degraded: some connections are down while at
    /// least one other is still up.
    pub fn is_degraded(&self) -> bool {
        !self.unhealthy_connections.is_empty() && self.healthy_connections > 0
    }

    /// Check if the system is critical: there is at least one connection and
    /// none of them are healthy.
    ///
    /// A summary with no connections at all is not critical. This keeps
    /// `is_critical` consistent with `is_healthy` and `health_percentage`,
    /// which both treat that case as fully healthy. Previously an empty
    /// summary reported healthy and critical at the same time.
    pub fn is_critical(&self) -> bool {
        self.healthy_connections == 0 && !self.unhealthy_connections.is_empty()
    }

    /// Get the percentage of connections that are healthy, in `0.0..=100.0`.
    ///
    /// Returns `100.0` when there are no connections.
    pub fn health_percentage(&self) -> f64 {
        let total_connections = self.healthy_connections + self.unhealthy_connections.len();
        if total_connections == 0 {
            100.0
        } else {
            (self.healthy_connections as f64 / total_connections as f64) * 100.0
        }
    }
}
209        }
210    }
211}