kiteticker_async_manager/manager/
health_monitor.rs1use crate::manager::{ConnectionStats, ManagerStats};
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4use tokio::sync::RwLock;
5use tokio::task::JoinHandle;
6use tokio::time::sleep;
7
8#[derive(Debug)]
10pub struct HealthMonitor {
11 pub connection_stats: Vec<Arc<RwLock<ConnectionStats>>>,
12 pub manager_start_time: Instant,
13 pub monitoring_task: Option<JoinHandle<()>>,
14 pub health_check_interval: Duration,
15}
16
17impl HealthMonitor {
18 pub fn new(
20 connection_stats: Vec<Arc<RwLock<ConnectionStats>>>,
21 health_check_interval: Duration,
22 ) -> Self {
23 Self {
24 connection_stats,
25 manager_start_time: Instant::now(),
26 monitoring_task: None,
27 health_check_interval,
28 }
29 }
30
31 pub fn start(&mut self) {
33 let connection_stats = self.connection_stats.clone();
34 let health_check_interval = self.health_check_interval;
35 let manager_start_time = self.manager_start_time;
36
37 let handle = tokio::spawn(async move {
38 Self::monitoring_loop(
39 connection_stats,
40 health_check_interval,
41 manager_start_time,
42 ).await;
43 });
44
45 self.monitoring_task = Some(handle);
46 }
47
48 async fn monitoring_loop(
50 connection_stats: Vec<Arc<RwLock<ConnectionStats>>>,
51 health_check_interval: Duration,
52 manager_start_time: Instant,
53 ) {
54 log::info!("Health monitor started with interval: {:?}", health_check_interval);
55
56 loop {
57 sleep(health_check_interval).await;
58
59 let mut healthy_connections = 0;
61 let mut total_symbols = 0;
62 let mut total_messages = 0;
63 let mut total_errors = 0;
64
65 for (i, stats_arc) in connection_stats.iter().enumerate() {
66 let mut stats = stats_arc.write().await;
67
68 if stats.is_connected {
70 stats.connection_uptime = manager_start_time.elapsed();
71 healthy_connections += 1;
72 }
73
74 total_symbols += stats.symbol_count;
75 total_messages += stats.messages_received;
76 total_errors += stats.errors_count;
77
78 log::debug!(
80 "Connection {}: healthy={}, symbols={}, messages={}, errors={}",
81 i,
82 stats.is_connected,
83 stats.symbol_count,
84 stats.messages_received,
85 stats.errors_count
86 );
87 }
88
89 log::info!(
91 "Health Check: {}/{} connections healthy, {} total symbols, {} messages, {} errors",
92 healthy_connections,
93 connection_stats.len(),
94 total_symbols,
95 total_messages,
96 total_errors
97 );
98
99 if healthy_connections == 0 {
101 log::error!("CRITICAL: All connections are unhealthy!");
102 } else if healthy_connections < connection_stats.len() {
103 log::warn!(
104 "WARNING: {}/{} connections are unhealthy",
105 connection_stats.len() - healthy_connections,
106 connection_stats.len()
107 );
108 }
109 }
110 }
111
112 pub async fn get_manager_stats(&self) -> ManagerStats {
114 let mut manager_stats = ManagerStats::default();
115 manager_stats.uptime = self.manager_start_time.elapsed();
116
117 let mut active_connections = 0;
118
119 for stats_arc in &self.connection_stats {
120 let stats = stats_arc.read().await;
121
122 if stats.is_connected {
123 active_connections += 1;
124 }
125
126 manager_stats.total_symbols += stats.symbol_count;
127 manager_stats.total_messages_received += stats.messages_received;
128 manager_stats.total_errors += stats.errors_count;
129 manager_stats.connection_stats.push(stats.clone());
130 }
131
132 manager_stats.active_connections = active_connections;
133 manager_stats
134 }
135
136 pub async fn get_health_summary(&self) -> HealthSummary {
138 let mut summary = HealthSummary::default();
139
140 for (i, stats_arc) in self.connection_stats.iter().enumerate() {
141 let stats = stats_arc.read().await;
142
143 if stats.is_connected {
144 summary.healthy_connections += 1;
145 } else {
146 summary.unhealthy_connections.push(i);
147 }
148
149 summary.total_symbols += stats.symbol_count;
150 summary.total_messages += stats.messages_received;
151 summary.total_errors += stats.errors_count;
152
153 if let Some(last_msg_time) = stats.last_message_time {
155 if last_msg_time.elapsed() < Duration::from_secs(60) {
156 summary.active_message_flows += 1;
157 }
158 }
159 }
160
161 summary.uptime = self.manager_start_time.elapsed();
162 summary
163 }
164
165 pub async fn stop(&mut self) {
167 if let Some(handle) = self.monitoring_task.take() {
168 handle.abort();
169 let _ = handle.await;
170 }
171 }
172}
173
/// Point-in-time aggregate of the health of all monitored connections.
#[derive(Debug, Clone, Default)]
pub struct HealthSummary {
    /// Number of connections that were connected when the summary was built.
    pub healthy_connections: usize,
    /// Indices of the connections that were not connected.
    pub unhealthy_connections: Vec<usize>,
    /// Sum of the symbol counts of all connections.
    pub total_symbols: usize,
    /// Sum of the received-message counters of all connections.
    pub total_messages: u64,
    /// Sum of the error counters of all connections.
    pub total_errors: u64,
    /// Number of connections that received a message recently.
    pub active_message_flows: usize,
    /// Time elapsed since the monitor was created.
    pub uptime: Duration,
}

impl HealthSummary {
    /// Returns `true` when no connection is unhealthy and no errors have
    /// been counted at all.
    pub fn is_healthy(&self) -> bool {
        self.unhealthy_connections.is_empty() && self.total_errors == 0
    }

    /// Returns `true` when some, but not all, connections are unhealthy.
    pub fn is_degraded(&self) -> bool {
        !self.unhealthy_connections.is_empty() && self.healthy_connections > 0
    }

    /// Returns `true` when there is at least one connection and none of them
    /// is healthy.
    ///
    /// A summary without any connections is not critical. That keeps this
    /// predicate consistent with [`HealthSummary::is_healthy`] and with
    /// [`HealthSummary::health_percentage`], which reports 100% in that case.
    pub fn is_critical(&self) -> bool {
        self.healthy_connections == 0 && !self.unhealthy_connections.is_empty()
    }

    /// Returns the share of healthy connections as a percentage in
    /// `0.0..=100.0`; `100.0` when there are no connections at all.
    pub fn health_percentage(&self) -> f64 {
        let total_connections = self.healthy_connections + self.unhealthy_connections.len();
        if total_connections == 0 {
            100.0
        } else {
            (self.healthy_connections as f64 / total_connections as f64) * 100.0
        }
    }
}