use std::collections::HashMap;
use std::path::{Path, PathBuf};
use anyhow::{Context, Result};
use rusqlite::Connection;
use team_core::compose::Compose;
use team_core::supervisor::{AgentSpec, AgentState, Supervisor, TmuxSupervisor};
/// Point-in-time view of one agent, as assembled by `TeamSnapshot::load`.
#[derive(Debug, Clone)]
pub struct AgentInfo {
/// Identifier returned by the agent handle's `id()`; also the key used to
/// look up this agent's mailbox counts (tests use the `project:agent` form).
pub id: String,
/// Agent name taken from the handle (`h.agent`).
pub agent: String,
/// Project name taken from the handle (`h.project`).
pub project: String,
/// tmux session name copied from the agent's `AgentSpec`.
pub tmux_session: String,
/// State reported by the supervisor; `AgentState::Unknown` when the query fails.
pub state: AgentState,
/// Number of unacknowledged mailbox messages addressed to this agent.
pub unread_mail: u32,
/// Number of approvals for this agent whose status is `pending`.
pub pending_approvals: u32,
/// Manager flag from the handle; managers are sorted ahead of other agents.
pub is_manager: bool,
/// Optional human-friendly name; `agent_label` prefers it over `id`.
pub display_name: Option<String>,
/// `resets_at` value of the agent's newest rate-limit row, if any.
/// It is compared against a Unix "now" in `format_rate_limit_window`.
pub rate_limit_resets_at: Option<f64>,
/// Modification time of the agent's heartbeat file, in seconds since the
/// Unix epoch; `None` when the file or its mtime cannot be read.
pub last_activity_at: Option<f64>,
/// Name of the agent this one reports to; resolved against `agent` names
/// within the same `project` by the tree helpers.
pub reports_to: Option<String>,
}
/// Returns the label to show for `agent_id`.
///
/// The first agent in `team` whose `id` equals `agent_id` decides the result:
/// its `display_name` when one is set, otherwise `agent_id` itself. When no
/// agent matches, `agent_id` is returned unchanged.
pub fn agent_label<'a>(team: &'a TeamSnapshot, agent_id: &'a str) -> &'a str {
    for candidate in &team.agents {
        if candidate.id == agent_id {
            // Only the first match is consulted, even if it has no display name.
            return match candidate.display_name.as_deref() {
                Some(pretty) => pretty,
                None => agent_id,
            };
        }
    }
    agent_id
}
/// Renders a message recipient for display.
///
/// Recipients starting with `channel:` become `#name`, where `name` is the
/// text after the last `:` of the remainder (or the whole remainder when it
/// contains no `:`). Anything else is resolved through [`agent_label`].
pub fn recipient_label(team: &TeamSnapshot, recipient_id: &str) -> String {
    match recipient_id.strip_prefix("channel:") {
        Some(qualified) => {
            let short = match qualified.rsplit_once(':') {
                Some((_, name)) => name,
                None => qualified,
            };
            format!("#{short}")
        }
        None => agent_label(team, recipient_id).to_string(),
    }
}
/// A channel declared by one of the team's projects.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChannelInfo {
/// Qualified identifier, built in `TeamSnapshot::load` as `{project_id}:{name}`.
pub id: String,
/// Channel name as declared in the project.
pub name: String,
/// Id of the project that declares the channel.
pub project_id: String,
}
/// Everything known about a team at the moment it was loaded.
#[derive(Debug, Clone)]
pub struct TeamSnapshot {
/// Root directory of the loaded compose tree.
pub root: PathBuf,
/// Display name of the team; `load` derives it from the first project.
pub team_name: String,
/// Agents in display order (`load` sorts managers first, then applies tree order).
pub agents: Vec<AgentInfo>,
/// Channels of all projects; `load` sorts them by `id`.
pub channels: Vec<ChannelInfo>,
}
impl TeamSnapshot {
pub fn empty(root: PathBuf) -> Self {
Self {
root,
team_name: "(no team loaded)".into(),
agents: Vec::new(),
channels: Vec::new(),
}
}
pub fn discover_and_load() -> Result<Option<Self>> {
let cwd = std::env::current_dir().context("get cwd")?;
match Compose::discover(&cwd) {
Ok(root) => Self::load(&root).map(Some),
Err(_) => Ok(None),
}
}
pub fn load(root: &Path) -> Result<Self> {
let compose = Compose::load(root)?;
let mailbox = compose.root.join(&compose.global.broker.path);
let counts = mailbox_counts(&mailbox).unwrap_or_default();
let supervisor = TmuxSupervisor;
let team_name = compose
.projects
.first()
.map(|p| {
if p.project.name.is_empty() {
p.project.id.clone()
} else {
p.project.name.clone()
}
})
.unwrap_or_else(|| "(unnamed team)".into());
let mut agents = Vec::new();
for h in compose.agents() {
let display_name = h.spec.display_name.clone();
let reports_to = h.spec.reports_to.clone();
let spec =
AgentSpec::from_handle(h, &compose.root, &compose.global.supervisor.tmux_prefix);
let state = supervisor.state(&spec).unwrap_or(AgentState::Unknown);
let id = h.id();
let unread_mail = counts.unread.get(&id).copied().unwrap_or(0);
let pending_approvals = counts.pending.get(&id).copied().unwrap_or(0);
let rate_limit_resets_at = counts.rate_limit.get(&id).copied();
let last_activity_at = std::fs::metadata(team_core::render::heartbeat_path(
&compose.root,
h.project,
h.agent,
))
.ok()
.and_then(|m| m.modified().ok())
.and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
.map(|d| d.as_secs_f64());
agents.push(AgentInfo {
id,
agent: h.agent.into(),
project: h.project.into(),
tmux_session: spec.tmux_session,
state,
unread_mail,
pending_approvals,
is_manager: h.is_manager,
display_name,
rate_limit_resets_at,
last_activity_at,
reports_to,
});
}
agents.sort_by(|a, b| match (b.is_manager, a.is_manager) {
(x, y) if x == y => a.id.cmp(&b.id),
(true, false) => std::cmp::Ordering::Greater,
(false, true) => std::cmp::Ordering::Less,
_ => std::cmp::Ordering::Equal,
});
agents = into_tree_dfs_order(agents);
let mut channels = Vec::new();
for project in &compose.projects {
for ch in &project.channels {
channels.push(ChannelInfo {
id: format!("{}:{}", project.project.id, ch.name),
name: ch.name.clone(),
project_id: project.project.id.clone(),
});
}
}
channels.sort_by(|a, b| a.id.cmp(&b.id));
Ok(Self {
root: compose.root,
team_name,
agents,
channels,
})
}
}
/// Per-row layout information produced by `tree_row_meta`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TreeRowMeta {
/// Nesting level: 0 for rows without a resolvable parent, otherwise the
/// parent's depth plus one.
pub depth: usize,
/// `true` when this row has the highest index among the rows that share
/// its parent (rows without a parent form one group).
pub is_last_sibling: bool,
}
/// Reorders `agents` so that every agent is followed by its reports, depth
/// first, while keeping the incoming relative order among siblings.
///
/// A `reports_to` value is resolved against the `agent` names of the same
/// `project`. Agents whose parent cannot be resolved are treated as top
/// level. Agents that are never reached from a top-level agent (for example
/// members of a reporting cycle) are appended at the end in their original
/// order. When no agent has `reports_to` set, the input is returned as is.
pub fn into_tree_dfs_order(agents: Vec<AgentInfo>) -> Vec<AgentInfo> {
    let has_hierarchy = agents.iter().any(|a| a.reports_to.is_some());
    if !has_hierarchy {
        return agents;
    }

    let total = agents.len();

    // (project, agent name) -> position in the incoming list.
    let mut position_of: HashMap<(&str, &str), usize> = HashMap::new();
    for (pos, a) in agents.iter().enumerate() {
        position_of.insert((a.project.as_str(), a.agent.as_str()), pos);
    }

    // Split positions into roots and per-parent child lists.
    let mut reports_of: HashMap<usize, Vec<usize>> = HashMap::new();
    let mut roots: Vec<usize> = Vec::new();
    for (pos, a) in agents.iter().enumerate() {
        let manager = match a.reports_to.as_deref() {
            Some(name) => position_of.get(&(a.project.as_str(), name)).copied(),
            None => None,
        };
        if let Some(parent) = manager {
            reports_of.entry(parent).or_default().push(pos);
        } else {
            roots.push(pos);
        }
    }

    // Pre-order walk with an explicit stack. Entries are pushed in reverse so
    // the earliest sibling is popped first.
    let mut visited = vec![false; total];
    let mut sequence: Vec<usize> = Vec::with_capacity(total);
    let mut pending: Vec<usize> = roots.iter().rev().copied().collect();
    while let Some(node) = pending.pop() {
        if visited[node] {
            continue;
        }
        visited[node] = true;
        sequence.push(node);
        if let Some(kids) = reports_of.get(&node) {
            pending.extend(kids.iter().rev().copied());
        }
    }

    // Anything unreachable from a root keeps its original relative order.
    sequence.extend((0..total).filter(|&pos| !visited[pos]));

    // Move each agent out of its slot in the computed order.
    let mut slots: Vec<Option<AgentInfo>> = agents.into_iter().map(Some).collect();
    let mut ordered = Vec::with_capacity(total);
    for pos in sequence {
        if let Some(agent) = slots[pos].take() {
            ordered.push(agent);
        }
    }
    ordered
}
/// Computes depth and last-sibling flags for each row of `agents`.
///
/// Parents are resolved the same way as in `into_tree_dfs_order`: a
/// `reports_to` name is looked up among the `agent` names of the same
/// `project`. Depth is filled in a single forward pass, so a row's depth is
/// only the true nesting level when its parent appears earlier in the slice
/// (as is the case for DFS-ordered input).
///
/// Returns one entry per input row, in the same order.
pub fn tree_row_meta(agents: &[AgentInfo]) -> Vec<TreeRowMeta> {
    let total = agents.len();

    // Flat team: every row sits at depth 0 and only the final row is "last".
    if !agents.iter().any(|a| a.reports_to.is_some()) {
        let mut flat = Vec::with_capacity(total);
        for row in 0..total {
            flat.push(TreeRowMeta {
                depth: 0,
                is_last_sibling: row + 1 == total,
            });
        }
        return flat;
    }

    // (project, agent name) -> row index.
    let mut position_of: HashMap<(&str, &str), usize> = HashMap::new();
    for (row, a) in agents.iter().enumerate() {
        position_of.insert((a.project.as_str(), a.agent.as_str()), row);
    }

    let mut parent_of: Vec<Option<usize>> = Vec::with_capacity(total);
    for a in agents {
        let parent = match a.reports_to.as_deref() {
            Some(name) => position_of.get(&(a.project.as_str(), name)).copied(),
            None => None,
        };
        parent_of.push(parent);
    }

    // Single forward pass: uses whatever depth the parent has at this point.
    let mut depths = vec![0usize; total];
    for (row, parent) in parent_of.iter().enumerate() {
        if let Some(p) = *parent {
            depths[row] = depths[p] + 1;
        }
    }

    // Rows are visited in ascending order, so the value left in the map for
    // each parent is the highest row index in that sibling group.
    let mut final_row_of: HashMap<Option<usize>, usize> = HashMap::new();
    for (row, parent) in parent_of.iter().enumerate() {
        final_row_of.insert(*parent, row);
    }

    let mut rows = Vec::with_capacity(total);
    for (row, parent) in parent_of.iter().enumerate() {
        rows.push(TreeRowMeta {
            depth: depths[row],
            is_last_sibling: final_row_of.get(parent) == Some(&row),
        });
    }
    rows
}
/// Aggregates read from the mailbox database by `mailbox_counts`.
#[derive(Debug, Default)]
struct MailboxCounts {
/// Unacknowledged message count per recipient; `channel:` and `user:`
/// recipients are excluded by the query.
unread: HashMap<String, u32>,
/// Pending approval count, keyed by `project_id:agent_id`.
pending: HashMap<String, u32>,
/// `resets_at` of the newest rate-limit row that has one, keyed by the
/// row's `agent_id`.
rate_limit: HashMap<String, f64>,
}
fn mailbox_counts(mailbox: &Path) -> Result<MailboxCounts> {
if !mailbox.is_file() {
return Ok(MailboxCounts::default());
}
let conn = Connection::open(mailbox)?;
let mut counts = MailboxCounts::default();
let mut stmt = conn.prepare(
"SELECT recipient, COUNT(*) FROM messages
WHERE acked_at IS NULL
AND recipient NOT LIKE 'channel:%'
AND recipient NOT LIKE 'user:%'
GROUP BY recipient",
)?;
let rows = stmt.query_map([], |r| Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)?)))?;
for row in rows.flatten() {
counts.unread.insert(row.0, row.1.max(0) as u32);
}
let mut stmt = conn.prepare(
"SELECT project_id || ':' || agent_id, COUNT(*) FROM approvals
WHERE status = 'pending'
GROUP BY project_id, agent_id",
)?;
let rows = stmt.query_map([], |r| Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)?)))?;
for row in rows.flatten() {
counts.pending.insert(row.0, row.1.max(0) as u32);
}
if let Ok(mut stmt) = conn.prepare(
"SELECT agent_id, resets_at FROM rate_limits
WHERE id IN (
SELECT MAX(id) FROM rate_limits
WHERE resets_at IS NOT NULL
GROUP BY agent_id
)",
) {
if let Ok(rows) = stmt.query_map([], |r| Ok((r.get::<_, String>(0)?, r.get::<_, f64>(1)?)))
{
for row in rows.flatten() {
counts.rate_limit.insert(row.0, row.1);
}
}
}
Ok(counts)
}
/// Formats the time left until a rate limit resets as a short label.
///
/// `resets_at` and `now_unix` are expected on the same clock (both are
/// subtracted directly). The remaining time is truncated to whole seconds
/// and rendered as:
///
/// * `"{h}h {m}m"` for one hour or more,
/// * `"{m}m {s}s"` for one minute or more,
/// * `"{s}s"` otherwise.
///
/// Returns `None` when `resets_at` is `None`, when the reset time is not in
/// the future, or when the remaining time is not a finite number (NaN or
/// infinite input).
pub fn format_rate_limit_window(resets_at: Option<f64>, now_unix: f64) -> Option<String> {
    let remaining = resets_at? - now_unix;
    // NaN fails every comparison, so it would slip past a bare `<= 0.0`
    // check and be rendered as "0s"; infinity would saturate to a nonsense
    // hour count. Neither describes a real window.
    if !remaining.is_finite() || remaining <= 0.0 {
        return None;
    }

    // Truncation toward zero is intended: 59.9s is shown as "59s".
    let secs = remaining as u64;
    let label = match (secs / 3600, (secs % 3600) / 60, secs % 60) {
        (0, 0, s) => format!("{s}s"),
        (0, m, s) => format!("{m}m {s}s"),
        (h, m, _) => format!("{h}h {m}m"),
    };
    Some(label)
}
/// Maximum age, in seconds, at which a heartbeat still counts as fresh.
pub const HEARTBEAT_FRESH_SECS: f64 = 15.0;

