hyperstack_server/
health.rs1use std::collections::HashMap;
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::sync::Arc;
4use std::time::{Duration, Instant, SystemTime};
5use tokio::sync::RwLock;
6use tokio::time::interval;
7use tracing::{error, info, warn};
8
9#[derive(Clone)]
15pub struct SlotTracker {
16 last_slot: Arc<AtomicU64>,
17 notify: Arc<tokio::sync::Notify>,
18 slot_hashes: Arc<RwLock<HashMap<u64, String>>>,
20}
21
22impl SlotTracker {
23 pub fn new() -> Self {
24 Self {
25 last_slot: Arc::new(AtomicU64::new(0)),
26 notify: Arc::new(tokio::sync::Notify::new()),
27 slot_hashes: Arc::new(RwLock::new(HashMap::new())),
28 }
29 }
30
31 pub fn record(&self, slot: u64) {
32 let old = self.last_slot.fetch_max(slot, Ordering::Relaxed);
33 if slot > old {
34 self.notify.notify_waiters();
35 }
36 }
37
38 pub async fn record_slot_hash(&self, slot: u64, slot_hash: String) {
40 let mut hashes = self.slot_hashes.write().await;
41 hashes.insert(slot, slot_hash);
42
43 let slots_to_remove: Vec<u64> = hashes
45 .keys()
46 .filter(|&&s| s < slot.saturating_sub(1000))
47 .copied()
48 .collect();
49 for s in slots_to_remove {
50 hashes.remove(&s);
51 }
52 }
53
54 pub async fn get_slot_hash(&self, slot: u64) -> Option<String> {
56 let hashes = self.slot_hashes.read().await;
57 hashes.get(&slot).cloned()
58 }
59
60 pub fn get(&self) -> u64 {
61 self.last_slot.load(Ordering::Relaxed)
62 }
63
64 pub fn notified(&self) -> impl std::future::Future<Output = ()> + '_ {
67 self.notify.notified()
68 }
69}
70
71impl Default for SlotTracker {
72 fn default() -> Self {
73 Self::new()
74 }
75}
76
77static GLOBAL_SLOT_TRACKER: once_cell::sync::Lazy<Arc<tokio::sync::RwLock<Option<SlotTracker>>>> =
79 once_cell::sync::Lazy::new(|| Arc::new(tokio::sync::RwLock::new(None)));
80
81pub async fn init_global_slot_tracker(slot_tracker: SlotTracker) {
83 let mut global = GLOBAL_SLOT_TRACKER.write().await;
84 *global = Some(slot_tracker);
85}
86
87pub async fn get_slot_hash(slot: u64) -> Option<String> {
90 let global = GLOBAL_SLOT_TRACKER.read().await;
91 if let Some(ref tracker) = *global {
92 tracker.get_slot_hash(slot).await
93 } else {
94 None
95 }
96}
97
/// Connection state of the monitored stream.
#[derive(Clone, Debug)]
pub enum StreamStatus {
    /// The stream connection is established.
    Connected,
    /// The stream is not connected.
    Disconnected,
    /// A reconnection attempt is in progress.
    Reconnecting,
    /// The stream failed; the payload is the error message.
    Error(String),
}
105
/// Timing settings consumed by the health monitor.
#[derive(Debug, Clone)]
pub struct HealthConfig {
    /// Period of the periodic health check. Twice this value is the maximum
    /// age of the last event before a connected stream is considered stale.
    pub heartbeat_interval: Duration,
    /// Timeout for a health check.
    ///
    /// NOTE(review): nothing in this file reads this field — confirm whether
    /// callers elsewhere rely on it.
    pub health_check_timeout: Duration,
}

impl Default for HealthConfig {
    /// Defaults to a 30 second heartbeat interval and a 10 second timeout.
    fn default() -> Self {
        Self {
            heartbeat_interval: Duration::from_secs(30),
            health_check_timeout: Duration::from_secs(10),
        }
    }
}

impl HealthConfig {
    /// Creates a configuration holding the default values.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the configuration with `heartbeat_interval` replaced.
    ///
    /// Consumes `self`, so the returned value must be used; discarding it
    /// loses the change.
    #[must_use]
    pub fn with_heartbeat_interval(mut self, interval: Duration) -> Self {
        self.heartbeat_interval = interval;
        self
    }

