use std::collections::BTreeMap;
use std::env;
use std::fs;
use std::fs::OpenOptions;
use std::io::Write;
#[cfg(unix)]
use std::os::unix::fs::{OpenOptionsExt, PermissionsExt};
#[cfg(unix)]
use std::os::unix::io::AsRawFd;
use std::path::Path;
use std::process::Stdio;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
use anyhow::{Context, Result};
use interprocess::local_socket::ListenerOptions;
use interprocess::local_socket::tokio::Stream;
use interprocess::local_socket::traits::tokio::Listener as _;
use regex::Regex;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::Command as TokioCommand;
use tokio::sync::{Mutex, watch};
use tokio::time::sleep;
use crate::cli::DaemonArgs;
use crate::config::{
apply_interpolation, build_process_instances, collect_process_subset, load_and_merge_configs,
load_dotenv_files,
};
use crate::ipc::{Request, Response, to_socket_name};
use crate::model::{
DependencyCondition, ExitMode, ProcessRuntime, ProcessSnapshot, ProcessStatus, RestartPolicy,
RuntimePaths,
};
#[cfg(unix)]
use crate::paths::FILE_MODE;
use crate::paths::{create_dir_secure, runtime_paths_for};
fn compile_ready_pattern(pattern: &str) -> Regex {
Regex::new(pattern).unwrap_or_else(|_| {
Regex::new(®ex::escape(pattern))
.unwrap_or_else(|_| Regex::new("$.^").expect("never-matching pattern is valid regex"))
})
}
/// Opens `path` for appending (creating it if missing) and forces its
/// permission bits to `FILE_MODE`.
///
/// The mode is requested at open time and then applied again by path, so a
/// file that already existed ends up with `FILE_MODE` as well.
#[cfg(unix)]
fn open_secure_append(path: &Path) -> std::io::Result<fs::File> {
    let mut options = OpenOptions::new();
    options.create(true).append(true).mode(FILE_MODE);
    let handle = options.open(path)?;
    let restricted = fs::Permissions::from_mode(FILE_MODE);
    fs::set_permissions(path, restricted)?;
    Ok(handle)
}
/// Opens `path` for appending, creating it if missing.
///
/// No permission bits are adjusted on non-Unix targets.
#[cfg(not(unix))]
fn open_secure_append(path: &Path) -> std::io::Result<fs::File> {
    let mut options = OpenOptions::new();
    options.create(true).append(true);
    options.open(path)
}
/// Opens (or creates) the lock file at `path` for writing without truncating
/// it, and forces its permission bits to `FILE_MODE`.
#[cfg(unix)]
fn open_secure_lock(path: &Path) -> std::io::Result<fs::File> {
    let mut options = OpenOptions::new();
    options
        .create(true)
        .write(true)
        .truncate(false)
        .mode(FILE_MODE);
    let handle = options.open(path)?;
    let restricted = fs::Permissions::from_mode(FILE_MODE);
    fs::set_permissions(path, restricted)?;
    Ok(handle)
}
/// Opens (or creates) the lock file at `path` for writing without truncating it.
///
/// No permission bits are adjusted on non-Unix targets.
#[cfg(not(unix))]
fn open_secure_lock(path: &Path) -> std::io::Result<fs::File> {
    let mut options = OpenOptions::new();
    options.create(true).write(true).truncate(false);
    options.open(path)
}
/// Replaces the contents of `path` with `contents`, leaving the file with
/// `FILE_MODE` permissions.
///
/// The file is created if missing and truncated otherwise. The mode given at
/// open time is only requested for the open call, so the permissions are also
/// set explicitly by path. That happens *before* any bytes are written, so a
/// file that already existed with looser permissions never holds the new
/// contents while still readable under its old mode.
///
/// # Errors
/// Returns any I/O error from opening, re-permissioning, or writing the file.
#[cfg(unix)]
fn write_secure(path: &Path, contents: &[u8]) -> std::io::Result<()> {
    use std::io::Write as _;
    let mut file = OpenOptions::new()
        .create(true)
        .write(true)
        .truncate(true)
        .mode(FILE_MODE)
        .open(path)?;
    // Tighten first, write second.
    fs::set_permissions(path, fs::Permissions::from_mode(FILE_MODE))?;
    file.write_all(contents)?;
    Ok(())
}
/// Replaces the contents of `path` with `contents`, creating the file if needed.
///
/// No permission bits are adjusted on non-Unix targets.
#[cfg(not(unix))]
fn write_secure(path: &Path, contents: &[u8]) -> std::io::Result<()> {
    let mut file = fs::File::create(path)?;
    file.write_all(contents)
}
/// Mutable state shared by every task of a running daemon.
///
/// It is wrapped in `Arc<Mutex<..>>` (see `SharedState`) and locked briefly by
/// the supervisor, the per-process lifecycle tasks and the IPC handlers.
#[derive(Debug)]
pub(crate) struct DaemonState {
/// Instance name, taken from the daemon arguments and echoed in `Pong`/`Ps` responses.
instance: String,
/// Runtime record for every known process instance, keyed by instance name.
processes: BTreeMap<String, ProcessRuntime>,
/// Stop channels of processes that currently have a lifecycle task;
/// sending `true` asks that task to shut its child down.
controllers: BTreeMap<String, watch::Sender<bool>>,
/// Set once a shutdown has been requested; polled by the supervisor and the orphan watchdog.
shutdown_requested: bool,
/// Policy (from the merged config) deciding whether a process exit triggers a daemon shutdown.
exit_mode: ExitMode,
/// Timeout in seconds supplied by a `Down` request; when set it replaces each
/// spec's own `shutdown_timeout_seconds` while stopping children.
shutdown_timeout_override: Option<u64>,
/// Working directory the daemon was started with.
// NOTE(review): `cwd`, `config_files`, `env_files` and `disable_dotenv` are only
// stored in the visible code; presumably they are re-read on reload — confirm.
cwd: std::path::PathBuf,
/// Config files given on the daemon command line.
config_files: Vec<std::path::PathBuf>,
/// Extra env files given on the daemon command line.
env_files: Vec<std::path::PathBuf>,
/// Whether dotenv loading was disabled on the daemon command line.
disable_dotenv: bool,
/// Time of the most recent successfully parsed IPC request; the orphan
/// watchdog compares its age against the orphan timeout.
last_client_activity: Instant,
}
impl DaemonState {
    /// Sends a stop signal to every registered controller and marks every
    /// process that is still `Pending` as `Stopped`.
    fn broadcast_stop(&mut self) {
        self.controllers.values().for_each(|controller| {
            // A closed channel just means the lifecycle task is already gone.
            let _ = controller.send(true);
        });
        self.processes
            .values_mut()
            .filter(|runtime| matches!(runtime.status, ProcessStatus::Pending))
            .for_each(|runtime| runtime.status = ProcessStatus::Stopped);
    }

    /// Flags the daemon as shutting down and stops everything.
    fn request_shutdown(&mut self) {
        self.shutdown_requested = true;
        self.broadcast_stop();
    }

