use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
use tracing::{info, warn};
/// Port used by [`health_port`] when `ZEPTOCLAW_HEALTH_PORT` is unset or unparsable.
const DEFAULT_HEALTH_PORT: u16 = 9_090;

/// Seconds between periodic usage summaries emitted by [`start_periodic_usage_flush`].
const USAGE_FLUSH_INTERVAL_SECS: u64 = 60;
/// Returns the resident set size (RSS) of the current process in bytes.
///
/// - macOS: queries `task_info(MACH_TASK_BASIC_INFO)` for the current task.
/// - Linux: reads the resident page count (second field of `/proc/self/statm`)
///   and multiplies it by the page size reported by `sysconf(_SC_PAGESIZE)`.
/// - Any other platform: always `None`.
///
/// Returns `None` when the platform query fails, the output cannot be parsed,
/// or the byte count would overflow `u64`.
pub fn get_rss_bytes() -> Option<u64> {
    #[cfg(target_os = "macos")]
    {
        type MachPort = u32;
        type KernReturn = i32;
        type TaskFlavor = u32;
        type NaturalT = u32;

        // Intended to match `struct mach_task_basic_info` from <mach/task_info.h>.
        #[repr(C)]
        struct MachTaskBasicInfo {
            virtual_size: u64,
            resident_size: u64,
            resident_size_max: u64,
            user_time_sec: i32,
            user_time_usec: i32,
            system_time_sec: i32,
            system_time_usec: i32,
            policy: i32,
            suspend_count: i32,
        }

        const MACH_TASK_BASIC_INFO: TaskFlavor = 20;
        const KERN_SUCCESS: KernReturn = 0;

        extern "C" {
            static mach_task_self_: MachPort;
            fn task_info(
                target_task: MachPort,
                flavor: TaskFlavor,
                task_info_out: *mut MachTaskBasicInfo,
                task_info_out_cnt: *mut NaturalT,
            ) -> KernReturn;
        }

        let mut info = MachTaskBasicInfo {
            virtual_size: 0,
            resident_size: 0,
            resident_size_max: 0,
            user_time_sec: 0,
            user_time_usec: 0,
            system_time_sec: 0,
            system_time_usec: 0,
            policy: 0,
            suspend_count: 0,
        };
        // In/out argument: capacity of `info` expressed in `natural_t` units.
        let mut count = (std::mem::size_of::<MachTaskBasicInfo>()
            / std::mem::size_of::<NaturalT>()) as NaturalT;
        // SAFETY: `info` and `count` are live, writable locals, and `count` states the
        // exact capacity of `info`, so the call cannot write past the buffer.
        let ret =
            unsafe { task_info(mach_task_self_, MACH_TASK_BASIC_INFO, &mut info, &mut count) };
        if ret == KERN_SUCCESS {
            Some(info.resident_size)
        } else {
            None
        }
    }
    #[cfg(target_os = "linux")]
    {
        use std::os::raw::{c_int, c_long};

        // Use the real C types: `long` is only 32 bits on 32-bit Linux targets, so
        // declaring the return as `i64` would read a garbage page size there.
        extern "C" {
            fn sysconf(name: c_int) -> c_long;
        }
        // Value of glibc's `_SC_PAGESIZE`.
        const SC_PAGESIZE: c_int = 30;

        let content = std::fs::read_to_string("/proc/self/statm").ok()?;
        let resident_pages: u64 = content.split_whitespace().nth(1)?.parse().ok()?;
        // SAFETY: `sysconf` takes a plain integer and only reads system configuration.
        let page_size = unsafe { sysconf(SC_PAGESIZE) };
        // Negative means error, zero is nonsensical; both map to `None`.
        let page_size = u64::try_from(page_size).ok().filter(|&p| p > 0)?;
        resident_pages.checked_mul(page_size)
    }
    #[cfg(not(any(target_os = "macos", target_os = "linux")))]
    {
        None
    }
}
/// Coarse health state of a single registered component.
///
/// Only `Down` makes the registry report "not ready"; `Degraded` is informational.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HealthStatus {
    /// Fully operational.
    Ok,
    /// Working, but with reduced functionality.
    Degraded,
    /// Unavailable.
    Down,
}

impl HealthStatus {
    /// Lowercase name used for this status in the JSON health payload.
    fn as_str(&self) -> &'static str {
        let label = match self {
            Self::Down => "down",
            Self::Degraded => "degraded",
            Self::Ok => "ok",
        };
        label
    }
}
/// Health record for one component, keyed by `name` in the registry.
#[derive(Debug, Clone)]
pub struct HealthCheck {
    /// Unique component name; also used as the registry key.
    pub name: String,
    /// Current health state.
    pub status: HealthStatus,
    /// Optional human-readable detail for the current status.
    pub message: Option<String>,
    /// Number of times the registry's `bump_restart` was called for this component.
    pub restart_count: u64,
    /// Most recent error recorded via the registry's `set_error`.
    pub last_error: Option<String>,
}

