use std::collections::{HashMap, HashSet};
use std::fs;
use std::path::Path;
use std::time::{SystemTime, UNIX_EPOCH};
use petgraph::Direction;
use petgraph::algo::{is_cyclic_directed, toposort};
use petgraph::graph::{DiGraph, NodeIndex};
use petgraph::visit::EdgeRef;
use crate::sdk::{
DepType, DependencyDef, ReloadResult, ServiceConfig, ServiceState, TargetConfig, validate,
};
use super::error::{BlockedReason, GraphError, GraphResult, ProcessConflictInfo};
fn now_millis() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64
}
#[derive(Debug, Clone)]
pub struct PortOwner {
pub pid: u32,
pub name: Option<String>,
}
pub fn check_port_in_use(port: u16) -> Option<PortOwner> {
use std::net::TcpListener;
let addresses = [format!("0.0.0.0:{}", port), format!("127.0.0.1:{}", port)];
for addr in &addresses {
if TcpListener::bind(addr).is_err() {
#[cfg(target_os = "linux")]
{
return find_port_owner_linux(port);
}
#[cfg(target_os = "macos")]
{
return find_port_owner_macos(port);
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
{
return Some(PortOwner { pid: 0, name: None });
}
}
}
None
}
#[cfg(target_os = "linux")]
fn find_port_owner_linux(port: u16) -> Option<PortOwner> {
use std::io::BufRead;
for path in &["/proc/net/tcp", "/proc/net/tcp6"] {
if let Ok(file) = std::fs::File::open(path) {
let reader = std::io::BufReader::new(file);
for line in reader.lines().skip(1).map_while(Result::ok) {
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() < 10 {
continue;
}
if let Some(local) = parts.get(1) {
if let Some(port_hex) = local.split(':').nth(1) {
if let Ok(p) = u16::from_str_radix(port_hex, 16) {
if p == port {
if let Some(inode) = parts.get(9) {
if let Some(owner) = find_pid_by_inode(inode) {
return Some(owner);
}
}
return Some(PortOwner { pid: 0, name: None });
}
}
}
}
}
}
}
Some(PortOwner { pid: 0, name: None })
}
#[cfg(target_os = "linux")]
fn find_pid_by_inode(inode: &str) -> Option<PortOwner> {
use std::os::unix::fs::MetadataExt;
let socket_pattern = format!("socket:[{}]", inode);
if let Ok(entries) = std::fs::read_dir("/proc") {
for entry in entries.filter_map(Result::ok) {
let pid_str = entry.file_name().to_string_lossy().to_string();
if let Ok(pid) = pid_str.parse::<u32>() {
let fd_path = format!("/proc/{}/fd", pid);
if let Ok(fds) = std::fs::read_dir(&fd_path) {
for fd in fds.filter_map(Result::ok) {
if let Ok(link) = std::fs::read_link(fd.path()) {
if link.to_string_lossy() == socket_pattern {
let name = std::fs::read_to_string(format!("/proc/{}/comm", pid))
.ok()
.map(|s| s.trim().to_string());
return Some(PortOwner { pid, name });
}
}
}
}
}
}
}
None
}
#[cfg(target_os = "macos")]
fn find_port_owner_macos(port: u16) -> Option<PortOwner> {
use std::process::Command;
let output = Command::new("lsof")
.args(["-i", &format!(":{}", port), "-t", "-sTCP:LISTEN"])
.output()
.ok()?;
if output.status.success() {
let pid_str = String::from_utf8_lossy(&output.stdout);
if let Ok(pid) = pid_str.trim().lines().next()?.parse::<u32>() {
let ps_output = Command::new("ps")
.args(["-p", &pid.to_string(), "-o", "comm="])
.output()
.ok();
let name = ps_output.and_then(|o| {
if o.status.success() {
Some(String::from_utf8_lossy(&o.stdout).trim().to_string())
} else {
None
}
});
return Some(PortOwner { pid, name });
}
}
Some(PortOwner { pid: 0, name: None })
}
#[derive(Debug, Clone)]
pub struct ProcessInfo {
pub pid: u32,
pub name: String,
pub cmdline: Option<String>,
}
pub fn find_processes_by_name(filter: &str) -> Vec<ProcessInfo> {
if filter.is_empty() {
return Vec::new();
}
#[cfg(target_os = "linux")]
{
find_processes_by_name_linux(filter)
}
#[cfg(target_os = "macos")]
{
find_processes_by_name_macos(filter)
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
{
Vec::new()
}
}
#[cfg(target_os = "linux")]
fn find_processes_by_name_linux(filter: &str) -> Vec<ProcessInfo> {
let filter_lower = filter.to_lowercase();
let mut processes = Vec::new();
if let Ok(entries) = fs::read_dir("/proc") {
for entry in entries.flatten() {
let path = entry.path();
if let Some(name) = path.file_name() {
if let Some(pid_str) = name.to_str() {
if let Ok(pid) = pid_str.parse::<u32>() {
let comm_path = path.join("comm");
let cmdline_path = path.join("cmdline");
let comm = fs::read_to_string(&comm_path)
.ok()
.map(|s| s.trim().to_string());
let cmdline = fs::read_to_string(&cmdline_path)
.ok()
.map(|s| s.replace('\0', " ").trim().to_string());
if let Some(ref process_name) = comm {
if process_name.to_lowercase().contains(&filter_lower) {
processes.push(ProcessInfo {
pid,
name: process_name.clone(),
cmdline,
});
}
}
}
}
}
}
}
processes
}
#[cfg(target_os = "macos")]
fn find_processes_by_name_macos(filter: &str) -> Vec<ProcessInfo> {
use std::process::Command;
let filter_lower = filter.to_lowercase();
let mut processes = Vec::new();
let output = match Command::new("ps").args(["-A", "-o", "pid=,comm="]).output() {
Ok(o) => o,
Err(_) => return Vec::new(),
};
if !output.status.success() {
return Vec::new();
}
let ps_output = String::from_utf8_lossy(&output.stdout);
for line in ps_output.lines() {
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() >= 2 {
if let Ok(pid) = parts[0].parse::<u32>() {
let comm = parts[1..].join(" ");
if comm.to_lowercase().contains(&filter_lower) {
let cmdline = Command::new("ps")
.args(["-p", &pid.to_string(), "-o", "args="])
.output()
.ok()
.and_then(|o| {
if o.status.success() {
let args = String::from_utf8_lossy(&o.stdout);
let trimmed = args.trim();
if !trimmed.is_empty() {
Some(trimmed.to_string())
} else {
None
}
} else {
None
}
});
processes.push(ProcessInfo {
pid,
name: comm,
cmdline,
});
}
}
}
}
processes
}
pub type ServiceId = NodeIndex;
#[derive(Debug, Clone)]
pub struct Service {
pub name: String,
pub config: ServiceConfigKind,
pub state: ServiceState,
pub restart_count: u32,
pub current_restart_delay_ms: u64,
pub started_at: Option<u64>,
pub last_state_change: u64,
pub last_exit_code: Option<i32>,
pub last_exit_signal: Option<i32>,
pub ephemeral: bool,
}
#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum ServiceConfigKind {
Service(ServiceConfig),
Target(TargetConfig),
}
impl Service {
pub fn from_service(config: ServiceConfig) -> Self {
let initial_delay = config.lifecycle.restart_delay_ms;
Self {
name: config.service.name.clone(),
config: ServiceConfigKind::Service(config),
state: ServiceState::Inactive,
restart_count: 0,
current_restart_delay_ms: initial_delay,
started_at: None,
last_state_change: now_millis(),
last_exit_code: None,
last_exit_signal: None,
ephemeral: false,
}
}
pub fn from_service_ephemeral(config: ServiceConfig) -> Self {
let mut service = Self::from_service(config);
service.ephemeral = true;
service
}
pub fn from_target(config: TargetConfig) -> Self {
Self {
name: config.target.name.clone(),
config: ServiceConfigKind::Target(config),
state: ServiceState::Inactive,
restart_count: 0,
current_restart_delay_ms: 0, started_at: None,
last_state_change: now_millis(),
last_exit_code: None,
last_exit_signal: None,
ephemeral: false,
}
}
pub fn touch_state_change(&mut self) {
self.last_state_change = now_millis();
}
pub fn record_started(&mut self) {
self.started_at = Some(now_millis());
self.touch_state_change();
}
pub fn record_exit(&mut self, exit_code: Option<i32>, signal: Option<i32>) {
self.last_exit_code = exit_code;
self.last_exit_signal = signal;
self.touch_state_change();
}
pub fn is_target(&self) -> bool {
matches!(self.config, ServiceConfigKind::Target(_))
}
pub fn dependencies(&self) -> &DependencyDef {
match &self.config {
ServiceConfigKind::Service(c) => &c.dependencies,
ServiceConfigKind::Target(c) => &c.dependencies,
}
}
pub fn service_config(&self) -> Option<&ServiceConfig> {
match &self.config {
ServiceConfigKind::Service(c) => Some(c),
ServiceConfigKind::Target(_) => None,
}
}
pub fn desired_status(&self) -> crate::sdk::Status {
match &self.config {
ServiceConfigKind::Service(c) => c.service.status,
ServiceConfigKind::Target(_) => crate::sdk::Status::Start,
}
}
pub fn should_autostart(&self) -> bool {
self.desired_status().should_autostart()
}
pub fn service_class(&self) -> crate::sdk::ServiceClass {
match &self.config {
ServiceConfigKind::Service(c) => c.service.class,
ServiceConfigKind::Target(_) => crate::sdk::ServiceClass::User,
}
}
pub fn is_protected(&self) -> bool {
self.service_class().is_system()
}
pub fn is_critical(&self) -> bool {
match &self.config {
ServiceConfigKind::Service(c) => c.service.critical,
ServiceConfigKind::Target(c) => c.target.critical,
}
}
pub fn should_restart(&self, exit_code: Option<i32>) -> bool {
let config = match self.service_config() {
Some(c) => c,
None => return false, };
if !self.should_autostart() {
return false;
}
use crate::sdk::RestartPolicy;
let policy_allows = match config.lifecycle.restart {
RestartPolicy::Always => true,
RestartPolicy::OnFailure => exit_code != Some(0),
RestartPolicy::Never => false,
};
if !policy_allows {
return false;
}
let max = config.lifecycle.max_restarts;
if max > 0 && self.restart_count >= max {
return false;
}
true
}
pub fn next_restart_delay(&mut self, exit_code: Option<i32>) -> Option<u64> {
if !self.should_restart(exit_code) {
return None;
}
let delay = self.current_restart_delay_ms;
if let Some(config) = self.service_config() {
self.current_restart_delay_ms =
(self.current_restart_delay_ms * 2).min(config.lifecycle.restart_delay_max_ms);
}
self.restart_count += 1;
Some(delay)
}
pub fn reset_backoff(&mut self) {
self.restart_count = 0;
if let Some(config) = self.service_config() {
self.current_restart_delay_ms = config.lifecycle.restart_delay_ms;
}
}
pub fn try_reset_backoff(&mut self) -> bool {
let config = match self.service_config() {
Some(c) => c,
None => return false, };
let stability_period = config.lifecycle.stability_period_ms;
let now = now_millis();
if let Some(started_at) = self.started_at {
let running_time = now.saturating_sub(started_at);
if running_time >= stability_period {
tracing::debug!(
service = %self.name,
running_time_ms = running_time,
stability_period_ms = stability_period,
"service stable, resetting backoff"
);
self.reset_backoff();
return true;
} else {
tracing::debug!(
service = %self.name,
running_time_ms = running_time,
stability_period_ms = stability_period,
current_delay = self.current_restart_delay_ms,
restart_count = self.restart_count,
"service unstable, keeping backoff"
);
}
}
false
}
}
#[derive(Debug)]
pub struct ServiceGraph {
graph: DiGraph<Service, DepType>,
by_name: HashMap<String, ServiceId>,
}
impl Default for ServiceGraph {
fn default() -> Self {
Self::new()
}
}
impl ServiceGraph {
pub fn new() -> Self {
Self {
graph: DiGraph::new(),
by_name: HashMap::new(),
}
}
pub fn load_from_directory(path: &Path) -> GraphResult<(Self, Vec<(String, String)>)> {
let mut graph = Self::new();
graph.load_services_from_path(path, None, &HashSet::new())?;
let missing_deps = graph.link_dependencies()?;
graph.validate()?;
Ok((graph, missing_deps))
}
pub fn load_from_system_directory(&mut self, path: &Path) -> GraphResult<Vec<String>> {
if !path.exists() {
return Ok(vec![]);
}
let before_count = self.by_name.len();
self.load_services_from_path(
path,
Some(crate::sdk::ServiceClass::System),
&HashSet::new(),
)?;
let names: Vec<String> = self.by_name.keys().skip(before_count).cloned().collect();
tracing::info!(
path = %path.display(),
count = names.len(),
"loaded system services"
);
Ok(names)
}
pub fn load_from_user_directory(
&mut self,
path: &Path,
skip: &HashSet<String>,
) -> GraphResult<()> {
if !path.exists() {
return Ok(());
}
self.load_services_from_path(path, None, skip)?;
Ok(())
}
fn load_services_from_path(
&mut self,
path: &Path,
class_override: Option<crate::sdk::ServiceClass>,
skip: &HashSet<String>,
) -> GraphResult<()> {
if !path.exists() {
return Ok(());
}
let entries = fs::read_dir(path)
.map_err(|e| GraphError::ServiceNotFound(format!("cannot read directory: {}", e)))?;
for entry in entries.flatten() {
let file_path = entry.path();
if file_path.extension().is_some_and(|e| e == "toml") {
let content = fs::read_to_string(&file_path).map_err(|e| {
GraphError::ServiceNotFound(format!(
"cannot read {}: {}",
file_path.display(),
e
))
})?;
if let Ok(mut config) = ServiceConfig::parse(&content) {
if skip.contains(&config.service.name) {
tracing::warn!(
service = %config.service.name,
"skipping user service (shadowed by system service)"
);
continue;
}
if let Some(class) = class_override {
config.service.class = class;
}
let errors = validate::validate_service(&config);
if errors.is_empty() {
self.add_service(Service::from_service(config))?;
} else {
tracing::warn!(
"skipping invalid service {}: {}",
file_path.display(),
errors.join(", ")
);
}
} else if let Ok(config) = TargetConfig::parse(&content) {
if skip.contains(&config.target.name) {
tracing::warn!(
target = %config.target.name,
"skipping user target (shadowed by system target)"
);
continue;
}
let errors = validate::validate_target(&config);
if errors.is_empty() {
self.add_service(Service::from_target(config))?;
} else {
tracing::warn!(
"skipping invalid target {}: {}",
file_path.display(),
errors.join(", ")
);
}
} else {
tracing::warn!("skipping unparseable config: {}", file_path.display());
}
}
}
Ok(())
}
pub fn add_service(&mut self, service: Service) -> GraphResult<ServiceId> {
if self.by_name.contains_key(&service.name) {
return Err(GraphError::ServiceAlreadyExists(service.name));
}
let name = service.name.clone();
let id = self.graph.add_node(service);
self.by_name.insert(name, id);
Ok(id)
}
pub fn link_dependencies(&mut self) -> GraphResult<Vec<(String, String)>> {
let mut edges_to_add = Vec::new();
let mut missing_deps = Vec::new();
for id in self.graph.node_indices() {
let service = &self.graph[id];
let service_name = service.name.clone();
let deps = service.dependencies().clone();
for dep_name in &deps.after {
if let Some(&dep_id) = self.by_name.get(dep_name) {
edges_to_add.push((dep_id, id, DepType::After));
}
}
for dep_name in &deps.requires {
if let Some(&dep_id) = self.by_name.get(dep_name) {
edges_to_add.push((dep_id, id, DepType::Requires));
} else {
missing_deps.push((service_name.clone(), dep_name.clone()));
tracing::warn!(
service = %service_name,
dependency = %dep_name,
"service has missing required dependency"
);
}
}
for dep_name in &deps.wants {
if let Some(&dep_id) = self.by_name.get(dep_name) {
edges_to_add.push((dep_id, id, DepType::Wants));
}
}
for conflict_name in &deps.conflicts {
if let Some(&conflict_id) = self.by_name.get(conflict_name) {
edges_to_add.push((id, conflict_id, DepType::Conflicts));
edges_to_add.push((conflict_id, id, DepType::Conflicts));
}
}
}
for (from, to, dep_type) in edges_to_add {
let already_exists = self
.graph
.edges_connecting(from, to)
.any(|e| *e.weight() == dep_type);
if !already_exists {
self.graph.add_edge(from, to, dep_type);
}
}
Ok(missing_deps)
}
pub fn link_service_dependencies(&mut self, id: ServiceId) -> Vec<(String, String)> {
let service = match self.graph.node_weight(id) {
Some(s) => s,
None => return vec![],
};
let service_name = service.name.clone();
let deps = service.dependencies().clone();
let mut edges_to_add = Vec::new();
let mut missing_deps = Vec::new();
for dep_name in &deps.after {
if let Some(&dep_id) = self.by_name.get(dep_name) {
edges_to_add.push((dep_id, id, DepType::After));
}
}
for dep_name in &deps.requires {
if let Some(&dep_id) = self.by_name.get(dep_name) {
edges_to_add.push((dep_id, id, DepType::Requires));
} else {
missing_deps.push((service_name.clone(), dep_name.clone()));
tracing::warn!(
service = %service_name,
dependency = %dep_name,
"service has missing required dependency"
);
}
}
for dep_name in &deps.wants {
if let Some(&dep_id) = self.by_name.get(dep_name) {
edges_to_add.push((dep_id, id, DepType::Wants));
}
}
for conflict_name in &deps.conflicts {
if let Some(&conflict_id) = self.by_name.get(conflict_name) {
edges_to_add.push((id, conflict_id, DepType::Conflicts));
edges_to_add.push((conflict_id, id, DepType::Conflicts));
}
}
for (from, to, dep_type) in edges_to_add {
let already_exists = self
.graph
.edges_connecting(from, to)
.any(|e| *e.weight() == dep_type);
if !already_exists {
self.graph.add_edge(from, to, dep_type);
}
}
missing_deps
}
pub fn validate(&self) -> GraphResult<()> {
let filtered: DiGraph<(), ()> = self.graph.filter_map(
|_, _| Some(()),
|_, weight| {
if *weight != DepType::Conflicts {
Some(())
} else {
None
}
},
);
if is_cyclic_directed(&filtered) {
let cycle = self.find_cycle(&filtered);
if !cycle.is_empty() {
return Err(GraphError::CyclicDependency(cycle));
}
}
Ok(())
}
fn find_cycle(&self, filtered: &DiGraph<(), ()>) -> Vec<String> {
use petgraph::algo::kosaraju_scc;
let sccs = kosaraju_scc(filtered);
for scc in sccs {
if scc.len() > 1 {
return scc
.iter()
.filter_map(|id| self.graph.node_weight(*id))
.map(|s| s.name.clone())
.collect();
}
}
vec![]
}
pub fn get(&self, id: ServiceId) -> Option<&Service> {
self.graph.node_weight(id)
}
pub fn get_mut(&mut self, id: ServiceId) -> Option<&mut Service> {
self.graph.node_weight_mut(id)
}
pub fn get_by_name(&self, name: &str) -> Option<ServiceId> {
self.by_name.get(name).copied()
}
pub fn all_services(&self) -> impl Iterator<Item = ServiceId> + '_ {
self.graph.node_indices()
}
pub fn len(&self) -> usize {
self.graph.node_count()
}
pub fn is_empty(&self) -> bool {
self.graph.node_count() == 0
}
pub fn dependencies(&self, id: ServiceId) -> Vec<(ServiceId, DepType)> {
self.graph
.edges_directed(id, Direction::Incoming)
.filter(|e| *e.weight() != DepType::Conflicts)
.map(|e| (e.source(), *e.weight()))
.collect()
}
pub fn dependents(&self, id: ServiceId) -> Vec<ServiceId> {
self.graph
.edges_directed(id, Direction::Outgoing)
.filter(|e| *e.weight() != DepType::Conflicts)
.map(|e| e.target())
.collect()
}
pub fn conflicts(&self, id: ServiceId) -> Vec<ServiceId> {
self.graph
.edges_directed(id, Direction::Outgoing)
.filter(|e| *e.weight() == DepType::Conflicts)
.map(|e| e.target())
.collect()
}
pub fn start_order(&self) -> Vec<ServiceId> {
let filtered: DiGraph<(), ()> = self.graph.filter_map(
|_, _| Some(()),
|_e, weight| {
if *weight != DepType::Conflicts {
Some(())
} else {
None
}
},
);
match toposort(&filtered, None) {
Ok(order) => order,
Err(_) => {
self.graph.node_indices().collect()
}
}
}
pub fn shutdown_order(&self) -> Vec<ServiceId> {
let mut order = self.start_order();
order.reverse();
order
}
pub fn all_dependents_ordered(&self, id: ServiceId) -> Vec<ServiceId> {
let mut result = Vec::new();
let mut visited = HashSet::new();
self.collect_dependents_recursive(id, &mut result, &mut visited);
result
}
fn collect_dependents_recursive(
&self,
id: ServiceId,
result: &mut Vec<ServiceId>,
visited: &mut HashSet<ServiceId>,
) {
for dependent_id in self.dependents(id) {
if visited.insert(dependent_id) {
self.collect_dependents_recursive(dependent_id, result, visited);
result.push(dependent_id);
}
}
}
pub fn can_start(&self, id: ServiceId) -> Result<(), BlockedReason> {
let mut waiting_on = Vec::new();
let mut conflicts_with = Vec::new();
for edge in self.graph.edges_directed(id, Direction::Incoming) {
let dep_id = edge.source();
let dep_type = *edge.weight();
let dep = &self.graph[dep_id];
match dep_type {
DepType::Requires => {
if !dep.state.is_satisfied() {
waiting_on.push(dep.name.clone());
}
}
DepType::After => {
if matches!(
dep.state,
ServiceState::Inactive | ServiceState::Blocked { .. }
) {
waiting_on.push(dep.name.clone());
}
}
DepType::Wants => {
}
DepType::Conflicts => {
}
}
}
for conflict_id in self.conflicts(id) {
let conflict = &self.graph[conflict_id];
if conflict.state.is_active() {
conflicts_with.push(conflict.name.clone());
}
}
let service = &self.graph[id];
let port_conflict = if let Some(config) = service.service_config() {
if !config.service.ports.is_empty() {
let mut conflicted_ports = Vec::new();
let mut services_with_ports = Vec::new();
for other_id in self.graph.node_indices() {
if other_id == id {
continue;
}
let other = &self.graph[other_id];
if !other.state.is_active() {
continue;
}
if let Some(other_config) = other.service_config() {
for other_port in &other_config.service.ports {
for our_port in &config.service.ports {
if other_port == our_port {
if !conflicted_ports.contains(our_port) {
conflicted_ports.push(*our_port);
}
if !services_with_ports.contains(&other.name) {
services_with_ports.push(other.name.clone());
}
}
}
}
}
}
if !conflicted_ports.is_empty() {
Some((conflicted_ports, services_with_ports))
} else {
None
}
} else {
None
}
} else {
None
};
if let Some((ports, services)) = port_conflict {
return Err(BlockedReason::PortConflict { ports, services });
}
if let Some(config) = service.service_config() {
for port in &config.service.ports {
if let Some(owner) = check_port_in_use(*port) {
if owner.pid > 0 {
return Err(BlockedReason::ExternalPortConflict {
port: *port,
pid: Some(owner.pid),
process_name: owner.name,
});
}
}
}
}
if let Some(config) = service.service_config() {
if !config.service.process_filters.is_empty() {
let mut all_conflicts = Vec::new();
for filter in &config.service.process_filters {
let processes = find_processes_by_name(filter);
if !processes.is_empty() {
tracing::warn!(
service = %service.name,
filter = %filter,
count = processes.len(),
pids = ?processes.iter().map(|p| p.pid).collect::<Vec<_>>(),
"process name conflict detected"
);
for p in processes {
all_conflicts.push((
filter.clone(),
ProcessConflictInfo {
pid: p.pid,
name: p.name,
cmdline: p.cmdline,
},
));
}
}
}
if !all_conflicts.is_empty() {
let mut conflicts_by_filter = std::collections::HashMap::new();
for (filter, conflict) in all_conflicts {
conflicts_by_filter
.entry(filter)
.or_insert_with(Vec::new)
.push(conflict);
}
if let Some((filter, processes)) = conflicts_by_filter.into_iter().next() {
return Err(BlockedReason::ProcessNameConflict { filter, processes });
}
}
}
}
if waiting_on.is_empty() && conflicts_with.is_empty() {
Ok(())
} else if !waiting_on.is_empty() && !conflicts_with.is_empty() {
Err(BlockedReason::Both {
waiting_on,
conflicts_with,
})
} else if !waiting_on.is_empty() {
Err(BlockedReason::WaitingOn(waiting_on))
} else {
Err(BlockedReason::ConflictsWith(conflicts_with))
}
}
pub fn all_requires_satisfied(&self, id: ServiceId) -> bool {
for edge in self.graph.edges_directed(id, Direction::Incoming) {
if *edge.weight() == DepType::Requires {
let dep = &self.graph[edge.source()];
if !dep.state.is_satisfied() {
return false;
}
}
}
true
}
pub fn remove_service(&mut self, name: &str) -> GraphResult<()> {
let id = self
.by_name
.get(name)
.copied()
.ok_or_else(|| GraphError::ServiceNotFound(name.to_string()))?;
let dependents: Vec<String> = self
.dependents(id)
.iter()
.map(|dep_id| self.graph[*dep_id].name.clone())
.collect();
if !dependents.is_empty() {
return Err(GraphError::HasDependents {
service: name.to_string(),
dependents,
});
}
tracing::debug!(
service = %name,
node_id = ?id,
"removing service from graph"
);
self.by_name.remove(name);
let last_index = NodeIndex::new(self.graph.node_count() - 1);
if id != last_index {
if let Some(last_service) = self.graph.node_weight(last_index) {
let last_name = last_service.name.clone();
self.by_name.insert(last_name, id);
}
}
self.graph.remove_node(id);
tracing::debug!(
service = %name,
"service removed from graph"
);
Ok(())
}
pub fn reload_from_directory(&mut self, path: &Path) -> GraphResult<ReloadResult> {
let (mut new_graph, missing_deps) = Self::load_from_directory(path)?;
let mut diff = self.compute_diff(&new_graph);
diff.config_errors = missing_deps.clone();
self.preserve_runtime_state(&mut new_graph);
new_graph.mark_missing_deps_failed(&missing_deps);
*self = new_graph;
Ok(diff)
}
pub fn reload_from_directories(
&mut self,
system_dir: Option<&Path>,
user_dir: &Path,
) -> GraphResult<(ReloadResult, HashSet<String>)> {
let mut new_graph = Self::new();
let mut system_names = HashSet::new();
if let Some(sys_path) = system_dir
&& let Ok(names) = new_graph.load_from_system_directory(sys_path)
{
system_names = names.into_iter().collect();
}
new_graph.load_from_user_directory(user_dir, &system_names)?;
let missing_deps = new_graph.link_dependencies()?;
new_graph.validate()?;
let mut diff = self.compute_diff(&new_graph);
diff.config_errors = missing_deps.clone();
self.preserve_runtime_state(&mut new_graph);
new_graph.mark_missing_deps_failed(&missing_deps);
*self = new_graph;
Ok((diff, system_names))
}
fn preserve_runtime_state(&self, new_graph: &mut ServiceGraph) {
for (name, &old_id) in &self.by_name {
if let Some(&new_id) = new_graph.by_name.get(name) {
let old_service = &self.graph[old_id];
if let Some(new_service) = new_graph.graph.node_weight_mut(new_id) {
new_service.state = old_service.state.clone();
new_service.restart_count = old_service.restart_count;
new_service.current_restart_delay_ms = old_service.current_restart_delay_ms;
new_service.started_at = old_service.started_at;
new_service.last_state_change = old_service.last_state_change;
new_service.last_exit_code = old_service.last_exit_code;
new_service.last_exit_signal = old_service.last_exit_signal;
tracing::trace!(
service = %name,
state = %new_service.state,
"preserved runtime state during reload"
);
}
}
}
}
fn compute_diff(&self, new: &ServiceGraph) -> ReloadResult {
let mut result = ReloadResult::default();
for name in new.by_name.keys() {
if !self.by_name.contains_key(name) {
result.added.push(name.clone());
}
}
for name in self.by_name.keys() {
if !new.by_name.contains_key(name) {
result.removed.push(name.clone());
}
}
for (name, &old_id) in &self.by_name {
if let Some(&new_id) = new.by_name.get(name) {
let old_service = &self.graph[old_id];
let new_service = &new.graph[new_id];
let changed = match (&old_service.config, &new_service.config) {
(ServiceConfigKind::Service(old), ServiceConfigKind::Service(new)) => {
old != new
}
(ServiceConfigKind::Target(old), ServiceConfigKind::Target(new)) => old != new,
_ => true, };
if changed {
result.changed.push(name.clone());
}
}
}
result
}
pub fn format_why_blocked(&self, name: &str) -> Option<String> {
let id = self.by_name.get(name)?;
let service = &self.graph[*id];
let mut lines = Vec::new();
lines.push(format!(
"{} {} ({})",
service.state.symbol(),
service.name,
service.state.name()
));
match self.can_start(*id) {
Ok(()) => {
lines.push(" Can start: all dependencies satisfied".to_string());
}
Err(reason) => {
for dep_name in reason.waiting_on() {
if let Some(&dep_id) = self.by_name.get(&dep_name) {
let dep = &self.graph[dep_id];
lines.push(format!(
" {} waiting on {} ({})",
"->",
dep.name,
dep.state.name()
));
}
}
for conflict_name in reason.conflicts_with() {
if let Some(&conflict_id) = self.by_name.get(&conflict_name) {
let conflict = &self.graph[conflict_id];
lines.push(format!(
" {} conflicts with {} ({})",
"!>",
conflict.name,
conflict.state.name()
));
}
}
}
}
Some(lines.join("\n"))
}
pub fn format_tree(&self) -> String {
let mut lines = Vec::new();
let order = self.start_order();
for (i, id) in order.iter().enumerate() {
let service = &self.graph[*id];
let is_last = i == order.len() - 1;
let prefix = if is_last { "└─" } else { "├─" };
lines.push(format!(
"{} {} {} ({})",
prefix,
service.state.symbol(),
service.name,
service.state.name()
));
let deps = self.dependencies(*id);
for (j, (dep_id, dep_type)) in deps.iter().enumerate() {
let dep = &self.graph[*dep_id];
let dep_prefix = if is_last { " " } else { "│ " };
let dep_branch = if j == deps.len() - 1 {
"└─"
} else {
"├─"
};
lines.push(format!(
"{} {} {} {} ({})",
dep_prefix,
dep_branch,
dep_type,
dep.name,
dep.state.name()
));
}
}
lines.join("\n")
}
pub fn set_state(&mut self, id: ServiceId, state: ServiceState) {
if let Some(service) = self.graph.node_weight_mut(id) {
service.state = state;
service.touch_state_change();
}
}
pub fn mark_missing_deps_failed(&mut self, missing_deps: &[(String, String)]) -> usize {
use crate::sdk::FailureReason;
use std::collections::HashMap as StdHashMap;
let mut by_service: StdHashMap<&str, Vec<&str>> = StdHashMap::new();
for (service, dep) in missing_deps {
by_service
.entry(service.as_str())
.or_default()
.push(dep.as_str());
}
let mut count = 0;
for (service_name, deps) in by_service {
if let Some(id) = self.get_by_name(service_name) {
let first_dep = deps[0].to_string();
self.set_state(
id,
ServiceState::Failed {
reason: FailureReason::MissingDependency {
dependency: first_dep,
},
},
);
count += 1;
}
}
count
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::sdk::{LifecycleDef, LoggingDef, ServiceDef, TargetDef};
use std::collections::HashMap as StdHashMap;
fn make_service(name: &str) -> Service {
Service::from_service(ServiceConfig {
service: ServiceDef {
name: name.to_string(),
exec: format!("/bin/{}", name),
dir: None,
oneshot: false,
env: StdHashMap::new(),
status: crate::sdk::Status::default(),
class: crate::sdk::ServiceClass::default(),
critical: false,
ports: Vec::new(),
kill_others: false,
process_filters: Vec::new(),
},
dependencies: DependencyDef::default(),
lifecycle: LifecycleDef::default(),
health: None,
logging: LoggingDef::default(),
})
}
fn make_service_with_deps(name: &str, requires: Vec<&str>) -> Service {
Service::from_service(ServiceConfig {
service: ServiceDef {
name: name.to_string(),
exec: format!("/bin/{}", name),
dir: None,
oneshot: false,
env: StdHashMap::new(),
status: crate::sdk::Status::default(),
class: crate::sdk::ServiceClass::default(),
critical: false,
ports: Vec::new(),
kill_others: false,
process_filters: Vec::new(),
},
dependencies: DependencyDef {
requires: requires.iter().map(|s| s.to_string()).collect(),
..Default::default()
},
lifecycle: LifecycleDef::default(),
health: None,
logging: LoggingDef::default(),
})
}
#[test]
fn test_add_service() {
let mut graph = ServiceGraph::new();
let id = graph.add_service(make_service("test")).unwrap();
assert!(graph.get(id).is_some());
assert_eq!(graph.get(id).unwrap().name, "test");
}
#[test]
fn test_duplicate_service() {
let mut graph = ServiceGraph::new();
graph.add_service(make_service("test")).unwrap();
let result = graph.add_service(make_service("test"));
assert!(matches!(result, Err(GraphError::ServiceAlreadyExists(_))));
}
#[test]
fn test_get_by_name() {
let mut graph = ServiceGraph::new();
let id = graph.add_service(make_service("test")).unwrap();
assert_eq!(graph.get_by_name("test"), Some(id));
assert_eq!(graph.get_by_name("nonexistent"), None);
}
#[test]
fn test_start_order() {
let mut graph = ServiceGraph::new();
graph.add_service(make_service("a")).unwrap();
graph
.add_service(make_service_with_deps("b", vec!["a"]))
.unwrap();
graph.link_dependencies().unwrap();
let order = graph.start_order();
let a_idx = order
.iter()
.position(|id| graph.get(*id).unwrap().name == "a")
.unwrap();
let b_idx = order
.iter()
.position(|id| graph.get(*id).unwrap().name == "b")
.unwrap();
assert!(a_idx < b_idx); }
#[test]
fn test_can_start_no_deps() {
let mut graph = ServiceGraph::new();
let id = graph.add_service(make_service("test")).unwrap();
assert!(graph.can_start(id).is_ok());
}
#[test]
fn test_can_start_unsatisfied_requires() {
let mut graph = ServiceGraph::new();
graph.add_service(make_service("dep")).unwrap();
graph
.add_service(make_service_with_deps("test", vec!["dep"]))
.unwrap();
graph.link_dependencies().unwrap();
let test_id = graph.get_by_name("test").unwrap();
let result = graph.can_start(test_id);
assert!(matches!(result, Err(BlockedReason::WaitingOn(_))));
}
#[test]
fn test_can_start_satisfied_requires() {
let mut graph = ServiceGraph::new();
let dep_id = graph.add_service(make_service("dep")).unwrap();
graph
.add_service(make_service_with_deps("test", vec!["dep"]))
.unwrap();
graph.link_dependencies().unwrap();
graph.set_state(dep_id, ServiceState::Running { pid: 123 });
let test_id = graph.get_by_name("test").unwrap();
assert!(graph.can_start(test_id).is_ok());
}
#[test]
fn test_target_service() {
let target = Service::from_target(TargetConfig {
target: TargetDef {
name: "multi-user".to_string(),
critical: false,
},
dependencies: DependencyDef::default(),
});
assert!(target.is_target());
assert!(target.service_config().is_none());
}
#[test]
fn test_format_tree() {
let mut graph = ServiceGraph::new();
graph.add_service(make_service("a")).unwrap();
graph.add_service(make_service("b")).unwrap();
let tree = graph.format_tree();
assert!(tree.contains("a"));
assert!(tree.contains("b"));
}
#[test]
fn test_shutdown_order_simple() {
let mut graph = ServiceGraph::new();
graph.add_service(make_service("database")).unwrap();
graph
.add_service(make_service_with_deps("app", vec!["database"]))
.unwrap();
graph.link_dependencies().unwrap();
let shutdown = graph.shutdown_order();
let names: Vec<_> = shutdown
.iter()
.map(|id| graph.get(*id).unwrap().name.as_str())
.collect();
let app_idx = names.iter().position(|n| *n == "app").unwrap();
let db_idx = names.iter().position(|n| *n == "database").unwrap();
assert!(app_idx < db_idx, "app should stop before database");
}
#[test]
fn test_shutdown_order_diamond() {
let mut graph = ServiceGraph::new();
graph.add_service(make_service("a")).unwrap();
graph
.add_service(make_service_with_deps("b", vec!["a"]))
.unwrap();
graph
.add_service(make_service_with_deps("c", vec!["a"]))
.unwrap();
graph
.add_service(make_service_with_deps("d", vec!["b", "c"]))
.unwrap();
graph.link_dependencies().unwrap();
let shutdown = graph.shutdown_order();
let names: Vec<_> = shutdown
.iter()
.map(|id| graph.get(*id).unwrap().name.as_str())
.collect();
let d_idx = names.iter().position(|n| *n == "d").unwrap();
let b_idx = names.iter().position(|n| *n == "b").unwrap();
let c_idx = names.iter().position(|n| *n == "c").unwrap();
let a_idx = names.iter().position(|n| *n == "a").unwrap();
assert!(d_idx < b_idx, "d should stop before b");
assert!(d_idx < c_idx, "d should stop before c");
assert!(b_idx < a_idx, "b should stop before a");
assert!(c_idx < a_idx, "c should stop before a");
}
#[test]
fn test_all_dependents_ordered_chain() {
let mut graph = ServiceGraph::new();
graph.add_service(make_service("a")).unwrap();
graph
.add_service(make_service_with_deps("b", vec!["a"]))
.unwrap();
graph
.add_service(make_service_with_deps("c", vec!["b"]))
.unwrap();
graph
.add_service(make_service_with_deps("d", vec!["c"]))
.unwrap();
graph.link_dependencies().unwrap();
let a_id = graph.get_by_name("a").unwrap();
let dependents = graph.all_dependents_ordered(a_id);
let names: Vec<_> = dependents
.iter()
.map(|id| graph.get(*id).unwrap().name.as_str())
.collect();
assert_eq!(names.len(), 3);
assert_eq!(names, vec!["d", "c", "b"]);
}
#[test]
fn test_all_dependents_ordered_diamond() {
let mut graph = ServiceGraph::new();
graph.add_service(make_service("database")).unwrap();
graph
.add_service(make_service_with_deps("app", vec!["database"]))
.unwrap();
graph
.add_service(make_service_with_deps("worker", vec!["database"]))
.unwrap();
graph.link_dependencies().unwrap();
let db_id = graph.get_by_name("database").unwrap();
let dependents = graph.all_dependents_ordered(db_id);
let names: Vec<_> = dependents
.iter()
.map(|id| graph.get(*id).unwrap().name.as_str())
.collect();
assert_eq!(names.len(), 2);
assert!(names.contains(&"app"));
assert!(names.contains(&"worker"));
}
#[test]
fn test_all_dependents_ordered_none() {
let mut graph = ServiceGraph::new();
graph.add_service(make_service("database")).unwrap();
graph
.add_service(make_service_with_deps("app", vec!["database"]))
.unwrap();
graph.link_dependencies().unwrap();
let app_id = graph.get_by_name("app").unwrap();
let dependents = graph.all_dependents_ordered(app_id);
assert!(dependents.is_empty());
}
#[test]
fn test_load_from_system_directory_nonexistent() {
let mut graph = ServiceGraph::new();
let path = std::path::Path::new("/nonexistent/path");
let result = graph.load_from_system_directory(path);
assert!(result.is_ok());
assert!(result.unwrap().is_empty());
}
#[test]
fn test_load_from_user_directory_with_skip() {
let mut graph = ServiceGraph::new();
let mut system_svc = make_service("network");
if let ServiceConfigKind::Service(ref mut c) = system_svc.config {
c.service.class = crate::sdk::ServiceClass::System;
}
graph.add_service(system_svc).unwrap();
let skip: HashSet<String> = vec!["network".to_string()].into_iter().collect();
let result = graph.load_from_user_directory(std::path::Path::new("/nonexistent"), &skip);
assert!(result.is_ok());
assert!(graph.get_by_name("network").is_some());
}
#[test]
fn test_reload_from_directories_empty() {
let mut graph = ServiceGraph::new();
graph.add_service(make_service("old-service")).unwrap();
let result = graph.reload_from_directories(
Some(std::path::Path::new("/nonexistent/system")),
std::path::Path::new("/nonexistent/user"),
);
assert!(result.is_ok());
let (diff, system_names) = result.unwrap();
assert!(diff.removed.contains(&"old-service".to_string()));
assert!(system_names.is_empty());
}
#[test]
fn test_missing_dependency_graceful() {
let mut graph = ServiceGraph::new();
graph
.add_service(make_service_with_deps("app", vec!["missing-db"]))
.unwrap();
let missing = graph.link_dependencies().unwrap();
assert_eq!(missing.len(), 1);
assert_eq!(missing[0].0, "app");
assert_eq!(missing[0].1, "missing-db");
assert!(graph.validate().is_ok());
}
#[test]
fn test_mark_missing_deps_failed() {
use crate::sdk::FailureReason;
let mut graph = ServiceGraph::new();
graph
.add_service(make_service_with_deps("app", vec!["missing-db"]))
.unwrap();
let missing = graph.link_dependencies().unwrap();
let count = graph.mark_missing_deps_failed(&missing);
assert_eq!(count, 1);
let app_id = graph.get_by_name("app").unwrap();
let app = graph.get(app_id).unwrap();
match &app.state {
ServiceState::Failed { reason } => match reason {
FailureReason::MissingDependency { dependency } => {
assert_eq!(dependency, "missing-db");
}
_ => panic!("expected MissingDependency reason"),
},
_ => panic!("expected Failed state"),
}
}
#[test]
fn test_mixed_valid_and_missing_deps() {
let mut graph = ServiceGraph::new();
graph.add_service(make_service("database")).unwrap();
let mut svc = make_service_with_deps("app", vec!["database", "missing-cache"]);
if let ServiceConfigKind::Service(ref mut c) = svc.config {
c.dependencies.requires = vec!["database".to_string(), "missing-cache".to_string()];
}
graph.add_service(svc).unwrap();
let missing = graph.link_dependencies().unwrap();
assert_eq!(missing.len(), 1);
assert_eq!(missing[0].0, "app");
assert_eq!(missing[0].1, "missing-cache");
let app_id = graph.get_by_name("app").unwrap();
let deps = graph.dependencies(app_id);
assert_eq!(deps.len(), 1); }
#[test]
fn test_preserve_runtime_state() {
let mut old_graph = ServiceGraph::new();
let db_id = old_graph.add_service(make_service("database")).unwrap();
let app_id = old_graph.add_service(make_service("app")).unwrap();
old_graph.set_state(db_id, ServiceState::Running { pid: 1234 });
if let Some(db) = old_graph.get_mut(db_id) {
db.restart_count = 3;
db.started_at = Some(1000);
db.current_restart_delay_ms = 8000;
}
old_graph.set_state(app_id, ServiceState::Running { pid: 5678 });
if let Some(app) = old_graph.get_mut(app_id) {
app.restart_count = 1;
app.started_at = Some(2000);
}
let mut new_graph = ServiceGraph::new();
new_graph.add_service(make_service("database")).unwrap();
new_graph.add_service(make_service("app")).unwrap();
new_graph.add_service(make_service("new-service")).unwrap();
let new_db_id = new_graph.get_by_name("database").unwrap();
assert!(matches!(
new_graph.get(new_db_id).unwrap().state,
ServiceState::Inactive
));
old_graph.preserve_runtime_state(&mut new_graph);
let new_db = new_graph.get(new_db_id).unwrap();
assert!(matches!(new_db.state, ServiceState::Running { pid: 1234 }));
assert_eq!(new_db.restart_count, 3);
assert_eq!(new_db.started_at, Some(1000));
assert_eq!(new_db.current_restart_delay_ms, 8000);
let new_app_id = new_graph.get_by_name("app").unwrap();
let new_app = new_graph.get(new_app_id).unwrap();
assert!(matches!(new_app.state, ServiceState::Running { pid: 5678 }));
assert_eq!(new_app.restart_count, 1);
assert_eq!(new_app.started_at, Some(2000));
let new_svc_id = new_graph.get_by_name("new-service").unwrap();
let new_svc = new_graph.get(new_svc_id).unwrap();
assert!(matches!(new_svc.state, ServiceState::Inactive));
assert_eq!(new_svc.restart_count, 0);
}
#[test]
fn test_remove_multiple_independent_services() {
let mut graph = ServiceGraph::new();
graph.add_service(make_service("svc-0")).unwrap();
graph.add_service(make_service("svc-1")).unwrap();
graph.add_service(make_service("svc-2")).unwrap();
assert_eq!(graph.len(), 3);
graph.remove_service("svc-0").unwrap();
assert_eq!(graph.len(), 2);
assert!(graph.get_by_name("svc-0").is_none());
assert!(graph.get_by_name("svc-1").is_some());
assert!(graph.get_by_name("svc-2").is_some());
let svc2_id = graph.get_by_name("svc-2").unwrap();
assert_eq!(graph.get(svc2_id).unwrap().name, "svc-2");
graph.remove_service("svc-1").unwrap();
assert_eq!(graph.len(), 1);
assert!(graph.get_by_name("svc-1").is_none());
assert!(graph.get_by_name("svc-2").is_some());
graph.remove_service("svc-2").unwrap();
assert_eq!(graph.len(), 0);
assert!(graph.get_by_name("svc-2").is_none());
}
fn find_free_port() -> u16 {
let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
let port = listener.local_addr().unwrap().port();
drop(listener);
port
}
fn make_service_with_ports(name: &str, ports: Vec<u16>) -> Service {
let mut service = make_service(name);
if let ServiceConfigKind::Service(ref mut config) = service.config {
config.service.ports = ports;
}
service
}
#[test]
fn test_port_conflict_no_ports() {
let mut graph = ServiceGraph::new();
graph.add_service(make_service("svc1")).unwrap();
graph.add_service(make_service("svc2")).unwrap();
graph.link_dependencies().unwrap();
let svc2_id = graph.get_by_name("svc2").unwrap();
let result = graph.can_start(svc2_id);
assert!(result.is_ok(), "Services without ports should not conflict");
}
#[test]
fn test_port_conflict_different_ports() {
let mut graph = ServiceGraph::new();
graph
.add_service(make_service_with_ports("web", vec![8080]))
.unwrap();
graph
.add_service(make_service_with_ports("db", vec![5432]))
.unwrap();
graph.link_dependencies().unwrap();
let web_id = graph.get_by_name("web").unwrap();
{
let g = &mut graph.graph[web_id];
g.state = ServiceState::Running { pid: 1234 };
}
let db_id = graph.get_by_name("db").unwrap();
let result = graph.can_start(db_id);
assert!(
result.is_ok(),
"Services with different ports should not conflict: {:?}",
result
);
}
#[test]
fn test_port_conflict_same_port() {
let mut graph = ServiceGraph::new();
graph
.add_service(make_service_with_ports("web1", vec![8080]))
.unwrap();
graph
.add_service(make_service_with_ports("web2", vec![8080]))
.unwrap();
graph.link_dependencies().unwrap();
let web1_id = graph.get_by_name("web1").unwrap();
{
let g = &mut graph.graph[web1_id];
g.state = ServiceState::Running { pid: 1234 };
}
let web2_id = graph.get_by_name("web2").unwrap();
let result = graph.can_start(web2_id);
assert!(result.is_err(), "Services with same port should conflict");
if let Err(BlockedReason::PortConflict { ports, services }) = result {
assert_eq!(ports, vec![8080]);
assert_eq!(services, vec!["web1"]);
} else {
panic!("Expected PortConflict error, got: {:?}", result);
}
}
#[test]
fn test_port_conflict_multiple_ports() {
let mut graph = ServiceGraph::new();
graph
.add_service(make_service_with_ports("app", vec![8080, 8443]))
.unwrap();
graph
.add_service(make_service_with_ports("cache", vec![8080, 6379]))
.unwrap();
graph.link_dependencies().unwrap();
let app_id = graph.get_by_name("app").unwrap();
{
let g = &mut graph.graph[app_id];
g.state = ServiceState::Running { pid: 1234 };
}
let cache_id = graph.get_by_name("cache").unwrap();
let result = graph.can_start(cache_id);
assert!(
result.is_err(),
"Services with overlapping ports should conflict"
);
if let Err(BlockedReason::PortConflict { ports, services }) = result {
assert_eq!(ports, vec![8080]);
assert_eq!(services, vec!["app"]);
} else {
panic!("Expected PortConflict error, got: {:?}", result);
}
}
#[test]
fn test_port_conflict_multiple_services() {
let mut graph = ServiceGraph::new();
graph
.add_service(make_service_with_ports("web", vec![8080]))
.unwrap();
graph
.add_service(make_service_with_ports("db", vec![5432]))
.unwrap();
graph
.add_service(make_service_with_ports("cache", vec![6379]))
.unwrap();
graph
.add_service(make_service_with_ports("app", vec![8080, 5432, 7000]))
.unwrap();
graph.link_dependencies().unwrap();
for name in &["web", "db", "cache"] {
let id = graph.get_by_name(name).unwrap();
let g = &mut graph.graph[id];
g.state = ServiceState::Running { pid: 1000 };
}
let app_id = graph.get_by_name("app").unwrap();
let result = graph.can_start(app_id);
assert!(
result.is_err(),
"Should detect conflicts with multiple services"
);
if let Err(BlockedReason::PortConflict { ports, services }) = result {
assert!(ports.contains(&8080));
assert!(ports.contains(&5432));
assert!(services.contains(&"web".to_string()));
assert!(services.contains(&"db".to_string()));
} else {
panic!("Expected PortConflict error, got: {:?}", result);
}
}
#[test]
fn test_port_no_conflict_with_inactive() {
let port = find_free_port();
let mut graph = ServiceGraph::new();
graph
.add_service(make_service_with_ports("web", vec![port]))
.unwrap();
graph
.add_service(make_service_with_ports("other", vec![port]))
.unwrap();
graph.link_dependencies().unwrap();
let other_id = graph.get_by_name("other").unwrap();
let result = graph.can_start(other_id);
assert!(
result.is_ok(),
"Inactive services should not block port usage: {:?}",
result
);
}
#[test]
fn test_port_no_conflict_with_stopped() {
let port = find_free_port();
let mut graph = ServiceGraph::new();
graph
.add_service(make_service_with_ports("web", vec![port]))
.unwrap();
graph
.add_service(make_service_with_ports("other", vec![port]))
.unwrap();
graph.link_dependencies().unwrap();
let web_id = graph.get_by_name("web").unwrap();
{
let g = &mut graph.graph[web_id];
g.state = ServiceState::Exited { exit_code: Some(0) };
}
let other_id = graph.get_by_name("other").unwrap();
let result = graph.can_start(other_id);
assert!(
result.is_ok(),
"Stopped services should not block port usage: {:?}",
result
);
}
#[test]
fn test_port_conflict_with_dependencies() {
let port = find_free_port();
let mut graph = ServiceGraph::new();
graph
.add_service(make_service_with_ports("web", vec![port]))
.unwrap();
graph
.add_service(make_service_with_deps("other", vec!["missing_dep"]))
.unwrap();
graph.add_service(make_service("missing_dep")).unwrap();
graph.link_dependencies().unwrap();
let web_id = graph.get_by_name("web").unwrap();
{
let g = &mut graph.graph[web_id];
g.state = ServiceState::Running { pid: 1234 };
}
let other_id = graph.get_by_name("other").unwrap();
if let ServiceConfigKind::Service(ref mut config) = graph.graph[other_id].config {
config.service.ports = vec![port];
}
let result = graph.can_start(other_id);
assert!(
matches!(result, Err(BlockedReason::PortConflict { .. })),
"Port conflicts should be detected independently: {:?}",
result
);
}
#[test]
fn test_check_port_in_use_free_port() {
let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
let port = listener.local_addr().unwrap().port();
drop(listener);
let result = check_port_in_use(port);
assert!(
result.is_none(),
"Free port should return None, got {:?}",
result
);
}
#[test]
fn test_check_port_in_use_occupied_port() {
let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
let port = listener.local_addr().unwrap().port();
let result = check_port_in_use(port);
assert!(
result.is_some(),
"Occupied port should return Some(PortOwner)"
);
let owner = result.unwrap();
assert!(
owner.pid > 0 || owner.name.is_some(),
"Should have PID or process name for occupied port"
);
drop(listener);
}
#[test]
fn test_external_port_conflict_blocks_service() {
let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
let port = listener.local_addr().unwrap().port();
let mut graph = ServiceGraph::new();
graph
.add_service(make_service_with_ports("test-svc", vec![port]))
.unwrap();
graph.link_dependencies().unwrap();
let svc_id = graph.get_by_name("test-svc").unwrap();
let result = graph.can_start(svc_id);
assert!(
result.is_err(),
"Service should be blocked when port is in use externally"
);
if let Err(BlockedReason::ExternalPortConflict {
port: conflict_port,
pid,
..
}) = result
{
assert_eq!(conflict_port, port, "Conflict port should match");
assert!(pid.is_some(), "Should have PID of external process");
} else {
panic!("Expected ExternalPortConflict error, got: {:?}", result);
}
drop(listener);
}
#[test]
fn test_external_port_conflict_with_real_process() {
use std::process::{Command, Stdio};
use std::time::Duration;
let port = find_free_port();
let python_result = Command::new("python3")
.args(["-m", "http.server", &port.to_string()])
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null())
.spawn();
let mut python_proc = match python_result {
Ok(p) => p,
Err(_) => {
eprintln!("Skipping test: python3 not available");
return;
}
};
std::thread::sleep(Duration::from_secs(2));
let port_check = check_port_in_use(port);
if port_check.is_none() {
let _ = python_proc.kill();
eprintln!("Skipping test: Python failed to bind to port");
return;
}
let mut graph = ServiceGraph::new();
graph
.add_service(make_service_with_ports("python-conflict-test", vec![port]))
.unwrap();
graph.link_dependencies().unwrap();
let svc_id = graph.get_by_name("python-conflict-test").unwrap();
let result = graph.can_start(svc_id);
let _ = python_proc.kill();
let _ = python_proc.wait();
assert!(
result.is_err(),
"Service should be blocked when real external process is using port"
);
if let Err(BlockedReason::ExternalPortConflict {
port: conflict_port,
pid,
process_name,
}) = result
{
assert_eq!(conflict_port, port, "Conflict port should match");
assert!(pid.is_some(), "Should detect PID of Python process");
if cfg!(target_os = "macos") {
assert!(
process_name.is_some(),
"Should detect process name on macOS"
);
let name = process_name.unwrap();
assert!(
name.to_lowercase().contains("python"),
"Process name should contain 'python', got: {}",
name
);
}
} else {
panic!("Expected ExternalPortConflict error, got: {:?}", result);
}
}
#[test]
fn test_no_external_conflict_when_port_free() {
let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
let port = listener.local_addr().unwrap().port();
drop(listener);
let mut graph = ServiceGraph::new();
graph
.add_service(make_service_with_ports("test-svc", vec![port]))
.unwrap();
graph.link_dependencies().unwrap();
let svc_id = graph.get_by_name("test-svc").unwrap();
let result = graph.can_start(svc_id);
assert!(
result.is_ok(),
"Service should be able to start when port is free: {:?}",
result
);
}
#[test]
fn test_external_conflict_with_multiple_ports() {
let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
let occupied_port = listener.local_addr().unwrap().port();
let free_listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
let free_port = free_listener.local_addr().unwrap().port();
drop(free_listener);
let mut graph = ServiceGraph::new();
graph
.add_service(make_service_with_ports(
"multi-port-svc",
vec![free_port, occupied_port],
))
.unwrap();
graph.link_dependencies().unwrap();
let svc_id = graph.get_by_name("multi-port-svc").unwrap();
let result = graph.can_start(svc_id);
assert!(
result.is_err(),
"Service should be blocked when any port is in use"
);
if let Err(BlockedReason::ExternalPortConflict { port, .. }) = result {
assert_eq!(port, occupied_port, "Should report the occupied port");
} else {
panic!("Expected ExternalPortConflict error, got: {:?}", result);
}
drop(listener);
}
}