    /// Stops the named instances.
    ///
    /// A `Pending` instance is flipped straight to `Stopped`; any other name
    /// gets a stop signal through its controller, if one is registered.
    fn stop_instances(&mut self, names: &[String]) {
        for name in names {
            match self.processes.get_mut(name) {
                Some(runtime) if matches!(runtime.status, ProcessStatus::Pending) => {
                    runtime.status = ProcessStatus::Stopped;
                }
                _ => {
                    if let Some(controller) = self.controllers.get(name) {
                        let _ = controller.send(true);
                    }
                }
            }
        }
    }
}
/// Polls the shared state until every process in `names` has a terminal
/// status, or until `max_ticks` polls spaced `tick` apart have been made.
///
/// Names that are not present in the process table count as finished. The
/// function returns nothing; callers cannot tell a timeout from success.
async fn wait_for_terminal(state: &SharedState, names: &[String], tick: Duration, max_ticks: u32) {
    let mut polls_left = max_ticks;
    while polls_left > 0 {
        polls_left -= 1;
        let still_active = {
            let locked = state.lock().await;
            names.iter().any(|name| {
                locked
                    .processes
                    .get(name)
                    .map_or(false, |runtime| !runtime.status.is_terminal())
            })
        };
        if !still_active {
            return;
        }
        sleep(tick).await;
    }
}
/// Handle to the daemon state shared between tasks: reference-counted and
/// guarded by an async (`tokio`) mutex.
pub(crate) type SharedState = Arc<Mutex<DaemonState>>;
/// Runs `f` with mutable access to the process currently named by `handle`.
///
/// The name is read from the handle first, then the state lock is taken for
/// the duration of `f`. Returns `None` (without calling `f`) when no process
/// with that name exists.
pub(crate) async fn with_process_mut<F, R>(
    state: &SharedState,
    handle: &crate::model::NameHandle,
    f: F,
) -> Option<R>
where
    F: FnOnce(&mut ProcessRuntime) -> R,
{
    let key = crate::model::read_name(handle);
    let mut locked = state.lock().await;
    let runtime = locked.processes.get_mut(&key)?;
    Some(f(runtime))
}
/// Runs `f` with shared access to the process currently named by `handle`.
///
/// The name is read from the handle first, then the state lock is taken for
/// the duration of `f`. Returns `None` (without calling `f`) when no process
/// with that name exists.
pub(crate) async fn with_process<F, R>(
    state: &SharedState,
    handle: &crate::model::NameHandle,
    f: F,
) -> Option<R>
where
    F: FnOnce(&ProcessRuntime) -> R,
{
    let key = crate::model::read_name(handle);
    let locked = state.lock().await;
    let runtime = locked.processes.get(&key)?;
    Some(f(runtime))
}
#[cfg(unix)]
fn acquire_instance_lock(paths: &RuntimePaths) -> Result<fs::File> {
let lock_file = open_secure_lock(&paths.lock)
.with_context(|| format!("failed to open lock file at {}", paths.lock.display()))?;
let ret = unsafe { libc::flock(lock_file.as_raw_fd(), libc::LOCK_EX | libc::LOCK_NB) };
if ret != 0 {
anyhow::bail!(
"another daemon is already running for this project (lock held on {})",
paths.lock.display()
);
}
Ok(lock_file)
}
/// Opens the instance lock file at `paths.lock`.
///
/// On non-Unix targets the file is only opened; no lock is taken on it here.
#[cfg(not(unix))]
fn acquire_instance_lock(paths: &RuntimePaths) -> Result<fs::File> {
    let opened = open_secure_lock(&paths.lock);
    opened.with_context(|| format!("failed to open lock file at {}", paths.lock.display()))
}
/// Launches the current executable again as a background `daemon` process.
///
/// The child receives the given working directory, config files, instance
/// name, env files and process selection as command-line flags. Its stdin is
/// closed and both stdout and stderr are appended to `paths.daemon_log`, which
/// is created (together with its parent directory) if needed. A start marker
/// line is written to the log before spawning.
///
/// The child handle is dropped immediately; this function does not wait for
/// the daemon.
///
/// # Errors
/// Fails when the current executable cannot be located, the log cannot be
/// opened or duplicated, or the process cannot be spawned.
#[allow(clippy::too_many_arguments)]
pub fn spawn_daemon_process(
    cwd: &Path,
    config_files: &[std::path::PathBuf],
    instance: &str,
    paths: &RuntimePaths,
    env_files: &[std::path::PathBuf],
    disable_dotenv: bool,
    processes: &[String],
    no_deps: bool,
    parent_pid: Option<u32>,
) -> Result<()> {
    let exe = env::current_exe().context("failed to locate current executable")?;

    if let Some(log_dir) = paths.daemon_log.parent() {
        create_dir_secure(log_dir)?;
    }
    let mut stdout_log = open_secure_append(&paths.daemon_log).with_context(|| {
        format!(
            "failed to open daemon log at {}",
            paths.daemon_log.display()
        )
    })?;
    let started_at = humantime::format_rfc3339_seconds(std::time::SystemTime::now());
    // The marker is best-effort: a failed write must not prevent the daemon from starting.
    let _ = writeln!(stdout_log, "\n--- daemon started at {started_at} ---");
    let stderr_log = stdout_log.try_clone()?;

    let mut daemon = std::process::Command::new(exe);
    daemon.args(["daemon", "--cwd"]).arg(cwd);
    config_files.iter().for_each(|config_file| {
        daemon.arg("--config-file").arg(config_file);
    });
    daemon.arg("--instance").arg(instance);
    env_files.iter().for_each(|env_file| {
        daemon.arg("--env-file").arg(env_file);
    });
    if disable_dotenv {
        daemon.arg("--disable-dotenv");
    }
    processes.iter().for_each(|proc_name| {
        daemon.arg("--process").arg(proc_name);
    });
    if no_deps {
        daemon.arg("--no-deps");
    }
    if let Some(pid) = parent_pid {
        daemon.arg("--parent-pid").arg(pid.to_string());
    }

    daemon
        .stdin(Stdio::null())
        .stdout(Stdio::from(stdout_log))
        .stderr(Stdio::from(stderr_log));
    daemon.spawn().context("failed to spawn daemon process")?;
    Ok(())
}
/// Entry point of the daemon process.
///
/// Steps, in order:
/// 1. change into `args.cwd` and resolve the runtime paths for the instance;
/// 2. take the instance lock and remove any stale socket file;
/// 3. load dotenv files and the merged, interpolated config, and build the
///    process table (processes outside an explicit `--process` selection are
///    marked `NotStarted`);
/// 4. write the pid file and open the local-socket listener;
/// 5. spawn the supervisor (and the orphan watchdog when a parent pid was
///    given) and serve IPC clients until the supervisor signals completion.
///
/// On exit the socket, pid and lock files are removed (best effort).
///
/// # Errors
/// Returns an error for any failure during steps 1-4. Accept errors while
/// serving are logged. A fatal accept error requests a shutdown and then
/// waits for the supervisor to finish stopping all processes before
/// returning, so children are not left behind.
pub async fn run_daemon(args: DaemonArgs) -> Result<()> {
    env::set_current_dir(&args.cwd).with_context(|| {
        format!(
            "failed to change cwd to {}",
            args.cwd.as_os_str().to_string_lossy()
        )
    })?;
    let paths = runtime_paths_for(&args.instance)?;
    if let Some(parent) = paths.socket.parent() {
        create_dir_secure(parent)?;
    }
    if let Some(parent) = paths.pid.parent() {
        create_dir_secure(parent)?;
    }
    // Held until this function returns; dropping the file releases the lock.
    let _lock_file = acquire_instance_lock(&paths)?;
    // We own the instance now, so any socket file on disk is a leftover.
    if paths.socket.exists() {
        let _ = fs::remove_file(&paths.socket);
    }
    let dotenv = load_dotenv_files(&args.cwd, &args.env_files, args.disable_dotenv)?;
    let mut config = load_and_merge_configs(&args.config_files)?;
    apply_interpolation(&mut config);
    let selected: Option<std::collections::HashSet<String>> = if !args.processes.is_empty() {
        Some(collect_process_subset(
            &config,
            &args.processes,
            !args.no_deps,
        )?)
    } else {
        None
    };
    let exit_mode = config.exit_mode;
    let mut process_map = build_process_instances(&config, &args.cwd, &dotenv);
    if let Some(ref selected) = selected {
        // Processes outside the requested subset stay in the table but are not launched.
        for (name, runtime) in process_map.iter_mut() {
            if !selected.contains(&runtime.spec.base_name)
                && !selected.contains(name)
                && matches!(runtime.status, ProcessStatus::Pending)
            {
                runtime.status = ProcessStatus::NotStarted;
            }
        }
    }
    write_secure(&paths.pid, std::process::id().to_string().as_bytes()).with_context(|| {
        format!(
            "failed to write pid file to {}",
            paths.pid.as_path().display()
        )
    })?;
    let socket_name = to_socket_name(&paths.socket)?;
    let listener = ListenerOptions::new()
        .name(socket_name)
        .create_tokio()
        .context("failed to create local socket listener")?;
    #[cfg(unix)]
    if paths.socket.exists() {
        let _ = fs::set_permissions(&paths.socket, fs::Permissions::from_mode(FILE_MODE));
    }
    let state = Arc::new(Mutex::new(DaemonState {
        instance: args.instance.clone(),
        processes: process_map,
        controllers: BTreeMap::new(),
        shutdown_requested: false,
        exit_mode,
        shutdown_timeout_override: None,
        cwd: args.cwd.clone(),
        config_files: args.config_files.clone(),
        env_files: args.env_files.clone(),
        disable_dotenv: args.disable_dotenv,
        last_client_activity: Instant::now(),
    }));
    // The supervisor flips this channel to `true` once shutdown is complete.
    let (stop_tx, mut stop_rx) = watch::channel(false);
    tokio::spawn(supervisor_loop(state.clone(), stop_tx));
    if let Some(parent_pid) = args.parent_pid {
        tokio::spawn(orphan_watchdog(state.clone(), parent_pid));
    }
    loop {
        tokio::select! {
            changed = stop_rx.changed() => {
                if changed.is_ok() && *stop_rx.borrow() {
                    break;
                }
            }
            incoming = listener.accept() => {
                match incoming {
                    Ok(stream) => {
                        let state = state.clone();
                        tokio::spawn(async move {
                            let _ = handle_client(stream, state).await;
                        });
                    }
                    Err(e) => {
                        #[cfg(unix)]
                        let is_fatal = matches!(e.raw_os_error(), Some(libc::EBADF) | Some(libc::ENOTSOCK) | Some(libc::EINVAL));
                        #[cfg(not(unix))]
                        let is_fatal = false;
                        if is_fatal {
                            eprintln!("socket accept failed fatally, shutting down: {e}");
                            state.lock().await.request_shutdown();
                            // The listener is unusable, so stop accepting, but do not
                            // leave yet: wait until the supervisor reports that every
                            // process reached a terminal state (or the supervisor is
                            // gone). Otherwise children could outlive the daemon.
                            loop {
                                let stopped = *stop_rx.borrow();
                                if stopped || stop_rx.changed().await.is_err() {
                                    break;
                                }
                            }
                            break;
                        } else {
                            eprintln!("socket accept error (transient): {e}");
                            sleep(Duration::from_millis(50)).await;
                        }
                    }
                }
            }
        }
    }
    let _ = fs::remove_file(&paths.socket);
    let _ = fs::remove_file(&paths.pid);
    // NOTE(review): the lock file is unlinked while the lock is still held; a
    // daemon starting concurrently may have opened the old inode. Confirm that
    // removing it here is intended.
    let _ = fs::remove_file(&paths.lock);
    Ok(())
}
/// Main scheduling loop of the daemon.
///
/// Every tick it, under a single lock acquisition:
/// * re-broadcasts the stop signal if a shutdown is already requested;
/// * otherwise checks whether the configured `ExitMode` has been triggered by
///   a process exit and, if so, requests a shutdown;
/// * otherwise collects every `Pending`, non-disabled process whose
///   dependencies are met.
///
/// The collected processes are then started outside the lock. Once a shutdown
/// is in progress and every process is terminal, `true` is sent on `stop_tx`
/// and the loop ends.
async fn supervisor_loop(state: SharedState, stop_tx: watch::Sender<bool>) {
    loop {
        let mut launchable: Vec<String> = Vec::new();
        let request_shutdown = {
            let mut guard = state.lock().await;
            if guard.shutdown_requested {
                // Re-sent every tick so controllers registered after the first
                // broadcast are reached too.
                guard.broadcast_stop();
                true
            } else {
                let triggered = match guard.exit_mode {
                    ExitMode::WaitAll => false,
                    ExitMode::ExitOnFailure => guard.processes.values().any(|p| {
                        matches!(p.status, ProcessStatus::Exited { code } if code != 0)
                            || matches!(p.status, ProcessStatus::FailedToStart { .. })
                    }),
                    ExitMode::ExitOnEnd => guard
                        .processes
                        .values()
                        .any(|p| matches!(p.status, ProcessStatus::Exited { .. })),
                };
                if triggered {
                    guard.request_shutdown();
                    true
                } else {
                    // The lock is held for the whole scan, so the live table can
                    // be inspected directly; no per-tick clone is needed.
                    let processes = &guard.processes;
                    launchable.extend(
                        processes
                            .iter()
                            .filter(|(_, runtime)| {
                                matches!(runtime.status, ProcessStatus::Pending)
                                    && !runtime.spec.disabled
                                    && dependencies_met(runtime, processes)
                            })
                            .map(|(name, _)| name.clone()),
                    );
                    false
                }
            }
        };
        for name in launchable {
            start_process(name, state.clone()).await;
        }
        let done = {
            let guard = state.lock().await;
            request_shutdown
                && guard
                    .processes
                    .values()
                    .all(|runtime| runtime.status.is_terminal())
        };
        if done {
            let _ = stop_tx.send(true);
            break;
        }
        sleep(crate::tuning::supervisor_tick()).await;
    }
}
/// Reports whether the process with id `pid` still appears to exist.
///
/// The check sends no signal; it only probes with `kill(pid, None)`.
/// The answer is deliberately conservative: `false` is returned only when the
/// OS says the process does not exist (`ESRCH`). A pid of `0`, a pid that does
/// not fit a positive `i32`, or any other probe error yields `true`.
///
/// Converting with `as i32` would wrap large values to negative numbers, which
/// `kill` interprets as process groups; those values are rejected up front.
#[cfg(unix)]
fn parent_alive(pid: u32) -> bool {
    use nix::errno::Errno;
    use nix::sys::signal::kill;
    use nix::unistd::Pid;
    let raw = match i32::try_from(pid) {
        Ok(raw) if raw > 0 => raw,
        // Not a pid that can name a single process: nothing to watch.
        _ => return true,
    };
    match kill(Pid::from_raw(raw), None) {
        Ok(()) => true,
        Err(Errno::ESRCH) => false,
        Err(_) => true,
    }
}
/// Non-Unix stand-in for the parent liveness probe: no check is performed and
/// the parent is always reported as alive.
#[cfg(not(unix))]
fn parent_alive(_pid: u32) -> bool {
true
}
/// Shuts the daemon down when it has been orphaned.
///
/// Once per check interval the watchdog returns if a shutdown is already in
/// progress, and does nothing while the parent process is alive. When the
/// parent is gone *and* no IPC request has been seen for at least the orphan
/// timeout, it requests a shutdown and ends.
async fn orphan_watchdog(state: SharedState, parent_pid: u32) {
    let interval = crate::tuning::orphan_check_interval();
    let idle_limit = crate::tuning::orphan_timeout();
    loop {
        sleep(interval).await;

        let shutting_down = {
            let locked = state.lock().await;
            locked.shutdown_requested
        };
        if shutting_down {
            return;
        }
        if parent_alive(parent_pid) {
            continue;
        }

        let idle_too_long = {
            let locked = state.lock().await;
            locked.last_client_activity.elapsed() >= idle_limit
        };
        if !idle_too_long {
            continue;
        }

        eprintln!(
            "daemon: parent pid {parent_pid} is gone and no IPC activity for \
             {}s; initiating shutdown",
            idle_limit.as_secs()
        );
        let mut locked = state.lock().await;
        locked.request_shutdown();
        return;
    }
}
/// Returns `true` when every dependency of `candidate` is satisfied in `snapshot`.
///
/// A dependency names a base service. It is satisfied only if at least one
/// instance of that service exists in `snapshot` and *all* of its instances
/// fulfil the dependency's condition:
/// * `ProcessStarted`: the instance has been started at least once;
/// * `ProcessHealthy`: its `ready` flag is set;
/// * `ProcessLogReady`: its `log_ready` flag is set;
/// * `ProcessCompleted`: its status is `Exited` (any code) or `Stopped`;
/// * `ProcessCompletedSuccessfully`: its status is `Exited` with code 0.
pub(crate) fn dependencies_met(
    candidate: &ProcessRuntime,
    snapshot: &BTreeMap<String, ProcessRuntime>,
) -> bool {
    candidate.spec.depends_on.iter().all(|(dep_base, cond)| {
        // Pick the per-instance predicate for this condition.
        let fulfils: fn(&ProcessRuntime) -> bool = match cond {
            DependencyCondition::ProcessStarted => |p: &ProcessRuntime| p.started_once,
            DependencyCondition::ProcessHealthy => |p: &ProcessRuntime| p.ready,
            DependencyCondition::ProcessLogReady => |p: &ProcessRuntime| p.log_ready,
            DependencyCondition::ProcessCompleted => |p: &ProcessRuntime| {
                matches!(
                    p.status,
                    ProcessStatus::Exited { .. } | ProcessStatus::Stopped
                )
            },
            DependencyCondition::ProcessCompletedSuccessfully => {
                |p: &ProcessRuntime| matches!(p.status, ProcessStatus::Exited { code: 0 })
            }
        };
        let mut instances = snapshot
            .values()
            .filter(|p| p.spec.base_name == *dep_base)
            .peekable();
        // A dependency with no instance at all can never be satisfied.
        instances.peek().is_some() && instances.all(fulfils)
    })
}
/// Launches the process called `name` if it is still `Pending`.
///
/// On a successful spawn the process is marked `Running`, its stop controller
/// is registered, health probes and output readers are attached, and a
/// lifecycle task takes ownership of the child. Spawn failures are recorded on
/// the process by `spawn_process_child`.
///
/// The status change and the controller registration happen under one lock
/// acquisition. Otherwise a stop request arriving in between would find a
/// `Running` process with no controller and be silently dropped.
async fn start_process(name: String, state: SharedState) {
    let (spec, name_handle) = {
        let mut guard = state.lock().await;
        let Some(runtime) = guard.processes.get_mut(&name) else {
            return;
        };
        if !matches!(runtime.status, ProcessStatus::Pending) {
            return;
        }
        (runtime.spec.clone(), runtime.name_handle.clone())
    };
    let ready_pattern: Option<Regex> = spec.ready_log_line.as_deref().map(compile_ready_pattern);
    let Some(mut child) =
        spawn_process_child(&name_handle, &spec, &state, SpawnContext::Initial).await
    else {
        return;
    };
    // `id()` is `None` once the child has already been reaped; 0 then stands
    // for "no pid known".
    let pid = child.id().unwrap_or(0);
    let (kill_tx, kill_rx) = watch::channel(false);
    {
        let current = crate::model::read_name(&name_handle);
        let mut guard = state.lock().await;
        if let Some(runtime) = guard.processes.get_mut(&current) {
            runtime.status = ProcessStatus::Running { pid };
            runtime.started_once = true;
        }
        guard.controllers.insert(current, kill_tx);
    }
    spawn_health_probes(&name_handle, &spec, &state);
    attach_output_readers(&mut child, &name_handle, ready_pattern, state.clone());
    tokio::spawn(process_lifecycle(
        name_handle,
        spec,
        child,
        kill_rx,
        state.clone(),
    ));
}
/// Tells the spawn helper whether a process is being launched for the first
/// time or relaunched after an exit; it only affects error wording.
#[derive(Clone, Copy)]
enum SpawnContext {
    /// First launch of the process.
    Initial,
    /// Relaunch after the previous child exited.
    Restart,
}