/// Reports whether an agent's last activity is recent enough to count as
/// "working".
///
/// Returns `true` only when `last_activity_at` is present and strictly less
/// than [`HEARTBEAT_FRESH_SECS`] older than `now_unix`.
pub fn is_working(last_activity_at: Option<f64>, now_unix: f64) -> bool {
    match last_activity_at {
        Some(touched_at) => now_unix - touched_at < HEARTBEAT_FRESH_SECS,
        None => false,
    }
}
/// Picks the one-character status marker for an agent row.
///
/// * Stopped: `✕` (`x` in ASCII mode)
/// * Unknown: `?`
/// * Running with pending approvals: `!`
/// * Running with a fresh heartbeat (see [`is_working`]): `●` (`*`)
/// * Running otherwise: `○` (`o`)
///
/// `fallback_ascii` selects the ASCII variant where one exists. Unread mail
/// does not influence the glyph.
pub fn state_glyph(info: &AgentInfo, fallback_ascii: bool, now_unix: f64) -> &'static str {
    // Chooses between a Unicode glyph and its ASCII stand-in.
    let pick = |unicode: &'static str, ascii: &'static str| -> &'static str {
        if fallback_ascii {
            ascii
        } else {
            unicode
        }
    };

    match info.state {
        AgentState::Unknown => "?",
        AgentState::Stopped => pick("✕", "x"),
        // Pending approvals take priority over the activity indicator.
        AgentState::Running if info.pending_approvals > 0 => "!",
        AgentState::Running if is_working(info.last_activity_at, now_unix) => pick("●", "*"),
        AgentState::Running => pick("○", "o"),
    }
}
// Unit tests for the pure helpers in this module: heartbeat freshness,
// state glyph selection, rate-limit window formatting and recipient labels.
#[cfg(test)]
mod tests {
use super::*;
// Builds a baseline agent `p:a` with the given state and counters; every
// optional field is left unset.
fn info(state: AgentState, unread: u32, pending: u32) -> AgentInfo {
AgentInfo {
id: "p:a".into(),
agent: "a".into(),
project: "p".into(),
tmux_session: "t-p-a".into(),
state,
unread_mail: unread,
pending_approvals: pending,
is_manager: false,
display_name: None,
rate_limit_resets_at: None,
last_activity_at: None,
reports_to: None,
}
}
// Same as `info` with zero counters, but with an explicit heartbeat timestamp.
fn info_active(state: AgentState, last_activity_at: Option<f64>) -> AgentInfo {
AgentInfo {
last_activity_at,
..info(state, 0, 0)
}
}
// The freshness check is strict: an age of exactly 15s is already idle.
#[test]
fn is_working_classifies_at_the_15s_boundary() {
let now = 1_000_000.0;
assert!(is_working(Some(now), now), "just touched => working");
assert!(is_working(Some(now - 14.0), now), "14s old => working");
assert!(
!is_working(Some(now - 15.0), now),
"exactly 15s => idle (strict <)"
);
assert!(!is_working(Some(now - 16.0), now), "16s old => idle");
assert!(!is_working(None, now), "absent marker => idle");
}
// For a running agent: pending approvals win over the working marker,
// which wins over idle. Unread mail alone does not change the glyph.
#[test]
fn state_glyph_priorities_pending_then_working_then_idle() {
let now = 1_000.0;
assert_eq!(
state_glyph(&info_active(AgentState::Running, Some(now)), false, now),
"●"
);
assert_eq!(
state_glyph(&info(AgentState::Running, 0, 0), false, now),
"○"
);
assert_eq!(
state_glyph(
&info_active(AgentState::Running, Some(now - 20.0)),
false,
now
),
"○"
);
let mut working_pending = info_active(AgentState::Running, Some(now));
working_pending.pending_approvals = 1;
assert_eq!(state_glyph(&working_pending, false, now), "!");
assert_eq!(
state_glyph(&info(AgentState::Running, 3, 0), false, now),
"○"
);
}
// Non-running states have fixed glyphs.
#[test]
fn state_glyph_stopped_and_unknown() {
let now = 1_000.0;
assert_eq!(
state_glyph(&info(AgentState::Stopped, 0, 0), false, now),
"✕"
);
assert_eq!(
state_glyph(&info(AgentState::Unknown, 0, 0), false, now),
"?"
);
}
// ASCII mode swaps the Unicode glyphs; "!" and "?" are the same in both modes.
#[test]
fn state_glyph_ascii_fallback() {
let now = 1_000.0;
assert_eq!(
state_glyph(&info_active(AgentState::Running, Some(now)), true, now),
"*"
);
assert_eq!(
state_glyph(&info(AgentState::Running, 0, 0), true, now),
"o"
);
assert_eq!(
state_glyph(&info(AgentState::Stopped, 0, 0), true, now),
"x"
);
let mut working_pending = info_active(AgentState::Running, Some(now));
working_pending.pending_approvals = 1;
assert_eq!(state_glyph(&working_pending, true, now), "!");
assert_eq!(
state_glyph(&info(AgentState::Unknown, 0, 0), true, now),
"?"
);
}
// No reset time recorded => nothing to render.
#[test]
fn format_rate_limit_window_returns_none_when_unset() {
assert_eq!(format_rate_limit_window(None, 1000.0), None);
}
// A reset time in the past => nothing to render.
#[test]
fn format_rate_limit_window_returns_none_when_already_past() {
assert_eq!(format_rate_limit_window(Some(500.0), 1000.0), None);
}
// Zero seconds remaining counts as already reset.
#[test]
fn format_rate_limit_window_returns_none_at_exact_now() {
assert_eq!(format_rate_limit_window(Some(1000.0), 1000.0), None);
}
// Below one minute only seconds are shown.
#[test]
fn format_rate_limit_window_under_minute_renders_seconds() {
assert_eq!(
format_rate_limit_window(Some(1042.0), 1000.0),
Some("42s".into())
);
assert_eq!(
format_rate_limit_window(Some(1059.0), 1000.0),
Some("59s".into())
);
}
// From one minute up to one hour: minutes and seconds.
#[test]
fn format_rate_limit_window_under_hour_renders_minutes_and_seconds() {
assert_eq!(
format_rate_limit_window(Some(1060.0), 1000.0),
Some("1m 0s".into())
);
assert_eq!(
format_rate_limit_window(Some(1312.0), 1000.0),
Some("5m 12s".into())
);
}
// From one hour on: hours and minutes, seconds dropped.
#[test]
fn format_rate_limit_window_at_or_over_hour_renders_hours_and_minutes() {
assert_eq!(
format_rate_limit_window(Some(4600.0), 1000.0),
Some("1h 0m".into())
);
assert_eq!(
format_rate_limit_window(Some(5980.0), 1000.0),
Some("1h 23m".into())
);
}
// Snapshot with no agents and no channels, rooted at /tmp.
fn empty_team() -> TeamSnapshot {
TeamSnapshot::empty(std::path::PathBuf::from("/tmp"))
}
// Unknown agent ids are echoed back unchanged.
#[test]
fn recipient_label_returns_agent_id_when_no_display_name() {
let team = empty_team();
assert_eq!(recipient_label(&team, "p:dev"), "p:dev");
}
// A matching agent with a display name is rendered by that name.
#[test]
fn recipient_label_returns_display_name_when_set() {
use team_core::supervisor::AgentState;
let agent = AgentInfo {
id: "p:hugo".into(),
agent: "hugo".into(),
project: "p".into(),
tmux_session: "a-p-hugo".into(),
state: AgentState::Running,
unread_mail: 0,
pending_approvals: 0,
is_manager: true,
display_name: Some("Hugo (PM)".into()),
rate_limit_resets_at: None,
last_activity_at: None,
reports_to: None,
};
let team = TeamSnapshot {
root: std::path::PathBuf::from("/tmp"),
team_name: "t".into(),
agents: vec![agent],
channels: vec![],
};
assert_eq!(recipient_label(&team, "p:hugo"), "Hugo (PM)");
}
// `channel:<project>:<name>` is shortened to `#<name>`.
#[test]
fn recipient_label_renders_channel_with_hash_prefix() {
let team = empty_team();
assert_eq!(recipient_label(&team, "channel:teamctl:dev"), "#dev");
assert_eq!(recipient_label(&team, "channel:teamctl:all"), "#all");
}
// A channel recipient without a project part keeps the whole remainder.
#[test]
fn recipient_label_handles_malformed_channel_recipient() {
let team = empty_team();
assert_eq!(recipient_label(&team, "channel:malformed"), "#malformed");
}
// `user:` recipients are not agents or channels and pass through as-is.
#[test]
fn recipient_label_renders_user_recipient_verbatim() {
let team = empty_team();
assert_eq!(recipient_label(&team, "user:telegram"), "user:telegram");
}
}