use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use tokio::sync::{broadcast, mpsc};
use tokio::task::JoinHandle;
use crate::client::CortexHandle;
use crate::config::HealthConfig;
use crate::types::CortexEvent;
/// Outcome of a single health check, as published on the status channel
/// returned by [`HealthMonitor::start`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum HealthStatus {
    /// The most recent check received a reply in time.
    Healthy,
    /// One or more checks in a row have failed, but fewer than the
    /// configured maximum.
    Degraded {
        /// Number of checks that have failed back to back.
        consecutive_failures: u32,
    },
    /// The number of back-to-back failed checks has reached the configured
    /// maximum.
    Unhealthy {
        /// Number of checks that have failed back to back.
        consecutive_failures: u32,
    },
}
/// Owner of the background health-check task.
///
/// Dropping the monitor clears the running flag and aborts the task.
pub struct HealthMonitor {
    /// Join handle of the spawned task; `None` once it has been taken by
    /// `stop` or by `Drop`.
    handle: Option<JoinHandle<()>>,
    /// Flag shared with the task; the task leaves its loop once this is
    /// `false`.
    running: Arc<AtomicBool>,
}
impl HealthMonitor {
pub fn start(
cortex_handle: CortexHandle,
event_rx: broadcast::Receiver<CortexEvent>,
config: &HealthConfig,
) -> (Self, mpsc::Receiver<HealthStatus>) {
let interval = Duration::from_secs(config.interval_secs);
let max_failures = config.max_consecutive_failures;
let running = Arc::new(AtomicBool::new(true));
let (tx, rx) = mpsc::channel(16);
let handle = {
let running = Arc::clone(&running);
let mut event_rx = event_rx;
tokio::spawn(async move {
let mut consecutive_failures: u32 = 0;
while running.load(Ordering::SeqCst) {
tokio::time::sleep(interval).await;
if !running.load(Ordering::SeqCst) {
break;
}
if let Err(e) = cortex_handle.get_cortex_info().await {
consecutive_failures += 1;
tracing::warn!(
consecutive_failures,
error = %e,
"Health check: failed to send ping"
);
emit_status(&tx, consecutive_failures, max_failures).await;
continue;
}
let ping_timeout = interval / 2;
let got_response = tokio::time::timeout(ping_timeout, async {
loop {
match event_rx.recv().await {
Ok(CortexEvent::CortexInfo(_)) => return true,
Ok(_) => continue, Err(broadcast::error::RecvError::Lagged(_)) => {
continue;
}
Err(broadcast::error::RecvError::Closed) => return false,
}
}
})
.await;
match got_response {
Ok(true) => {
if consecutive_failures > 0 {
tracing::info!(
previous_failures = consecutive_failures,
"Health check recovered"
);
}
consecutive_failures = 0;
let _ = tx.try_send(HealthStatus::Healthy);
}
_ => {
consecutive_failures += 1;
tracing::warn!(
consecutive_failures,
"Health check: no response to ping"
);
emit_status(&tx, consecutive_failures, max_failures).await;
}
}
}
tracing::debug!("Health monitor stopped");
})
};
(Self { handle: Some(handle), running }, rx)
}
pub async fn stop(&mut self) {
self.running.store(false, Ordering::SeqCst);
if let Some(handle) = self.handle.take() {
let _ = tokio::time::timeout(Duration::from_secs(5), handle).await;
}
}
#[must_use]
pub fn is_running(&self) -> bool {
self.running.load(Ordering::SeqCst)
}
}
impl Drop for HealthMonitor {
    /// Clears the running flag and aborts the background task if its handle
    /// is still held by this monitor.
    fn drop(&mut self) {
        self.running.store(false, Ordering::SeqCst);

        // `drop` cannot await, so there is no grace period here: the task is
        // aborted straight away.
        if let Some(task) = self.handle.take() {
            task.abort();
        }
    }
}
/// Publishes the failure status that corresponds to `consecutive_failures`.
///
/// Sends [`HealthStatus::Degraded`] while the count is below `max_failures`
/// and [`HealthStatus::Unhealthy`] once it has reached it. The send uses
/// `try_send`, so the status is silently dropped when the channel is full or
/// closed. The function contains no await point of its own.
async fn emit_status(
    tx: &mpsc::Sender<HealthStatus>,
    consecutive_failures: u32,
    max_failures: u32,
) {
    let report = if consecutive_failures < max_failures {
        HealthStatus::Degraded { consecutive_failures }
    } else {
        HealthStatus::Unhealthy { consecutive_failures }
    };

    // Best effort by design: a slow or absent consumer must not stall the
    // health-check loop.
    let _ = tx.try_send(report);
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Exercises the derived equality of `HealthStatus`: identical variants
    /// compare equal and different variants do not.
    #[test]
    fn test_health_status_variants() {
        assert_eq!(HealthStatus::Healthy, HealthStatus::Healthy);
        assert_eq!(
            HealthStatus::Degraded { consecutive_failures: 2 },
            HealthStatus::Degraded { consecutive_failures: 2 }
        );
        assert_ne!(
            HealthStatus::Healthy,
            HealthStatus::Unhealthy { consecutive_failures: 5 }
        );
    }
}