use std::collections::HashMap;
use std::time::Duration;
use crate::sdk::{LogLine, ReloadResult, ServiceConfig, ServiceState, ServiceStatus, WhyBlocked};
use crate::server::error::{SupervisorError, SupervisorResult};
use crate::server::graph::{Service, ServiceId};
use crate::server::log;
use crate::server::process::{parse_signal, send_signal_to_group};
use super::SYSTEM_CONFIG_DIR;
use super::Supervisor;
use super::events::TimeoutKind;
impl Supervisor {
    /// Returns the names of all services currently registered in the graph.
    pub async fn list_services(&self) -> Vec<String> {
        let graph = self.graph.read().await;
        graph
            .all_services()
            .filter_map(|id| graph.get(id).map(|s| s.name.clone()))
            .collect()
    }

    /// Returns a status snapshot for the service called `name`.
    ///
    /// # Errors
    ///
    /// [`SupervisorError::ServiceNotFound`] if no service has that name.
    pub async fn get_status(&self, name: &str) -> SupervisorResult<ServiceStatus> {
        let graph = self.graph.read().await;
        let id = graph
            .get_by_name(name)
            .ok_or_else(|| SupervisorError::ServiceNotFound(name.to_string()))?;
        let service = graph
            .get(id)
            .expect("id from get_by_name must resolve in the same graph");
        Ok(ServiceStatus::from_state(service.name.clone(), &service.state))
    }

    /// Looks up the service called `name` and hands it to `try_start_service`.
    ///
    /// Only the name lookup can fail here; the outcome of the start attempt
    /// itself is not propagated to the caller.
    ///
    /// # Errors
    ///
    /// [`SupervisorError::ServiceNotFound`] if no service has that name.
    pub async fn start_service(&mut self, name: &str) -> SupervisorResult<()> {
        let id = {
            let graph = self.graph.read().await;
            graph
                .get_by_name(name)
                .ok_or_else(|| SupervisorError::ServiceNotFound(name.to_string()))?
        };
        self.try_start_service(id).await;
        Ok(())
    }

    /// Stops the service called `name`, stopping its dependents first.
    ///
    /// Every dependent returned by `all_dependents_ordered` that is active or
    /// still owns a pid is stopped (in that order) via
    /// [`Self::stop_single_service`]. A failure to stop a dependent is logged
    /// and does not abort the operation; the result of stopping `name` itself
    /// is returned.
    ///
    /// # Errors
    ///
    /// [`SupervisorError::ServiceNotFound`] if no service has that name, or any
    /// error from stopping `name` itself.
    pub async fn stop_service(&mut self, name: &str) -> SupervisorResult<()> {
        let dependents_to_stop: Vec<String> = {
            let graph = self.graph.read().await;
            let id = graph
                .get_by_name(name)
                .ok_or_else(|| SupervisorError::ServiceNotFound(name.to_string()))?;
            graph
                .all_dependents_ordered(id)
                .into_iter()
                .filter_map(|dep_id| {
                    let dep = graph.get(dep_id)?;
                    if dep.state.is_active() || dep.state.pid().is_some() {
                        Some(dep.name.clone())
                    } else {
                        None
                    }
                })
                .collect()
        };
        if !dependents_to_stop.is_empty() {
            tracing::info!(
                service = %name,
                dependents = ?dependents_to_stop,
                "stopping dependents first"
            );
            for dep_name in dependents_to_stop {
                if let Err(e) = self.stop_single_service(&dep_name).await {
                    tracing::warn!(
                        service = %dep_name,
                        error = %e,
                        "failed to stop dependent service"
                    );
                }
            }
        }
        self.stop_single_service(name).await
    }

