use std::collections::HashMap;
use std::time::Duration;
use crate::sdk::{PrepareRestartResult, ServiceState};
use nix::sys::signal::Signal;
use crate::server::graph::ServiceId;
use crate::server::process::{process_exists, send_signal_to_group};
use crate::server::state::{
PersistedService, PersistedServiceState, PersistedState, STATE_VERSION, now_millis, save_state,
};
use super::Supervisor;
impl Supervisor {
/// Re-keys the supervisor's per-service bookkeeping from old service ids to
/// new ones, using the service name as the bridge between the two id spaces.
///
/// An old id is translated by looking up its name in `old_id_to_name` and
/// then that name in `name_to_new_id`. Entries whose id cannot be translated
/// are dropped; for `process_tasks` and `timers` the dropped handle is also
/// aborted.
///
/// The tables touched are `process_tasks`, `timers`, `health_attempts`,
/// `pending_restarts`, and (under its write lock) `log_buffers`.
pub(crate) async fn remap_service_ids(
    &mut self,
    old_id_to_name: &HashMap<ServiceId, String>,
    name_to_new_id: &HashMap<String, ServiceId>,
) {
    // old id -> name -> new id; `None` when either hop is missing.
    let translate = |stale: ServiceId| -> Option<ServiceId> {
        let name = old_id_to_name.get(&stale)?;
        name_to_new_id.get(name).copied()
    };

    // Process tasks: carry over survivors, abort the rest.
    for (stale_id, task) in std::mem::take(&mut self.process_tasks) {
        match translate(stale_id) {
            Some(fresh_id) => {
                self.process_tasks.insert(fresh_id, task);
            }
            None => task.abort(),
        }
    }

    // Timers are keyed by (service id, kind); only the id part changes.
    for ((stale_id, kind), timer) in std::mem::take(&mut self.timers) {
        match translate(stale_id) {
            Some(fresh_id) => {
                self.timers.insert((fresh_id, kind), timer);
            }
            None => timer.abort(),
        }
    }

    // Health-check attempt counters: untranslatable entries are discarded.
    for (stale_id, attempts) in std::mem::take(&mut self.health_attempts) {
        let Some(fresh_id) = translate(stale_id) else {
            continue;
        };
        self.health_attempts.insert(fresh_id, attempts);
    }

    // Pending restarts: same treatment.
    for stale_id in std::mem::take(&mut self.pending_restarts) {
        let Some(fresh_id) = translate(stale_id) else {
            continue;
        };
        self.pending_restarts.insert(fresh_id);
    }

    // Log buffers live behind an async lock; hold the write guard only for
    // the duration of the re-keying.
    {
        let mut guard = self.log_buffers.write().await;
        let previous = std::mem::take(&mut *guard);
        for (stale_id, buffer) in previous {
            let Some(fresh_id) = translate(stale_id) else {
                continue;
            };
            guard.insert(fresh_id, buffer);
        }
    }
}
/// Stops every service that is active or still has a pid, one at a time, in
/// the order returned by the graph's `shutdown_order()`.
///
/// Per service:
/// - no pid: the state is set to `Inactive`;
/// - pid `0`: logged as a "target" and set to `Inactive` without signalling;
/// - otherwise: SIGTERM is sent to the process group, the state becomes
///   `Stopping`, and the process is polled every 100 ms up to 50 times
///   (about 5 seconds). If it is still present after that, SIGKILL is sent
///   and one further 100 ms pause follows. The state is then set to
///   `Exited { exit_code: None }` without re-checking the process.
///
/// Results of sending signals are intentionally ignored.
pub async fn stop_all_services(&mut self) {
    /// Pause between liveness checks while waiting for a service to exit.
    const POLL_INTERVAL: Duration = Duration::from_millis(100);
    /// Number of liveness checks performed before escalating to SIGKILL.
    const MAX_POLLS: usize = 50;

    // Snapshot the work list under the read lock so the lock is not held
    // while signalling and sleeping below.
    let targets: Vec<(ServiceId, String, Option<u32>)> = {
        let graph = self.graph.read().await;
        let mut collected = Vec::new();
        for id in graph.shutdown_order() {
            let Some(svc) = graph.get(id) else {
                continue;
            };
            if !(svc.state.is_active() || svc.state.pid().is_some()) {
                continue;
            }
            collected.push((id, svc.name.clone(), svc.state.pid()));
        }
        collected
    };

    if targets.is_empty() {
        tracing::info!("no running services to stop");
        return;
    }

    tracing::info!(
        count = targets.len(),
        "stopping all services in dependency order"
    );

    for (id, name, maybe_pid) in targets {
        // Resolve the cases that need no signalling first.
        let pgid = match maybe_pid {
            None => {
                self.graph
                    .write()
                    .await
                    .set_state(id, ServiceState::Inactive);
                continue;
            }
            Some(0) => {
                tracing::info!(service = %name, "marking target as inactive");
                self.graph
                    .write()
                    .await
                    .set_state(id, ServiceState::Inactive);
                continue;
            }
            Some(pid) => pid,
        };

        tracing::info!(service = %name, pgid = pgid, "stopping service (SIGTERM)");
        let _ = send_signal_to_group(pgid, Signal::SIGTERM);
        self.graph
            .write()
            .await
            .set_state(id, ServiceState::Stopping { pid: pgid });

        // Wait for the process to disappear; the first check happens after
        // one full poll interval.
        let mut exited = false;
        for _attempt in 0..MAX_POLLS {
            tokio::time::sleep(POLL_INTERVAL).await;
            exited = !process_exists(pgid);
            if exited {
                break;
            }
        }

        if !exited {
            tracing::warn!(service = %name, pgid = pgid, "stop timeout, sending SIGKILL");
            let _ = send_signal_to_group(pgid, Signal::SIGKILL);
            tokio::time::sleep(POLL_INTERVAL).await;
        }

        self.graph
            .write()
            .await
            .set_state(id, ServiceState::Exited { exit_code: None });
    }

    tracing::info!("all services stopped");
}
/// Builds a `PersistedState` snapshot of the supervisor.
///
/// Every id yielded by the graph's `all_services()` that resolves via
/// `get` contributes one `PersistedService`, keyed by service name. The
/// service config is included only for services flagged `ephemeral`; for all
/// others `config` is `None`.
///
/// The snapshot also records `STATE_VERSION`, the current `now_millis()`
/// timestamp, the supervisor's `boot_time`, and the config directory and
/// socket path rendered through `Path::display`.
pub async fn serialize_state(&self) -> PersistedState {
    let graph = self.graph.read().await;

    let services = graph
        .all_services()
        .filter_map(|id| {
            graph.get(id).map(|svc| {
                // Only ephemeral services carry their config in the snapshot.
                let config = svc
                    .ephemeral
                    .then(|| svc.service_config().cloned())
                    .flatten();

                let entry = PersistedService {
                    name: svc.name.clone(),
                    state: PersistedServiceState::from(&svc.state),
                    pid: svc.state.pid(),
                    restart_count: svc.restart_count,
                    current_restart_delay_ms: svc.current_restart_delay_ms,
                    last_exit_code: svc.last_exit_code,
                    last_exit_signal: svc.last_exit_signal,
                    started_at: svc.started_at,
                    last_state_change: svc.last_state_change,
                    ephemeral: svc.ephemeral,
                    config,
                };

                (svc.name.clone(), entry)
            })
        })
        .collect();

    PersistedState {
        version: STATE_VERSION,
        saved_at: now_millis(),
        boot_time: self.boot_time,
        services,
        config_dir: self.config_dir.display().to_string(),
        socket_path: self.socket_path.display().to_string(),
    }
}
/// Serializes the current supervisor state and hands it to `save_state`.
///
/// On success, logs the number of persisted services together with
/// `STATE_PATH`.
///
/// # Errors
///
/// Propagates the error returned by `save_state`.
pub async fn save_state_to_disk(&self) -> Result<(), std::io::Error> {
    let snapshot = self.serialize_state().await;
    save_state(&snapshot)?;

    let service_count = snapshot.services.len();
    tracing::info!(
        services = service_count,
        path = crate::server::state::STATE_PATH,
        "state saved for restart"
    );

    Ok(())
}
/// Saves the supervisor state to disk and reports where it was written.
///
/// On success the result carries `STATE_PATH` as a string and `ready: true`.
///
/// # Errors
///
/// Returns `"Failed to save state: <error>"` when saving the state fails.
pub async fn prepare_restart(&self) -> Result<PrepareRestartResult, String> {
    match self.save_state_to_disk().await {
        Ok(()) => Ok(PrepareRestartResult {
            state_path: crate::server::state::STATE_PATH.to_string(),
            ready: true,
        }),
        Err(e) => Err(format!("Failed to save state: {}", e)),
    }
}
}