use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use uuid::Uuid;
/// Lifecycle status of a session held by `SessionRegistry`.
///
/// Serialized by serde with lowercase variant names
/// (`"active"`, `"stale"`, `"closed"`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum SessionStatus {
/// Live session; the only status counted by `SessionRegistry::active_count`.
Active,
/// Previously active session that was demoted by
/// `SessionRegistry::mark_all_stale` or while loading a snapshot; it can be
/// reactivated through `SessionRegistry::resume_stale`.
Stale,
/// Session that was ended through `SessionRegistry::close`.
Closed,
}
/// State of the thread attached to a session.
///
/// Serialized by serde with lowercase variant names
/// (`"busy"`, `"idle"`, `"closed"`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ThreadState {
/// Initial state of a newly registered session and of a session that was
/// just resumed; also the serde fallback when the field is absent.
Busy,
/// Not set by the registry itself; callers select it through
/// `SessionRegistry::set_thread_state`.
Idle,
/// Set by `SessionRegistry::close` together with `SessionStatus::Closed`.
Closed,
}
/// Serde fallback for `SessionEntry::thread_state`.
///
/// Serialized entries that lack a `thread_state` field deserialize as
/// [`ThreadState::Busy`].
fn default_thread_state() -> ThreadState {
    const MISSING_FIELD_STATE: ThreadState = ThreadState::Busy;
    MISSING_FIELD_STATE
}
/// One session record as stored in the registry and in persisted snapshots.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionEntry {
/// Registry key; `SessionRegistry::register` generates it as `codex:<uuid-v4>`.
pub agent_id: String,
/// Identity name bound to this session; replaced by `resume_stale`.
pub identity: String,
/// Team name supplied at registration; the registry never interprets it.
pub team: String,
/// `None` at registration; filled in later via `set_thread_id`.
pub thread_id: Option<String>,
/// Working directory supplied at registration; replaced via `set_cwd`.
pub cwd: String,
/// Repository root; overwritten (including with `None`) by every `touch`.
pub repo_root: Option<String>,
/// Repository name; overwritten (including with `None`) by every `touch`.
pub repo_name: Option<String>,
/// Branch name; overwritten (including with `None`) by every `touch`.
pub branch: Option<String>,
/// Registration time as a `YYYY-MM-DDTHH:MM:SSZ` string.
pub started_at: String,
/// Time of the last `touch`/`resume_stale`, in the same format as `started_at`.
pub last_active: String,
/// Lifecycle status of the session.
pub status: SessionStatus,
/// Thread state; defaults to `Busy` when the field is missing from the
/// serialized input.
#[serde(default = "default_thread_state")]
pub thread_state: ThreadState,
/// Optional tag; the registry only ever initializes it to `None`. Left out
/// of the serialized form when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tag: Option<String>,
/// Optional source label set via `set_agent_source`. Left out of the
/// serialized form when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub agent_source: Option<String>,
}
/// Reasons `SessionRegistry::register` can reject a new session.
#[derive(Debug, Error)]
pub enum RegistryError {
/// The requested identity is already present in the registry's identity map.
#[error("identity '{identity}' is already bound to active session '{agent_id}'")]
IdentityConflict {
/// Identity that was requested.
identity: String,
/// Agent id of the session currently holding that identity.
agent_id: String,
},
/// The number of active sessions has already reached the configured limit.
#[error("max concurrent sessions ({max}) reached")]
MaxSessionsExceeded {
/// The registry's `max_concurrent` limit at the time of the failure.
max: usize,
},
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct RegistrySnapshot {
#[serde(default = "default_registry_version")]
pub version: u32,
pub sessions: Vec<SessionEntry>,
}
/// Serde fallback for `RegistrySnapshot::version`.
///
/// Snapshots serialized without a `version` field are treated as version 1.
fn default_registry_version() -> u32 {
    const MISSING_FIELD_VERSION: u32 = 1;
    MISSING_FIELD_VERSION
}
/// In-memory table of sessions together with an identity -> agent-id index.
#[derive(Debug)]
pub struct SessionRegistry {
/// Every known session, whatever its status, keyed by `agent_id`.
sessions: HashMap<String, SessionEntry>,
/// Maps an identity to the `agent_id` of the session bound to it. Entries
/// are added by `register` and `resume_stale` and dropped by `close` and
/// `mark_all_stale`.
identity_map: HashMap<String, String>,
/// Upper bound on active sessions, checked by `register`.
max_concurrent: usize,
}
impl SessionRegistry {
    /// Creates an empty registry that admits at most `max_concurrent`
    /// active sessions through [`register`](Self::register).
    pub fn new(max_concurrent: usize) -> Self {
        Self {
            sessions: HashMap::new(),
            identity_map: HashMap::new(),
            max_concurrent,
        }
    }

