systemprompt-agent 0.1.22

Core Agent protocol module for systemprompt.io
Documentation
use anyhow::{Context, Result};
use std::process::Command;
use std::time::Duration;
use systemprompt_models::CliPaths;

use crate::services::agent_orchestration::{OrchestrationError, OrchestrationResult, process};

/// Stateless helper for locating, identifying and reclaiming TCP ports held by agent processes.
#[derive(Clone, Copy, Debug)]
pub struct PortManager;

impl Default for PortManager {
    fn default() -> Self {
        Self::new()
    }
}

impl PortManager {
    pub const fn new() -> Self {
        Self
    }

    #[cfg(unix)]
    pub fn find_process_using_port(port: u16) -> Result<Option<u32>> {
        let output = Command::new("lsof")
            .arg("-ti")
            .arg(format!(":{port}"))
            .output()
            .context("Failed to run lsof command")?;

        if !output.status.success() {
            return Ok(None);
        }

        let stdout = String::from_utf8_lossy(&output.stdout);
        let pid_str = stdout.trim();

        if pid_str.is_empty() {
            return Ok(None);
        }

        let pid = pid_str
            .parse::<u32>()
            .context("Failed to parse PID from lsof output")?;

        Ok(Some(pid))
    }

    #[cfg(windows)]
    pub fn find_process_using_port(port: u16) -> Result<Option<u32>> {
        let output = Command::new("netstat")
            .args(["-ano", "-p", "TCP"])
            .output()
            .context("Failed to run netstat command")?;

        let stdout = String::from_utf8_lossy(&output.stdout);
        let port_pattern = format!(":{port} ");
        let port_pattern_tab = format!(":{port}\t");

        for line in stdout.lines() {
            if line.contains(&port_pattern) || line.contains(&port_pattern_tab) {
                if let Some(pid_str) = line.split_whitespace().last() {
                    if let Ok(pid) = pid_str.parse::<u32>() {
                        return Ok(Some(pid));
                    }
                }
            }
        }

        Ok(None)
    }

    #[cfg(unix)]
    pub fn get_process_info(pid: u32) -> Result<Option<ProcessInfo>> {
        let output = Command::new("ps")
            .arg("-p")
            .arg(pid.to_string())
            .arg("-o")
            .arg("pid,comm,args")
            .output()
            .context("Failed to run ps command")?;

        if !output.status.success() {
            return Ok(None);
        }

        let stdout = String::from_utf8_lossy(&output.stdout);
        let lines: Vec<&str> = stdout.lines().collect();

        if lines.len() < 2 {
            return Ok(None);
        }

        let line = lines[1].trim();
        if line.is_empty() {
            return Ok(None);
        }

        let parts: Vec<&str> = line.splitn(3, char::is_whitespace).collect();
        if parts.len() < 3 {
            return Ok(None);
        }

        let command_line = parts[2].trim();

        Ok(Some(ProcessInfo {
            pid,
            command: command_line.to_string(),
        }))
    }

    #[cfg(windows)]
    pub fn get_process_info(pid: u32) -> Result<Option<ProcessInfo>> {
        let output = Command::new("tasklist")
            .args(["/FI", &format!("PID eq {}", pid), "/FO", "CSV", "/NH"])
            .output()
            .context("Failed to run tasklist command")?;

        let stdout = String::from_utf8_lossy(&output.stdout);
        let line = stdout.trim();

        if line.is_empty() || line.contains("INFO: No tasks") {
            return Ok(None);
        }

        let parts: Vec<&str> = line.split(',').collect();
        if parts.is_empty() {
            return Ok(None);
        }

        let command = parts[0].trim_matches('"').to_string();

        Ok(Some(ProcessInfo { pid, command }))
    }

    pub fn is_agent_process(pid: u32) -> Result<bool, String> {
        match Self::get_process_info(pid) {
            Ok(Some(info)) => {
                let is_agent = info.command.contains("systemprompt")
                    && (info.command.contains(CliPaths::agent_run_cmd_pattern())
                        || info.command.contains("agent-worker"));
                Ok(is_agent)
            },
            Ok(None) => Err(format!("No process info found for PID {}", pid)),
            Err(e) => Err(format!("Failed to get process info for PID {}: {}", pid, e)),
        }
    }

