systemprompt-agent 0.7.0

Agent-to-Agent (A2A) protocol for systemprompt.io AI governance: streaming, JSON-RPC models, task lifecycle, .well-known discovery, and governed agent orchestration.
Documentation
//! Port management — detect, kill, and verify availability of agent ports.

mod probe;

use std::time::Duration;

use crate::services::agent_orchestration::{OrchestrationError, OrchestrationResult, process};

pub use probe::{ProcessInfo, find_process_using_port, get_process_info, is_agent_process};

#[derive(Debug, Copy, Clone)]
pub struct PortManager;

impl Default for PortManager {
    fn default() -> Self {
        Self::new()
    }
}

impl PortManager {
    #[must_use]
    pub const fn new() -> Self {
        Self
    }

    pub async fn kill_process_on_port(&self, port: u16) -> OrchestrationResult<bool> {
        let pid = match find_process_using_port(port) {
            Ok(Some(p)) => p,
            Ok(None) => {
                return Ok(false);
            },
            Err(e) => {
                return Err(OrchestrationError::ProcessSpawnFailed(format!(
                    "Failed to check port {}: {}",
                    port, e
                )));
            },
        };

        match is_agent_process(pid) {
            Ok(true) => {},
            Ok(false) => {
                return Err(OrchestrationError::ProcessSpawnFailed(format!(
                    "Port {} is in use by non-agent process (PID {}). Please free the port \
                     manually.",
                    port, pid
                )));
            },
            Err(e) => {
                return Err(OrchestrationError::ProcessSpawnFailed(format!(
                    "Port {} is in use but failed to identify process (PID {}): {}",
                    port, pid, e
                )));
            },
        }

        tracing::warn!(pid = %pid, port = %port, "Killing orphaned agent process");

        if !process::kill_process(pid) {
            return Err(OrchestrationError::ProcessSpawnFailed(format!(
                "Failed to kill process {} on port {}",
                pid, port
            )));
        }

        self.wait_for_port_available(port, 5).await?;

        tracing::debug!(port = %port, "Port is now available");
        Ok(true)
    }

    pub async fn wait_for_port_available(
        &self,
        port: u16,
        timeout_secs: u64,
    ) -> OrchestrationResult<()> {
        let check_interval = Duration::from_millis(100);
        let max_checks = (timeout_secs * 1000) / 100;

        for _ in 0..max_checks {
            if !process::is_port_in_use(port) {
                return Ok(());
            }
            tokio::time::sleep(check_interval).await;
        }

        Err(OrchestrationError::ProcessSpawnFailed(format!(
            "Port {} did not become available within {} seconds",
            port, timeout_secs
        )))
    }

    pub async fn cleanup_port_if_needed(&self, port: u16) -> OrchestrationResult<()> {
        if !process::is_port_in_use(port) {
            return Ok(());
        }

        match find_process_using_port(port) {
            Ok(Some(pid)) => match is_agent_process(pid) {
                Ok(true) => {
                    tracing::warn!(port = %port, pid = %pid, "Port occupied by orphaned agent process");
                    self.kill_process_on_port(port).await?;
                },
                Ok(false) => {
                    let info = get_process_info(pid)
                        .map_err(|e| {
                            tracing::trace!(pid = %pid, error = %e, "Failed to get process info for error message");
                            e
                        })
                        .ok()
                        .flatten()
                        .map_or_else(|| "unknown".to_string(), |i| i.command);

                    return Err(OrchestrationError::ProcessSpawnFailed(format!(
                        "Port {} is in use by non-agent process (PID {}): {}\nPlease stop the \
                         process manually or choose a different port.",
                        port, pid, info
                    )));
                },
                Err(e) => {
                    return Err(OrchestrationError::ProcessSpawnFailed(format!(
                        "Port {} is in use but failed to identify process (PID {}): {}",
                        port, pid, e
                    )));
                },
            },
            Ok(None) => {
                return Err(OrchestrationError::ProcessSpawnFailed(format!(
                    "Port {} appears to be in use but process cannot be identified",
                    port
                )));
            },
            Err(e) => {
                return Err(OrchestrationError::ProcessSpawnFailed(format!(
                    "Failed to check port {}: {}",
                    port, e
                )));
            },
        }

        Ok(())
    }

    pub async fn cleanup_agent_ports(&self, ports: &[u16]) -> OrchestrationResult<u32> {
        let mut cleaned = 0;

        for &port in ports {
            if process::is_port_in_use(port) {
                match self.cleanup_port_if_needed(port).await {
                    Ok(()) => cleaned += 1,
                    Err(e) => {
                        tracing::error!(port = %port, error = %e, "Failed to cleanup port");
                        return Err(e);
                    },
                }
            }
        }

        if cleaned > 0 {
            tracing::info!(cleaned = %cleaned, "Cleaned up ports");
        }

        Ok(cleaned)
    }

    pub fn verify_all_ports_available(ports: &[u16]) -> OrchestrationResult<()> {
        let mut blocked_ports = Vec::new();

        for &port in ports {
            if process::is_port_in_use(port) {
                if let Ok(Some(pid)) = find_process_using_port(port) {
                    blocked_ports.push((port, pid));
                }
            }
        }

        if !blocked_ports.is_empty() {
            let port_info: Vec<String> = blocked_ports
                .iter()
                .map(|(port, pid)| {
                    let info = get_process_info(*pid)
                        .map_err(|e| {
                            tracing::trace!(pid = %pid, error = %e, "Failed to get process info for port status");
                            e
                        })
                        .ok()
                        .flatten()
                        .map_or_else(|| "unknown".to_string(), |i| i.command);
                    format!("  • Port {} - PID {} ({})", port, pid, info)
                })
                .collect();

            return Err(OrchestrationError::ProcessSpawnFailed(format!(
                "The following ports are still in use:\n{}",
                port_info.join("\n")
            )));
        }

        Ok(())
    }
}