impl SpawnContext {
    /// Error prefix used when the shell command could not be constructed.
    fn build_label(self) -> &'static str {
        match self {
            Self::Restart => "failed to build shell command on restart",
            Self::Initial => "failed to build shell command",
        }
    }

    /// Error prefix used when the constructed command could not be spawned.
    fn spawn_label(self) -> &'static str {
        match self {
            Self::Restart => "failed to spawn process on restart",
            Self::Initial => "failed to spawn process",
        }
    }
}
/// Builds the shell command for `spec` and spawns it with piped stdout/stderr,
/// the spec's working directory and its environment.
///
/// Returns the child on success. If either building or spawning fails, the
/// error (prefixed according to `ctx`) is recorded on the process through
/// `mark_failed_to_start` and `None` is returned.
async fn spawn_process_child(
    name_handle: &crate::model::NameHandle,
    spec: &crate::model::ProcessInstanceSpec,
    state: &SharedState,
    ctx: SpawnContext,
) -> Option<tokio::process::Child> {
    let name = crate::model::read_name(name_handle);

    let attempt = build_shell_command(&spec.command)
        .with_context(|| format!("[{name}] {} for {:?}", ctx.build_label(), spec.command))
        .and_then(|mut cmd| {
            cmd.current_dir(&spec.working_dir)
                .stdout(Stdio::piped())
                .stderr(Stdio::piped())
                .envs(&spec.environment);
            cmd.spawn().with_context(|| {
                format!(
                    "[{name}] {} (command={:?}, cwd={})",
                    ctx.spawn_label(),
                    spec.command,
                    spec.working_dir.display()
                )
            })
        });

    match attempt {
        Ok(child) => Some(child),
        Err(failure) => {
            mark_failed_to_start(name_handle, state, &failure).await;
            None
        }
    }
}
/// Logs `err` (with its full cause chain) to stderr, prefixed by the process
/// name, and stores the same text as the process's `FailedToStart` reason.
async fn mark_failed_to_start(
    name_handle: &crate::model::NameHandle,
    state: &SharedState,
    err: &anyhow::Error,
) {
    let label = crate::model::read_name(name_handle);
    let reason = format!("{err:#}");
    eprintln!("[{label}] {reason}");
    with_process_mut(state, name_handle, move |runtime| {
        runtime.status = ProcessStatus::FailedToStart { reason };
    })
    .await;
}
/// Starts the readiness and liveness probes declared on `spec`, if any.
///
/// Both probes are handed the process's working directory and environment;
/// the readiness probe is set up first.
fn spawn_health_probes(
    name_handle: &crate::model::NameHandle,
    spec: &crate::model::ProcessInstanceSpec,
    state: &SharedState,
) {
    use crate::health_probes::{ProbeKind, spawn_probe_if_present};

    let workdir = &spec.working_dir;
    let environment = &spec.environment;

    let readiness = spec.readiness_probe.as_ref();
    spawn_probe_if_present(
        readiness,
        ProbeKind::Readiness,
        name_handle,
        workdir,
        environment,
        state,
    );

    let liveness = spec.liveness_probe.as_ref();
    spawn_probe_if_present(
        liveness,
        ProbeKind::Liveness,
        name_handle,
        workdir,
        environment,
        state,
    );
}
/// Why a child process ended, as derived from its exit status.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExitReason {
/// The process exited with this exit code.
Code(i32),
/// The process was terminated by the signal with this number.
Signal(i32),
}
impl ExitReason {
    /// Human-readable form: `exit code N`, or `signal NAME` when the signal
    /// number has a known name and `signal N` otherwise.
    fn describe(self) -> String {
        match self {
            Self::Code(code) => format!("exit code {code}"),
            Self::Signal(sig) => signal_name(sig).map_or_else(
                || format!("signal {sig}"),
                |label| format!("signal {label}"),
            ),
        }
    }
}
/// Maps a signal number to its symbolic name, or `None` if the number is not
/// a signal known to `nix`.
#[cfg(unix)]
fn signal_name(sig: i32) -> Option<&'static str> {
    use nix::sys::signal::Signal;
    Signal::try_from(sig).ok().map(|s| s.as_str())
}

