//! hyperstack_server/health.rs
//!
//! Stream health monitoring and slot tracking for the hyperstack server.

1use std::collections::HashMap;
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::sync::Arc;
4use std::time::{Duration, Instant, SystemTime};
5use tokio::sync::RwLock;
6use tokio::time::interval;
7use tracing::{error, info, warn};
8
/// Tracks the last processed slot for stream resumption after reconnection.
///
/// Includes a `Notify` that wakes waiters whenever the slot advances,
/// allowing the scheduler to react immediately to new slots instead of polling.
/// Also maintains a bounded cache of slot hashes indexed by slot number.
///
/// Cloning is cheap: every field is an `Arc`, so clones share the same state.
#[derive(Clone)]
pub struct SlotTracker {
    // Highest slot observed so far; only ever advances (updated via fetch_max).
    last_slot: Arc<AtomicU64>,
    // Woken whenever `last_slot` strictly increases.
    notify: Arc<tokio::sync::Notify>,
    /// Cache of slot hashes indexed by slot number
    // Pruned on insert so it stays bounded (see `record_slot_hash`).
    slot_hashes: Arc<RwLock<HashMap<u64, String>>>,
}
21
22impl SlotTracker {
23    pub fn new() -> Self {
24        Self {
25            last_slot: Arc::new(AtomicU64::new(0)),
26            notify: Arc::new(tokio::sync::Notify::new()),
27            slot_hashes: Arc::new(RwLock::new(HashMap::new())),
28        }
29    }
30
31    pub fn record(&self, slot: u64) {
32        let old = self.last_slot.fetch_max(slot, Ordering::Relaxed);
33        if slot > old {
34            self.notify.notify_waiters();
35        }
36    }
37
38    /// Record a slot hash for a specific slot
39    pub async fn record_slot_hash(&self, slot: u64, slot_hash: String) {
40        let mut hashes = self.slot_hashes.write().await;
41        hashes.insert(slot, slot_hash);
42
43        // Prune old entries to prevent unbounded growth (keep last 10000 slots)
44        let slots_to_remove: Vec<u64> = hashes
45            .keys()
46            .filter(|&&s| s < slot.saturating_sub(1000))
47            .copied()
48            .collect();
49        for s in slots_to_remove {
50            hashes.remove(&s);
51        }
52    }
53
54    /// Get the slot hash for a specific slot (if cached)
55    pub async fn get_slot_hash(&self, slot: u64) -> Option<String> {
56        let hashes = self.slot_hashes.read().await;
57        hashes.get(&slot).cloned()
58    }
59
60    pub fn get(&self) -> u64 {
61        self.last_slot.load(Ordering::Relaxed)
62    }
63
64    /// Returns a future that resolves when the slot advances.
65    /// Use with `tokio::select!` and a fallback timeout.
66    pub fn notified(&self) -> impl std::future::Future<Output = ()> + '_ {
67        self.notify.notified()
68    }
69}
70
71impl Default for SlotTracker {
72    fn default() -> Self {
73        Self::new()
74    }
75}
76
/// Global slot tracker instance for accessing slot hashes from computed fields
///
/// `RwLock<Option<...>>` because it starts unset and is installed once via
/// `init_global_slot_tracker`; readers get `None` until then.
/// NOTE(review): `once_cell::sync::Lazy` could become `std::sync::LazyLock`
/// on a recent toolchain — confirm MSRV before switching.
static GLOBAL_SLOT_TRACKER: once_cell::sync::Lazy<Arc<tokio::sync::RwLock<Option<SlotTracker>>>> =
    once_cell::sync::Lazy::new(|| Arc::new(tokio::sync::RwLock::new(None)));