    /// Returns the configured cap on simultaneously active sessions.
    pub fn max_concurrent(&self) -> usize {
        self.max_concurrent
    }

    /// Counts the sessions whose status is [`SessionStatus::Active`].
    pub fn active_count(&self) -> usize {
        self.sessions
            .values()
            .filter(|e| e.status == SessionStatus::Active)
            .count()
    }

    /// Registers a new active session and binds `identity` to it.
    ///
    /// The new entry gets a freshly generated `codex:<uuid-v4>` agent id,
    /// starts in [`ThreadState::Busy`], and has `started_at`/`last_active`
    /// set to the current time. A clone of the stored entry is returned.
    ///
    /// # Errors
    ///
    /// * [`RegistryError::IdentityConflict`] when `identity` is already bound
    ///   to a session.
    /// * [`RegistryError::MaxSessionsExceeded`] when the number of active
    ///   sessions has reached `max_concurrent`.
    pub fn register(
        &mut self,
        identity: String,
        team: String,
        cwd: String,
        repo_root: Option<String>,
        repo_name: Option<String>,
        branch: Option<String>,
    ) -> Result<SessionEntry, RegistryError> {
        if let Some(existing_id) = self.identity_map.get(&identity) {
            return Err(RegistryError::IdentityConflict {
                identity,
                agent_id: existing_id.clone(),
            });
        }
        if self.active_count() >= self.max_concurrent {
            return Err(RegistryError::MaxSessionsExceeded {
                max: self.max_concurrent,
            });
        }

        let agent_id = format!("codex:{}", Uuid::new_v4());
        let now = now_iso8601();
        let entry = SessionEntry {
            agent_id: agent_id.clone(),
            identity: identity.clone(),
            team,
            thread_id: None,
            cwd,
            repo_root,
            repo_name,
            branch,
            started_at: now.clone(),
            last_active: now,
            status: SessionStatus::Active,
            thread_state: ThreadState::Busy,
            tag: None,
            agent_source: None,
        };
        self.sessions.insert(agent_id.clone(), entry.clone());
        self.identity_map.insert(identity, agent_id);
        Ok(entry)
    }

    /// Looks up a session by agent id, whatever its status.
    pub fn get(&self, agent_id: &str) -> Option<&SessionEntry> {
        self.sessions.get(agent_id)
    }

    /// Stores `thread_id` on the session; does nothing for an unknown id.
    pub fn set_thread_id(&mut self, agent_id: &str, thread_id: String) {
        if let Some(entry) = self.sessions.get_mut(agent_id) {
            entry.thread_id = Some(thread_id);
        }
    }

    /// Stores `source` as the session's `agent_source`; does nothing for an
    /// unknown id.
    pub fn set_agent_source(&mut self, agent_id: &str, source: String) {
        if let Some(entry) = self.sessions.get_mut(agent_id) {
            entry.agent_source = Some(source);
        }
    }

    /// Refreshes `last_active` and replaces the repository context.
    ///
    /// All three repository fields are overwritten with the given values,
    /// so passing `None` clears a previously stored value. Does nothing for
    /// an unknown id.
    pub fn touch(
        &mut self,
        agent_id: &str,
        repo_root: Option<String>,
        repo_name: Option<String>,
        branch: Option<String>,
    ) {
        if let Some(entry) = self.sessions.get_mut(agent_id) {
            entry.last_active = now_iso8601();
            entry.repo_root = repo_root;
            entry.repo_name = repo_name;
            entry.branch = branch;
        }
    }

