systemprompt-agent 0.2.2

Agent-to-Agent (A2A) protocol for systemprompt.io AI governance: streaming, JSON-RPC models, task lifecycle, .well-known discovery, and governed agent orchestration.
Documentation
use std::time::Duration;

use super::AgentLifecycle;
use crate::services::agent_orchestration::{
    AgentStatus, OrchestrationError, OrchestrationResult, process,
};

impl AgentLifecycle {
    pub(crate) async fn validate_prerequisites(&self, port: u16) -> OrchestrationResult<()> {
        use super::super::port_manager::PortManager;

        let port_manager = PortManager::new();

        if process::is_port_in_use(port) {
            match port_manager.cleanup_port_if_needed(port).await {
                Ok(()) => {
                    tracing::info!(port = %port, "Cleaned up port");
                },
                Err(e) => {
                    tracing::error!(error = %e, port = %port, "Port is in use and cleanup failed");
                    return Err(e);
                },
            }
        }

        Ok(())
    }

    #[allow(clippy::unused_self)]
    pub(crate) fn spawn_detached_process(
        &self,
        agent_name: &str,
        port: u16,
    ) -> OrchestrationResult<u32> {
        process::spawn_detached_process(agent_name, port)
    }

    pub(crate) async fn verify_startup(
        &self,
        agent_name: &str,
        port: u16,
    ) -> OrchestrationResult<()> {
        const MAX_ATTEMPTS: u32 = 5;
        const SLEEP_MS: u64 = 1000;
        const TCP_TIMEOUT_SECS: u64 = 2;

        for attempt in 1..=MAX_ATTEMPTS {
            tokio::time::sleep(Duration::from_millis(SLEEP_MS)).await;

            match self.check_port_responsiveness(port, TCP_TIMEOUT_SECS).await {
                Ok(true) => return Ok(()),
                Ok(false) => {
                    tracing::debug!(
                        agent = %agent_name,
                        port,
                        attempt,
                        max_attempts = MAX_ATTEMPTS,
                        "Health check returned false - agent not ready"
                    );
                },
                Err(e) => {
                    tracing::debug!(
                        agent = %agent_name,
                        port,
                        attempt,
                        max_attempts = MAX_ATTEMPTS,
                        error = %e,
                        "Health check connection error"
                    );
                },
            }
        }

        self.log_startup_failure(agent_name, port).await;
        self.db_service.mark_error(agent_name).await?;
        Err(OrchestrationError::HealthCheckTimeout(
            agent_name.to_string(),
        ))
    }

    async fn check_port_responsiveness(
        &self,
        port: u16,
        timeout_secs: u64,
    ) -> OrchestrationResult<bool> {
        use tokio::net::TcpStream;
        use tokio::time::timeout;

        let address = format!("127.0.0.1:{port}");
        match timeout(
            Duration::from_secs(timeout_secs),
            TcpStream::connect(&address),
        )
        .await
        {
            Ok(Ok(_)) => Ok(true),
            Ok(Err(e)) => {
                tracing::trace!(error = %e, "TCP connection refused");
                Ok(false)
            },
            Err(_) => Ok(false),
        }
    }

    pub(crate) async fn log_startup_failure(&self, agent_name: &str, port: u16) {
        let log_path = match systemprompt_models::Config::get() {
            Ok(config) => format!("{}/logs/agent-{}.log", config.system_path, agent_name),
            Err(e) => {
                tracing::debug!(error = %e, "Config not available, using fallback log path");
                format!("/tmp/logs/agent-{}.log", agent_name)
            },
        };

        match self.db_service.get_status(agent_name).await {
            Ok(AgentStatus::Running { pid, .. }) => {
                if process::process_exists(pid) {
                    tracing::error!(
                        agent = %agent_name,
                        pid,
                        port,
                        log_file = %log_path,
                        "Agent process exists but not responding on port"
                    );
                } else {
                    tracing::error!(
                        agent = %agent_name,
                        pid,
                        log_file = %log_path,
                        "Agent process died after spawn - check log file for errors"
                    );
                }
            },
            Ok(AgentStatus::Failed { reason, .. }) => {
                tracing::error!(
                    agent = %agent_name,
                    port,
                    reason = %reason,
                    log_file = %log_path,
                    "Agent in failed state"
                );
            },
            Err(e) => {
                tracing::error!(
                    agent = %agent_name,
                    port,
                    error = %e,
                    log_file = %log_path,
                    "Failed to retrieve agent status"
                );
            },
        }
    }
}