use tasker_shared::monitoring::channel_metrics::ChannelMonitor;
use tokio::sync::mpsc;
use tracing::debug;
use super::types::{ChannelHealthConfig, ChannelHealthStatus};
pub fn evaluate_channel_status<T>(
channel_monitor: &ChannelMonitor,
command_sender: &mpsc::Sender<T>,
config: &ChannelHealthConfig,
) -> ChannelHealthStatus {
let available_capacity = command_sender.capacity();
let buffer_size = channel_monitor.buffer_size();
let metrics = channel_monitor.metrics();
let saturation_percent = if buffer_size > 0 {
let used = buffer_size.saturating_sub(available_capacity);
(used as f64 / buffer_size as f64) * 100.0
} else {
0.0
};
let is_saturated = saturation_percent >= config.critical_threshold;
let is_critical = saturation_percent >= config.emergency_threshold;
debug!(
channel_name = %channel_monitor.channel_name(),
saturation_percent = saturation_percent,
available_capacity = available_capacity,
buffer_size = buffer_size,
messages_sent = metrics.messages_sent,
overflow_events = metrics.overflow_events,
is_saturated = is_saturated,
is_critical = is_critical,
"Channel health evaluated"
);
ChannelHealthStatus {
evaluated: true, command_saturation_percent: saturation_percent,
command_available_capacity: available_capacity,
command_messages_sent: metrics.messages_sent,
command_overflow_events: metrics.overflow_events,
is_saturated,
is_critical,
}
}
#[allow(dead_code, reason = "dead in --lib, used by test targets")]
pub fn evaluate_channel_status_basic<T>(
command_sender: &mpsc::Sender<T>,
buffer_size: usize,
config: &ChannelHealthConfig,
) -> ChannelHealthStatus {
let available_capacity = command_sender.capacity();
let saturation_percent = if buffer_size > 0 {
let used = buffer_size.saturating_sub(available_capacity);
(used as f64 / buffer_size as f64) * 100.0
} else {
0.0
};
let is_saturated = saturation_percent >= config.critical_threshold;
let is_critical = saturation_percent >= config.emergency_threshold;
ChannelHealthStatus {
evaluated: true, command_saturation_percent: saturation_percent,
command_available_capacity: available_capacity,
command_messages_sent: 0, command_overflow_events: 0, is_saturated,
is_critical,
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_default_config() {
let config = ChannelHealthConfig::default();
assert_eq!(config.warning_threshold, 70.0);
assert_eq!(config.critical_threshold, 80.0);
assert_eq!(config.emergency_threshold, 95.0);
}
#[tokio::test]
async fn test_evaluate_channel_status_basic_empty_channel() {
let (tx, _rx) = mpsc::channel::<()>(100);
let config = ChannelHealthConfig::default();
let status = evaluate_channel_status_basic(&tx, 100, &config);
assert_eq!(status.command_saturation_percent, 0.0);
assert_eq!(status.command_available_capacity, 100);
assert!(!status.is_saturated);
assert!(!status.is_critical);
}
#[tokio::test]
async fn test_evaluate_channel_status_basic_partial_fill() {
let (tx, _rx) = mpsc::channel::<()>(100);
let config = ChannelHealthConfig::default();
for _ in 0..50 {
tx.send(()).await.unwrap();
}
let status = evaluate_channel_status_basic(&tx, 100, &config);
assert!((status.command_saturation_percent - 50.0).abs() < 1.0);
assert_eq!(status.command_available_capacity, 50);
assert!(!status.is_saturated); assert!(!status.is_critical); }
#[tokio::test]
async fn test_evaluate_channel_status_basic_saturated() {
let (tx, _rx) = mpsc::channel::<()>(100);
let config = ChannelHealthConfig::default();
for _ in 0..85 {
tx.send(()).await.unwrap();
}
let status = evaluate_channel_status_basic(&tx, 100, &config);
assert!((status.command_saturation_percent - 85.0).abs() < 1.0);
assert!(status.is_saturated); assert!(!status.is_critical); }
#[tokio::test]
async fn test_evaluate_channel_status_basic_critical() {
let (tx, _rx) = mpsc::channel::<()>(100);
let config = ChannelHealthConfig::default();
for _ in 0..96 {
tx.send(()).await.unwrap();
}
let status = evaluate_channel_status_basic(&tx, 100, &config);
assert!((status.command_saturation_percent - 96.0).abs() < 1.0);
assert!(status.is_saturated); assert!(status.is_critical); }
#[tokio::test]
async fn test_evaluate_channel_status_with_monitor() {
let (tx, _rx) = mpsc::channel::<()>(100);
let monitor = ChannelMonitor::new("test_channel", 100);
let config = ChannelHealthConfig::default();
for _ in 0..30 {
tx.send(()).await.unwrap();
monitor.record_send_success();
}
let status = evaluate_channel_status(&monitor, &tx, &config);
assert!((status.command_saturation_percent - 30.0).abs() < 1.0);
assert_eq!(status.command_messages_sent, 30);
assert!(!status.is_saturated);
assert!(!status.is_critical);
}
#[test]
fn test_evaluate_channel_status_zero_buffer() {
let config = ChannelHealthConfig::default();
let (tx, _rx) = mpsc::channel::<()>(1);
let status = evaluate_channel_status_basic(&tx, 0, &config);
assert_eq!(status.command_saturation_percent, 0.0);
assert!(!status.is_saturated);
assert!(!status.is_critical);
}
}