use crate::base::Broker;
use crate::components::ComponentLifecycle;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::task::JoinHandle;
#[derive(Debug, Clone)]
pub struct HealthcheckConfig {
pub interval: Duration,
}
impl Default for HealthcheckConfig {
fn default() -> Self {
Self {
interval: Duration::from_secs(15),
}
}
}
pub type HealthcheckFunc = Arc<dyn Fn() -> bool + Send + Sync>;
pub struct Healthcheck {
broker: Arc<dyn Broker>,
config: HealthcheckConfig,
done: Arc<AtomicBool>,
is_healthy: Arc<AtomicBool>,
custom_check: Option<HealthcheckFunc>,
}
impl Healthcheck {
pub fn new(broker: Arc<dyn Broker>, config: HealthcheckConfig) -> Self {
Self {
broker,
config,
done: Arc::new(AtomicBool::new(false)),
is_healthy: Arc::new(AtomicBool::new(true)),
custom_check: None,
}
}
pub fn with_custom_check(mut self, check: HealthcheckFunc) -> Self {
self.custom_check = Some(check);
self
}
pub fn start(self: Arc<Self>) -> JoinHandle<()> {
tracing::info!("starting healthcheck");
tokio::spawn(async move {
let mut interval = tokio::time::interval(self.config.interval);
loop {
interval.tick().await;
if self.done.load(Ordering::Relaxed) {
tracing::debug!("Healthcheck: shutting down");
break;
}
self.check().await;
}
})
}
async fn check(&self) {
let mut healthy = true;
if let Err(e) = self.broker.ping().await {
tracing::warn!("Healthcheck: Redis ping failed: {}", e);
healthy = false;
}
if let Some(ref check) = self.custom_check {
if !check() {
tracing::warn!("Healthcheck: custom check failed");
healthy = false;
}
}
self.is_healthy.store(healthy, Ordering::Relaxed);
}
pub fn is_healthy(&self) -> bool {
self.is_healthy.load(Ordering::Relaxed)
}
pub fn shutdown(&self) {
self.done.store(true, Ordering::Relaxed);
}
pub fn is_done(&self) -> bool {
self.done.load(Ordering::Relaxed)
}
}
impl ComponentLifecycle for Healthcheck {
fn start(self: Arc<Self>) -> JoinHandle<()> {
Healthcheck::start(self)
}
fn shutdown(&self) {
Healthcheck::shutdown(self)
}
fn is_done(&self) -> bool {
Healthcheck::is_done(self)
}
}
#[cfg(feature = "default")]
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_healthcheck_config_default() {
let config = HealthcheckConfig::default();
assert_eq!(config.interval, Duration::from_secs(15));
}
#[tokio::test]
async fn test_healthcheck_shutdown() {
use crate::backend::{RedisBroker, RedisConnectionType};
let redis_connection_config = RedisConnectionType::single("redis://localhost:6379").unwrap();
let broker = Arc::new(RedisBroker::new(redis_connection_config).await.unwrap());
let config = HealthcheckConfig::default();
let healthcheck = Healthcheck::new(broker, config);
assert!(!healthcheck.is_done());
assert!(healthcheck.is_healthy());
healthcheck.shutdown();
assert!(healthcheck.is_done());
}
#[tokio::test]
async fn test_healthcheck_with_custom_check() {
use crate::backend::{RedisBroker, RedisConnectionType};
let redis_connection_config = RedisConnectionType::single("redis://localhost:6379").unwrap();
let broker = Arc::new(RedisBroker::new(redis_connection_config).await.unwrap());
let config = HealthcheckConfig::default();
let custom_check: HealthcheckFunc = Arc::new(|| true);
let healthcheck = Healthcheck::new(broker, config).with_custom_check(custom_check);
assert!(healthcheck.is_healthy());
}
}