use sqlx::PgPool;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tasker_shared::metrics::health as health_metrics;
use tasker_shared::monitoring::channel_metrics::ChannelMonitor;
use tokio::task::JoinHandle;
use tracing::{debug, error, info, warn};
use super::caches::HealthStatusCaches;
use super::channel_status::evaluate_channel_status;
use super::db_status::evaluate_db_status;
use super::queue_status::evaluate_queue_status;
use super::types::{
BackpressureSource, BackpressureStatus, ChannelHealthStatus, DatabaseHealthStatus,
HealthConfig, QueueDepthStatus, QueueDepthTier,
};
use crate::api_common::WebDatabaseCircuitBreaker;
use crate::orchestration::channels::OrchestrationCommandSender;
/// Background evaluator that periodically probes database, channel, and queue
/// health, derives an overall backpressure status from them, and publishes all
/// four results to the shared [`HealthStatusCaches`].
///
/// Construct with [`StatusEvaluator::new`], then start the periodic loop with
/// [`StatusEvaluator::spawn`].
pub struct StatusEvaluator {
    // Shared caches updated at the end of every evaluation cycle.
    caches: HealthStatusCaches,
    // Pool used by the database health probe (`evaluate_db_status`).
    db_pool: PgPool,
    // Separate pool used for pgmq queue-depth queries (`evaluate_queue_status`).
    pgmq_pool: PgPool,
    // Source of command-channel saturation metrics (`evaluate_channel_status`).
    channel_monitor: ChannelMonitor,
    // Its inner sender is inspected during channel health evaluation.
    command_sender: OrchestrationCommandSender,
    // Names of the pgmq queues whose depths are checked each cycle.
    queue_names: Vec<String>,
    // Consulted by the database probe; an open breaker triggers backpressure.
    circuit_breaker: Arc<WebDatabaseCircuitBreaker>,
    // Evaluation interval, per-check enable flags, and per-check thresholds.
    config: HealthConfig,
}
impl std::fmt::Debug for StatusEvaluator {
    /// Manual `Debug`: pools, monitors, and senders are opaque or noisy, so
    /// only the configuration flags and the number of monitored queues are
    /// summarized.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("StatusEvaluator");
        out.field(
            "evaluation_interval_ms",
            &self.config.evaluation_interval_ms,
        );
        out.field("check_database", &self.config.check_database);
        out.field("check_channels", &self.config.check_channels);
        out.field("check_queues", &self.config.check_queues);
        out.field("queue_count", &self.queue_names.len());
        out.finish()
    }
}
impl StatusEvaluator {
    /// Assembles an evaluator from its collaborators. No work happens until
    /// [`StatusEvaluator::spawn`] is called.
    #[allow(clippy::too_many_arguments)] // NOTE(review): 8 params — consider a builder if this grows
    pub fn new(
        caches: HealthStatusCaches,
        db_pool: PgPool,
        pgmq_pool: PgPool,
        channel_monitor: ChannelMonitor,
        command_sender: OrchestrationCommandSender,
        queue_names: Vec<String>,
        circuit_breaker: Arc<WebDatabaseCircuitBreaker>,
        config: HealthConfig,
    ) -> Self {
        Self {
            caches,
            db_pool,
            pgmq_pool,
            channel_monitor,
            command_sender,
            queue_names,
            circuit_breaker,
            config,
        }
    }

    /// Consumes the evaluator and spawns a detached background task that runs
    /// one evaluation cycle every `config.evaluation_interval_ms` milliseconds,
    /// forever. A failed cycle is logged and the loop continues; the task only
    /// ends if the returned [`JoinHandle`] is aborted.
    pub fn spawn(self) -> JoinHandle<()> {
        let interval_ms = self.config.evaluation_interval_ms;
        info!(
            interval_ms = interval_ms,
            check_database = self.config.check_database,
            check_channels = self.config.check_channels,
            check_queues = self.config.check_queues,
            queue_count = self.queue_names.len(),
            "Starting health status evaluator"
        );
        tokio::spawn(async move {
            // tokio's `interval` completes its first tick immediately, so the
            // first evaluation runs right after spawn rather than one period in.
            let mut interval = tokio::time::interval(Duration::from_millis(interval_ms));
            loop {
                interval.tick().await;
                if let Err(e) = self.evaluate_all().await {
                    error!(error = %e, "Health evaluation cycle failed");
                }
            }
        })
    }

