use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
use tracing::{info, warn};
/// Port used by [`health_port`] when `ZEPTOCLAW_HEALTH_PORT` is unset or not a valid `u16`.
const DEFAULT_HEALTH_PORT: u16 = 9090;
/// Seconds between the periodic `usage_summary` events logged by
/// [`start_periodic_usage_flush`].
const USAGE_FLUSH_INTERVAL_SECS: u64 = 60;
/// Coarse state reported by one health check.
///
/// Only [`HealthStatus::Down`] makes `HealthRegistry::is_ready` report not-ready;
/// [`HealthStatus::Degraded`] still counts as ready.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HealthStatus {
    /// Serialized as `"ok"`.
    Ok,
    /// Serialized as `"degraded"`.
    Degraded,
    /// Serialized as `"down"`.
    Down,
}

impl HealthStatus {
    /// Lowercase name of the status as it appears in the health JSON payload.
    fn as_str(&self) -> &'static str {
        match self {
            Self::Down => "down",
            Self::Degraded => "degraded",
            Self::Ok => "ok",
        }
    }
}
/// State of a single named component, as stored in a `HealthRegistry`.
#[derive(Debug, Clone)]
pub struct HealthCheck {
    /// Name under which the check is registered and reported.
    pub name: String,
    /// Current status of the component.
    pub status: HealthStatus,
    /// Optional detail text; included in the `/health` JSON when present.
    pub message: Option<String>,
    /// Number of restarts recorded via `HealthRegistry::bump_restart`.
    pub restart_count: u64,
    /// Most recent error recorded via `HealthRegistry::set_error`.
    pub last_error: Option<String>,
}

impl Default for HealthCheck {
    /// An unnamed check in the [`HealthStatus::Ok`] state with no message,
    /// zero restarts and no recorded error.
    fn default() -> Self {
        let status = HealthStatus::Ok;
        Self {
            status,
            name: String::default(),
            message: Option::default(),
            restart_count: u64::default(),
            last_error: Option::default(),
        }
    }
}
#[derive(Clone)]
pub struct HealthRegistry {
checks: Arc<RwLock<HashMap<String, HealthCheck>>>,
start_time: Instant,
}
impl HealthRegistry {
pub fn new() -> Self {
Self {
checks: Arc::new(RwLock::new(HashMap::new())),
start_time: Instant::now(),
}
}
pub fn register(&self, check: HealthCheck) {
self.checks
.write()
.unwrap()
.insert(check.name.clone(), check);
}
pub fn update(&self, name: &str, status: HealthStatus, message: Option<String>) {
let mut checks = self.checks.write().unwrap();
if let Some(check) = checks.get_mut(name) {
check.status = status;
check.message = message;
}
}
pub fn is_ready(&self) -> bool {
let checks = self.checks.read().unwrap();
checks.values().all(|c| c.status != HealthStatus::Down)
}
pub fn bump_restart(&self, name: &str) {
let mut checks = self.checks.write().unwrap();
if let Some(check) = checks.get_mut(name) {
check.restart_count += 1;
}
}
pub fn set_error(&self, name: &str, error: &str) {
let mut checks = self.checks.write().unwrap();
if let Some(check) = checks.get_mut(name) {
check.status = HealthStatus::Down;
check.last_error = Some(error.to_string());
}
}
pub fn all_checks(&self) -> Vec<HealthCheck> {
self.checks.read().unwrap().values().cloned().collect()
}
pub fn uptime(&self) -> Duration {
self.start_time.elapsed()
}
fn render_checks_json(&self) -> String {
let checks = self.checks.read().unwrap();
if checks.is_empty() {
return "{}".to_string();
}
let parts: Vec<String> = checks
.values()
.map(|c| {
if let Some(ref msg) = c.message {
format!(
"\"{}\":{{\"status\":\"{}\",\"message\":\"{}\"}}",
c.name,
c.status.as_str(),
msg.replace('"', "\\\"")
)
} else {
format!("\"{}\":{{\"status\":\"{}\"}}", c.name, c.status.as_str())
}
})
.collect();
format!("{{{}}}", parts.join(","))
}
}
impl Default for HealthRegistry {
fn default() -> Self {
Self::new()
}
}
/// Lock-free usage counters, shared between tasks behind an `Arc`.
///
/// The counters are independent tallies updated and read with `Relaxed`
/// ordering; the `ready` flag is stored with `SeqCst`.
#[derive(Debug)]
pub struct UsageMetrics {
    /// Requests counted by [`UsageMetrics::record_request`].
    pub requests: AtomicU64,
    /// Tool calls summed by [`UsageMetrics::record_tool_calls`].
    pub tool_calls: AtomicU64,
    /// Input tokens summed by [`UsageMetrics::record_tokens`].
    pub input_tokens: AtomicU64,
    /// Output tokens summed by [`UsageMetrics::record_tokens`].
    pub output_tokens: AtomicU64,
    /// Errors counted by [`UsageMetrics::record_error`].
    pub errors: AtomicU64,
    /// Readiness flag; read by the legacy health server's `/ready` route.
    pub ready: AtomicBool,
}