80
81/// Initialize the global slot tracker
82pub async fn init_global_slot_tracker(slot_tracker: SlotTracker) {
83    let mut global = GLOBAL_SLOT_TRACKER.write().await;
84    *global = Some(slot_tracker);
85}
86
87/// Get the slot hash for a specific slot (if available)
88/// This can be called from computed fields
89pub async fn get_slot_hash(slot: u64) -> Option<String> {
90    let global = GLOBAL_SLOT_TRACKER.read().await;
91    if let Some(ref tracker) = *global {
92        tracker.get_slot_hash(slot).await
93    } else {
94        None
95    }
96}
97
/// Connection state of the upstream event stream, as recorded by `HealthMonitor`.
#[derive(Debug, Clone)]
pub enum StreamStatus {
    // Connection established (set by record_connection).
    Connected,
    // Connection lost (set by record_disconnection).
    Disconnected,
    // A reconnection attempt is in progress (set by record_reconnecting).
    Reconnecting,
    // Stream reported an error; payload is the error description.
    Error(String),
}
105
/// Configuration for health monitoring
#[derive(Debug, Clone)]
pub struct HealthConfig {
    // How often the background task wakes to run a health check (default 30s).
    // Also used by is_healthy: no events for 2x this interval => stale.
    pub heartbeat_interval: Duration,
    // Default 10s. NOTE(review): not read anywhere in this file — confirm it
    // is consumed elsewhere before relying on it.
    pub health_check_timeout: Duration,
}
112
113impl Default for HealthConfig {
114    fn default() -> Self {
115        Self {
116            heartbeat_interval: Duration::from_secs(30),
117            health_check_timeout: Duration::from_secs(10),
118        }
119    }
120}
121
122impl HealthConfig {
123    pub fn new() -> Self {
124        Self::default()
125    }
126
127    pub fn with_heartbeat_interval(mut self, interval: Duration) -> Self {
128        self.heartbeat_interval = interval;
129        self
130    }
131
132    pub fn with_health_check_timeout(mut self, timeout: Duration) -> Self {
133        self.health_check_timeout = timeout;
134        self
135    }
136}
137
/// Health monitor for tracking stream status and connectivity
///
/// All state lives behind `Arc<RwLock<...>>`, so clones of the monitor
/// (see the manual `Clone` impl) observe and mutate the same state — this is
/// how the spawned background task in `start` shares state with callers.
pub struct HealthMonitor {
    config: HealthConfig,
    // Current stream state; see StreamStatus.
    stream_status: Arc<RwLock<StreamStatus>>,
    // Wall-clock time of the most recent event, None until the first event.
    last_event_time: Arc<RwLock<Option<SystemTime>>>,
    // Total errors recorded via record_error; never reset in this file.
    error_count: Arc<RwLock<u32>>,
    // When the current connection was established; None while disconnected.
    connection_start_time: Arc<RwLock<Option<Instant>>>,
}
146
147impl HealthMonitor {
148    pub fn new(config: HealthConfig) -> Self {
149        Self {
150            config,
151            stream_status: Arc::new(RwLock::new(StreamStatus::Disconnected)),
152            last_event_time: Arc::new(RwLock::new(None)),
153            error_count: Arc::new(RwLock::new(0)),
154            connection_start_time: Arc::new(RwLock::new(None)),
155        }
156    }
157
158    /// Start the health monitoring background task
159    pub async fn start(&self) -> tokio::task::JoinHandle<()> {
160        let monitor = self.clone();
161
162        tokio::spawn(async move {
163            let mut interval = interval(monitor.config.heartbeat_interval);
164
165            loop {
166                interval.tick().await;
167                monitor.check_health().await;
168            }
169        })
170    }
171
172    /// Record that an event was received from the stream
173    pub async fn record_event(&self) {
174        *self.last_event_time.write().await = Some(SystemTime::now());
175    }
176
177    /// Record that the stream connection was established
178    pub async fn record_connection(&self) {
179        *self.stream_status.write().await = StreamStatus::Connected;
180        *self.connection_start_time.write().await = Some(Instant::now());
181        info!("Stream connection established");
182    }
183
184    /// Record that the stream disconnected
185    pub async fn record_disconnection(&self) {
186        *self.stream_status.write().await = StreamStatus::Disconnected;
187        *self.connection_start_time.write().await = None;
188        warn!("Stream disconnected");
189    }
190
191    /// Record that the stream is attempting to reconnect
192    pub async fn record_reconnecting(&self) {
193        *self.stream_status.write().await = StreamStatus::Reconnecting;
194        info!("Stream reconnecting");
195    }
196
197    /// Record an error from the stream
198    pub async fn record_error(&self, error: String) {
199        *self.stream_status.write().await = StreamStatus::Error(error.clone());
200        *self.error_count.write().await += 1;
201        error!("Stream error: {}", error);
202    }
203
204    /// Check if the stream is currently healthy
205    pub async fn is_healthy(&self) -> bool {
206        let status = self.stream_status.read().await;
207        let last_event_time = *self.last_event_time.read().await;
208
209        match *status {
210            StreamStatus::Connected => {
211                // Check if we've received events recently
212                if let Some(last_event) = last_event_time {
213                    let time_since_last_event = SystemTime::now()
214                        .duration_since(last_event)
215                        .unwrap_or(Duration::from_secs(u64::MAX));
216
217                    // Consider unhealthy if no events for 2x heartbeat interval
218                    time_since_last_event < (self.config.heartbeat_interval * 2)
219                } else {
220                    // No events yet, but connected - might be waiting for first event
221                    let connection_time = self.connection_start_time.read().await;
222                    if let Some(start_time) = *connection_time {
223                        let time_since_connection = start_time.elapsed();
224                        // Give it some time to receive first event
225                        time_since_connection < Duration::from_secs(60)
226                    } else {
227                        false
228                    }
229                }
230            }
231            StreamStatus::Reconnecting => true, // Considered healthy if actively reconnecting
232            _ => false,
233        }
234    }
235
236    /// Get the current stream status
237    pub async fn status(&self) -> StreamStatus {
238        self.stream_status.read().await.clone()
239    }
240
241    /// Get the current error count
242    pub async fn error_count(&self) -> u32 {
243        *self.error_count.read().await
244    }
245
246    async fn check_health(&self) {
247        let is_healthy = self.is_healthy().await;
248        let status = self.stream_status.read().await.clone();
249
250        if !is_healthy {
251            match status {
252                StreamStatus::Connected => {
253                    warn!("Stream appears to be stale - no recent events");
254                }
255                StreamStatus::Disconnected => {
256                    warn!("Stream is disconnected");
257                }
258                StreamStatus::Error(ref error) => {
259                    error!("Stream in error state: {}", error);
260                }
261                StreamStatus::Reconnecting => {
262                    info!("Stream is reconnecting");
263                }
264            }
265        }
266    }
267}
268
// Manual Clone: clones share ALL mutable state (Arc handles are cloned, not
// the data behind them), so a cloned monitor — e.g. the one moved into the
// `start` task — observes the same status/counters as the original.
impl Clone for HealthMonitor {
    fn clone(&self) -> Self {
        Self {
            config: self.config.clone(),
            stream_status: Arc::clone(&self.stream_status),
            last_event_time: Arc::clone(&self.last_event_time),
            error_count: Arc::clone(&self.error_count),
            connection_start_time: Arc::clone(&self.connection_start_time),
        }
    }
}