use std::path::PathBuf;
use std::sync::RwLock;
use anyhow::{bail, Context, Result};
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use crate::agent_config::{AgentConfig, LifecycleState};
/// Bookkeeping kept by the registry for a single agent.
///
/// This is the value stored in the in-memory map and the value that is
/// serialized to `<persist_dir>/<id>.json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentRecord {
/// The agent's configuration; `config.id` is the key the registry stores it under.
pub config: AgentConfig,
/// Current lifecycle state. Records are created in `LifecycleState::Created`.
pub state: LifecycleState,
/// Initialised to `None` by the registry and never assigned in this file.
/// NOTE(review): presumably the port a running agent listens on — confirm with callers.
pub port: Option<u16>,
/// Initialised to `None` by the registry and never assigned in this file.
/// NOTE(review): presumably the OS process id of a running agent — confirm with callers.
pub pid: Option<u32>,
/// UTC timestamp taken when the record was first built.
pub created_at: DateTime<Utc>,
/// UTC timestamp refreshed whenever the registry changes the state or the config.
pub updated_at: DateTime<Utc>,
}
/// In-memory table of agents, mirrored to one JSON file per agent on disk.
pub struct AgentRegistry {
// Agent records keyed by agent id (`AgentConfig::id`).
agents: DashMap<String, AgentRecord>,
// Id of the system agent once `register_system_agent` has succeeded; `None` before that.
system_agent_id: RwLock<Option<String>>,
// Directory that holds the `<id>.json` files written by `persist`.
persist_dir: PathBuf,
}
/// Compact debug view: prints the persistence directory and the number of
/// agents rather than dumping every record.
impl std::fmt::Debug for AgentRegistry {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let agent_count = self.agents.len();
        let mut view = f.debug_struct("AgentRegistry");
        view.field("persist_dir", &self.persist_dir);
        view.field("agent_count", &agent_count);
        view.finish()
    }
}
impl AgentRegistry {
/// Creates an empty registry that persists records under `persist_dir`.
///
/// Nothing is read from or written to disk here; call `load_from_disk`
/// to pick up previously persisted records.
pub fn new(persist_dir: PathBuf) -> Self {
    let agents = DashMap::new();
    let system_agent_id = RwLock::new(None);
    Self {
        agents,
        system_agent_id,
        persist_dir,
    }
}
/// Registers a new agent in the `Created` state and returns its id.
///
/// The record is written to disk before it becomes visible in the map, and
/// the map entry stays locked for the whole operation, so two concurrent
/// calls with the same id cannot both succeed.
///
/// # Errors
/// Fails if an agent with the same id is already registered, or if the
/// record cannot be persisted (in which case nothing is inserted).
pub fn create_agent(&self, config: AgentConfig) -> Result<String> {
    use dashmap::mapref::entry::Entry;

    let id = config.id.clone();
    let vacant = match self.agents.entry(id.clone()) {
        Entry::Vacant(slot) => slot,
        Entry::Occupied(_) => bail!("agent with id '{}' already exists", id),
    };

    let created = Utc::now();
    let record = AgentRecord {
        config,
        state: LifecycleState::Created,
        port: None,
        pid: None,
        created_at: created,
        updated_at: created,
    };

    // Disk first: a failed write must not leave a record that exists only in memory.
    self.persist(&id, &record)?;
    vacant.insert(record);
    Ok(id)
}
/// Registers the single system agent.
///
/// The supplied role is overridden: `allow` becomes `["*"]` and `deny` is
/// emptied. The record starts in the `Created` state, is persisted, and is
/// then inserted into the map — replacing any record already stored under
/// the same id. The system-agent lock is held for the whole call.
///
/// # Errors
/// Fails if the system-agent lock is poisoned, if a system agent has
/// already been registered on this registry instance, or if persisting
/// the record fails.
pub fn register_system_agent(&self, mut config: AgentConfig) -> Result<()> {
    let mut system_slot = match self.system_agent_id.write() {
        Ok(guard) => guard,
        Err(e) => return Err(anyhow::anyhow!("lock poisoned: {e}")),
    };
    if system_slot.is_some() {
        bail!("a system agent already exists");
    }

    // The system agent is always granted every permission.
    config.role.deny = Vec::new();
    config.role.allow = vec!["*".to_string()];

    let agent_id = config.id.clone();
    let registered_at = Utc::now();
    let record = AgentRecord {
        config,
        state: LifecycleState::Created,
        port: None,
        pid: None,
        created_at: registered_at,
        updated_at: registered_at,
    };

    self.persist(&agent_id, &record)?;
    self.agents.insert(agent_id.clone(), record);
    *system_slot = Some(agent_id);
    Ok(())
}
/// Moves agent `id` to `new_state` if the lifecycle state machine allows it.
///
/// The change is applied to the in-memory record and written to disk while
/// the map entry is locked. If the write fails, the in-memory record is
/// restored to its previous state and timestamp, so an `Err` always means
/// "nothing changed" and the caller can retry the same transition.
///
/// # Errors
/// Fails if the agent is unknown, if `is_valid_transition` rejects the
/// move, or if the updated record cannot be persisted.
pub fn transition(&self, id: &str, new_state: LifecycleState) -> Result<()> {
    let mut entry = self
        .agents
        .get_mut(id)
        .ok_or_else(|| anyhow::anyhow!("agent '{}' not found", id))?;
    if !is_valid_transition(&entry.state, &new_state) {
        bail!(
            "invalid state transition for '{}': {:?} → {:?}",
            id,
            entry.state,
            new_state
        );
    }

    // Keep the old values so a failed write can be undone.
    let previous_state = std::mem::replace(&mut entry.state, new_state);
    let previous_updated_at = std::mem::replace(&mut entry.updated_at, Utc::now());
    if let Err(err) = self.persist(id, &entry) {
        entry.state = previous_state;
        entry.updated_at = previous_updated_at;
        return Err(err);
    }
    Ok(())
}
/// Sets the state of agent `id` to `new_state` without consulting the
/// lifecycle state machine.
///
/// The change is applied in memory and written to disk while the map entry
/// is locked. If the write fails, the previous state and timestamp are
/// restored so memory and disk do not drift apart.
///
/// # Errors
/// Fails if the agent is unknown or if the updated record cannot be
/// persisted.
pub fn force_state(&self, id: &str, new_state: LifecycleState) -> Result<()> {
    let mut entry = self
        .agents
        .get_mut(id)
        .ok_or_else(|| anyhow::anyhow!("agent '{}' not found", id))?;

    // Keep the old values so a failed write can be undone.
    let previous_state = std::mem::replace(&mut entry.state, new_state);
    let previous_updated_at = std::mem::replace(&mut entry.updated_at, Utc::now());
    if let Err(err) = self.persist(id, &entry) {
        entry.state = previous_state;
        entry.updated_at = previous_updated_at;
        return Err(err);
    }
    Ok(())
}
/// Replaces the configuration of agent `id`.
///
/// The new config must carry the same id as the agent it replaces. The
/// change is applied in memory and written to disk while the map entry is
/// locked. If the write fails, the previous config and timestamp are
/// restored, so an `Err` always means the old configuration is still live.
///
/// # Errors
/// Fails if the agent is unknown, if `config.id` differs from `id`, or if
/// the updated record cannot be persisted.
pub fn update_config(&self, id: &str, config: AgentConfig) -> Result<()> {
    let mut entry = self
        .agents
        .get_mut(id)
        .ok_or_else(|| anyhow::anyhow!("agent '{}' not found", id))?;
    if config.id != id {
        bail!("config id '{}' does not match agent id '{}'", config.id, id);
    }

    // Keep the old values so a failed write can be undone.
    let previous_config = std::mem::replace(&mut entry.config, config);
    let previous_updated_at = std::mem::replace(&mut entry.updated_at, Utc::now());
    if let Err(err) = self.persist(id, &entry) {
        entry.config = previous_config;
        entry.updated_at = previous_updated_at;
        return Err(err);
    }
    Ok(())
}
pub fn delete(&self, id: &str) -> Result<AgentRecord> {
{
let entry = self
.agents
.get(id)
.ok_or_else(|| anyhow::anyhow!("agent '{}' not found", id))?;
match &entry.state {
LifecycleState::Stopped | LifecycleState::Error { .. } => {}
other => bail!(
"cannot delete agent '{}' in state {:?} — must be Stopped or Error",
id,
other
),
}
}
let (_, record) = self
.agents
.remove(id)
.ok_or_else(|| anyhow::anyhow!("agent '{}' disappeared during delete", id))?;
self.archive_workspace(id);
let _ = self.remove_persisted(id);
Ok(record)
}
/// Loads every `*.json` file in the persistence directory into the map and
/// returns how many files were loaded.
///
/// A missing directory counts as an empty registry. Each record is stored
/// under its own `config.id`, replacing any record already held under that
/// id. The system-agent id is not touched by this function.
///
/// # Errors
/// Fails on the first directory entry, file read, or JSON parse error.
/// Records loaded before the failure stay in the map.
pub fn load_from_disk(&self) -> Result<usize> {
    if !self.persist_dir.exists() {
        return Ok(0);
    }

    let dir_entries = std::fs::read_dir(&self.persist_dir)
        .with_context(|| format!("reading persist dir {:?}", self.persist_dir))?;

    let mut loaded = 0usize;
    for dir_entry in dir_entries {
        let path = dir_entry?.path();

        // Only files with a `json` extension are registry records.
        let is_json = path.extension().and_then(|ext| ext.to_str()) == Some("json");
        if !is_json {
            continue;
        }

        let raw =
            std::fs::read_to_string(&path).with_context(|| format!("reading {:?}", path))?;
        let record: AgentRecord =
            serde_json::from_str(&raw).with_context(|| format!("parsing {:?}", path))?;

        let key = record.config.id.clone();
        self.agents.insert(key, record);
        loaded += 1;
    }
    Ok(loaded)
}
/// Returns a cloned `(id, record)` pair for every agent currently in the
/// map. The order of the pairs is whatever the map's iterator yields.
pub fn list(&self) -> Vec<(String, AgentRecord)> {
    let mut snapshot = Vec::new();
    for item in self.agents.iter() {
        let (id, record) = item.pair();
        snapshot.push((id.clone(), record.clone()));
    }
    snapshot
}
/// Looks up agent `id` and returns a borrowed view of its record, or `None`
/// if no such agent is registered.
///
/// The returned value is a DashMap reference guard, not a copy. Per the
/// DashMap documentation, calling a mutating registry method for the same
/// map while the guard is alive may deadlock, so drop it (or clone what
/// you need) first.
pub fn get(&self, id: &str) -> Option<dashmap::mapref::one::Ref<'_, String, AgentRecord>> {
    let table = &self.agents;
    table.get(id)
}
/// Reports whether `id` is the id recorded by `register_system_agent`.
///
/// Returns `false` when no system agent has been registered and also when
/// the system-agent lock is poisoned.
pub fn is_system_agent(&self, id: &str) -> bool {
    match self.system_agent_id.read() {
        Ok(guard) => guard.as_deref() == Some(id),
        Err(_) => false,
    }
}
/// Writes `record` to `<persist_dir>/<id>.json`, creating the directory if
/// needed.
///
/// The JSON is first written to `<id>.json.tmp` and then renamed over the
/// final path, so a failed or interrupted write never leaves a truncated
/// `<id>.json` behind. This matters because `load_from_disk` aborts on the
/// first unparsable file. The staging file's `tmp` extension keeps it out of
/// `load_from_disk`, which only reads files whose extension is `json`.
///
/// # Errors
/// Fails if the directory cannot be created, the record cannot be
/// serialized, or the staging file cannot be written or renamed. On a
/// write or rename failure the staging file is removed best-effort.
fn persist(&self, id: &str, record: &AgentRecord) -> Result<()> {
    std::fs::create_dir_all(&self.persist_dir)
        .with_context(|| format!("creating persist dir {:?}", self.persist_dir))?;
    let path = self.persist_dir.join(format!("{}.json", id));
    let staging = self.persist_dir.join(format!("{}.json.tmp", id));
    let data = serde_json::to_string_pretty(record)?;

    let outcome = std::fs::write(&staging, data)
        .with_context(|| format!("writing {:?}", staging))
        .and_then(|()| {
            std::fs::rename(&staging, &path).with_context(|| format!("replacing {:?}", path))
        });
    if outcome.is_err() {
        // Do not leave a stray staging file around; the original error is what matters.
        let _ = std::fs::remove_file(&staging);
    }
    outcome
}
fn remove_persisted(&self, id: &str) -> Result<()> {
let path = self.persist_dir.join(format!("{}.json", id));
if path.exists() {
std::fs::remove_file(&path).with_context(|| format!("removing {:?}", path))?;
}
Ok(())
}
/// Moves the directory `<persist_dir>/../<id>` to
/// `<persist_dir>/../.archive/<id>-<UTC timestamp>`.
///
/// Does nothing when the agent directory does not exist. Creating the
/// archive root and renaming are both best-effort; failures are ignored.
/// The timestamp has one-second resolution (`%Y%m%dT%H%M%S`).
fn archive_workspace(&self, id: &str) {
    let parent = self.persist_dir.join("..");
    let workspace = parent.join(id);
    if !workspace.exists() {
        return;
    }

    let archive_root = parent.join(".archive");
    let stamp = Utc::now().format("%Y%m%dT%H%M%S");
    let target = archive_root.join(format!("{}-{}", id, stamp));

    let _ = std::fs::create_dir_all(&archive_root);
    let _ = std::fs::rename(&workspace, &target);
}
}
pub fn is_valid_transition(from: &LifecycleState, to: &LifecycleState) -> bool {
matches!(
(from, to),
(LifecycleState::Created, LifecycleState::Starting)
| (LifecycleState::Starting, LifecycleState::Running)
| (LifecycleState::Starting, LifecycleState::Error { .. })
| (LifecycleState::Running, LifecycleState::Stopping)
| (LifecycleState::Running, LifecycleState::Error { .. })
| (LifecycleState::Stopping, LifecycleState::Stopped)
| (LifecycleState::Stopped, LifecycleState::Starting)
| (LifecycleState::Error { .. }, LifecycleState::Starting)
)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::agent_config::{AgentRoleConfig, AgentType};
    use tempfile::TempDir;

    /// Builds a minimal LLM agent config with the given id and empty
    /// tool, workflow, and role lists.
    fn make_config(id: &str) -> AgentConfig {
        let role = AgentRoleConfig {
            allow: vec![],
            deny: vec![],
        };
        AgentConfig {
            id: id.to_string(),
            name: format!("Agent {}", id),
            description: "test agent".to_string(),
            agent_type: AgentType::Llm,
            model: "test/model".to_string(),
            api_key_env: "TEST_KEY".to_string(),
            instruction: "do stuff".to_string(),
            tools: vec![],
            action_nodes: vec![],
            workflow_edges: vec![],
            sub_agents: vec![],
            role,
            channel_bindings: vec![],
            auto_start: false,
            temperature: None,
            max_output_tokens: None,
            model_override: None,
        }
    }

    /// Records written by one registry must be readable, with their latest
    /// state, by a fresh registry pointed at the same directory.
    #[test]
    fn persist_and_load_round_trip() {
        let scratch = TempDir::new().unwrap();
        let dir = scratch.path().join("registry");

        let writer = AgentRegistry::new(dir.clone());
        writer.create_agent(make_config("alpha")).unwrap();
        writer.create_agent(make_config("beta")).unwrap();
        writer.transition("alpha", LifecycleState::Starting).unwrap();
        writer.transition("alpha", LifecycleState::Running).unwrap();

        let reader = AgentRegistry::new(dir);
        assert_eq!(reader.load_from_disk().unwrap(), 2);

        let expect = |id: &str, state: LifecycleState| {
            let record = reader.get(id).unwrap();
            assert_eq!(record.config.id, id);
            assert_eq!(record.state, state);
        };
        expect("alpha", LifecycleState::Running);
        expect("beta", LifecycleState::Created);
    }
}