impl UsageMetrics {
    /// Creates a metrics set with every counter at zero and `ready` cleared.
    pub fn new() -> Self {
        let zero = || AtomicU64::new(0);
        Self {
            requests: zero(),
            tool_calls: zero(),
            input_tokens: zero(),
            output_tokens: zero(),
            errors: zero(),
            ready: AtomicBool::new(false),
        }
    }

    /// Adds `amount` to `counter` with relaxed ordering.
    fn bump(counter: &AtomicU64, amount: u64) {
        counter.fetch_add(amount, Ordering::Relaxed);
    }

    /// Counts one request.
    pub fn record_request(&self) {
        Self::bump(&self.requests, 1);
    }

    /// Adds `count` tool calls.
    pub fn record_tool_calls(&self, count: u64) {
        Self::bump(&self.tool_calls, count);
    }

    /// Adds `input` input tokens and `output` output tokens.
    pub fn record_tokens(&self, input: u64, output: u64) {
        Self::bump(&self.input_tokens, input);
        Self::bump(&self.output_tokens, output);
    }

    /// Counts one error.
    pub fn record_error(&self) {
        Self::bump(&self.errors, 1);
    }

    /// Sets the readiness flag.
    pub fn set_ready(&self, ready: bool) {
        self.ready.store(ready, Ordering::SeqCst);
    }

    /// Logs one INFO-level `usage_summary` event carrying the current counter
    /// values, tagged with `reason` (e.g. `"periodic"` or `"shutdown"`).
    pub fn emit_usage(&self, reason: &str) {
        let current = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        info!(
            event = "usage_summary",
            reason = reason,
            requests = current(&self.requests),
            tool_calls = current(&self.tool_calls),
            input_tokens = current(&self.input_tokens),
            output_tokens = current(&self.output_tokens),
            errors = current(&self.errors),
            "Usage metrics"
        );
    }
}