/// Non-Unix stand-in: signal names are not resolved, so this is always `None`.
///
/// Every other use of `nix` in this file is gated on `cfg(unix)`; this keeps
/// the helper consistent with that.
#[cfg(not(unix))]
fn signal_name(_sig: i32) -> Option<&'static str> {
    None
}
/// Waits until `child` ends, either on its own or because a stop was signalled.
///
/// * When `kill_rx` fires, the child is shut down via `shutdown_child` (using
///   the daemon-wide timeout override if one is set) and the result is
///   `(Stopped, None)`.
/// * When the child exits by itself, the result is `Exited` with its exit code
///   (`-1` if it was ended by a signal) together with the exit reason.
/// * When waiting itself fails, the result is `FailedToStart` with a
///   `wait failed: ...` reason and no exit reason.
///
/// `name_handle` is accepted but not used.
async fn wait_for_child_exit(
    name_handle: &crate::model::NameHandle,
    spec: &crate::model::ProcessInstanceSpec,
    child: &mut tokio::process::Child,
    kill_rx: &mut watch::Receiver<bool>,
    state: &SharedState,
) -> (ProcessStatus, Option<ExitReason>) {
    let _ = name_handle;
    tokio::select! {
        _ = kill_rx.changed() => {
            let timeout_override = {
                let locked = state.lock().await;
                locked.shutdown_timeout_override
            };
            shutdown_child(child, spec, timeout_override).await;
            (ProcessStatus::Stopped, None)
        }
        outcome = child.wait() => match outcome {
            Err(e) => {
                let reason = format!("wait failed: {e}");
                (ProcessStatus::FailedToStart { reason }, None)
            }
            Ok(exit_status) => {
                let reason = exit_reason_from_status(&exit_status);
                let code = match reason {
                    ExitReason::Code(code) => code,
                    ExitReason::Signal(_) => -1,
                };
                (ProcessStatus::Exited { code }, Some(reason))
            }
        },
    }
}
/// Converts an OS exit status into an [`ExitReason`].
///
/// An exit code takes precedence. Without one, the terminating signal is
/// reported on Unix. If neither is available the result is `Code(-1)`.
fn exit_reason_from_status(status: &std::process::ExitStatus) -> ExitReason {
    match status.code() {
        Some(code) => ExitReason::Code(code),
        None => {
            #[cfg(unix)]
            {
                use std::os::unix::process::ExitStatusExt;
                if let Some(sig) = status.signal() {
                    return ExitReason::Signal(sig);
                }
            }
            ExitReason::Code(-1)
        }
    }
}
/// Builds the log line printed between two runs of a restarted process, e.g.
/// `[web] --- restarted (exit code 1, attempt 2/5) ---`.
///
/// The `/max` part is omitted when no restart limit is configured.
fn format_restart_separator(
    name: &str,
    reason: ExitReason,
    attempt: u32,
    max: Option<u32>,
) -> String {
    let cause = reason.describe();
    let attempt_part = max.map_or_else(
        || format!("attempt {attempt}"),
        |limit| format!("attempt {attempt}/{limit}"),
    );
    format!("[{name}] --- restarted ({cause}, {attempt_part}) ---")
}
/// Decides whether a process that just ended should be restarted and records
/// the outcome on its runtime entry.
///
/// No restart happens when the process was stopped on request, when its
/// policy is `No`, when the policy is `OnFailure` and it exited with code 0,
/// or when `max_restarts` is set and already reached. In those cases the
/// process status becomes `final_status` and `false` is returned.
///
/// Otherwise the status becomes `Restarting`, the restart counter is
/// incremented and `true` is returned. `false` is also returned when the
/// process no longer exists.
async fn apply_restart_decision(
    name_handle: &crate::model::NameHandle,
    state: &SharedState,
    final_status: ProcessStatus,
) -> bool {
    let decision = with_process_mut(state, name_handle, |runtime| {
        let within_budget = runtime
            .spec
            .max_restarts
            .map_or(true, |limit| runtime.restart_count < limit);
        let wants_restart = match &final_status {
            ProcessStatus::Stopped => false,
            ended => match runtime.spec.restart_policy {
                RestartPolicy::No => false,
                RestartPolicy::OnFailure => {
                    !matches!(ended, ProcessStatus::Exited { code: 0 }) && within_budget
                }
                RestartPolicy::Always => within_budget,
            },
        };
        if !wants_restart {
            runtime.status = final_status;
            return false;
        }
        runtime.status = ProcessStatus::Restarting;
        runtime.restart_count += 1;
        true
    })
    .await;
    decision.unwrap_or(false)
}
async fn process_lifecycle(
name_handle: crate::model::NameHandle,
mut spec: crate::model::ProcessInstanceSpec,
mut child: tokio::process::Child,
mut kill_rx: watch::Receiver<bool>,
state: SharedState,
) {
loop {
let (final_status, exit_reason) =
wait_for_child_exit(&name_handle, &spec, &mut child, &mut kill_rx, &state).await;
let should_restart = apply_restart_decision(&name_handle, &state, final_status).await;
if !should_restart {
let name = crate::model::read_name(&name_handle);
let mut guard = state.lock().await;
guard.controllers.remove(&name);
break;
}
if let Some(reason) = exit_reason {
let (attempt, max) = with_process(&state, &name_handle, |r| {
(r.restart_count, r.spec.max_restarts)
})
.await
.unwrap_or((0, None));
let name = crate::model::read_name(&name_handle);
println!("{}", format_restart_separator(&name, reason, attempt, max));
}
let backoff = with_process(&state, &name_handle, |r| r.spec.backoff_seconds)
.await
.unwrap_or(1);
sleep(Duration::from_secs(backoff)).await;
let next_spec = with_process(&state, &name_handle, |r| r.spec.clone()).await;
let Some(next_spec) = next_spec else { break };
spec = next_spec;
let Some(new_child) =
spawn_process_child(&name_handle, &spec, &state, SpawnContext::Restart).await
else {
let name = crate::model::read_name(&name_handle);
let mut guard = state.lock().await;
guard.controllers.remove(&name);
break;
};
child = new_child;
let pid = child.id().unwrap_or(0);
with_process_mut(&state, &name_handle, |runtime| {
runtime.status = ProcessStatus::Running { pid };
runtime.log_ready = false;
runtime.ready = false;
runtime.alive = true;
})
.await;
let ready_pattern: Option<Regex> =
spec.ready_log_line.as_deref().map(compile_ready_pattern);
attach_output_readers(&mut child, &name_handle, ready_pattern, state.clone());
let (new_kill_tx, new_kill_rx) = watch::channel(false);
{
let name = crate::model::read_name(&name_handle);
let mut guard = state.lock().await;
guard.controllers.insert(name, new_kill_tx);
}
kill_rx = new_kill_rx;
}
}
fn attach_output_readers(
child: &mut tokio::process::Child,
name_handle: &crate::model::NameHandle,
ready_pattern: Option<Regex>,
state: SharedState,
) {
let log_ready_flag = Arc::new(AtomicBool::new(false));
if let Some(stdout) = child.stdout.take() {
let handle = name_handle.clone();
let pattern = ready_pattern.clone();
let flag = log_ready_flag.clone();
let state_clone = state.clone();
tokio::spawn(async move {
let mut lines = BufReader::new(stdout).lines();
while let Ok(Some(line)) = lines.next_line().await {
let proc_name = crate::model::read_name(&handle);
println!("[{proc_name}] {line}");
if let Some(ref re) = pattern {
if !flag.load(Ordering::Relaxed) && re.is_match(&line) {
flag.store(true, Ordering::Relaxed);
with_process_mut(&state_clone, &handle, |runtime| {
runtime.log_ready = true;
})
.await;
}
}
}
});
}
if let Some(stderr) = child.stderr.take() {
let handle = name_handle.clone();
let pattern = ready_pattern;
let flag = log_ready_flag;
let state_clone = state;
tokio::spawn(async move {
let mut lines = BufReader::new(stderr).lines();
while let Ok(Some(line)) = lines.next_line().await {
let proc_name = crate::model::read_name(&handle);
eprintln!("[{proc_name}] {line}");
if let Some(ref re) = pattern {
if !flag.load(Ordering::Relaxed) && re.is_match(&line) {
flag.store(true, Ordering::Relaxed);
with_process_mut(&state_clone, &handle, |runtime| {
runtime.log_ready = true;
})
.await;
}
}
}
});
}
}
/// Stops `child`, escalating until it is gone.
///
/// Sequence, all sharing one time budget (`timeout_override` seconds if given,
/// otherwise `spec.shutdown_timeout_seconds`):
/// 1. run the spec's optional shutdown command and wait for it (failures and
///    timeouts are logged and otherwise ignored);
/// 2. send the configured shutdown signal (default 15) on Unix, or start a
///    forced kill on other targets;
/// 3. wait for the child for whatever remains of the budget;
/// 4. if it has not exited by then, force-kill it and wait for it to be reaped.
async fn shutdown_child(
child: &mut tokio::process::Child,
spec: &crate::model::ProcessInstanceSpec,
timeout_override: Option<u64>,
) {
let total_timeout =
Duration::from_secs(timeout_override.unwrap_or(spec.shutdown_timeout_seconds));
// Single deadline for the shutdown command and the post-signal wait together.
let deadline = tokio::time::Instant::now() + total_timeout;
if let Some(ref cmd_str) = spec.shutdown_command {
match build_shell_command(cmd_str) {
Ok(mut cmd) => {
// The shutdown command runs in the process's own directory and environment.
cmd.current_dir(&spec.working_dir).envs(&spec.environment);
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
eprintln!(
"[{}] shutdown command {:?} skipped: no time budget remaining",
spec.name, cmd_str
);
} else {
match tokio::time::timeout(remaining, cmd.output()).await {
Ok(Ok(output)) => {
// Ran to completion; only a non-success status is worth reporting.
if !output.status.success() {
let code = output
.status
.code()
.map(|c| c.to_string())
.unwrap_or_else(|| "signal".to_string());
eprintln!(
"[{}] shutdown command {:?} exited with status {code}",
spec.name, cmd_str
);
}
}
Ok(Err(e)) => {
eprintln!(
"[{}] shutdown command {:?} failed to spawn: {e}",
spec.name, cmd_str
);
}
Err(_) => {
// The timeout elapsed before the command finished.
eprintln!(
"[{}] shutdown command {:?} timed out; proceeding to SIGTERM",
spec.name, cmd_str
);
}
}
}
}
Err(e) => {
eprintln!("[{}] shutdown command failed to build: {e}", spec.name);
}
}
}
// 15 is used when the spec does not name a shutdown signal.
let signal = spec.shutdown_signal.unwrap_or(15);
// No pid means there is nothing to signal; the wait below still runs.
if let Some(pid) = child.id() {
#[cfg(unix)]
{
use nix::sys::signal::{self, Signal};
use nix::unistd::Pid;
// A number that is not a valid signal sends nothing; the child is then
// only stopped by the forced kill after the deadline.
if let Ok(sig) = Signal::try_from(signal) {
let _ = signal::kill(Pid::from_raw(pid as i32), sig);
}
}
#[cfg(not(unix))]
{
let _ = signal; let _ = child.start_kill();
}
}
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
// `terminated` is true only if the wait finished within the remaining budget.
let terminated = if remaining.is_zero() {
false
} else {
tokio::time::timeout(remaining, child.wait()).await.is_ok()
};
if !terminated {
// Budget exhausted: force-kill and reap the child.
let _ = child.start_kill();
let _ = child.wait().await;
}
}
/// Wraps `command` in a shell invocation.
///
/// On Windows this is `cmd /C <command>`. Elsewhere the shell is taken from
/// the `COMPOSE_SHELL` environment variable (default `sh`) and invoked as
/// `<shell> -c <command>`.
///
/// # Errors
/// On non-Windows targets, fails when an absolute shell path does not exist or
/// a relative shell name cannot be found on `PATH`.
pub(crate) fn build_shell_command(command: &str) -> Result<TokioCommand> {
    if cfg!(windows) {
        let mut cmd = TokioCommand::new("cmd");
        cmd.args(["/C", command]);
        return Ok(cmd);
    }

    let shell = env::var("COMPOSE_SHELL").unwrap_or_else(|_| "sh".to_string());
    let shell_path = Path::new(&shell);
    if shell_path.is_absolute() {
        if !shell_path.exists() {
            anyhow::bail!("shell {shell:?} (from COMPOSE_SHELL) does not exist");
        }
    } else if which::which(&shell).is_err() {
        anyhow::bail!("shell {shell:?} (from COMPOSE_SHELL) not found on PATH");
    }

    let mut cmd = TokioCommand::new(&shell);
    cmd.args(["-c", command]);
    Ok(cmd)
}
/// Expands user-supplied service names to process instance names.
///
/// An empty `services` list selects every known instance. Otherwise each entry
/// is matched against both the base name and the instance name of every
/// process.
///
/// Returns the sorted, de-duplicated instance names, or the entries that
/// matched nothing (in input order) as the error value.
fn resolve_services(
    state: &DaemonState,
    services: &[String],
) -> std::result::Result<Vec<String>, Vec<String>> {
    if services.is_empty() {
        return Ok(state.processes.keys().cloned().collect());
    }

    let mut matched = std::collections::BTreeSet::new();
    let mut unknown = Vec::new();
    for requested in services {
        let hits: Vec<&String> = state
            .processes
            .iter()
            .filter(|(_, runtime)| {
                runtime.spec.base_name == *requested || runtime.spec.name == *requested
            })
            .map(|(instance, _)| instance)
            .collect();
        if hits.is_empty() {
            unknown.push(requested.clone());
        } else {
            matched.extend(hits.into_iter().cloned());
        }
    }

    if !unknown.is_empty() {
        return Err(unknown);
    }
    Ok(matched.into_iter().collect())
}
/// Renders a service list for user-facing messages: the names joined with
/// `", "`, or `all services` when the list is empty.
fn describe_services(services: &[String]) -> String {
    match services {
        [] => "all services".to_string(),
        named => named.join(", "),
    }
}
fn resolve_services_or_error(
state: &DaemonState,
services: &[String],
) -> std::result::Result<Vec<String>, Response> {
resolve_services(state, services).map_err(|unknown| Response::Error {
message: format!("unknown service(s): {}", unknown.join(", ")),
})
}
/// Result of comparing the running configuration with a newly loaded one.
///
/// Every service name lands in exactly one of the five collections.
#[derive(Debug, Default, PartialEq, Eq)]
pub(crate) struct ReloadPlan {
    /// Services present only in the new configuration.
    pub added: Vec<String>,
    /// Services present only in the old configuration.
    pub removed: Vec<String>,
    /// Services that are to be recreated.
    pub changed: Vec<String>,
    /// Services that are left alone.
    pub unchanged: Vec<String>,
    /// Services whose config is identical but whose replica count differs,
    /// mapped to `(old_replicas, new_replicas)`.
    pub scaled: BTreeMap<String, (u16, u16)>,
}

