// hyperstack_server/health.rs

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use tokio::sync::RwLock;
use tokio::time::{interval, MissedTickBehavior};
use tracing::{error, info, warn};

/// Tracks the last processed slot for stream resumption after reconnection.
///
/// Clones share the same underlying counter (it lives behind an `Arc`), so a
/// slot recorded through one clone is visible through every other clone.
#[derive(Debug, Clone)]
pub struct SlotTracker {
    /// Highest slot recorded so far; starts at `0`.
    last_slot: Arc<AtomicU64>,
}

impl SlotTracker {
    /// Creates a tracker whose high-water mark starts at `0`.
    pub fn new() -> Self {
        Self {
            last_slot: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Records `slot` as processed.
    ///
    /// The stored value only ever increases: recording a slot lower than the
    /// current high-water mark leaves it unchanged, so out-of-order calls cannot
    /// move the resume point backwards.
    pub fn record(&self, slot: u64) {
        // Relaxed is enough here: within this module the counter is a standalone
        // value and is not used to publish or guard any other memory.
        self.last_slot.fetch_max(slot, Ordering::Relaxed);
    }

    /// Returns the highest slot recorded so far, or `0` if nothing has been recorded.
    pub fn get(&self) -> u64 {
        self.last_slot.load(Ordering::Relaxed)
    }
}

impl Default for SlotTracker {
    /// Equivalent to [`SlotTracker::new`].
    fn default() -> Self {
        Self::new()
    }
}

/// Connection state of the upstream stream, as recorded by `HealthMonitor`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamStatus {
    /// The stream connection is established.
    Connected,
    /// The stream is not connected.
    Disconnected,
    /// A reconnection attempt is in progress.
    Reconnecting,
    /// The stream reported an error; the payload is the error message.
    Error(String),
}

/// Configuration for health monitoring.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HealthConfig {
    /// Period of the background health check.
    ///
    /// A connected stream that has received no event for twice this long is
    /// considered stale. Should be non-zero.
    pub heartbeat_interval: Duration,
    /// Health-check timeout.
    ///
    /// NOTE(review): not read by `HealthMonitor` in this module — confirm whether
    /// anything else consumes it.
    pub health_check_timeout: Duration,
}

impl Default for HealthConfig {
    /// 30 s heartbeat interval and 10 s health-check timeout.
    fn default() -> Self {
        Self {
            heartbeat_interval: Duration::from_secs(30),
            health_check_timeout: Duration::from_secs(10),
        }
    }
}

impl HealthConfig {
    /// Creates a configuration with the default values.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns this configuration with `heartbeat_interval` replaced.
    ///
    /// Consumes `self`; the returned value must be used or the change is lost.
    #[must_use]
    pub fn with_heartbeat_interval(mut self, interval: Duration) -> Self {
        self.heartbeat_interval = interval;
        self
    }

    /// Returns this configuration with `health_check_timeout` replaced.
    ///
    /// Consumes `self`; the returned value must be used or the change is lost.
    #[must_use]
    pub fn with_health_check_timeout(mut self, timeout: Duration) -> Self {
        self.health_check_timeout = timeout;
        self
    }
}

