hyperstack_server/
health.rs1use std::sync::atomic::{AtomicU64, Ordering};
2use std::sync::Arc;
3use std::time::{Duration, Instant, SystemTime};
4use tokio::sync::RwLock;
5use tokio::time::interval;
6use tracing::{error, info, warn};
7
/// Cheaply cloneable handle to a shared "highest slot seen so far" counter.
///
/// Every clone points at the same underlying atomic, so a value recorded
/// through one handle is visible through all of them.
#[derive(Clone, Default)]
pub struct SlotTracker {
    /// Largest slot value recorded so far; starts at 0.
    last_slot: Arc<AtomicU64>,
}

impl SlotTracker {
    /// Creates a tracker whose recorded slot is 0.
    pub fn new() -> Self {
        Self::default()
    }

    /// Records `slot`, keeping the stored value only if `slot` is larger.
    ///
    /// The stored value therefore never decreases.
    pub fn record(&self, slot: u64) {
        let highest = &self.last_slot;
        highest.fetch_max(slot, Ordering::Relaxed);
    }

    /// Returns the largest slot recorded so far (0 if nothing was recorded).
    pub fn get(&self) -> u64 {
        let highest = &self.last_slot;
        highest.load(Ordering::Relaxed)
    }
}
35
/// Connection state of the monitored stream, as last reported to the
/// health monitor.
#[derive(Clone, Debug)]
pub enum StreamStatus {
    /// A connection has been reported as established.
    Connected,
    /// The stream has been reported as disconnected; also the monitor's initial state.
    Disconnected,
    /// A reconnection attempt has been reported as in progress.
    Reconnecting,
    /// An error has been reported; carries the error message.
    Error(String),
}
43
/// Tunable settings for the stream health monitor.
#[derive(Debug, Clone)]
pub struct HealthConfig {
    /// Period between periodic health checks. The monitor in this file also
    /// treats a connected stream as stale once twice this interval passes
    /// without an event.
    pub heartbeat_interval: Duration,
    /// Timeout associated with a health check.
    // NOTE(review): not read anywhere in this file — confirm where it is consumed.
    pub health_check_timeout: Duration,
}

impl Default for HealthConfig {
    /// Same as [`HealthConfig::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl HealthConfig {
    /// Creates a configuration with a 30 second heartbeat interval and a
    /// 10 second health check timeout.
    pub fn new() -> Self {
        Self {
            heartbeat_interval: Duration::from_secs(30),
            health_check_timeout: Duration::from_secs(10),
        }
    }

    /// Returns the configuration with `heartbeat_interval` replaced by `interval`.
    pub fn with_heartbeat_interval(self, interval: Duration) -> Self {
        Self {
            heartbeat_interval: interval,
            ..self
        }
    }

    /// Returns the configuration with `health_check_timeout` replaced by `timeout`.
    pub fn with_health_check_timeout(self, timeout: Duration) -> Self {
        Self {
            health_check_timeout: timeout,
            ..self
        }
    }
}
75
76pub struct HealthMonitor {
78 config: HealthConfig,
79 stream_status: Arc<RwLock<StreamStatus>>,
80 last_event_time: Arc<RwLock<Option<SystemTime>>>,
81 error_count: Arc<RwLock<u32>>,
82 connection_start_time: Arc<RwLock<Option<Instant>>>,
83}
84
85impl HealthMonitor {
86 pub fn new(config: HealthConfig) -> Self {
87 Self {
88 config,
89 stream_status: Arc::new(RwLock::new(StreamStatus::Disconnected)),
90 last_event_time: Arc::new(RwLock::new(None)),
91 error_count: Arc::new(RwLock::new(0)),
92 connection_start_time: Arc::new(RwLock::new(None)),
93 }
94 }
95
96 pub async fn start(&self) -> tokio::task::JoinHandle<()> {
98 let monitor = self.clone();
99
100 tokio::spawn(async move {
101 let mut interval = interval(monitor.config.heartbeat_interval);
102
103 loop {
104 interval.tick().await;
105 monitor.check_health().await;
106 }
107 })
108 }
109
110 pub async fn record_event(&self) {
112 *self.last_event_time.write().await = Some(SystemTime::now());
113 }
114
115 pub async fn record_connection(&self) {
117 *self.stream_status.write().await = StreamStatus::Connected;
118 *self.connection_start_time.write().await = Some(Instant::now());
119 info!("Stream connection established");
120 }
121
122 pub async fn record_disconnection(&self) {
124 *self.stream_status.write().await = StreamStatus::Disconnected;
125 *self.connection_start_time.write().await = None;
126 warn!("Stream disconnected");
127 }
128
129 pub async fn record_reconnecting(&self) {
131 *self.stream_status.write().await = StreamStatus::Reconnecting;
132 info!("Stream reconnecting");
133 }
134
135 pub async fn record_error(&self, error: String) {
137 *self.stream_status.write().await = StreamStatus::Error(error.clone());
138 *self.error_count.write().await += 1;
139 error!("Stream error: {}", error);
140 }
141
142 pub async fn is_healthy(&self) -> bool {
144 let status = self.stream_status.read().await;
145 let last_event_time = *self.last_event_time.read().await;
146
147 match *status {
148 StreamStatus::Connected => {
149 if let Some(last_event) = last_event_time {
151 let time_since_last_event = SystemTime::now()
152 .duration_since(last_event)
153 .unwrap_or(Duration::from_secs(u64::MAX));
154
155 time_since_last_event < (self.config.heartbeat_interval * 2)
157 } else {
158 let connection_time = self.connection_start_time.read().await;
160 if let Some(start_time) = *connection_time {
161 let time_since_connection = start_time.elapsed();
162 time_since_connection < Duration::from_secs(60)
164 } else {
165 false
166 }
167 }
168 }
169 StreamStatus::Reconnecting => true, _ => false,
171 }
172 }
173
174 pub async fn status(&self) -> StreamStatus {
176 self.stream_status.read().await.clone()
177 }
178
179 pub async fn error_count(&self) -> u32 {
181 *self.error_count.read().await
182 }
183
184 async fn check_health(&self) {
185 let is_healthy = self.is_healthy().await;
186 let status = self.stream_status.read().await.clone();
187
188 if !is_healthy {
189 match status {
190 StreamStatus::Connected => {
191 warn!("Stream appears to be stale - no recent events");
192 }
193 StreamStatus::Disconnected => {
194 warn!("Stream is disconnected");
195 }
196 StreamStatus::Error(ref error) => {
197 error!("Stream in error state: {}", error);
198 }
199 StreamStatus::Reconnecting => {
200 info!("Stream is reconnecting");
201 }
202 }
203 }
204 }
205}
206
207impl Clone for HealthMonitor {
208 fn clone(&self) -> Self {
209 Self {
210 config: self.config.clone(),
211 stream_status: Arc::clone(&self.stream_status),
212 last_event_time: Arc::clone(&self.last_event_time),
213 error_count: Arc::clone(&self.error_count),
214 connection_start_time: Arc::clone(&self.connection_start_time),
215 }
216 }
217}