use std::collections::HashMap;
use std::sync::Mutex;
use chrono::{DateTime, Duration, Utc};
use crate::error::Result;
use crate::store::{ChildEntry, ChildStatus, ProjectKey, SubagentStore};
/// In-memory record of one registered child, as held in the registry table.
///
/// Created by `Registry::register` from a [`RegisterChild`] request, mutated by
/// `Registry::heartbeat`, and removed by `Registry::deregister`.
#[derive(Clone, Debug, PartialEq)]
pub struct Registration {
    /// Identifier of the child; also the key of the registry table.
    pub child_id: String,
    /// Identifier of the parent, passed to the store when the entry is persisted.
    pub parent_id: String,
    /// Subagent type string, copied into the persisted entry.
    pub subagent_type: String,
    /// Title string, copied into the persisted entry.
    pub title: String,
    /// Responsibility string, copied into the persisted entry.
    pub responsibility: String,
    /// Endpoint string supplied at registration; not part of the persisted entry.
    pub endpoint: String,
    /// Process id supplied at registration; not part of the persisted entry.
    // NOTE(review): presumably the child's OS pid — confirm with the caller.
    pub pid: u32,
    /// Current status: `Running` on registration, then whatever heartbeats report.
    pub status: ChildStatus,
    /// Timestamp of registration or of the most recent heartbeat.
    pub last_heartbeat: DateTime<Utc>,
}
/// Request payload for `Registry::register`.
///
/// Carries everything the registry needs from the caller; the status and the
/// first heartbeat timestamp are filled in by the registry itself.
#[derive(Clone, Debug)]
pub struct RegisterChild {
    /// Identifier of the child being registered.
    pub child_id: String,
    /// Identifier of the parent the child belongs to.
    pub parent_id: String,
    /// Subagent type string (the tests use `"researcher"`).
    pub subagent_type: String,
    /// Title string for the child.
    pub title: String,
    /// Responsibility string for the child.
    pub responsibility: String,
    /// Endpoint string for the child (the tests use a `ws://` URL).
    pub endpoint: String,
    /// Process id associated with the child.
    pub pid: u32,
}
/// Registry of live children: an in-memory table mirrored into a
/// `SubagentStore` on every register, heartbeat and deregister.
pub struct Registry {
    /// Store that receives an upsert for each state change.
    store: SubagentStore,
    /// Project key passed along with every store call.
    key: ProjectKey,
    /// Live registrations keyed by `child_id`, guarded by a std mutex.
    table: Mutex<HashMap<String, Registration>>,
}
impl Registry {
    /// Creates a registry with an empty in-memory table, backed by `store`
    /// and using `key` for every store call.
    pub fn new(store: SubagentStore, key: ProjectKey) -> Self {
        Self {
            store,
            key,
            table: Mutex::new(HashMap::new()),
        }
    }