76/// Health monitor for tracking stream status and connectivity
77pub struct HealthMonitor {
78    config: HealthConfig,
79    stream_status: Arc<RwLock<StreamStatus>>,
80    last_event_time: Arc<RwLock<Option<SystemTime>>>,
81    error_count: Arc<RwLock<u32>>,
82    connection_start_time: Arc<RwLock<Option<Instant>>>,
83}
84
85impl HealthMonitor {
86    pub fn new(config: HealthConfig) -> Self {
87        Self {
88            config,
89            stream_status: Arc::new(RwLock::new(StreamStatus::Disconnected)),
90            last_event_time: Arc::new(RwLock::new(None)),
91            error_count: Arc::new(RwLock::new(0)),
92            connection_start_time: Arc::new(RwLock::new(None)),
93        }
94    }
95
96    /// Start the health monitoring background task
97    pub async fn start(&self) -> tokio::task::JoinHandle<()> {
98        let monitor = self.clone();
99
100        tokio::spawn(async move {
101            let mut interval = interval(monitor.config.heartbeat_interval);
102
103            loop {
104                interval.tick().await;
105                monitor.check_health().await;
106            }
107        })
108    }
109
110    /// Record that an event was received from the stream
111    pub async fn record_event(&self) {
112        *self.last_event_time.write().await = Some(SystemTime::now());
113    }
114
115    /// Record that the stream connection was established
116    pub async fn record_connection(&self) {
117        *self.stream_status.write().await = StreamStatus::Connected;
118        *self.connection_start_time.write().await = Some(Instant::now());
119        info!("Stream connection established");
120    }
121
122    /// Record that the stream disconnected
123    pub async fn record_disconnection(&self) {
124        *self.stream_status.write().await = StreamStatus::Disconnected;
125        *self.connection_start_time.write().await = None;
126        warn!("Stream disconnected");
127    }
128
129    /// Record that the stream is attempting to reconnect
130    pub async fn record_reconnecting(&self) {
131        *self.stream_status.write().await = StreamStatus::Reconnecting;
132        info!("Stream reconnecting");
133    }
134
135    /// Record an error from the stream
136    pub async fn record_error(&self, error: String) {
137        *self.stream_status.write().await = StreamStatus::Error(error.clone());
138        *self.error_count.write().await += 1;
139        error!("Stream error: {}", error);
140    }
141
142    /// Check if the stream is currently healthy
143    pub async fn is_healthy(&self) -> bool {
144        let status = self.stream_status.read().await;
145        let last_event_time = *self.last_event_time.read().await;
146
147        match *status {
148            StreamStatus::Connected => {
149                // Check if we've received events recently
150                if let Some(last_event) = last_event_time {
151                    let time_since_last_event = SystemTime::now()
152                        .duration_since(last_event)
153                        .unwrap_or(Duration::from_secs(u64::MAX));
154
155                    // Consider unhealthy if no events for 2x heartbeat interval
156                    time_since_last_event < (self.config.heartbeat_interval * 2)
157                } else {
158                    // No events yet, but connected - might be waiting for first event
159                    let connection_time = self.connection_start_time.read().await;
160                    if let Some(start_time) = *connection_time {
161                        let time_since_connection = start_time.elapsed();
162                        // Give it some time to receive first event
163                        time_since_connection < Duration::from_secs(60)
164                    } else {
165                        false
166                    }
167                }
168            }
169            StreamStatus::Reconnecting => true, // Considered healthy if actively reconnecting
170            _ => false,
171        }
172    }
173
174    /// Get the current stream status
175    pub async fn status(&self) -> StreamStatus {
176        self.stream_status.read().await.clone()
177    }
178
179    /// Get the current error count
180    pub async fn error_count(&self) -> u32 {
181        *self.error_count.read().await
182    }
183
184    async fn check_health(&self) {
185        let is_healthy = self.is_healthy().await;
186        let status = self.stream_status.read().await.clone();
187
188        if !is_healthy {
189            match status {
190                StreamStatus::Connected => {
191                    warn!("Stream appears to be stale - no recent events");
192                }
193                StreamStatus::Disconnected => {
194                    warn!("Stream is disconnected");
195                }
196                StreamStatus::Error(ref error) => {
197                    error!("Stream in error state: {}", error);
198                }
199                StreamStatus::Reconnecting => {
200                    info!("Stream is reconnecting");
201                }
202            }
203        }
204    }
205}
206
207impl Clone for HealthMonitor {
208    fn clone(&self) -> Self {
209        Self {
210            config: self.config.clone(),
211            stream_status: Arc::clone(&self.stream_status),
212            last_event_time: Arc::clone(&self.last_event_time),
213            error_count: Arc::clone(&self.error_count),
214            connection_start_time: Arc::clone(&self.connection_start_time),
215        }
216    }
217}