    /// Marks the session closed and releases its identity binding.
    ///
    /// The entry stays in the registry with [`SessionStatus::Closed`] and
    /// [`ThreadState::Closed`]. The identity binding is removed only when it
    /// still points at this session, so closing a stale or already closed
    /// session cannot evict another session that has since taken over the
    /// same identity. Does nothing for an unknown id.
    pub fn close(&mut self, agent_id: &str) {
        if let Some(entry) = self.sessions.get_mut(agent_id) {
            entry.status = SessionStatus::Closed;
            entry.thread_state = ThreadState::Closed;
            Self::release_identity(&mut self.identity_map, &entry.identity, agent_id);
        }
    }

    /// Demotes every active session to [`SessionStatus::Stale`] and drops all
    /// identity bindings. Stale and closed sessions keep their status.
    pub fn mark_all_stale(&mut self) {
        for entry in self.sessions.values_mut() {
            if entry.status == SessionStatus::Active {
                entry.status = SessionStatus::Stale;
            }
        }
        self.identity_map.clear();
    }

    /// Reactivates a stale session under `new_identity`.
    ///
    /// On success the session becomes [`SessionStatus::Active`] with
    /// [`ThreadState::Busy`], its `last_active` is refreshed, and
    /// `new_identity` is bound to it.
    ///
    /// Returns `None`, leaving the registry untouched, when
    /// * `agent_id` is unknown,
    /// * the session is not stale, or
    /// * `new_identity` is currently bound to a different session
    ///   (overwriting that binding would leave two active sessions sharing
    ///   one identity).
    ///
    /// The `max_concurrent` limit is not checked here.
    pub fn resume_stale(&mut self, agent_id: &str, new_identity: String) -> Option<&SessionEntry> {
        let entry = self.sessions.get_mut(agent_id)?;
        if entry.status != SessionStatus::Stale {
            return None;
        }
        if let Some(owner) = self.identity_map.get(&new_identity) {
            if owner.as_str() != agent_id {
                return None;
            }
        }

        // The previous identity may meanwhile belong to another session;
        // only drop the binding if it is still ours.
        Self::release_identity(&mut self.identity_map, &entry.identity, agent_id);
        entry.identity = new_identity.clone();
        entry.status = SessionStatus::Active;
        entry.thread_state = ThreadState::Busy;
        entry.last_active = now_iso8601();
        self.identity_map.insert(new_identity, agent_id.to_string());
        self.sessions.get(agent_id)
    }

    /// Inserts `entry` under its own `agent_id`, replacing any entry already
    /// stored under that id.
    ///
    /// The identity map is left untouched and the entry's status is stored
    /// as given; callers are expected to pass a non-active entry.
    pub fn insert_stale(&mut self, entry: SessionEntry) {
        self.sessions.insert(entry.agent_id.clone(), entry);
    }

    /// Replaces the session's working directory; does nothing for an
    /// unknown id.
    pub fn set_cwd(&mut self, agent_id: &str, cwd: String) {
        if let Some(entry) = self.sessions.get_mut(agent_id) {
            entry.cwd = cwd;
        }
    }

    /// Replaces the session's thread state; does nothing for an unknown id.
    pub fn set_thread_state(&mut self, agent_id: &str, state: ThreadState) {
        if let Some(entry) = self.sessions.get_mut(agent_id) {
            entry.thread_state = state;
        }
    }

    /// Returns a copy of the session's thread state, or `None` for an
    /// unknown id.
    pub fn get_thread_state(&self, agent_id: &str) -> Option<ThreadState> {
        self.sessions.get(agent_id).map(|e| e.thread_state.clone())
    }

    /// Returns the agent id currently bound to `identity`, if any.
    pub fn find_by_identity(&self, identity: &str) -> Option<&str> {
        self.identity_map.get(identity).map(String::as_str)
    }

    /// Returns references to every session, in unspecified (hash map) order.
    pub fn list_all(&self) -> Vec<&SessionEntry> {
        self.sessions.values().collect()
    }

    /// Clones every session into a version-1 [`RegistrySnapshot`].
    pub fn to_snapshot(&self) -> RegistrySnapshot {
        RegistrySnapshot {
            version: 1,
            sessions: self.sessions.values().cloned().collect(),
        }
    }