    /// Requests a stop of exactly one service, without touching its dependents.
    ///
    /// - Already `Inactive`, `Exited` or `Stopping`: nothing is done.
    /// - No pid, or pid 0 (no process to signal): the state is set to
    ///   `Inactive` immediately.
    /// - Otherwise: the configured stop signal (default `"SIGTERM"`) is sent to
    ///   the service's process group, the state becomes `Stopping { pid }`, and
    ///   a [`TimeoutKind::Stop`] timeout is scheduled with the configured
    ///   `stop_timeout_ms` (default 10000). This does not wait for the process
    ///   to exit.
    ///
    /// # Errors
    ///
    /// [`SupervisorError::ServiceNotFound`] if no service has that name, or the
    /// error from `parse_signal` / `send_signal_to_group`. In the latter cases
    /// the state is left unchanged.
    pub(crate) async fn stop_single_service(&mut self, name: &str) -> SupervisorResult<()> {
        let (id, pid, stop_signal, stop_timeout) = {
            let graph = self.graph.read().await;
            let id = graph
                .get_by_name(name)
                .ok_or_else(|| SupervisorError::ServiceNotFound(name.to_string()))?;
            let service = graph
                .get(id)
                .expect("id from get_by_name must resolve in the same graph");

            if matches!(
                service.state,
                ServiceState::Inactive | ServiceState::Exited { .. } | ServiceState::Stopping { .. }
            ) {
                tracing::debug!(service = %name, state = %service.state.name(), "service already stopped or stopping");
                return Ok(());
            }

            // A missing pid or pid 0 means there is no process to signal.
            let signalable_pid = service.state.pid().filter(|&p| p != 0);
            let Some(pid) = signalable_pid else {
                // The read guard must be released before the write lock on the
                // same graph is taken by this task.
                drop(graph);
                let mut graph = self.graph.write().await;
                graph.set_state(id, ServiceState::Inactive);
                tracing::info!(service = %name, "target/service marked inactive");
                return Ok(());
            };

            let (stop_signal, stop_timeout) = service
                .service_config()
                .map(|c| (c.lifecycle.stop_signal.clone(), c.lifecycle.stop_timeout_ms))
                .unwrap_or_else(|| ("SIGTERM".to_string(), 10000));
            (id, pid, stop_signal, stop_timeout)
        };

        let sig = parse_signal(&stop_signal)?;
        send_signal_to_group(pid, sig)?;
        {
            let mut graph = self.graph.write().await;
            graph.set_state(id, ServiceState::Stopping { pid });
        }
        self.schedule_timeout(id, TimeoutKind::Stop, stop_timeout);
        tracing::info!(service = %name, signal = %stop_signal, pgid = pid, "stopping service (process group)");
        Ok(())
    }

    /// Restart entry point for the service called `name`.
    ///
    /// NOTE(review): this only calls [`Self::stop_service`]; nothing here starts
    /// the service again. Any restart must come from logic outside this
    /// function (e.g. an exit / restart-policy handler). Confirm that such
    /// logic exists; otherwise this behaves exactly like `stop_service`.
    ///
    /// # Errors
    ///
    /// Any error returned by [`Self::stop_service`].
    pub async fn restart_service(&mut self, name: &str) -> SupervisorResult<()> {
        self.stop_service(name).await?;
        Ok(())
    }

    /// Sends `signal` (default `"SIGTERM"`) to the process group of the
    /// service called `name`.
    ///
    /// Unlike [`Self::stop_single_service`], this neither changes the service
    /// state nor schedules a timeout.
    ///
    /// # Errors
    ///
    /// - [`SupervisorError::ServiceNotFound`] if no service has that name.
    /// - [`SupervisorError::ServiceNotRunning`] if the service has no process
    ///   (no pid, or pid 0).
    /// - Errors from `parse_signal` / `send_signal_to_group`.
    pub async fn kill_service(&mut self, name: &str, signal: Option<&str>) -> SupervisorResult<()> {
        let graph = self.graph.read().await;
        let id = graph
            .get_by_name(name)
            .ok_or_else(|| SupervisorError::ServiceNotFound(name.to_string()))?;
        let service = graph
            .get(id)
            .expect("id from get_by_name must resolve in the same graph");
        // pid 0 marks a process-less unit (stop_single_service treats it the same
        // way). Presumably `send_signal_to_group` maps to killpg / kill(-pgid).
        // With pgid 0, POSIX delivers the signal to the caller's own process
        // group, i.e. the supervisor itself, so it must never be signalled.
        let pid = service
            .state
            .pid()
            .filter(|&p| p != 0)
            .ok_or_else(|| SupervisorError::ServiceNotRunning(name.to_string()))?;
        let sig = parse_signal(signal.unwrap_or("SIGTERM"))?;
        send_signal_to_group(pid, sig)?;
        tracing::info!(service = %name, signal = ?sig, pgid = pid, "sent signal to process group");
        Ok(())
    }

    /// Explains why the service called `name` is (or is not) blocked.
    ///
    /// `blocked` reflects whether the current state is `Blocked`.
    /// `waiting_on` / `conflicts_with` come from `graph.can_start` and are empty
    /// when it returns `Ok`. `ascii` is the rendering from `format_why_blocked`,
    /// falling back to `"Unknown service"`.
    ///
    /// # Errors
    ///
    /// [`SupervisorError::ServiceNotFound`] if no service has that name.
    pub async fn why_blocked(&self, name: &str) -> SupervisorResult<WhyBlocked> {
        let graph = self.graph.read().await;
        let id = graph
            .get_by_name(name)
            .ok_or_else(|| SupervisorError::ServiceNotFound(name.to_string()))?;
        let service = graph
            .get(id)
            .expect("id from get_by_name must resolve in the same graph");
        let blocked = matches!(service.state, ServiceState::Blocked { .. });
        let (waiting_on, conflicts_with) = match graph.can_start(id) {
            Ok(()) => (vec![], vec![]),
            Err(reason) => (reason.waiting_on(), reason.conflicts_with()),
        };
        let ascii = graph
            .format_why_blocked(name)
            .unwrap_or_else(|| "Unknown service".to_string());
        Ok(WhyBlocked {
            name: name.to_string(),
            blocked,
            waiting_on,
            conflicts_with,
            ascii,
        })
    }

