use crate::admin::db::insert_health_check;
use crate::admin::state::SharedState;
use std::collections::HashMap;
use std::time::Duration;
use tokio::task::JoinSet;
/// Probes one backend by sending `GET {base_url}/v1/models`.
///
/// Trailing `/` characters on `base_url` are stripped before the path is appended.
/// The request carries its own 5 second timeout.
///
/// Returns `(is_up, latency_ms)`:
/// * `is_up` is `true` when the response status is a success code, or one of
///   401, 403 or 404 (presumably because those still show a server answering
///   at that address — confirm the intent with the module owner).
/// * `latency_ms` is the time in milliseconds from just before the request was
///   issued until `send` resolved with a response.
///
/// If the request fails for any reason the result is `(false, None)`.
async fn probe_backend(client: &reqwest::Client, base_url: &str) -> (bool, Option<u64>) {
    let endpoint = format!("{}/v1/models", base_url.trim_end_matches('/'));
    let started = std::time::Instant::now();

    let outcome = client
        .get(&endpoint)
        .timeout(Duration::from_secs(5))
        .send()
        .await;

    // Any transport-level failure means the backend is down and no latency is known.
    let response = match outcome {
        Ok(response) => response,
        Err(_) => return (false, None),
    };

    let elapsed_ms = started.elapsed().as_millis() as u64;
    let code = response.status();
    let reachable = code.is_success() || matches!(code.as_u16(), 401 | 403 | 404);
    (reachable, Some(elapsed_ms))
}
pub fn spawn(shared: SharedState, backend_urls: Vec<(String, String)>) {
tokio::spawn(async move {
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(6))
.build()
.expect("health check reqwest client");
let mut last_status: HashMap<String, bool> = HashMap::new();
loop {
let mut set: JoinSet<(String, bool, Option<u64>)> = JoinSet::new();
for (name, base_url) in backend_urls.iter().cloned() {
let c = client.clone();
set.spawn(async move {
let (is_up, latency_ms) = probe_backend(&c, &base_url).await;
(name, is_up, latency_ms)
});
}
while let Some(Ok((name, is_up, latency_ms))) = set.join_next().await {
let status_str = if is_up { "up" } else { "down" };
let db = shared.db.clone();
let _ = tokio::task::spawn_blocking({
let name = name.clone();
move || {
let conn = db.lock().unwrap_or_else(|e| e.into_inner());
if let Err(e) = insert_health_check(&conn, &name, status_str, latency_ms) {
tracing::warn!(backend = %name, error = %e, "failed to record health check");
}
}
})
.await;
let prev = last_status.insert(name.clone(), is_up);
if prev != Some(is_up) {
let _ = shared.events_tx.send(
crate::admin::state::AdminEvent::BackendHealthChanged {
backend: name,
status: status_str.to_string(),
latency_ms,
},
);
}
}
tokio::time::sleep(Duration::from_secs(30)).await;
}
});
}