kiteticker_async_manager/manager/health_monitor.rs

1use crate::manager::{ConnectionStats, ManagerStats};
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4use tokio::sync::RwLock;
5use tokio::task::JoinHandle;
6use tokio::time::sleep;
7
8/// Health monitor for tracking connection and system health
9#[derive(Debug)]
10pub struct HealthMonitor {
11  pub connection_stats: Vec<Arc<RwLock<ConnectionStats>>>,
12  pub manager_start_time: Instant,
13  pub monitoring_task: Option<JoinHandle<()>>,
14  pub health_check_interval: Duration,
15}
16
17impl HealthMonitor {
18  /// Create a new health monitor
19  pub fn new(
20    connection_stats: Vec<Arc<RwLock<ConnectionStats>>>,
21    health_check_interval: Duration,
22  ) -> Self {
23    Self {
24      connection_stats,
25      manager_start_time: Instant::now(),
26      monitoring_task: None,
27      health_check_interval,
28    }
29  }
30
31  /// Start the health monitoring task
32  pub fn start(&mut self) {
33    let connection_stats = self.connection_stats.clone();
34    let health_check_interval = self.health_check_interval;
35    let manager_start_time = self.manager_start_time;
36
37    let handle = tokio::spawn(async move {
38      Self::monitoring_loop(
39        connection_stats,
40        health_check_interval,
41        manager_start_time,
42      )
43      .await;
44    });
45
46    self.monitoring_task = Some(handle);
47  }
48
49  /// Health monitoring loop
50  async fn monitoring_loop(
51    connection_stats: Vec<Arc<RwLock<ConnectionStats>>>,
52    health_check_interval: Duration,
53    manager_start_time: Instant,
54  ) {
55    log::info!(
56      "Health monitor started with interval: {:?}",
57      health_check_interval
58    );
59
60    loop {
61      sleep(health_check_interval).await;
62
63      // Collect health information
64      let mut healthy_connections = 0;
65      let mut total_symbols = 0;
66      let mut total_messages = 0;
67      let mut total_errors = 0;
68
69      for (i, stats_arc) in connection_stats.iter().enumerate() {
70        let mut stats = stats_arc.write().await;
71
72        // Update connection uptime
73        if stats.is_connected {
74          stats.connection_uptime = manager_start_time.elapsed();
75          healthy_connections += 1;
76        }
77
78        total_symbols += stats.symbol_count;
79        total_messages += stats.messages_received;
80        total_errors += stats.errors_count;
81
82        // Log individual connection health
83        log::debug!(
84          "Connection {}: healthy={}, symbols={}, messages={}, errors={}",
85          i,
86          stats.is_connected,
87          stats.symbol_count,
88          stats.messages_received,
89          stats.errors_count
90        );
91      }
92
93      // Log overall health
94      log::info!(
95                "Health Check: {}/{} connections healthy, {} total symbols, {} messages, {} errors",
96                healthy_connections,
97                connection_stats.len(),
98                total_symbols,
99                total_messages,
100                total_errors
101            );
102
103      // Alert on issues
104      if healthy_connections == 0 {
105        log::error!("CRITICAL: All connections are unhealthy!");
106      } else if healthy_connections < connection_stats.len() {
107        log::warn!(
108          "WARNING: {}/{} connections are unhealthy",
109          connection_stats.len() - healthy_connections,
110          connection_stats.len()
111        );
112      }
113    }
114  }
115
116  /// Get comprehensive manager statistics
117  pub async fn get_manager_stats(&self) -> ManagerStats {
118    let mut manager_stats = ManagerStats {
119      uptime: self.manager_start_time.elapsed(),
120      ..Default::default()
121    };
122
123    let mut active_connections = 0;
124
125    for stats_arc in &self.connection_stats {
126      let stats = stats_arc.read().await;
127
128      if stats.is_connected {
129        active_connections += 1;
130      }
131
132      manager_stats.total_symbols += stats.symbol_count;
133      manager_stats.total_messages_received += stats.messages_received;
134      manager_stats.total_errors += stats.errors_count;
135      manager_stats.connection_stats.push(stats.clone());
136    }
137
138    manager_stats.active_connections = active_connections;
139    manager_stats
140  }
141
142  /// Get health summary
143  pub async fn get_health_summary(&self) -> HealthSummary {
144    let mut summary = HealthSummary::default();
145
146    for (i, stats_arc) in self.connection_stats.iter().enumerate() {
147      let stats = stats_arc.read().await;
148
149      if stats.is_connected {
150        summary.healthy_connections += 1;
151      } else {
152        summary.unhealthy_connections.push(i);
153      }
154
155      summary.total_symbols += stats.symbol_count;
156      summary.total_messages += stats.messages_received;
157      summary.total_errors += stats.errors_count;
158
159      // Calculate message rate (messages per second over last minute)
160      if let Some(last_msg_time) = stats.last_message_time {
161        if last_msg_time.elapsed() < Duration::from_secs(60) {
162          summary.active_message_flows += 1;
163        }
164      }
165    }
166
167    summary.uptime = self.manager_start_time.elapsed();
168    summary
169  }
170
171  /// Stop the health monitor
172  pub async fn stop(&mut self) {
173    if let Some(handle) = self.monitoring_task.take() {
174      handle.abort();
175      let _ = handle.await;
176    }
177  }
178}
179
/// Health summary for quick status checks.
///
/// Produced by `HealthMonitor::get_health_summary`, which fills each field
/// as described below. The predicates `is_healthy`, `is_degraded` and
/// `is_critical` classify it.
#[derive(Debug, Clone, Default)]
pub struct HealthSummary {
  /// Number of connections reporting `is_connected == true`.
  pub healthy_connections: usize,
  /// Indices of connections reporting `is_connected == false`.
  pub unhealthy_connections: Vec<usize>,
  /// Sum of `symbol_count` over all connections.
  pub total_symbols: usize,
  /// Sum of `messages_received` over all connections.
  pub total_messages: u64,
  /// Sum of `errors_count` over all connections.
  pub total_errors: u64,
  /// Connections whose last message arrived less than 60 seconds ago.
  pub active_message_flows: usize,
  /// Time since the monitor was created.
  pub uptime: Duration,
}

impl HealthSummary {
  /// Check if the system is healthy: no unhealthy connections and no errors.
  ///
  /// An empty summary (no connections) counts as healthy.
  /// NOTE(review): if `errors_count` is cumulative, any past error keeps this
  /// `false` for the rest of the run — confirm that is intended.
  pub fn is_healthy(&self) -> bool {
    self.unhealthy_connections.is_empty() && self.total_errors == 0
  }

  /// Check if the system is degraded: some, but not all, connections are
  /// unhealthy.
  pub fn is_degraded(&self) -> bool {
    !self.unhealthy_connections.is_empty() && self.healthy_connections > 0
  }

  /// Check if the system is critical: there is at least one connection and
  /// every connection is unhealthy.
  ///
  /// An empty summary is not critical. This keeps the method consistent with
  /// `is_healthy`, which returns `true`, and `health_percentage`, which
  /// returns 100, in that case.
  pub fn is_critical(&self) -> bool {
    self.healthy_connections == 0 && !self.unhealthy_connections.is_empty()
  }

  /// Get health percentage (0-100): the share of connections that are
  /// healthy, or 100 when there are no connections.
  pub fn health_percentage(&self) -> f64 {
    let total_connections =
      self.healthy_connections + self.unhealthy_connections.len();
    if total_connections == 0 {
      100.0
    } else {
      (self.healthy_connections as f64 / total_connections as f64) * 100.0
    }
  }
}
218}