use anyhow::{Context, Result};
use sqlx::PgPool;
use std::sync::Arc;
use systemprompt_database::DbPool;
use systemprompt_traits::RepositoryError;
/// One row of the `services` table, carrying the four columns selected by
/// `AgentServiceRepository::get_agent_status`.
///
/// `Clone`, `PartialEq` and `Eq` are derived so rows can be copied and
/// compared directly by callers and in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AgentServiceRow {
    /// Value of the `name` column.
    pub name: String,
    /// Value of the `pid` column; `None` when the column is `NULL`.
    pub pid: Option<i32>,
    /// Value of the `port` column.
    pub port: i32,
    /// Value of the `status` column.
    pub status: String,
}
/// Name-only projection of a `services` row, as produced by
/// `AgentServiceRepository::list_running_agents`.
///
/// `Clone`, `PartialEq` and `Eq` are derived so rows can be copied and
/// compared directly by callers and in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AgentServerIdRow {
    /// Value of the `name` column.
    pub name: String,
}
/// Name and PID projection of a `services` row, as produced by
/// `AgentServiceRepository::list_running_agent_pids`.
///
/// `Clone`, `PartialEq` and `Eq` are derived so rows can be copied and
/// compared directly by callers and in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AgentServerIdPidRow {
    /// Value of the `name` column.
    pub name: String,
    /// Value of the `pid` column; rows whose `pid` is `NULL` are never
    /// turned into this type.
    pub pid: i32,
}
/// Repository for the agent rows stored in the `services` table.
///
/// Holds two shared PostgreSQL pools: the methods of this type run their
/// `SELECT` statements on `pool` and their `INSERT`/`UPDATE`/`DELETE`
/// statements on `write_pool`.
#[derive(Debug, Clone)]
pub struct AgentServiceRepository {
// Pool used by the read-only queries.
pool: Arc<PgPool>,
// Pool used by every statement that modifies `services`.
write_pool: Arc<PgPool>,
}
impl AgentServiceRepository {
    /// Builds a repository from the pools exposed by `db`.
    ///
    /// # Errors
    ///
    /// Fails when `db` cannot provide its PostgreSQL pool or its write
    /// PostgreSQL pool.
    pub fn new(db: &DbPool) -> Result<Self> {
        let pool = db.pool_arc().context("PostgreSQL pool not available")?;
        let write_pool = db
            .write_pool_arc()
            .context("Write PostgreSQL pool not available")?;
        Ok(Self { pool, write_pool })
    }

    /// Converts a process id into the `i32` that is bound to the `pid`
    /// query parameter.
    ///
    /// A plain `as` cast would wrap values above `i32::MAX` into negative
    /// numbers, so out-of-range ids are rejected instead of being stored.
    fn pid_to_db(pid: u32) -> Result<i32, RepositoryError> {
        i32::try_from(pid)
            .with_context(|| format!("PID {} does not fit in a signed 32-bit integer", pid))
            .map_err(RepositoryError::Other)
    }

    /// Registers `name` as an agent service with status `running`.
    ///
    /// Any existing row with the same name is deleted first, then a fresh
    /// row is inserted (falling back to an update if a row with that name
    /// exists at insert time). The delete and the insert are two separate
    /// statements; no transaction is opened here.
    ///
    /// Returns an owned copy of `name`.
    ///
    /// # Errors
    ///
    /// Returns `RepositoryError::Other` when `pid` exceeds `i32::MAX` or
    /// when either statement fails.
    pub async fn register_agent(
        &self,
        name: &str,
        pid: u32,
        port: u16,
    ) -> Result<String, RepositoryError> {
        // Validate before touching the table so that a rejected PID cannot
        // remove the existing registration.
        let pid_i32 = Self::pid_to_db(pid)?;
        let port_i32 = i32::from(port);
        self.remove_agent_service(name).await?;
        // NOTE: the continuation lines of this literal are deliberately not
        // indented; without a trailing `\` their leading whitespace would
        // become part of the SQL text.
        sqlx::query!(
            "INSERT INTO services (name, module_name, pid, port, status, updated_at)
VALUES ($1, 'agent', $2, $3, 'running', CURRENT_TIMESTAMP)
ON CONFLICT (name) DO UPDATE SET pid = $2, port = $3, status = 'running', updated_at \
             = CURRENT_TIMESTAMP",
            name,
            pid_i32,
            port_i32
        )
        .execute(self.write_pool.as_ref())
        .await
        .context("Failed to register agent")
        .map_err(RepositoryError::Other)?;
        Ok(name.to_owned())
    }