    /// Rebuilds a registry from a snapshot.
    ///
    /// Sessions recorded as active are demoted to [`SessionStatus::Stale`];
    /// stale and closed sessions are kept as they are. No identity bindings
    /// are restored. When several entries share an `agent_id`, the last one
    /// in the snapshot wins.
    pub fn load_from_snapshot(snapshot: RegistrySnapshot, max_concurrent: usize) -> Self {
        let mut registry = Self::new(max_concurrent);
        for mut entry in snapshot.sessions {
            if entry.status == SessionStatus::Active {
                entry.status = SessionStatus::Stale;
            }
            registry.sessions.insert(entry.agent_id.clone(), entry);
        }
        registry
    }

    /// Removes the `identity` binding only when it currently points at
    /// `agent_id`.
    ///
    /// Takes the map instead of `&mut self` so it can be called while a
    /// session entry is still mutably borrowed from `self.sessions`.
    fn release_identity(
        identity_map: &mut HashMap<String, String>,
        identity: &str,
        agent_id: &str,
    ) {
        if identity_map.get(identity).map(String::as_str) == Some(agent_id) {
            identity_map.remove(identity);
        }
    }
}
/// Returns the current wall-clock time as a `YYYY-MM-DDTHH:MM:SSZ` string.
///
/// A system clock set before the Unix epoch is treated as second 0 rather
/// than reported as an error.
fn now_iso8601() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};

    let epoch_secs = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    };
    epoch_secs_to_iso8601(epoch_secs)
}
/// Formats seconds since the Unix epoch as `YYYY-MM-DDTHH:MM:SSZ`.
///
/// The calendar date comes from [`days_to_ymd`]; the time of day is derived
/// from the remainder of the day, with every day taken to be 86 400 seconds.
fn epoch_secs_to_iso8601(secs: u64) -> String {
    const SECS_PER_MINUTE: u64 = 60;
    const SECS_PER_HOUR: u64 = 3600;
    const SECS_PER_DAY: u64 = 86400;

    let (year, month, day) = days_to_ymd(secs / SECS_PER_DAY);

    let secs_into_day = secs % SECS_PER_DAY;
    let hour = secs_into_day / SECS_PER_HOUR;
    let minute = (secs_into_day % SECS_PER_HOUR) / SECS_PER_MINUTE;
    let second = secs_into_day % SECS_PER_MINUTE;

    format!("{year:04}-{month:02}-{day:02}T{hour:02}:{minute:02}:{second:02}Z")
}
/// Converts a count of days since 1970-01-01 into a proleptic Gregorian
/// `(year, month, day)` triple, with `month` in `1..=12` and `day` starting
/// at 1.
///
/// The computation follows the era-based "civil from days" scheme: the day
/// count is shifted so that it starts on 0000-03-01, split into 400-year
/// eras of 146 097 days, and resolved within a year that begins in March so
/// the leap day falls at the very end.
fn days_to_ymd(days: u64) -> (u64, u64, u64) {
    const EPOCH_SHIFT_DAYS: u64 = 719468;
    const DAYS_PER_ERA: u64 = 146097;
    const YEARS_PER_ERA: u64 = 400;

    let shifted = days + EPOCH_SHIFT_DAYS;
    let era = shifted / DAYS_PER_ERA;
    let day_of_era = shifted % DAYS_PER_ERA;

    // Strip the leap days accumulated inside the era before dividing by 365.
    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36524 - day_of_era / 146096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);

    // Month index counted from March (0 = March, ..., 11 = February).
    let march_based_month = (5 * day_of_year + 2) / 153;
    let day = day_of_year - (153 * march_based_month + 2) / 5 + 1;
    let month = if march_based_month < 10 {
        march_based_month + 3
    } else {
        march_based_month - 9
    };

    // January and February belong to the following civil year.
    let march_based_year = year_of_era + era * YEARS_PER_ERA;
    let year = if month <= 2 {
        march_based_year + 1
    } else {
        march_based_year
    };

    (year, month, day)
}
#[cfg(test)]
mod tests {
    //! Unit tests for the session registry and its timestamp helpers.

    use super::*;

    /// Builds an empty registry with the given concurrency cap.
    fn make_registry(max: usize) -> SessionRegistry {
        SessionRegistry::new(max)
    }

    /// Registers `identity` with fixed team/cwd and no repository context.
    fn reg_entry(
        registry: &mut SessionRegistry,
        identity: &str,
    ) -> Result<SessionEntry, RegistryError> {
        registry.register(
            identity.to_string(),
            "atm-dev".to_string(),
            "/tmp".to_string(),
            None,
            None,
            None,
        )
    }

