use std::collections::HashMap;
use std::path::PathBuf;
use std::time::{Duration, Instant};
use chrono::Utc;
use tokio::sync::{broadcast, mpsc};
use tracing::{debug, info, warn};
use atm_core::{
AgentType, HookEventType, SessionDomain, SessionId, SessionInfrastructure, SessionView,
};
use atm_protocol::RawStatusLine;
use super::commands::{RegistryCommand, RegistryError, RemovalReason, SessionEvent};
pub const MAX_SESSIONS: usize = 100;
#[derive(Debug)]
struct PendingSubagent {
parent_session_id: SessionId,
parent_pid: u32,
parent_start_time: Option<u64>,
agent_type: AgentType,
created_at: Instant,
}
pub struct RegistryActor {
receiver: mpsc::Receiver<RegistryCommand>,
sessions_by_pid: HashMap<u32, (SessionDomain, SessionInfrastructure)>,
session_id_to_pid: HashMap<SessionId, u32>,
event_publisher: broadcast::Sender<SessionEvent>,
pending_subagents: Vec<(String, PendingSubagent)>,
}
impl RegistryActor {
pub fn new(
receiver: mpsc::Receiver<RegistryCommand>,
event_publisher: broadcast::Sender<SessionEvent>,
) -> Self {
Self {
receiver,
sessions_by_pid: HashMap::new(),
session_id_to_pid: HashMap::new(),
event_publisher,
pending_subagents: Vec::new(),
}
}
pub async fn run(mut self) {
info!("Registry actor starting");
while let Some(cmd) = self.receiver.recv().await {
self.handle_command(cmd);
}
info!(
"Registry actor stopped (sessions: {})",
self.sessions_by_pid.len()
);
}
fn handle_command(&mut self, cmd: RegistryCommand) {
match cmd {
RegistryCommand::Register {
session,
respond_to,
} => {
let result = self.handle_register(*session, None);
let _ = respond_to.send(result);
}
RegistryCommand::UpdateFromStatusLine {
session_id,
data,
respond_to,
} => {
let result = self.handle_update_from_status_line(session_id, data);
let _ = respond_to.send(result);
}
RegistryCommand::ApplyHookEvent {
session_id,
event_type,
tool_name,
notification_type,
pid,
tmux_pane,
agent_id,
agent_type,
prompt,
respond_to,
} => {
let result = self.handle_apply_hook_event(
session_id,
event_type,
tool_name,
notification_type,
pid,
tmux_pane,
agent_id,
agent_type,
prompt,
);
let _ = respond_to.send(result);
}
RegistryCommand::GetSession {
session_id,
respond_to,
} => {
let result = self.handle_get_session(&session_id);
let _ = respond_to.send(result);
}
RegistryCommand::GetAllSessions { respond_to } => {
let result = self.handle_get_all_sessions();
let _ = respond_to.send(result);
}
RegistryCommand::Remove {
session_id,
respond_to,
} => {
let result = self.handle_remove(session_id, RemovalReason::Explicit);
let _ = respond_to.send(result);
}
RegistryCommand::CleanupStale => {
self.handle_cleanup_stale();
}
RegistryCommand::RefreshGitInfo => {
self.handle_refresh_git_info();
}
RegistryCommand::RegisterDiscovered {
session_id,
pid,
cwd,
tmux_pane,
respond_to,
} => {
let result = self.handle_register_discovered(session_id, pid, cwd, tmux_pane);
let _ = respond_to.send(result);
}
}
}
fn handle_register(
&mut self,
session: SessionDomain,
pid: Option<u32>,
) -> Result<(), RegistryError> {
if self.sessions_by_pid.len() >= MAX_SESSIONS {
warn!(
session_id = %session.id,
current = self.sessions_by_pid.len(),
max = MAX_SESSIONS,
"Registry is full, rejecting registration"
);
return Err(RegistryError::RegistryFull { max: MAX_SESSIONS });
}
let pid = match pid {
Some(p) if p != 0 => p,
_ => {
if self.session_id_to_pid.contains_key(&session.id) {
debug!(
session_id = %session.id,
"Session already exists (by session_id), rejecting registration"
);
return Err(RegistryError::SessionAlreadyExists(session.id));
}
self.generate_synthetic_pid()
}
};
if self.sessions_by_pid.contains_key(&pid) {
debug!(
session_id = %session.id,
pid = pid,
"Session already exists for PID, rejecting registration"
);
return Err(RegistryError::SessionAlreadyExists(session.id));
}
let mut session = session;
if session.project_root.is_none() {
if let Some(ref cwd) = session.working_directory {
session.project_root = atm_core::resolve_project_root(cwd);
let (wt_path, wt_branch) = atm_core::resolve_worktree_info(cwd);
session.worktree_path = wt_path;
session.worktree_branch = wt_branch;
}
}
let session_id = session.id.clone();
let agent_type = session.agent_type.clone();
let mut infra = SessionInfrastructure::new();
infra.set_pid(pid);
self.sessions_by_pid.insert(pid, (session, infra));
self.session_id_to_pid.insert(session_id.clone(), pid);
info!(
session_id = %session_id,
pid = pid,
agent_type = ?agent_type,
total_sessions = self.sessions_by_pid.len(),
"Session registered"
);
let _ = self.event_publisher.send(SessionEvent::Registered {
session_id,
agent_type,
});
Ok(())
}
fn generate_synthetic_pid(&self) -> u32 {
let base: u32 = 0x8000_0000;
for i in 0..u32::MAX {
let candidate = base.wrapping_add(i);
if !self.sessions_by_pid.contains_key(&candidate) {
return candidate;
}
}
base
}
fn handle_register_discovered(
&mut self,
session_id: SessionId,
pid: u32,
cwd: PathBuf,
tmux_pane: Option<String>,
) -> Result<(), RegistryError> {
if pid == 0 {
warn!(
session_id = %session_id,
"Cannot register discovered session with PID 0"
);
return Ok(());
}
if let Some((existing_session, _)) = self.sessions_by_pid.get_mut(&pid) {
if existing_session.id == session_id {
debug!(
session_id = %session_id,
pid = pid,
"Discovered session already exists, skipping"
);
return Ok(());
}
let old_id = existing_session.id.clone();
let cwd_str = cwd.to_string_lossy().to_string();
existing_session.id = session_id.clone();
existing_session.working_directory = Some(cwd_str.clone());
existing_session.project_root = atm_core::resolve_project_root(&cwd_str);
let (wt_path, wt_branch) = atm_core::resolve_worktree_info(&cwd_str);
existing_session.worktree_path = wt_path;
existing_session.worktree_branch = wt_branch;
if tmux_pane.is_some() {
existing_session.tmux_pane = tmux_pane;
}
info!(
old_id = %old_id,
new_id = %session_id,
pid = pid,
"Re-discovered existing session, refreshed git info (metadata preserved)"
);
let view = SessionView::from_domain(existing_session);
let _ = self.event_publisher.send(SessionEvent::Updated {
session: Box::new(view),
});
self.session_id_to_pid.remove(&old_id);
self.session_id_to_pid.insert(session_id, pid);
return Ok(());
}
if self.sessions_by_pid.len() >= MAX_SESSIONS {
warn!(
session_id = %session_id,
current = self.sessions_by_pid.len(),
max = MAX_SESSIONS,
"Registry is full, cannot register discovered session"
);
return Err(RegistryError::RegistryFull { max: MAX_SESSIONS });
}
use atm_core::{AgentType, Model};
let mut session = SessionDomain::new(
session_id.clone(),
AgentType::GeneralPurpose, Model::Unknown, );
let cwd_str = cwd.to_string_lossy().to_string();
session.project_root = atm_core::resolve_project_root(&cwd_str);
let (wt_path, wt_branch) = atm_core::resolve_worktree_info(&cwd_str);
session.worktree_path = wt_path;
session.worktree_branch = wt_branch;
session.working_directory = Some(cwd_str);
session.tmux_pane = tmux_pane;
let agent_type = session.agent_type.clone();
let mut infra = SessionInfrastructure::new();
infra.set_pid(pid);
self.sessions_by_pid.insert(pid, (session, infra));
self.session_id_to_pid.insert(session_id.clone(), pid);
info!(
session_id = %session_id,
pid = pid,
total_sessions = self.sessions_by_pid.len(),
"Discovered session registered"
);
let _ = self.event_publisher.send(SessionEvent::Registered {
session_id: session_id.clone(),
agent_type,
});
if let Some((session, _)) = self.sessions_by_pid.get(&pid) {
let view = SessionView::from_domain(session);
let _ = self.event_publisher.send(SessionEvent::Updated {
session: Box::new(view),
});
}
self.try_correlate_subagent(&session_id, pid);
Ok(())
}
fn handle_update_from_status_line(
&mut self,
session_id: SessionId,
data: serde_json::Value,
) -> Result<(), RegistryError> {
let raw_status: RawStatusLine =
serde_json::from_value(data).map_err(RegistryError::parse)?;
let status_pid = raw_status.pid;
if let Some(pid) = status_pid {
if pid != 0 {
return self.update_or_create_by_pid(pid, session_id, raw_status);
}
}
if let Some(&pid) = self.session_id_to_pid.get(&session_id) {
if let Some((session, infra)) = self.sessions_by_pid.get_mut(&pid) {
let cwd_changed = raw_status.update_session(session);
infra.record_update();
if session.project_root.is_none() || cwd_changed {
if let Some(ref cwd) = session.working_directory {
if cwd_changed {
info!(
session_id = %session_id,
pid = pid,
new_cwd = %cwd,
"Working directory changed, re-resolving git info"
);
}
session.project_root = atm_core::resolve_project_root(cwd);
let (wt_path, wt_branch) = atm_core::resolve_worktree_info(cwd);
session.worktree_path = wt_path;
session.worktree_branch = wt_branch;
}
}
debug!(
session_id = %session_id,
pid = pid,
cost = %session.cost,
"Session updated from status line (by session_id)"
);
let view = SessionView::from_domain(session);
let _ = self.event_publisher.send(SessionEvent::Updated {
session: Box::new(view),
});
}
return Ok(());
}
debug!(
session_id = %session_id,
"Status line without PID for unknown session, ignoring"
);
Ok(())
}
fn update_or_create_by_pid(
&mut self,
pid: u32,
session_id: SessionId,
raw_status: RawStatusLine,
) -> Result<(), RegistryError> {
if let Some((session, infra)) = self.sessions_by_pid.get_mut(&pid) {
let old_session_id = session.id.clone();
let cwd_changed = raw_status.update_session(session);
infra.record_update();
if session.project_root.is_none() || cwd_changed {
if let Some(ref cwd) = session.working_directory {
if cwd_changed {
info!(
session_id = %session.id,
pid = pid,
new_cwd = %cwd,
"Working directory changed, re-resolving git info"
);
}
session.project_root = atm_core::resolve_project_root(cwd);
let (wt_path, wt_branch) = atm_core::resolve_worktree_info(cwd);
session.worktree_path = wt_path;
session.worktree_branch = wt_branch;
}
}
if old_session_id != session_id {
session.id = session_id.clone();
self.session_id_to_pid.remove(&old_session_id);
self.session_id_to_pid.insert(session_id.clone(), pid);
info!(
old_id = %old_session_id,
new_id = %session_id,
pid = pid,
"Session ID upgraded"
);
let _ = self.event_publisher.send(SessionEvent::Removed {
session_id: old_session_id,
reason: RemovalReason::Upgraded,
});
let _ = self.event_publisher.send(SessionEvent::Registered {
session_id: session_id.clone(),
agent_type: session.agent_type.clone(),
});
}
debug!(
session_id = %session_id,
pid = pid,
cost = %session.cost,
"Session updated from status line"
);
let view = SessionView::from_domain(session);
let _ = self.event_publisher.send(SessionEvent::Updated {
session: Box::new(view),
});
} else {
let mut session = match raw_status.to_session_domain() {
Some(s) => s,
None => {
debug!(
session_id = %session_id,
pid = pid,
"Status line missing required fields for session creation"
);
return Ok(());
}
};
if let Some(ref cwd) = session.working_directory {
session.project_root = atm_core::resolve_project_root(cwd);
let (wt_path, wt_branch) = atm_core::resolve_worktree_info(cwd);
session.worktree_path = wt_path;
session.worktree_branch = wt_branch;
}
if self.sessions_by_pid.len() >= MAX_SESSIONS {
warn!(
session_id = %session_id,
"Registry full, cannot auto-register session"
);
return Err(RegistryError::RegistryFull { max: MAX_SESSIONS });
}
let agent_type = session.agent_type.clone();
let mut infra = SessionInfrastructure::new();
infra.set_pid(pid);
self.sessions_by_pid.insert(pid, (session, infra));
self.session_id_to_pid.insert(session_id.clone(), pid);
info!(
session_id = %session_id,
pid = pid,
"Session auto-registered from status line"
);
let _ = self.event_publisher.send(SessionEvent::Registered {
session_id: session_id.clone(),
agent_type,
});
if let Some((session, _)) = self.sessions_by_pid.get(&pid) {
let view = SessionView::from_domain(session);
let _ = self.event_publisher.send(SessionEvent::Updated {
session: Box::new(view),
});
}
}
Ok(())
}
fn handle_apply_hook_event(
&mut self,
session_id: SessionId,
event_type: HookEventType,
tool_name: Option<String>,
notification_type: Option<String>,
pid: Option<u32>,
tmux_pane: Option<String>,
agent_id: Option<String>,
agent_type: Option<String>,
prompt: Option<String>,
) -> Result<(), RegistryError> {
if event_type == HookEventType::SubagentStart {
if let Some(ref aid) = agent_id {
let resolved_parent_pid = pid
.or_else(|| self.session_id_to_pid.get(&session_id).copied())
.unwrap_or(0);
let parent_sid = if resolved_parent_pid != 0 {
self.sessions_by_pid
.get(&resolved_parent_pid)
.map(|(s, _)| s.id.clone())
.unwrap_or_else(|| session_id.clone())
} else {
session_id.clone()
};
let parent_start_time = if resolved_parent_pid != 0 {
crate::tmux::get_process_start_time(resolved_parent_pid)
} else {
None
};
let child_agent_type = agent_type
.as_deref()
.map(AgentType::from_subagent_type)
.unwrap_or_default();
self.pending_subagents.push((
aid.clone(),
PendingSubagent {
parent_session_id: parent_sid,
parent_pid: resolved_parent_pid,
parent_start_time,
agent_type: child_agent_type,
created_at: Instant::now(),
},
));
}
}
if event_type == HookEventType::SubagentStop {
if let Some(ref aid) = agent_id {
self.pending_subagents.retain(|(id, _)| id != aid);
}
}
if event_type == HookEventType::SessionEnd {
let target_pid = pid.or_else(|| self.session_id_to_pid.get(&session_id).copied());
if let Some(p) = target_pid {
if self.sessions_by_pid.contains_key(&p) {
info!(
session_id = %session_id,
pid = p,
"SessionEnd hook received, removing session"
);
return self.handle_remove_by_pid(p, RemovalReason::SessionEnded);
}
}
debug!(
session_id = %session_id,
"SessionEnd for non-existent session (already cleaned up or never created)"
);
return Ok(());
}
let target_pid = pid.or_else(|| self.session_id_to_pid.get(&session_id).copied());
let (session, infra) = match target_pid.and_then(|p| self.sessions_by_pid.get_mut(&p)) {
Some(entry) => entry,
None => {
if let Some(p) = pid {
if p != 0 {
debug!(
session_id = %session_id,
pid = p,
event_type = ?event_type,
"Creating session from hook event"
);
use atm_core::{AgentType, Model};
let mut session = SessionDomain::new(
session_id.clone(),
AgentType::GeneralPurpose,
Model::Unknown,
);
session.tmux_pane = tmux_pane.clone();
let mut infra = SessionInfrastructure::new();
infra.set_pid(p);
self.sessions_by_pid.insert(p, (session, infra));
self.session_id_to_pid.insert(session_id.clone(), p);
if let Some((session, infra)) = self.sessions_by_pid.get_mut(&p) {
if event_type == HookEventType::Notification {
session.apply_notification(notification_type.as_deref());
} else {
session.apply_hook_event(event_type, tool_name.as_deref());
}
if event_type == HookEventType::UserPromptSubmit {
if let Some(ref pr) = prompt {
session.set_first_prompt(pr);
}
}
if let Some(ref name) = tool_name {
infra.record_tool_use(name, None);
}
let view = SessionView::from_domain(session);
let _ = self.event_publisher.send(SessionEvent::Registered {
session_id: session_id.clone(),
agent_type: session.agent_type.clone(),
});
let _ = self.event_publisher.send(SessionEvent::Updated {
session: Box::new(view),
});
}
self.try_correlate_subagent(&session_id, p);
return Ok(());
}
}
debug!(
session_id = %session_id,
event_type = ?event_type,
"Hook event for non-existent session without PID, ignoring"
);
return Ok(());
}
};
if event_type == HookEventType::Notification {
session.apply_notification(notification_type.as_deref());
} else {
session.apply_hook_event(event_type, tool_name.as_deref());
}
if event_type == HookEventType::UserPromptSubmit {
if let Some(ref p) = prompt {
session.set_first_prompt(p);
}
}
if tmux_pane.is_some() && session.tmux_pane.is_none() {
session.tmux_pane = tmux_pane;
}
debug!(
session_id = %session.id,
event_type = ?event_type,
tool_name = ?tool_name,
new_status = %session.status,
"Hook event applied"
);
if let Some(ref name) = tool_name {
infra.record_tool_use(name, None);
}
let view = SessionView::from_domain(session);
let _ = self.event_publisher.send(SessionEvent::Updated {
session: Box::new(view),
});
Ok(())
}
fn handle_get_session(&self, session_id: &SessionId) -> Option<SessionView> {
self.session_id_to_pid
.get(session_id)
.and_then(|pid| self.sessions_by_pid.get(pid))
.map(|(session, _)| SessionView::from_domain(session))
}
fn handle_get_all_sessions(&self) -> Vec<SessionView> {
self.sessions_by_pid
.values()
.map(|(session, _)| SessionView::from_domain(session))
.collect()
}
fn handle_remove(
&mut self,
session_id: SessionId,
reason: RemovalReason,
) -> Result<(), RegistryError> {
let pid = match self.session_id_to_pid.remove(&session_id) {
Some(p) => p,
None => return Err(RegistryError::SessionNotFound(session_id)),
};
self.sessions_by_pid.remove(&pid);
info!(
session_id = %session_id,
pid = pid,
reason = %reason,
remaining_sessions = self.sessions_by_pid.len(),
"Session removed"
);
let _ = self
.event_publisher
.send(SessionEvent::Removed { session_id, reason });
Ok(())
}
fn handle_remove_by_pid(
&mut self,
pid: u32,
reason: RemovalReason,
) -> Result<(), RegistryError> {
let (session, _) = match self.sessions_by_pid.remove(&pid) {
Some(entry) => entry,
None => {
return Err(RegistryError::SessionNotFound(SessionId::new(format!(
"pid-{pid}"
))));
}
};
let session_id = session.id.clone();
self.session_id_to_pid.remove(&session_id);
info!(
session_id = %session_id,
pid = pid,
reason = %reason,
remaining_sessions = self.sessions_by_pid.len(),
"Session removed"
);
let _ = self
.event_publisher
.send(SessionEvent::Removed { session_id, reason });
Ok(())
}
fn try_correlate_subagent(&mut self, session_id: &SessionId, pid: u32) {
let matched_index = self.pending_subagents.iter().position(|(_, pending)| {
if pending.created_at.elapsed() >= Duration::from_secs(30) || pending.parent_pid == 0 {
return false;
}
let start_time_matches = match pending.parent_start_time {
Some(expected) => {
crate::tmux::get_process_start_time(pending.parent_pid) == Some(expected)
}
None => true,
};
start_time_matches && is_descendant_of(pid, pending.parent_pid)
});
if let Some(index) = matched_index {
let (agent_id, pending) = self.pending_subagents.remove(index);
info!(
child_session_id = %session_id,
parent_session_id = %pending.parent_session_id,
agent_id = %agent_id,
agent_type = %pending.agent_type,
"Correlated subagent with discovered session"
);
if let Some((parent_session, _)) = self.sessions_by_pid.get_mut(&pending.parent_pid) {
parent_session.child_session_ids.push(session_id.clone());
}
if let Some((child_session, _)) = self.sessions_by_pid.get_mut(&pid) {
child_session.parent_session_id = Some(pending.parent_session_id);
child_session.agent_type = pending.agent_type;
}
}
}
fn handle_cleanup_stale(&mut self) {
self.pending_subagents
.retain(|(_, p)| p.created_at.elapsed() < Duration::from_secs(30));
let now = Utc::now();
let to_remove: Vec<(u32, SessionId)> = self
.sessions_by_pid
.iter()
.filter_map(|(pid, (session, infra))| {
if !infra.is_process_alive() {
Some((*pid, session.id.clone()))
} else {
None
}
})
.collect();
if to_remove.is_empty() {
debug!("No dead-process sessions to clean up");
return;
}
info!(count = to_remove.len(), "Cleaning up dead-process sessions");
for (pid, session_id) in to_remove {
let log_details = self
.sessions_by_pid
.get(&pid)
.map(|(s, _)| {
let secs = now.signed_duration_since(s.last_activity).num_seconds();
format!("last_activity={secs}s ago, pid={pid}")
})
.unwrap_or_default();
self.sessions_by_pid.remove(&pid);
self.session_id_to_pid.remove(&session_id);
warn!(
session_id = %session_id,
reason = %RemovalReason::ProcessDied,
details = %log_details,
"Session removed by cleanup"
);
let _ = self.event_publisher.send(SessionEvent::Removed {
session_id,
reason: RemovalReason::ProcessDied,
});
}
}
fn handle_refresh_git_info(&mut self) {
let mut updated_count = 0;
for (pid, (session, _)) in self.sessions_by_pid.iter_mut() {
let cwd = match &session.working_directory {
Some(cwd) => cwd.clone(),
None => continue,
};
let new_project_root = atm_core::resolve_project_root(&cwd);
let (new_wt_path, new_wt_branch) = atm_core::resolve_worktree_info(&cwd);
let changed = session.project_root != new_project_root
|| session.worktree_path != new_wt_path
|| session.worktree_branch != new_wt_branch;
if changed {
info!(
session_id = %session.id,
pid = pid,
old_branch = ?session.worktree_branch,
new_branch = ?new_wt_branch,
"Git info changed, updating session"
);
session.project_root = new_project_root;
session.worktree_path = new_wt_path;
session.worktree_branch = new_wt_branch;
updated_count += 1;
let view = SessionView::from_domain(session);
let _ = self.event_publisher.send(SessionEvent::Updated {
session: Box::new(view),
});
}
}
if updated_count > 0 {
info!(updated_count, "Git info refresh completed with changes");
}
}
#[cfg(test)]
pub fn session_count(&self) -> usize {
self.sessions_by_pid.len()
}
#[cfg(test)]
pub fn pending_subagent_count(&self) -> usize {
self.pending_subagents.len()
}
}
fn is_descendant_of(pid: u32, ancestor_pid: u32) -> bool {
let mut current = pid;
for _ in 0..20 {
if current == ancestor_pid {
return true;
}
if current <= 1 {
return false;
}
match crate::tmux::get_parent_pid(current) {
Some(ppid) => current = ppid,
None => return false,
}
}
false
}
#[cfg(test)]
mod tests {
use super::*;
use atm_core::{AgentType, Model};
use tokio::sync::oneshot;
fn create_test_session(id: &str) -> SessionDomain {
SessionDomain::new(
SessionId::new(id),
AgentType::GeneralPurpose,
Model::Sonnet4,
)
}
fn create_actor() -> (
mpsc::Sender<RegistryCommand>,
RegistryActor,
broadcast::Receiver<SessionEvent>,
) {
let (cmd_tx, cmd_rx) = mpsc::channel(16);
let (event_tx, event_rx) = broadcast::channel(16);
let actor = RegistryActor::new(cmd_rx, event_tx);
(cmd_tx, actor, event_rx)
}
#[tokio::test]
async fn test_register_session() {
let (cmd_tx, mut actor, mut event_rx) = create_actor();
let session = create_test_session("test-123");
let (respond_tx, respond_rx) = oneshot::channel();
cmd_tx
.send(RegistryCommand::Register {
session: Box::new(session),
respond_to: respond_tx,
})
.await
.unwrap();
if let Some(cmd) = actor.receiver.recv().await {
actor.handle_command(cmd);
}
let result = respond_rx.await.unwrap();
assert!(result.is_ok());
assert_eq!(actor.session_count(), 1);
let event = event_rx.try_recv().unwrap();
assert!(matches!(event, SessionEvent::Registered { .. }));
}
#[tokio::test]
async fn test_register_duplicate_fails() {
let (_, mut actor, _) = create_actor();
let session1 = create_test_session("test-123");
let session2 = create_test_session("test-123");
let (tx1, _) = oneshot::channel();
let cmd1 = RegistryCommand::Register {
session: Box::new(session1),
respond_to: tx1,
};
actor.handle_command(cmd1);
let (tx2, rx2) = oneshot::channel();
let cmd2 = RegistryCommand::Register {
session: Box::new(session2),
respond_to: tx2,
};
actor.handle_command(cmd2);
let result = rx2.await.unwrap();
assert!(matches!(
result,
Err(RegistryError::SessionAlreadyExists(_))
));
assert_eq!(actor.session_count(), 1);
}
#[tokio::test]
async fn test_get_session() {
let (_, mut actor, _) = create_actor();
let session = create_test_session("test-123");
let (tx, _) = oneshot::channel();
actor.handle_command(RegistryCommand::Register {
session: Box::new(session),
respond_to: tx,
});
let (tx, rx) = oneshot::channel();
actor.handle_command(RegistryCommand::GetSession {
session_id: SessionId::new("test-123"),
respond_to: tx,
});
let result = rx.await.unwrap();
assert!(result.is_some());
assert_eq!(result.unwrap().id.as_str(), "test-123");
}
#[tokio::test]
async fn test_get_nonexistent_session() {
let (_, mut actor, _) = create_actor();
let (tx, rx) = oneshot::channel();
actor.handle_command(RegistryCommand::GetSession {
session_id: SessionId::new("nonexistent"),
respond_to: tx,
});
let result = rx.await.unwrap();
assert!(result.is_none());
}
#[tokio::test]
async fn test_get_all_sessions() {
let (_, mut actor, _) = create_actor();
for i in 0..3 {
let session = create_test_session(&format!("test-{i}"));
let (tx, _) = oneshot::channel();
actor.handle_command(RegistryCommand::Register {
session: Box::new(session),
respond_to: tx,
});
}
let (tx, rx) = oneshot::channel();
actor.handle_command(RegistryCommand::GetAllSessions { respond_to: tx });
let result = rx.await.unwrap();
assert_eq!(result.len(), 3);
}
#[tokio::test]
async fn test_remove_session() {
let (_, mut actor, mut event_rx) = create_actor();
let session = create_test_session("test-123");
let (tx, _) = oneshot::channel();
actor.handle_command(RegistryCommand::Register {
session: Box::new(session),
respond_to: tx,
});
let _ = event_rx.try_recv();
let (tx, rx) = oneshot::channel();
actor.handle_command(RegistryCommand::Remove {
session_id: SessionId::new("test-123"),
respond_to: tx,
});
let result = rx.await.unwrap();
assert!(result.is_ok());
assert_eq!(actor.session_count(), 0);
let event = event_rx.try_recv().unwrap();
assert!(matches!(
event,
SessionEvent::Removed {
reason: RemovalReason::Explicit,
..
}
));
}
#[tokio::test]
async fn test_remove_nonexistent_fails() {
let (_, mut actor, _) = create_actor();
let (tx, rx) = oneshot::channel();
actor.handle_command(RegistryCommand::Remove {
session_id: SessionId::new("nonexistent"),
respond_to: tx,
});
let result = rx.await.unwrap();
assert!(matches!(result, Err(RegistryError::SessionNotFound(_))));
}
#[tokio::test]
async fn test_apply_hook_event() {
let (_, mut actor, _) = create_actor();
let session = create_test_session("test-123");
let (tx, _) = oneshot::channel();
actor.handle_command(RegistryCommand::Register {
session: Box::new(session),
respond_to: tx,
});
let (tx, rx) = oneshot::channel();
actor.handle_command(RegistryCommand::ApplyHookEvent {
session_id: SessionId::new("test-123"),
event_type: HookEventType::PreToolUse,
tool_name: Some("Bash".to_string()),
notification_type: None,
pid: None,
tmux_pane: None,
agent_id: None,
agent_type: None,
prompt: None,
respond_to: tx,
});
let result = rx.await.unwrap();
assert!(result.is_ok());
let (tx, rx) = oneshot::channel();
actor.handle_command(RegistryCommand::GetSession {
session_id: SessionId::new("test-123"),
respond_to: tx,
});
let view = rx.await.unwrap().unwrap();
assert_eq!(view.status_label, "working");
assert_eq!(view.activity_detail, Some("Bash".to_string()));
}
#[tokio::test]
async fn test_apply_hook_event_session_end() {
let (_, mut actor, mut event_rx) = create_actor();
let session = create_test_session("test-session-end");
let (tx, _) = oneshot::channel();
actor.handle_command(RegistryCommand::Register {
session: Box::new(session),
respond_to: tx,
});
let _ = event_rx.try_recv();
assert_eq!(actor.session_count(), 1);
let (tx, rx) = oneshot::channel();
actor.handle_command(RegistryCommand::ApplyHookEvent {
session_id: SessionId::new("test-session-end"),
event_type: HookEventType::SessionEnd,
tool_name: None,
notification_type: None,
pid: None,
tmux_pane: None,
agent_id: None,
agent_type: None,
prompt: None,
respond_to: tx,
});
let result = rx.await.unwrap();
assert!(result.is_ok());
assert_eq!(actor.session_count(), 0);
let event = event_rx.try_recv().unwrap();
assert!(matches!(
event,
SessionEvent::Removed {
reason: RemovalReason::SessionEnded,
..
}
));
}
#[tokio::test]
async fn test_apply_hook_event_session_end_nonexistent() {
let (_, mut actor, _) = create_actor();
let (tx, rx) = oneshot::channel();
actor.handle_command(RegistryCommand::ApplyHookEvent {
session_id: SessionId::new("nonexistent"),
event_type: HookEventType::SessionEnd,
tool_name: None,
notification_type: None,
pid: None,
tmux_pane: None,
agent_id: None,
agent_type: None,
prompt: None,
respond_to: tx,
});
let result = rx.await.unwrap();
assert!(result.is_ok());
}
#[tokio::test]
async fn test_max_sessions_limit() {
let (_, mut actor, _) = create_actor();
for i in 0..MAX_SESSIONS {
let session = create_test_session(&format!("test-{i}"));
let (tx, _) = oneshot::channel();
actor.handle_command(RegistryCommand::Register {
session: Box::new(session),
respond_to: tx,
});
}
assert_eq!(actor.session_count(), MAX_SESSIONS);
let session = create_test_session("one-too-many");
let (tx, rx) = oneshot::channel();
actor.handle_command(RegistryCommand::Register {
session: Box::new(session),
respond_to: tx,
});
let result = rx.await.unwrap();
assert!(matches!(
result,
Err(RegistryError::RegistryFull { max: MAX_SESSIONS })
));
assert_eq!(actor.session_count(), MAX_SESSIONS);
}
#[tokio::test]
async fn test_update_from_status_line_existing_session() {
let (_, mut actor, _) = create_actor();
let session = create_test_session("test-123");
let (tx, _) = oneshot::channel();
actor.handle_command(RegistryCommand::Register {
session: Box::new(session),
respond_to: tx,
});
let status_json = serde_json::json!({
"session_id": "test-123",
"model": {"id": "claude-sonnet-4-20250514"},
"cost": {"total_cost_usd": 0.25, "total_duration_ms": 15000},
"context_window": {"total_input_tokens": 5000, "context_window_size": 200000}
});
let (tx, rx) = oneshot::channel();
actor.handle_command(RegistryCommand::UpdateFromStatusLine {
session_id: SessionId::new("test-123"),
data: status_json,
respond_to: tx,
});
let result = rx.await.unwrap();
assert!(result.is_ok());
let (tx, rx) = oneshot::channel();
actor.handle_command(RegistryCommand::GetSession {
session_id: SessionId::new("test-123"),
respond_to: tx,
});
let view = rx.await.unwrap().unwrap();
assert!(view.cost_display.contains("0.25") || view.cost_usd > 0.24);
}
#[tokio::test]
async fn test_update_from_status_line_auto_register() {
let (_, mut actor, mut event_rx) = create_actor();
let current_pid = std::process::id();
let status_json = serde_json::json!({
"session_id": "new-session",
"pid": current_pid,
"model": {"id": "claude-sonnet-4-20250514"},
"cost": {"total_cost_usd": 0.10, "total_duration_ms": 5000},
"context_window": {"total_input_tokens": 1000, "context_window_size": 200000}
});
let (tx, rx) = oneshot::channel();
actor.handle_command(RegistryCommand::UpdateFromStatusLine {
session_id: SessionId::new("new-session"),
data: status_json,
respond_to: tx,
});
let result = rx.await.unwrap();
assert!(result.is_ok());
assert_eq!(actor.session_count(), 1);
let event = event_rx.try_recv().unwrap();
assert!(matches!(event, SessionEvent::Registered { .. }));
}
#[tokio::test]
async fn test_cleanup_stale_no_stale_sessions() {
let (_, mut actor, _) = create_actor();
let session = create_test_session("test-123");
let (tx, _) = oneshot::channel();
actor.handle_command(RegistryCommand::Register {
session: Box::new(session),
respond_to: tx,
});
actor.handle_command(RegistryCommand::CleanupStale);
assert_eq!(actor.session_count(), 1);
}
#[tokio::test]
async fn test_pending_session_upgrade_on_status_line() {
let (_, mut actor, mut event_rx) = create_actor();
let current_pid = std::process::id();
let pending_id = SessionId::pending_from_pid(current_pid);
let (tx, rx) = oneshot::channel();
let cmd = RegistryCommand::RegisterDiscovered {
session_id: pending_id.clone(),
pid: current_pid,
cwd: std::path::PathBuf::from("/home/user/project"),
tmux_pane: None,
respond_to: tx,
};
actor.handle_command(cmd);
let result = rx.await.unwrap();
assert!(result.is_ok());
assert_eq!(actor.session_count(), 1);
let _ = event_rx.try_recv();
let _ = event_rx.try_recv();
let status_json = serde_json::json!({
"session_id": "real-session-uuid",
"pid": current_pid,
"model": {"id": "claude-sonnet-4-20250514"},
"cost": {"total_cost_usd": 0.10, "total_duration_ms": 5000},
"context_window": {"total_input_tokens": 1000, "context_window_size": 200000}
});
let (tx, rx) = oneshot::channel();
let cmd = RegistryCommand::UpdateFromStatusLine {
session_id: SessionId::new("real-session-uuid"),
data: status_json,
respond_to: tx,
};
actor.handle_command(cmd);
let result = rx.await.unwrap();
assert!(result.is_ok());
assert_eq!(actor.session_count(), 1);
let (tx, rx) = oneshot::channel();
let cmd = RegistryCommand::GetSession {
session_id: SessionId::new("real-session-uuid"),
respond_to: tx,
};
actor.handle_command(cmd);
let session = rx.await.unwrap();
assert!(session.is_some());
assert_eq!(session.unwrap().id.as_str(), "real-session-uuid");
let (tx, rx) = oneshot::channel();
let cmd = RegistryCommand::GetSession {
session_id: pending_id,
respond_to: tx,
};
actor.handle_command(cmd);
let pending_session = rx.await.unwrap();
assert!(pending_session.is_none());
let mut found_removed = false;
let mut found_registered = false;
while let Ok(event) = event_rx.try_recv() {
match event {
SessionEvent::Removed {
reason: RemovalReason::Upgraded,
..
} => {
found_removed = true;
}
SessionEvent::Registered { session_id, .. }
if session_id.as_str() == "real-session-uuid" =>
{
found_registered = true;
}
_ => {}
}
}
assert!(
found_removed,
"Should have received Removed event with Upgraded reason"
);
assert!(
found_registered,
"Should have received Registered event for real session"
);
}
#[tokio::test]
async fn test_subagent_start_records_pending() {
let (_, mut actor, _) = create_actor();
let session = create_test_session("parent-session");
let (tx, _) = oneshot::channel();
actor.handle_command(RegistryCommand::Register {
session: Box::new(session),
respond_to: tx,
});
assert_eq!(actor.pending_subagent_count(), 0);
let (tx, rx) = oneshot::channel();
actor.handle_command(RegistryCommand::ApplyHookEvent {
session_id: SessionId::new("parent-session"),
event_type: HookEventType::SubagentStart,
tool_name: None,
notification_type: None,
pid: None,
tmux_pane: None,
agent_id: Some("agent-abc-123".to_string()),
agent_type: Some("explore".to_string()),
prompt: None,
respond_to: tx,
});
let result = rx.await.unwrap();
assert!(result.is_ok());
assert_eq!(actor.pending_subagent_count(), 1);
}
#[tokio::test]
async fn test_subagent_stop_clears_pending() {
let (_, mut actor, _) = create_actor();
let session = create_test_session("parent-session");
let (tx, _) = oneshot::channel();
actor.handle_command(RegistryCommand::Register {
session: Box::new(session),
respond_to: tx,
});
let (tx, _) = oneshot::channel();
actor.handle_command(RegistryCommand::ApplyHookEvent {
session_id: SessionId::new("parent-session"),
event_type: HookEventType::SubagentStart,
tool_name: None,
notification_type: None,
pid: None,
tmux_pane: None,
agent_id: Some("agent-xyz-456".to_string()),
agent_type: Some("plan".to_string()),
prompt: None,
respond_to: tx,
});
assert_eq!(actor.pending_subagent_count(), 1);
let (tx, rx) = oneshot::channel();
actor.handle_command(RegistryCommand::ApplyHookEvent {
session_id: SessionId::new("parent-session"),
event_type: HookEventType::SubagentStop,
tool_name: None,
notification_type: None,
pid: None,
tmux_pane: None,
agent_id: Some("agent-xyz-456".to_string()),
agent_type: None,
prompt: None,
respond_to: tx,
});
let result = rx.await.unwrap();
assert!(result.is_ok());
assert_eq!(actor.pending_subagent_count(), 0);
}
#[tokio::test]
async fn test_pending_subagent_ttl_cleanup() {
let (_, mut actor, _) = create_actor();
let session = create_test_session("parent-session");
let (tx, _) = oneshot::channel();
actor.handle_command(RegistryCommand::Register {
session: Box::new(session),
respond_to: tx,
});
let (tx, _) = oneshot::channel();
actor.handle_command(RegistryCommand::ApplyHookEvent {
session_id: SessionId::new("parent-session"),
event_type: HookEventType::SubagentStart,
tool_name: None,
notification_type: None,
pid: None,
tmux_pane: None,
agent_id: Some("agent-expired".to_string()),
agent_type: Some("explore".to_string()),
prompt: None,
respond_to: tx,
});
assert_eq!(actor.pending_subagent_count(), 1);
if let Some((_, pending)) = actor
.pending_subagents
.iter_mut()
.find(|(id, _)| id == "agent-expired")
{
pending.created_at = Instant::now() - Duration::from_secs(31);
}
actor.handle_command(RegistryCommand::CleanupStale);
assert_eq!(actor.pending_subagent_count(), 0);
}
#[tokio::test]
async fn test_subagent_correlation_links_parent_child() {
let (_, mut actor, _) = create_actor();
let parent_pid = std::process::id();
let parent_id = SessionId::new("parent-session");
let (tx, _) = oneshot::channel();
actor.handle_command(RegistryCommand::RegisterDiscovered {
session_id: parent_id.clone(),
pid: parent_pid,
cwd: std::path::PathBuf::from("/home/user/project"),
tmux_pane: None,
respond_to: tx,
});
let (tx, _) = oneshot::channel();
actor.handle_command(RegistryCommand::ApplyHookEvent {
session_id: parent_id.clone(),
event_type: HookEventType::SubagentStart,
tool_name: None,
notification_type: None,
pid: Some(parent_pid),
tmux_pane: None,
agent_id: Some("sub-agent-001".to_string()),
agent_type: Some("explore".to_string()),
prompt: None,
respond_to: tx,
});
assert_eq!(actor.pending_subagent_count(), 1);
let child = std::process::Command::new("sleep")
.arg("60")
.spawn()
.expect("failed to spawn sleep process");
let child_pid = child.id();
let child_id = SessionId::new("child-session");
let (tx, _) = oneshot::channel();
actor.handle_command(RegistryCommand::RegisterDiscovered {
session_id: child_id.clone(),
pid: child_pid,
cwd: std::path::PathBuf::from("/home/user/project"),
tmux_pane: None,
respond_to: tx,
});
assert_eq!(
actor.pending_subagent_count(),
0,
"Pending subagent should be consumed by correlation"
);
if let Some((parent_session, _)) = actor.sessions_by_pid.get(&parent_pid) {
assert!(
parent_session.child_session_ids.contains(&child_id),
"Parent should list child in child_session_ids"
);
} else {
panic!("Parent session not found");
}
if let Some((child_session, _)) = actor.sessions_by_pid.get(&child_pid) {
assert_eq!(
child_session.parent_session_id.as_ref(),
Some(&parent_id),
"Child should reference parent_session_id"
);
} else {
panic!("Child session not found");
}
let _ = std::process::Command::new("kill")
.arg(child_pid.to_string())
.status();
}
#[tokio::test]
async fn test_no_duplicate_sessions_for_same_pid() {
let (_, mut actor, _) = create_actor();
let current_pid = std::process::id();
let discovered_id = SessionId::new("discovered-uuid");
let (tx, rx) = oneshot::channel();
let cmd = RegistryCommand::RegisterDiscovered {
session_id: discovered_id.clone(),
pid: current_pid,
cwd: std::path::PathBuf::from("/home/user/project"),
tmux_pane: None,
respond_to: tx,
};
actor.handle_command(cmd);
let result = rx.await.unwrap();
assert!(result.is_ok());
assert_eq!(actor.session_count(), 1);
let real_id = SessionId::new("real-uuid-from-status-line");
let status_json = serde_json::json!({
"session_id": "real-uuid-from-status-line",
"pid": current_pid,
"model": {"id": "claude-sonnet-4-20250514"},
"cost": {"total_cost_usd": 0.10, "total_duration_ms": 5000},
"context_window": {"total_input_tokens": 1000, "context_window_size": 200000}
});
let (tx, rx) = oneshot::channel();
let cmd = RegistryCommand::UpdateFromStatusLine {
session_id: real_id.clone(),
data: status_json,
respond_to: tx,
};
actor.handle_command(cmd);
let result = rx.await.unwrap();
assert!(result.is_ok());
assert_eq!(
actor.session_count(),
1,
"Should have 1 session, not duplicates"
);
let (tx, rx) = oneshot::channel();
let cmd = RegistryCommand::GetSession {
session_id: real_id.clone(),
respond_to: tx,
};
actor.handle_command(cmd);
let session = rx.await.unwrap();
assert!(session.is_some(), "Session should exist with real ID");
assert_eq!(session.unwrap().id.as_str(), "real-uuid-from-status-line");
let (tx, rx) = oneshot::channel();
let cmd = RegistryCommand::GetSession {
session_id: discovered_id,
respond_to: tx,
};
actor.handle_command(cmd);
let old_session = rx.await.unwrap();
assert!(
old_session.is_none(),
"Old session ID should not exist anymore"
);
}
#[tokio::test]
async fn test_refresh_git_info_detects_branch_change() {
let (_cmd_tx, mut actor, mut event_rx) = create_actor();
let dir = tempfile::tempdir().unwrap();
let repo = dir.path().join("refresh-repo");
std::fs::create_dir_all(repo.join(".git")).unwrap();
std::fs::write(repo.join(".git/HEAD"), "ref: refs/heads/main\n").unwrap();
let current_pid = std::process::id();
let (tx, rx) = oneshot::channel();
actor.handle_command(RegistryCommand::RegisterDiscovered {
session_id: SessionId::new("refresh-test"),
pid: current_pid,
cwd: repo.clone(),
tmux_pane: None,
respond_to: tx,
});
rx.await.unwrap().unwrap();
while event_rx.try_recv().is_ok() {}
let (tx, rx) = oneshot::channel();
actor.handle_command(RegistryCommand::GetSession {
session_id: SessionId::new("refresh-test"),
respond_to: tx,
});
let view = rx.await.unwrap().unwrap();
assert_eq!(view.worktree_branch.as_deref(), Some("main"));
std::fs::write(repo.join(".git/HEAD"), "ref: refs/heads/develop\n").unwrap();
actor.handle_command(RegistryCommand::RefreshGitInfo);
let (tx, rx) = oneshot::channel();
actor.handle_command(RegistryCommand::GetSession {
session_id: SessionId::new("refresh-test"),
respond_to: tx,
});
let view = rx.await.unwrap().unwrap();
assert_eq!(
view.worktree_branch.as_deref(),
Some("develop"),
"branch should be updated after RefreshGitInfo"
);
let event = event_rx.try_recv();
assert!(
matches!(event, Ok(SessionEvent::Updated { .. })),
"should publish Updated event on branch change"
);
}
#[tokio::test]
async fn test_refresh_git_info_no_change_no_event() {
let (_cmd_tx, mut actor, mut event_rx) = create_actor();
let dir = tempfile::tempdir().unwrap();
let repo = dir.path().join("no-change-repo");
std::fs::create_dir_all(repo.join(".git")).unwrap();
std::fs::write(repo.join(".git/HEAD"), "ref: refs/heads/main\n").unwrap();
let current_pid = std::process::id();
let (tx, rx) = oneshot::channel();
actor.handle_command(RegistryCommand::RegisterDiscovered {
session_id: SessionId::new("no-change-test"),
pid: current_pid,
cwd: repo.clone(),
tmux_pane: None,
respond_to: tx,
});
rx.await.unwrap().unwrap();
while event_rx.try_recv().is_ok() {}
actor.handle_command(RegistryCommand::RefreshGitInfo);
let event = event_rx.try_recv();
assert!(
event.is_err(),
"should NOT publish event when nothing changed"
);
}
#[tokio::test]
async fn test_rediscovery_preserves_domain_metadata() {
let (_cmd_tx, mut actor, _event_rx) = create_actor();
let dir = tempfile::tempdir().unwrap();
let repo = dir.path().join("preserve-repo");
std::fs::create_dir_all(repo.join(".git")).unwrap();
std::fs::write(repo.join(".git/HEAD"), "ref: refs/heads/main\n").unwrap();
let current_pid = std::process::id();
let (tx, rx) = oneshot::channel();
actor.handle_command(RegistryCommand::RegisterDiscovered {
session_id: SessionId::new("pending-1"),
pid: current_pid,
cwd: repo.clone(),
tmux_pane: None,
respond_to: tx,
});
rx.await.unwrap().unwrap();
let status_json = serde_json::json!({
"session_id": "real-id",
"pid": current_pid,
"cwd": repo.to_str().unwrap(),
"model": {"id": "claude-sonnet-4-20250514"},
"cost": {"total_cost_usd": 2.50, "total_duration_ms": 120000},
"context_window": {
"total_input_tokens": 80000,
"total_output_tokens": 20000,
"context_window_size": 200000
}
});
let (tx, rx) = oneshot::channel();
actor.handle_command(RegistryCommand::UpdateFromStatusLine {
session_id: SessionId::new("real-id"),
data: status_json,
respond_to: tx,
});
rx.await.unwrap().unwrap();
let (tx, rx) = oneshot::channel();
actor.handle_command(RegistryCommand::GetSession {
session_id: SessionId::new("real-id"),
respond_to: tx,
});
let view = rx.await.unwrap().unwrap();
assert!(view.cost_usd > 2.0, "cost should be ~2.50");
let (tx, rx) = oneshot::channel();
actor.handle_command(RegistryCommand::RegisterDiscovered {
session_id: SessionId::new("pending-rescan"),
pid: current_pid,
cwd: repo.clone(),
tmux_pane: None,
respond_to: tx,
});
rx.await.unwrap().unwrap();
let (tx, rx) = oneshot::channel();
actor.handle_command(RegistryCommand::GetSession {
session_id: SessionId::new("pending-rescan"),
respond_to: tx,
});
let view = rx.await.unwrap().unwrap();
assert!(
view.cost_usd > 2.0,
"cost should be preserved after rescan, got {}",
view.cost_usd
);
let (tx, rx) = oneshot::channel();
actor.handle_command(RegistryCommand::GetSession {
session_id: SessionId::new("real-id"),
respond_to: tx,
});
let old = rx.await.unwrap();
assert!(old.is_none(), "old session_id should be removed from index");
}
#[tokio::test]
async fn test_update_from_status_line_cwd_change_re_resolves_git() {
let (_cmd_tx, mut actor, _event_rx) = create_actor();
let dir = tempfile::tempdir().unwrap();
let repo_a = dir.path().join("repo-a");
let repo_b = dir.path().join("repo-b");
std::fs::create_dir_all(repo_a.join(".git")).unwrap();
std::fs::create_dir_all(repo_b.join(".git")).unwrap();
std::fs::write(repo_a.join(".git/HEAD"), "ref: refs/heads/main\n").unwrap();
std::fs::write(repo_b.join(".git/HEAD"), "ref: refs/heads/feature\n").unwrap();
let current_pid = std::process::id();
let (tx, rx) = oneshot::channel();
actor.handle_command(RegistryCommand::RegisterDiscovered {
session_id: SessionId::new("cwd-test"),
pid: current_pid,
cwd: repo_a.clone(),
tmux_pane: None,
respond_to: tx,
});
rx.await.unwrap().unwrap();
let (tx, rx) = oneshot::channel();
actor.handle_command(RegistryCommand::GetSession {
session_id: SessionId::new("cwd-test"),
respond_to: tx,
});
let view = rx.await.unwrap().unwrap();
assert_eq!(view.worktree_branch.as_deref(), Some("main"));
let status_json = serde_json::json!({
"session_id": "cwd-test",
"pid": current_pid,
"cwd": repo_b.to_str().unwrap(),
"model": {"id": "claude-sonnet-4-20250514"},
"cost": {"total_cost_usd": 0.50, "total_duration_ms": 5000},
"context_window": {"total_input_tokens": 1000, "context_window_size": 200000}
});
let (tx, rx) = oneshot::channel();
actor.handle_command(RegistryCommand::UpdateFromStatusLine {
session_id: SessionId::new("cwd-test"),
data: status_json,
respond_to: tx,
});
rx.await.unwrap().unwrap();
let (tx, rx) = oneshot::channel();
actor.handle_command(RegistryCommand::GetSession {
session_id: SessionId::new("cwd-test"),
respond_to: tx,
});
let view = rx.await.unwrap().unwrap();
assert_eq!(
view.worktree_branch.as_deref(),
Some("feature"),
"branch should be re-resolved after cwd change"
);
assert_eq!(
view.project_root.as_deref(),
Some(repo_b.to_str().unwrap()),
"project_root should point to repo_b"
);
}
}