use std::time::Duration;
use async_trait::async_trait;
use tokio::time::Instant;
use tracing::{debug, info, warn};
use crate::docker::DockerError;
/// Final verdict produced by [`wait_for_health`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HealthOutcome {
    /// The probe reported `HealthHealthy`, or a container without a
    /// healthcheck was still running once the grace period had elapsed.
    Healthy,
    /// The health timeout elapsed while the container was still starting or
    /// unhealthy, or probe errors persisted past the active deadline.
    Timeout,
    /// The probe reported that the container exited before becoming healthy.
    Crashed,
}
/// Snapshot of a container's state as reported by a [`HealthProbe`].
///
/// The per-variant notes describe how [`wait_for_health`] reacts to each
/// state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContainerRuntimeState {
    /// Not healthy yet; polling continues until the health timeout.
    HealthStarting,
    /// Terminal success: the wait ends with `HealthOutcome::Healthy`.
    HealthHealthy,
    /// Handled exactly like `HealthStarting`: polling continues until the
    /// health timeout, so the container may still turn healthy.
    HealthUnhealthy,
    /// Running without a healthcheck; considered healthy once it has kept
    /// running through the grace period.
    RunningNoHealthcheck,
    /// The container has exited; the wait ends with `HealthOutcome::Crashed`.
    Exited {
        /// Exit code reported by the probe (only logged by `wait_for_health`).
        exit_code: i64,
    },
}
/// Timing parameters consumed by [`wait_for_health`].
///
/// All deadlines are measured from the moment the wait begins.
#[derive(Debug, Clone)]
pub struct HealthConfig {
    /// How long a container reporting `HealthStarting` or `HealthUnhealthy`
    /// may take before the wait gives up with `HealthOutcome::Timeout`.
    pub health_timeout: Duration,
    /// How long a container without a healthcheck must keep running before it
    /// is considered healthy.
    pub grace_period: Duration,
    /// Nominal delay between two consecutive probes.
    pub poll_interval: Duration,
}
impl Default for HealthConfig {
fn default() -> Self {
Self {
health_timeout: Duration::from_secs(120),
grace_period: Duration::from_secs(10),
poll_interval: Duration::from_secs(1),
}
}
}
/// Source of container state for [`wait_for_health`].
#[async_trait]
pub trait HealthProbe {
    /// Reports the current runtime state of the container `name_or_id`.
    ///
    /// # Errors
    ///
    /// Returns a [`DockerError`] when the state could not be obtained.
    /// `wait_for_health` logs such failures and probes again.
    async fn probe_state(
        &self,
        name_or_id: &str,
    ) -> Result<ContainerRuntimeState, DockerError>;
}
/// Time source used by [`wait_for_health`], abstracted so that callers can
/// substitute their own implementation.
#[async_trait]
pub trait Clock: Send + Sync {
    /// Returns the current instant.
    fn now(&self) -> Instant;

    /// Suspends the calling task for `dur`.
    async fn sleep(&self, dur: Duration);
}
/// [`Clock`] implementation backed by `tokio::time`.
///
/// A zero-sized marker type; the derives make it printable, freely copyable
/// and constructible through `Default`, as expected of a public type.
#[derive(Debug, Clone, Copy, Default)]
pub struct TokioClock;
#[async_trait]
impl Clock for TokioClock {
    /// Reads the current instant via `Instant::now`.
    fn now(&self) -> Instant {
        Instant::now()
    }

