use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use tokio::sync::RwLock;
use tokio::time::interval;
use tracing::{error, info, warn};
/// Connection state of the monitored stream, as last reported to a `HealthMonitor`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamStatus {
    /// Set by `HealthMonitor::record_connection`.
    Connected,
    /// Initial state of a new monitor; set by `HealthMonitor::record_disconnection`.
    Disconnected,
    /// Set by `HealthMonitor::record_reconnecting`.
    Reconnecting,
    /// Set by `HealthMonitor::record_error`; carries the error message passed to it.
    Error(String),
}
/// Timing parameters for a `HealthMonitor`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HealthConfig {
    /// Period between background health checks. Twice this value is the
    /// maximum age of the last event for a connected stream to count as healthy.
    pub heartbeat_interval: Duration,
    /// Health-check timeout.
    /// NOTE(review): not read anywhere in this file — confirm where it is used.
    pub health_check_timeout: Duration,
}

impl Default for HealthConfig {
    /// A 30-second heartbeat interval and a 10-second health-check timeout.
    fn default() -> Self {
        Self {
            heartbeat_interval: Duration::from_secs(30),
            health_check_timeout: Duration::from_secs(10),
        }
    }
}

impl HealthConfig {
    /// Creates a configuration with the default values (same as `HealthConfig::default()`).
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns this configuration with `heartbeat_interval` replaced by `interval`.
    ///
    /// Builder-style: the receiver is consumed, so the returned value must be used.
    #[must_use]
    pub fn with_heartbeat_interval(mut self, interval: Duration) -> Self {
        self.heartbeat_interval = interval;
        self
    }

    /// Returns this configuration with `health_check_timeout` replaced by `timeout`.
    ///
    /// Builder-style: the receiver is consumed, so the returned value must be used.
    #[must_use]
    pub fn with_health_check_timeout(mut self, timeout: Duration) -> Self {
        self.health_check_timeout = timeout;
        self
    }
}
pub struct HealthMonitor {
config: HealthConfig,
stream_status: Arc<RwLock<StreamStatus>>,
last_event_time: Arc<RwLock<Option<SystemTime>>>,
error_count: Arc<RwLock<u32>>,
connection_start_time: Arc<RwLock<Option<Instant>>>,
}
impl HealthMonitor {
pub fn new(config: HealthConfig) -> Self {
Self {
config,
stream_status: Arc::new(RwLock::new(StreamStatus::Disconnected)),
last_event_time: Arc::new(RwLock::new(None)),
error_count: Arc::new(RwLock::new(0)),
connection_start_time: Arc::new(RwLock::new(None)),
}
}
pub async fn start(&self) -> tokio::task::JoinHandle<()> {
let monitor = self.clone();
tokio::spawn(async move {
let mut interval = interval(monitor.config.heartbeat_interval);
loop {
interval.tick().await;
monitor.check_health().await;
}
})
}
pub async fn record_event(&self) {
*self.last_event_time.write().await = Some(SystemTime::now());
}
pub async fn record_connection(&self) {
*self.stream_status.write().await = StreamStatus::Connected;
*self.connection_start_time.write().await = Some(Instant::now());
info!("Stream connection established");
}
pub async fn record_disconnection(&self) {
*self.stream_status.write().await = StreamStatus::Disconnected;
*self.connection_start_time.write().await = None;
warn!("Stream disconnected");
}
pub async fn record_reconnecting(&self) {
*self.stream_status.write().await = StreamStatus::Reconnecting;
info!("Stream reconnecting");
}
pub async fn record_error(&self, error: String) {
*self.stream_status.write().await = StreamStatus::Error(error.clone());
*self.error_count.write().await += 1;
error!("Stream error: {}", error);
}
pub async fn is_healthy(&self) -> bool {
let status = self.stream_status.read().await;
let last_event_time = *self.last_event_time.read().await;
match *status {
StreamStatus::Connected => {
if let Some(last_event) = last_event_time {
let time_since_last_event = SystemTime::now()
.duration_since(last_event)
.unwrap_or(Duration::from_secs(u64::MAX));
time_since_last_event < (self.config.heartbeat_interval * 2)
} else {
let connection_time = self.connection_start_time.read().await;
if let Some(start_time) = *connection_time {
let time_since_connection = start_time.elapsed();
time_since_connection < Duration::from_secs(60)
} else {
false
}
}
}
StreamStatus::Reconnecting => true, _ => false,
}
}
pub async fn status(&self) -> StreamStatus {
self.stream_status.read().await.clone()
}
pub async fn error_count(&self) -> u32 {
*self.error_count.read().await
}
async fn check_health(&self) {
let is_healthy = self.is_healthy().await;
let status = self.stream_status.read().await.clone();
if !is_healthy {
match status {
StreamStatus::Connected => {
warn!("Stream appears to be stale - no recent events");
}
StreamStatus::Disconnected => {
warn!("Stream is disconnected");
}
StreamStatus::Error(ref error) => {
error!("Stream in error state: {}", error);
}
StreamStatus::Reconnecting => {
info!("Stream is reconnecting");
}
}
}
}
}
impl Clone for HealthMonitor {
fn clone(&self) -> Self {
Self {
config: self.config.clone(),
stream_status: Arc::clone(&self.stream_status),
last_event_time: Arc::clone(&self.last_event_time),
error_count: Arc::clone(&self.error_count),
connection_start_time: Arc::clone(&self.connection_start_time),
}
}
}