use std::sync::Arc;
use std::time::Instant;
use tokio::sync::RwLock;
use super::types::{
BackpressureStatus, ChannelHealthStatus, DatabaseHealthStatus, QueueDepthStatus,
};
/// Bundle of cached health statuses, each stored behind its own `Arc<RwLock<_>>`.
///
/// `Clone` is derived, so cloning copies the `Arc` handles: a clone reads and
/// writes the same underlying values as the instance it was cloned from.
#[derive(Debug, Clone)]
pub struct HealthStatusCaches {
/// Cached database health status.
db_status: Arc<RwLock<DatabaseHealthStatus>>,
/// Cached channel health status.
channel_status: Arc<RwLock<ChannelHealthStatus>>,
/// Cached queue depth status.
queue_status: Arc<RwLock<QueueDepthStatus>>,
/// Cached backpressure status.
backpressure: Arc<RwLock<BackpressureStatus>>,
/// Instant recorded by `mark_evaluated` / `update_all`; `None` until one of them runs.
last_evaluated: Arc<RwLock<Option<Instant>>>,
}
impl Default for HealthStatusCaches {
fn default() -> Self {
Self::new()
}
}
impl HealthStatusCaches {
#[must_use]
pub fn new() -> Self {
Self {
db_status: Arc::new(RwLock::new(DatabaseHealthStatus::default())),
channel_status: Arc::new(RwLock::new(ChannelHealthStatus::default())),
queue_status: Arc::new(RwLock::new(QueueDepthStatus::default())),
backpressure: Arc::new(RwLock::new(BackpressureStatus::default())),
last_evaluated: Arc::new(RwLock::new(None)),
}
}
#[must_use]
pub fn db_status(&self) -> &Arc<RwLock<DatabaseHealthStatus>> {
&self.db_status
}
#[must_use]
pub fn channel_status(&self) -> &Arc<RwLock<ChannelHealthStatus>> {
&self.channel_status
}
#[must_use]
pub fn queue_status(&self) -> &Arc<RwLock<QueueDepthStatus>> {
&self.queue_status
}
#[must_use]
pub fn backpressure(&self) -> &Arc<RwLock<BackpressureStatus>> {
&self.backpressure
}
#[must_use]
pub fn last_evaluated(&self) -> &Arc<RwLock<Option<Instant>>> {
&self.last_evaluated
}
pub async fn get_db_status(&self) -> DatabaseHealthStatus {
self.db_status.read().await.clone()
}
pub async fn get_channel_status(&self) -> ChannelHealthStatus {
self.channel_status.read().await.clone()
}
pub async fn get_queue_status(&self) -> QueueDepthStatus {
self.queue_status.read().await.clone()
}
pub async fn get_backpressure(&self) -> BackpressureStatus {
self.backpressure.read().await.clone()
}
pub async fn time_since_evaluation(&self) -> Option<std::time::Duration> {
self.last_evaluated
.read()
.await
.map(|instant| instant.elapsed())
}
pub async fn is_stale(&self, stale_threshold: std::time::Duration) -> bool {
match *self.last_evaluated.read().await {
None => true, Some(instant) => instant.elapsed() > stale_threshold,
}
}
pub async fn set_db_status(&self, status: DatabaseHealthStatus) {
*self.db_status.write().await = status;
}
pub async fn set_channel_status(&self, status: ChannelHealthStatus) {
*self.channel_status.write().await = status;
}
pub async fn set_queue_status(&self, status: QueueDepthStatus) {
*self.queue_status.write().await = status;
}
pub async fn set_backpressure(&self, status: BackpressureStatus) {
*self.backpressure.write().await = status;
}
pub async fn mark_evaluated(&self) {
*self.last_evaluated.write().await = Some(Instant::now());
}
pub async fn update_all(
&self,
db_status: DatabaseHealthStatus,
channel_status: ChannelHealthStatus,
queue_status: QueueDepthStatus,
backpressure: BackpressureStatus,
) {
*self.db_status.write().await = db_status;
*self.channel_status.write().await = channel_status;
*self.queue_status.write().await = queue_status;
*self.backpressure.write().await = backpressure;
*self.last_evaluated.write().await = Some(Instant::now());
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::health::types::QueueDepthTier;
    use std::time::Duration;

    /// A freshly built cache exposes default statuses and no timestamp.
    #[tokio::test]
    async fn test_new_caches_have_defaults() {
        let fresh = HealthStatusCaches::new();

        let db_snapshot = fresh.get_db_status().await;
        assert!(!db_snapshot.is_connected);

        let channel_snapshot = fresh.get_channel_status().await;
        assert_eq!(channel_snapshot.command_saturation_percent, 0.0);

        let queue_snapshot = fresh.get_queue_status().await;
        assert_eq!(queue_snapshot.tier, QueueDepthTier::Unknown);
        assert_eq!(queue_snapshot.max_depth, 0);

        let backpressure_snapshot = fresh.get_backpressure().await;
        assert!(!backpressure_snapshot.active);

        let since = fresh.time_since_evaluation().await;
        assert!(since.is_none());
    }

    /// A database status written through the setter is visible via the getter.
    #[tokio::test]
    async fn test_update_and_read_db_status() {
        let store = HealthStatusCaches::new();
        let healthy = DatabaseHealthStatus {
            evaluated: true,
            is_connected: true,
            circuit_breaker_open: false,
            circuit_breaker_failures: 0,
            last_check_duration_ms: 5,
            error_message: None,
        };

        store.set_db_status(healthy).await;

        let observed = store.get_db_status().await;
        assert!(observed.is_connected);
        assert_eq!(observed.last_check_duration_ms, 5);
    }

    /// A queue status written through the setter is visible via the getter.
    #[tokio::test]
    async fn test_update_and_read_queue_status() {
        let store = HealthStatusCaches::new();

        let mut depths = std::collections::HashMap::new();
        depths.insert("test_queue".to_string(), 500);

        let warning = QueueDepthStatus {
            tier: QueueDepthTier::Warning,
            max_depth: 500,
            worst_queue: "test_queue".to_string(),
            queue_depths: depths,
        };

        store.set_queue_status(warning).await;

        let observed = store.get_queue_status().await;
        assert_eq!(observed.tier, QueueDepthTier::Warning);
        assert_eq!(observed.max_depth, 500);
        assert_eq!(observed.worst_queue, "test_queue");
    }

    /// With no evaluation recorded, the cache is reported as stale.
    #[tokio::test]
    async fn test_stale_detection_never_evaluated() {
        let store = HealthStatusCaches::new();
        let threshold = Duration::from_secs(10);
        assert!(store.is_stale(threshold).await);
    }

    /// Right after marking an evaluation, a 10s threshold is not exceeded.
    #[tokio::test]
    async fn test_stale_detection_recent_evaluation() {
        let store = HealthStatusCaches::new();
        store.mark_evaluated().await;

        let threshold = Duration::from_secs(10);
        assert!(!store.is_stale(threshold).await);
    }

    /// `update_all` records an evaluation timestamp.
    #[tokio::test]
    async fn test_update_all_sets_timestamp() {
        let store = HealthStatusCaches::new();
        assert!(store.time_since_evaluation().await.is_none());

        store
            .update_all(
                DatabaseHealthStatus::default(),
                ChannelHealthStatus::default(),
                QueueDepthStatus::default(),
                BackpressureStatus::default(),
            )
            .await;

        assert!(store.time_since_evaluation().await.is_some());
    }

    /// A write through one handle is observed through a clone of it.
    #[tokio::test]
    async fn test_caches_are_cloneable_and_shared() {
        let writer = HealthStatusCaches::new();
        let observer = writer.clone();

        let engaged = BackpressureStatus {
            active: true,
            reason: Some("Test backpressure".to_string()),
            retry_after_secs: Some(30),
            source: None,
        };
        writer.set_backpressure(engaged).await;

        let seen = observer.get_backpressure().await;
        assert!(seen.active);
        assert_eq!(seen.reason.as_deref(), Some("Test backpressure"));
    }
}