openlatch-provider 0.2.0

Self-service onboarding CLI + runtime daemon for OpenLatch Editors and Providers
//! Startup + liveness HTTP probes against the managed tool's `/healthz`.
//!
//! Two-tier model (Kubernetes-style):
//!   - **Startup probe** — polls aggressively (period 1s default, budget 30s
//!     default). First 2xx response = ready. Used to gate "binding is up".
//!   - **Liveness probe** — polls slowly (period 10s default). After
//!     `liveness_failure_threshold` consecutive failures the supervisor
//!     restarts the tool.

use std::sync::OnceLock;
use std::time::{Duration, Instant};

use reqwest::Client;
use tracing::debug;

use crate::error::{OlError, OL_4302_STARTUP_PROBE_TIMEOUT};
use crate::runtime::supervisor::spec::HealthCheckSpec;

/// Shared, lazily-built reqwest client used for every `/healthz` poll.
///
/// The client lives in a process-wide `OnceLock`, so all callers reuse the
/// same connection pool instead of setting up a fresh connection on each
/// probe. Idle connections are kept for up to 90 seconds, with at most 4
/// idle connections retained per host.
///
/// # Panics
/// Panics on first use if the underlying `ClientBuilder` fails to build.
fn probe_client() -> &'static Client {
    static CLIENT: OnceLock<Client> = OnceLock::new();

    /// One-time construction of the pooled probe client.
    fn build_client() -> Client {
        let builder = Client::builder()
            .pool_idle_timeout(Duration::from_secs(90))
            .pool_max_idle_per_host(4);
        builder
            .build()
            .expect("probe_client builds with rustls defaults")
    }

    CLIENT.get_or_init(build_client)
}

/// Single probe attempt: one `GET url`, bounded by `timeout`.
///
/// Outcome by response status:
///   - **2xx** — `Ok(())`.
///   - **4xx / 5xx** — `Err` carrying reqwest's status error.
///   - **anything else** (1xx, or a 3xx that surfaced as the final response,
///     e.g. `304 Not Modified`) — `Ok(())`. reqwest only produces a status
///     error for 4xx/5xx, and a `reqwest::Error` cannot be constructed by
///     hand, so these responses are treated as "the tool answered". This
///     mirrors the Kubernetes HTTP-probe rule (200 ≤ status < 400 passes).
///     The status is still logged at debug level.
///
/// # Errors
/// Returns the `reqwest::Error` from `send()` (connect failure, timeout, …)
/// or the status error for a 4xx/5xx response.
pub async fn probe_once(url: &str, timeout: Duration) -> Result<(), reqwest::Error> {
    let resp = probe_client().get(url).timeout(timeout).send().await?;
    let status = resp.status();
    if status.is_success() {
        return Ok(());
    }
    debug!(url, status = %status, "healthz returned non-2xx");
    // `error_for_status` yields `Err` only for 4xx/5xx. Propagate its result
    // instead of asserting it is an error: the previous `expect_err` panicked
    // on 1xx/3xx responses, which are neither 2xx nor an error status.
    resp.error_for_status().map(|_| ())
}

/// Poll `/healthz` until it returns 2xx or `total_budget` elapses.
/// Returns `OL-4302` on timeout.
pub async fn wait_for_startup(
    binding_id: &str,
    spec: &HealthCheckSpec,
    url: &str,
    total_budget: Duration,
) -> Result<(), OlError> {
    let started = Instant::now();
    let mut attempt: u32 = 0;
    loop {
        let elapsed = started.elapsed();
        if elapsed >= total_budget {
            return Err(OlError::new(
                OL_4302_STARTUP_PROBE_TIMEOUT,
                format!(
                    "binding `{binding_id}`: /healthz did not respond within {} ms (probed {} times)",
                    total_budget.as_millis(),
                    attempt
                ),
            )
            .with_suggestion(
                "Increase process.start_timeout_ms in the manifest if the tool needs more cold-start time.",
            ));
        }
        attempt += 1;
        match probe_once(url, spec.startup_timeout).await {
            Ok(()) => return Ok(()),
            Err(_) => {
                tokio::time::sleep(spec.startup_period).await;
            }
        }
    }
}

/// Reason the liveness loop stopped.
///
/// The type is a plain fieldless enum, so it is `Copy` and can be passed
/// around or compared without borrowing.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LivenessOutcome {
    /// The loop ended without reaching the failure threshold; this is what
    /// `liveness_loop` returns when it observes shutdown.
    Healthy,
    /// Threshold of consecutive failures hit; supervisor should restart.
    FailureThresholdHit,
}

