use std::collections::HashMap;
use std::fmt;
use std::io::{Read, Write};
use std::net::TcpStream;
use std::time::{Duration, Instant};
use serde::Deserialize;
use crate::error::PawError;
/// Permission mode a pane appears to be running in.
///
/// Values are produced by [`detect_mode`] from a pane's title and captured
/// text; [`Mode::Unknown`] is the fallback when no marker is recognised.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Mode {
    /// An accept-edits / bypass-permissions marker was seen.
    AcceptEdits,
    /// An interactive prompt marker was seen.
    Interactive,
    /// No recognised marker was found.
    Unknown,
}

impl Mode {
    /// Returns the fixed kebab-case label for this mode.
    #[must_use]
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Unknown => "unknown",
            Self::Interactive => "interactive",
            Self::AcceptEdits => "accept-edits",
        }
    }
}

impl fmt::Display for Mode {
    /// Writes the same label as [`Mode::as_str`], without applying any
    /// width or padding options from the formatter.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = Self::as_str(*self);
        formatter.write_str(label)
    }
}
/// One agent as presented in the inventory: broker-reported fields joined
/// with tmux pane data.
///
/// Built by `join_inventory` from a `StatusAgent` plus pane-path and mode
/// lookups.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AgentEntry {
/// Agent identifier; `join_inventory` fills it from the broker's `agent_id`.
pub branch_id: String,
/// Status string exactly as the broker reported it.
pub status: String,
/// Broker-reported `last_seen_seconds` value, copied through unchanged.
// NOTE(review): presumably seconds since the agent last reported in — confirm against the broker.
pub last_seen_seconds: u64,
/// CLI name reported by the broker; `None` when the broker sent a blank value.
pub cli: Option<String>,
/// Mode recorded for the agent's pane; `Mode::Unknown` when no pane matched
/// or no mode was recorded for the matched pane.
pub mode: Mode,
/// tmux pane index chosen by `match_pane`, or `None` when no pane path matched.
pub pane_index: Option<usize>,
}
/// A point-in-time list of agents together with the instant it was built.
#[derive(Debug, Clone)]
pub struct AgentInventory {
    /// Agents known at refresh time, in the order they were joined.
    pub entries: Vec<AgentEntry>,
    /// When this snapshot was assembled; used for cache freshness checks.
    pub refreshed_at: Instant,
}

impl AgentInventory {
    /// Looks up an entry by id.
    ///
    /// Both the requested id and each entry's `branch_id` are passed through
    /// `normalize_id` before comparison, so surrounding whitespace is ignored
    /// and `/` is treated the same as `-`. The first matching entry wins.
    #[must_use]
    pub fn find(&self, target_id: &str) -> Option<&AgentEntry> {
        let wanted = normalize_id(target_id);
        let is_wanted = |entry: &&AgentEntry| normalize_id(&entry.branch_id) == wanted;
        self.entries.iter().find(is_wanted)
    }

