use std::collections::BTreeMap;
use std::path::PathBuf;
use std::time::{Duration, Instant};
use serde::{Deserialize, Serialize};
use super::manifest::{PortDiscovery, ServiceDecl, ServicesManifest, expand_tilde_owned};
#[cfg(test)]
mod tests;
/// How long a cached [`ServiceStatus`] is reused before the service is probed again.
pub const CACHE_TTL: Duration = Duration::from_secs(5);
/// Timeout handed to the HTTP prober for each health-endpoint request.
pub const HEALTH_PROBE_TIMEOUT: Duration = Duration::from_millis(1500);
/// Snapshot of a single service as observed by one probe pass.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceStatus {
/// Name of the service (the key it was looked up under in the manifest).
pub name: String,
/// Whether the service is declared in the manifest; `Discoverer::probe` always sets `true`.
pub declared: bool,
/// `true` when the process prober found a matching PID.
pub running: bool,
/// PID reported by the process prober, if any.
pub pid: Option<u32>,
/// Port resolved from the port file or the static default, if any.
pub port: Option<u16>,
/// `http://localhost:{port}` when a port is known.
pub url: Option<String>,
/// Output of the declared version command; only collected while the process is running.
pub version: Option<String>,
/// Result of the most recent health evaluation.
pub health: HealthState,
/// Expanded log path, present only when the file exists on disk.
pub log_path: Option<PathBuf>,
/// Seconds elapsed since the process start time, when the PID could be inspected.
// The rename matches the field name, so it does not change the serialized key.
#[serde(rename = "uptime_secs")]
pub uptime_secs: Option<u64>,
}
/// Outcome of a health evaluation; variant names are serialized in `snake_case`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum HealthState {
/// The health endpoint answered with a success status.
Ok,
/// No health endpoint is declared; only process liveness is known.
Unknown,
/// The service is considered unhealthy.
Fail {
/// Human-readable reason (for example an HTTP status or a transport error).
detail: String,
},
}
/// Result returned by [`Discoverer::health`] for one service.
pub struct HealthResult {
/// Name of the service that was checked.
pub name: String,
/// Freshly evaluated health state.
pub state: HealthState,
/// Human-readable summary derived from `state`.
pub message: String,
}
/// Looks up running processes; a trait so a substitute can be injected
/// (see `Discoverer::with_probers`).
pub trait ProcessProber: Send + Sync {
/// Returns the PID of a process matching `pattern`, or `None` when nothing matches.
fn pgrep(&self, pattern: &str) -> Option<u32>;
}
/// Reads a service's port from a port file; a trait so a substitute can be injected.
pub trait PortProber: Send + Sync {
/// Returns the port recorded in the file at `path`, or `None` when it cannot be determined.
fn read_port_file(&self, path: &std::path::Path) -> Option<u16>;
}
/// Performs a health request against a URL; a trait so a substitute can be injected.
pub trait HttpProber: Send + Sync {
/// Checks `url`, bounded by `timeout`, and reports the outcome as a [`HealthState`].
fn get_health(&self, url: &str, timeout: Duration) -> HealthState;
}
/// Runs a service's version command; a trait so a substitute can be injected.
pub trait VersionRunner: Send + Sync {
/// Runs `cmd` and returns the version text it produced, or `None` when there is none.
fn run(&self, cmd: &str) -> Option<String>;
}
/// [`ProcessProber`] that shells out to the system `pgrep` binary.
pub struct RealProcessProber;

impl ProcessProber for RealProcessProber {
    /// Runs `pgrep -f -- <pattern>` and returns the first PID it prints.
    ///
    /// Returns `None` when `pgrep` cannot be spawned, exits unsuccessfully
    /// (which includes "no match"), or prints nothing that parses as a `u32`.
    fn pgrep(&self, pattern: &str) -> Option<u32> {
        // `--` ends option parsing so a pattern that begins with `-`
        // (e.g. one matching a command-line flag) is treated as the pattern
        // rather than being rejected as an unknown option.
        let output = std::process::Command::new("pgrep")
            .args(["-f", "--", pattern])
            .output()
            .ok()?;
        if !output.status.success() {
            return None;
        }
        let stdout = String::from_utf8_lossy(&output.stdout);
        // pgrep prints one PID per line; only the first one is reported.
        stdout
            .split_whitespace()
            .next()
            .and_then(|first| first.parse::<u32>().ok())
    }
}
/// [`PortProber`] that reads the port file from the real filesystem.
pub struct RealPortProber;