    /// Registers `name` as an agent service with status `starting`.
    ///
    /// Works like [`Self::register_agent`] except for the stored status:
    /// the existing row is deleted, then a new one is inserted (or updated
    /// on conflict). The two statements are not wrapped in a transaction.
    ///
    /// Returns an owned copy of `name`.
    ///
    /// # Errors
    ///
    /// Returns `RepositoryError::Other` when `pid` exceeds `i32::MAX` or
    /// when either statement fails.
    pub async fn register_agent_starting(
        &self,
        name: &str,
        pid: u32,
        port: u16,
    ) -> Result<String, RepositoryError> {
        // Validate before touching the table so that a rejected PID cannot
        // remove the existing registration.
        let pid_i32 = Self::pid_to_db(pid)?;
        let port_i32 = i32::from(port);
        self.remove_agent_service(name).await?;
        // NOTE: the continuation lines of this literal are deliberately not
        // indented; without a trailing `\` their leading whitespace would
        // become part of the SQL text.
        sqlx::query!(
            "INSERT INTO services (name, module_name, pid, port, status, updated_at)
VALUES ($1, 'agent', $2, $3, 'starting', CURRENT_TIMESTAMP)
ON CONFLICT (name) DO UPDATE SET pid = $2, port = $3, status = 'starting', updated_at \
             = CURRENT_TIMESTAMP",
            name,
            pid_i32,
            port_i32
        )
        .execute(self.write_pool.as_ref())
        .await
        .context("Failed to register agent as starting")
        .map_err(RepositoryError::Other)?;
        Ok(name.to_owned())
    }

    /// Sets the status of `agent_name` to `running` and refreshes
    /// `updated_at`.
    ///
    /// # Errors
    ///
    /// Returns `RepositoryError::Other` when the update fails.
    pub async fn mark_running(&self, agent_name: &str) -> Result<(), RepositoryError> {
        sqlx::query!(
            "UPDATE services SET status = 'running', updated_at = CURRENT_TIMESTAMP WHERE name = \
             $1",
            agent_name
        )
        .execute(self.write_pool.as_ref())
        .await
        .context("Failed to mark agent as running")
        .map_err(RepositoryError::Other)?;
        Ok(())
    }

    /// Looks up the `services` row named `agent_name`.
    ///
    /// Returns `Ok(None)` when no such row exists.
    ///
    /// # Errors
    ///
    /// Returns `RepositoryError::Other` when the query fails.
    pub async fn get_agent_status(
        &self,
        agent_name: &str,
    ) -> Result<Option<AgentServiceRow>, RepositoryError> {
        let found = sqlx::query!(
            "SELECT name, pid, port, status FROM services WHERE name = $1",
            agent_name
        )
        .fetch_optional(self.pool.as_ref())
        .await
        .context("Failed to get agent status")
        .map_err(RepositoryError::Other)?;
        Ok(found.map(|r| AgentServiceRow {
            name: r.name,
            pid: r.pid,
            port: r.port,
            status: r.status,
        }))
    }

    /// Sets the status of `agent_name` to `error`, clears its `pid` and
    /// refreshes `updated_at`.
    ///
    /// Writes the same column values as [`Self::mark_error`]; only the
    /// error context attached on failure differs.
    ///
    /// # Errors
    ///
    /// Returns `RepositoryError::Other` when the update fails.
    pub async fn mark_crashed(&self, agent_name: &str) -> Result<(), RepositoryError> {
        sqlx::query!(
            "UPDATE services SET status = 'error', pid = NULL, updated_at = CURRENT_TIMESTAMP \
             WHERE name = $1",
            agent_name
        )
        .execute(self.write_pool.as_ref())
        .await
        .context("Failed to mark agent as crashed")
        .map_err(RepositoryError::Other)?;
        Ok(())
    }

