use std::sync::OnceLock;
use std::time::{Duration, Instant};
use reqwest::Client;
use tracing::debug;
use crate::error::{OlError, OL_4302_STARTUP_PROBE_TIMEOUT};
use crate::runtime::supervisor::spec::HealthCheckSpec;
/// Returns the lazily-initialised HTTP client used by `probe_once`.
///
/// The client is built on first call and the same instance is handed out on
/// every later call. Its pool keeps at most 4 idle connections per host and
/// drops idle connections after 90 seconds.
///
/// # Panics
///
/// Panics on first use if `Client::builder().build()` fails.
fn probe_client() -> &'static Client {
    static SHARED: OnceLock<Client> = OnceLock::new();
    SHARED.get_or_init(|| {
        let builder = Client::builder()
            .pool_max_idle_per_host(4)
            .pool_idle_timeout(Duration::from_secs(90));
        builder
            .build()
            .expect("probe_client builds with rustls defaults")
    })
}
/// Issues a single `GET url` using the shared probe client and a per-request
/// `timeout`.
///
/// A 2xx response is healthy.
///
/// # Errors
///
/// Returns the underlying [`reqwest::Error`] when:
/// - the request itself fails (connect error, timeout, and so on), or
/// - the server answers with a 4xx/5xx status.
///
/// Other non-2xx statuses are logged at debug level and treated as healthy.
/// Examples are a 3xx that the redirect policy did not follow, or a
/// non-standard code. `Response::error_for_status` only produces an error for
/// 4xx/5xx, and `reqwest::Error` cannot be constructed for other codes. This
/// avoids panicking the probing task on an odd status, and for 3xx it matches
/// the Kubernetes HTTP-probe convention (200-399 = success).
pub async fn probe_once(url: &str, timeout: Duration) -> Result<(), reqwest::Error> {
    let resp = probe_client().get(url).timeout(timeout).send().await?;
    let status = resp.status();
    if status.is_success() {
        return Ok(());
    }
    debug!(url, status = %status, "healthz returned non-2xx");
    // Must not `.expect_err(..)` here: `error_for_status` returns `Ok` for
    // non-2xx codes outside 4xx/5xx, which would panic the caller's task.
    resp.error_for_status().map(|_| ())
}
pub async fn wait_for_startup(
binding_id: &str,
spec: &HealthCheckSpec,
url: &str,
total_budget: Duration,
) -> Result<(), OlError> {
let started = Instant::now();
let mut attempt: u32 = 0;
loop {
let elapsed = started.elapsed();
if elapsed >= total_budget {
return Err(OlError::new(
OL_4302_STARTUP_PROBE_TIMEOUT,
format!(
"binding `{binding_id}`: /healthz did not respond within {} ms (probed {} times)",
total_budget.as_millis(),
attempt
),
)
.with_suggestion(
"Increase process.start_timeout_ms in the manifest if the tool needs more cold-start time.",
));
}
attempt += 1;
match probe_once(url, spec.startup_timeout).await {
Ok(()) => return Ok(()),
Err(_) => {
tokio::time::sleep(spec.startup_period).await;
}
}
}
}
/// Why `liveness_loop` stopped.
///
/// Field-less and therefore cheap to copy, so it derives `Clone` and `Copy`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LivenessOutcome {
    /// The loop was asked to stop (shutdown signalled or the shutdown sender
    /// went away). This is not a health verdict about the binding.
    Healthy,
    /// `liveness_failure_threshold` consecutive probes failed.
    FailureThresholdHit,
}
/// Resolves once `shutdown` holds `true` or its sender has been dropped.
///
/// Uses `borrow_and_update` so a change back to `false` is marked as seen.
/// Otherwise the next `changed()` would return immediately and spin. Relies on
/// `Receiver::changed` being cancel safe, so this future can be dropped inside
/// `select!` without losing notifications.
async fn shutdown_signalled(shutdown: &mut tokio::sync::watch::Receiver<bool>) {
    loop {
        if *shutdown.borrow_and_update() {
            return;
        }
        if shutdown.changed().await.is_err() {
            // Sender dropped: no shutdown signal can ever arrive, so stop probing.
            return;
        }
    }
}