    /// Builds an entry as it would come out of a persisted snapshot.
    fn stored_entry(
        agent_id: &str,
        identity: &str,
        status: SessionStatus,
        thread_state: ThreadState,
    ) -> SessionEntry {
        SessionEntry {
            agent_id: agent_id.to_string(),
            identity: identity.to_string(),
            team: "atm-dev".to_string(),
            thread_id: None,
            cwd: "/tmp".to_string(),
            repo_root: None,
            repo_name: None,
            branch: None,
            started_at: "2026-01-01T00:00:00Z".to_string(),
            last_active: "2026-01-01T00:00:00Z".to_string(),
            status,
            thread_state,
            tag: None,
            agent_source: None,
        }
    }

    #[test]
    fn register_new_session_succeeds() {
        let mut r = make_registry(10);
        let entry = reg_entry(&mut r, "arch-ctm").unwrap();
        assert_eq!(entry.identity, "arch-ctm");
        assert_eq!(entry.team, "atm-dev");
        assert!(entry.agent_id.starts_with("codex:"));
        assert_eq!(entry.status, SessionStatus::Active);
        assert_eq!(r.active_count(), 1);
    }

    #[test]
    fn register_duplicate_identity_fails_with_conflict() {
        let mut r = make_registry(10);
        reg_entry(&mut r, "arch-ctm").unwrap();
        let err = reg_entry(&mut r, "arch-ctm").unwrap_err();
        assert!(
            matches!(err, RegistryError::IdentityConflict { ref identity, .. } if identity == "arch-ctm")
        );
    }

    #[test]
    fn register_after_close_succeeds() {
        let mut r = make_registry(10);
        let first = reg_entry(&mut r, "arch-ctm").unwrap();
        r.close(&first.agent_id);
        let second = reg_entry(&mut r, "arch-ctm").unwrap();
        assert_ne!(first.agent_id, second.agent_id);
        assert_eq!(second.status, SessionStatus::Active);
    }

    #[test]
    fn max_concurrent_enforced() {
        let mut r = make_registry(2);
        reg_entry(&mut r, "agent-1").unwrap();
        reg_entry(&mut r, "agent-2").unwrap();
        let err = reg_entry(&mut r, "agent-3").unwrap_err();
        assert!(matches!(
            err,
            RegistryError::MaxSessionsExceeded { max: 2 }
        ));
    }

    #[test]
    fn mark_all_stale_clears_identity_map() {
        let mut r = make_registry(10);
        reg_entry(&mut r, "agent-a").unwrap();
        reg_entry(&mut r, "agent-b").unwrap();
        r.mark_all_stale();
        assert_eq!(r.active_count(), 0);
        assert!(r.find_by_identity("agent-a").is_none());
        assert!(r.find_by_identity("agent-b").is_none());
        assert_eq!(r.list_all().len(), 2);
    }

    #[test]
    fn resume_stale_session_makes_it_active() {
        let mut r = make_registry(10);
        let entry = reg_entry(&mut r, "agent-a").unwrap();
        let id = entry.agent_id.clone();
        r.mark_all_stale();
        let resumed = r.resume_stale(&id, "agent-a-new".to_string());
        assert!(resumed.is_some());
        let resumed = resumed.unwrap();
        assert_eq!(resumed.status, SessionStatus::Active);
        assert_eq!(resumed.identity, "agent-a-new");
        assert_eq!(r.active_count(), 1);
    }

    #[test]
    fn resume_active_session_returns_none() {
        let mut r = make_registry(10);
        let entry = reg_entry(&mut r, "agent-a").unwrap();
        let result = r.resume_stale(&entry.agent_id, "agent-a".to_string());
        assert!(result.is_none());
    }

    #[test]
    fn close_session_removes_from_identity_map() {
        let mut r = make_registry(10);
        let entry = reg_entry(&mut r, "arch-ctm").unwrap();
        r.close(&entry.agent_id);
        assert!(r.find_by_identity("arch-ctm").is_none());
        assert!(r.get(&entry.agent_id).is_some());
        assert_eq!(r.get(&entry.agent_id).unwrap().status, SessionStatus::Closed);
    }