    /// Returns the configuration with `health_check_timeout` replaced.
    ///
    /// Consumes `self`, so the returned value must be used; discarding it
    /// loses the change.
    #[must_use]
    pub fn with_health_check_timeout(mut self, timeout: Duration) -> Self {
        self.health_check_timeout = timeout;
        self
    }
}
137
138pub struct HealthMonitor {
140 config: HealthConfig,
141 stream_status: Arc<RwLock<StreamStatus>>,
142 last_event_time: Arc<RwLock<Option<SystemTime>>>,
143 error_count: Arc<RwLock<u32>>,
144 connection_start_time: Arc<RwLock<Option<Instant>>>,
145}
146
147impl HealthMonitor {
148 pub fn new(config: HealthConfig) -> Self {
149 Self {
150 config,
151 stream_status: Arc::new(RwLock::new(StreamStatus::Disconnected)),
152 last_event_time: Arc::new(RwLock::new(None)),
153 error_count: Arc::new(RwLock::new(0)),
154 connection_start_time: Arc::new(RwLock::new(None)),
155 }
156 }
157
158 pub async fn start(&self) -> tokio::task::JoinHandle<()> {
160 let monitor = self.clone();
161
162 tokio::spawn(async move {
163 let mut interval = interval(monitor.config.heartbeat_interval);
164
165 loop {
166 interval.tick().await;
167 monitor.check_health().await;
168 }
169 })
170 }
171
172 pub async fn record_event(&self) {
174 *self.last_event_time.write().await = Some(SystemTime::now());
175 }
176
177 pub async fn record_connection(&self) {
179 *self.stream_status.write().await = StreamStatus::Connected;
180 *self.connection_start_time.write().await = Some(Instant::now());
181 info!("Stream connection established");
182 }
183
184 pub async fn record_disconnection(&self) {
186 *self.stream_status.write().await = StreamStatus::Disconnected;
187 *self.connection_start_time.write().await = None;
188 warn!("Stream disconnected");
189 }
190
191 pub async fn record_reconnecting(&self) {
193 *self.stream_status.write().await = StreamStatus::Reconnecting;
194 info!("Stream reconnecting");
195 }
196
197 pub async fn record_error(&self, error: String) {
199 *self.stream_status.write().await = StreamStatus::Error(error.clone());
200 *self.error_count.write().await += 1;
201 error!("Stream error: {}", error);
202 }
203
204 pub async fn is_healthy(&self) -> bool {
206 let status = self.stream_status.read().await;
207 let last_event_time = *self.last_event_time.read().await;
208
209 match *status {
210 StreamStatus::Connected => {
211 if let Some(last_event) = last_event_time {
213 let time_since_last_event = SystemTime::now()
214 .duration_since(last_event)
215 .unwrap_or(Duration::from_secs(u64::MAX));
216
217 time_since_last_event < (self.config.heartbeat_interval * 2)
219 } else {
220 let connection_time = self.connection_start_time.read().await;
222 if let Some(start_time) = *connection_time {
223 let time_since_connection = start_time.elapsed();
224 time_since_connection < Duration::from_secs(60)
226 } else {
227 false
228 }
229 }
230 }
231 StreamStatus::Reconnecting => true, _ => false,
233 }
234 }
235
236 pub async fn status(&self) -> StreamStatus {
238 self.stream_status.read().await.clone()
239 }
240
241 pub async fn error_count(&self) -> u32 {
243 *self.error_count.read().await
244 }
245
246 async fn check_health(&self) {
247 let is_healthy = self.is_healthy().await;
248 let status = self.stream_status.read().await.clone();
249
250 if !is_healthy {
251 match status {
252 StreamStatus::Connected => {
253 warn!("Stream appears to be stale - no recent events");
254 }
255 StreamStatus::Disconnected => {
256 warn!("Stream is disconnected");
257 }
258 StreamStatus::Error(ref error) => {
259 error!("Stream in error state: {}", error);
260 }
261 StreamStatus::Reconnecting => {
262 info!("Stream is reconnecting");
263 }
264 }
265 }
266 }
267}
268
269impl Clone for HealthMonitor {
270 fn clone(&self) -> Self {
271 Self {
272 config: self.config.clone(),
273 stream_status: Arc::clone(&self.stream_status),
274 last_event_time: Arc::clone(&self.last_event_time),
275 error_count: Arc::clone(&self.error_count),
276 connection_start_time: Arc::clone(&self.connection_start_time),
277 }
278 }
279}