    /// Returns the ids of every entry except the one named `supervisor`,
    /// sorted in ascending order.
    #[must_use]
    pub fn candidate_ids(&self) -> Vec<String> {
        let mut ids: Vec<String> = self
            .entries
            .iter()
            .filter_map(|entry| (entry.branch_id != "supervisor").then(|| entry.branch_id.clone()))
            .collect();
        ids.sort();
        ids
    }
}
/// Canonicalises an agent id for comparison: trims surrounding whitespace
/// and turns every `/` into `-`.
fn normalize_id(id: &str) -> String {
    id.trim()
        .chars()
        .map(|ch| if ch == '/' { '-' } else { ch })
        .collect()
}
/// One element of the `agents` array in the broker's `/status` JSON.
///
/// Only `agent_id` is required; every other field carries `#[serde(default)]`
/// and falls back to its type's default when missing from the payload.
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
pub struct StatusAgent {
/// Identifier the broker uses for the agent.
pub agent_id: String,
/// Status string as reported; empty when the field is absent.
#[serde(default)]
pub status: String,
/// Reported `last_seen_seconds`; `0` when the field is absent.
// NOTE(review): presumably seconds since the agent was last seen — confirm against the broker.
#[serde(default)]
pub last_seen_seconds: u64,
/// Reported CLI name; empty when the field is absent.
#[serde(default)]
pub cli: String,
}
/// Top-level shape of the broker's `/status` response as far as this module
/// reads it: only the `agents` array is deserialized.
#[derive(Debug, Deserialize)]
struct StatusBody {
/// Agent rows; an absent `agents` key deserializes to an empty list.
#[serde(default)]
agents: Vec<StatusAgent>,
}
/// Parses the JSON body of a broker `/status` response into its agent rows.
///
/// # Errors
///
/// Returns `PawError::SessionError` carrying the `serde_json` message when
/// `json` cannot be deserialized.
pub fn parse_status_agents(json: &str) -> Result<Vec<StatusAgent>, PawError> {
    match serde_json::from_str::<StatusBody>(json) {
        Ok(parsed) => Ok(parsed.agents),
        Err(e) => Err(PawError::SessionError(format!(
            "broker /status parse error: {e}"
        ))),
    }
}
/// Parses `"<index> <path>"` lines into `(index, path)` pairs.
///
/// Each line has trailing whitespace removed and is split at its first
/// space, so paths containing spaces stay intact. Lines without a space, or
/// whose first field is not a `usize`, are skipped.
#[must_use]
pub fn parse_pane_paths(output: &str) -> Vec<(usize, String)> {
    let mut panes = Vec::new();
    for raw in output.lines() {
        let Some((index_text, path)) = raw.trim_end().split_once(' ') else {
            continue;
        };
        if let Ok(index) = index_text.trim().parse::<usize>() {
            panes.push((index, path.to_string()));
        }
    }
    panes
}
/// Finds the pane whose working directory belongs to `agent_id`.
///
/// * `supervisor` always maps to pane `0`, regardless of `pane_paths`.
/// * A blank `agent_id` never matches any pane.
/// * Otherwise the last path component of each pane path (trailing `/`
///   ignored) is compared with the id: it matches when it equals the id or
///   ends with `-<agent_id>`. The first matching pane wins.
///
/// Returns `None` when no pane matches.
#[must_use]
pub fn match_pane(agent_id: &str, pane_paths: &[(usize, String)]) -> Option<usize> {
    if agent_id == "supervisor" {
        return Some(0);
    }
    // An empty id would otherwise equal the empty basename of a path such as
    // `/`, and its suffix `-` would match any directory name ending in a dash.
    if agent_id.trim().is_empty() {
        return None;
    }
    let suffix = format!("-{agent_id}");
    pane_paths.iter().find_map(|(idx, path)| {
        let base = path
            .trim_end_matches('/')
            .rsplit('/')
            .next()
            .unwrap_or(path);
        // Requiring the leading dash keeps `feat-a` from matching `proj-feat-api`.
        (base == agent_id || base.ends_with(&suffix)).then_some(*idx)
    })
}
#[must_use]
pub fn join_inventory<S: std::hash::BuildHasher>(
agents: Vec<StatusAgent>,
pane_paths: &[(usize, String)],
modes: &HashMap<usize, Mode, S>,
) -> Vec<AgentEntry> {
agents
.into_iter()
.map(|a| {
let pane_index = match_pane(&a.agent_id, pane_paths);
let mode = pane_index
.and_then(|idx| modes.get(&idx).copied())
.unwrap_or(Mode::Unknown);
let cli = if a.cli.trim().is_empty() {
None
} else {
Some(a.cli)
};
AgentEntry {
branch_id: a.agent_id,
status: a.status,
last_seen_seconds: a.last_seen_seconds,
cli,
mode,
pane_index,
}
})
.collect()
}
/// Classifies a pane from its title and captured text.
///
/// Title and capture are joined with a newline and lower-cased, then searched
/// for known marker substrings. Accept-edits markers are checked first, so
/// they win when both kinds of marker are present. With no marker at all the
/// result is `Mode::Unknown`.
#[must_use]
pub fn detect_mode(pane_title: &str, capture: &str) -> Mode {
    const ACCEPT_EDITS_MARKERS: [&str; 3] =
        ["accept edits", "accept-edits", "bypass permissions"];
    const INTERACTIVE_MARKERS: [&str; 6] = [
        "? for shortcuts",
        "do you want to proceed",
        "do you want to allow",
        "(y/n)",
        "[y/n]",
        "❯ 1. yes",
    ];

    let haystack = format!("{pane_title}\n{capture}").to_lowercase();
    let mentions_any =
        |markers: &[&str]| markers.iter().any(|marker| haystack.contains(*marker));

    if mentions_any(&ACCEPT_EDITS_MARKERS) {
        return Mode::AcceptEdits;
    }
    if mentions_any(&INTERACTIVE_MARKERS) {
        return Mode::Interactive;
    }
    Mode::Unknown
}
/// Reasons a requested target cannot be resolved against an inventory.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ValidationError {
    /// No inventory entry matched the requested target.
    UnknownTarget {
        /// The target that was asked for.
        target: String,
        /// Agent ids offered as alternatives in the error message.
        candidates: Vec<String>,
    },
}