    #[test]
    fn set_thread_id_updates_entry() {
        let mut r = make_registry(10);
        let entry = reg_entry(&mut r, "arch-ctm").unwrap();
        r.set_thread_id(&entry.agent_id, "thread-abc-123".to_string());
        let updated = r.get(&entry.agent_id).unwrap();
        assert_eq!(updated.thread_id, Some("thread-abc-123".to_string()));
    }

    #[test]
    fn set_agent_source_updates_entry() {
        let mut r = make_registry(10);
        let entry = reg_entry(&mut r, "arch-ctm").unwrap();
        assert!(r.get(&entry.agent_id).unwrap().agent_source.is_none());
        r.set_agent_source(&entry.agent_id, "cli".to_string());
        let updated = r.get(&entry.agent_id).unwrap();
        assert_eq!(updated.agent_source.as_deref(), Some("cli"));
    }

    #[test]
    fn touch_replaces_repo_context() {
        let mut r = make_registry(10);
        let entry = reg_entry(&mut r, "arch-ctm").unwrap();
        r.touch(
            &entry.agent_id,
            Some("/repo".to_string()),
            Some("repo".to_string()),
            Some("main".to_string()),
        );
        let updated = r.get(&entry.agent_id).unwrap();
        assert_eq!(updated.repo_root.as_deref(), Some("/repo"));
        assert_eq!(updated.repo_name.as_deref(), Some("repo"));
        assert_eq!(updated.branch.as_deref(), Some("main"));

        // A later touch overwrites the context, including with `None`.
        r.touch(&entry.agent_id, None, None, None);
        let cleared = r.get(&entry.agent_id).unwrap();
        assert!(cleared.repo_root.is_none());
        assert!(cleared.repo_name.is_none());
        assert!(cleared.branch.is_none());
    }

    #[test]
    fn list_all_returns_all_sessions() {
        let mut r = make_registry(10);
        reg_entry(&mut r, "agent-a").unwrap();
        reg_entry(&mut r, "agent-b").unwrap();
        let all = r.list_all();
        assert_eq!(all.len(), 2);
    }

    #[test]
    fn find_by_identity_returns_agent_id() {
        let mut r = make_registry(10);
        let entry = reg_entry(&mut r, "arch-ctm").unwrap();
        let found = r.find_by_identity("arch-ctm");
        assert_eq!(found, Some(entry.agent_id.as_str()));
    }

    #[test]
    fn find_by_identity_missing_returns_none() {
        let r = make_registry(10);
        assert!(r.find_by_identity("nobody").is_none());
    }

    #[test]
    fn active_count_excludes_stale_and_closed() {
        let mut r = make_registry(10);
        let a = reg_entry(&mut r, "agent-a").unwrap();
        reg_entry(&mut r, "agent-b").unwrap();
        let c = reg_entry(&mut r, "agent-c").unwrap();

        // a: resumed (active), b: stale, c: stale then closed, d: new (active).
        r.mark_all_stale();
        assert!(r.resume_stale(&a.agent_id, "agent-a".to_string()).is_some());
        reg_entry(&mut r, "agent-d").unwrap();
        r.close(&c.agent_id);

        assert_eq!(r.active_count(), 2);
    }

    #[test]
    fn insert_stale_adds_entry_without_identity_slot() {
        let mut r = make_registry(10);
        let entry = stored_entry(
            "codex:persisted-1234",
            "arch-ctm",
            SessionStatus::Stale,
            ThreadState::Idle,
        );
        r.insert_stale(entry);
        let found = r.get("codex:persisted-1234");
        assert!(found.is_some());
        assert_eq!(found.unwrap().status, SessionStatus::Stale);
        assert!(r.find_by_identity("arch-ctm").is_none());
        assert_eq!(r.active_count(), 0);
    }

    #[test]
    fn set_cwd_updates_entry() {
        let mut r = make_registry(10);
        let entry = reg_entry(&mut r, "arch-ctm").unwrap();
        r.set_cwd(&entry.agent_id, "/new/cwd".to_string());
        let updated = r.get(&entry.agent_id).unwrap();
        assert_eq!(updated.cwd, "/new/cwd");
    }

