use crate::sdk::{FailureReason, ServiceState};
use nix::sys::signal::Signal;
use crate::server::graph::ServiceId;
use crate::server::process::send_signal_to_group;
use super::Supervisor;
use super::events::{ReevaluateAction, SupervisorEvent, TimeoutKind};
impl Supervisor {
pub(crate) async fn handle_event(&mut self, event: SupervisorEvent) {
let event_desc = match &event {
SupervisorEvent::ProcessExited {
service_id,
exit_code,
signal,
} => {
let name = self
.service_name(*service_id)
.await
.unwrap_or_else(|| "?".to_string());
format!(
"ProcessExited {{ service={}, exit_code={:?}, signal={:?} }}",
name, exit_code, signal
)
}
SupervisorEvent::HealthCheckResult {
service_id, passed, ..
} => {
let name = self
.service_name(*service_id)
.await
.unwrap_or_else(|| "?".to_string());
format!(
"HealthCheckResult {{ service={}, passed={} }}",
name, passed
)
}
SupervisorEvent::Timeout { service_id, kind } => {
let name = self
.service_name(*service_id)
.await
.unwrap_or_else(|| "?".to_string());
format!("Timeout {{ service={}, kind={:?} }}", name, kind)
}
SupervisorEvent::Reevaluate { service_id } => {
let name = self
.service_name(*service_id)
.await
.unwrap_or_else(|| "?".to_string());
format!("Reevaluate {{ service={} }}", name)
}
SupervisorEvent::BuiltinCompleted {
service_id,
success,
..
} => {
let name = self
.service_name(*service_id)
.await
.unwrap_or_else(|| "?".to_string());
format!(
"BuiltinCompleted {{ service={}, success={} }}",
name, success
)
}
_ => format!("{}", event),
};
tracing::debug!(event = %event_desc, "handling event");
match event {
SupervisorEvent::ProcessExited {
service_id,
exit_code,
signal,
} => {
self.handle_process_exit(service_id, exit_code, signal)
.await;
}
SupervisorEvent::HealthCheckResult {
service_id,
passed,
error,
} => {
self.handle_health_check_result(service_id, passed, error)
.await;
}
SupervisorEvent::Timeout { service_id, kind } => {
self.handle_timeout(service_id, kind).await;
}
SupervisorEvent::Reevaluate { service_id } => {
self.handle_reevaluate(service_id).await;
}
SupervisorEvent::StartService { name } => {
if let Err(e) = self.start_service(&name).await {
tracing::error!(service = %name, error = %e, "failed to start service");
}
}
SupervisorEvent::StopService { name } => {
if let Err(e) = self.stop_service(&name).await {
tracing::error!(service = %name, error = %e, "failed to stop service");
}
}
SupervisorEvent::RestartService { name } => {
let (id, is_running) = {
let graph = self.graph.read().await;
if let Some(id) = graph.get_by_name(&name) {
let service = graph.get(id).unwrap();
let is_running = service.state.is_active();
(Some(id), is_running)
} else {
(None, false)
}
};
if let Some(id) = id {
if is_running {
self.pending_restarts.insert(id);
tracing::info!(service = %name, "restart requested, stopping service");
if let Err(e) = self.stop_service(&name).await {
self.pending_restarts.remove(&id);
tracing::error!(service = %name, error = %e, "failed to stop service for restart");
}
} else {
tracing::info!(service = %name, "restart requested, service not running, starting");
{
let mut graph = self.graph.write().await;
if let Some(service) = graph.get_mut(id) {
service.reset_backoff();
}
}
self.try_start_service(id).await;
}
} else {
tracing::error!(service = %name, "service not found for restart");
}
}
SupervisorEvent::KillService { name, signal } => {
if let Err(e) = self.kill_service(&name, signal.as_deref()).await {
tracing::error!(service = %name, error = %e, "failed to kill service");
}
}
SupervisorEvent::AddService { config } => {
let name = config.service.name.clone();
if let Err(e) = self.add_service(config).await {
tracing::error!(service = %name, error = %e, "failed to add service");
} else {
tracing::info!(service = %name, "service added");
}
}
SupervisorEvent::RemoveService { name } => {
if let Err(e) = self.remove_service(&name).await {
tracing::error!(service = %name, error = %e, "failed to remove service");
} else {
tracing::info!(service = %name, "service removed");
}
}
SupervisorEvent::Reload { response_tx } => {
let result = self.reload().await;
match &result {
Ok(r) => {
tracing::info!(
added = r.added.len(),
removed = r.removed.len(),
changed = r.changed.len(),
"configuration reloaded"
);
}
Err(e) => {
tracing::error!(error = %e, "failed to reload configuration");
}
}
let _ = response_tx.send(result.map_err(|e| e.to_string()));
}
SupervisorEvent::PrepareRestart { response_tx } => {
tracing::info!("preparing for restart");
let result = self.prepare_restart().await;
let _ = response_tx.send(result);
}
SupervisorEvent::BuiltinCompleted {
service_id,
success,
error,
} => {
self.handle_builtin_completed(service_id, success, error)
.await;
}
}
}
/// Re-checks a service after something it depends on may have changed.
///
/// * `Blocked` or `Inactive` services get a start attempt.
/// * A `Running` target whose `can_start` check now fails is moved back to
///   `Blocked`, and its dependents are queued for re-evaluation.
/// * Every other state, and an unknown `id`, is left alone.
pub(crate) async fn handle_reevaluate(&mut self, id: ServiceId) {
    // Decide under the read lock; act only after it has been released.
    let decision = {
        let graph = self.graph.read().await;
        let Some(service) = graph.get(id) else {
            return;
        };
        match &service.state {
            ServiceState::Blocked { .. } | ServiceState::Inactive => ReevaluateAction::TryStart,
            ServiceState::Running { .. } if service.is_target() => {
                if let Err(reason) = graph.can_start(id) {
                    ReevaluateAction::DowngradeTarget {
                        name: service.name.clone(),
                        waiting_on: reason.waiting_on(),
                    }
                } else {
                    ReevaluateAction::None
                }
            }
            _ => ReevaluateAction::None,
        }
    };

    match decision {
        ReevaluateAction::None => {}
        ReevaluateAction::TryStart => {
            self.try_start_service(id).await;
        }
        ReevaluateAction::DowngradeTarget { name, waiting_on } => {
            tracing::warn!(
                target = %name,
                waiting_on = ?waiting_on,
                "target dependencies no longer satisfied, downgrading to blocked"
            );
            let affected = {
                let mut graph = self.graph.write().await;
                graph.set_state(id, ServiceState::Blocked { waiting_on });
                graph.dependents(id)
            };
            self.queue_reevaluate(affected).await;
        }
    }
}
/// Handles the exit of a service's process.
///
/// Steps, in order:
/// 1. Cancel the start/stop/health-check timers and drop the per-service
///    process task and health-attempt bookkeeping.
/// 2. Record the new state:
///    * the service was `Stopping` -> `Exited`
///    * otherwise killed by a signal -> `Failed` (signal)
///    * otherwise exit code 0, or any exit of a oneshot -> `Exited`
///    * otherwise -> `Failed` (exit code, `-1` when none was reported)
/// 3. Report a critical failure if a critical service ended up `Failed`.
/// 4. Queue the service's dependents for re-evaluation.
/// 5. Start the service again immediately if an explicit restart was
///    pending; otherwise schedule a delayed restart when
///    `next_restart_delay` grants one.
///
/// Returns early, after step 1, when `id` is not in the graph.
pub(crate) async fn handle_process_exit(
    &mut self,
    id: ServiceId,
    exit_code: Option<i32>,
    signal: Option<i32>,
) {
    for kind in [
        TimeoutKind::Start,
        TimeoutKind::Stop,
        TimeoutKind::HealthCheck,
    ] {
        self.cancel_timeout(id, kind);
    }
    self.process_tasks.remove(&id);
    self.health_attempts.remove(&id);

    let (name, restart_delay, dependents, critical_failure) = {
        let mut graph = self.graph.write().await;
        let Some(service) = graph.get_mut(id) else {
            return;
        };
        let name = service.name.clone();

        // Snapshot everything that depends on the pre-exit state first.
        let was_stopping = matches!(service.state, ServiceState::Stopping { .. });
        let is_oneshot = service
            .service_config()
            .map(|c| c.service.oneshot)
            .unwrap_or(false);
        let is_critical = service.is_critical();

        let new_state = match (was_stopping, signal) {
            (true, _) => ServiceState::Exited { exit_code },
            (false, Some(sig)) => ServiceState::Failed {
                reason: FailureReason::Signal { signal: sig },
            },
            (false, None) if exit_code == Some(0) || is_oneshot => {
                ServiceState::Exited { exit_code }
            }
            (false, None) => ServiceState::Failed {
                reason: FailureReason::ExitCode {
                    code: exit_code.unwrap_or(-1),
                },
            },
        };
        service.state = new_state.clone();
        tracing::info!(
            service = %name,
            exit_code = ?exit_code,
            signal = ?signal,
            state = %new_state.name(),
            "process exited"
        );

        // Only a critical service that ended up failed is escalated.
        let critical_failure = match &new_state {
            ServiceState::Failed { reason } if is_critical => Some(reason.clone()),
            _ => None,
        };

        service.try_reset_backoff();
        let restart_delay = if was_stopping || is_oneshot {
            None
        } else {
            let delay = service.next_restart_delay(exit_code);
            if delay.is_none() && service.restart_count > 0 {
                tracing::warn!(
                    service = %name,
                    restart_count = service.restart_count,
                    "max restarts exceeded, giving up"
                );
            }
            delay
        };

        let dependents = graph.dependents(id);
        (name, restart_delay, dependents, critical_failure)
    };

    if let Some(reason) = critical_failure {
        self.handle_critical_failure(&name, &reason).await;
    }
    self.queue_reevaluate(dependents).await;

    // An explicit restart request wins over the automatic restart policy.
    if self.pending_restarts.remove(&id) {
        tracing::info!(service = %name, "restarting service (explicit restart)");
        {
            let mut graph = self.graph.write().await;
            if let Some(service) = graph.get_mut(id) {
                service.reset_backoff();
            }
        }
        self.try_start_service(id).await;
        return;
    }

    let Some(delay) = restart_delay else {
        return;
    };
    let restart_count = {
        let graph = self.graph.read().await;
        graph.get(id).map_or(0, |s| s.restart_count)
    };
    tracing::info!(
        service = %name,
        delay_ms = delay,
        restart_count = restart_count,
        "scheduling restart with exponential backoff"
    );
    self.schedule_timeout(id, TimeoutKind::RestartDelay, delay);
}
/// Prints an operator-facing banner when a critical service has failed.
///
/// Does nothing unless the supervisor runs in `pid1_mode`. The recovery
/// hint depends on whether a service named "network" is currently
/// `Running`. This method only logs; it changes no state.
pub(crate) async fn handle_critical_failure(
    &mut self,
    failed_service: &str,
    reason: &FailureReason,
) {
    if !self.pid1_mode {
        return;
    }

    tracing::error!("==================== CRITICAL FAILURE ====================");
    tracing::error!("Service '{}' failed: {}", failed_service, reason);
    tracing::error!("Manual intervention may be required.");
    tracing::error!("==========================================================");

    let network_up = {
        let graph = self.graph.read().await;
        match graph.get_by_name("network").and_then(|id| graph.get(id)) {
            Some(net) => matches!(net.state, ServiceState::Running { .. }),
            None => false,
        }
    };

    if !network_up {
        tracing::error!("Network not available. Use serial console if configured.");
        return;
    }
    tracing::error!("Network is up - connect via SSH to recover:");
    tracing::error!(" zinit status # view service states");
    tracing::error!(" zinit start <name> # retry failed service");
}
/// Applies the outcome of one health check to a service.
///
/// * `passed == true`: a service that is `Starting` becomes `Running`, its
///   start timer and attempt counter are cleared, and its dependents are
///   queued for re-evaluation. In any other state the result is ignored.
/// * `passed == false`: the failure is counted against the configured retry
///   limit (3 retries and an interval of 10000 when the service has no
///   health section). Below the limit another check is scheduled; at the
///   limit the process group is sent SIGKILL (when a pid is known) and the
///   service is marked `Failed`.
///
/// A failure is only acted on while the service is `Starting` or `Running`.
/// A result that arrives after the service has left those states (stopped,
/// stopping, already failed, or removed from the graph) is treated as stale:
/// the attempt counter is dropped and the state is left untouched, so a late
/// check cannot overwrite the real state or kill a process that is already
/// shutting down.
///
/// `error` is only used for logging.
pub(crate) async fn handle_health_check_result(
    &mut self,
    id: ServiceId,
    passed: bool,
    error: Option<String>,
) {
    if passed {
        let dependents = {
            let mut graph = self.graph.write().await;
            let Some(service) = graph.get_mut(id) else {
                return;
            };
            let ServiceState::Starting { pid } = service.state else {
                return;
            };
            service.state = ServiceState::Running { pid };
            tracing::info!(service = %service.name, "health check passed, service running");
            graph.dependents(id)
        };
        self.cancel_timeout(id, TimeoutKind::Start);
        self.health_attempts.remove(&id);
        self.queue_reevaluate(dependents).await;
        return;
    }

    // Read everything needed from the graph before touching the counter, so
    // a stale result does not leave a counter entry behind.
    let snapshot = {
        let graph = self.graph.read().await;
        graph.get(id).map(|service| {
            let (max_retries, interval) = service
                .service_config()
                .and_then(|c| c.health.as_ref())
                .map(|h| match h {
                    crate::sdk::HealthDef::Tcp { common, .. }
                    | crate::sdk::HealthDef::Http { common, .. }
                    | crate::sdk::HealthDef::Exec { common, .. } => {
                        (common.retries, common.interval_ms)
                    }
                })
                .unwrap_or((3, 10000));
            let awaiting_health = matches!(
                service.state,
                ServiceState::Starting { .. } | ServiceState::Running { .. }
            );
            (
                service.name.clone(),
                service.state.pid(),
                awaiting_health,
                max_retries,
                interval,
            )
        })
    };
    let Some((name, pid, awaiting_health, max_retries, interval)) = snapshot else {
        self.health_attempts.remove(&id);
        return;
    };
    if !awaiting_health {
        self.health_attempts.remove(&id);
        tracing::debug!(
            service = %name,
            error = ?error,
            "ignoring health check failure for service that is not starting or running"
        );
        return;
    }

    let attempts = self.health_attempts.entry(id).or_insert(0);
    *attempts += 1;
    let current_attempts = *attempts;

    tracing::warn!(
        service = %name,
        attempt = current_attempts,
        max_retries = max_retries,
        error = ?error,
        "health check failed"
    );

    if current_attempts < max_retries {
        self.schedule_timeout(id, TimeoutKind::HealthCheck, interval);
        return;
    }

    // Retry budget exhausted: kill the process group (best effort) and
    // record why the service failed.
    if let Some(p) = pid {
        let _ = send_signal_to_group(p, Signal::SIGKILL);
    }
    let mut graph = self.graph.write().await;
    graph.set_state(
        id,
        ServiceState::Failed {
            reason: FailureReason::HealthCheckFailed {
                attempts: current_attempts,
            },
        },
    );
}
/// Reacts to a timer firing for a service.
///
/// The timer entry is dropped first, then, by `kind`:
/// * `Start`: if the service is still `Starting`, SIGKILL its process group
///   and mark it `Failed` with `StartTimeout`.
/// * `Stop`: if the service is still `Stopping`, SIGKILL its process group.
/// * `HealthCheck`: run the configured health check, if the service has one.
/// * `RestartDelay`: attempt to start the service.
///
/// An unknown `id` is ignored. Signal delivery errors are discarded.
pub(crate) async fn handle_timeout(&mut self, id: ServiceId, kind: TimeoutKind) {
    self.timers.remove(&(id, kind));

    match kind {
        TimeoutKind::RestartDelay => {
            self.try_start_service(id).await;
        }
        TimeoutKind::HealthCheck => {
            let configured = {
                let graph = self.graph.read().await;
                graph
                    .get(id)
                    .and_then(|svc| svc.service_config())
                    .and_then(|cfg| cfg.health.clone())
            };
            if let Some(health) = configured {
                self.run_health_check(id, health);
            }
        }
        TimeoutKind::Stop => {
            let graph = self.graph.read().await;
            let Some(service) = graph.get(id) else {
                return;
            };
            let ServiceState::Stopping { pid } = service.state else {
                return;
            };
            tracing::warn!(service = %service.name, pgid = pid, "stop timeout, sending SIGKILL to process group");
            let _ = send_signal_to_group(pid, Signal::SIGKILL);
        }
        TimeoutKind::Start => {
            let mut graph = self.graph.write().await;
            let Some(service) = graph.get(id) else {
                return;
            };
            let ServiceState::Starting { pid } = service.state else {
                return;
            };
            tracing::error!(service = %service.name, pgid = pid, "start timeout, killing process group");
            let _ = send_signal_to_group(pid, Signal::SIGKILL);
            graph.set_state(
                id,
                ServiceState::Failed {
                    reason: FailureReason::StartTimeout,
                },
            );
        }
    }
}
}