hyperstack_server/
health.rs

1use std::sync::Arc;
2use std::time::{Duration, Instant, SystemTime};
3use tokio::sync::RwLock;
4use tokio::time::interval;
5use tracing::{error, info, warn};
6
/// Connection state of the monitored stream, as last reported to the
/// health monitor.
///
/// Derives `PartialEq`/`Eq` so callers and tests can compare statuses
/// directly instead of pattern-matching every time.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamStatus {
    /// A connection has been recorded and not yet superseded.
    Connected,
    /// No connection is active; this is also the initial state of a monitor.
    Disconnected,
    /// A reconnection attempt has been reported.
    Reconnecting,
    /// The stream reported an error; carries the reported error message.
    Error(String),
}
14
/// Configuration for health monitoring
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HealthConfig {
    /// Period of the background health check. A connected stream is also
    /// treated as stale once twice this interval passes without an event.
    pub heartbeat_interval: Duration,
    /// Timeout budget for a health check.
    // NOTE(review): nothing visible in this file reads this field yet —
    // confirm where (or whether) it is consumed.
    pub health_check_timeout: Duration,
}

impl Default for HealthConfig {
    /// Defaults: 30 s heartbeat interval, 10 s health-check timeout.
    fn default() -> Self {
        Self {
            heartbeat_interval: Duration::from_secs(30),
            health_check_timeout: Duration::from_secs(10),
        }
    }
}

impl HealthConfig {
    /// Creates a configuration holding the default values.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the configuration with `heartbeat_interval` replaced.
    ///
    /// Builder-style: the updated value is returned, so dropping the result
    /// discards the change (hence `#[must_use]`).
    #[must_use]
    pub fn with_heartbeat_interval(mut self, interval: Duration) -> Self {
        self.heartbeat_interval = interval;
        self
    }

    /// Returns the configuration with `health_check_timeout` replaced.
    ///
    /// Builder-style: the updated value is returned, so dropping the result
    /// discards the change (hence `#[must_use]`).
    #[must_use]
    pub fn with_health_check_timeout(mut self, timeout: Duration) -> Self {
        self.health_check_timeout = timeout;
        self
    }
}
46
/// Health monitor for tracking stream status and connectivity
///
/// All mutable state lives behind `Arc<RwLock<..>>`, so clones of a monitor
/// read and update the same underlying state.
pub struct HealthMonitor {
    /// Settings supplied at construction time.
    config: HealthConfig,
    /// Most recently recorded status of the stream.
    stream_status: Arc<RwLock<StreamStatus>>,
    /// Wall-clock time of the last recorded stream event; `None` when there
    /// is no event to compare against.
    last_event_time: Arc<RwLock<Option<SystemTime>>>,
    /// Number of errors recorded so far.
    error_count: Arc<RwLock<u32>>,
    /// Monotonic instant at which the current connection was recorded;
    /// `None` while no connection is recorded.
    connection_start_time: Arc<RwLock<Option<Instant>>>,
}
55
56impl HealthMonitor {
57    pub fn new(config: HealthConfig) -> Self {
58        Self {
59            config,
60            stream_status: Arc::new(RwLock::new(StreamStatus::Disconnected)),
61            last_event_time: Arc::new(RwLock::new(None)),
62            error_count: Arc::new(RwLock::new(0)),
63            connection_start_time: Arc::new(RwLock::new(None)),
64        }
65    }
66
67    /// Start the health monitoring background task
68    pub async fn start(&self) -> tokio::task::JoinHandle<()> {
69        let monitor = self.clone();
70
71        tokio::spawn(async move {
72            let mut interval = interval(monitor.config.heartbeat_interval);
73
74            loop {
75                interval.tick().await;
76                monitor.check_health().await;
77            }
78        })
79    }
80
81    /// Record that an event was received from the stream
82    pub async fn record_event(&self) {
83        *self.last_event_time.write().await = Some(SystemTime::now());
84    }
85
86    /// Record that the stream connection was established
87    pub async fn record_connection(&self) {
88        *self.stream_status.write().await = StreamStatus::Connected;
89        *self.connection_start_time.write().await = Some(Instant::now());
90        info!("Stream connection established");
91    }
92
93    /// Record that the stream disconnected
94    pub async fn record_disconnection(&self) {
95        *self.stream_status.write().await = StreamStatus::Disconnected;
96        *self.connection_start_time.write().await = None;
97        warn!("Stream disconnected");
98    }
99
100    /// Record that the stream is attempting to reconnect
101    pub async fn record_reconnecting(&self) {
102        *self.stream_status.write().await = StreamStatus::Reconnecting;
103        info!("Stream reconnecting");
104    }
105
106    /// Record an error from the stream
107    pub async fn record_error(&self, error: String) {
108        *self.stream_status.write().await = StreamStatus::Error(error.clone());
109        *self.error_count.write().await += 1;
110        error!("Stream error: {}", error);
111    }
112
113    /// Check if the stream is currently healthy
114    pub async fn is_healthy(&self) -> bool {
115        let status = self.stream_status.read().await;
116        let last_event_time = *self.last_event_time.read().await;
117
118        match *status {
119            StreamStatus::Connected => {
120                // Check if we've received events recently
121                if let Some(last_event) = last_event_time {
122                    let time_since_last_event = SystemTime::now()
123                        .duration_since(last_event)
124                        .unwrap_or(Duration::from_secs(u64::MAX));
125
126                    // Consider unhealthy if no events for 2x heartbeat interval
127                    time_since_last_event < (self.config.heartbeat_interval * 2)
128                } else {
129                    // No events yet, but connected - might be waiting for first event
130                    let connection_time = self.connection_start_time.read().await;
131                    if let Some(start_time) = *connection_time {
132                        let time_since_connection = start_time.elapsed();
133                        // Give it some time to receive first event
134                        time_since_connection < Duration::from_secs(60)
135                    } else {
136                        false
137                    }
138                }
139            }
140            StreamStatus::Reconnecting => true, // Considered healthy if actively reconnecting
141            _ => false,
142        }
143    }
144
145    /// Get the current stream status
146    pub async fn status(&self) -> StreamStatus {
147        self.stream_status.read().await.clone()
148    }
149
150    /// Get the current error count
151    pub async fn error_count(&self) -> u32 {
152        *self.error_count.read().await
153    }
154
155    async fn check_health(&self) {
156        let is_healthy = self.is_healthy().await;
157        let status = self.stream_status.read().await.clone();
158
159        if !is_healthy {
160            match status {
161                StreamStatus::Connected => {
162                    warn!("Stream appears to be stale - no recent events");
163                }
164                StreamStatus::Disconnected => {
165                    warn!("Stream is disconnected");
166                }
167                StreamStatus::Error(ref error) => {
168                    error!("Stream in error state: {}", error);
169                }
170                StreamStatus::Reconnecting => {
171                    info!("Stream is reconnecting");
172                }
173            }
174        }
175    }
176}
177
178impl Clone for HealthMonitor {
179    fn clone(&self) -> Self {
180        Self {
181            config: self.config.clone(),
182            stream_status: Arc::clone(&self.stream_status),
183            last_event_time: Arc::clone(&self.last_event_time),
184            error_count: Arc::clone(&self.error_count),
185            connection_start_time: Arc::clone(&self.connection_start_time),
186        }
187    }
188}