    /// Delegates to `tokio::time::sleep`.
    async fn sleep(&self, dur: Duration) {
        tokio::time::sleep(dur).await;
    }
}
/// Polls `ops` for the state of container `id` until a verdict is reached.
///
/// # Outcomes
///
/// * [`HealthOutcome::Healthy`] — the probe reported `HealthHealthy`, or the
///   container has no healthcheck and was still running once
///   `cfg.grace_period` had elapsed.
/// * [`HealthOutcome::Crashed`] — the probe reported `Exited`.
/// * [`HealthOutcome::Timeout`] — the container was still `HealthStarting` or
///   `HealthUnhealthy` once `cfg.health_timeout` had elapsed, or probe errors
///   persisted past the active deadline.
///
/// # Deadlines
///
/// All deadlines are measured from the moment this function is entered. The
/// active deadline follows the most recently observed state:
/// `cfg.health_timeout` for containers with a healthcheck (and before any
/// state has been observed), `cfg.grace_period` for containers without one.
/// Probe errors are logged and retried until that active deadline.
///
/// # Polling
///
/// Between probes the function sleeps for `cfg.poll_interval`, shortened to
/// the time left until the active deadline. A poll interval larger than the
/// deadline therefore cannot delay the verdict past it.
pub async fn wait_for_health(
    ops: &impl HealthProbe,
    id: &str,
    cfg: &HealthConfig,
    clock: &impl Clock,
) -> HealthOutcome {
    let start = clock.now();
    let mut prev: Option<ContainerRuntimeState> = None;
    // Deadline (relative to `start`) implied by the last observed state. Probe
    // errors carry no state of their own, so they are judged against it too.
    let mut deadline = cfg.health_timeout;
    loop {
        let probed = ops.probe_state(id).await;
        // Read the clock once per iteration so the deadline check and the
        // sleep computation below agree on the elapsed time.
        let elapsed = clock.now().duration_since(start);
        match probed {
            Ok(state) => {
                // Log only on change to avoid one debug line per poll.
                if prev != Some(state) {
                    debug!(container = %id, ?state, "health: state transition");
                    prev = Some(state);
                }
                match state {
                    ContainerRuntimeState::HealthHealthy => {
                        info!(container = %id, "health: healthy");
                        return HealthOutcome::Healthy;
                    }
                    ContainerRuntimeState::Exited { exit_code } => {
                        info!(container = %id, exit_code, "health: container exited before healthy");
                        return HealthOutcome::Crashed;
                    }
                    ContainerRuntimeState::HealthStarting
                    | ContainerRuntimeState::HealthUnhealthy => {
                        deadline = cfg.health_timeout;
                        if elapsed >= deadline {
                            info!(container = %id, "health: timed out waiting for healthy");
                            return HealthOutcome::Timeout;
                        }
                    }
                    ContainerRuntimeState::RunningNoHealthcheck => {
                        deadline = cfg.grace_period;
                        if elapsed >= deadline {
                            info!(container = %id, "health: grace period elapsed, still running");
                            return HealthOutcome::Healthy;
                        }
                    }
                }
            }
            Err(e) => {
                warn!(container = %id, error = %e, "health: probe failed, will retry");
                if elapsed >= deadline {
                    warn!(container = %id, "health: probe failed persistently, treating as timeout");
                    return HealthOutcome::Timeout;
                }
            }
        }
        // Never sleep past the active deadline: the next probe happens at the
        // deadline at the latest, so the verdict is not delayed by a poll
        // interval that exceeds the remaining time.
        let remaining = deadline.saturating_sub(elapsed);
        clock.sleep(cfg.poll_interval.min(remaining)).await;
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::VecDeque;
    use std::sync::Mutex;
    use ContainerRuntimeState::*;

    /// Probe that replays a scripted sequence of states and then keeps
    /// returning the final entry forever.
    struct ScriptedProbe {
        states: Mutex<VecDeque<ContainerRuntimeState>>,
    }

    impl ScriptedProbe {
        /// Builds a probe from `states`; the script must not be empty.
        fn new(states: &[ContainerRuntimeState]) -> Self {
            let script: VecDeque<ContainerRuntimeState> = states.iter().copied().collect();
            Self {
                states: Mutex::new(script),
            }
        }
    }

    #[async_trait]
    impl HealthProbe for ScriptedProbe {
        async fn probe_state(&self, _id: &str) -> Result<ContainerRuntimeState, DockerError> {
            let mut script = self.states.lock().unwrap();
            // Consume entries while more than one is left; the last one is
            // sticky so the script never runs dry.
            let state = match script.len() {
                0 | 1 => *script.front().expect("script must have at least one state"),
                _ => script.pop_front().unwrap(),
            };
            Ok(state)
        }
    }

    /// Short timings so the paused-clock tests stay small: 5 s health
    /// timeout, 3 s grace period, 1 s poll interval.
    fn test_cfg() -> HealthConfig {
        let health_timeout = Duration::from_secs(5);
        let grace_period = Duration::from_secs(3);
        let poll_interval = Duration::from_secs(1);
        HealthConfig {
            health_timeout,
            grace_period,
            poll_interval,
        }
    }

    /// Runs `wait_for_health` against `ops` with the test timings and the
    /// tokio clock.
    async fn outcome_for(ops: &impl HealthProbe) -> HealthOutcome {
        wait_for_health(ops, "c", &test_cfg(), &TokioClock).await
    }

    #[tokio::test(start_paused = true)]
    async fn healthcheck_becomes_healthy_within_timeout() {
        let ops = ScriptedProbe::new(&[HealthStarting, HealthStarting, HealthHealthy]);
        assert_eq!(outcome_for(&ops).await, HealthOutcome::Healthy);
    }

    #[tokio::test(start_paused = true)]
    async fn healthcheck_stays_unhealthy_until_timeout() {
        let ops = ScriptedProbe::new(&[HealthUnhealthy]);
        assert_eq!(outcome_for(&ops).await, HealthOutcome::Timeout);
    }

    #[tokio::test(start_paused = true)]
    async fn no_healthcheck_running_through_grace_is_healthy() {
        let ops = ScriptedProbe::new(&[RunningNoHealthcheck]);
        assert_eq!(outcome_for(&ops).await, HealthOutcome::Healthy);
    }

    #[tokio::test(start_paused = true)]
    async fn exit_during_grace_period_is_crashed() {
        let script = [
            RunningNoHealthcheck,
            RunningNoHealthcheck,
            Exited { exit_code: 1 },
        ];
        let ops = ScriptedProbe::new(&script);
        assert_eq!(outcome_for(&ops).await, HealthOutcome::Crashed);
    }

    #[tokio::test(start_paused = true)]
    async fn exit_while_starting_is_crashed() {
        let ops = ScriptedProbe::new(&[HealthStarting, Exited { exit_code: 137 }]);
        assert_eq!(outcome_for(&ops).await, HealthOutcome::Crashed);
    }

    /// Probe that fails a fixed number of times and then reports `then` on
    /// every subsequent call.
    struct FlakyProbe {
        remaining_failures: Mutex<u32>,
        then: ContainerRuntimeState,
    }

    /// Arbitrary `DockerError` value used to simulate a failed probe.
    fn probe_error() -> DockerError {
        DockerError::Spec(crate::docker::spec::SpecError::Missing("probe"))
    }

    #[async_trait]
    impl HealthProbe for FlakyProbe {
        async fn probe_state(&self, _id: &str) -> Result<ContainerRuntimeState, DockerError> {
            let mut failures_left = self.remaining_failures.lock().unwrap();
            if *failures_left == 0 {
                return Ok(self.then);
            }
            *failures_left -= 1;
            Err(probe_error())
        }
    }

    #[tokio::test(start_paused = true)]
    async fn transient_probe_errors_are_tolerated_then_resolve() {
        let ops = FlakyProbe {
            remaining_failures: Mutex::new(2),
            then: HealthHealthy,
        };
        assert_eq!(outcome_for(&ops).await, HealthOutcome::Healthy);
    }

    #[tokio::test(start_paused = true)]
    async fn persistent_probe_errors_time_out() {
        let ops = FlakyProbe {
            remaining_failures: Mutex::new(u32::MAX),
            then: HealthHealthy,
        };
        assert_eq!(outcome_for(&ops).await, HealthOutcome::Timeout);
    }
}