systemprompt-agent 0.2.0

Core Agent protocol module for systemprompt.io
Documentation
use super::AgentOrchestrator;
use crate::services::agent_orchestration::{AgentStatus, OrchestrationError, OrchestrationResult};

impl AgentOrchestrator {
    /// Deletes the agent named `agent_name`.
    ///
    /// When the agent's status is reported as `Running`, it is first disabled
    /// through `self.lifecycle`; afterwards its service record is removed
    /// through `self.db_service`. A failed status lookup is not treated as an
    /// error: the disable step is skipped and the removal still runs.
    ///
    /// # Errors
    ///
    /// Propagates any error from disabling the agent or from removing its
    /// service record.
    pub async fn delete_agent(&self, agent_name: &str) -> OrchestrationResult<()> {
        tracing::debug!(agent_name = %agent_name, "Deleting agent");

        let currently_running = matches!(
            self.get_status(agent_name).await,
            Ok(AgentStatus::Running { .. })
        );

        if currently_running {
            tracing::debug!(agent_name = %agent_name, "Stopping running agent before deletion");
            self.lifecycle.disable_agent(agent_name).await?;
        }

        self.db_service.remove_agent_service(agent_name).await?;
        tracing::debug!(agent_name = %agent_name, "Agent deleted successfully");

        Ok(())
    }

    /// Deletes every agent returned by `list_all` and reports how many were
    /// actually removed.
    ///
    /// All agents are disabled up front via `disable_all`, then each one is
    /// deleted individually. A failure to delete a single agent is logged and
    /// does not abort the run, so the returned count may be lower than the
    /// number of agents found.
    ///
    /// # Errors
    ///
    /// Returns an error if listing the agents or disabling them fails.
    pub async fn delete_all_agents(&self) -> OrchestrationResult<u64> {
        tracing::info!("Deleting all agents");

        let agents = self.list_all().await?;
        let total_count = agents.len() as u64;

        // Nothing registered: skip the disable pass entirely.
        if total_count == 0 {
            tracing::debug!("No agents to delete");
            return Ok(0);
        }

        tracing::info!(count = %total_count, "Found agents to delete");

        tracing::debug!("Disabling all running agents");
        self.disable_all().await?;

        let mut deleted_count: u64 = 0;
        for (agent_id, _status) in agents {
            if let Err(e) = self.delete_agent(&agent_id).await {
                // Best effort: record the failure and move on to the next agent.
                tracing::error!(agent_id = %agent_id, error = %e, "Failed to delete agent");
                continue;
            }
            deleted_count += 1;
        }

        tracing::info!(deleted = %deleted_count, total = %total_count, "Deleted agents");
        Ok(deleted_count)
    }

    /// Scans for agent worker processes that are running but not tracked, and
    /// hands their PIDs to `process_orphaned_pids` (Unix variant).
    ///
    /// Candidate processes are found with `pgrep -f agent-worker`, whose
    /// standard output is passed on as newline-separated PIDs.
    ///
    /// `pgrep` exit code 1 means "no process matched" and is reported at debug
    /// level. Any other non-zero exit (or termination by signal) means the scan
    /// itself failed; that is logged as a warning instead of being reported as
    /// "nothing found". In both cases the function returns `Ok(())` without
    /// processing any PIDs.
    ///
    /// NOTE(review): `std::process::Command::output` blocks the calling thread
    /// while `pgrep` runs, even though this is an `async fn`.
    ///
    /// # Errors
    ///
    /// Returns `OrchestrationError::ProcessSpawnFailed` if `pgrep` cannot be
    /// started, and propagates errors from `process_orphaned_pids`.
    #[cfg(unix)]
    pub async fn cleanup_orphaned_processes(&self) -> OrchestrationResult<()> {
        tracing::debug!("Scanning for orphaned agent processes");

        let output = std::process::Command::new("pgrep")
            .arg("-f")
            .arg("agent-worker")
            .output()
            .map_err(|e| {
                OrchestrationError::ProcessSpawnFailed(format!("Failed to run pgrep: {e}"))
            })?;

        if !output.status.success() {
            match output.status.code() {
                // Exit code 1: pgrep ran fine but matched no process.
                Some(1) => tracing::debug!("No orphaned agent processes found"),
                // Anything else (usage error, fatal error, killed by signal) is a
                // failed scan, not an empty result.
                exit_code => {
                    let stderr = String::from_utf8_lossy(&output.stderr);
                    let stderr = stderr.trim();
                    tracing::warn!(
                        exit_code = ?exit_code,
                        stderr = %stderr,
                        "pgrep failed; skipping orphaned process scan"
                    );
                },
            }
            return Ok(());
        }

        let pids_str = String::from_utf8_lossy(&output.stdout);
        self.process_orphaned_pids(&pids_str).await
    }

