systemprompt-agent 0.2.0

Core Agent protocol module for systemprompt.io
Documentation
use crate::repository::agent_service::AgentServiceRepository;
use crate::services::agent_orchestration::{
    AgentStatus, OrchestrationError, OrchestrationResult, process,
};
use crate::services::registry::AgentRegistry;
use systemprompt_models::services::AgentConfig;

/// Service facade that pairs persisted agent service records with the agent registry.
///
/// Its methods wrap repository and registry calls and translate their failures into
/// [`OrchestrationError`] values.
#[derive(Debug)]
pub struct AgentDatabaseService {
    /// Repository used to read and update agent service records.
    pub repository: AgentServiceRepository,
    /// Registry used to list and look up agent configurations.
    pub registry: AgentRegistry,
}

impl AgentDatabaseService {
    /// Builds the service from an existing repository and a freshly created
    /// [`AgentRegistry`].
    ///
    /// # Errors
    ///
    /// Returns [`OrchestrationError::Database`] carrying a "Failed to load agent
    /// registry" message when `AgentRegistry::new` fails.
    pub fn new(repository: AgentServiceRepository) -> OrchestrationResult<Self> {
        match AgentRegistry::new() {
            Ok(registry) => Ok(Self {
                repository,
                registry,
            }),
            Err(e) => Err(OrchestrationError::Database(format!(
                "Failed to load agent registry: {e}"
            ))),
        }
    }

    /// Records an agent with the given name, process id and port through the repository.
    ///
    /// Returns the `String` produced by the repository call
    /// (presumably an identifier of the service record — confirm against the repository).
    ///
    /// # Errors
    ///
    /// Any repository failure is returned as [`OrchestrationError::Database`] holding the
    /// failure's display text.
    pub async fn register_agent(
        &self,
        name: &str,
        pid: u32,
        port: u16,
    ) -> OrchestrationResult<String> {
        let outcome = self.repository.register_agent(name, pid, port).await;

        outcome.map_err(|err| OrchestrationError::Database(err.to_string()))
    }

    pub async fn get_status(&self, agent_name: &str) -> OrchestrationResult<AgentStatus> {
        let row = self
            .repository
            .get_agent_status(agent_name)
            .await
            .map_err(|e| OrchestrationError::Database(e.to_string()))?;

        match row {
            Some(r) => match (r.pid, r.status.as_str()) {
                (Some(pid), "running") => {
                    let pid = pid as u32;
                    if process::process_exists(pid) {
                        Ok(AgentStatus::Running {
                            pid,
                            port: r.port as u16,
                        })
                    } else {
                        self.mark_failed(agent_name).await?;
                        Ok(AgentStatus::Failed {
                            reason: "Process died unexpectedly".to_string(),
                            last_attempt: None,
                            retry_count: 0,
                        })
                    }
                },
                (_, "starting") => Ok(AgentStatus::Failed {
                    reason: "Agent is starting".to_string(),
                    last_attempt: None,
                    retry_count: 0,
                }),
                (_, "failed" | "crashed" | "stopped") => {
                    let error_msg = self
                        .get_error_message(agent_name)
                        .await
                        .unwrap_or_else(|_| "Unknown failure".to_string());
                    Ok(AgentStatus::Failed {
                        reason: error_msg,
                        last_attempt: None,
                        retry_count: 0,
                    })
                },
                _ => {
                    self.mark_failed(agent_name).await?;
                    Ok(AgentStatus::Failed {
                        reason: "Invalid database state".to_string(),
                        last_attempt: None,
                        retry_count: 0,
                    })
                },
            },
            None => Ok(AgentStatus::Failed {
                reason: "No service record found".to_string(),
                last_attempt: None,
                retry_count: 0,
            }),
        }
    }

    /// Flags the agent's service record as failed.
    ///
    /// Performs two repository writes in order: `mark_error`, then `mark_crashed`.
    /// The second write is skipped when the first one fails.
    ///
    /// # Errors
    ///
    /// A failure of either write is returned as [`OrchestrationError::Database`]
    /// holding the failure's display text.
    pub async fn mark_failed(&self, agent_name: &str) -> OrchestrationResult<()> {
        if let Err(err) = self.repository.mark_error(agent_name).await {
            return Err(OrchestrationError::Database(err.to_string()));
        }

        match self.repository.mark_crashed(agent_name).await {
            Ok(done) => Ok(done),
            Err(err) => Err(OrchestrationError::Database(err.to_string())),
        }
    }