    /// Sets the status of `agent_name` to `stopped`, clears its `pid` and
    /// refreshes `updated_at`.
    ///
    /// # Errors
    ///
    /// Returns `RepositoryError::Other` when the update fails.
    pub async fn mark_stopped(&self, agent_name: &str) -> Result<(), RepositoryError> {
        sqlx::query!(
            "UPDATE services SET status = 'stopped', pid = NULL, updated_at = CURRENT_TIMESTAMP \
             WHERE name = $1",
            agent_name
        )
        .execute(self.write_pool.as_ref())
        .await
        .context("Failed to mark agent as stopped")
        .map_err(RepositoryError::Other)?;
        Ok(())
    }

    /// Sets the status of `agent_name` to `error`, clears its `pid` and
    /// refreshes `updated_at`.
    ///
    /// # Errors
    ///
    /// Returns `RepositoryError::Other` when the update fails.
    pub async fn mark_error(&self, agent_name: &str) -> Result<(), RepositoryError> {
        sqlx::query!(
            "UPDATE services SET status = 'error', pid = NULL, updated_at = CURRENT_TIMESTAMP \
             WHERE name = $1",
            agent_name
        )
        .execute(self.write_pool.as_ref())
        .await
        .context("Failed to mark agent with error")
        .map_err(RepositoryError::Other)?;
        Ok(())
    }

    /// Lists the names of all `services` rows whose status is `running`.
    ///
    /// The query filters on status only, not on `module_name`.
    ///
    /// # Errors
    ///
    /// Returns `RepositoryError::Other` when the query fails.
    pub async fn list_running_agents(&self) -> Result<Vec<AgentServerIdRow>, RepositoryError> {
        let records = sqlx::query!("SELECT name FROM services WHERE status = 'running'")
            .fetch_all(self.pool.as_ref())
            .await
            .context("Failed to list running agents")
            .map_err(RepositoryError::Other)?;
        Ok(records
            .into_iter()
            .map(|r| AgentServerIdRow { name: r.name })
            .collect())
    }

    /// Lists name and PID of all `services` rows whose status is `running`
    /// and whose `pid` is not `NULL`.
    ///
    /// # Errors
    ///
    /// Returns `RepositoryError::Other` when the query fails.
    pub async fn list_running_agent_pids(
        &self,
    ) -> Result<Vec<AgentServerIdPidRow>, RepositoryError> {
        let records = sqlx::query!(
            "SELECT name, pid FROM services WHERE status = 'running' AND pid IS NOT NULL"
        )
        .fetch_all(self.pool.as_ref())
        .await
        .context("Failed to list running agent PIDs")
        .map_err(RepositoryError::Other)?;
        // `pid` is still handled as optional here, so rows without a PID are
        // skipped in addition to the SQL filter.
        Ok(records
            .into_iter()
            .filter_map(|r| r.pid.map(|pid| AgentServerIdPidRow { name: r.name, pid }))
            .collect())
    }

    /// Deletes the `services` row named `agent_name`.
    ///
    /// The result of the delete is not inspected, so a name that matches no
    /// row still yields `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns `RepositoryError::Other` when the delete fails.
    pub async fn remove_agent_service(&self, agent_name: &str) -> Result<(), RepositoryError> {
        sqlx::query!("DELETE FROM services WHERE name = $1", agent_name)
            .execute(self.write_pool.as_ref())
            .await
            .context("Failed to remove agent service")
            .map_err(RepositoryError::Other)?;
        Ok(())
    }

    /// Overwrites the `status` column of `agent_name` with `health_status`
    /// and refreshes `updated_at`.
    ///
    /// `health_status` is stored as given; no validation happens here.
    ///
    /// # Errors
    ///
    /// Returns `RepositoryError::Other` when the update fails.
    pub async fn update_health_status(
        &self,
        agent_name: &str,
        health_status: &str,
    ) -> Result<(), RepositoryError> {
        sqlx::query!(
            "UPDATE services SET status = $1, updated_at = CURRENT_TIMESTAMP WHERE name = $2",
            health_status,
            agent_name
        )
        .execute(self.write_pool.as_ref())
        .await
        .context("Failed to update agent health status")
        .map_err(RepositoryError::Other)?;
        Ok(())
    }
}