    /// Scans for agent processes that are running but not tracked, and hands
    /// their PIDs to `process_orphaned_pids` (Windows variant).
    ///
    /// Candidates are listed with `tasklist`, filtered to image names matching
    /// `systemprompt*`, in header-less CSV form. The second CSV field of each
    /// row is taken as the PID; rows whose second field is not a valid `u32`
    /// are ignored.
    ///
    /// # Errors
    ///
    /// Returns `OrchestrationError::ProcessSpawnFailed` if `tasklist` cannot be
    /// started, and propagates errors from `process_orphaned_pids`.
    #[cfg(windows)]
    pub async fn cleanup_orphaned_processes(&self) -> OrchestrationResult<()> {
        tracing::debug!("Scanning for orphaned agent processes");

        let tasklist = std::process::Command::new("tasklist")
            .args(["/FI", "IMAGENAME eq systemprompt*", "/FO", "CSV", "/NH"])
            .output()
            .map_err(|e| {
                OrchestrationError::ProcessSpawnFailed(format!("Failed to run tasklist: {e}"))
            })?;

        let listing = String::from_utf8_lossy(&tasklist.stdout);

        let nothing_listed = listing.contains("INFO: No tasks") || listing.trim().is_empty();
        if nothing_listed {
            tracing::debug!("No orphaned agent processes found");
            return Ok(());
        }

        // Each row looks like `"image","pid",...`; keep only rows with a numeric
        // second field and re-emit the PIDs one per line.
        let pids_str = listing
            .lines()
            .filter_map(|row| row.split(',').nth(1))
            .filter_map(|field| field.trim_matches('"').parse::<u32>().ok())
            .map(|pid| pid.to_string())
            .collect::<Vec<_>>()
            .join("\n");

        self.process_orphaned_pids(&pids_str).await
    }

    /// Registers untracked agent processes from a newline-separated PID list.
    ///
    /// For every line of `pids_str` that parses as a `u32` (other lines are
    /// skipped silently):
    ///
    /// 1. PIDs already tracked according to `is_pid_tracked` are skipped.
    /// 2. The process is inspected with `identify_orphaned_process`; if no
    ///    agent identifier and port can be determined, it counts as a failure.
    /// 3. The agent's configured name is looked up (falling back to
    ///    `"unknown"` when the lookup fails) and the process is registered
    ///    through `self.db_service`. A registration error counts as a failure.
    ///
    /// Totals of registered and failed processes are logged at the end.
    ///
    /// # Errors
    ///
    /// Only an error from `is_pid_tracked` aborts the scan; identification and
    /// registration failures are logged and counted instead.
    async fn process_orphaned_pids(&self, pids_str: &str) -> OrchestrationResult<()> {
        let mut registered = 0;
        let mut failed = 0;

        let candidate_pids = pids_str
            .lines()
            .filter_map(|entry| entry.trim().parse::<u32>().ok());

        for pid in candidate_pids {
            if self.is_pid_tracked(pid).await? {
                continue;
            }

            let (agent_id, port) = match Self::identify_orphaned_process(pid) {
                Some(identity) => identity,
                None => {
                    tracing::warn!(pid = %pid, "Could not identify agent for orphaned process");
                    failed += 1;
                    continue;
                },
            };

            tracing::debug!(
                pid = %pid,
                agent_id = %agent_id,
                port = %port,
                "Found orphaned process"
            );

            // A missing or unreadable config must not block registration.
            let name = match self.db_service.get_agent_config(&agent_id).await {
                Ok(config) => config.name,
                Err(_) => "unknown".to_string(),
            };

            match self.db_service.register_agent(&name, pid, port).await {
                Err(e) => {
                    tracing::error!(
                        error = %e,
                        pid = %pid,
                        "Failed to register orphaned process"
                    );
                    failed += 1;
                },
                Ok(service_id) => {
                    tracing::info!(
                        service_id = %service_id,
                        pid = %pid,
                        "Registered orphaned process as service"
                    );
                    registered += 1;
                },
            }
        }

        if registered > 0 {
            tracing::info!(registered = %registered, "Registered orphaned processes");
        }
        if failed > 0 {
            tracing::warn!(failed = %failed, "Failed to handle some processes");
        }

        Ok(())
    }