    /// Marks the agent as crashed; this is an alias that delegates to
    /// [`Self::mark_failed`].
    ///
    /// # Errors
    ///
    /// Propagates whatever [`Self::mark_failed`] returns.
    pub async fn mark_crashed(&self, agent_name: &str) -> OrchestrationResult<()> {
        self.mark_failed(agent_name).await?;
        Ok(())
    }

    /// Produces a short description of the agent's stored state.
    ///
    /// Despite the name, the text is derived only from the record's status column:
    /// `"Status: <status>"` when a record exists, `"No service record"` otherwise.
    ///
    /// # Errors
    ///
    /// Returns [`OrchestrationError::Database`] when the status query fails.
    pub async fn get_error_message(&self, agent_name: &str) -> OrchestrationResult<String> {
        let record = self
            .repository
            .get_agent_status(agent_name)
            .await
            .map_err(|err| OrchestrationError::Database(err.to_string()))?;

        let message = record.map_or_else(
            || "No service record".to_string(),
            |r| format!("Status: {}", r.status),
        );

        Ok(message)
    }

    /// Calls the repository's `mark_error` for the named agent.
    ///
    /// # Errors
    ///
    /// A repository failure is returned as [`OrchestrationError::Database`] holding the
    /// failure's display text.
    pub async fn mark_error(&self, agent_name: &str) -> OrchestrationResult<()> {
        match self.repository.mark_error(agent_name).await {
            Ok(done) => Ok(done),
            Err(err) => Err(OrchestrationError::Database(err.to_string())),
        }
    }

    /// Returns the names of the agents the repository lists as running.
    ///
    /// Only the `name` field of each returned row is kept; liveness of the underlying
    /// processes is not checked here.
    ///
    /// # Errors
    ///
    /// Returns [`OrchestrationError::Database`] when the repository query fails.
    pub async fn list_running_agents(&self) -> OrchestrationResult<Vec<String>> {
        let names: Vec<String> = self
            .repository
            .list_running_agents()
            .await
            .map_err(|err| OrchestrationError::Database(err.to_string()))?
            .into_iter()
            .map(|record| record.name)
            .collect();

        Ok(names)
    }

    /// Lists every agent known to the registry together with its current status.
    ///
    /// Statuses are resolved one agent at a time through [`Self::get_status`], in the
    /// order the registry returns the configurations.
    ///
    /// # Errors
    ///
    /// Returns [`OrchestrationError::Database`] when the registry listing fails. A
    /// failure while resolving any single agent's status aborts the whole listing and
    /// is propagated unchanged.
    pub async fn list_all_agents(&self) -> OrchestrationResult<Vec<(String, AgentStatus)>> {
        let configs = match self.registry.list_agents().await {
            Ok(configs) => configs,
            Err(e) => {
                return Err(OrchestrationError::Database(format!(
                    "Failed to list agents from config: {e}"
                )));
            },
        };

        let mut statuses = Vec::new();

        for config in configs {
            let status = self.get_status(&config.name).await?;
            statuses.push((config.name.clone(), status));
        }

        Ok(statuses)
    }

    /// Reports whether the registry can resolve `agent_name`.
    ///
    /// Every registry error is reported as `Ok(false)`, so this method never returns
    /// `Err` and cannot distinguish "unknown agent" from any other lookup failure.
    pub async fn agent_exists(&self, agent_name: &str) -> OrchestrationResult<bool> {
        let lookup = self.registry.get_agent(agent_name).await;

        Ok(lookup.is_ok())
    }

    /// Fetches the configuration of `agent_name` from the registry.
    ///
    /// # Errors
    ///
    /// Any registry lookup failure is returned as [`OrchestrationError::AgentNotFound`]
    /// with a message naming the agent and the underlying error.
    pub async fn get_agent_config(&self, agent_name: &str) -> OrchestrationResult<AgentConfig> {
        match self.registry.get_agent(agent_name).await {
            Ok(config) => Ok(config),
            Err(e) => Err(OrchestrationError::AgentNotFound(format!(
                "Agent {} not found in config: {}",
                agent_name, e
            ))),
        }
    }

    pub async fn cleanup_orphaned_services(&self) -> OrchestrationResult<u64> {
        let rows = self
            .repository
            .list_running_agent_pids()
            .await
            .map_err(|e| OrchestrationError::Database(e.to_string()))?;

        let mut cleaned = 0u64;

        for row in rows {
            let pid = row.pid as u32;
            if !process::process_exists(pid) {
                self.mark_crashed(&row.name).await?;
                cleaned += 1;
            }
        }

        Ok(cleaned)
    }

