use reqwest::Client;
use std::time::Duration;
use tracing::{debug, warn};
use crate::config::HealthCheckConfig;
use crate::error::Result;
use super::observer::ObservedPod;
/// Overall per-request timeout, in seconds, set on the HTTP client built by
/// `HealthChecker::new`.
const DEFAULT_TIMEOUT_SECS: u64 = 10;
/// Connect-phase timeout, in seconds, set on the HTTP client by both
/// `HealthChecker` constructors.
const DEFAULT_CONNECT_TIMEOUT_SECS: u64 = 5;
/// Aggregated health result for one pod, as produced by `HealthChecker::check_pod`.
#[derive(Debug, Clone)]
pub struct HealthStatus {
/// Identifier copied from the observed pod.
pub pod_id: String,
/// Name copied from the observed pod; used as the prefix of the `Display` output.
pub pod_name: String,
/// `true` only when the pod reports itself as running and every endpoint
/// check passed (a running pod with no endpoints counts as healthy).
pub healthy: bool,
/// One entry per endpoint that was probed, in probe order.
pub checks: Vec<EndpointCheck>,
/// Pod-level failure reason; `check_pod` fills this only when the pod is
/// not running. Endpoint failures are reported on the individual `checks`.
pub error: Option<String>,
}
/// Outcome of a single HTTP GET probe against one pod endpoint.
#[derive(Debug, Clone)]
pub struct EndpointCheck {
/// Port the probed endpoint is associated with.
pub port: u16,
/// Full URL that was requested.
pub url: String,
/// `true` when a response arrived and its status was a success (2xx) code.
pub healthy: bool,
/// HTTP status code of the response; `None` when the request itself failed.
pub status_code: Option<u16>,
/// Milliseconds from just before the request was sent until the response
/// was received; `None` when the request itself failed.
pub response_time_ms: Option<u64>,
/// Failure description: `"HTTP <status>"` for a non-success response, or the
/// request error's text when no response was received. `None` when healthy.
pub error: Option<String>,
}
/// Probes pod endpoints over HTTP and summarizes the results as `HealthStatus`.
#[derive(Debug)]
pub struct HealthChecker {
// HTTP client reused for every probe; its timeouts are fixed at construction.
client: Client,
// Settings used whenever a caller does not pass a per-call config.
default_config: HealthCheckConfig,
}
impl HealthChecker {
pub fn new() -> Result<Self> {
let client = Client::builder()
.timeout(Duration::from_secs(DEFAULT_TIMEOUT_SECS))
.connect_timeout(Duration::from_secs(DEFAULT_CONNECT_TIMEOUT_SECS))
.build()
.map_err(|e| crate::error::HalldyllError::internal(format!("Failed to create HTTP client: {e}")))?;
Ok(Self {
client,
default_config: HealthCheckConfig {
endpoint: String::from("/health"),
port: 8000,
interval_secs: 30,
timeout_secs: 5,
failure_threshold: 3,
},
})
}
pub fn with_config(config: HealthCheckConfig) -> Result<Self> {
let client = Client::builder()
.timeout(Duration::from_secs(u64::from(config.timeout_secs)))
.connect_timeout(Duration::from_secs(DEFAULT_CONNECT_TIMEOUT_SECS))
.build()
.map_err(|e| crate::error::HalldyllError::internal(format!("Failed to create HTTP client: {e}")))?;
Ok(Self {
client,
default_config: config,
})
}
pub async fn check_pod(&self, pod: &ObservedPod, config: Option<&HealthCheckConfig>) -> HealthStatus {
let config = config.map_or(&self.default_config, |c| c);
debug!("Checking health of pod: {}", pod.id);
let mut checks = Vec::new();
let mut all_healthy = true;
for (port, url) in &pod.endpoints {
let check_url = format!("{url}{}", config.endpoint);
let check = self.check_endpoint(*port, &check_url).await;
if !check.healthy {
all_healthy = false;
}
checks.push(check);
}
if checks.is_empty() {
all_healthy = pod.is_running();
}
HealthStatus {
pod_id: pod.id.clone(),
pod_name: pod.name.clone(),
healthy: all_healthy && pod.is_running(),
checks,
error: if pod.is_running() {
None
} else {
Some(format!("Pod is not running: {}", pod.status))
},
}
}
async fn check_endpoint(&self, port: u16, url: &str) -> EndpointCheck {
let start = std::time::Instant::now();
match self.client.get(url).send().await {
Ok(response) => {
let status = response.status();
let response_time = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
let healthy = status.is_success();
if !healthy {
debug!("Endpoint {url} returned status {status}");
}
EndpointCheck {
port,
url: url.to_string(),
healthy,
status_code: Some(status.as_u16()),
response_time_ms: Some(response_time),
error: if healthy {
None
} else {
Some(format!("HTTP {status}"))
},
}
}
Err(e) => {
warn!("Health check failed for {url}: {e}");
EndpointCheck {
port,
url: url.to_string(),
healthy: false,
status_code: None,
response_time_ms: None,
error: Some(e.to_string()),
}
}
}
}
pub async fn check_pods(&self, pods: &[ObservedPod]) -> Vec<HealthStatus> {
let mut results = Vec::with_capacity(pods.len());
for pod in pods {
results.push(self.check_pod(pod, None).await);
}
results
}
pub async fn wait_for_healthy(
&self,
pod: &ObservedPod,
config: Option<&HealthCheckConfig>,
timeout_secs: u64,
) -> Result<HealthStatus> {
let start = std::time::Instant::now();
let timeout = Duration::from_secs(timeout_secs);
let interval_secs = config.map_or(self.default_config.interval_secs, |c| c.interval_secs);
let check_interval = Duration::from_secs(u64::from(interval_secs));
loop {
let status = self.check_pod(pod, config).await;
if status.healthy {
return Ok(status);
}
if start.elapsed() > timeout {
return Err(crate::error::HalldyllError::RunPod(
crate::error::RunPodError::Timeout {
pod_id: pod.id.clone(),
expected_state: String::from("healthy"),
},
));
}
tokio::time::sleep(check_interval).await;
}
}
}
impl HealthStatus {
#[must_use]
pub fn all_endpoints_healthy(&self) -> bool {
self.checks.iter().all(|c| c.healthy)
}
#[must_use]
pub fn healthy_endpoint_count(&self) -> usize {
self.checks.iter().filter(|c| c.healthy).count()
}
#[must_use]
pub fn average_response_time_ms(&self) -> Option<u64> {
let times: Vec<u64> = self
.checks
.iter()
.filter_map(|c| c.response_time_ms)
.collect();
if times.is_empty() {
None
} else {
Some(times.iter().sum::<u64>() / times.len() as u64)
}
}
}
/// Renders a one-line summary of the form
/// `<pod_name>: <healthy|unhealthy>[ (<passed>/<total> endpoints)][ - <error>]`.
impl std::fmt::Display for HealthStatus {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Name and verdict are always present.
        f.write_str(&self.pod_name)?;
        f.write_str(if self.healthy {
            ": healthy"
        } else {
            ": unhealthy"
        })?;

        // The endpoint tally is shown only when something was probed.
        let total = self.checks.len();
        if total > 0 {
            let passing = self.healthy_endpoint_count();
            write!(f, " ({passing}/{total} endpoints)")?;
        }

        // A trailing pod-level error, if any, ends the line.
        match self.error.as_deref() {
            Some(reason) => write!(f, " - {reason}"),
            None => Ok(()),
        }
    }
}