use crate::error::*;
use crate::package_definition::{self, PackageDefinition};
use crate::paths;
use crate::receipt;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::io::{BufRead, BufReader, Write};
use std::net::{TcpListener, TcpStream};
use std::path::{Path, PathBuf};
use std::process::{Child, Command, Stdio};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
/// TCP port on `127.0.0.1` where the daemon accepts newline-delimited JSON requests.
const DAEMON_PORT: u16 = 19100;
/// Pause between passes of the background health monitor.
const HEALTH_CHECK_INTERVAL: Duration = Duration::from_secs(10);
/// How long to keep polling a freshly (re)spawned service's health endpoint before giving up.
const HEALTH_CHECK_TIMEOUT: Duration = Duration::from_secs(10);
/// Consecutive unhealthy observations after which a service is killed, marked `Failed`, and no
/// longer restarted by the health monitor.
const MAX_RESTART_FAILURES: u32 = 5;
/// Upper bound on the exponential delay between restart attempts.
const MAX_BACKOFF: Duration = Duration::from_secs(60);
/// Supervision state of a managed service, as reported in `status` responses.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ServiceState {
    Starting,
    Running,
    /// Gave up after too many consecutive failures; the health monitor skips it.
    Failed,
}

impl std::fmt::Display for ServiceState {
    /// Writes the lowercase label used on the wire (`starting`, `running`, `failed`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Starting => "starting",
            Self::Running => "running",
            Self::Failed => "failed",
        };
        f.write_str(label)
    }
}
/// Bookkeeping for one service process supervised by the daemon.
struct ManagedService {
    /// Package name; the same string is used as this entry's key in the services map.
    name: String,
    /// Port the service was told to listen on (passed as `--listen=:<port>`).
    port: u16,
    /// HTTP path probed by health checks (`/health` unless the package overrides it).
    health_path: String,
    /// Current supervision state; entries in `Failed` are skipped by the health monitor.
    state: ServiceState,
    /// Handle to the spawned process, used to detect exit and to kill it.
    process: Option<Child>,
    /// Unhealthy observations since the last healthy one; drives restart backoff and the
    /// `MAX_RESTART_FAILURES` cutoff.
    consecutive_failures: u32,
    /// Binary re-executed when the health monitor restarts the service.
    bin_path: PathBuf,
}
/// Localhost control daemon that starts, stops, and health-supervises package services.
pub struct DaemonServer {
    /// Directory passed to `receipt::read` and the `paths::*` lookups when loading packages.
    home: PathBuf,
    /// Tracked services keyed by name, shared with connection threads and the health monitor.
    services: Arc<Mutex<HashMap<String, ManagedService>>>,
    /// Set by a `shutdown` request; polled by the accept loop and the health monitor.
    shutdown: Arc<Mutex<bool>>,
}
/// One client request, received as a single JSON line,
/// e.g. `{"request":"start-service","name":"foo"}`.
#[derive(Debug, Deserialize)]
struct DaemonRequest {
    /// Operation: `ping`, `status`, `start-service`, `stop-service`, or `shutdown`.
    request: String,
    /// Target service for `start-service` / `stop-service`; may be omitted.
    #[serde(default)]
    name: Option<String>,
}
/// Reply to a request, written back as a single JSON line; `None` fields are omitted.
#[derive(Debug, Serialize)]
struct DaemonResponse {
    /// Whether the request succeeded.
    ok: bool,
    /// Human-readable failure reason, set when `ok` is false.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
    /// Service port, reported by a successful `start-service`.
    #[serde(skip_serializing_if = "Option::is_none")]
    port: Option<u16>,
    /// Snapshot of tracked services, reported by `status`.
    #[serde(skip_serializing_if = "Option::is_none")]
    services: Option<Vec<ServiceStatusEntry>>,
}
/// One row of a `status` response.
#[derive(Debug, Serialize)]
struct ServiceStatusEntry {
    /// Service (package) name.
    name: String,
    /// Port the service listens on.
    port: u16,
    /// The service's `ServiceState` rendered through its `Display` impl.
    status: String,
}
impl DaemonServer {
pub fn new(home: PathBuf) -> Self {
DaemonServer {
home,
services: Arc::new(Mutex::new(HashMap::new())),
shutdown: Arc::new(Mutex::new(false)),
}
}
pub fn run(&self) -> Result<()> {
let addr = format!("127.0.0.1:{}", DAEMON_PORT);
let listener = TcpListener::bind(&addr)
.with_context(|| format!("failed to bind daemon on {} — is another daemon running?", addr))?;
eprintln!("daemon: listening on {}", addr);
listener.set_nonblocking(false).ok();
let services = self.services.clone();
let shutdown = self.shutdown.clone();
let home = self.home.clone();
std::thread::Builder::new()
.name("zr-health-monitor".into())
.spawn(move || health_monitor_loop(services, shutdown, home))
.context("failed to spawn health monitor")?;
for stream in listener.incoming() {
if *self.shutdown.lock().unwrap() {
break;
}
match stream {
Ok(stream) => {
let services = self.services.clone();
let shutdown = self.shutdown.clone();
let home = self.home.clone();
std::thread::Builder::new()
.name("zr-daemon-conn".into())
.spawn(move || {
if let Err(e) = handle_connection(stream, &services, &shutdown, &home) {
eprintln!("daemon: connection error: {:#}", e);
}
})
.ok();
}
Err(e) => {
if *self.shutdown.lock().unwrap() {
break;
}
eprintln!("daemon: accept error: {}", e);
}
}
}
self.stop_all_services();
eprintln!("daemon: stopped");
Ok(())
}
fn stop_all_services(&self) {
let mut services = self.services.lock().unwrap();
for (name, svc) in services.iter_mut() {
if let Some(ref mut child) = svc.process {
eprintln!("daemon: stopping service '{}'", name);
let _ = child.kill();
let _ = child.wait();
}
}
services.clear();
}
}
/// Serves a single client connection.
///
/// Reads one newline-terminated JSON `DaemonRequest`, dispatches it, and writes back exactly
/// one JSON `DaemonResponse` line.
///
/// # Errors
/// Returns an error (and writes no response) if the socket cannot be cloned, read, or written,
/// or if the request line is not valid request JSON.
fn handle_connection(
    stream: TcpStream,
    services: &Arc<Mutex<HashMap<String, ManagedService>>>,
    shutdown: &Arc<Mutex<bool>>,
    home: &Path,
) -> Result<()> {
    let mut request_line = String::new();
    let mut input = BufReader::new(stream.try_clone()?);
    input.read_line(&mut request_line)?;
    let req: DaemonRequest =
        serde_json::from_str(request_line.trim()).context("invalid daemon request")?;

    let acknowledged = || DaemonResponse {
        ok: true,
        error: None,
        port: None,
        services: None,
    };
    let rejected = |message: String| DaemonResponse {
        ok: false,
        error: Some(message),
        port: None,
        services: None,
    };

    let target = req.name.as_deref().unwrap_or("");
    let response = match req.request.as_str() {
        "ping" => acknowledged(),
        "status" => handle_status(services),
        "start-service" if target.is_empty() => rejected("missing service name".into()),
        "start-service" => handle_start_service(services, home, target),
        "stop-service" => handle_stop_service(services, target),
        "shutdown" => {
            *shutdown.lock().unwrap() = true;
            // Connect to ourselves so the blocking accept loop wakes up and sees the flag.
            let _ = TcpStream::connect(format!("127.0.0.1:{}", DAEMON_PORT));
            acknowledged()
        }
        other => rejected(format!("unknown request: {}", other)),
    };

    let payload = serde_json::to_string(&response)?;
    let mut writer = stream;
    writeln!(writer, "{}", payload)?;
    writer.flush()?;
    Ok(())
}
/// Handles a `status` request: reports name, port, and state of every tracked service.
///
/// Entries appear in the map's iteration order; the services lock is held while the snapshot
/// is built.
fn handle_status(services: &Arc<Mutex<HashMap<String, ManagedService>>>) -> DaemonResponse {
    let registry = services.lock().unwrap();
    let mut rows = Vec::with_capacity(registry.len());
    for entry in registry.values() {
        rows.push(ServiceStatusEntry {
            name: entry.name.clone(),
            port: entry.port,
            status: entry.state.to_string(),
        });
    }
    DaemonResponse {
        ok: true,
        error: None,
        port: None,
        services: Some(rows),
    }
}
fn handle_start_service(
services: &Arc<Mutex<HashMap<String, ManagedService>>>,
home: &Path,
name: &str,
) -> DaemonResponse {
{
let svcs = services.lock().unwrap();
if let Some(svc) = svcs.get(name) {
if svc.state == ServiceState::Running {
return DaemonResponse {
ok: true,
error: None,
port: Some(svc.port),
services: None,
};
}
}
}
let (def, version) = match load_package_def(home, name) {
Ok(v) => v,
Err(e) => {
return DaemonResponse {
ok: false,
error: Some(format!("{:#}", e)),
port: None,
services: None,
};
}
};
let service = match def.service.as_ref() {
Some(s) => s,
None => {
return DaemonResponse {
ok: false,
error: Some(format!("package '{}' has no service section", name)),
port: None,
services: None,
};
}
};
let port = match service.port {
Some(p) => p,
None => {
return DaemonResponse {
ok: false,
error: Some(format!("service '{}' has no port configured", name)),
port: None,
services: None,
};
}
};
let health_path = service.health.clone().unwrap_or_else(|| "/health".into());
let binary_name = match def.binary.as_ref() {
Some(b) => b,
None => {
return DaemonResponse {
ok: false,
error: Some(format!("package '{}' has no binary", name)),
port: None,
services: None,
};
}
};
let bin_path = paths::store_binary_path(home, name, &version, binary_name);
if !bin_path.exists() {
return DaemonResponse {
ok: false,
error: Some(format!("binary not found: {}", bin_path.display())),
port: None,
services: None,
};
}
let child = match Command::new(&bin_path)
.arg(format!("--listen=:{}", port))
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::inherit())
.spawn()
{
Ok(c) => c,
Err(e) => {
return DaemonResponse {
ok: false,
error: Some(format!("failed to spawn service: {}", e)),
port: None,
services: None,
};
}
};
eprintln!("daemon: started service '{}' on port {}", name, port);
if !wait_for_health(port, &health_path, HEALTH_CHECK_TIMEOUT) {
return DaemonResponse {
ok: false,
error: Some("health check timeout".into()),
port: None,
services: None,
};
}
let mut svcs = services.lock().unwrap();
svcs.insert(
name.to_string(),
ManagedService {
name: name.to_string(),
port,
health_path,
state: ServiceState::Running,
process: Some(child),
consecutive_failures: 0,
bin_path,
},
);
DaemonResponse {
ok: true,
error: None,
port: Some(port),
services: None,
}
}
fn handle_stop_service(
services: &Arc<Mutex<HashMap<String, ManagedService>>>,
name: &str,
) -> DaemonResponse {
let mut svcs = services.lock().unwrap();
if let Some(mut svc) = svcs.remove(name) {
if let Some(ref mut child) = svc.process {
eprintln!("daemon: stopping service '{}'", name);
let _ = child.kill();
let _ = child.wait();
}
}
DaemonResponse {
ok: true,
error: None,
port: None,
services: None,
}
}
/// Loads the package definition for the version of `name` currently recorded in its receipt.
///
/// Returns the parsed definition together with that version string.
///
/// # Errors
/// Fails with "package '<name>' not found" when no receipt exists, or propagates errors from
/// reading the receipt or parsing the definition file.
fn load_package_def(home: &Path, name: &str) -> Result<(PackageDefinition, String)> {
    let version = receipt::read(home, name)?
        .map(|installed| installed.current.clone())
        .ok_or_else(|| anyhow!("package '{}' not found", name))?;
    let definition_file = paths::definition_path(home, name, &version);
    let definition = package_definition::parse_file(&definition_file)?;
    Ok((definition, version))
}
/// Polls `check_health` every 100 ms until it succeeds or `timeout` has elapsed.
///
/// Returns `true` as soon as a check passes. The deadline is only tested between checks, so
/// the total wait can exceed `timeout` by the duration of one check.
fn wait_for_health(port: u16, health_path: &str, timeout: Duration) -> bool {
    const POLL_INTERVAL: Duration = Duration::from_millis(100);
    let began = Instant::now();
    while !check_health(port, health_path) {
        if began.elapsed() > timeout {
            return false;
        }
        std::thread::sleep(POLL_INTERVAL);
    }
    true
}
/// Probes a service's health endpoint with a single HTTP/1.0 `GET`.
///
/// Connects to `127.0.0.1:port`, requests `health_path`, and reads only the response's status
/// line. Returns `true` only when the status code is exactly `200`. The following all count as
/// unhealthy:
/// - connection failures;
/// - write or read errors, including the 2-second write/read timeouts;
/// - an empty reply.
fn check_health(port: u16, health_path: &str) -> bool {
    const IO_TIMEOUT: Duration = Duration::from_secs(2);

    let addr = format!("127.0.0.1:{}", port);
    let Ok(mut stream) = TcpStream::connect(&addr) else {
        return false;
    };
    // Best effort: a wedged service must not stall the caller indefinitely in either direction.
    stream.set_read_timeout(Some(IO_TIMEOUT)).ok();
    stream.set_write_timeout(Some(IO_TIMEOUT)).ok();

    let request = format!(
        "GET {} HTTP/1.0\r\nHost: 127.0.0.1:{}\r\nConnection: close\r\n\r\n",
        health_path, port
    );
    if stream.write_all(request.as_bytes()).is_err() || stream.flush().is_err() {
        return false;
    }

    let mut status_line = String::new();
    if BufReader::new(stream).read_line(&mut status_line).is_err() {
        return false;
    }
    // The status line is `HTTP/<version> <code> <reason>`. Compare the code token exactly:
    // a substring test would also accept e.g. `HTTP/1.1 503 Retry in 200ms`.
    status_line.split_whitespace().nth(1) == Some("200")
}
/// Delay before the next restart attempt after `failures` consecutive failures.
///
/// Starts at one second (for `failures` of 0 or 1), doubles with each further failure
/// (1s, 2s, 4s, …), and never exceeds `MAX_BACKOFF`.
fn backoff_duration(failures: u32) -> Duration {
    let exponent = failures.saturating_sub(1);
    // `None` means 2^exponent overflows u64, which is far beyond the cap anyway.
    let uncapped = 2u64.checked_pow(exponent).unwrap_or(u64::MAX);
    Duration::from_secs(uncapped.min(MAX_BACKOFF.as_secs()))
}
fn health_monitor_loop(
services: Arc<Mutex<HashMap<String, ManagedService>>>,
shutdown: Arc<Mutex<bool>>,
home: PathBuf,
) {
loop {
std::thread::sleep(HEALTH_CHECK_INTERVAL);
if *shutdown.lock().unwrap() {
break;
}
let names: Vec<String> = {
let svcs = services.lock().unwrap();
svcs.keys().cloned().collect()
};
for name in names {
if *shutdown.lock().unwrap() {
return;
}
let mut svcs = services.lock().unwrap();
let Some(svc) = svcs.get_mut(&name) else {
continue;
};
if svc.state == ServiceState::Failed {
continue;
}
let process_dead = svc
.process
.as_mut()
.is_some_and(|c| c.try_wait().ok().flatten().is_some());
let healthy = if process_dead {
false
} else {
check_health(svc.port, &svc.health_path)
};
if healthy {
svc.consecutive_failures = 0;
svc.state = ServiceState::Running;
continue;
}
svc.consecutive_failures += 1;
eprintln!(
"daemon: service '{}' unhealthy (failure {}/{})",
name, svc.consecutive_failures, MAX_RESTART_FAILURES
);
if svc.consecutive_failures >= MAX_RESTART_FAILURES {
eprintln!("daemon: service '{}' marked as failed after {} consecutive failures", name, MAX_RESTART_FAILURES);
svc.state = ServiceState::Failed;
if let Some(ref mut child) = svc.process {
let _ = child.kill();
let _ = child.wait();
}
continue;
}
if let Some(ref mut child) = svc.process {
let _ = child.kill();
let _ = child.wait();
}
let wait = backoff_duration(svc.consecutive_failures);
let bin_path = svc.bin_path.clone();
let port = svc.port;
let health_path = svc.health_path.clone();
let svc_name = name.clone();
drop(svcs);
eprintln!("daemon: restarting '{}' in {:?}", svc_name, wait);
std::thread::sleep(wait);
if *shutdown.lock().unwrap() {
return;
}
match Command::new(&bin_path)
.arg(format!("--listen=:{}", port))
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::inherit())
.spawn()
{
Ok(child) => {
if wait_for_health(port, &health_path, HEALTH_CHECK_TIMEOUT) {
let mut svcs = services.lock().unwrap();
if let Some(svc) = svcs.get_mut(&svc_name) {
svc.process = Some(child);
svc.state = ServiceState::Running;
svc.consecutive_failures = 0;
eprintln!("daemon: service '{}' restarted successfully", svc_name);
}
} else {
eprintln!("daemon: service '{}' failed health check after restart", svc_name);
}
}
Err(e) => {
eprintln!("daemon: failed to restart '{}': {}", svc_name, e);
}
}
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_backoff_duration() {
        // (consecutive failures, expected delay in seconds); capped at MAX_BACKOFF.
        let schedule: [(u32, u64); 8] = [
            (1, 1),
            (2, 2),
            (3, 4),
            (4, 8),
            (5, 16),
            (6, 32),
            (7, 60),
            (100, 60),
        ];
        for (failures, secs) in schedule {
            assert_eq!(
                backoff_duration(failures),
                Duration::from_secs(secs),
                "failures = {}",
                failures
            );
        }
    }

    #[test]
    fn test_service_state_display() {
        let labels = [
            (ServiceState::Starting, "starting"),
            (ServiceState::Running, "running"),
            (ServiceState::Failed, "failed"),
        ];
        for (state, text) in labels {
            assert_eq!(state.to_string(), text);
        }
    }
}