use anyhow::{Context, Result};
use std::process::Command;
use std::time::Duration;
use systemprompt_models::CliPaths;
use crate::services::agent_orchestration::{OrchestrationError, OrchestrationResult, process};
/// Stateless entry point for locating, inspecting, and reclaiming network
/// ports that agent processes listen on.
///
/// The type holds no data; it only groups the port-management functions.
#[derive(Clone, Copy, Debug)]
pub struct PortManager;
impl Default for PortManager {
fn default() -> Self {
Self::new()
}
}
impl PortManager {
/// Creates a `PortManager`.
///
/// `PortManager` is a unit struct, so construction does no work and is
/// usable in `const` contexts.
pub const fn new() -> Self {
Self
}
/// Looks up the PID of a process that has a socket on `port` (Unix).
///
/// Runs `lsof -ti :<port>`. With `-t`, `lsof` prints one PID per line, and
/// several processes may be listed for the same port (for example a parent
/// and its forked workers). Only the first non-empty line is parsed, so a
/// multi-PID listing no longer fails to parse.
///
/// # Returns
/// * `Ok(Some(pid))` - the first PID reported by `lsof`.
/// * `Ok(None)` - `lsof` exited unsuccessfully or printed no PID.
///
/// # Errors
/// Returns an error if `lsof` cannot be executed or if the first reported
/// line is not a valid `u32`.
#[cfg(unix)]
pub fn find_process_using_port(port: u16) -> Result<Option<u32>> {
    let output = Command::new("lsof")
        .arg("-ti")
        .arg(format!(":{port}"))
        .output()
        .context("Failed to run lsof command")?;
    // A non-zero exit is treated as "nothing is using the port".
    if !output.status.success() {
        return Ok(None);
    }
    let stdout = String::from_utf8_lossy(&output.stdout);
    // Parse only the first PID: the whole output may span several lines and
    // would not parse as a single integer.
    stdout
        .lines()
        .map(str::trim)
        .find(|line| !line.is_empty())
        .map(|first_pid| {
            first_pid
                .parse::<u32>()
                .context("Failed to parse PID from lsof output")
        })
        .transpose()
}
/// Looks up the PID of a process bound to local TCP `port` (Windows).
///
/// Runs `netstat -ano -p TCP` and inspects rows that have exactly five
/// whitespace-separated columns (protocol, local address, foreign address,
/// state, PID). A row matches only when the port of its *local* address
/// equals `port`; the foreign-address column is ignored so that outbound
/// connections to a remote port with the same number are not mistaken for a
/// local owner.
///
/// A row whose state is `LISTENING` is returned immediately. Otherwise the
/// first matching row with a non-zero PID is used as a fallback, which also
/// covers systems where the state text differs.
///
/// # Returns
/// * `Ok(Some(pid))` - the owning PID.
/// * `Ok(None)` - no row matched.
///
/// # Errors
/// Returns an error if `netstat` cannot be executed.
#[cfg(windows)]
pub fn find_process_using_port(port: u16) -> Result<Option<u32>> {
    let output = Command::new("netstat")
        .args(["-ano", "-p", "TCP"])
        .output()
        .context("Failed to run netstat command")?;
    let stdout = String::from_utf8_lossy(&output.stdout);
    let wanted_port = port.to_string();
    let mut fallback: Option<u32> = None;
    for line in stdout.lines() {
        let columns: Vec<&str> = line.split_whitespace().collect();
        // Header and banner lines do not have exactly five columns.
        let (local_addr, state, pid_text) = match columns.as_slice() {
            [_proto, local_addr, _foreign_addr, state, pid_text] => {
                (*local_addr, *state, *pid_text)
            },
            _ => continue,
        };
        // Split at the last ':' so bracketed IPv6 addresses such as
        // `[::]:8080` are handled as well.
        let local_port_matches = local_addr
            .rsplit_once(':')
            .map_or(false, |(_, local_port)| local_port == wanted_port.as_str());
        if !local_port_matches {
            continue;
        }
        let pid = match pid_text.parse::<u32>() {
            Ok(pid) => pid,
            Err(_) => continue,
        };
        if state == "LISTENING" {
            return Ok(Some(pid));
        }
        // PID 0 cannot be inspected or terminated, so it is never a useful answer.
        if fallback.is_none() && pid != 0 {
            fallback = Some(pid);
        }
    }
    Ok(fallback)
}
/// Fetches the command line of process `pid` (Unix).
///
/// Runs `ps -p <pid> -o pid,comm,args` and reads the second output line (the
/// first is the column header). The line is split on whitespace into at most
/// three pieces; the third piece, trimmed, becomes [`ProcessInfo::command`].
///
/// # Returns
/// * `Ok(Some(info))` - the process row was found and had three pieces.
/// * `Ok(None)` - `ps` exited unsuccessfully, printed no data row, or the
///   row had fewer than three pieces.
///
/// # Errors
/// Returns an error if `ps` cannot be executed.
#[cfg(unix)]
pub fn get_process_info(pid: u32) -> Result<Option<ProcessInfo>> {
    let ps = Command::new("ps")
        .arg("-p")
        .arg(pid.to_string())
        .arg("-o")
        .arg("pid,comm,args")
        .output()
        .context("Failed to run ps command")?;
    if !ps.status.success() {
        return Ok(None);
    }
    let text = String::from_utf8_lossy(&ps.stdout);
    let info = text
        .lines()
        // Skip the header; the process row is the second line.
        .nth(1)
        .map(str::trim)
        .filter(|row| !row.is_empty())
        // Third piece of `pid comm args`; absent when the row is too short.
        .and_then(|row| row.splitn(3, char::is_whitespace).nth(2))
        .map(|args| ProcessInfo {
            pid,
            command: args.trim().to_string(),
        });
    Ok(info)
}
/// Fetches a description of process `pid` (Windows).
///
/// Runs `tasklist /FI "PID eq <pid>" /FO CSV /NH` and keeps the first
/// comma-separated field of the output, with surrounding double quotes
/// removed, as [`ProcessInfo::command`].
///
/// NOTE(review): the first CSV field of `tasklist` appears to be the image
/// name only, not the full command line - confirm this is enough for callers
/// that search the command text.
///
/// # Returns
/// * `Ok(Some(info))` - a row was printed.
/// * `Ok(None)` - the output was empty or contained `INFO: No tasks`.
///
/// # Errors
/// Returns an error if `tasklist` cannot be executed.
#[cfg(windows)]
pub fn get_process_info(pid: u32) -> Result<Option<ProcessInfo>> {
    let filter = format!("PID eq {pid}");
    let tasklist = Command::new("tasklist")
        .args(["/FI", filter.as_str(), "/FO", "CSV", "/NH"])
        .output()
        .context("Failed to run tasklist command")?;
    let text = String::from_utf8_lossy(&tasklist.stdout);
    let row = text.trim();
    if row.is_empty() || row.contains("INFO: No tasks") {
        return Ok(None);
    }
    // `split` always yields at least one piece, so this is `Some` for any row.
    let info = row.split(',').next().map(|first_field| ProcessInfo {
        pid,
        command: first_field.trim_matches('"').to_string(),
    });
    Ok(info)
}
/// Decides whether process `pid` looks like one of this project's agents.
///
/// The command text from [`Self::get_process_info`] must contain
/// `"systemprompt"` and, in addition, either the pattern returned by
/// `CliPaths::agent_run_cmd_pattern()` or the text `"agent-worker"`.
///
/// # Errors
/// Returns a descriptive `String` when the process information cannot be
/// retrieved or when no information exists for `pid`.
pub fn is_agent_process(pid: u32) -> Result<bool, String> {
    let details = Self::get_process_info(pid)
        .map_err(|e| format!("Failed to get process info for PID {}: {}", pid, e))?
        .ok_or_else(|| format!("No process info found for PID {}", pid))?;
    let cmd = details.command.as_str();
    if !cmd.contains("systemprompt") {
        return Ok(false);
    }
    Ok(cmd.contains(CliPaths::agent_run_cmd_pattern()) || cmd.contains("agent-worker"))
}
/// Terminates the agent process that occupies `port`, then waits for the
/// port to be released.
///
/// Steps: find the PID on the port, confirm via [`Self::is_agent_process`]
/// that it is an agent, call `process::kill_process`, and finally wait up to
/// five seconds for the port to become available.
///
/// # Returns
/// * `Ok(true)` - a process was killed and the port became available.
/// * `Ok(false)` - no process was found on the port.
///
/// # Errors
/// Returns `OrchestrationError::ProcessSpawnFailed` when the port lookup
/// fails, the owner is not an agent or cannot be identified, or
/// `process::kill_process` returns `false`. An error from
/// [`Self::wait_for_port_available`] is returned unchanged.
pub async fn kill_process_on_port(&self, port: u16) -> OrchestrationResult<bool> {
    // How long to wait for the port to be released after the kill.
    const RELEASE_TIMEOUT_SECS: u64 = 5;

    let lookup = Self::find_process_using_port(port).map_err(|e| {
        OrchestrationError::ProcessSpawnFailed(format!("Failed to check port {}: {}", port, e))
    })?;
    let pid = match lookup {
        Some(found) => found,
        None => return Ok(false),
    };

    let owned_by_agent = Self::is_agent_process(pid).map_err(|e| {
        OrchestrationError::ProcessSpawnFailed(format!(
            "Port {} is in use but failed to identify process (PID {}): {}",
            port, pid, e
        ))
    })?;
    // Never kill a process that is not recognised as an agent.
    if !owned_by_agent {
        return Err(OrchestrationError::ProcessSpawnFailed(format!(
            "Port {} is in use by non-agent process (PID {}). Please free the port manually.",
            port, pid
        )));
    }

    tracing::warn!(pid = %pid, port = %port, "Killing orphaned agent process");
    if process::kill_process(pid) {
        self.wait_for_port_available(port, RELEASE_TIMEOUT_SECS).await?;
        tracing::debug!(port = %port, "Port is now available");
        Ok(true)
    } else {
        Err(OrchestrationError::ProcessSpawnFailed(format!(
            "Failed to kill process {} on port {}",
            pid, port
        )))
    }
}
/// Polls until `port` is no longer in use or `timeout_secs` have elapsed.
///
/// The port is checked with `process::is_port_in_use` every 100 ms. It is
/// always checked at least once, including when `timeout_secs` is `0`, and
/// once more after the final sleep so that a port released during the last
/// interval is still detected.
///
/// # Errors
/// Returns `OrchestrationError::ProcessSpawnFailed` if the port is still in
/// use after the timeout.
pub async fn wait_for_port_available(
    &self,
    port: u16,
    timeout_secs: u64,
) -> OrchestrationResult<()> {
    const CHECK_INTERVAL_MS: u64 = 100;
    let check_interval = Duration::from_millis(CHECK_INTERVAL_MS);
    // Number of sleeps that fit in the timeout; saturating so that very large
    // timeouts cannot overflow.
    let max_sleeps = timeout_secs.saturating_mul(1000 / CHECK_INTERVAL_MS);
    for _ in 0..max_sleeps {
        if !process::is_port_in_use(port) {
            return Ok(());
        }
        tokio::time::sleep(check_interval).await;
    }
    // Final check: covers a zero timeout and the interval after the last sleep.
    if !process::is_port_in_use(port) {
        return Ok(());
    }
    Err(OrchestrationError::ProcessSpawnFailed(format!(
        "Port {} did not become available within {} seconds",
        port, timeout_secs
    )))
}
/// Frees `port` when it is held by an orphaned agent process.
///
/// Does nothing when `process::is_port_in_use` reports the port as free.
/// Otherwise the owning process is looked up; if it is an agent it is killed
/// via [`Self::kill_process_on_port`], and any other outcome is an error.
///
/// # Errors
/// Returns `OrchestrationError::ProcessSpawnFailed` when the owner cannot be
/// looked up or identified, or when the owner is not an agent process.
/// Errors from [`Self::kill_process_on_port`] are returned unchanged.
pub async fn cleanup_port_if_needed(&self, port: u16) -> OrchestrationResult<()> {
    if !process::is_port_in_use(port) {
        return Ok(());
    }

    let pid = Self::find_process_using_port(port)
        .map_err(|e| {
            OrchestrationError::ProcessSpawnFailed(format!(
                "Failed to check port {}: {}",
                port, e
            ))
        })?
        .ok_or_else(|| {
            OrchestrationError::ProcessSpawnFailed(format!(
                "Port {} appears to be in use but process cannot be identified",
                port
            ))
        })?;

    let owned_by_agent = Self::is_agent_process(pid).map_err(|e| {
        OrchestrationError::ProcessSpawnFailed(format!(
            "Port {} is in use but failed to identify process (PID {}): {}",
            port, pid, e
        ))
    })?;

    if owned_by_agent {
        tracing::warn!(port = %port, pid = %pid, "Port occupied by orphaned agent process");
        self.kill_process_on_port(port).await?;
        return Ok(());
    }

    // Best-effort description of the foreign process for the error message.
    let info = match Self::get_process_info(pid) {
        Ok(Some(details)) => details.command,
        Ok(None) => "unknown".to_string(),
        Err(e) => {
            tracing::trace!(pid = %pid, error = %e, "Failed to get process info for error message");
            "unknown".to_string()
        },
    };
    Err(OrchestrationError::ProcessSpawnFailed(format!(
        "Port {} is in use by non-agent process (PID {}): {}\nPlease stop the process manually or choose a different port.",
        port, pid, info
    )))
}
/// Runs [`Self::cleanup_port_if_needed`] for every port in `ports` that is
/// currently in use.
///
/// # Returns
/// The number of in-use ports whose cleanup succeeded.
///
/// # Errors
/// Stops at the first port whose cleanup fails, logs the failure, and
/// returns that error.
pub async fn cleanup_agent_ports(&self, ports: &[u16]) -> OrchestrationResult<u32> {
    let mut cleaned: u32 = 0;
    for port in ports.iter().copied() {
        if !process::is_port_in_use(port) {
            continue;
        }
        if let Err(e) = self.cleanup_port_if_needed(port).await {
            tracing::error!(port = %port, error = %e, "Failed to cleanup port");
            return Err(e);
        }
        cleaned += 1;
    }
    if cleaned > 0 {
        tracing::info!(cleaned = %cleaned, "Cleaned up ports");
    }
    Ok(cleaned)
}
pub fn verify_all_ports_available(ports: &[u16]) -> OrchestrationResult<()> {
let mut blocked_ports = Vec::new();
for &port in ports {
if process::is_port_in_use(port) {
if let Ok(Some(pid)) = Self::find_process_using_port(port) {
blocked_ports.push((port, pid));
}
}
}
if !blocked_ports.is_empty() {
let port_info: Vec<String> = blocked_ports
.iter()
.map(|(port, pid)| {
let info = Self::get_process_info(*pid)
.map_err(|e| {
tracing::trace!(pid = %pid, error = %e, "Failed to get process info for port status");
e
})
.ok()
.flatten().map_or_else(|| "unknown".to_string(), |i| i.command);
format!(" • Port {} - PID {} ({})", port, pid, info)
})
.collect();
return Err(OrchestrationError::ProcessSpawnFailed(format!(
"The following ports are still in use:\n{}",
port_info.join("\n")
)));
}
Ok(())
}
}
/// Identity of a running process as reported by the platform's process
/// listing tool.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProcessInfo {
    /// Operating-system process identifier.
    pub pid: u32,
    /// Text describing what the process is running, as extracted from the
    /// listing tool's output.
    pub command: String,
}