//! Stream health monitoring (`hyperstack_server/health.rs`).

use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};

use tokio::sync::RwLock;
use tokio::time::interval;
use tracing::{error, info, warn};
/// Connection state of the monitored stream, as tracked by `HealthMonitor`.
///
/// Derives `PartialEq`/`Eq` so callers can compare a status snapshot directly,
/// e.g. `monitor.status().await == StreamStatus::Connected`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamStatus {
    /// The stream is connected; it may still be judged stale if events stop arriving.
    Connected,
    /// The stream is not connected. A newly created monitor starts in this state.
    Disconnected,
    /// A reconnection attempt is in progress; the monitor reports this as healthy.
    Reconnecting,
    /// The stream failed; carries the message passed to `HealthMonitor::record_error`.
    Error(String),
}
/// Timing settings for a `HealthMonitor`.
///
/// Start from [`HealthConfig::new`] (or `Default`) and adjust with the
/// `with_*` builder methods.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HealthConfig {
    /// Period of the background health check spawned by `HealthMonitor::start`.
    /// A connected stream is also treated as stale once no event has been
    /// recorded for twice this long. Defaults to 30 seconds.
    ///
    /// Must be non-zero: `tokio::time::interval` panics on a zero period.
    pub heartbeat_interval: Duration,
    /// Health-check timeout. Defaults to 10 seconds.
    ///
    /// NOTE(review): nothing in this module reads this value — confirm whether it
    /// is consumed elsewhere or still needs to be wired up.
    pub health_check_timeout: Duration,
}

impl Default for HealthConfig {
    /// 30-second heartbeat interval and 10-second health-check timeout.
    fn default() -> Self {
        Self {
            heartbeat_interval: Duration::from_secs(30),
            health_check_timeout: Duration::from_secs(10),
        }
    }
}

impl HealthConfig {
    /// Creates a configuration holding the default values.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns `self` with `heartbeat_interval` replaced by `interval`.
    #[must_use]
    pub fn with_heartbeat_interval(mut self, interval: Duration) -> Self {
        self.heartbeat_interval = interval;
        self
    }

    /// Returns `self` with `health_check_timeout` replaced by `timeout`.
    #[must_use]
    pub fn with_health_check_timeout(mut self, timeout: Duration) -> Self {
        self.health_check_timeout = timeout;
        self
    }
}
46
47pub struct HealthMonitor {
49 config: HealthConfig,
50 stream_status: Arc<RwLock<StreamStatus>>,
51 last_event_time: Arc<RwLock<Option<SystemTime>>>,
52 error_count: Arc<RwLock<u32>>,
53 connection_start_time: Arc<RwLock<Option<Instant>>>,
54}
55
56impl HealthMonitor {
57 pub fn new(config: HealthConfig) -> Self {
58 Self {
59 config,
60 stream_status: Arc::new(RwLock::new(StreamStatus::Disconnected)),
61 last_event_time: Arc::new(RwLock::new(None)),
62 error_count: Arc::new(RwLock::new(0)),
63 connection_start_time: Arc::new(RwLock::new(None)),
64 }
65 }
66
67 pub async fn start(&self) -> tokio::task::JoinHandle<()> {
69 let monitor = self.clone();
70
71 tokio::spawn(async move {
72 let mut interval = interval(monitor.config.heartbeat_interval);
73
74 loop {
75 interval.tick().await;
76 monitor.check_health().await;
77 }
78 })
79 }
80
81 pub async fn record_event(&self) {
83 *self.last_event_time.write().await = Some(SystemTime::now());
84 }
85
86 pub async fn record_connection(&self) {
88 *self.stream_status.write().await = StreamStatus::Connected;
89 *self.connection_start_time.write().await = Some(Instant::now());
90 info!("Stream connection established");
91 }
92
93 pub async fn record_disconnection(&self) {
95 *self.stream_status.write().await = StreamStatus::Disconnected;
96 *self.connection_start_time.write().await = None;
97 warn!("Stream disconnected");
98 }
99
100 pub async fn record_reconnecting(&self) {
102 *self.stream_status.write().await = StreamStatus::Reconnecting;
103 info!("Stream reconnecting");
104 }
105
106 pub async fn record_error(&self, error: String) {
108 *self.stream_status.write().await = StreamStatus::Error(error.clone());
109 *self.error_count.write().await += 1;
110 error!("Stream error: {}", error);
111 }
112
113 pub async fn is_healthy(&self) -> bool {
115 let status = self.stream_status.read().await;
116 let last_event_time = *self.last_event_time.read().await;
117
118 match *status {
119 StreamStatus::Connected => {
120 if let Some(last_event) = last_event_time {
122 let time_since_last_event = SystemTime::now()
123 .duration_since(last_event)
124 .unwrap_or(Duration::from_secs(u64::MAX));
125
126 time_since_last_event < (self.config.heartbeat_interval * 2)
128 } else {
129 let connection_time = self.connection_start_time.read().await;
131 if let Some(start_time) = *connection_time {
132 let time_since_connection = start_time.elapsed();
133 time_since_connection < Duration::from_secs(60)
135 } else {
136 false
137 }
138 }
139 }
140 StreamStatus::Reconnecting => true, _ => false,
142 }
143 }
144
145 pub async fn status(&self) -> StreamStatus {
147 self.stream_status.read().await.clone()
148 }
149
150 pub async fn error_count(&self) -> u32 {
152 *self.error_count.read().await
153 }
154
155 async fn check_health(&self) {
156 let is_healthy = self.is_healthy().await;
157 let status = self.stream_status.read().await.clone();
158
159 if !is_healthy {
160 match status {
161 StreamStatus::Connected => {
162 warn!("Stream appears to be stale - no recent events");
163 }
164 StreamStatus::Disconnected => {
165 warn!("Stream is disconnected");
166 }
167 StreamStatus::Error(ref error) => {
168 error!("Stream in error state: {}", error);
169 }
170 StreamStatus::Reconnecting => {
171 info!("Stream is reconnecting");
172 }
173 }
174 }
175 }
176}
177
178impl Clone for HealthMonitor {
179 fn clone(&self) -> Self {
180 Self {
181 config: self.config.clone(),
182 stream_status: Arc::clone(&self.stream_status),
183 last_event_time: Arc::clone(&self.last_event_time),
184 error_count: Arc::clone(&self.error_count),
185 connection_start_time: Arc::clone(&self.connection_start_time),
186 }
187 }
188}