teamctl_ui/data.rs
//! `TeamSnapshot` — point-in-time read of the dogfood team that the UI
//! renders against. Built by walking up to the nearest `.team/`,
//! parsing `team-compose.yaml`, querying the supervisor for each
//! agent's process state, and aggregating a small set of mailbox
//! counters (unread mail, pending approvals, latest rate-limit reset).
//!
//! Read by both `App::tick()` (live refresh every second) and the
//! snapshot tests (constructed manually). The snapshot is intentionally
//! cheap to build — the mailbox counters come from a few grouped SQL
//! queries run once per snapshot rather than once per agent — so
//! refresh cadence stays well under tmux's own `capture-pane` cost.
12
13use std::collections::HashMap;
14use std::path::{Path, PathBuf};
15
16use anyhow::{Context, Result};
17use rusqlite::Connection;
18use team_core::compose::Compose;
19use team_core::supervisor::{AgentSpec, AgentState, Supervisor, TmuxSupervisor};
20
21/// Per-agent fields the UI reads to render the roster + drive
22/// selection / detail-pane streaming.
23#[derive(Debug, Clone)]
24pub struct AgentInfo {
25 /// `<project>:<agent>` — the canonical id used in `teamctl send`
26 /// targets, MCP tool calls, and `reports_to` chains.
27 pub id: String,
28 /// Short agent name within the project (the YAML key).
29 pub agent: String,
30 /// Project id this agent belongs to.
31 pub project: String,
32 /// Resolved tmux session name (`<prefix><project>-<agent>`) — fed
33 /// to the pane-capture call so the detail pane targets the right
34 /// session even when `tmux_prefix` rotates.
35 pub tmux_session: String,
36 /// Process state — `Running`, `Stopped`, or `Unknown` per the
37 /// supervisor trait. Drives the primary glyph in the roster.
38 pub state: AgentState,
39 /// Count of mailbox messages addressed to this agent that haven't
40 /// been ack'd yet. Surfaces the `✉` glyph when nonzero.
41 pub unread_mail: u32,
42 /// Count of `request_approval` rows still in `pending` state for
43 /// this agent. Surfaces the `!` glyph when nonzero (highest
44 /// priority — overrides the unread-mail glyph).
45 pub pending_approvals: u32,
46 /// `true` for managers (`is_manager: true` in compose), used when
47 /// the roster wants to draw a tier separator. Read but unused in
48 /// PR-UI-2; kept on the struct so PR-UI-4's approvals modal can
49 /// route based on tier without a second compose lookup.
50 pub is_manager: bool,
51 /// T-160: optional human-friendly label from
52 /// `team-compose.yaml`. When `Some`, the TUI renders this in place
53 /// of `id` everywhere an agent label surfaces to the operator
54 /// (roster, detail header, mailbox attribution, statusline,
55 /// approvals, compose modal). When `None`, label falls back to
56 /// `id`. The id stays canonical for routing/tmux/CLI.
57 pub display_name: Option<String>,
58 /// T-212: most recent rate-limit reset timestamp (unix epoch
59 /// seconds) for this agent, sourced from the `rate_limits` table
60 /// populated by `teamctl rl-watch`. `None` when rl-watch has
61 /// never recorded a `resets_at` for this agent. The TUI status
62 /// bar formats this against `now()` via
63 /// [`format_rate_limit_window`] to render "5m 12s" / "1h 23m" —
64 /// past timestamps render as `None` (no active limit).
65 pub rate_limit_resets_at: Option<f64>,
66}
67
68/// Return the operator-facing label for `agent_id`: the agent's
69/// `display_name` when set, otherwise `agent_id` itself. Read-only
70/// borrow into the snapshot — callers that need an owned `String`
71/// can `.to_string()` at the use-site. Unknown ids fall through to
72/// `agent_id` (the canonical id is always a valid label).
73pub fn agent_label<'a>(team: &'a TeamSnapshot, agent_id: &'a str) -> &'a str {
74 team.agents
75 .iter()
76 .find(|a| a.id == agent_id)
77 .and_then(|a| a.display_name.as_deref())
78 .unwrap_or(agent_id)
79}
80
/// A channel declared in `team-compose.yaml`.
///
/// Consumed by PR-UI-6's per-channel broadcast picker and by the
/// channel list of the Mailbox-first layout.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChannelInfo {
    /// `<project>:<name>`, matching the broker's `channels.id`.
    pub id: String,
    /// Short label; rendered as `#name`.
    pub name: String,
    /// Id of the project that declares the channel.
    pub project_id: String,
}
91
92#[derive(Debug, Clone)]
93pub struct TeamSnapshot {
94 /// Path to the `.team/` discovered by walk-up (the compose root).
95 pub root: PathBuf,
96 /// Human label from `team-compose.yaml::projects[].project.name`
97 /// — falls back to the project id when name is empty.
98 pub team_name: String,
99 /// Agents in deterministic order: managers first, then workers,
100 /// each group sorted by id. Roster navigation (`↑` / `↓`) walks
101 /// this slice directly.
102 pub agents: Vec<AgentInfo>,
103 /// Channels declared across every project file. Drives the
104 /// PR-UI-6 broadcast picker + the Mailbox-first layout's
105 /// channel list.
106 pub channels: Vec<ChannelInfo>,
107}
108
109impl TeamSnapshot {
110 /// Build an empty snapshot rooted at the given path. Used by
111 /// tests and as the rendered shape when no `.team/` is reachable.
112 pub fn empty(root: PathBuf) -> Self {
113 Self {
114 root,
115 team_name: "(no team loaded)".into(),
116 agents: Vec::new(),
117 channels: Vec::new(),
118 }
119 }
120
121 /// Walk up from cwd to find the nearest `.team/`, parse the
122 /// compose tree, query supervisor + mailbox state per agent,
123 /// and return the assembled snapshot. Returns `Ok(None)` when
124 /// no `.team/` is reachable — the UI renders the empty state in
125 /// that case rather than panicking.
126 pub fn discover_and_load() -> Result<Option<Self>> {
127 let cwd = std::env::current_dir().context("get cwd")?;
128 match Compose::discover(&cwd) {
129 Ok(root) => Self::load(&root).map(Some),
130 Err(_) => Ok(None),
131 }
132 }
133
134 /// Build a snapshot for an explicit `.team/` root. Public so
135 /// integration tests can hand-feed a tempdir without going
136 /// through walk-up discovery.
137 pub fn load(root: &Path) -> Result<Self> {
138 let compose = Compose::load(root)?;
139 let mailbox = compose.root.join(&compose.global.broker.path);
140 let counts = mailbox_counts(&mailbox).unwrap_or_default();
141
142 let supervisor = TmuxSupervisor;
143 let team_name = compose
144 .projects
145 .first()
146 .map(|p| {
147 if p.project.name.is_empty() {
148 p.project.id.clone()
149 } else {
150 p.project.name.clone()
151 }
152 })
153 .unwrap_or_else(|| "(unnamed team)".into());
154
155 let mut agents = Vec::new();
156 for h in compose.agents() {
157 let display_name = h.spec.display_name.clone();
158 let spec =
159 AgentSpec::from_handle(h, &compose.root, &compose.global.supervisor.tmux_prefix);
160 let state = supervisor.state(&spec).unwrap_or(AgentState::Unknown);
161 let id = h.id();
162 let unread_mail = counts.unread.get(&id).copied().unwrap_or(0);
163 let pending_approvals = counts.pending.get(&id).copied().unwrap_or(0);
164 let rate_limit_resets_at = counts.rate_limit.get(&id).copied();
165 agents.push(AgentInfo {
166 id,
167 agent: h.agent.into(),
168 project: h.project.into(),
169 tmux_session: spec.tmux_session,
170 state,
171 unread_mail,
172 pending_approvals,
173 is_manager: h.is_manager,
174 display_name,
175 rate_limit_resets_at,
176 });
177 }
178
179 // Managers first, then workers; deterministic within each.
180 agents.sort_by(|a, b| match (b.is_manager, a.is_manager) {
181 (x, y) if x == y => a.id.cmp(&b.id),
182 (true, false) => std::cmp::Ordering::Greater,
183 (false, true) => std::cmp::Ordering::Less,
184 _ => std::cmp::Ordering::Equal,
185 });
186
187 let mut channels = Vec::new();
188 for project in &compose.projects {
189 for ch in &project.channels {
190 channels.push(ChannelInfo {
191 id: format!("{}:{}", project.project.id, ch.name),
192 name: ch.name.clone(),
193 project_id: project.project.id.clone(),
194 });
195 }
196 }
197 // Stable order for the picker — operators see the same
198 // sequence on every open.
199 channels.sort_by(|a, b| a.id.cmp(&b.id));
200
201 Ok(Self {
202 root: compose.root,
203 team_name,
204 agents,
205 channels,
206 })
207 }
208}
209
/// Per-agent aggregates read from the mailbox database. Each map is
/// looked up by agent id; an agent absent from a map has no data for
/// that counter.
#[derive(Debug, Default)]
struct MailboxCounts {
    /// Unacked direct messages per recipient.
    unread: HashMap<String, u32>,
    /// Approvals still in `pending` status per requesting agent.
    pending: HashMap<String, u32>,
    /// T-212: latest `rate_limits.resets_at` per agent, in unix epoch
    /// seconds. Only the newest row with a non-null `resets_at` is
    /// kept; rows without a parsed reset time are still written by
    /// `rl-watch` for forensics but never drive the UI.
    rate_limit: HashMap<String, f64>,
}
221
222/// Single sweep of the mailbox to populate per-agent counters. Read
223/// errors degrade silently to zeroes — a missing or unreadable DB
224/// is just "no team running yet" from the UI's perspective, not a
225/// fatal launch error.
226fn mailbox_counts(mailbox: &Path) -> Result<MailboxCounts> {
227 if !mailbox.is_file() {
228 return Ok(MailboxCounts::default());
229 }
230 let conn = Connection::open(mailbox)?;
231 let mut counts = MailboxCounts::default();
232
233 // Unread mail per recipient agent (channels excluded — channel
234 // messages ack independently per subscriber and would require a
235 // join we don't need in PR-UI-2).
236 //
237 // INVARIANT: every `messages.recipient` value falls into exactly
238 // one of three prefix classes — `<project>:<agent>` (DM, no
239 // scheme prefix; the channel-or-user split here relies on that
240 // absence), `channel:<channel_id>`, or `user:<handle>`. The two
241 // `NOT LIKE` clauses below treat anything outside the channel /
242 // user prefixes as a per-agent DM. If a fourth prefix class
243 // ever lands, every site that splits recipients (here,
244 // `mailbox::BrokerMailboxSource::*` queries, and the tail.rs
245 // follow loop) needs to learn it.
246 let mut stmt = conn.prepare(
247 "SELECT recipient, COUNT(*) FROM messages
248 WHERE acked_at IS NULL
249 AND recipient NOT LIKE 'channel:%'
250 AND recipient NOT LIKE 'user:%'
251 GROUP BY recipient",
252 )?;
253 let rows = stmt.query_map([], |r| Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)?)))?;
254 for row in rows.flatten() {
255 counts.unread.insert(row.0, row.1.max(0) as u32);
256 }
257
258 // Pending approvals per requesting agent.
259 let mut stmt = conn.prepare(
260 "SELECT project_id || ':' || agent_id, COUNT(*) FROM approvals
261 WHERE status = 'pending'
262 GROUP BY project_id, agent_id",
263 )?;
264 let rows = stmt.query_map([], |r| Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)?)))?;
265 for row in rows.flatten() {
266 counts.pending.insert(row.0, row.1.max(0) as u32);
267 }
268
269 // T-212: latest rate-limit reset per agent. The `rate_limits`
270 // table is populated by `teamctl rl-watch`. We pick the most
271 // recent row per agent (by `id`, which is monotonically
272 // increasing per the schema in team-core::mailbox), and only
273 // when `resets_at` is non-null — null-resets-at rows are still
274 // logged by rl-watch for debugging the parser but don't drive
275 // UI state. Past-`resets_at` filtering happens at format time
276 // in [`format_rate_limit_window`] so we don't need a `now()`
277 // dependency in this query.
278 //
279 // Table missing (rl-watch never ran on this mailbox) →
280 // `prepare()` errors and we degrade silently to empty map,
281 // matching the surrounding "no data is fine" pattern.
282 if let Ok(mut stmt) = conn.prepare(
283 "SELECT agent_id, resets_at FROM rate_limits
284 WHERE id IN (
285 SELECT MAX(id) FROM rate_limits
286 WHERE resets_at IS NOT NULL
287 GROUP BY agent_id
288 )",
289 ) {
290 if let Ok(rows) = stmt.query_map([], |r| Ok((r.get::<_, String>(0)?, r.get::<_, f64>(1)?)))
291 {
292 for row in rows.flatten() {
293 counts.rate_limit.insert(row.0, row.1);
294 }
295 }
296 }
297
298 Ok(counts)
299}
300
/// T-212: format a rate-limit reset timestamp as a short label for
/// the status bar.
///
/// `resets_at` and `now_unix` are unix epoch seconds. Returns `None`
/// — the indicator hides — when the limit is unset, in the past, at
/// the current instant, or when the remaining time is not a finite
/// number (NaN or infinite input). For active limits the label is
/// `42s` (under a minute), `5m 12s` (under an hour), or `1h 23m` (an
/// hour or more). Fractional seconds are truncated.
///
/// Operator-facing string; not for parsing.
pub fn format_rate_limit_window(resets_at: Option<f64>, now_unix: f64) -> Option<String> {
    let remaining = resets_at? - now_unix;
    // NaN fails every comparison, so `remaining <= 0.0` alone would
    // let it through and render "0s"; +inf would saturate the cast
    // below and render an absurd hour count. Neither is an active
    // limit worth showing.
    if !remaining.is_finite() || remaining <= 0.0 {
        return None;
    }
    // Float-to-int `as` truncates toward zero and saturates, so this
    // cannot panic; sub-second remainders show as "0s".
    let secs = remaining as u64;
    let label = match (secs / 3600, (secs % 3600) / 60, secs % 60) {
        (0, 0, s) => format!("{s}s"),
        (0, m, s) => format!("{m}m {s}s"),
        (h, m, _) => format!("{h}h {m}m"),
    };
    Some(label)
}
326
327/// Single-cell glyph for an agent's primary state — derived from the
328/// triplet (`state`, `pending_approvals`, `unread_mail`) in priority
329/// order: pending approval beats unread mail beats process state.
330/// Plain ASCII fallback when the caller signals a monochrome /
331/// no-symbol terminal.
332pub fn state_glyph(info: &AgentInfo, fallback_ascii: bool) -> &'static str {
333 match info.state {
334 AgentState::Stopped => {
335 if fallback_ascii {
336 "x"
337 } else {
338 "✕"
339 }
340 }
341 AgentState::Unknown => "?",
342 AgentState::Running => {
343 if info.pending_approvals > 0 {
344 "!"
345 } else if info.unread_mail > 0 {
346 if fallback_ascii {
347 "@"
348 } else {
349 "✉"
350 }
351 } else if fallback_ascii {
352 "*"
353 } else {
354 "●"
355 }
356 }
357 }
358}
359
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixed "now" shared by every rate-limit formatter test.
    const NOW: f64 = 1000.0;

    /// Minimal roster entry; only the three glyph inputs vary.
    fn info(state: AgentState, unread: u32, pending: u32) -> AgentInfo {
        AgentInfo {
            state,
            unread_mail: unread,
            pending_approvals: pending,
            id: "p:a".into(),
            agent: "a".into(),
            project: "p".into(),
            tmux_session: "t-p-a".into(),
            is_manager: false,
            display_name: None,
            rate_limit_resets_at: None,
        }
    }

    /// Glyph for a throwaway agent built from the given inputs.
    fn glyph(state: AgentState, unread: u32, pending: u32, ascii: bool) -> &'static str {
        state_glyph(&info(state, unread, pending), ascii)
    }

    /// Formatter output for a reset time measured against [`NOW`].
    fn window(resets_at: f64) -> Option<String> {
        format_rate_limit_window(Some(resets_at), NOW)
    }

    #[test]
    fn state_glyph_priorities_pending_then_unread_then_running() {
        for (unread, pending, expected) in [(0, 0, "●"), (3, 0, "✉"), (3, 1, "!")] {
            assert_eq!(glyph(AgentState::Running, unread, pending, false), expected);
        }
    }

    #[test]
    fn state_glyph_stopped_and_unknown() {
        assert_eq!(glyph(AgentState::Stopped, 0, 0, false), "✕");
        assert_eq!(glyph(AgentState::Unknown, 0, 0, false), "?");
    }

    #[test]
    fn state_glyph_ascii_fallback() {
        assert_eq!(glyph(AgentState::Running, 0, 0, true), "*");
        assert_eq!(glyph(AgentState::Running, 5, 0, true), "@");
        assert_eq!(glyph(AgentState::Stopped, 0, 0, true), "x");
        // `!` and `?` look the same with or without the fallback.
        assert_eq!(glyph(AgentState::Running, 0, 1, true), "!");
        assert_eq!(glyph(AgentState::Unknown, 0, 0, true), "?");
    }

    // T-212: the formatter tests pin the value-shape rules the
    // status-bar slot renders against. The SQL side is left to
    // integration tests at the snapshot layer (same as the unread /
    // pending counters, which are also untested at this layer); the
    // formatter is where the branchy logic lives.

    #[test]
    fn format_rate_limit_window_returns_none_when_unset() {
        assert_eq!(format_rate_limit_window(None, NOW), None);
    }

    #[test]
    fn format_rate_limit_window_returns_none_when_already_past() {
        assert_eq!(window(500.0), None);
    }

    #[test]
    fn format_rate_limit_window_returns_none_at_exact_now() {
        assert_eq!(window(1000.0), None);
    }

    #[test]
    fn format_rate_limit_window_under_minute_renders_seconds() {
        assert_eq!(window(1042.0).as_deref(), Some("42s"));
        assert_eq!(window(1059.0).as_deref(), Some("59s"));
    }

    #[test]
    fn format_rate_limit_window_under_hour_renders_minutes_and_seconds() {
        assert_eq!(window(1060.0).as_deref(), Some("1m 0s"));
        assert_eq!(window(1312.0).as_deref(), Some("5m 12s"));
    }

    #[test]
    fn format_rate_limit_window_at_or_over_hour_renders_hours_and_minutes() {
        assert_eq!(window(4600.0).as_deref(), Some("1h 0m"));
        assert_eq!(window(5980.0).as_deref(), Some("1h 23m"));
    }
}
459}