use super::{check_liveness, read_state, write_state, HealthConfig, HealthStatus};
use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
use std::sync::Arc;
use std::time::Duration;
/// Runs the health-check loop for container `name` (process `pid`) until
/// `stop` is set or the process is no longer live.
///
/// The container's health is first recorded as `Starting`. After an optional
/// start period (`config.start_period_secs`, cut short if `stop` is set), the
/// probe described by `config` is executed once per `config.interval_secs`:
///
/// * a passing probe resets the failure counter and reports `Healthy`;
/// * a failing probe increments the counter; once it reaches `config.retries`
///   the status becomes `Unhealthy`, below that the previous status is kept.
///
/// The persisted state is only rewritten when the status actually changes.
///
/// NOTE(review): with `interval_secs == 0` the loop re-probes back-to-back
/// with no pause between iterations — confirm callers never pass 0.
pub fn run_health_monitor(name: String, pid: i32, config: HealthConfig, stop: Arc<AtomicBool>) {
    log::debug!(
        "health: monitor starting for '{}' (pid={}, interval={}s, timeout={}s, retries={})",
        name,
        pid,
        config.interval_secs,
        config.timeout_secs,
        config.retries
    );
    update_health(&name, HealthStatus::Starting);
    if config.start_period_secs > 0 {
        sleep_interruptible(config.start_period_secs, &stop);
    }
    // No separate stop check here: the loop tests `stop` first, so an
    // interrupted start period still reaches the exit log below.
    let mut consecutive_failures: u32 = 0;
    let mut current_status = HealthStatus::Starting;
    loop {
        if stop.load(Ordering::Relaxed) {
            break;
        }
        if !check_liveness(pid) {
            log::debug!(
                "health: container '{}' (pid={}) gone — stopping monitor",
                name,
                pid
            );
            break;
        }
        let passed = run_probe(pid, &config);
        let new_status = if passed {
            consecutive_failures = 0;
            HealthStatus::Healthy
        } else {
            // Saturate instead of overflowing: a probe that fails instantly
            // (e.g. empty `cmd`) with a zero interval can spin fast enough to
            // exceed u32::MAX, which would panic in debug builds.
            consecutive_failures = consecutive_failures.saturating_add(1);
            if consecutive_failures >= config.retries {
                HealthStatus::Unhealthy
            } else {
                // Not enough consecutive failures yet: keep the last status.
                current_status.clone()
            }
        };
        if new_status != current_status {
            log::info!(
                "health: '{}' → {:?} (failures={})",
                name,
                new_status,
                consecutive_failures
            );
            update_health(&name, new_status.clone());
            current_status = new_status;
        }
        sleep_interruptible(config.interval_secs, &stop);
    }
    log::debug!("health: monitor exiting for '{}'", name);
}
/// Executes the configured health-check command inside the container of
/// `pid` and reports whether it passed.
///
/// Returns `false` when `config.cmd` is empty, when the exec helper returns an
/// error, when no result arrives within `config.timeout_secs` (clamped to at
/// least one second), or when the worker thread ends without sending a result.
///
/// The command runs on a detached worker thread so the wait can be bounded.
/// On timeout, the child pid published through the shared sink (if any) is
/// sent `SIGKILL`. The worker thread itself is never joined.
fn run_probe(pid: i32, config: &HealthConfig) -> bool {
    if config.cmd.is_empty() {
        return false;
    }
    // The worker thread must own its arguments, hence the clone.
    let args = config.cmd.clone();
    let timeout = Duration::from_secs(config.timeout_secs.max(1));
    // Stays 0 until a pid is stored through the sink.
    // NOTE(review): presumably the exec helper stores the probe child's pid
    // once it has spawned it — confirm in the exec module.
    let child_pid = Arc::new(AtomicI32::new(0));
    let child_pid_sink = Arc::clone(&child_pid);
    let (tx, rx) = std::sync::mpsc::channel::<bool>();
    std::thread::spawn(move || {
        let result = super::exec::exec_in_container_with_pid_sink(pid, &args, child_pid_sink)
            .unwrap_or(false);
        // The receiver may already have given up after a timeout; ignore that.
        let _ = tx.send(result);
    });
    match rx.recv_timeout(timeout) {
        Ok(passed) => passed,
        Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
            // Report the effective (clamped) timeout, not the raw config value.
            log::warn!("health: probe timed out after {}s", timeout.as_secs());
            let cpid = child_pid.load(Ordering::Relaxed);
            if cpid > 0 {
                log::warn!("health: killing timed-out probe child (pid={})", cpid);
                // SAFETY: kill(2) takes plain integers and touches no Rust-owned
                // memory; a stale pid can at worst signal the wrong process, it
                // cannot cause undefined behaviour.
                let rc = unsafe { libc::kill(cpid, libc::SIGKILL) };
                if rc != 0 {
                    log::debug!(
                        "health: failed to kill probe child (pid={}): {}",
                        cpid,
                        std::io::Error::last_os_error()
                    );
                }
            }
            false
        }
        Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
            // The sender is dropped without sending only if the worker panicked
            // before reaching `tx.send`; this is not a timeout.
            log::warn!("health: probe thread exited without reporting a result");
            false
        }
    }
}
/// Stores `status` as the health of container `name` in its persisted state.
///
/// A failure to load or save the state is logged as a warning and otherwise
/// ignored; the caller is not told about it.
fn update_health(name: &str, status: HealthStatus) {
    let mut state = match read_state(name) {
        Ok(loaded) => loaded,
        Err(err) => {
            log::warn!("health: failed to read state for '{}': {}", name, err);
            return;
        }
    };
    state.health = Some(status);
    if let Err(err) = write_state(&state) {
        log::warn!("health: failed to write state for '{}': {}", name, err);
    }
}
/// Sleeps for `duration_secs` seconds, waking at least every 100 ms so it can
/// return early once `stop` is set.
///
/// Elapsed time is measured against a monotonic clock rather than by summing
/// nominal ticks. `thread::sleep` may oversleep, and counting ticks would let
/// those overshoots accumulate beyond the requested duration. The final nap
/// is clamped to the time remaining.
fn sleep_interruptible(duration_secs: u64, stop: &AtomicBool) {
    const TICK: Duration = Duration::from_millis(100);
    let total = Duration::from_secs(duration_secs);
    let start = std::time::Instant::now();
    loop {
        let elapsed = start.elapsed();
        if elapsed >= total || stop.load(Ordering::Relaxed) {
            return;
        }
        std::thread::sleep(TICK.min(total - elapsed));
    }
}