/// Run the liveness loop until shutdown OR `liveness_failure_threshold`
/// consecutive failures.
///
/// Each iteration waits `spec.liveness_period`, then issues one
/// [`probe_once`] with a timeout of `spec.liveness_timeout`. A successful
/// probe resets the failure counter; a failed probe increments it and logs a
/// warning that includes `binding_id`.
///
/// Shutdown is observed both while waiting between probes and while a probe
/// is in flight. A shutdown request therefore is not delayed by a slow probe,
/// and a probe that fails because the tool is being stopped is not reported
/// as a threshold hit.
///
/// # Returns
/// - [`LivenessOutcome::Healthy`] when `shutdown` changes (or its sender is
///   dropped) or its current value is `true`.
/// - [`LivenessOutcome::FailureThresholdHit`] once the number of consecutive
///   failures reaches `spec.liveness_failure_threshold`.
pub async fn liveness_loop(
    binding_id: &str,
    spec: &HealthCheckSpec,
    url: &str,
    mut shutdown: tokio::sync::watch::Receiver<bool>,
) -> LivenessOutcome {
    let mut consecutive_failures: u32 = 0;
    loop {
        // Sleep first — startup-probe success implies "fresh enough"; we
        // don't need to immediately re-probe.
        tokio::select! {
            _ = tokio::time::sleep(spec.liveness_period) => {}
            _ = shutdown.changed() => return LivenessOutcome::Healthy,
        }
        if *shutdown.borrow() {
            return LivenessOutcome::Healthy;
        }

        // Race the probe against shutdown. Dropping the probe future cancels
        // the in-flight request, so its result can never turn into a
        // restart request after shutdown has been signalled.
        let probe = tokio::select! {
            outcome = probe_once(url, spec.liveness_timeout) => outcome,
            _ = shutdown.changed() => return LivenessOutcome::Healthy,
        };

        match probe {
            Ok(()) => {
                consecutive_failures = 0;
            }
            Err(e) => {
                consecutive_failures += 1;
                tracing::warn!(
                    binding_id = %binding_id,
                    failures = consecutive_failures,
                    threshold = spec.liveness_failure_threshold,
                    error = %e,
                    "liveness probe failed"
                );
                if consecutive_failures >= spec.liveness_failure_threshold {
                    return LivenessOutcome::FailureThresholdHit;
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;

    /// Probe settings shared by the tests below; only the per-attempt
    /// startup timeout differs between them.
    fn spec_with_startup_timeout(startup_timeout: Duration) -> HealthCheckSpec {
        HealthCheckSpec {
            path: "/healthz".into(),
            port: 0,
            startup_period: Duration::from_millis(50),
            startup_timeout,
            liveness_period: Duration::from_secs(1),
            liveness_failure_threshold: 3,
            liveness_timeout: Duration::from_secs(1),
        }
    }

    /// Serve `/healthz` from an axum app bound to an ephemeral loopback port.
    /// The handler answers 200 while the shared switch is non-zero and 503
    /// while it is zero. Returns the probe URL, the switch, and the handle of
    /// the spawned server task.
    async fn spawn_toggle_server(
        initial_ok: bool,
    ) -> (String, Arc<AtomicU32>, tokio::task::JoinHandle<()>) {
        use axum::http::StatusCode;
        use axum::routing::get;
        use axum::Router;

        let switch = Arc::new(AtomicU32::new(u32::from(initial_ok)));
        let handler_switch = Arc::clone(&switch);
        let app = Router::new().route(
            "/healthz",
            get(move || {
                let switch = Arc::clone(&handler_switch);
                async move {
                    match switch.load(Ordering::SeqCst) {
                        0 => StatusCode::SERVICE_UNAVAILABLE,
                        _ => StatusCode::OK,
                    }
                }
            }),
        );

        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let server = tokio::spawn(async move {
            axum::serve(listener, app).await.ok();
        });
        (format!("http://{addr}/healthz"), switch, server)
    }

    #[tokio::test]
    async fn wait_for_startup_returns_quickly_on_2xx() {
        let (url, _, server) = spawn_toggle_server(true).await;
        let spec = spec_with_startup_timeout(Duration::from_secs(1));

        let outcome = wait_for_startup("bnd_test", &spec, &url, Duration::from_secs(5)).await;

        assert!(outcome.is_ok());
        server.abort();
    }

    #[tokio::test]
    async fn wait_for_startup_times_out_with_ol_4302() {
        // No server bound — connect refused on every attempt.
        let spec = spec_with_startup_timeout(Duration::from_millis(100));
        let dead_url = "http://127.0.0.1:1/healthz";
        let budget = Duration::from_millis(500);

        let err = wait_for_startup("bnd_test", &spec, dead_url, budget)
            .await
            .unwrap_err();

        assert_eq!(err.code.code, "OL-4302");
    }
}