    /// Runs one full evaluation cycle:
    /// 1. Probe database, channel, and queue health — each probe is
    ///    individually skippable via config, in which case the status type's
    ///    `Default` (an "unknown" status) is used instead.
    /// 2. Derive the backpressure status from the three results.
    /// 3. Publish all four statuses to the shared caches.
    /// 4. Record cycle metrics (duration, backpressure source, DB/channel/queue
    ///    figures).
    ///
    /// # Errors
    /// Propagates any error from the cache/metrics pipeline; the individual
    /// probes themselves return statuses rather than errors.
    async fn evaluate_all(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let cycle_start = Instant::now();
        debug!("Starting health evaluation cycle");
        let db_status = if self.config.check_database {
            evaluate_db_status(&self.db_pool, &self.circuit_breaker, &self.config.database).await
        } else {
            debug!("Database health check disabled - returning Unknown status");
            DatabaseHealthStatus::default()
        };
        let channel_status = if self.config.check_channels {
            evaluate_channel_status(
                &self.channel_monitor,
                self.command_sender.inner(),
                &self.config.channels,
            )
        } else {
            debug!("Channel health check disabled - returning Unknown status");
            ChannelHealthStatus::default()
        };
        let queue_status = if self.config.check_queues {
            evaluate_queue_status(&self.pgmq_pool, &self.queue_names, &self.config.queues).await
        } else {
            debug!("Queue depth check disabled - returning Unknown tier");
            QueueDepthStatus::default()
        };
        let backpressure = self.compute_backpressure(&db_status, &channel_status, &queue_status);
        // Publish to caches first so API consumers see fresh statuses even if
        // metric recording below were to fail.
        self.caches
            .update_all(
                db_status.clone(),
                channel_status.clone(),
                queue_status.clone(),
                backpressure.clone(),
            )
            .await;
        let cycle_duration_ms = cycle_start.elapsed().as_secs_f64() * 1000.0;
        // Static label string for the metrics backend, derived from the
        // backpressure source variant (payload fields are dropped here).
        let backpressure_source = backpressure.source.as_ref().map(|s| match s {
            BackpressureSource::CircuitBreaker => "circuit_breaker",
            BackpressureSource::ChannelSaturation { .. } => "channel_saturation",
            BackpressureSource::QueueDepth { .. } => "queue_depth",
        });
        let queue_depths: Vec<(String, i64)> = queue_status
            .queue_depths
            .iter()
            .map(|(k, v)| (k.clone(), *v))
            .collect();
        health_metrics::record_evaluation_cycle(
            cycle_duration_ms,
            backpressure.active,
            backpressure_source,
            db_status.is_connected,
            // DB check duration is only meaningful when the probe actually ran.
            if db_status.evaluated {
                Some(db_status.last_check_duration_ms as f64)
            } else {
                None
            },
            db_status.circuit_breaker_failures,
            channel_status.command_saturation_percent,
            queue_status.tier as u8,
            &queue_depths,
        );
        debug!(
            duration_ms = cycle_duration_ms,
            backpressure_active = backpressure.active,
            "Health evaluation cycle complete"
        );
        Ok(())
    }

    /// Derives the overall backpressure status from the three health inputs.
    ///
    /// Sources are checked in strict priority order — circuit breaker (retry
    /// 30s), then command-channel saturation (retry 15s), then queue depth
    /// (Overflow retry 60s, Critical retry 30s) — and the first active source
    /// wins. A Warning-tier queue only emits a log line; Normal and Unknown
    /// tiers yield the inactive default status.
    ///
    /// Takes `&self` for call convenience but reads no instance state.
    ///
    /// NOTE(review): this logic is mirrored in
    /// `tests::compute_backpressure_helper`; keep the two in sync.
    fn compute_backpressure(
        &self,
        db_status: &DatabaseHealthStatus,
        channel_status: &ChannelHealthStatus,
        queue_status: &QueueDepthStatus,
    ) -> BackpressureStatus {
        if db_status.circuit_breaker_open {
            return BackpressureStatus {
                active: true,
                reason: Some("Circuit breaker open".to_string()),
                retry_after_secs: Some(30),
                source: Some(BackpressureSource::CircuitBreaker),
            };
        }
        if channel_status.is_critical {
            return BackpressureStatus {
                active: true,
                reason: Some(format!(
                    "Command channel critically saturated ({:.1}%)",
                    channel_status.command_saturation_percent
                )),
                retry_after_secs: Some(15),
                source: Some(BackpressureSource::ChannelSaturation {
                    channel: "command".to_string(),
                    saturation_percent: channel_status.command_saturation_percent,
                }),
            };
        }
        // Exhaustive match (no `_` arm) so adding a tier forces a decision here.
        match queue_status.tier {
            QueueDepthTier::Overflow => {
                return BackpressureStatus {
                    active: true,
                    reason: Some(format!(
                        "Queue '{}' at overflow depth ({} messages)",
                        queue_status.worst_queue, queue_status.max_depth
                    )),
                    retry_after_secs: Some(60),
                    source: Some(BackpressureSource::QueueDepth {
                        queue: queue_status.worst_queue.clone(),
                        depth: queue_status.max_depth,
                        tier: QueueDepthTier::Overflow,
                    }),
                };
            }
            QueueDepthTier::Critical => {
                return BackpressureStatus {
                    active: true,
                    reason: Some(format!(
                        "Queue '{}' at critical depth ({} messages)",
                        queue_status.worst_queue, queue_status.max_depth
                    )),
                    retry_after_secs: Some(30),
                    source: Some(BackpressureSource::QueueDepth {
                        queue: queue_status.worst_queue.clone(),
                        depth: queue_status.max_depth,
                        tier: QueueDepthTier::Critical,
                    }),
                };
            }
            QueueDepthTier::Warning => {
                // Warn but do not activate backpressure yet.
                warn!(
                    queue = %queue_status.worst_queue,
                    depth = queue_status.max_depth,
                    "Queue depth at warning level"
                );
            }
            QueueDepthTier::Normal => {
            }
            QueueDepthTier::Unknown => {
                debug!("Queue depth status unknown - check may be disabled or evaluation failed");
            }
        }
        BackpressureStatus::default()
    }