impl Default for UsageMetrics {
    fn default() -> Self {
        Self::new()
    }
}
/// Starts the registry-backed health HTTP server on `host:port`.
///
/// The listener is bound before this returns, so bind failures (e.g. port in
/// use) surface to the caller. The returned handle is the accept loop;
/// aborting it stops accepting new connections.
///
/// Routes (query strings are ignored, only `GET` is matched):
/// - `/health`, `/healthz` → `200` with top-level `"status":"ok"`, `uptime_secs`
///   and the per-check JSON from the registry under `checks`.
/// - `/ready`, `/readyz` → `200` when [`HealthRegistry::is_ready`], else `503`.
/// - anything else → `404`.
///
/// Only the first read of each connection (up to 512 bytes, 5 s timeout) is
/// parsed, and every response is sent with `Connection: close`.
///
/// # Errors
/// Returns the error from binding the listener.
pub async fn start_health_server(
    host: &str,
    port: u16,
    registry: HealthRegistry,
) -> Result<tokio::task::JoinHandle<()>, Box<dyn std::error::Error + Send + Sync>> {
    // Pause after a failed accept: errors such as EMFILE persist until resources
    // are freed, so retrying immediately would spin this loop and flood the log.
    const ACCEPT_ERROR_BACKOFF: Duration = Duration::from_millis(100);

    let addr = format!("{}:{}", host, port);
    let listener = TcpListener::bind(&addr).await?;
    info!(addr = %addr, "Health server listening on http://{}", addr);
    let handle = tokio::spawn(async move {
        loop {
            let mut stream = match listener.accept().await {
                Ok((stream, _peer)) => stream,
                Err(e) => {
                    warn!(error = %e, "Health server accept error");
                    tokio::time::sleep(ACCEPT_ERROR_BACKOFF).await;
                    continue;
                }
            };
            let registry = registry.clone();
            tokio::spawn(async move {
                let mut buf = [0u8; 512];
                // Drop clients that send nothing within 5 s or whose read fails.
                let n = match tokio::time::timeout(
                    Duration::from_secs(5),
                    tokio::io::AsyncReadExt::read(&mut stream, &mut buf),
                )
                .await
                {
                    Ok(Ok(n)) => n,
                    _ => return,
                };
                let request = String::from_utf8_lossy(&buf[..n]);
                let request_line = request.lines().next().unwrap_or_default();
                let mut parts = request_line.split_whitespace();
                let method = parts.next().unwrap_or_default();
                let raw_path = parts.next().unwrap_or_default();
                let path = raw_path.split('?').next().unwrap_or(raw_path);
                let (status_line, body) = match (method, path) {
                    ("GET", "/health") | ("GET", "/healthz") => {
                        let checks_json = registry.render_checks_json();
                        let uptime = registry.uptime().as_secs();
                        let body = format!(
                            "{{\"status\":\"ok\",\"uptime_secs\":{},\"checks\":{}}}",
                            uptime, checks_json
                        );
                        ("200 OK", body)
                    }
                    ("GET", "/ready") | ("GET", "/readyz") => {
                        if registry.is_ready() {
                            ("200 OK", "{\"status\":\"ready\"}".to_string())
                        } else {
                            (
                                "503 Service Unavailable",
                                "{\"status\":\"not_ready\"}".to_string(),
                            )
                        }
                    }
                    _ => ("404 Not Found", "{\"error\":\"not_found\"}".to_string()),
                };
                let response = format!(
                    "HTTP/1.1 {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
                    status_line,
                    body.len(),
                    body
                );
                // Best effort: the client may already have disconnected.
                let _ = stream.write_all(response.as_bytes()).await;
                let _ = stream.shutdown().await;
            });
        }
    });
    Ok(handle)
}
/// Starts the legacy health HTTP server on `0.0.0.0:port`, backed by
/// [`UsageMetrics`] rather than a [`HealthRegistry`].
///
/// The listener is bound before this returns. The returned handle is the
/// accept loop; aborting it stops accepting new connections.
///
/// Routes (query strings are ignored, only `GET` is matched):
/// - `/healthz`, `/health` → `200` with `{"status":"ok"}`.
/// - `/readyz`, `/ready` → `200` when `metrics.ready` is set, else `503`.
/// - anything else → `404`.
///
/// # Errors
/// Returns the I/O error from binding the listener.
pub async fn start_health_server_legacy(
    port: u16,
    metrics: Arc<UsageMetrics>,
) -> std::io::Result<tokio::task::JoinHandle<()>> {
    // Pause after a failed accept: errors such as EMFILE persist until resources
    // are freed, so retrying immediately would spin this loop and flood the log.
    const ACCEPT_ERROR_BACKOFF: Duration = Duration::from_millis(100);

    let listener = TcpListener::bind(format!("0.0.0.0:{}", port)).await?;
    info!(port = port, "Health server listening");
    let handle = tokio::spawn(async move {
        loop {
            let mut stream = match listener.accept().await {
                Ok((stream, _peer)) => stream,
                Err(e) => {
                    warn!(error = %e, "Health server accept error");
                    tokio::time::sleep(ACCEPT_ERROR_BACKOFF).await;
                    continue;
                }
            };
            let metrics = Arc::clone(&metrics);
            tokio::spawn(async move {
                let mut buf = [0u8; 512];
                // Drop clients that send nothing within 5 s or whose read fails.
                let n = match tokio::time::timeout(
                    Duration::from_secs(5),
                    tokio::io::AsyncReadExt::read(&mut stream, &mut buf),
                )
                .await
                {
                    Ok(Ok(n)) => n,
                    _ => return,
                };
                let request = String::from_utf8_lossy(&buf[..n]);
                let request_line = request.lines().next().unwrap_or_default();
                let mut parts = request_line.split_whitespace();
                let method = parts.next().unwrap_or_default();
                let raw_path = parts.next().unwrap_or_default();
                let path = raw_path.split('?').next().unwrap_or(raw_path);
                let (status, body) = match (method, path) {
                    ("GET", "/healthz") | ("GET", "/health") => {
                        ("200 OK", "{\"status\":\"ok\"}")
                    }
                    ("GET", "/readyz") | ("GET", "/ready") => {
                        if metrics.ready.load(Ordering::SeqCst) {
                            ("200 OK", "{\"status\":\"ready\"}")
                        } else {
                            ("503 Service Unavailable", "{\"status\":\"not_ready\"}")
                        }
                    }
                    _ => ("404 Not Found", "{\"error\":\"not_found\"}"),
                };
                let response = format!(
                    "HTTP/1.1 {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
                    status,
                    body.len(),
                    body
                );
                // Best effort: the client may already have disconnected.
                let _ = stream.write_all(response.as_bytes()).await;
                let _ = stream.shutdown().await;
            });
        }
    });
    Ok(handle)
}
/// Spawns a task that logs a `"periodic"` usage summary every
/// [`USAGE_FLUSH_INTERVAL_SECS`] seconds and a final `"shutdown"` summary on exit.
///
/// The first periodic summary is emitted one full interval after start. The
/// task exits after the `"shutdown"` summary once `shutdown_rx` observes
/// `true`, or once the shutdown sender has been dropped (no signal can arrive
/// any more).
pub fn start_periodic_usage_flush(
    metrics: Arc<UsageMetrics>,
    mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_secs(USAGE_FLUSH_INTERVAL_SECS));
        // A tokio interval completes its first tick immediately; consume it so
        // nothing is emitted at start-up.
        interval.tick().await;
        loop {
            tokio::select! {
                _ = interval.tick() => {
                    metrics.emit_usage("periodic");
                }
                changed = shutdown_rx.changed() => {
                    // `Err` means the sender was dropped. `changed()` would then
                    // resolve immediately on every iteration and spin this loop,
                    // so treat it as shutdown too.
                    if changed.is_err() || *shutdown_rx.borrow() {
                        metrics.emit_usage("shutdown");
                        break;
                    }
                }
            }
        }
    })
}
pub fn health_port() -> u16 {
std::env::var("ZEPTOCLAW_HEALTH_PORT")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(DEFAULT_HEALTH_PORT)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a check called `name` in state `status`, with every other field defaulted.
    fn check(name: &str, status: HealthStatus) -> HealthCheck {
        HealthCheck {
            name: name.into(),
            status,
            ..Default::default()
        }
    }

    /// Returns a loopback port that was free a moment ago.
    ///
    /// NOTE: the port is released before the server under test binds it, so
    /// another process could take it in between. The server functions accept a
    /// port rather than a listener, so this race cannot be closed from here.
    async fn free_port() -> u16 {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        listener.local_addr().unwrap().port()
    }

    /// Sends `GET <path>` to `127.0.0.1:<port>` and returns the complete response.
    ///
    /// The servers reply with `Connection: close` and then shut the stream down,
    /// so reading to EOF yields the whole response; a single `read` might not.
    /// The servers bind before their start functions return, so no start-up
    /// delay is needed before connecting.
    async fn http_get(port: u16, path: &str) -> String {
        use tokio::io::{AsyncReadExt, AsyncWriteExt};

        let mut stream = tokio::net::TcpStream::connect(format!("127.0.0.1:{}", port))
            .await
            .unwrap();
        let request = format!("GET {} HTTP/1.1\r\nHost: localhost\r\n\r\n", path);
        stream.write_all(request.as_bytes()).await.unwrap();
        let mut response = Vec::new();
        tokio::time::timeout(Duration::from_secs(5), stream.read_to_end(&mut response))
            .await
            .expect("timed out waiting for the health server to respond")
            .expect("failed to read the health server response");
        String::from_utf8_lossy(&response).into_owned()
    }

    #[test]
    fn test_registry_ready_when_empty() {
        let reg = HealthRegistry::new();
        assert!(reg.is_ready());
    }

    #[test]
    fn test_registry_not_ready_when_check_down() {
        let reg = HealthRegistry::new();
        reg.register(check("telegram", HealthStatus::Down));
        assert!(!reg.is_ready());
    }

    #[test]
    fn test_registry_ready_when_all_ok() {
        let reg = HealthRegistry::new();
        reg.register(check("telegram", HealthStatus::Ok));
        reg.register(check("provider", HealthStatus::Ok));
        assert!(reg.is_ready());
    }

    #[test]
    fn test_registry_ready_with_degraded() {
        let reg = HealthRegistry::new();
        reg.register(check("web", HealthStatus::Degraded));
        // Degraded is not Down, so the service still reports ready.
        assert!(reg.is_ready());
    }

    #[test]
    fn test_update_check_status() {
        let reg = HealthRegistry::new();
        reg.register(check("db", HealthStatus::Ok));
        reg.update("db", HealthStatus::Down, Some("connection refused".into()));
        assert!(!reg.is_ready());
    }

    #[test]
    fn test_update_nonexistent_noop() {
        let reg = HealthRegistry::new();
        reg.update("ghost", HealthStatus::Down, None);
        assert!(reg.is_ready());
    }

    #[test]
    fn test_uptime_increases() {
        let reg = HealthRegistry::new();
        std::thread::sleep(Duration::from_millis(10));
        assert!(reg.uptime().as_millis() >= 10);
    }

    #[test]
    fn test_render_checks_json_empty() {
        let reg = HealthRegistry::new();
        assert_eq!(reg.render_checks_json(), "{}");
    }

    #[test]
    fn test_render_checks_json_ok() {
        let reg = HealthRegistry::new();
        reg.register(check("db", HealthStatus::Ok));
        let json = reg.render_checks_json();
        assert!(json.contains("\"db\""));
        assert!(json.contains("\"status\":\"ok\""));
    }

    #[test]
    fn test_render_checks_json_with_message() {
        let reg = HealthRegistry::new();
        reg.register(HealthCheck {
            message: Some("timeout".into()),
            ..check("db", HealthStatus::Down)
        });
        let json = reg.render_checks_json();
        assert!(json.contains("\"message\":\"timeout\""));
    }

    #[test]
    fn test_render_checks_json_message_escapes_quotes() {
        let reg = HealthRegistry::new();
        reg.register(HealthCheck {
            message: Some("say \"hi\"".into()),
            ..check("x", HealthStatus::Ok)
        });
        let json = reg.render_checks_json();
        assert!(json.contains("\\\"hi\\\""));
    }

    #[test]
    fn test_health_status_as_str() {
        assert_eq!(HealthStatus::Ok.as_str(), "ok");
        assert_eq!(HealthStatus::Degraded.as_str(), "degraded");
        assert_eq!(HealthStatus::Down.as_str(), "down");
    }

    #[test]
    fn test_usage_metrics_creation() {
        let metrics = UsageMetrics::new();
        assert_eq!(metrics.requests.load(Ordering::Relaxed), 0);
        assert_eq!(metrics.tool_calls.load(Ordering::Relaxed), 0);
        assert!(!metrics.ready.load(Ordering::SeqCst));
    }

    #[test]
    fn test_usage_metrics_recording() {
        let metrics = UsageMetrics::new();
        metrics.record_request();
        metrics.record_request();
        metrics.record_tool_calls(3);
        metrics.record_tokens(100, 50);
        metrics.record_error();
        assert_eq!(metrics.requests.load(Ordering::Relaxed), 2);
        assert_eq!(metrics.tool_calls.load(Ordering::Relaxed), 3);
        assert_eq!(metrics.input_tokens.load(Ordering::Relaxed), 100);
        assert_eq!(metrics.output_tokens.load(Ordering::Relaxed), 50);
        assert_eq!(metrics.errors.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn test_ready_flag() {
        let metrics = UsageMetrics::new();
        assert!(!metrics.ready.load(Ordering::SeqCst));
        metrics.set_ready(true);
        assert!(metrics.ready.load(Ordering::SeqCst));
        metrics.set_ready(false);
        assert!(!metrics.ready.load(Ordering::SeqCst));
    }

    #[test]
    fn test_health_port_default() {
        std::env::remove_var("ZEPTOCLAW_HEALTH_PORT");
        assert_eq!(health_port(), DEFAULT_HEALTH_PORT);
    }

    #[test]
    fn test_registry_register_replaces_existing() {
        let reg = HealthRegistry::new();
        reg.register(check("svc", HealthStatus::Ok));
        reg.register(HealthCheck {
            message: Some("crashed".into()),
            ..check("svc", HealthStatus::Down)
        });
        assert!(!reg.is_ready());
    }

    #[tokio::test]
    async fn test_health_server_health_endpoint() {
        let registry = HealthRegistry::new();
        registry.register(check("provider", HealthStatus::Ok));
        let port = free_port().await;
        let handle = start_health_server("127.0.0.1", port, registry)
            .await
            .unwrap();
        let response = http_get(port, "/health").await;
        assert!(response.contains("200 OK"), "response: {}", response);
        assert!(response.contains("\"status\":\"ok\""));
        assert!(response.contains("uptime_secs"));
        assert!(response.contains("\"provider\""));
        handle.abort();
    }

    #[tokio::test]
    async fn test_health_server_ready_endpoint_all_ok() {
        let registry = HealthRegistry::new();
        registry.register(check("svc", HealthStatus::Ok));
        let port = free_port().await;
        let handle = start_health_server("127.0.0.1", port, registry)
            .await
            .unwrap();
        let response = http_get(port, "/ready").await;
        assert!(response.contains("200 OK"), "response: {}", response);
        handle.abort();
    }

    #[tokio::test]
    async fn test_health_server_ready_endpoint_down() {
        let registry = HealthRegistry::new();
        registry.register(HealthCheck {
            message: Some("unreachable".into()),
            ..check("svc", HealthStatus::Down)
        });
        let port = free_port().await;
        let handle = start_health_server("127.0.0.1", port, registry)
            .await
            .unwrap();
        let response = http_get(port, "/ready").await;
        assert!(response.contains("503"), "response: {}", response);
        handle.abort();
    }

    #[tokio::test]
    async fn test_health_server_404_on_unknown_path() {
        let registry = HealthRegistry::new();
        let port = free_port().await;
        let handle = start_health_server("127.0.0.1", port, registry)
            .await
            .unwrap();
        let response = http_get(port, "/unknown").await;
        assert!(response.contains("404"), "response: {}", response);
        handle.abort();
    }

    #[tokio::test]
    async fn test_health_server_backward_compat_healthz() {
        let registry = HealthRegistry::new();
        let port = free_port().await;
        let handle = start_health_server("127.0.0.1", port, registry)
            .await
            .unwrap();
        let response = http_get(port, "/healthz").await;
        assert!(response.contains("200 OK"), "response: {}", response);
        handle.abort();
    }

    #[tokio::test]
    async fn test_legacy_health_server() {
        let metrics = Arc::new(UsageMetrics::new());
        metrics.set_ready(true);
        let port = free_port().await;
        let handle = start_health_server_legacy(port, Arc::clone(&metrics))
            .await
            .unwrap();
        let response = http_get(port, "/healthz").await;
        assert!(response.contains("200 OK"), "response: {}", response);
        assert!(response.contains("\"status\":\"ok\""));
        handle.abort();
    }

    #[test]
    fn test_bump_restart_increments_count() {
        let reg = HealthRegistry::new();
        reg.register(check("gw", HealthStatus::Ok));
        reg.bump_restart("gw");
        reg.bump_restart("gw");
        let checks = reg.all_checks();
        let gw = checks.iter().find(|c| c.name == "gw").unwrap();
        assert_eq!(gw.restart_count, 2);
    }

    #[test]
    fn test_bump_restart_noop_on_unknown() {
        let reg = HealthRegistry::new();
        reg.bump_restart("nonexistent");
        assert!(reg.is_ready());
    }

    #[test]
    fn test_set_error_marks_down_and_records_error() {
        let reg = HealthRegistry::new();
        reg.register(check("db", HealthStatus::Ok));
        reg.set_error("db", "connection timeout");
        let checks = reg.all_checks();
        let db = checks.iter().find(|c| c.name == "db").unwrap();
        assert_eq!(db.status, HealthStatus::Down);
        assert_eq!(db.last_error.as_deref(), Some("connection timeout"));
        assert!(!reg.is_ready());
    }

    #[test]
    fn test_set_error_noop_on_unknown() {
        let reg = HealthRegistry::new();
        reg.set_error("ghost", "some error");
        assert!(reg.is_ready());
    }

    #[test]
    fn test_all_checks_returns_snapshot() {
        let reg = HealthRegistry::new();
        reg.register(check("a", HealthStatus::Ok));
        reg.register(check("b", HealthStatus::Degraded));
        let checks = reg.all_checks();
        assert_eq!(checks.len(), 2);
        let names: Vec<&str> = checks.iter().map(|c| c.name.as_str()).collect();
        assert!(names.contains(&"a"));
        assert!(names.contains(&"b"));
    }

    #[test]
    fn test_all_checks_empty_registry() {
        let reg = HealthRegistry::new();
        assert!(reg.all_checks().is_empty());
    }
}