impl Default for HealthCheck {
    /// An unnamed, healthy check with no message, no error and zero restarts.
    fn default() -> Self {
        HealthCheck {
            status: HealthStatus::Ok,
            name: String::default(),
            restart_count: u64::default(),
            message: Option::default(),
            last_error: Option::default(),
        }
    }
}
#[derive(Clone)]
pub struct HealthRegistry {
checks: Arc<RwLock<HashMap<String, HealthCheck>>>,
start_time: Instant,
metrics: Arc<RwLock<Option<Arc<UsageMetrics>>>>,
}
impl HealthRegistry {
pub fn new() -> Self {
Self {
checks: Arc::new(RwLock::new(HashMap::new())),
start_time: Instant::now(),
metrics: Arc::new(RwLock::new(None)),
}
}
pub fn set_metrics(&self, metrics: Arc<UsageMetrics>) {
*self.metrics.write().unwrap() = Some(metrics);
}
pub fn register(&self, check: HealthCheck) {
self.checks
.write()
.unwrap()
.insert(check.name.clone(), check);
}
pub fn update(&self, name: &str, status: HealthStatus, message: Option<String>) {
let mut checks = self.checks.write().unwrap();
if let Some(check) = checks.get_mut(name) {
check.status = status;
check.message = message;
}
}
pub fn is_ready(&self) -> bool {
let checks = self.checks.read().unwrap();
checks.values().all(|c| c.status != HealthStatus::Down)
}
pub fn bump_restart(&self, name: &str) {
let mut checks = self.checks.write().unwrap();
if let Some(check) = checks.get_mut(name) {
check.restart_count += 1;
}
}
pub fn set_error(&self, name: &str, error: &str) {
let mut checks = self.checks.write().unwrap();
if let Some(check) = checks.get_mut(name) {
check.status = HealthStatus::Down;
check.last_error = Some(error.to_string());
}
}
pub fn all_checks(&self) -> Vec<HealthCheck> {
self.checks.read().unwrap().values().cloned().collect()
}
pub fn uptime(&self) -> Duration {
self.start_time.elapsed()
}
pub(crate) fn render_checks_json(&self) -> String {
let checks = self.checks.read().unwrap();
if checks.is_empty() {
return "{}".to_string();
}
let parts: Vec<String> = checks
.values()
.map(|c| {
let mut fields = format!("\"status\":\"{}\"", c.status.as_str());
if let Some(ref msg) = c.message {
fields.push_str(&format!(",\"message\":\"{}\"", msg.replace('"', "\\\"")));
}
if c.restart_count > 0 {
fields.push_str(&format!(",\"restart_count\":{}", c.restart_count));
}
if let Some(ref err) = c.last_error {
fields.push_str(&format!(",\"last_error\":\"{}\"", err.replace('"', "\\\"")));
}
format!("\"{}\":{{{}}}", c.name, fields)
})
.collect();
format!("{{{}}}", parts.join(","))
}
pub fn render_health_json(&self) -> String {
let status = if self.is_ready() { "ok" } else { "degraded" };
let version = env!("CARGO_PKG_VERSION");
let uptime = self.uptime().as_secs();
let checks_json = self.render_checks_json();
let mut json = format!(
"{{\"status\":\"{}\",\"version\":\"{}\",\"uptime_secs\":{}",
status, version, uptime
);
if let Some(rss) = get_rss_bytes() {
let rss_mb = rss as f64 / (1024.0 * 1024.0);
json.push_str(&format!(
",\"memory\":{{\"rss_bytes\":{},\"rss_mb\":{:.1}}}",
rss, rss_mb
));
}
if let Some(ref m) = *self.metrics.read().unwrap() {
let requests = m.requests.load(Ordering::Relaxed);
let tool_calls = m.tool_calls.load(Ordering::Relaxed);
let input_tokens = m.input_tokens.load(Ordering::Relaxed);
let output_tokens = m.output_tokens.load(Ordering::Relaxed);
let errors = m.errors.load(Ordering::Relaxed);
json.push_str(&format!(
",\"usage\":{{\"requests\":{},\"tool_calls\":{},\"input_tokens\":{},\"output_tokens\":{},\"errors\":{}}}",
requests, tool_calls, input_tokens, output_tokens, errors
));
}
json.push_str(&format!(",\"checks\":{}}}", checks_json));
json
}
}
impl Default for HealthRegistry {
fn default() -> Self {
Self::new()
}
}
/// Process-wide usage counters plus a readiness flag, safe to share via `Arc`.
///
/// Counters use relaxed atomics (they are independent tallies); the `ready`
/// flag is stored with `SeqCst`. A fresh instance has every counter at zero
/// and `ready == false`.
#[derive(Debug, Default)]
pub struct UsageMetrics {
    pub requests: AtomicU64,
    pub tool_calls: AtomicU64,
    pub input_tokens: AtomicU64,
    pub output_tokens: AtomicU64,
    pub errors: AtomicU64,
    pub ready: AtomicBool,
}