    /// Test-only accessor for the shared status caches.
    #[cfg(test)]
    pub fn caches(&self) -> &HealthStatusCaches {
        &self.caches
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// A healthy database status used as a neutral baseline in several tests.
    fn healthy_db_status() -> DatabaseHealthStatus {
        DatabaseHealthStatus {
            evaluated: true,
            is_connected: true,
            circuit_breaker_open: false,
            circuit_breaker_failures: 0,
            last_check_duration_ms: 5,
            error_message: None,
        }
    }

    /// A database status representing an open circuit breaker.
    fn breaker_open_db_status() -> DatabaseHealthStatus {
        DatabaseHealthStatus {
            evaluated: true,
            is_connected: false,
            circuit_breaker_open: true,
            circuit_breaker_failures: 5,
            last_check_duration_ms: 0,
            error_message: Some("Circuit breaker open".to_string()),
        }
    }

    #[test]
    fn test_compute_backpressure_healthy() {
        let db_status = healthy_db_status();
        let channel_status = ChannelHealthStatus {
            evaluated: true,
            command_saturation_percent: 25.0,
            command_available_capacity: 750,
            command_messages_sent: 100,
            command_overflow_events: 0,
            is_saturated: false,
            is_critical: false,
        };
        let queue_status = QueueDepthStatus {
            tier: QueueDepthTier::Normal,
            max_depth: 50,
            worst_queue: "test_queue".to_string(),
            queue_depths: HashMap::new(),
        };
        let backpressure =
            compute_backpressure_helper(&db_status, &channel_status, &queue_status);
        assert!(!backpressure.active);
        assert!(backpressure.reason.is_none());
    }

    #[test]
    fn test_compute_backpressure_circuit_breaker_open() {
        let db_status = breaker_open_db_status();
        let channel_status = ChannelHealthStatus::default();
        let queue_status = QueueDepthStatus::default();
        let backpressure =
            compute_backpressure_helper(&db_status, &channel_status, &queue_status);
        assert!(backpressure.active);
        assert!(backpressure.reason.as_ref().unwrap().contains("Circuit"));
        assert!(matches!(
            backpressure.source,
            Some(BackpressureSource::CircuitBreaker)
        ));
    }

    #[test]
    fn test_compute_backpressure_channel_critical() {
        let db_status = healthy_db_status();
        let channel_status = ChannelHealthStatus {
            evaluated: true,
            command_saturation_percent: 97.0,
            command_available_capacity: 30,
            command_messages_sent: 1000,
            command_overflow_events: 2,
            is_saturated: true,
            is_critical: true,
        };
        let queue_status = QueueDepthStatus::default();
        let backpressure =
            compute_backpressure_helper(&db_status, &channel_status, &queue_status);
        assert!(backpressure.active);
        assert!(backpressure.reason.as_ref().unwrap().contains("saturated"));
        assert!(matches!(
            backpressure.source,
            Some(BackpressureSource::ChannelSaturation { .. })
        ));
    }

    #[test]
    fn test_compute_backpressure_queue_critical() {
        let db_status = healthy_db_status();
        let channel_status = ChannelHealthStatus::default();
        let queue_status = QueueDepthStatus {
            tier: QueueDepthTier::Critical,
            max_depth: 7500,
            worst_queue: "orchestration_step_results".to_string(),
            queue_depths: HashMap::new(),
        };
        let backpressure =
            compute_backpressure_helper(&db_status, &channel_status, &queue_status);
        assert!(backpressure.active);
        assert!(backpressure.reason.as_ref().unwrap().contains("critical"));
        assert!(matches!(
            backpressure.source,
            Some(BackpressureSource::QueueDepth {
                tier: QueueDepthTier::Critical,
                ..
            })
        ));
    }

    #[test]
    fn test_compute_backpressure_queue_overflow() {
        let db_status = healthy_db_status();
        let channel_status = ChannelHealthStatus::default();
        let queue_status = QueueDepthStatus {
            tier: QueueDepthTier::Overflow,
            max_depth: 15000,
            worst_queue: "orchestration_step_results".to_string(),
            queue_depths: HashMap::new(),
        };
        let backpressure =
            compute_backpressure_helper(&db_status, &channel_status, &queue_status);
        assert!(backpressure.active);
        assert!(backpressure.reason.as_ref().unwrap().contains("overflow"));
        assert_eq!(backpressure.retry_after_secs, Some(60));
    }

    #[test]
    fn test_compute_backpressure_priority_order() {
        // All three sources are bad; the circuit breaker must win.
        let db_status = breaker_open_db_status();
        let channel_status = ChannelHealthStatus {
            evaluated: true,
            command_saturation_percent: 99.0,
            command_available_capacity: 10,
            command_messages_sent: 1000,
            command_overflow_events: 5,
            is_saturated: true,
            is_critical: true,
        };
        let queue_status = QueueDepthStatus {
            tier: QueueDepthTier::Overflow,
            max_depth: 20000,
            worst_queue: "test".to_string(),
            queue_depths: HashMap::new(),
        };
        let backpressure =
            compute_backpressure_helper(&db_status, &channel_status, &queue_status);
        assert!(backpressure.active);
        assert!(matches!(
            backpressure.source,
            Some(BackpressureSource::CircuitBreaker)
        ));
    }

    /// Mirror of `StatusEvaluator::compute_backpressure`.
    ///
    /// NOTE(review): the production logic is duplicated here because
    /// constructing a real `StatusEvaluator` requires live pools, monitors,
    /// and senders. If the priority order, messages, or retry values change in
    /// production, this helper must be updated to match.
    fn compute_backpressure_helper(
        db_status: &DatabaseHealthStatus,
        channel_status: &ChannelHealthStatus,
        queue_status: &QueueDepthStatus,
    ) -> BackpressureStatus {
        if db_status.circuit_breaker_open {
            return BackpressureStatus {
                active: true,
                reason: Some("Circuit breaker open".to_string()),
                retry_after_secs: Some(30),
                source: Some(BackpressureSource::CircuitBreaker),
            };
        }
        if channel_status.is_critical {
            return BackpressureStatus {
                active: true,
                reason: Some(format!(
                    "Command channel critically saturated ({:.1}%)",
                    channel_status.command_saturation_percent
                )),
                retry_after_secs: Some(15),
                source: Some(BackpressureSource::ChannelSaturation {
                    channel: "command".to_string(),
                    saturation_percent: channel_status.command_saturation_percent,
                }),
            };
        }
        match queue_status.tier {
            QueueDepthTier::Overflow => {
                return BackpressureStatus {
                    active: true,
                    reason: Some(format!(
                        "Queue '{}' at overflow depth ({} messages)",
                        queue_status.worst_queue, queue_status.max_depth
                    )),
                    retry_after_secs: Some(60),
                    source: Some(BackpressureSource::QueueDepth {
                        queue: queue_status.worst_queue.clone(),
                        depth: queue_status.max_depth,
                        tier: QueueDepthTier::Overflow,
                    }),
                };
            }
            QueueDepthTier::Critical => {
                return BackpressureStatus {
                    active: true,
                    reason: Some(format!(
                        "Queue '{}' at critical depth ({} messages)",
                        queue_status.worst_queue, queue_status.max_depth
                    )),
                    retry_after_secs: Some(30),
                    source: Some(BackpressureSource::QueueDepth {
                        queue: queue_status.worst_queue.clone(),
                        depth: queue_status.max_depth,
                        tier: QueueDepthTier::Critical,
                    }),
                };
            }
            _ => {}
        }
        BackpressureStatus::default()
    }
}