/// What is compared per service when planning a reload.
#[derive(Debug, Clone)]
pub(crate) struct ServiceFingerprint {
    /// Hash of the service's configuration.
    pub config_hash: String,
    /// Configured replica count.
    pub replicas: u16,
}

/// Classifies every service of `old` and `new` into a [`ReloadPlan`].
///
/// For a service present in both maps:
/// * `force_recreate` puts it in `changed` unconditionally;
/// * a differing config hash puts it in `changed`, or in `unchanged` when
///   `no_recreate` is set;
/// * an equal hash with a differing replica count puts it in `scaled`;
/// * anything else is `unchanged`.
///
/// `force_recreate` and `no_recreate` must not both be set (checked in debug
/// builds only).
///
/// # Errors
/// Returns a message when any service listed in `deps` depends on a service
/// that the new configuration removes.
pub(crate) fn compute_reload_plan(
    old: &BTreeMap<String, ServiceFingerprint>,
    new: &BTreeMap<String, ServiceFingerprint>,
    deps: &BTreeMap<String, Vec<String>>,
    force_recreate: bool,
    no_recreate: bool,
) -> std::result::Result<ReloadPlan, String> {
    debug_assert!(
        !(force_recreate && no_recreate),
        "force_recreate and no_recreate are mutually exclusive"
    );

    // Both maps iterate in key order, so every list below comes out sorted.
    let mut plan = ReloadPlan::default();
    for (name, new_fp) in new {
        let Some(old_fp) = old.get(name) else {
            plan.added.push(name.clone());
            continue;
        };
        let hash_differs = old_fp.config_hash != new_fp.config_hash;
        if force_recreate || (hash_differs && !no_recreate) {
            plan.changed.push(name.clone());
        } else if !hash_differs && old_fp.replicas != new_fp.replicas {
            plan.scaled
                .insert(name.clone(), (old_fp.replicas, new_fp.replicas));
        } else {
            plan.unchanged.push(name.clone());
        }
    }
    plan.removed = old
        .keys()
        .filter(|name| !new.contains_key(*name))
        .cloned()
        .collect();

    // A service is "removed" exactly when it is in `old` but not in `new`.
    let is_removed = |name: &String| old.contains_key(name) && !new.contains_key(name);
    for (svc, svc_deps) in deps {
        if let Some(dep) = svc_deps.iter().find(|dep| is_removed(dep)) {
            return Err(format!(
                "cannot reload: service {svc:?} still depends on {dep:?}, which is removed in the new config"
            ));
        }
    }
    Ok(plan)
}
/// Serves one IPC connection: reads a single JSON request line, dispatches it
/// and writes back a single JSON response line.
///
/// A connection that closes without sending anything is ignored. Every
/// successfully parsed request refreshes `last_client_activity`.
///
/// # Errors
/// Fails when the request cannot be read or parsed, or when the response
/// cannot be serialized or written.
async fn handle_client(stream: Stream, state: SharedState) -> Result<()> {
    let (incoming, mut outgoing) = tokio::io::split(stream);
    let mut reader = BufReader::new(incoming);
    let mut raw = String::new();
    let bytes_read = reader
        .read_line(&mut raw)
        .await
        .context("failed to read request line")?;
    if bytes_read == 0 {
        return Ok(());
    }
    let request: Request = serde_json::from_str(raw.trim()).context("invalid request json")?;
    {
        let mut locked = state.lock().await;
        locked.last_client_activity = Instant::now();
    }

    let response = match request {
        Request::Ping => {
            let locked = state.lock().await;
            Response::Pong {
                pid: std::process::id(),
                instance: locked.instance.clone(),
            }
        }
        Request::Ps => handle_ps(&state).await,
        Request::Down { timeout_seconds } => {
            let mut locked = state.lock().await;
            locked.shutdown_timeout_override = timeout_seconds;
            locked.request_shutdown();
            Response::Ack {
                message: "shutdown requested".to_string(),
            }
        }
        Request::Stop { services } => handle_stop(&state, services).await,
        Request::Start { services } => handle_start(&state, services).await,
        Request::Kill { services, signal } => handle_kill(&state, services, signal).await,
        Request::Restart { services } => handle_restart(&state, services).await,
        Request::RemoveOrphans { keep } => handle_remove_orphans(&state, keep).await,
        Request::Reload {
            force_recreate,
            no_recreate,
            remove_orphans,
            no_start,
        } => {
            handle_reload(
                state.clone(),
                force_recreate,
                no_recreate,
                remove_orphans,
                no_start,
            )
            .await
        }
        Request::ServiceRunState { name } => handle_service_run_state(&state, name).await,
    };

    // One response per connection, terminated by a newline.
    let mut payload = serde_json::to_string(&response)?;
    payload.push('\n');
    outgoing.write_all(payload.as_bytes()).await?;
    outgoing.flush().await?;
    Ok(())
}
/// Builds the `Ps` response: the daemon's pid, its instance name and a
/// snapshot of every known process.
async fn handle_ps(state: &SharedState) -> Response {
    let locked = state.lock().await;
    let mut processes = Vec::with_capacity(locked.processes.len());
    for runtime in locked.processes.values() {
        processes.push(ProcessSnapshot::from(runtime));
    }
    Response::Ps {
        pid: std::process::id(),
        instance: locked.instance.clone(),
        processes,
    }
}
/// Handles a `Stop` request: resolves the requested services and asks each
/// matching instance to stop. Unknown names produce an error response and
/// nothing is stopped.
async fn handle_stop(state: &SharedState, services: Vec<String>) -> Response {
    let mut locked = state.lock().await;
    let names = match resolve_services_or_error(&locked, &services) {
        Ok(names) => names,
        Err(resp) => return resp,
    };
    locked.stop_instances(&names);
    let described = describe_services(&services);
    Response::Ack {
        message: format!("stopping {described}"),
    }
}
/// Handles a `Start` request.
///
/// The requested services are resolved and then expanded with all instances
/// of everything they depend on, transitively. Every selected instance whose
/// status is terminal is reset to `Pending` (with its readiness flags cleared)
/// so the supervisor launches it again. Instances in any other state are left
/// untouched.
///
/// Unknown names produce an error response and nothing is changed.
async fn handle_start(state: &SharedState, services: Vec<String>) -> Response {
    let mut locked = state.lock().await;
    let names = match resolve_services_or_error(&locked, &services) {
        Ok(names) => names,
        Err(resp) => return resp,
    };

    // Breadth-first walk over the dependency graph, collecting instance names.
    let mut to_start: std::collections::BTreeSet<String> = names.into_iter().collect();
    let mut frontier: std::collections::VecDeque<String> = to_start.iter().cloned().collect();
    while let Some(current) = frontier.pop_front() {
        let Some(runtime) = locked.processes.get(&current) else {
            continue;
        };
        for dep_base in runtime.spec.depends_on.keys() {
            for (candidate, dep_runtime) in locked.processes.iter() {
                if dep_runtime.spec.base_name == *dep_base && to_start.insert(candidate.clone()) {
                    frontier.push_back(candidate.clone());
                }
            }
        }
    }

    let mut revived = 0usize;
    for name in &to_start {
        let Some(runtime) = locked.processes.get_mut(name) else {
            continue;
        };
        if !runtime.status.is_terminal() {
            continue;
        }
        runtime.status = ProcessStatus::Pending;
        runtime.log_ready = false;
        runtime.ready = false;
        runtime.alive = true;
        revived += 1;
    }

    let described = describe_services(&services);
    let message = if revived == 0 {
        format!("{described} already running")
    } else {
        format!("starting {described}")
    };
    Response::Ack { message }
}
async fn handle_kill(state: &SharedState, services: Vec<String>, signal: i32) -> Response {
let guard = state.lock().await;
match resolve_services_or_error(&guard, &services) {
Err(resp) => resp,
Ok(names) => {
for name in &names {
if let Some(runtime) = guard.processes.get(name) {
if let ProcessStatus::Running { pid } = runtime.status {
#[cfg(unix)]
{
use nix::sys::signal::{self, Signal};
use nix::unistd::Pid;
if let Ok(sig) = Signal::try_from(signal) {
let _ = signal::kill(Pid::from_raw(pid as i32), sig);
}
}
}
}
}
Response::Ack {
message: format!("killed {}", describe_services(&services)),
}
}
}
}
/// Handles a `Restart` request.
///
/// The matching instances are asked to stop and the request is acknowledged
/// immediately. A background task then waits (up to 200 polls of 50 ms) for
/// them to reach a terminal state and re-queues them as `Pending` so the
/// supervisor starts them again.
///
/// An instance that is still `Running` or `Restarting` when the wait ends is
/// *not* re-queued. Marking it `Pending` would make the supervisor spawn a
/// second child while the first one is still alive. The skip is logged.
///
/// Unknown names produce an error response and nothing is stopped.
async fn handle_restart(state: &SharedState, services: Vec<String>) -> Response {
    let mut guard = state.lock().await;
    match resolve_services_or_error(&guard, &services) {
        Err(resp) => resp,
        Ok(names) => {
            guard.stop_instances(&names);
            // Release the lock so the lifecycle tasks can record the stops.
            drop(guard);
            let state_clone = state.clone();
            tokio::spawn(async move {
                wait_for_terminal(&state_clone, &names, Duration::from_millis(50), 200).await;
                let mut guard = state_clone.lock().await;
                for name in &names {
                    let Some(runtime) = guard.processes.get_mut(name) else {
                        continue;
                    };
                    if matches!(
                        runtime.status,
                        ProcessStatus::Running { .. } | ProcessStatus::Restarting
                    ) {
                        eprintln!("[{name}] restart skipped: process did not stop in time");
                        continue;
                    }
                    runtime.status = ProcessStatus::Pending;
                    runtime.log_ready = false;
                    runtime.ready = false;
                    runtime.alive = true;
                }
            });
            Response::Ack {
                message: format!("restarting {}", describe_services(&services)),
            }
        }
    }
}
/// Stops and then forgets every instance whose service base name is not in
/// `keep`.
///
/// Orphans are stopped immediately. Their entries in `processes` and
/// `controllers` are removed by a detached task once `wait_for_terminal`
/// returns. The `Ack` lists the orphaned instance names, or reports that none
/// were found.
async fn handle_remove_orphans(state: &SharedState, keep: Vec<String>) -> Response {
    let mut guard = state.lock().await;
    let retained: std::collections::HashSet<&str> = keep.iter().map(String::as_str).collect();
    let orphans: Vec<String> = guard
        .processes
        .iter()
        .filter(|(_, runtime)| !retained.contains(runtime.spec.base_name.as_str()))
        .map(|(name, _)| name.clone())
        .collect();
    guard.stop_instances(&orphans);

    if orphans.is_empty() {
        return Response::Ack {
            message: "no orphan processes found".to_string(),
        };
    }

    // Build the reply first so the name list can be moved into the task.
    let message = format!("removing orphan(s): {}", orphans.join(", "));
    let shared = state.clone();
    tokio::spawn(async move {
        wait_for_terminal(&shared, &orphans, Duration::from_millis(50), 200).await;
        let mut guard = shared.lock().await;
        for name in &orphans {
            guard.processes.remove(name);
            guard.controllers.remove(name);
        }
    });
    Response::Ack { message }
}
/// Reports whether `name` matches any known instance and whether at least one
/// matching instance is currently `Running`.
///
/// `name` is compared against both the service base name and the full
/// instance name.
async fn handle_service_run_state(state: &SharedState, name: String) -> Response {
    let guard = state.lock().await;
    let mut matching = guard
        .processes
        .values()
        .filter(|runtime| runtime.spec.base_name == name || runtime.spec.name == name)
        .peekable();
    // Peek before consuming so `known` reflects "at least one match".
    let known = matching.peek().is_some();
    let any_running =
        matching.any(|runtime| matches!(runtime.status, ProcessStatus::Running { .. }));
    Response::ServiceRunState { known, any_running }
}
/// Builds the per-service fingerprint table used to diff two configurations.
///
/// `instances` yields one `(base_name, config_hash)` pair per process
/// instance. Each distinct base name becomes one entry whose `replicas` is the
/// number of pairs seen for it and whose `config_hash` comes from the first
/// pair encountered for that base name.
fn fingerprints_by_service<'a>(
    instances: impl IntoIterator<Item = (&'a str, &'a str)>,
) -> BTreeMap<String, ServiceFingerprint> {
    let mut by_service: BTreeMap<String, ServiceFingerprint> = BTreeMap::new();
    for (base_name, config_hash) in instances {
        by_service
            .entry(base_name.to_owned())
            .or_insert_with(|| ServiceFingerprint {
                config_hash: config_hash.to_owned(),
                replicas: 0,
            })
            .replicas += 1;
    }
    by_service
}