    /// Calls the repository's `remove_agent_service` for the named agent.
    ///
    /// # Errors
    ///
    /// A repository failure is returned as [`OrchestrationError::Database`] holding the
    /// failure's display text.
    pub async fn remove_agent_service(&self, agent_name: &str) -> OrchestrationResult<()> {
        let removal = self.repository.remove_agent_service(agent_name).await;

        removal.map_err(|err| OrchestrationError::Database(err.to_string()))
    }

    /// Forwards `health_status` for the named agent to the repository's
    /// `update_health_status`.
    ///
    /// The status string is passed through unchanged; no validation happens here.
    ///
    /// # Errors
    ///
    /// A repository failure is returned as [`OrchestrationError::Database`] holding the
    /// failure's display text.
    pub async fn update_health_status(
        &self,
        agent_name: &str,
        health_status: &str,
    ) -> OrchestrationResult<()> {
        match self
            .repository
            .update_health_status(agent_name, health_status)
            .await
        {
            Ok(done) => Ok(done),
            Err(err) => Err(OrchestrationError::Database(err.to_string())),
        }
    }

    /// Records the agent as running with the given process id and port.
    ///
    /// This issues the repository's `register_agent` call, the same repository
    /// operation that [`Self::register_agent`] uses, and returns its `String` result.
    ///
    /// # Errors
    ///
    /// A repository failure is returned as [`OrchestrationError::Database`] holding the
    /// failure's display text.
    pub async fn update_agent_running(
        &self,
        agent_name: &str,
        pid: u32,
        port: u16,
    ) -> OrchestrationResult<String> {
        let registered = self.repository.register_agent(agent_name, pid, port).await;

        match registered {
            Ok(value) => Ok(value),
            Err(err) => Err(OrchestrationError::Database(err.to_string())),
        }
    }

    /// Calls the repository's `mark_stopped` for the named agent.
    ///
    /// # Errors
    ///
    /// A repository failure is returned as [`OrchestrationError::Database`] holding the
    /// failure's display text.
    pub async fn update_agent_stopped(&self, agent_name: &str) -> OrchestrationResult<()> {
        let stopped = self.repository.mark_stopped(agent_name).await;

        stopped.map_err(|err| OrchestrationError::Database(err.to_string()))
    }

    /// Forwards the agent's name, process id and port to the repository's
    /// `register_agent_starting` and returns its `String` result.
    ///
    /// # Errors
    ///
    /// A repository failure is returned as [`OrchestrationError::Database`] holding the
    /// failure's display text.
    pub async fn register_agent_starting(
        &self,
        agent_name: &str,
        pid: u32,
        port: u16,
    ) -> OrchestrationResult<String> {
        let registration = self
            .repository
            .register_agent_starting(agent_name, pid, port)
            .await;

        registration.map_err(|err| OrchestrationError::Database(err.to_string()))
    }

    /// Calls the repository's `mark_running` for the named agent.
    ///
    /// # Errors
    ///
    /// A repository failure is returned as [`OrchestrationError::Database`] holding the
    /// failure's display text.
    pub async fn mark_running(&self, agent_name: &str) -> OrchestrationResult<()> {
        match self.repository.mark_running(agent_name).await {
            Ok(done) => Ok(done),
            Err(err) => Err(OrchestrationError::Database(err.to_string())),
        }
    }

    /// Collects the agents whose status is `Running` but which fail the A2A health check.
    ///
    /// Each running agent is probed in turn with `check_a2a_agent_health(port, 10)`
    /// (the second argument is presumably a timeout — confirm against the monitor
    /// module). A probe that returns an error counts as unhealthy. Agents in any other
    /// status are skipped.
    ///
    /// The pid in each returned pair is always `Some`, since only running agents are
    /// reported.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Self::list_all_agents`].
    pub async fn get_unresponsive_agents(&self) -> OrchestrationResult<Vec<(String, Option<u32>)>> {
        use crate::services::agent_orchestration::monitor::check_a2a_agent_health;

        let mut unresponsive = Vec::new();

        for (agent_name, status) in self.list_all_agents().await? {
            let (pid, port) = match status {
                AgentStatus::Running { pid, port, .. } => (pid, port),
                _ => continue,
            };

            let probe = check_a2a_agent_health(port, 10).await;
            if probe.unwrap_or(false) {
                continue;
            }

            unresponsive.push((agent_name, Some(pid)));
        }

        Ok(unresponsive)
    }
}