    pub async fn kill_process_on_port(&self, port: u16) -> OrchestrationResult<bool> {
        let pid = match Self::find_process_using_port(port) {
            Ok(Some(p)) => p,
            Ok(None) => {
                return Ok(false);
            },
            Err(e) => {
                return Err(OrchestrationError::ProcessSpawnFailed(format!(
                    "Failed to check port {}: {}",
                    port, e
                )));
            },
        };

        match Self::is_agent_process(pid) {
            Ok(true) => {},
            Ok(false) => {
                return Err(OrchestrationError::ProcessSpawnFailed(format!(
                    "Port {} is in use by non-agent process (PID {}). Please free the port \
                     manually.",
                    port, pid
                )));
            },
            Err(e) => {
                return Err(OrchestrationError::ProcessSpawnFailed(format!(
                    "Port {} is in use but failed to identify process (PID {}): {}",
                    port, pid, e
                )));
            },
        }

        tracing::warn!(pid = %pid, port = %port, "Killing orphaned agent process");

        if !process::kill_process(pid) {
            return Err(OrchestrationError::ProcessSpawnFailed(format!(
                "Failed to kill process {} on port {}",
                pid, port
            )));
        }

        self.wait_for_port_available(port, 5).await?;

        tracing::debug!(port = %port, "Port is now available");
        Ok(true)
    }

    pub async fn wait_for_port_available(
        &self,
        port: u16,
        timeout_secs: u64,
    ) -> OrchestrationResult<()> {
        let check_interval = Duration::from_millis(100);
        let max_checks = (timeout_secs * 1000) / 100;

        for _ in 0..max_checks {
            if !process::is_port_in_use(port) {
                return Ok(());
            }
            tokio::time::sleep(check_interval).await;
        }

        Err(OrchestrationError::ProcessSpawnFailed(format!(
            "Port {} did not become available within {} seconds",
            port, timeout_secs
        )))
    }

    pub async fn cleanup_port_if_needed(&self, port: u16) -> OrchestrationResult<()> {
        if !process::is_port_in_use(port) {
            return Ok(());
        }

        match Self::find_process_using_port(port) {
            Ok(Some(pid)) => match Self::is_agent_process(pid) {
                Ok(true) => {
                    tracing::warn!(port = %port, pid = %pid, "Port occupied by orphaned agent process");
                    self.kill_process_on_port(port).await?;
                },
                Ok(false) => {
                    let info = Self::get_process_info(pid)
                        .map_err(|e| {
                            tracing::trace!(pid = %pid, error = %e, "Failed to get process info for error message");
                            e
                        })
                        .ok()
                        .flatten().map_or_else(|| "unknown".to_string(), |i| i.command);

                    return Err(OrchestrationError::ProcessSpawnFailed(format!(
                        "Port {} is in use by non-agent process (PID {}): {}\nPlease stop the \
                         process manually or choose a different port.",
                        port, pid, info
                    )));
                },
                Err(e) => {
                    return Err(OrchestrationError::ProcessSpawnFailed(format!(
                        "Port {} is in use but failed to identify process (PID {}): {}",
                        port, pid, e
                    )));
                },
            },
            Ok(None) => {
                return Err(OrchestrationError::ProcessSpawnFailed(format!(
                    "Port {} appears to be in use but process cannot be identified",
                    port
                )));
            },
            Err(e) => {
                return Err(OrchestrationError::ProcessSpawnFailed(format!(
                    "Failed to check port {}: {}",
                    port, e
                )));
            },
        }

        Ok(())
    }

    pub async fn cleanup_agent_ports(&self, ports: &[u16]) -> OrchestrationResult<u32> {
        let mut cleaned = 0;

        for &port in ports {
            if process::is_port_in_use(port) {
                match self.cleanup_port_if_needed(port).await {
                    Ok(()) => cleaned += 1,
                    Err(e) => {
                        tracing::error!(port = %port, error = %e, "Failed to cleanup port");
                        return Err(e);
                    },
                }
            }
        }

        if cleaned > 0 {
            tracing::info!(cleaned = %cleaned, "Cleaned up ports");
        }

        Ok(cleaned)
    }

    pub fn verify_all_ports_available(ports: &[u16]) -> OrchestrationResult<()> {
        let mut blocked_ports = Vec::new();

        for &port in ports {
            if process::is_port_in_use(port) {
                if let Ok(Some(pid)) = Self::find_process_using_port(port) {
                    blocked_ports.push((port, pid));
                }
            }
        }

        if !blocked_ports.is_empty() {
            let port_info: Vec<String> = blocked_ports
                .iter()
                .map(|(port, pid)| {
                    let info = Self::get_process_info(*pid)
                        .map_err(|e| {
                            tracing::trace!(pid = %pid, error = %e, "Failed to get process info for port status");
                            e
                        })
                        .ok()
                        .flatten().map_or_else(|| "unknown".to_string(), |i| i.command);
                    format!("  • Port {} - PID {} ({})", port, pid, info)
                })
                .collect();

            return Err(OrchestrationError::ProcessSpawnFailed(format!(
                "The following ports are still in use:\n{}",
                port_info.join("\n")
            )));
        }

        Ok(())
    }
}

/// Minimal description of a running process, as collected by `PortManager::get_process_info`.
#[derive(Clone, Debug)]
pub struct ProcessInfo {
    /// Operating-system process identifier.
    pub pid: u32,
    /// Process description used for agent detection and error messages.
    /// On Unix this is the command line reported by `ps`; on Windows, the image name
    /// reported by `tasklist`.
    pub command: String,
}