/// Re-reads the configuration and reconciles the process table with it.
///
/// Steps, in order:
/// 1. Reject `force_recreate` combined with `no_recreate`.
/// 2. Reload env files and config files from the paths stored in the daemon
///    state. A failure is returned as `Response::Error` before anything is
///    modified.
/// 3. Fingerprint the current and the new process sets and hand both to
///    `compute_reload_plan`; a plan error is returned as `Response::Error`.
/// 4. Under a single lock, collect and stop: all instances of changed
///    services, all instances of removed services (only when
///    `remove_orphans`), and the surplus replicas of scaled-down services.
///    Then wait for them via `wait_for_terminal`.
/// 5. Under a second lock, drop the stopped entries, rename the surviving
///    instance when a service crosses the one-replica boundary
///    (`name` <-> `name[1]`), and insert instances for added services, changed
///    services, and newly scaled-up replicas.
///
/// When `no_start` is set, inserted instances whose status is `Pending` are
/// stored as `NotStarted` instead.
///
/// Returns an `Ack` summarising what was added, changed, scaled, removed (or
/// left as orphans) and renamed.
async fn handle_reload(
    state: SharedState,
    force_recreate: bool,
    no_recreate: bool,
    remove_orphans: bool,
    no_start: bool,
) -> Response {
    if force_recreate && no_recreate {
        return Response::Error {
            message: "reload: --force-recreate and --no-recreate are mutually exclusive"
                .to_string(),
        };
    }

    // Copy the inputs out so no lock is held during file I/O and parsing.
    let (cwd, config_files, env_files, disable_dotenv) = {
        let guard = state.lock().await;
        (
            guard.cwd.clone(),
            guard.config_files.clone(),
            guard.env_files.clone(),
            guard.disable_dotenv,
        )
    };
    let dotenv = match load_dotenv_files(&cwd, &env_files, disable_dotenv) {
        Ok(d) => d,
        Err(e) => {
            return Response::Error {
                message: format!("reload: failed to load env files: {e:#}"),
            };
        }
    };
    let mut new_config = match load_and_merge_configs(&config_files) {
        Ok(c) => c,
        Err(e) => {
            return Response::Error {
                message: format!("reload: invalid config: {e:#}"),
            };
        }
    };
    apply_interpolation(&mut new_config);
    let new_process_map = build_process_instances(&new_config, &cwd, &dotenv);

    let new_fingerprints = fingerprints_by_service(
        new_process_map
            .values()
            .map(|r| (r.spec.base_name.as_str(), r.spec.config_hash.as_str())),
    );
    let new_deps: BTreeMap<String, Vec<String>> = new_process_map
        .values()
        .map(|r| {
            (
                r.spec.base_name.clone(),
                r.spec.depends_on.keys().cloned().collect(),
            )
        })
        .collect();
    let old_fingerprints = {
        let guard = state.lock().await;
        fingerprints_by_service(
            guard
                .processes
                .values()
                .map(|r| (r.spec.base_name.as_str(), r.spec.config_hash.as_str())),
        )
    };

    let plan = match compute_reload_plan(
        &old_fingerprints,
        &new_fingerprints,
        &new_deps,
        force_recreate,
        no_recreate,
    ) {
        Ok(p) => p,
        Err(msg) => return Response::Error { message: msg },
    };

    // Collect everything that has to go and issue the stop under ONE lock, so
    // all lists describe the same view of the process table.
    let (removed_instances, to_stop) = {
        let mut guard = state.lock().await;
        let changed_instances: Vec<String> = guard
            .processes
            .iter()
            .filter(|(_, r)| plan.changed.contains(&r.spec.base_name))
            .map(|(n, _)| n.clone())
            .collect();
        // Without `remove_orphans`, removed services are left running.
        let removed_instances: Vec<String> = if remove_orphans {
            guard
                .processes
                .iter()
                .filter(|(_, r)| plan.removed.contains(&r.spec.base_name))
                .map(|(n, _)| n.clone())
                .collect()
        } else {
            Vec::new()
        };
        // On scale-down only replicas numbered above the new count are dropped.
        let mut scaled_down_instances: Vec<String> = Vec::new();
        for (base, (old_count, new_count)) in &plan.scaled {
            if new_count < old_count {
                for (name, runtime) in &guard.processes {
                    if &runtime.spec.base_name == base && runtime.spec.replica > *new_count {
                        scaled_down_instances.push(name.clone());
                    }
                }
            }
        }
        let to_stop: Vec<String> = changed_instances
            .iter()
            .chain(removed_instances.iter())
            .chain(scaled_down_instances.iter())
            .cloned()
            .collect();
        if !to_stop.is_empty() {
            guard.stop_instances(&to_stop);
        }
        (removed_instances, to_stop)
    };

    // A service crossing the one-replica boundary changes its instance naming
    // scheme: `name` when there is one replica, `name[1]` when there are more.
    let rename_plan: Vec<(String, String)> = plan
        .scaled
        .iter()
        .filter_map(|(base, (old_count, new_count))| {
            if *old_count == 1 && *new_count >= 2 {
                Some((base.clone(), format!("{base}[1]")))
            } else if *old_count >= 2 && *new_count == 1 {
                Some((format!("{base}[1]"), base.clone()))
            } else {
                None
            }
        })
        .collect();

    if !to_stop.is_empty() {
        // Wait without holding the lock.
        wait_for_terminal(&state, &to_stop, Duration::from_millis(50), 1200).await;
    }

    let (n_added, n_changed, n_removed, n_scaled_services, replica_delta, n_renamed) = {
        let mut guard = state.lock().await;

        // `to_stop` is exactly changed + removed + scaled-down instances.
        for name in &to_stop {
            guard.processes.remove(name);
            guard.controllers.remove(name);
        }

        let mut renamed = 0usize;
        for (old_name, new_name) in &rename_plan {
            let Some(mut runtime) = guard.processes.remove(old_name) else {
                continue;
            };
            runtime.spec.name = new_name.clone();
            // A poisoned name handle is skipped rather than aborting the reload.
            if let Ok(mut guard_name) = runtime.name_handle.write() {
                *guard_name = new_name.clone();
            }
            guard.processes.insert(new_name.clone(), runtime);
            if let Some(ctrl) = guard.controllers.remove(old_name) {
                guard.controllers.insert(new_name.clone(), ctrl);
            }
            renamed += 1;
        }

        let mut new_instances_added = 0usize;
        let mut new_instances_changed = 0usize;
        for (instance_name, runtime) in &new_process_map {
            let is_added = plan.added.contains(&runtime.spec.base_name);
            let is_changed = plan.changed.contains(&runtime.spec.base_name);
            // For a scale-up, only replicas beyond the old count are new.
            let scaled_up_tail =
                plan.scaled
                    .get(&runtime.spec.base_name)
                    .is_some_and(|(old_count, new_count)| {
                        new_count > old_count && runtime.spec.replica > *old_count
                    });
            if !(is_added || is_changed || scaled_up_tail) {
                continue;
            }
            let mut runtime = runtime.clone();
            if no_start && matches!(runtime.status, ProcessStatus::Pending) {
                runtime.status = ProcessStatus::NotStarted;
            }
            guard.processes.insert(instance_name.clone(), runtime);
            if is_added {
                new_instances_added += 1;
            } else if is_changed {
                new_instances_changed += 1;
            }
        }

        // Counts instances when orphans were removed, services otherwise.
        let removed_count = if remove_orphans {
            removed_instances.len()
        } else {
            plan.removed.len()
        };
        let delta: i32 = plan
            .scaled
            .values()
            .map(|(old_count, new_count)| i32::from(*new_count) - i32::from(*old_count))
            .sum();
        (
            new_instances_added,
            new_instances_changed,
            removed_count,
            plan.scaled.len(),
            delta,
            renamed,
        )
    };

    if !plan.removed.is_empty() && !remove_orphans {
        eprintln!(
            "reload: orphan service(s) no longer in config (left running): [{}]",
            plan.removed.join(", ")
        );
    }

    let removed_label = if remove_orphans { "removed" } else { "orphan" };
    // Negative deltas already carry their own sign.
    let delta_sign = if replica_delta >= 0 { "+" } else { "" };
    let rename_suffix = if n_renamed > 0 {
        format!(", {n_renamed} renamed")
    } else {
        String::new()
    };
    Response::Ack {
        message: format!(
            "reloaded: +{n_added} added, {n_changed} changed, \
             {n_scaled_services} scaled ({delta_sign}{replica_delta} replicas), \
             {n_removed} {removed_label}{rename_suffix}",
        ),
    }
}
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;
    use std::path::PathBuf;

    use super::*;
    use crate::model::{
        DependencyCondition, ProcessInstanceSpec, ProcessRuntime, ProcessStatus, RestartPolicy,
    };

    /// Builds a replica-1 runtime named `base` with the given status and
    /// `started_once` flag; every other field gets a fixed test default.
    fn runtime_with(base: &str, status: ProcessStatus, started_once: bool) -> ProcessRuntime {
        let spec = ProcessInstanceSpec {
            name: base.to_string(),
            base_name: base.to_string(),
            replica: 1,
            command: "echo".to_string(),
            description: None,
            working_dir: PathBuf::from("/tmp"),
            environment: BTreeMap::new(),
            depends_on: BTreeMap::new(),
            ready_log_line: None,
            restart_policy: RestartPolicy::No,
            backoff_seconds: 1,
            max_restarts: None,
            shutdown_signal: None,
            shutdown_timeout_seconds: 10,
            shutdown_command: None,
            readiness_probe: None,
            liveness_probe: None,
            disabled: false,
            config_hash: String::new(),
        };
        ProcessRuntime {
            spec,
            status,
            started_once,
            log_ready: false,
            restart_count: 0,
            ready: false,
            alive: true,
            name_handle: crate::model::make_name_handle(base.to_string()),
        }
    }

    /// A pending, never-started `api` runtime that depends on `db` under
    /// `condition`.
    fn api_depending_on_db(condition: DependencyCondition) -> ProcessRuntime {
        let mut candidate = runtime_with("api", ProcessStatus::Pending, false);
        candidate.spec.depends_on.insert("db".to_string(), condition);
        candidate
    }

    /// A snapshot containing a single `db` runtime.
    fn snapshot_with_db(db: ProcessRuntime) -> BTreeMap<String, ProcessRuntime> {
        let mut snapshot = BTreeMap::new();
        snapshot.insert("db".to_string(), db);
        snapshot
    }

    fn fp(hash: &str, replicas: u16) -> ServiceFingerprint {
        ServiceFingerprint {
            config_hash: hash.to_string(),
            replicas,
        }
    }

    /// Builds a fingerprint table from `(service, hash, replicas)` triples.
    fn fingerprints(entries: &[(&str, &str, u16)]) -> BTreeMap<String, ServiceFingerprint> {
        entries
            .iter()
            .map(|&(service, hash, replicas)| (service.to_string(), fp(hash, replicas)))
            .collect()
    }

    /// Owned service-name list for comparing against plan fields.
    fn names(items: &[&str]) -> Vec<String> {
        items.iter().map(|item| item.to_string()).collect()
    }

    /// An empty dependency table.
    fn no_deps() -> BTreeMap<String, Vec<String>> {
        BTreeMap::new()
    }

    #[test]
    fn dependencies_require_all_replicas_to_satisfy_condition() {
        let mut snapshot = BTreeMap::new();
        // db[1] exited cleanly, db[2] exited with a failure code.
        for (replica, code) in [(1u16, 0), (2, 1)] {
            let instance = format!("db[{replica}]");
            let mut runtime = runtime_with("db", ProcessStatus::Exited { code }, true);
            runtime.spec.name = instance.clone();
            runtime.spec.replica = replica;
            snapshot.insert(instance, runtime);
        }
        let candidate = api_depending_on_db(DependencyCondition::ProcessCompletedSuccessfully);

        assert!(!dependencies_met(&candidate, &snapshot));

        snapshot
            .get_mut("db[2]")
            .expect("db[2] was inserted above")
            .status = ProcessStatus::Exited { code: 0 };
        assert!(dependencies_met(&candidate, &snapshot));
    }

    #[test]
    fn process_started_condition_uses_started_once() {
        let mut snapshot = snapshot_with_db(runtime_with("db", ProcessStatus::Pending, false));
        let candidate = api_depending_on_db(DependencyCondition::ProcessStarted);

        assert!(!dependencies_met(&candidate, &snapshot));

        let db = snapshot.get_mut("db").expect("db was inserted above");
        db.started_once = true;
        db.status = ProcessStatus::Running { pid: 42 };
        assert!(dependencies_met(&candidate, &snapshot));
    }

    #[test]
    fn process_healthy_condition_uses_ready_flag_not_alive() {
        let mut db = runtime_with("db", ProcessStatus::Running { pid: 42 }, true);
        db.ready = false;
        db.alive = true;
        let mut snapshot = snapshot_with_db(db);
        let candidate = api_depending_on_db(DependencyCondition::ProcessHealthy);

        assert!(
            !dependencies_met(&candidate, &snapshot),
            "alive-but-not-ready must not satisfy process_healthy"
        );

        snapshot.get_mut("db").expect("db was inserted above").ready = true;
        assert!(dependencies_met(&candidate, &snapshot));

        snapshot.get_mut("db").expect("db was inserted above").alive = false;
        assert!(
            dependencies_met(&candidate, &snapshot),
            "process_healthy must ignore alive; only readiness gates it"
        );
    }

    #[test]
    fn process_log_ready_condition_uses_log_ready_flag() {
        let mut db = runtime_with("db", ProcessStatus::Running { pid: 42 }, true);
        db.log_ready = false;
        let mut snapshot = snapshot_with_db(db);
        let candidate = api_depending_on_db(DependencyCondition::ProcessLogReady);

        assert!(!dependencies_met(&candidate, &snapshot));

        snapshot
            .get_mut("db")
            .expect("db was inserted above")
            .log_ready = true;
        assert!(dependencies_met(&candidate, &snapshot));
    }

    #[test]
    fn reload_plan_classifies_added_removed_changed_unchanged() {
        let old = fingerprints(&[
            ("api", "h_api_v1", 1),
            ("db", "h_db", 1),
            ("cache", "h_cache", 1),
        ]);
        let new = fingerprints(&[
            ("api", "h_api_v2", 1),
            ("db", "h_db", 1),
            ("worker", "h_worker", 1),
        ]);
        let plan = compute_reload_plan(&old, &new, &no_deps(), false, false)
            .expect("no dep violations");
        assert_eq!(plan.added, names(&["worker"]));
        assert_eq!(plan.removed, names(&["cache"]));
        assert_eq!(plan.changed, names(&["api"]));
        assert_eq!(plan.unchanged, names(&["db"]));
    }

    #[test]
    fn reload_plan_scales_pure_replica_count_change() {
        let old = fingerprints(&[("worker", "same_hash", 2)]);
        let new = fingerprints(&[("worker", "same_hash", 3)]);
        let plan = compute_reload_plan(&old, &new, &no_deps(), false, false)
            .expect("no dep violations");
        assert!(
            plan.changed.is_empty(),
            "pure replica change should not recreate: {plan:?}"
        );
        assert!(
            plan.unchanged.is_empty(),
            "pure replica change should not be unchanged: {plan:?}"
        );
        assert_eq!(plan.scaled.get("worker"), Some(&(2, 3)));
    }

    #[test]
    fn reload_plan_scale_down_classifies_as_scaled() {
        let old = fingerprints(&[("worker", "same_hash", 5)]);
        let new = fingerprints(&[("worker", "same_hash", 2)]);
        let plan = compute_reload_plan(&old, &new, &no_deps(), false, false)
            .expect("no dep violations");
        assert!(plan.changed.is_empty());
        assert_eq!(plan.scaled.get("worker"), Some(&(5, 2)));
    }

    #[test]
    fn reload_plan_replica_change_plus_hash_change_is_changed() {
        let old = fingerprints(&[("worker", "h_v1", 2)]);
        let new = fingerprints(&[("worker", "h_v2", 3)]);
        let plan = compute_reload_plan(&old, &new, &no_deps(), false, false)
            .expect("no dep violations");
        assert_eq!(plan.changed, names(&["worker"]));
        assert!(plan.scaled.is_empty());
    }

    #[test]
    fn reload_plan_force_recreate_overrides_scale() {
        let old = fingerprints(&[("worker", "same_hash", 2)]);
        let new = fingerprints(&[("worker", "same_hash", 3)]);
        let plan = compute_reload_plan(&old, &new, &no_deps(), true, false)
            .expect("force_recreate valid");
        assert_eq!(plan.changed, names(&["worker"]));
        assert!(plan.scaled.is_empty());
    }

    #[test]
    fn reload_plan_no_recreate_does_not_block_scale() {
        let old = fingerprints(&[("worker", "same_hash", 2)]);
        let new = fingerprints(&[("worker", "same_hash", 3)]);
        let plan = compute_reload_plan(&old, &new, &no_deps(), false, true)
            .expect("no_recreate valid");
        assert!(plan.changed.is_empty());
        assert_eq!(plan.scaled.get("worker"), Some(&(2, 3)));
    }

    #[test]
    fn reload_plan_replica_1_boundary_is_scaled_with_rename() {
        for (old_count, new_count) in [(1u16, 2u16), (2, 1), (1, 3), (3, 1)] {
            let old = fingerprints(&[("worker", "same_hash", old_count)]);
            let new = fingerprints(&[("worker", "same_hash", new_count)]);
            let plan = compute_reload_plan(&old, &new, &no_deps(), false, false)
                .expect("valid transition");
            assert!(
                plan.changed.is_empty(),
                "{old_count}→{new_count} should not be a full recreate"
            );
            assert_eq!(
                plan.scaled.get("worker"),
                Some(&(old_count, new_count)),
                "{old_count}→{new_count} should be scaled"
            );
        }
    }

    #[test]
    fn reload_plan_rejects_removed_service_still_depended_on() {
        let old = fingerprints(&[("api", "h_api", 1), ("db", "h_db", 1)]);
        let new = fingerprints(&[("api", "h_api", 1)]);
        let mut deps = no_deps();
        deps.insert("api".to_string(), names(&["db"]));
        let err = compute_reload_plan(&old, &new, &deps, false, false)
            .expect_err("must reject removed-but-still-depended-on");
        assert!(err.contains("api"), "error mentions dependent: {err}");
        assert!(err.contains("db"), "error mentions removed dep: {err}");
    }

    #[test]
    fn reload_plan_allows_removal_when_no_remaining_service_depends_on_it() {
        let old = fingerprints(&[("api", "h_api", 1), ("legacy", "h_legacy", 1)]);
        let new = fingerprints(&[("api", "h_api", 1)]);
        let plan = compute_reload_plan(&old, &new, &no_deps(), false, false)
            .expect("removal is allowed");
        assert_eq!(plan.removed, names(&["legacy"]));
    }

    #[test]
    fn reload_plan_force_recreate_promotes_unchanged_to_changed() {
        let old = fingerprints(&[("api", "same_hash", 1), ("db", "h_db", 1)]);
        let new = fingerprints(&[("api", "same_hash", 1), ("db", "h_db", 1)]);
        let plan = compute_reload_plan(&old, &new, &no_deps(), true, false)
            .expect("force_recreate is a valid option");
        assert_eq!(
            plan.changed,
            names(&["api", "db"]),
            "force_recreate promotes every still-present service to changed"
        );
        assert!(
            plan.unchanged.is_empty(),
            "force_recreate should leave nothing in unchanged"
        );
    }

    #[test]
    fn reload_plan_no_recreate_demotes_hash_diverged_to_unchanged() {
        let old = fingerprints(&[("api", "h_api_v1", 1), ("db", "h_db", 1)]);
        let new = fingerprints(&[
            ("api", "h_api_v2", 1),
            ("db", "h_db", 1),
            ("worker", "h_worker", 1),
        ]);
        let plan = compute_reload_plan(&old, &new, &no_deps(), false, true)
            .expect("no_recreate is a valid option");
        assert!(
            plan.changed.is_empty(),
            "no_recreate should keep every still-present service as unchanged"
        );
        assert_eq!(
            plan.unchanged,
            names(&["api", "db"]),
            "hash-diverged api was demoted to unchanged"
        );
        assert_eq!(
            plan.added,
            names(&["worker"]),
            "added services aren't affected by no_recreate"
        );
    }

    #[test]
    fn reload_plan_default_flags_preserve_existing_behavior() {
        let old = fingerprints(&[("api", "h_api_v1", 1), ("db", "h_db", 1)]);
        let new = fingerprints(&[("api", "h_api_v2", 1), ("db", "h_db", 1)]);
        let plan = compute_reload_plan(&old, &new, &no_deps(), false, false)
            .expect("default flags are valid");
        assert_eq!(plan.changed, names(&["api"]));
        assert_eq!(plan.unchanged, names(&["db"]));
    }

    #[test]
    fn restart_separator_formats_exit_code_with_max() {
        let separator = format_restart_separator("api", ExitReason::Code(1), 2, Some(5));
        assert_eq!(
            separator,
            "[api] --- restarted (exit code 1, attempt 2/5) ---"
        );
    }

    #[test]
    fn restart_separator_formats_exit_code_without_max() {
        let separator = format_restart_separator("api", ExitReason::Code(1), 2, None);
        assert_eq!(separator, "[api] --- restarted (exit code 1, attempt 2) ---");
    }

    #[test]
    fn restart_separator_formats_signal_with_known_name() {
        let separator = format_restart_separator("api", ExitReason::Signal(15), 1, None);
        assert_eq!(
            separator,
            "[api] --- restarted (signal SIGTERM, attempt 1) ---"
        );
    }

    #[test]
    fn restart_separator_formats_unknown_signal() {
        let separator = format_restart_separator("worker", ExitReason::Signal(999), 3, Some(10));
        assert_eq!(
            separator,
            "[worker] --- restarted (signal 999, attempt 3/10) ---"
        );
    }

    #[test]
    fn restart_separator_preserves_process_name_prefix_for_filtering() {
        let separator = format_restart_separator("my-service", ExitReason::Code(0), 1, None);
        assert!(
            separator.starts_with("[my-service] "),
            "expected name-prefixed line, got {separator:?}"
        );
    }
}