impl fmt::Display for ValidationError {
    /// Renders the error as a single line naming the unknown target and
    /// listing the candidates separated by `", "`.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Irrefutable while the enum has a single variant; adding a variant
        // turns this into a compile error that forces the message to be revisited.
        let Self::UnknownTarget { target, candidates } = self;
        write!(
            formatter,
            "unknown target `{target}`; available agents: {}",
            candidates.join(", ")
        )
    }
}

impl std::error::Error for ValidationError {}
/// Resolves `target_id` to an entry of `inventory`.
///
/// # Errors
///
/// Returns `ValidationError::UnknownTarget` when `AgentInventory::find`
/// yields nothing. The error carries the trimmed target and the inventory's
/// `candidate_ids()`.
pub fn validate_target<'a>(
    inventory: &'a AgentInventory,
    target_id: &str,
) -> Result<&'a AgentEntry, ValidationError> {
    match inventory.find(target_id) {
        Some(entry) => Ok(entry),
        None => Err(ValidationError::UnknownTarget {
            target: target_id.trim().to_string(),
            candidates: inventory.candidate_ids(),
        }),
    }
}
/// Keeps the most recently stored `AgentInventory` and decides whether it is
/// still recent enough to reuse.
#[derive(Debug)]
pub struct InventoryCache {
    /// Last stored snapshot; `None` until something is stored or refreshed.
    snapshot: Option<AgentInventory>,
    /// A snapshot counts as fresh while its age is strictly below this value.
    max_age: Duration,
}

impl InventoryCache {
    /// Creates an empty cache with the given freshness window.
    #[must_use]
    pub fn new(max_age: Duration) -> Self {
        let snapshot = None;
        Self { snapshot, max_age }
    }

    /// Creates an empty cache whose freshness window is `seconds` seconds.
    #[must_use]
    pub fn from_seconds(seconds: u64) -> Self {
        let window = Duration::from_secs(seconds);
        Self::new(window)
    }

    /// Returns the configured freshness window.
    #[must_use]
    pub fn max_age(&self) -> Duration {
        self.max_age
    }

    /// Returns the held snapshot, if any, without checking its age.
    #[must_use]
    pub fn snapshot(&self) -> Option<&AgentInventory> {
        self.snapshot.as_ref()
    }

    /// Reports whether a snapshot is held and was refreshed less than
    /// `max_age` before `now`. An empty cache is never fresh.
    #[must_use]
    pub fn is_fresh_at(&self, now: Instant) -> bool {
        match &self.snapshot {
            Some(held) => now.duration_since(held.refreshed_at) < self.max_age,
            None => false,
        }
    }

    /// Replaces whatever snapshot is currently held.
    pub fn store(&mut self, snapshot: AgentInventory) {
        self.snapshot = Some(snapshot);
    }