    #[test]
    fn set_cwd_nonexistent_agent_is_noop() {
        let mut r = make_registry(10);
        r.set_cwd("codex:no-such-agent", "/tmp".to_string());
    }

    #[test]
    fn to_snapshot_empty_registry() {
        let r = make_registry(10);
        let snap = r.to_snapshot();
        assert!(snap.sessions.is_empty());
    }

    #[test]
    fn to_snapshot_includes_all_sessions() {
        let mut r = make_registry(10);
        reg_entry(&mut r, "agent-a").unwrap();
        reg_entry(&mut r, "agent-b").unwrap();
        let snap = r.to_snapshot();
        assert_eq!(snap.sessions.len(), 2);
        let identities: Vec<&str> = snap.sessions.iter().map(|e| e.identity.as_str()).collect();
        assert!(identities.contains(&"agent-a"));
        assert!(identities.contains(&"agent-b"));
    }

    #[test]
    fn to_snapshot_includes_all_statuses() {
        let mut r = make_registry(10);
        let a = reg_entry(&mut r, "agent-a").unwrap();
        reg_entry(&mut r, "agent-b").unwrap();
        r.close(&a.agent_id);
        reg_entry(&mut r, "agent-c").unwrap();
        r.mark_all_stale();
        let snap = r.to_snapshot();
        assert_eq!(snap.sessions.len(), 3);
    }

    #[test]
    fn load_from_snapshot_empty_gives_empty_registry() {
        let snap = RegistrySnapshot { version: 1, sessions: vec![] };
        let r = SessionRegistry::load_from_snapshot(snap, 10);
        assert_eq!(r.active_count(), 0);
        assert!(r.list_all().is_empty());
    }

    #[test]
    fn load_from_snapshot_marks_active_as_stale() {
        let entry = SessionEntry {
            thread_id: Some("thread-xyz".to_string()),
            ..stored_entry(
                "codex:snap-test-1",
                "arch-ctm",
                SessionStatus::Active,
                ThreadState::Busy,
            )
        };
        let snap = RegistrySnapshot {
            version: 1,
            sessions: vec![entry],
        };
        let r = SessionRegistry::load_from_snapshot(snap, 10);
        assert_eq!(r.active_count(), 0, "active sessions become stale on load");
        let found = r.get("codex:snap-test-1").expect("session must be present");
        assert_eq!(found.status, SessionStatus::Stale);
        assert!(r.find_by_identity("arch-ctm").is_none());
    }

    #[test]
    fn load_from_snapshot_preserves_stale_and_closed() {
        let stale = stored_entry(
            "codex:stale-1",
            "stale-agent",
            SessionStatus::Stale,
            ThreadState::Idle,
        );
        let closed = stored_entry(
            "codex:closed-1",
            "closed-agent",
            SessionStatus::Closed,
            ThreadState::Closed,
        );
        let snap = RegistrySnapshot {
            version: 1,
            sessions: vec![stale, closed],
        };
        let r = SessionRegistry::load_from_snapshot(snap, 10);
        let s = r.get("codex:stale-1").unwrap();
        assert_eq!(s.status, SessionStatus::Stale);
        let c = r.get("codex:closed-1").unwrap();
        assert_eq!(c.status, SessionStatus::Closed);
    }

    #[test]
    fn round_trip_snapshot_serialize_deserialize() {
        let mut r = make_registry(10);
        let e = reg_entry(&mut r, "arch-ctm").unwrap();
        r.set_thread_id(&e.agent_id, "thread-rt-1".to_string());
        reg_entry(&mut r, "dev-agent").unwrap();
        let snap = r.to_snapshot();
        let json = serde_json::to_string(&snap).unwrap();
        let snap2: RegistrySnapshot = serde_json::from_str(&json).unwrap();
        let r2 = SessionRegistry::load_from_snapshot(snap2, 10);
        assert_eq!(r2.list_all().len(), 2);
        assert_eq!(r2.active_count(), 0);
        let loaded = r2.get(&e.agent_id).unwrap();
        assert_eq!(loaded.status, SessionStatus::Stale);
        assert_eq!(loaded.thread_id, Some("thread-rt-1".to_string()));
    }