    /// Locks the in-memory table.
    ///
    /// A poisoned mutex is recovered via `into_inner` instead of panicking,
    /// so the table stays usable after a panic in another lock holder.
    fn lock_table(&self) -> std::sync::MutexGuard<'_, HashMap<String, Registration>> {
        self.table.lock().unwrap_or_else(|e| e.into_inner())
    }

    /// Registers a child as `Running` with `now` as its first heartbeat and
    /// persists the entry to the store.
    ///
    /// An existing registration with the same `child_id` is replaced.
    ///
    /// # Errors
    ///
    /// Returns the error of the store upsert. The in-memory entry is inserted
    /// before the store write, so it remains in the table even if the write
    /// fails.
    pub async fn register(&self, r: RegisterChild, now: DateTime<Utc>) -> Result<()> {
        // `r` is owned, so its fields are moved rather than cloned.
        let reg = Registration {
            child_id: r.child_id,
            parent_id: r.parent_id,
            subagent_type: r.subagent_type,
            title: r.title,
            responsibility: r.responsibility,
            endpoint: r.endpoint,
            pid: r.pid,
            status: ChildStatus::Running,
            last_heartbeat: now,
        };
        // The guard is a temporary of this statement, so the std mutex is
        // released before the `.await` below.
        self.lock_table().insert(reg.child_id.clone(), reg.clone());
        self.persist_entry(&reg, now).await
    }

    /// Records a heartbeat for `child_id` at `now`, optionally updating its
    /// status, and persists the updated entry.
    ///
    /// A heartbeat for an unknown `child_id` is ignored and returns `Ok(())`
    /// without touching the store.
    ///
    /// # Errors
    ///
    /// Returns the error of the store upsert; the in-memory update has
    /// already been applied at that point.
    pub async fn heartbeat(
        &self,
        child_id: &str,
        status: Option<ChildStatus>,
        now: DateTime<Utc>,
    ) -> Result<()> {
        let reg = {
            // Block scope drops the guard before the `.await` below.
            let mut table = self.lock_table();
            let Some(reg) = table.get_mut(child_id) else {
                return Ok(());
            };
            reg.last_heartbeat = now;
            if let Some(s) = status {
                reg.status = s;
            }
            reg.clone()
        };
        self.persist_entry(&reg, now).await
    }

    /// Removes `child_id` from the in-memory table and persists it one last
    /// time with `final_status`.
    ///
    /// Deregistering an unknown `child_id` is a no-op returning `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns the error of the store upsert. The entry is removed from the
    /// table before the store write and is not restored if the write fails.
    pub async fn deregister(
        &self,
        child_id: &str,
        final_status: ChildStatus,
        now: DateTime<Utc>,
    ) -> Result<()> {
        let removed = self.lock_table().remove(child_id);
        if let Some(mut reg) = removed {
            reg.status = final_status;
            self.persist_entry(&reg, now).await?;
        }
        Ok(())
    }

    /// Returns a snapshot of all live registrations, sorted by `child_id`.
    pub fn list(&self) -> Vec<Registration> {
        let mut v: Vec<Registration> = self.lock_table().values().cloned().collect();
        // Keys are unique per child, so an unstable sort yields the same order.
        v.sort_unstable_by(|a, b| a.child_id.cmp(&b.child_id));
        v
    }

    /// Returns a copy of the registration for `child_id`, if it is live.
    pub fn get(&self, child_id: &str) -> Option<Registration> {
        self.lock_table().get(child_id).cloned()
    }

    /// Returns the ids of children whose last heartbeat is more than `ttl`
    /// before `now`, sorted by id.
    ///
    /// A child whose heartbeat age equals `ttl` exactly is not stale.
    pub fn stale(&self, ttl: Duration, now: DateTime<Utc>) -> Vec<String> {
        let mut ids: Vec<String> = self
            .lock_table()
            .values()
            .filter(|r| now - r.last_heartbeat > ttl)
            .map(|r| r.child_id.clone())
            .collect();
        // Hash-map iteration order is arbitrary; sort for a deterministic
        // result, matching `list`.
        ids.sort_unstable();
        ids
    }

    /// Upserts `reg` into the store under its parent, stamped with `now`.
    ///
    /// Only the fields of `ChildEntry` are written; `endpoint`, `pid` and
    /// `last_heartbeat` are not part of the persisted entry.
    async fn persist_entry(&self, reg: &Registration, now: DateTime<Utc>) -> Result<()> {
        self.store
            .upsert_child(
                &self.key,
                &reg.parent_id,
                ChildEntry {
                    child_id: reg.child_id.clone(),
                    subagent_type: reg.subagent_type.clone(),
                    status: reg.status,
                    title: reg.title.clone(),
                    responsibility: reg.responsibility.clone(),
                    updated_at: now,
                },
            )
            .await
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a registry over a store opened in a fresh temporary directory.
    ///
    /// The `TempDir` is returned with the registry so each test can keep it
    /// in scope for as long as the registry is used.
    fn setup() -> (TempDir, Registry) {
        let d = TempDir::new().unwrap();
        let store = SubagentStore::open(d.path());
        let reg = Registry::new(store, ProjectKey::from_raw("proj"));
        (d, reg)
    }

    /// Registration request for child `id` under parent `"p1"`.
    fn reg_input(id: &str) -> RegisterChild {
        RegisterChild {
            child_id: id.into(),
            parent_id: "p1".into(),
            subagent_type: "researcher".into(),
            title: id.into(),
            responsibility: format!("do {id}"),
            endpoint: "ws://127.0.0.1:1".into(),
            pid: 100,
        }
    }

    /// Registering a child shows up in both the in-memory table and the store.
    #[tokio::test]
    async fn register_reflects_in_table_and_index() {
        let (_d, reg) = setup();
        let now = Utc::now();
        reg.register(reg_input("c1"), now).await.unwrap();
        let listed = reg.list();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].status, ChildStatus::Running);
        let children = reg.store.list_children(&reg.key, "p1").await.unwrap();
        assert_eq!(children.len(), 1);
        assert_eq!(children[0].child_id, "c1");
        assert!(reg
            .store
            .resolve_child(&reg.key, "c1")
            .await
            .unwrap()
            .is_some());
    }

    /// A heartbeat updates the live status; deregistering drops the live entry
    /// and writes the final status to the store.
    #[tokio::test]
    async fn heartbeat_updates_status_and_deregister_finalizes() {
        let (_d, reg) = setup();
        let now = Utc::now();
        reg.register(reg_input("c1"), now).await.unwrap();
        reg.heartbeat("c1", Some(ChildStatus::Idle), now)
            .await
            .unwrap();
        assert_eq!(reg.get("c1").unwrap().status, ChildStatus::Idle);
        reg.deregister("c1", ChildStatus::Completed, now)
            .await
            .unwrap();
        assert!(reg.get("c1").is_none());
        let children = reg.store.list_children(&reg.key, "p1").await.unwrap();
        assert_eq!(children[0].status, ChildStatus::Completed);
    }

    /// A child is stale only once its heartbeat age exceeds the TTL.
    #[tokio::test]
    async fn stale_detects_missed_heartbeats() {
        let (_d, reg) = setup();
        let t0 = Utc::now();
        reg.register(reg_input("c1"), t0).await.unwrap();
        assert!(reg.stale(Duration::seconds(10), t0).is_empty());
        let stale = reg.stale(Duration::seconds(10), t0 + Duration::seconds(20));
        assert_eq!(stale, vec!["c1".to_string()]);
    }
}