impl PortProber for RealPortProber {
    /// Parses the port out of the file at `path`.
    ///
    /// The file may hold either a bare port or text ending in `:<port>`;
    /// only the part after the last `:` is parsed. Returns `None` when the
    /// file cannot be read or that part is not a valid `u16`.
    fn read_port_file(&self, path: &std::path::Path) -> Option<u16> {
        let raw = std::fs::read_to_string(path).ok()?;
        let text = raw.trim();
        let candidate = match text.rsplit_once(':') {
            Some((_, tail)) => tail,
            None => text,
        };
        candidate.trim().parse().ok()
    }
}
/// [`HttpProber`] that issues a blocking HTTP GET via `reqwest`.
pub struct RealHttpProber;

impl HttpProber for RealHttpProber {
    /// Sends a GET to `url` with the given `timeout`.
    ///
    /// Returns [`HealthState::Ok`] for a success status, and
    /// [`HealthState::Fail`] carrying the HTTP status, the transport error,
    /// or the client-construction error otherwise.
    fn get_health(&self, url: &str, timeout: Duration) -> HealthState {
        let built = reqwest::blocking::Client::builder()
            .timeout(timeout)
            .build();
        let client = match built {
            Ok(client) => client,
            Err(err) => {
                return HealthState::Fail {
                    detail: format!("failed to build HTTP client: {err}"),
                };
            }
        };

        let response = match client.get(url).send() {
            Ok(response) => response,
            Err(err) => {
                return HealthState::Fail {
                    detail: err.to_string(),
                };
            }
        };

        let code = response.status();
        if code.is_success() {
            HealthState::Ok
        } else {
            HealthState::Fail {
                detail: format!("HTTP {code}"),
            }
        }
    }
}
/// [`VersionRunner`] that executes the command through `sh -c`.
pub struct RealVersionRunner;

