use crate::error::KowalskiError;
use crate::federation::registry::{AgentRecord, AgentRegistry};
#[cfg(feature = "postgres")]
use serde::{Deserialize, Serialize};
#[cfg(feature = "postgres")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentStateSnapshot {
pub agent_id: String,
pub current_task: Option<String>,
pub active: bool,
pub updated_at: chrono::DateTime<chrono::Utc>,
pub capabilities: Vec<String>,
}
#[cfg(feature = "postgres")]
pub async fn load_registry_into(
registry: &AgentRegistry,
database_url: &str,
) -> Result<(), KowalskiError> {
use sqlx::Row;
use sqlx::postgres::PgPool;
let pool = PgPool::connect(database_url)
.await
.map_err(|e| KowalskiError::Federation(format!("registry load connect: {e}")))?;
let rows =
sqlx::query("SELECT agent_id, capabilities FROM federation_registry ORDER BY agent_id")
.fetch_all(&pool)
.await
.map_err(|e| KowalskiError::Federation(format!("registry load query: {e}")))?;
for row in rows {
let id: String = row
.try_get("agent_id")
.map_err(|e| KowalskiError::Federation(format!("registry load row: {e}")))?;
let caps_val: serde_json::Value = row
.try_get("capabilities")
.map_err(|e| KowalskiError::Federation(format!("registry load row: {e}")))?;
let capabilities: Vec<String> = serde_json::from_value(caps_val).unwrap_or_default();
registry.register(AgentRecord { id, capabilities })?;
}
Ok(())
}
#[cfg(not(feature = "postgres"))]
pub async fn load_registry_into(
_registry: &AgentRegistry,
_database_url: &str,
) -> Result<(), KowalskiError> {
Ok(())
}
#[cfg(feature = "postgres")]
pub async fn upsert_registry_record(
database_url: &str,
record: &AgentRecord,
) -> Result<(), KowalskiError> {
use sqlx::postgres::PgPool;
let pool = PgPool::connect(database_url)
.await
.map_err(|e| KowalskiError::Federation(format!("registry upsert connect: {e}")))?;
let caps = serde_json::to_value(&record.capabilities).map_err(KowalskiError::Json)?;
sqlx::query(
r#"INSERT INTO federation_registry (agent_id, capabilities)
VALUES ($1, $2)
ON CONFLICT (agent_id) DO UPDATE SET capabilities = $2, updated_at = NOW()"#,
)
.bind(&record.id)
.bind(caps)
.execute(&pool)
.await
.map_err(|e| KowalskiError::Federation(format!("registry upsert: {e}")))?;
Ok(())
}
#[cfg(not(feature = "postgres"))]
pub async fn upsert_registry_record(
_database_url: &str,
_record: &AgentRecord,
) -> Result<(), KowalskiError> {
Ok(())
}
#[cfg(feature = "postgres")]
pub async fn upsert_agent_state_for_record(
database_url: &str,
record: &AgentRecord,
) -> Result<(), KowalskiError> {
use sqlx::postgres::PgPool;
let pool = PgPool::connect(database_url)
.await
.map_err(|e| KowalskiError::Federation(format!("agent_state upsert connect: {e}")))?;
let caps = serde_json::to_value(&record.capabilities).map_err(KowalskiError::Json)?;
sqlx::query(
r#"INSERT INTO agent_state (agent_id, capabilities, active, updated_at)
VALUES ($1, $2::jsonb, true, NOW())
ON CONFLICT (agent_id) DO UPDATE SET
capabilities = EXCLUDED.capabilities,
active = true,
updated_at = NOW()"#,
)
.bind(&record.id)
.bind(caps)
.execute(&pool)
.await
.map_err(|e| KowalskiError::Federation(format!("agent_state upsert: {e}")))?;
Ok(())
}
#[cfg(not(feature = "postgres"))]
pub async fn upsert_agent_state_for_record(
_database_url: &str,
_record: &AgentRecord,
) -> Result<(), KowalskiError> {
Ok(())
}
#[cfg(feature = "postgres")]
pub async fn load_agent_states(
database_url: &str,
) -> Result<std::collections::HashMap<String, AgentStateSnapshot>, KowalskiError> {
use sqlx::Row;
use sqlx::postgres::PgPool;
use std::collections::HashMap;
let pool = PgPool::connect(database_url)
.await
.map_err(|e| KowalskiError::Federation(format!("agent_state load connect: {e}")))?;
let rows = sqlx::query(
"SELECT agent_id, current_task, active, updated_at, capabilities FROM agent_state",
)
.fetch_all(&pool)
.await
.map_err(|e| KowalskiError::Federation(format!("agent_state load query: {e}")))?;
let mut out = HashMap::new();
for row in rows {
let agent_id: String = row
.try_get("agent_id")
.map_err(|e| KowalskiError::Federation(format!("agent_state row: {e}")))?;
let current_task: Option<String> = row
.try_get("current_task")
.map_err(|e| KowalskiError::Federation(format!("agent_state row: {e}")))?;
let active: bool = row
.try_get("active")
.map_err(|e| KowalskiError::Federation(format!("agent_state row: {e}")))?;
let updated_at: chrono::DateTime<chrono::Utc> = row
.try_get("updated_at")
.map_err(|e| KowalskiError::Federation(format!("agent_state row: {e}")))?;
let caps_val: serde_json::Value = row
.try_get("capabilities")
.map_err(|e| KowalskiError::Federation(format!("agent_state row: {e}")))?;
let capabilities: Vec<String> = serde_json::from_value(caps_val).unwrap_or_default();
out.insert(
agent_id.clone(),
AgentStateSnapshot {
agent_id,
current_task,
active,
updated_at,
capabilities,
},
);
}
Ok(out)
}
#[cfg(feature = "postgres")]
pub async fn touch_agent_heartbeat(
database_url: &str,
agent_id: &str,
) -> Result<(), KowalskiError> {
use sqlx::postgres::PgPool;
let pool = PgPool::connect(database_url)
.await
.map_err(|e| KowalskiError::Federation(format!("agent_state heartbeat connect: {e}")))?;
sqlx::query(
r#"INSERT INTO agent_state (agent_id, capabilities, active, updated_at)
VALUES ($1, '[]'::jsonb, true, NOW())
ON CONFLICT (agent_id) DO UPDATE SET
updated_at = NOW(),
active = true"#,
)
.bind(agent_id)
.execute(&pool)
.await
.map_err(|e| KowalskiError::Federation(format!("agent_state heartbeat: {e}")))?;
Ok(())
}
#[cfg(not(feature = "postgres"))]
pub async fn touch_agent_heartbeat(
_database_url: &str,
_agent_id: &str,
) -> Result<(), KowalskiError> {
Ok(())
}
#[cfg(feature = "postgres")]
pub async fn delete_federation_agent(
database_url: &str,
agent_id: &str,
) -> Result<(), KowalskiError> {
use sqlx::postgres::PgPool;
let pool = PgPool::connect(database_url)
.await
.map_err(|e| KowalskiError::Federation(format!("federation delete connect: {e}")))?;
sqlx::query("DELETE FROM agent_state WHERE agent_id = $1")
.bind(agent_id)
.execute(&pool)
.await
.map_err(|e| KowalskiError::Federation(format!("agent_state delete: {e}")))?;
sqlx::query("DELETE FROM federation_registry WHERE agent_id = $1")
.bind(agent_id)
.execute(&pool)
.await
.map_err(|e| KowalskiError::Federation(format!("federation_registry delete: {e}")))?;
Ok(())
}
#[cfg(not(feature = "postgres"))]
pub async fn delete_federation_agent(
_database_url: &str,
_agent_id: &str,
) -> Result<(), KowalskiError> {
Ok(())
}
#[cfg(feature = "postgres")]
pub async fn mark_stale_agents_inactive(
database_url: &str,
stale_after_secs: u64,
) -> Result<u64, KowalskiError> {
use sqlx::postgres::PgPool;
let pool = PgPool::connect(database_url)
.await
.map_err(|e| KowalskiError::Federation(format!("stale cleanup connect: {e}")))?;
let cutoff = chrono::Utc::now() - chrono::Duration::seconds(stale_after_secs as i64);
let r = sqlx::query(
"UPDATE agent_state SET active = false WHERE updated_at < $1 AND active = true",
)
.bind(cutoff)
.execute(&pool)
.await
.map_err(|e| KowalskiError::Federation(format!("stale cleanup: {e}")))?;
Ok(r.rows_affected())
}
#[cfg(not(feature = "postgres"))]
pub async fn mark_stale_agents_inactive(
_database_url: &str,
_stale_after_secs: u64,
) -> Result<u64, KowalskiError> {
Ok(0)
}
#[cfg(feature = "postgres")]
pub async fn set_agent_current_task(
database_url: &str,
agent_id: &str,
current_task: &str,
) -> Result<(), KowalskiError> {
use sqlx::postgres::PgPool;
let pool = PgPool::connect(database_url)
.await
.map_err(|e| KowalskiError::Federation(format!("current_task connect: {e}")))?;
sqlx::query(
r#"INSERT INTO agent_state (agent_id, capabilities, current_task, active, updated_at)
VALUES ($1, '[]'::jsonb, $2, true, NOW())
ON CONFLICT (agent_id) DO UPDATE SET
current_task = EXCLUDED.current_task,
active = true,
updated_at = NOW()"#,
)
.bind(agent_id)
.bind(current_task)
.execute(&pool)
.await
.map_err(|e| KowalskiError::Federation(format!("current_task update: {e}")))?;
Ok(())
}
#[cfg(not(feature = "postgres"))]
pub async fn set_agent_current_task(
_database_url: &str,
_agent_id: &str,
_current_task: &str,
) -> Result<(), KowalskiError> {
Ok(())
}