    /// Returns the graph's textual tree rendering (`format_tree`).
    pub async fn get_tree(&self) -> String {
        let graph = self.graph.read().await;
        graph.format_tree()
    }

    /// Validates `config`, inserts it into the graph, and starts it right away
    /// when the new service reports `should_autostart()`.
    ///
    /// # Errors
    ///
    /// [`SupervisorError::Validation`] with all validation messages joined by
    /// `", "`, or any error from `graph.add_service`.
    pub async fn add_service(&mut self, config: ServiceConfig) -> SupervisorResult<()> {
        let errors = crate::sdk::validate::validate_service(&config);
        if !errors.is_empty() {
            return Err(SupervisorError::Validation(errors.join(", ")));
        }
        let (id, should_autostart) = {
            let mut graph = self.graph.write().await;
            let service = Service::from_service(config);
            let autostart = service.should_autostart();
            let id = graph.add_service(service)?;
            (id, autostart)
        };
        if should_autostart {
            self.try_start_service(id).await;
        } else {
            tracing::debug!(service_id = ?id, "service added with status != start, not auto-starting");
        }
        Ok(())
    }

    /// Removes the service called `name` from the graph.
    ///
    /// A service that is active, or still owns a pid, is first stopped via
    /// [`Self::stop_service`]. This uses the same predicate `stop_service`
    /// applies to dependents, so no process is left unsupervised. The service
    /// is then removed after a short fixed delay.
    ///
    /// # Errors
    ///
    /// Any error from [`Self::stop_service`] or `graph.remove_service`.
    pub async fn remove_service(&mut self, name: &str) -> SupervisorResult<()> {
        // Delay between requesting the stop and dropping the service from the graph.
        const STOP_GRACE: Duration = Duration::from_millis(100);

        let needs_stop = {
            let graph = self.graph.read().await;
            graph
                .get_by_name(name)
                .and_then(|id| graph.get(id))
                .is_some_and(|service| service.state.is_active() || service.state.pid().is_some())
        };
        if needs_stop {
            self.stop_service(name).await?;
            // NOTE(review): the stop is only requested (signal + scheduled
            // timeout), not awaited. The process may still be running when the
            // service is removed below.
            tokio::time::sleep(STOP_GRACE).await;
        }
        let mut graph = self.graph.write().await;
        graph.remove_service(name)?;
        Ok(())
    }

    /// Reloads service definitions from disk and remaps internal ids.
    ///
    /// In PID1 mode both `SYSTEM_CONFIG_DIR` and `self.config_dir` are read, and
    /// `self.system_service_names` is replaced. Otherwise only `self.config_dir`
    /// is read. Because ids may change across a reload, the old id→name and new
    /// name→id maps are handed to `remap_service_ids` once the graph lock has
    /// been released.
    ///
    /// # Errors
    ///
    /// Any error from `reload_from_directories` / `reload_from_directory`.
    pub async fn reload(&mut self) -> SupervisorResult<ReloadResult> {
        let mut graph = self.graph.write().await;
        let old_id_to_name: HashMap<ServiceId, String> = graph
            .all_services()
            .filter_map(|id| graph.get(id).map(|s| (id, s.name.clone())))
            .collect();
        let result = if self.pid1_mode {
            let system_dir = std::path::Path::new(SYSTEM_CONFIG_DIR);
            let (result, new_system_names) =
                graph.reload_from_directories(Some(system_dir), &self.config_dir)?;
            self.system_service_names = new_system_names;
            result
        } else {
            graph.reload_from_directory(&self.config_dir)?
        };
        let name_to_new_id: HashMap<String, ServiceId> = graph
            .all_services()
            .filter_map(|id| graph.get(id).map(|s| (s.name.clone(), id)))
            .collect();
        drop(graph);
        self.remap_service_ids(&old_id_to_name, &name_to_new_id)
            .await;
        tracing::info!(
            added = result.added.len(),
            removed = result.removed.len(),
            changed = result.changed.len(),
            pid1_mode = self.pid1_mode,
            "configuration reloaded"
        );
        Ok(result)
    }

    /// Returns buffered log lines for the service called `name`.
    ///
    /// `lines` is passed straight to `log::get_logs` (presumably a limit on how
    /// many recent lines to return — confirm there). The graph lock is released
    /// before the log buffers are awaited.
    ///
    /// # Errors
    ///
    /// [`SupervisorError::ServiceNotFound`] if no service has that name.
    pub async fn get_logs(
        &self,
        name: &str,
        lines: Option<usize>,
    ) -> SupervisorResult<Vec<LogLine>> {
        let graph = self.graph.read().await;
        let id = graph
            .get_by_name(name)
            .ok_or_else(|| SupervisorError::ServiceNotFound(name.to_string()))?;
        drop(graph);
        Ok(log::get_logs(&self.log_buffers, id, lines).await)
    }
}