    /// Returns the held snapshot, calling `refresh` first when the cache is
    /// empty or stale at `now`.
    ///
    /// # Errors
    ///
    /// Propagates the error from `refresh`; the previously held snapshot is
    /// left untouched in that case.
    pub fn get_or_refresh<F, E>(&mut self, now: Instant, refresh: F) -> Result<&AgentInventory, E>
    where
        F: FnOnce() -> Result<AgentInventory, E>,
    {
        let must_rebuild = !self.is_fresh_at(now);
        if must_rebuild {
            let rebuilt = refresh()?;
            self.store(rebuilt);
        }
        // Either the cache was fresh (so a snapshot exists) or one was just stored.
        let held = self
            .snapshot
            .as_ref()
            .expect("snapshot present after refresh");
        Ok(held)
    }
}
pub fn build_inventory(broker_url: &str, tmux_session: &str) -> Result<AgentInventory, PawError> {
let body = fetch_status_body(broker_url)?;
let agents = parse_status_agents(&body)?;
let pane_output = list_pane_paths(tmux_session).unwrap_or_default();
let pane_paths = parse_pane_paths(&pane_output);
let mut modes = HashMap::new();
for (idx, _) in &pane_paths {
modes.insert(*idx, detect_pane_mode(tmux_session, *idx));
}
let entries = join_inventory(agents, &pane_paths, &modes);
Ok(AgentInventory {
entries,
refreshed_at: Instant::now(),
})
}
/// Fetches the broker's `/status` response and parses its agent rows.
///
/// # Errors
///
/// Returns the `PawError` from either the HTTP fetch or the JSON parse.
pub fn fetch_status_agents_over_http(broker_url: &str) -> Result<Vec<StatusAgent>, PawError> {
    let status_json = fetch_status_body(broker_url)?;
    parse_status_agents(&status_json)
}
/// Performs a minimal blocking `GET /status` against the broker and returns
/// the response body as text.
///
/// `broker_url` may be `host:port` or `http://host:port`, optionally followed
/// by a path such as a trailing `/`; only the `host:port` part is used. Host
/// names are resolved and every resolved address is tried in order until one
/// accepts the connection (500 ms connect timeout each). Reads and writes
/// time out after 2 s.
///
/// # Errors
///
/// Returns `PawError::SessionError` when the address cannot be parsed or
/// resolved, when no address accepts a connection, when the request cannot
/// be written, when the status line is not `200`, or when the header
/// terminator is missing from the response.
fn fetch_status_body(broker_url: &str) -> Result<String, PawError> {
    use std::net::{SocketAddr, ToSocketAddrs};

    let rest = broker_url.strip_prefix("http://").unwrap_or(broker_url);
    // Drop any path so `http://host:port/` works. A URL with some other
    // scheme is left intact so the error below shows it as given.
    let addr = if rest.contains("://") {
        rest
    } else {
        rest.split('/').next().unwrap_or(rest)
    };

    let candidates: Vec<SocketAddr> = if let Ok(literal) = addr.parse::<SocketAddr>() {
        vec![literal]
    } else {
        addr.to_socket_addrs()
            .map_err(|e| PawError::SessionError(format!("invalid broker address {addr}: {e}")))?
            .collect()
    };

    // A name can resolve to several addresses (e.g. IPv6 before IPv4); keep
    // trying until one connects and remember the most recent failure.
    let mut connected = None;
    let mut last_error = None;
    for candidate in &candidates {
        match TcpStream::connect_timeout(candidate, Duration::from_millis(500)) {
            Ok(stream) => {
                connected = Some(stream);
                break;
            }
            Err(e) => last_error = Some(e),
        }
    }
    let Some(mut stream) = connected else {
        return Err(PawError::SessionError(match last_error {
            Some(e) => format!("failed to connect to broker: {e}"),
            // Nothing was attempted: resolution produced an empty list.
            None => format!("broker address {addr} resolved to no addrs"),
        }));
    };

    // Timeouts are best-effort; failing to set them does not abort the request.
    stream.set_read_timeout(Some(Duration::from_secs(2))).ok();
    stream.set_write_timeout(Some(Duration::from_secs(2))).ok();

    let request = format!("GET /status HTTP/1.1\r\nHost: {addr}\r\nConnection: close\r\n\r\n");
    stream
        .write_all(request.as_bytes())
        .map_err(|e| PawError::SessionError(format!("failed to write status request: {e}")))?;

    // The read result is deliberately ignored: whatever text arrived is
    // validated by the status-line and header checks below.
    let mut response = String::new();
    let _ = stream.read_to_string(&mut response);

    if !(response.starts_with("HTTP/1.1 200") || response.starts_with("HTTP/1.0 200")) {
        return Err(PawError::SessionError(format!(
            "broker /status returned non-200: {}",
            response.lines().next().unwrap_or("<empty>")
        )));
    }

    // The body starts right after the blank line that ends the headers.
    let body_start = response
        .find("\r\n\r\n")
        .map(|i| i + 4)
        .ok_or_else(|| PawError::SessionError("malformed broker /status response".to_string()))?;
    Ok(response[body_start..].to_string())
}
/// Runs `tmux list-panes` on window `0` of `session` and returns its stdout,
/// one `"<pane_index> <pane_current_path>"` line per pane.
///
/// # Errors
///
/// Returns `PawError::SessionError` when tmux cannot be spawned or exits
/// with a non-success status.
fn list_pane_paths(session: &str) -> Result<String, PawError> {
    let target = format!("{session}:0");
    let mut command = std::process::Command::new("tmux");
    command
        .arg("list-panes")
        .arg("-t")
        .arg(&target)
        .arg("-F")
        .arg("#{pane_index} #{pane_current_path}");

    let output = match command.output() {
        Ok(output) => output,
        Err(e) => {
            return Err(PawError::SessionError(format!(
                "tmux list-panes failed: {e}"
            )));
        }
    };

    if output.status.success() {
        Ok(String::from_utf8_lossy(&output.stdout).into_owned())
    } else {
        Err(PawError::SessionError(format!(
            "tmux list-panes exited with {}",
            output.status
        )))
    }
}
/// Probes one pane of window `0` in `session` and classifies it with
/// `detect_mode`.
///
/// Both inputs are best-effort: if tmux cannot be run the title is treated
/// as empty, and if the pane capture fails the capture is replaced by its
/// default value.
fn detect_pane_mode(session: &str, pane_index: usize) -> Mode {
    let target = format!("{session}:0.{pane_index}");
    let title_query = std::process::Command::new("tmux")
        .args([
            "display-message",
            "-t",
            target.as_str(),
            "-p",
            "#{pane_title}",
        ])
        .output();
    let title = match title_query {
        Ok(output) => String::from_utf8_lossy(&output.stdout).into_owned(),
        Err(_) => String::new(),
    };

    let capture =
        crate::supervisor::permission_prompt::capture_pane(session, pane_index).unwrap_or_default();

    detect_mode(&title, &capture)
}
// Unit tests for this module: status parsing, pane-path parsing and matching,
// mode detection, target validation, cache freshness, and inventory building
// against a throwaway local HTTP server.
#[cfg(test)]
mod tests {
use super::*;
use std::cell::Cell;
// Sample `/status` payload: two feature agents (one with a blank `cli`) plus
// the supervisor. It also carries keys the parser does not read.
const STATUS_JSON: &str = r#"{
"git_paw": true,
"version": "0.6.0",
"uptime_seconds": 42,
"agents": [
{"agent_id": "feat-auth", "cli": "claude", "status": "working", "last_seen_seconds": 3, "summary": ""},
{"agent_id": "feat-api", "cli": "", "status": "blocked", "last_seen_seconds": 90, "summary": ""},
{"agent_id": "supervisor", "cli": "claude", "status": "working", "last_seen_seconds": 1, "summary": ""}
]
}"#;
// Inventory built from STATUS_JSON and three panes; only pane 2 has a
// recorded mode (AcceptEdits).
fn fixture_inventory() -> AgentInventory {
let agents = parse_status_agents(STATUS_JSON).unwrap();
let panes = parse_pane_paths(
"0 /home/user/myproj\n1 /home/user/myproj-feat-api\n2 /home/user/myproj-feat-auth\n",
);
let mut modes = HashMap::new();
modes.insert(2usize, Mode::AcceptEdits);
let entries = join_inventory(agents, &panes, &modes);
AgentInventory {
entries,
refreshed_at: Instant::now(),
}
}
// Every row of the `agents` array is parsed, in payload order.
#[test]
fn parse_status_agents_reads_all_rows() {
let agents = parse_status_agents(STATUS_JSON).unwrap();
assert_eq!(agents.len(), 3);
assert_eq!(agents[0].agent_id, "feat-auth");
assert_eq!(agents[0].cli, "claude");
assert_eq!(agents[1].last_seen_seconds, 90);
}
// Paths may contain spaces; lines whose first field is not a number are dropped.
#[test]
fn parse_pane_paths_handles_spaces_and_skips_garbage() {
let panes =
parse_pane_paths("0 /home/user/my proj\n1 /home/user/wt-feat-x\nnot-a-pane line\n");
assert_eq!(panes.len(), 2);
assert_eq!(panes[0], (0, "/home/user/my proj".to_string()));
assert_eq!(panes[1], (1, "/home/user/wt-feat-x".to_string()));
}
// Pane indices come from path matching, not from the agents' order in the payload.
#[test]
fn pane_index_is_path_resolved_not_ordered() {
let inv = fixture_inventory();
let api = inv.find("feat-api").unwrap();
let auth = inv.find("feat-auth").unwrap();
assert_eq!(api.pane_index, Some(1));
assert_eq!(auth.pane_index, Some(2));
}
// An id that is only a prefix of another agent's id must not match that agent's pane.
#[test]
fn match_pane_does_not_partial_match_prefix() {
let panes = parse_pane_paths("1 /home/user/proj-feat-api\n");
assert_eq!(match_pane("feat-a", &panes), None);
assert_eq!(match_pane("feat-api", &panes), Some(1));
}
// The supervisor is pinned to pane 0.
#[test]
fn supervisor_resolves_to_pane_zero() {
let inv = fixture_inventory();
let sup = inv.find("supervisor").unwrap();
assert_eq!(sup.pane_index, Some(0));
}
// A blank `cli` from the broker is surfaced as `None`.
#[test]
fn empty_cli_maps_to_none() {
let inv = fixture_inventory();
assert_eq!(
inv.find("feat-auth").unwrap().cli.as_deref(),
Some("claude")
);
assert_eq!(inv.find("feat-api").unwrap().cli, None);
}
// When an agent's pane is missing from the listing, it gets no pane index
// while the other agents keep theirs.
#[test]
fn agent_removed_mid_grid_drops_pane_index() {
let agents = parse_status_agents(STATUS_JSON).unwrap();
let panes = parse_pane_paths("0 /home/user/myproj\n2 /home/user/myproj-feat-auth\n");
let entries = join_inventory(agents, &panes, &HashMap::new());
let inv = AgentInventory {
entries,
refreshed_at: Instant::now(),
};
assert_eq!(inv.find("feat-api").unwrap().pane_index, None);
assert_eq!(inv.find("feat-auth").unwrap().pane_index, Some(2));
}
// Accept-edits markers are recognised in either the capture or the title.
#[test]
fn detect_mode_accept_edits() {
assert_eq!(
detect_mode("", "⏵⏵ accept edits on (shift+tab to cycle)"),
Mode::AcceptEdits
);
assert_eq!(
detect_mode("claude — bypass permissions", ""),
Mode::AcceptEdits
);
}
// A confirmation prompt is classified as interactive; matching is case-insensitive.
#[test]
fn detect_mode_interactive_prompt() {
assert_eq!(
detect_mode("", "Do you want to proceed?\n❯ 1. Yes"),
Mode::Interactive
);
}
// Text without any known marker yields Unknown.
#[test]
fn detect_mode_unknown_when_no_signal() {
assert_eq!(
detect_mode("", "Boondoggling… (esc to interrupt)"),
Mode::Unknown
);
}
// A matched pane without a recorded mode joins to Unknown.
#[test]
fn unknown_mode_signals_join_to_unknown() {
let inv = fixture_inventory();
assert_eq!(inv.find("feat-api").unwrap().mode, Mode::Unknown);
assert_eq!(inv.find("feat-auth").unwrap().mode, Mode::AcceptEdits);
}
// Targets may be given with `-` or `/` separators.
#[test]
fn validate_target_accepts_slug_and_slash_form() {
let inv = fixture_inventory();
assert!(validate_target(&inv, "feat-auth").is_ok());
assert_eq!(
validate_target(&inv, "feat/auth").unwrap().branch_id,
"feat-auth"
);
}
// An unknown target reports the sorted candidate ids, excluding the supervisor.
#[test]
fn validate_target_unknown_returns_candidate_list() {
let inv = fixture_inventory();
let err = validate_target(&inv, "feat/ghost").unwrap_err();
match err {
ValidationError::UnknownTarget { target, candidates } => {
assert_eq!(target, "feat/ghost");
assert_eq!(
candidates,
vec!["feat-api".to_string(), "feat-auth".to_string()]
);
}
}
}
// The rendered message names the target and lists candidates comma-separated.
#[test]
fn validation_error_display_lists_candidates() {
let err = ValidationError::UnknownTarget {
target: "feat/ghost".to_string(),
candidates: vec!["feat/a".to_string(), "feat/b".to_string()],
};
let msg = err.to_string();
assert!(msg.contains("feat/ghost"));
assert!(msg.contains("feat/a, feat/b"), "got: {msg}");
}
// Empty snapshot stamped with the current instant.
fn snapshot_now() -> AgentInventory {
AgentInventory {
entries: Vec::new(),
refreshed_at: Instant::now(),
}
}
// A new cache holds nothing and is therefore not fresh.
#[test]
fn cache_starts_empty_and_not_fresh() {
let cache = InventoryCache::from_seconds(60);
assert!(cache.snapshot().is_none());
assert!(!cache.is_fresh_at(Instant::now()));
}
// Two lookups inside the freshness window invoke the refresh closure only once.
#[test]
fn rapid_lookups_within_window_refresh_once() {
let calls = Cell::new(0u32);
let mut cache = InventoryCache::from_seconds(60);
let refresh = || {
calls.set(calls.get() + 1);
Ok::<_, ()>(snapshot_now())
};
cache.get_or_refresh(Instant::now(), refresh).unwrap();
let refresh2 = || {
calls.set(calls.get() + 1);
Ok::<_, ()>(snapshot_now())
};
cache.get_or_refresh(Instant::now(), refresh2).unwrap();
assert_eq!(calls.get(), 1, "fresh cache must not re-poll the broker");
}
// A snapshot older than the window is rebuilt on the next lookup.
#[test]
fn stale_snapshot_triggers_refresh() {
let mut cache = InventoryCache::from_seconds(60);
let stale = AgentInventory {
entries: Vec::new(),
refreshed_at: Instant::now()
.checked_sub(Duration::from_mins(2))
.expect("instant in range"),
};
cache.store(stale);
assert!(!cache.is_fresh_at(Instant::now()));
let calls = Cell::new(0u32);
cache
.get_or_refresh(Instant::now(), || {
calls.set(calls.get() + 1);
Ok::<_, ()>(snapshot_now())
})
.unwrap();
assert_eq!(calls.get(), 1, "stale cache must rebuild");
assert!(cache.is_fresh_at(Instant::now()));
}
// Starts a one-shot HTTP server on an ephemeral localhost port that answers
// the first connection with `200 OK` and `body`, and returns its base URL.
fn spawn_status_server(body: &'static str) -> String {
use std::net::TcpListener;
let listener = TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port");
let addr = listener.local_addr().expect("local addr");
std::thread::spawn(move || {
if let Ok((mut stream, _)) = listener.accept() {
// Read (and discard) the request before replying.
let mut buf = [0u8; 1024];
let _ = stream.read(&mut buf);
let resp = format!(
"HTTP/1.1 200 OK\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}",
body.len()
);
let _ = stream.write_all(resp.as_bytes());
}
});
format!("http://{addr}")
}
// With a reachable broker but no usable tmux session, the inventory still
// builds: agents have no pane or mode, except the supervisor's fixed pane 0.
#[test]
fn build_inventory_against_fake_broker_no_tmux() {
let url = spawn_status_server(STATUS_JSON);
let inv = build_inventory(&url, "paw-nonexistent-xyz-123").expect("inventory builds");
assert_eq!(inv.entries.len(), 3);
assert_eq!(inv.find("feat-auth").unwrap().pane_index, None);
assert_eq!(inv.find("feat-auth").unwrap().mode, Mode::Unknown);
assert_eq!(inv.find("supervisor").unwrap().pane_index, Some(0));
}
// A broker that cannot be reached makes the whole build fail.
#[test]
fn build_inventory_unreachable_broker_errors() {
assert!(build_inventory("http://127.0.0.1:1", "x").is_err());
}
// Non-JSON input is rejected.
#[test]
fn parse_status_agents_rejects_garbage() {
assert!(parse_status_agents("not json at all").is_err());
}
// Probing a pane of a session that does not exist falls back to Unknown.
#[test]
fn detect_pane_mode_helper_on_dead_session_is_unknown() {
assert_eq!(
detect_pane_mode("paw-nonexistent-xyz-123", 9),
Mode::Unknown
);
}
}