impl UsageMetrics {
    /// Zeroed counters, not ready.
    pub fn new() -> Self {
        Self::default()
    }

    /// Counts one handled request.
    pub fn record_request(&self) {
        let _ = self.requests.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds `count` tool invocations.
    pub fn record_tool_calls(&self, count: u64) {
        let _ = self.tool_calls.fetch_add(count, Ordering::Relaxed);
    }

    /// Adds consumed input and produced output tokens.
    pub fn record_tokens(&self, input: u64, output: u64) {
        let _ = self.input_tokens.fetch_add(input, Ordering::Relaxed);
        let _ = self.output_tokens.fetch_add(output, Ordering::Relaxed);
    }

    /// Counts one error.
    pub fn record_error(&self) {
        let _ = self.errors.fetch_add(1, Ordering::Relaxed);
    }

    /// Sets the readiness flag reported by the legacy health server.
    pub fn set_ready(&self, ready: bool) {
        self.ready.store(ready, Ordering::SeqCst);
    }

    /// Logs a `usage_summary` event with the current counter values.
    ///
    /// `reason` tags why the summary was emitted (e.g. "periodic" or "shutdown").
    pub fn emit_usage(&self, reason: &str) {
        let snapshot = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        info!(
            event = "usage_summary",
            reason = reason,
            requests = snapshot(&self.requests),
            tool_calls = snapshot(&self.tool_calls),
            input_tokens = snapshot(&self.input_tokens),
            output_tokens = snapshot(&self.output_tokens),
            errors = snapshot(&self.errors),
            "Usage metrics"
        );
    }
}
/// Binds `host:port` and serves health endpoints from `registry` in a background task.
///
/// Routes (query strings are ignored):
/// - `GET /health`, `GET /healthz` → `200` with [`HealthRegistry::render_health_json`].
/// - `GET /ready`, `GET /readyz` → `200` when ready, otherwise `503`.
/// - anything else → `404`.
///
/// Each connection receives exactly one JSON response and is then closed.
/// Connections that error or send nothing usable within 5s are dropped without a reply.
///
/// # Errors
/// Returns an error if the listener cannot bind to `host:port`.
pub async fn start_health_server(
    host: &str,
    port: u16,
    registry: HealthRegistry,
) -> Result<tokio::task::JoinHandle<()>, Box<dyn std::error::Error + Send + Sync>> {
    let addr = format!("{}:{}", host, port);
    let listener = TcpListener::bind(&addr).await?;
    info!(addr = %addr, "Health server listening on http://{}", addr);
    let handle = tokio::spawn(async move {
        loop {
            match listener.accept().await {
                Ok((mut stream, _peer)) => {
                    let registry = registry.clone();
                    tokio::spawn(async move {
                        let request = match read_health_request_head(&mut stream).await {
                            Some(request) => request,
                            None => return,
                        };
                        let (status_line, body) = route_health_request(&registry, &request);
                        let response = format!(
                            "HTTP/1.1 {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
                            status_line,
                            body.len(),
                            body
                        );
                        let _ = stream.write_all(response.as_bytes()).await;
                        let _ = stream.shutdown().await;
                    });
                }
                Err(e) => {
                    warn!(error = %e, "Health server accept error");
                    // Persistent accept errors (e.g. out of file descriptors) return
                    // immediately; back off so the loop doesn't spin and flood logs.
                    tokio::time::sleep(Duration::from_millis(100)).await;
                }
            }
        }
    });
    Ok(handle)
}

/// Reads request bytes until the request line is complete (`\n` seen), the
/// 512-byte buffer is full, or the peer closes — all within a 5s total budget.
///
/// A single `read` may return only part of the request line, so this loops.
/// Returns `None` on I/O error or timeout.
async fn read_health_request_head(stream: &mut tokio::net::TcpStream) -> Option<String> {
    let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
    let mut buf = [0u8; 512];
    let mut filled = 0;
    while filled < buf.len() {
        let read = tokio::io::AsyncReadExt::read(&mut *stream, &mut buf[filled..]);
        let outcome = tokio::time::timeout_at(deadline, read).await;
        match outcome {
            Ok(Ok(0)) => break,
            Ok(Ok(n)) => {
                filled += n;
                if buf[..filled].contains(&b'\n') {
                    break;
                }
            }
            Ok(Err(_)) | Err(_) => return None,
        }
    }
    Some(String::from_utf8_lossy(&buf[..filled]).into_owned())
}

/// Maps a raw HTTP request to a status line and JSON body using `registry`.
fn route_health_request(registry: &HealthRegistry, request: &str) -> (&'static str, String) {
    let request_line = request.lines().next().unwrap_or_default();
    let mut parts = request_line.split_whitespace();
    let method = parts.next().unwrap_or_default();
    let raw_path = parts.next().unwrap_or_default();
    let path = raw_path.split('?').next().unwrap_or(raw_path);
    match (method, path) {
        ("GET", "/health") | ("GET", "/healthz") => ("200 OK", registry.render_health_json()),
        ("GET", "/ready") | ("GET", "/readyz") => {
            if registry.is_ready() {
                ("200 OK", "{\"status\":\"ready\"}".to_string())
            } else {
                (
                    "503 Service Unavailable",
                    "{\"status\":\"not_ready\"}".to_string(),
                )
            }
        }
        _ => ("404 Not Found", "{\"error\":\"not_found\"}".to_string()),
    }
}
/// Starts the metrics-only health server (predates [`HealthRegistry`]).
///
/// Binds `$ZEPTOCLAW_HEALTH_HOST:port` (host defaults to `127.0.0.1`) and serves:
/// - `GET /healthz`, `GET /health` → `200` with `status` (`"ok"` when
///   `metrics.ready` is set, else `"degraded"`), `version`, `memory` (when RSS
///   is available) and `usage` counters.
/// - `GET /readyz`, `GET /ready` → `200` when `metrics.ready`, else `503`.
/// - anything else → `404`.
///
/// # Errors
/// Returns the I/O error if the listener cannot bind.
pub async fn start_health_server_legacy(
    port: u16,
    metrics: Arc<UsageMetrics>,
) -> std::io::Result<tokio::task::JoinHandle<()>> {
    let host = std::env::var("ZEPTOCLAW_HEALTH_HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
    let listener = TcpListener::bind(format!("{}:{}", host, port)).await?;
    info!(port = port, "Health server listening");
    let handle = tokio::spawn(async move {
        loop {
            match listener.accept().await {
                Ok((mut stream, _peer)) => {
                    let metrics = Arc::clone(&metrics);
                    tokio::spawn(async move {
                        // A single read may return only part of the request line, so keep
                        // reading until '\n', a full buffer, or EOF, within 5s in total.
                        // Errors and timeouts drop the connection without a reply.
                        let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
                        let mut buf = [0u8; 512];
                        let mut filled = 0;
                        while filled < buf.len() {
                            let read =
                                tokio::io::AsyncReadExt::read(&mut stream, &mut buf[filled..]);
                            let outcome = tokio::time::timeout_at(deadline, read).await;
                            match outcome {
                                Ok(Ok(0)) => break,
                                Ok(Ok(n)) => {
                                    filled += n;
                                    if buf[..filled].contains(&b'\n') {
                                        break;
                                    }
                                }
                                Ok(Err(_)) | Err(_) => return,
                            }
                        }
                        let request = String::from_utf8_lossy(&buf[..filled]);
                        let request_line = request.lines().next().unwrap_or_default();
                        let mut parts = request_line.split_whitespace();
                        let method = parts.next().unwrap_or_default();
                        let raw_path = parts.next().unwrap_or_default();
                        let path = raw_path.split('?').next().unwrap_or(raw_path);
                        let (status, body): (&str, String) = match (method, path) {
                            ("GET", "/healthz") | ("GET", "/health") => {
                                let ready = metrics.ready.load(Ordering::SeqCst);
                                let mut fields: Vec<String> = Vec::with_capacity(4);
                                fields.push(format!(
                                    "\"status\":\"{}\"",
                                    if ready { "ok" } else { "degraded" }
                                ));
                                fields
                                    .push(format!("\"version\":\"{}\"", env!("CARGO_PKG_VERSION")));
                                if let Some(rss) = get_rss_bytes() {
                                    let mb = rss as f64 / 1_048_576.0;
                                    fields.push(format!(
                                        "\"memory\":{{\"rss_bytes\":{},\"rss_mb\":{:.1}}}",
                                        rss, mb
                                    ));
                                }
                                fields.push(format!(
                                    "\"usage\":{{\"requests\":{},\"tool_calls\":{},\"input_tokens\":{},\"output_tokens\":{},\"errors\":{}}}",
                                    metrics.requests.load(Ordering::Relaxed),
                                    metrics.tool_calls.load(Ordering::Relaxed),
                                    metrics.input_tokens.load(Ordering::Relaxed),
                                    metrics.output_tokens.load(Ordering::Relaxed),
                                    metrics.errors.load(Ordering::Relaxed),
                                ));
                                ("200 OK", format!("{{{}}}", fields.join(",")))
                            }
                            ("GET", "/readyz") | ("GET", "/ready") => {
                                if metrics.ready.load(Ordering::SeqCst) {
                                    ("200 OK", "{\"status\":\"ready\"}".to_string())
                                } else {
                                    (
                                        "503 Service Unavailable",
                                        "{\"status\":\"not_ready\"}".to_string(),
                                    )
                                }
                            }
                            _ => ("404 Not Found", "{\"error\":\"not_found\"}".to_string()),
                        };
                        let response = format!(
                            "HTTP/1.1 {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
                            status,
                            body.len(),
                            body
                        );
                        let _ = stream.write_all(response.as_bytes()).await;
                        let _ = stream.shutdown().await;
                    });
                }
                Err(e) => {
                    warn!(error = %e, "Health server accept error");
                    // Persistent accept errors return immediately; back off so the
                    // loop doesn't spin and flood logs.
                    tokio::time::sleep(Duration::from_millis(100)).await;
                }
            }
        }
    });
    Ok(handle)
}
/// Spawns a task that logs a usage summary every `USAGE_FLUSH_INTERVAL_SECS`.
///
/// When `shutdown_rx` observes `true`, or its sender is dropped, a final
/// summary tagged `"shutdown"` is emitted and the task exits.
pub fn start_periodic_usage_flush(
    metrics: Arc<UsageMetrics>,
    mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_secs(USAGE_FLUSH_INTERVAL_SECS));
        // The first tick completes immediately; consume it so the first periodic
        // summary comes one full interval after start.
        interval.tick().await;
        loop {
            tokio::select! {
                _ = interval.tick() => {
                    metrics.emit_usage("periodic");
                }
                changed = shutdown_rx.changed() => {
                    // `Err` means the sender was dropped. No shutdown signal can ever
                    // arrive, and `changed()` would keep resolving immediately, spinning
                    // this loop at full CPU. Treat that as shutdown.
                    if changed.is_err() || *shutdown_rx.borrow() {
                        metrics.emit_usage("shutdown");
                        break;
                    }
                }
            }
        }
    })
}
pub fn health_port() -> u16 {
std::env::var("ZEPTOCLAW_HEALTH_PORT")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(DEFAULT_HEALTH_PORT)
}
// Tests for the health registry, usage metrics, the RSS probe, and both HTTP
// health servers (registry-backed and legacy metrics-only).
#[cfg(test)]
mod tests {
use super::*;
// ---- HealthRegistry readiness semantics ----
#[test]
fn test_registry_ready_when_empty() {
let reg = HealthRegistry::new();
assert!(reg.is_ready());
}
#[test]
fn test_registry_not_ready_when_check_down() {
let reg = HealthRegistry::new();
reg.register(HealthCheck {
name: "telegram".into(),
status: HealthStatus::Down,
message: None,
..Default::default()
});
assert!(!reg.is_ready());
}
#[test]
fn test_registry_ready_when_all_ok() {
let reg = HealthRegistry::new();
reg.register(HealthCheck {
name: "telegram".into(),
status: HealthStatus::Ok,
message: None,
..Default::default()
});
reg.register(HealthCheck {
name: "provider".into(),
status: HealthStatus::Ok,
message: None,
..Default::default()
});
assert!(reg.is_ready());
}
// Degraded checks must not fail readiness; only Down does.
#[test]
fn test_registry_ready_with_degraded() {
let reg = HealthRegistry::new();
reg.register(HealthCheck {
name: "web".into(),
status: HealthStatus::Degraded,
message: None,
..Default::default()
});
assert!(reg.is_ready()); }
#[test]
fn test_update_check_status() {
let reg = HealthRegistry::new();
reg.register(HealthCheck {
name: "db".into(),
status: HealthStatus::Ok,
message: None,
..Default::default()
});
reg.update("db", HealthStatus::Down, Some("connection refused".into()));
assert!(!reg.is_ready());
}
// Updating an unregistered name must not implicitly create a check.
#[test]
fn test_update_nonexistent_noop() {
let reg = HealthRegistry::new();
reg.update("ghost", HealthStatus::Down, None);
assert!(reg.is_ready());
}
#[test]
fn test_uptime_increases() {
let reg = HealthRegistry::new();
std::thread::sleep(Duration::from_millis(10));
assert!(reg.uptime().as_millis() >= 10);
}
// ---- JSON rendering of checks ----
#[test]
fn test_render_checks_json_empty() {
let reg = HealthRegistry::new();
assert_eq!(reg.render_checks_json(), "{}");
}
#[test]
fn test_render_checks_json_ok() {
let reg = HealthRegistry::new();
reg.register(HealthCheck {
name: "db".into(),
status: HealthStatus::Ok,
message: None,
..Default::default()
});
let json = reg.render_checks_json();
assert!(json.contains("\"db\""));
assert!(json.contains("\"status\":\"ok\""));
}
#[test]
fn test_render_checks_json_with_message() {
let reg = HealthRegistry::new();
reg.register(HealthCheck {
name: "db".into(),
status: HealthStatus::Down,
message: Some("timeout".into()),
..Default::default()
});
let json = reg.render_checks_json();
assert!(json.contains("\"message\":\"timeout\""));
}
// Embedded double quotes in a message must come out backslash-escaped.
#[test]
fn test_render_checks_json_message_escapes_quotes() {
let reg = HealthRegistry::new();
reg.register(HealthCheck {
name: "x".into(),
status: HealthStatus::Ok,
message: Some("say \"hi\"".into()),
..Default::default()
});
let json = reg.render_checks_json();
assert!(json.contains("\\\"hi\\\""));
}
#[test]
fn test_health_status_as_str() {
assert_eq!(HealthStatus::Ok.as_str(), "ok");
assert_eq!(HealthStatus::Degraded.as_str(), "degraded");
assert_eq!(HealthStatus::Down.as_str(), "down");
}
// ---- UsageMetrics counters and readiness flag ----
#[test]
fn test_usage_metrics_creation() {
let metrics = UsageMetrics::new();
assert_eq!(metrics.requests.load(Ordering::Relaxed), 0);
assert_eq!(metrics.tool_calls.load(Ordering::Relaxed), 0);
assert!(!metrics.ready.load(Ordering::SeqCst));
}
#[test]
fn test_usage_metrics_recording() {
let metrics = UsageMetrics::new();
metrics.record_request();
metrics.record_request();
metrics.record_tool_calls(3);
metrics.record_tokens(100, 50);
metrics.record_error();
assert_eq!(metrics.requests.load(Ordering::Relaxed), 2);
assert_eq!(metrics.tool_calls.load(Ordering::Relaxed), 3);
assert_eq!(metrics.input_tokens.load(Ordering::Relaxed), 100);
assert_eq!(metrics.output_tokens.load(Ordering::Relaxed), 50);
assert_eq!(metrics.errors.load(Ordering::Relaxed), 1);
}
#[test]
fn test_ready_flag() {
let metrics = UsageMetrics::new();
assert!(!metrics.ready.load(Ordering::SeqCst));
metrics.set_ready(true);
assert!(metrics.ready.load(Ordering::SeqCst));
metrics.set_ready(false);
assert!(!metrics.ready.load(Ordering::SeqCst));
}
// NOTE(review): mutates the process environment. libtest runs tests in parallel
// by default, so any concurrent reader of ZEPTOCLAW_HEALTH_PORT could observe this.
#[test]
fn test_health_port_default() {
std::env::remove_var("ZEPTOCLAW_HEALTH_PORT");
assert_eq!(health_port(), DEFAULT_HEALTH_PORT);
}
// Registering a name twice replaces the earlier check.
#[test]
fn test_registry_register_replaces_existing() {
let reg = HealthRegistry::new();
reg.register(HealthCheck {
name: "svc".into(),
status: HealthStatus::Ok,
message: None,
..Default::default()
});
reg.register(HealthCheck {
name: "svc".into(),
status: HealthStatus::Down,
message: Some("crashed".into()),
..Default::default()
});
assert!(!reg.is_ready());
}
// ---- End-to-end HTTP tests ----
// Each test reserves a port by binding 127.0.0.1:0 and dropping the listener
// before the server rebinds it. Another process could grab the port in that
// gap, so these tests can be flaky on busy machines.
#[tokio::test]
async fn test_health_server_health_endpoint() {
let registry = HealthRegistry::new();
registry.register(HealthCheck {
name: "provider".into(),
status: HealthStatus::Ok,
message: None,
..Default::default()
});
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
drop(listener);
let handle = start_health_server("127.0.0.1", port, registry)
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
let mut stream = tokio::net::TcpStream::connect(format!("127.0.0.1:{}", port))
.await
.unwrap();
tokio::io::AsyncWriteExt::write_all(
&mut stream,
b"GET /health HTTP/1.1\r\nHost: localhost\r\n\r\n",
)
.await
.unwrap();
let mut buf = vec![0u8; 1024];
let n = tokio::io::AsyncReadExt::read(&mut stream, &mut buf)
.await
.unwrap();
let response = String::from_utf8_lossy(&buf[..n]);
assert!(response.contains("200 OK"), "response: {}", response);
assert!(response.contains("\"status\":\"ok\""));
assert!(response.contains("uptime_secs"));
assert!(response.contains("\"provider\""));
handle.abort();
}
#[tokio::test]
async fn test_health_server_ready_endpoint_all_ok() {
let registry = HealthRegistry::new();
registry.register(HealthCheck {
name: "svc".into(),
status: HealthStatus::Ok,
message: None,
..Default::default()
});
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
drop(listener);
let handle = start_health_server("127.0.0.1", port, registry)
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
let mut stream = tokio::net::TcpStream::connect(format!("127.0.0.1:{}", port))
.await
.unwrap();
tokio::io::AsyncWriteExt::write_all(
&mut stream,
b"GET /ready HTTP/1.1\r\nHost: localhost\r\n\r\n",
)
.await
.unwrap();
let mut buf = vec![0u8; 1024];
let n = tokio::io::AsyncReadExt::read(&mut stream, &mut buf)
.await
.unwrap();
let response = String::from_utf8_lossy(&buf[..n]);
assert!(response.contains("200 OK"));
handle.abort();
}
#[tokio::test]
async fn test_health_server_ready_endpoint_down() {
let registry = HealthRegistry::new();
registry.register(HealthCheck {
name: "svc".into(),
status: HealthStatus::Down,
message: Some("unreachable".into()),
..Default::default()
});
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
drop(listener);
let handle = start_health_server("127.0.0.1", port, registry)
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
let mut stream = tokio::net::TcpStream::connect(format!("127.0.0.1:{}", port))
.await
.unwrap();
tokio::io::AsyncWriteExt::write_all(
&mut stream,
b"GET /ready HTTP/1.1\r\nHost: localhost\r\n\r\n",
)
.await
.unwrap();
let mut buf = vec![0u8; 1024];
let n = tokio::io::AsyncReadExt::read(&mut stream, &mut buf)
.await
.unwrap();
let response = String::from_utf8_lossy(&buf[..n]);
assert!(response.contains("503"));
handle.abort();
}
#[tokio::test]
async fn test_health_server_404_on_unknown_path() {
let registry = HealthRegistry::new();
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
drop(listener);
let handle = start_health_server("127.0.0.1", port, registry)
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
let mut stream = tokio::net::TcpStream::connect(format!("127.0.0.1:{}", port))
.await
.unwrap();
tokio::io::AsyncWriteExt::write_all(
&mut stream,
b"GET /unknown HTTP/1.1\r\nHost: localhost\r\n\r\n",
)
.await
.unwrap();
let mut buf = vec![0u8; 1024];
let n = tokio::io::AsyncReadExt::read(&mut stream, &mut buf)
.await
.unwrap();
let response = String::from_utf8_lossy(&buf[..n]);
assert!(response.contains("404"));
handle.abort();
}
// The `/healthz` alias must keep working on the registry-backed server.
#[tokio::test]
async fn test_health_server_backward_compat_healthz() {
let registry = HealthRegistry::new();
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
drop(listener);
let handle = start_health_server("127.0.0.1", port, registry)
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
let mut stream = tokio::net::TcpStream::connect(format!("127.0.0.1:{}", port))
.await
.unwrap();
tokio::io::AsyncWriteExt::write_all(
&mut stream,
b"GET /healthz HTTP/1.1\r\nHost: localhost\r\n\r\n",
)
.await
.unwrap();
let mut buf = vec![0u8; 1024];
let n = tokio::io::AsyncReadExt::read(&mut stream, &mut buf)
.await
.unwrap();
let response = String::from_utf8_lossy(&buf[..n]);
assert!(response.contains("200 OK"));
handle.abort();
}
// The legacy server binds ZEPTOCLAW_HEALTH_HOST (default 127.0.0.1), which is
// the address this test connects to.
#[tokio::test]
async fn test_legacy_health_server() {
let metrics = Arc::new(UsageMetrics::new());
metrics.set_ready(true);
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
drop(listener);
let handle = start_health_server_legacy(port, Arc::clone(&metrics))
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
let mut stream = tokio::net::TcpStream::connect(format!("127.0.0.1:{}", port))
.await
.unwrap();
tokio::io::AsyncWriteExt::write_all(
&mut stream,
b"GET /healthz HTTP/1.1\r\nHost: localhost\r\n\r\n",
)
.await
.unwrap();
let mut buf = vec![0u8; 1024];
let n = tokio::io::AsyncReadExt::read(&mut stream, &mut buf)
.await
.unwrap();
let response = String::from_utf8_lossy(&buf[..n]);
assert!(response.contains("200 OK"));
assert!(response.contains("\"status\":\"ok\""));
handle.abort();
}
#[tokio::test]
async fn test_health_endpoint_includes_version_and_memory() {
let registry = HealthRegistry::new();
registry.register(HealthCheck {
name: "svc".into(),
status: HealthStatus::Ok,
..Default::default()
});
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
drop(listener);
let handle = start_health_server("127.0.0.1", port, registry)
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
let mut stream = tokio::net::TcpStream::connect(format!("127.0.0.1:{}", port))
.await
.unwrap();
tokio::io::AsyncWriteExt::write_all(
&mut stream,
b"GET /health HTTP/1.1\r\nHost: localhost\r\n\r\n",
)
.await
.unwrap();
let mut buf = vec![0u8; 2048];
let n = tokio::io::AsyncReadExt::read(&mut stream, &mut buf)
.await
.unwrap();
let response = String::from_utf8_lossy(&buf[..n]);
assert!(
response.contains("\"version\":\""),
"missing version: {}",
response
);
assert!(
response.contains("\"uptime_secs\":"),
"missing uptime: {}",
response
);
assert!(
response.contains("\"checks\":"),
"missing checks: {}",
response
);
handle.abort();
}
// ---- Restart counters, recorded errors and snapshots ----
#[test]
fn test_bump_restart_increments_count() {
let reg = HealthRegistry::new();
reg.register(HealthCheck {
name: "gw".into(),
status: HealthStatus::Ok,
..Default::default()
});
reg.bump_restart("gw");
reg.bump_restart("gw");
let checks = reg.all_checks();
let check = checks.iter().find(|c| c.name == "gw").unwrap();
assert_eq!(check.restart_count, 2);
}
#[test]
fn test_bump_restart_noop_on_unknown() {
let reg = HealthRegistry::new();
reg.bump_restart("nonexistent");
assert!(reg.is_ready());
}
#[test]
fn test_set_error_marks_down_and_records_error() {
let reg = HealthRegistry::new();
reg.register(HealthCheck {
name: "db".into(),
status: HealthStatus::Ok,
..Default::default()
});
reg.set_error("db", "connection timeout");
let checks = reg.all_checks();
let check = checks.iter().find(|c| c.name == "db").unwrap();
assert_eq!(check.status, HealthStatus::Down);
assert_eq!(check.last_error.as_deref(), Some("connection timeout"));
assert!(!reg.is_ready());
}
#[test]
fn test_set_error_noop_on_unknown() {
let reg = HealthRegistry::new();
reg.set_error("ghost", "some error");
assert!(reg.is_ready());
}
// all_checks order is not asserted; membership is checked instead.
#[test]
fn test_all_checks_returns_snapshot() {
let reg = HealthRegistry::new();
reg.register(HealthCheck {
name: "a".into(),
status: HealthStatus::Ok,
..Default::default()
});
reg.register(HealthCheck {
name: "b".into(),
status: HealthStatus::Degraded,
..Default::default()
});
let checks = reg.all_checks();
assert_eq!(checks.len(), 2);
let names: Vec<&str> = checks.iter().map(|c| c.name.as_str()).collect();
assert!(names.contains(&"a"));
assert!(names.contains(&"b"));
}
#[test]
fn test_all_checks_empty_registry() {
let reg = HealthRegistry::new();
assert!(reg.all_checks().is_empty());
}
// ---- Full /health payload rendering ----
#[test]
fn test_registry_with_metrics() {
let reg = HealthRegistry::new();
let metrics = Arc::new(UsageMetrics::new());
metrics.record_request();
metrics.record_request();
metrics.record_tool_calls(5);
metrics.record_tokens(1000, 500);
metrics.record_error();
reg.set_metrics(Arc::clone(&metrics));
let json = reg.render_health_json();
assert!(json.contains("\"requests\":2"));
assert!(json.contains("\"tool_calls\":5"));
assert!(json.contains("\"input_tokens\":1000"));
assert!(json.contains("\"output_tokens\":500"));
assert!(json.contains("\"errors\":1"));
}
#[test]
fn test_registry_without_metrics_omits_usage() {
let reg = HealthRegistry::new();
let json = reg.render_health_json();
assert!(!json.contains("\"usage\""));
}
#[test]
fn test_render_health_json_has_version() {
let reg = HealthRegistry::new();
let json = reg.render_health_json();
assert!(json.contains("\"version\":\""));
assert!(json.contains("\"uptime_secs\":"));
assert!(json.contains("\"status\":\"ok\""));
assert!(json.contains("\"checks\":{}"));
}
// A Down check reports the top-level status as "degraded" (not "down").
#[test]
fn test_render_health_json_status_degraded_when_down() {
let reg = HealthRegistry::new();
reg.register(HealthCheck {
name: "db".into(),
status: HealthStatus::Down,
..Default::default()
});
let json = reg.render_health_json();
assert!(json.contains("\"status\":\"degraded\""));
}
#[test]
fn test_render_checks_json_with_restart_and_error() {
let reg = HealthRegistry::new();
reg.register(HealthCheck {
name: "gw".into(),
status: HealthStatus::Down,
message: None,
restart_count: 3,
last_error: Some("timeout".into()),
});
let json = reg.render_checks_json();
assert!(json.contains("\"restart_count\":3"));
assert!(json.contains("\"last_error\":\"timeout\""));
}
// Only compiled where get_rss_bytes has a real implementation.
#[test]
#[cfg(any(target_os = "macos", target_os = "linux"))]
fn test_get_rss_bytes_returns_some() {
let rss = get_rss_bytes();
assert!(
rss.is_some(),
"get_rss_bytes() returned None on a supported platform"
);
assert!(
rss.unwrap() > 0,
"get_rss_bytes() returned Some(0), expected a positive RSS value"
);
}
}