kiteticker_async_manager/manager/
health_monitor.rs1use crate::manager::{ConnectionStats, ManagerStats};
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4use tokio::sync::RwLock;
5use tokio::task::JoinHandle;
6use tokio::time::sleep;
7
8#[derive(Debug)]
10pub struct HealthMonitor {
11 pub connection_stats: Vec<Arc<RwLock<ConnectionStats>>>,
12 pub manager_start_time: Instant,
13 pub monitoring_task: Option<JoinHandle<()>>,
14 pub health_check_interval: Duration,
15}
16
17impl HealthMonitor {
18 pub fn new(
20 connection_stats: Vec<Arc<RwLock<ConnectionStats>>>,
21 health_check_interval: Duration,
22 ) -> Self {
23 Self {
24 connection_stats,
25 manager_start_time: Instant::now(),
26 monitoring_task: None,
27 health_check_interval,
28 }
29 }
30
31 pub fn start(&mut self) {
33 let connection_stats = self.connection_stats.clone();
34 let health_check_interval = self.health_check_interval;
35 let manager_start_time = self.manager_start_time;
36
37 let handle = tokio::spawn(async move {
38 Self::monitoring_loop(
39 connection_stats,
40 health_check_interval,
41 manager_start_time,
42 )
43 .await;
44 });
45
46 self.monitoring_task = Some(handle);
47 }
48
49 async fn monitoring_loop(
51 connection_stats: Vec<Arc<RwLock<ConnectionStats>>>,
52 health_check_interval: Duration,
53 manager_start_time: Instant,
54 ) {
55 log::info!(
56 "Health monitor started with interval: {:?}",
57 health_check_interval
58 );
59
60 loop {
61 sleep(health_check_interval).await;
62
63 let mut healthy_connections = 0;
65 let mut total_symbols = 0;
66 let mut total_messages = 0;
67 let mut total_errors = 0;
68
69 for (i, stats_arc) in connection_stats.iter().enumerate() {
70 let mut stats = stats_arc.write().await;
71
72 if stats.is_connected {
74 stats.connection_uptime = manager_start_time.elapsed();
75 healthy_connections += 1;
76 }
77
78 total_symbols += stats.symbol_count;
79 total_messages += stats.messages_received;
80 total_errors += stats.errors_count;
81
82 log::debug!(
84 "Connection {}: healthy={}, symbols={}, messages={}, errors={}",
85 i,
86 stats.is_connected,
87 stats.symbol_count,
88 stats.messages_received,
89 stats.errors_count
90 );
91 }
92
93 log::info!(
95 "Health Check: {}/{} connections healthy, {} total symbols, {} messages, {} errors",
96 healthy_connections,
97 connection_stats.len(),
98 total_symbols,
99 total_messages,
100 total_errors
101 );
102
103 if healthy_connections == 0 {
105 log::error!("CRITICAL: All connections are unhealthy!");
106 } else if healthy_connections < connection_stats.len() {
107 log::warn!(
108 "WARNING: {}/{} connections are unhealthy",
109 connection_stats.len() - healthy_connections,
110 connection_stats.len()
111 );
112 }
113 }
114 }
115
116 pub async fn get_manager_stats(&self) -> ManagerStats {
118 let mut manager_stats = ManagerStats {
119 uptime: self.manager_start_time.elapsed(),
120 ..Default::default()
121 };
122
123 let mut active_connections = 0;
124
125 for stats_arc in &self.connection_stats {
126 let stats = stats_arc.read().await;
127
128 if stats.is_connected {
129 active_connections += 1;
130 }
131
132 manager_stats.total_symbols += stats.symbol_count;
133 manager_stats.total_messages_received += stats.messages_received;
134 manager_stats.total_errors += stats.errors_count;
135 manager_stats.connection_stats.push(stats.clone());
136 }
137
138 manager_stats.active_connections = active_connections;
139 manager_stats
140 }
141
142 pub async fn get_health_summary(&self) -> HealthSummary {
144 let mut summary = HealthSummary::default();
145
146 for (i, stats_arc) in self.connection_stats.iter().enumerate() {
147 let stats = stats_arc.read().await;
148
149 if stats.is_connected {
150 summary.healthy_connections += 1;
151 } else {
152 summary.unhealthy_connections.push(i);
153 }
154
155 summary.total_symbols += stats.symbol_count;
156 summary.total_messages += stats.messages_received;
157 summary.total_errors += stats.errors_count;
158
159 if let Some(last_msg_time) = stats.last_message_time {
161 if last_msg_time.elapsed() < Duration::from_secs(60) {
162 summary.active_message_flows += 1;
163 }
164 }
165 }
166
167 summary.uptime = self.manager_start_time.elapsed();
168 summary
169 }
170
171 pub async fn stop(&mut self) {
173 if let Some(handle) = self.monitoring_task.take() {
174 handle.abort();
175 let _ = handle.await;
176 }
177 }
178}
179
/// Point-in-time health snapshot of all managed connections.
///
/// `Default` yields an all-zero summary with no connections listed.
#[derive(Debug, Clone, Default)]
pub struct HealthSummary {
    /// Number of connections whose stats report `is_connected`.
    pub healthy_connections: usize,
    /// Indices (into the monitor's connection list) of disconnected connections.
    pub unhealthy_connections: Vec<usize>,
    /// Sum of subscribed symbols across all connections.
    pub total_symbols: usize,
    /// Sum of messages received across all connections.
    pub total_messages: u64,
    /// Sum of errors counted across all connections.
    pub total_errors: u64,
    /// Connections whose last message arrived within the last 60 seconds.
    pub active_message_flows: usize,
    /// Time elapsed since the health monitor was created.
    pub uptime: Duration,
}
191
192impl HealthSummary {
193 pub fn is_healthy(&self) -> bool {
195 self.unhealthy_connections.is_empty() && self.total_errors == 0
196 }
197
198 pub fn is_degraded(&self) -> bool {
200 !self.unhealthy_connections.is_empty() && self.healthy_connections > 0
201 }
202
203 pub fn is_critical(&self) -> bool {
205 self.healthy_connections == 0
206 }
207
208 pub fn health_percentage(&self) -> f64 {
210 let total_connections =
211 self.healthy_connections + self.unhealthy_connections.len();
212 if total_connections == 0 {
213 100.0
214 } else {
215 (self.healthy_connections as f64 / total_connections as f64) * 100.0
216 }
217 }
218}