impl VersionRunner for RealVersionRunner {
    /// Runs `cmd` in a shell and returns the first non-blank stdout line, trimmed.
    ///
    /// The exit status is not inspected. Returns `None` when the shell cannot
    /// be spawned or stdout contains no non-blank line.
    fn run(&self, cmd: &str) -> Option<String> {
        let output = std::process::Command::new("sh")
            .arg("-c")
            .arg(cmd)
            .output()
            .ok()?;
        let text = String::from_utf8_lossy(&output.stdout);
        for line in text.lines() {
            let line = line.trim();
            if !line.is_empty() {
                return Some(line.to_string());
            }
        }
        None
    }
}
/// Probes the services declared in a manifest and caches the results for [`CACHE_TTL`].
pub struct Discoverer {
/// Declarations of the services to probe.
manifest: ServicesManifest,
/// Last probed status per service name, paired with the time it was stored.
pub(crate) cache: BTreeMap<String, (Instant, ServiceStatus)>,
/// Finds the PID of a service's process.
process_prober: Box<dyn ProcessProber>,
/// Reads a service's port from its port file.
port_prober: Box<dyn PortProber>,
/// Queries a service's health endpoint.
http_prober: Box<dyn HttpProber>,
/// Runs a service's version command.
version_runner: Box<dyn VersionRunner>,
}
impl Discoverer {
pub fn new(manifest: ServicesManifest) -> Self {
Self {
manifest,
cache: BTreeMap::new(),
process_prober: Box::new(RealProcessProber),
port_prober: Box::new(RealPortProber),
http_prober: Box::new(RealHttpProber),
version_runner: Box::new(RealVersionRunner),
}
}
/// Test-only constructor that takes every prober explicitly so fakes can be
/// injected; the cache starts empty.
#[cfg(test)]
pub fn with_probers(
    manifest: ServicesManifest,
    process_prober: Box<dyn ProcessProber>,
    port_prober: Box<dyn PortProber>,
    http_prober: Box<dyn HttpProber>,
    version_runner: Box<dyn VersionRunner>,
) -> Self {
    let cache = BTreeMap::new();
    Self {
        manifest,
        cache,
        process_prober,
        port_prober,
        http_prober,
        version_runner,
    }
}
/// Returns the status of every service in the manifest, in the manifest's
/// key iteration order, reusing cached results that are still fresh.
pub fn list(&mut self) -> Vec<ServiceStatus> {
    // Copy the names out first: probing needs `&mut self`, which cannot
    // overlap with a borrow of the manifest's key iterator.
    let names: Vec<String> = self.manifest.services.keys().cloned().collect();
    let mut statuses = Vec::with_capacity(names.len());
    for name in &names {
        let decl = self.manifest.services[name].clone();
        statuses.push(self.probe_or_cached(name, &decl));
    }
    statuses
}
/// Returns the (possibly cached) status of the service called `name`, or
/// `None` when the manifest does not declare it.
pub fn status(&mut self, name: &str) -> Option<ServiceStatus> {
    let found = self.manifest.services.get(name).cloned();
    found.map(|decl| self.probe_or_cached(name, &decl))
}
/// Evaluates the health of the service called `name` right now.
///
/// Returns `None` when the manifest does not declare the service. Otherwise:
/// * with a declared health URL and a known service URL, the health endpoint
///   is queried afresh (never served from the cache);
/// * with a declared health URL but no service URL, the result is a failure
///   with detail `"no URL available"`;
/// * without a health URL, the result is `Unknown` while the process is
///   running and a `"not running"` failure otherwise.
///
/// The fresh health state is written into the service's cache entry.
pub fn health(&mut self, name: &str) -> Option<HealthResult> {
    let decl = self.manifest.services.get(name)?.clone();
    let mut status = self.probe_or_cached(name, &decl);

    let fresh_health = match (decl.health_url.is_some(), status.url.as_deref()) {
        (true, Some(base_url)) => self.probe_health_url(base_url, &decl),
        (true, None) => HealthState::Fail {
            detail: "no URL available".into(),
        },
        (false, _) if status.running => HealthState::Unknown,
        (false, _) => HealthState::Fail {
            detail: "not running".into(),
        },
    };

    // Update only the health field of the existing entry and keep its
    // timestamp. Re-stamping the whole entry would restart the TTL for the
    // pid/port/url/version fields too, so a caller polling `health()` more
    // often than `CACHE_TTL` would never see those fields re-probed.
    if let Some((_, cached)) = self.cache.get_mut(name) {
        cached.health = fresh_health.clone();
    } else {
        // `probe_or_cached` leaves an entry behind, so this is only a fallback.
        status.health = fresh_health.clone();
        self.cache
            .insert(name.to_string(), (Instant::now(), status));
    }

    let message = match &fresh_health {
        HealthState::Ok => "healthy".to_string(),
        HealthState::Unknown => "no health endpoint (process-liveness only)".to_string(),
        HealthState::Fail { detail } => format!("unhealthy: {detail}"),
    };
    Some(HealthResult {
        name: name.to_string(),
        state: fresh_health,
        message,
    })
}
/// Returns the cached status for `name` when it is younger than
/// [`CACHE_TTL`]; otherwise probes the service, stores the result with the
/// current time, and returns it.
fn probe_or_cached(&mut self, name: &str, decl: &ServiceDecl) -> ServiceStatus {
    let fresh_hit = self.cache.get(name).and_then(|entry| {
        let (stored_at, status) = entry;
        (stored_at.elapsed() < CACHE_TTL).then(|| status.clone())
    });
    if let Some(hit) = fresh_hit {
        return hit;
    }

    let probed = self.probe(name, decl);
    let entry = (Instant::now(), probed.clone());
    self.cache.insert(name.to_string(), entry);
    probed
}
/// Probes the service from scratch and assembles a [`ServiceStatus`].
///
/// Steps, in order: find the PID, resolve the port, derive the
/// `http://localhost:{port}` URL, evaluate health, run the version command
/// (only while running), compute uptime, and resolve the log path (kept
/// only when the file exists).
fn probe(&mut self, name: &str, decl: &ServiceDecl) -> ServiceStatus {
    let pid = self.probe_process(decl);
    let running = pid.is_some();
    let port = self.probe_port(decl);
    let url = port.map(|p| format!("http://localhost:{p}"));

    let not_running = || HealthState::Fail {
        detail: "not running".into(),
    };
    let health = match (decl.health_url.is_some(), url.as_deref()) {
        (true, Some(base_url)) => self.probe_health_url(base_url, decl),
        // NOTE(review): `health()` reports "no URL available" for this same
        // case (health URL declared, no port); here it is "not running".
        (true, None) => not_running(),
        (false, _) if running => HealthState::Unknown,
        (false, _) => not_running(),
    };

    let version = match pid {
        Some(_) => self.probe_version(decl),
        None => None,
    };
    let uptime_secs = match pid {
        Some(p) => self.probe_uptime(p),
        None => None,
    };
    let log_path = decl
        .log_path
        .as_ref()
        .map(|raw| expand_tilde_owned(raw))
        .filter(|expanded| expanded.exists());

    ServiceStatus {
        name: name.to_string(),
        declared: true,
        running,
        pid,
        port,
        url,
        version,
        health,
        log_path,
        uptime_secs,
    }
}
/// Returns the PID matching the declaration's `process_match` pattern, or
/// `None` when no pattern is declared or nothing matches.
fn probe_process(&self, decl: &ServiceDecl) -> Option<u32> {
    decl.process_match
        .as_deref()
        .and_then(|pattern| self.process_prober.pgrep(pattern))
}
/// Resolves the service's port according to its `port_discovery` mode:
/// from the tilde-expanded port file for `File`, or from `default_port`
/// for `Static`.
fn probe_port(&self, decl: &ServiceDecl) -> Option<u16> {
    match &decl.port_discovery {
        PortDiscovery::Static => decl.default_port,
        PortDiscovery::File => decl.port_file.as_deref().and_then(|raw| {
            let resolved = expand_tilde_owned(raw);
            self.port_prober.read_port_file(&resolved)
        }),
    }
}
/// Queries the service's health endpoint.
///
/// The text after the last `:` in `url` is taken as the port and substituted
/// for `{port}` in the declared `health_url` template. Returns
/// [`HealthState::Unknown`] when no template is declared.
fn probe_health_url(&self, url: &str, decl: &ServiceDecl) -> HealthState {
    let Some(template) = decl.health_url.as_ref() else {
        return HealthState::Unknown;
    };
    // With no ':' present the whole string is used, matching `rsplit(':').next()`.
    let port = url.rsplit_once(':').map_or(url, |(_, tail)| tail);
    let target = template.replace("{port}", port);
    self.http_prober.get_health(&target, HEALTH_PROBE_TIMEOUT)
}
/// Runs the declaration's `version_cmd` and returns its output, or `None`
/// when no command is declared or it yields nothing.
fn probe_version(&self, decl: &ServiceDecl) -> Option<String> {
    decl.version_cmd
        .as_deref()
        .and_then(|cmd| self.version_runner.run(cmd))
}
fn probe_uptime(&self, pid: u32) -> Option<u64> {
use sysinfo::{Pid, ProcessRefreshKind, RefreshKind, System};
let mut sys = System::new_with_specifics(
RefreshKind::nothing().with_processes(ProcessRefreshKind::nothing()),
);
sys.refresh_processes(sysinfo::ProcessesToUpdate::All, true);
let sysinfo_pid = Pid::from_u32(pid);
let proc_ = sys.process(sysinfo_pid)?;
let start = proc_.start_time(); let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.ok()?
.as_secs();
Some(now.saturating_sub(start))
}
}