    #[test]
    fn new_session_has_busy_thread_state() {
        let mut r = make_registry(10);
        let entry = reg_entry(&mut r, "arch-ctm").unwrap();
        assert_eq!(
            r.get(&entry.agent_id).unwrap().thread_state,
            ThreadState::Busy,
            "newly registered sessions must start Busy (FR-17.2)"
        );
    }

    #[test]
    fn set_thread_state_updates_entry() {
        let mut r = make_registry(10);
        let entry = reg_entry(&mut r, "arch-ctm").unwrap();
        r.set_thread_state(&entry.agent_id, ThreadState::Idle);
        assert_eq!(
            r.get_thread_state(&entry.agent_id),
            Some(ThreadState::Idle)
        );
    }

    #[test]
    fn set_thread_state_nonexistent_is_noop() {
        let mut r = make_registry(10);
        r.set_thread_state("codex:no-such-agent", ThreadState::Idle);
    }

    #[test]
    fn get_thread_state_unknown_agent_returns_none() {
        let r = make_registry(10);
        assert!(r.get_thread_state("codex:ghost").is_none());
    }

    #[test]
    fn close_sets_thread_state_to_closed() {
        let mut r = make_registry(10);
        let entry = reg_entry(&mut r, "arch-ctm").unwrap();
        r.set_thread_state(&entry.agent_id, ThreadState::Idle);
        r.close(&entry.agent_id);
        assert_eq!(
            r.get_thread_state(&entry.agent_id),
            Some(ThreadState::Closed),
            "close() must set thread_state to Closed"
        );
    }

    #[test]
    fn resume_stale_sets_thread_state_to_busy() {
        let mut r = make_registry(10);
        let entry = reg_entry(&mut r, "agent-a").unwrap();
        let id = entry.agent_id.clone();
        r.set_thread_state(&id, ThreadState::Idle);
        r.mark_all_stale();
        let resumed = r.resume_stale(&id, "agent-a".to_string()).unwrap();
        assert_eq!(
            resumed.thread_state,
            ThreadState::Busy,
            "resume_stale() must set thread_state to Busy"
        );
    }

    #[test]
    fn thread_state_serialization_round_trip() {
        let busy_json = serde_json::to_string(&ThreadState::Busy).unwrap();
        assert_eq!(busy_json, "\"busy\"");
        let idle_json = serde_json::to_string(&ThreadState::Idle).unwrap();
        assert_eq!(idle_json, "\"idle\"");
        let closed_json = serde_json::to_string(&ThreadState::Closed).unwrap();
        assert_eq!(closed_json, "\"closed\"");
        let state: ThreadState = serde_json::from_str("\"idle\"").unwrap();
        assert_eq!(state, ThreadState::Idle);
    }

    #[test]
    fn session_entry_thread_state_defaults_to_busy_when_deserializing_old_format() {
        let json = r#"{
"agent_id": "codex:old-session",
"identity": "legacy",
"team": "atm-dev",
"thread_id": null,
"cwd": "/tmp",
"repo_root": null,
"repo_name": null,
"branch": null,
"started_at": "2026-01-01T00:00:00Z",
"last_active": "2026-01-01T00:00:00Z",
"status": "active"
}"#;
        let entry: SessionEntry = serde_json::from_str(json).unwrap();
        assert_eq!(
            entry.thread_state,
            ThreadState::Busy,
            "missing thread_state field must default to Busy via serde default"
        );
    }

    #[test]
    fn epoch_secs_zero_is_unix_epoch() {
        let s = epoch_secs_to_iso8601(0);
        assert_eq!(s, "1970-01-01T00:00:00Z");
    }

    #[test]
    fn epoch_secs_handles_leap_day_and_time_of_day() {
        // 951_782_400 s = 11_016 days after the epoch = 2000-02-29.
        assert_eq!(epoch_secs_to_iso8601(951_782_400), "2000-02-29T00:00:00Z");
        assert_eq!(
            epoch_secs_to_iso8601(951_782_400 + 86_399),
            "2000-02-29T23:59:59Z"
        );
        assert_eq!(
            epoch_secs_to_iso8601(951_782_400 + 86_400),
            "2000-03-01T00:00:00Z"
        );
    }

    #[test]
    fn now_iso8601_is_not_empty() {
        let s = now_iso8601();
        assert!(!s.is_empty());
        assert!(s.ends_with('Z'));
        assert!(s.contains('T'));
    }
}