/// Probes `url` every `spec.liveness_period` until shutdown or too many
/// consecutive failures.
///
/// Each probe uses `spec.liveness_timeout`. A success resets the failure
/// counter. A failure increments it and logs a warning tagged with
/// `binding_id`.
///
/// # Returns
///
/// - [`LivenessOutcome::FailureThresholdHit`] once
///   `spec.liveness_failure_threshold` consecutive probes have failed.
/// - [`LivenessOutcome::Healthy`] when `shutdown` becomes `true` or its sender
///   is dropped.
///
/// A change of `shutdown` back to `false` does not stop the loop. Shutdown is
/// honoured both while waiting for the next period and while a probe is in
/// flight. A shutdown signalled mid-probe therefore ends the loop as `Healthy`
/// instead of counting that probe as a failure.
pub async fn liveness_loop(
    binding_id: &str,
    spec: &HealthCheckSpec,
    url: &str,
    mut shutdown: tokio::sync::watch::Receiver<bool>,
) -> LivenessOutcome {
    let mut consecutive_failures: u32 = 0;
    loop {
        tokio::select! {
            // Poll shutdown first so it wins ties with the timer.
            biased;
            () = shutdown_signalled(&mut shutdown) => return LivenessOutcome::Healthy,
            () = tokio::time::sleep(spec.liveness_period) => {}
        }

        let result = tokio::select! {
            biased;
            () = shutdown_signalled(&mut shutdown) => return LivenessOutcome::Healthy,
            result = probe_once(url, spec.liveness_timeout) => result,
        };

        match result {
            Ok(()) => {
                consecutive_failures = 0;
            }
            Err(e) => {
                consecutive_failures += 1;
                tracing::warn!(
                    binding_id = %binding_id,
                    failures = consecutive_failures,
                    threshold = spec.liveness_failure_threshold,
                    error = %e,
                    "liveness probe failed"
                );
                if consecutive_failures >= spec.liveness_failure_threshold {
                    return LivenessOutcome::FailureThresholdHit;
                }
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;

    /// Starts an axum server on an ephemeral 127.0.0.1 port.
    ///
    /// `/healthz` answers 200 while the returned counter is non-zero and 503
    /// while it is zero. Returns the full probe URL, the counter (store 0/1 to
    /// flip health) and the server task handle so the test can abort it.
    async fn spawn_toggle_server(
        initial_ok: bool,
    ) -> (String, Arc<AtomicU32>, tokio::task::JoinHandle<()>) {
        use axum::routing::get;
        use axum::Router;
        let counter = Arc::new(AtomicU32::new(if initial_ok { 1 } else { 0 }));
        let c = counter.clone();
        let app = Router::new().route(
            "/healthz",
            get(move || {
                let c = c.clone();
                async move {
                    let cur = c.load(Ordering::SeqCst);
                    if cur > 0 {
                        axum::http::StatusCode::OK
                    } else {
                        axum::http::StatusCode::SERVICE_UNAVAILABLE
                    }
                }
            }),
        );
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let handle = tokio::spawn(async move {
            axum::serve(listener, app).await.ok();
        });
        (format!("http://{addr}/healthz"), counter, handle)
    }

    /// Baseline spec shared by the tests. Individual tests tweak fields as needed.
    fn fast_spec() -> HealthCheckSpec {
        HealthCheckSpec {
            path: "/healthz".into(),
            port: 0,
            startup_period: Duration::from_millis(50),
            startup_timeout: Duration::from_secs(1),
            liveness_period: Duration::from_secs(1),
            liveness_failure_threshold: 3,
            liveness_timeout: Duration::from_secs(1),
        }
    }

    #[tokio::test]
    async fn wait_for_startup_returns_quickly_on_2xx() {
        let (url, _, h) = spawn_toggle_server(true).await;
        let spec = fast_spec();
        let result = wait_for_startup("bnd_test", &spec, &url, Duration::from_secs(5)).await;
        assert!(result.is_ok());
        h.abort();
    }

    #[tokio::test]
    async fn wait_for_startup_times_out_with_ol_4302() {
        let mut spec = fast_spec();
        spec.startup_timeout = Duration::from_millis(100);
        // Port 1 is expected to refuse connections, so every probe fails fast.
        let result = wait_for_startup(
            "bnd_test",
            &spec,
            "http://127.0.0.1:1/healthz",
            Duration::from_millis(500),
        )
        .await;
        let err = result.unwrap_err();
        assert_eq!(err.code.code, "OL-4302");
    }

    #[tokio::test]
    async fn liveness_loop_reports_threshold_hit_after_consecutive_failures() {
        let (url, _, h) = spawn_toggle_server(false).await;
        let mut spec = fast_spec();
        spec.liveness_period = Duration::from_millis(20);
        spec.liveness_failure_threshold = 2;
        // Keep the sender alive: dropping it is treated as shutdown.
        let (_tx, rx) = tokio::sync::watch::channel(false);
        let outcome = tokio::time::timeout(
            Duration::from_secs(5),
            liveness_loop("bnd_test", &spec, &url, rx),
        )
        .await
        .expect("liveness_loop should give up well within 5s");
        assert_eq!(outcome, LivenessOutcome::FailureThresholdHit);
        h.abort();
    }

    #[tokio::test]
    async fn liveness_loop_stops_healthy_on_shutdown() {
        let (url, _, h) = spawn_toggle_server(true).await;
        // liveness_period is 1s, so only the shutdown signal can end this quickly.
        let spec = fast_spec();
        let (tx, rx) = tokio::sync::watch::channel(false);
        let fut = liveness_loop("bnd_test", &spec, &url, rx);
        tx.send(true)
            .expect("receiver is alive inside the pending liveness future");
        let outcome = tokio::time::timeout(Duration::from_millis(500), fut)
            .await
            .expect("shutdown should interrupt the liveness period");
        assert_eq!(outcome, LivenessOutcome::Healthy);
        h.abort();
    }
}