    /// Reports whether `pid` belongs to an agent the database service lists
    /// with a `Running` status.
    ///
    /// # Errors
    ///
    /// Propagates any error from listing the agents.
    pub(super) async fn is_pid_tracked(&self, pid: u32) -> OrchestrationResult<bool> {
        let agents = self.db_service.list_all_agents().await?;

        let tracked = agents.into_iter().any(|(_, status)| {
            matches!(
                status,
                AgentStatus::Running { pid: running_pid, .. } if running_pid == pid
            )
        });

        Ok(tracked)
    }

    /// Determines which agent a process belongs to by reading its environment
    /// from `/proc/<pid>/environ` (Linux variant).
    ///
    /// The NUL-separated entries are scanned for `AGENT_ID=` or `AGENT_UUID=`
    /// (agent identifier) and `AGENT_PORT=` (port). If a variable appears more
    /// than once, the last occurrence wins. The complete text after the first
    /// `=` is used as the value, so identifiers that themselves contain `=`
    /// (for example base64 padding) are returned intact.
    ///
    /// Returns `Some((agent_id, port))` only when both an identifier and a
    /// port that parses as `u16` were found. Returns `None` when the environ
    /// file cannot be read or either value is missing.
    #[cfg(target_os = "linux")]
    pub(super) fn identify_orphaned_process(pid: u32) -> Option<(String, u16)> {
        let environ_path = format!("/proc/{pid}/environ");
        let environ_data = std::fs::read(environ_path).ok()?;
        let environ_str = String::from_utf8_lossy(&environ_data);

        let mut agent_id = None;
        let mut port = None;

        for env_var in environ_str.split('\0') {
            // `strip_prefix` keeps everything after the first `=`; splitting on
            // `=` would cut the value short at any later `=`.
            if let Some(value) = env_var
                .strip_prefix("AGENT_ID=")
                .or_else(|| env_var.strip_prefix("AGENT_UUID="))
            {
                agent_id = Some(value.to_string());
            } else if let Some(value) = env_var.strip_prefix("AGENT_PORT=") {
                port = value.parse::<u16>().ok();
            }
        }

        agent_id.zip(port)
    }

    /// Determines which agent a process belongs to by inspecting its command
    /// line as printed by `ps -p <pid> -o args=` (non-Linux variant).
    ///
    /// The output is split on whitespace. The token following `--agent-name`
    /// is taken as the agent name and the token following `--port` is parsed
    /// as a `u16` port. If a flag appears more than once, the last occurrence
    /// wins.
    ///
    /// Returns `Some((name, port))` only when both values were found, and
    /// `None` when `ps` cannot be run or either value is missing.
    ///
    /// NOTE(review): this variant is compiled for every non-Linux target,
    /// including Windows, where a `ps` executable is typically not available;
    /// confirm that returning `None` there is intended.
    #[cfg(not(target_os = "linux"))]
    pub(super) fn identify_orphaned_process(pid: u32) -> Option<(String, u16)> {
        let pid_arg = pid.to_string();
        let ps_output = std::process::Command::new("ps")
            .args(["-p", pid_arg.as_str(), "-o", "args="])
            .output()
            .ok()?;

        let command_line = String::from_utf8_lossy(&ps_output.stdout);
        let mut tokens = command_line.split_whitespace();

        let mut found_name: Option<String> = None;
        let mut found_port: Option<u16> = None;

        // A flag's value is the token right after it, so the loop body pulls
        // from the same iterator that drives the loop.
        while let Some(token) = tokens.next() {
            if token == "--agent-name" {
                found_name = tokens.next().map(str::to_owned);
            } else if token == "--port" {
                found_port = tokens.next().and_then(|value| value.parse::<u16>().ok());
            }
        }

        match (found_name, found_port) {
            (Some(name), Some(port)) => Some((name, port)),
            _ => None,
        }
    }
}