Skip to main content

teamctl_ui/
mailbox.rs

//! Mailbox-pane data source and tab definitions.
//!
//! Four filter shapes, one per tab in SPEC §2's Triptych mailbox:
//!
//! - `Inbox` — DMs whose `recipient = '<project>:<agent>'`.
//! - `Sent` — every row whose `sender = '<project>:<agent>'`,
//!   irrespective of recipient class. Closes the "did this agent
//!   actually emit X" debug loop without pivoting to the recipient.
//! - `Channel` — channel traffic for channels the focused agent is
//!   a member of (recipient is `'channel:<channel_id>'`, filtered
//!   through `channel_members`).
//! - `Wire` — project-wide broadcast traffic on the `all` channel
//!   (`recipient = 'channel:<project>:all'`).
//!
//! INVARIANT: every `messages.recipient` value falls into exactly
//! one of three prefix classes — `<project>:<agent>` (DM, no scheme
//! prefix; the channel-or-user split below depends on this absence),
//! `channel:<channel_id>`, or `user:<handle>`. `data::mailbox_counts`
//! relies on the same contract when it filters out channel/user rows
//! for the per-agent unread-mail counter; if a fourth prefix class
//! ever lands, the comment there and the queries here both need to
//! learn it. Sent is the one tab whose filter is sender-side and
//! recipient-class-agnostic — it returns rows from all three
//! recipient prefix classes.
25
26use std::path::PathBuf;
27
28use anyhow::Result;
29use rusqlite::{params, Connection};
30
/// One tab of the mailbox pane. Each variant selects a different
/// filter over the broker's `messages` table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MailboxTab {
    /// DMs addressed to the focused agent.
    Inbox,
    /// Every message the focused agent sent, whatever the recipient
    /// class.
    Sent,
    /// Traffic on channels the focused agent is a member of.
    Channel,
    /// Project-wide broadcast traffic on the `all` channel.
    Wire,
}

impl MailboxTab {
    /// Every tab, listed in the same order `next` walks them when
    /// starting from `Inbox`.
    pub const ALL: [MailboxTab; 4] = [
        MailboxTab::Inbox,
        MailboxTab::Sent,
        MailboxTab::Channel,
        MailboxTab::Wire,
    ];

    /// Short title for the tab.
    pub fn label(self) -> &'static str {
        match self {
            MailboxTab::Inbox => "Inbox",
            MailboxTab::Sent => "Sent",
            MailboxTab::Channel => "Channel",
            MailboxTab::Wire => "Wire",
        }
    }

    /// Hint string for the tab's empty state (presumably rendered
    /// when the tab has no rows — the call site is not in this file).
    pub fn empty_hint(self) -> &'static str {
        match self {
            MailboxTab::Inbox => "(no DMs)",
            MailboxTab::Sent => "(no sent messages)",
            MailboxTab::Channel => "(no channel traffic)",
            MailboxTab::Wire => "(quiet)",
        }
    }

    /// The tab after `self`; wraps from `Wire` back to `Inbox`.
    pub fn next(self) -> Self {
        match self {
            MailboxTab::Inbox => MailboxTab::Sent,
            MailboxTab::Sent => MailboxTab::Channel,
            MailboxTab::Channel => MailboxTab::Wire,
            MailboxTab::Wire => MailboxTab::Inbox,
        }
    }

    /// The tab before `self`; wraps from `Inbox` back to `Wire`.
    /// Exact inverse of [`MailboxTab::next`].
    pub fn prev(self) -> Self {
        match self {
            MailboxTab::Inbox => MailboxTab::Wire,
            MailboxTab::Sent => MailboxTab::Inbox,
            MailboxTab::Channel => MailboxTab::Sent,
            MailboxTab::Wire => MailboxTab::Channel,
        }
    }
}
83
/// One message as shown in the mailbox pane. The production source
/// fills it from the `id, sender, recipient, text, sent_at` columns
/// of the broker's `messages` table.
#[derive(Debug, Clone)]
pub struct MessageRow {
    /// Row id. Queries return rows in ascending `id` order and the
    /// per-tab cursors store the last id seen.
    pub id: i64,
    /// Sender address; agents appear as `<project>:<agent>`.
    pub sender: String,
    /// Recipient address: `<project>:<agent>` (DM),
    /// `channel:<channel_id>`, or `user:<handle>`.
    pub recipient: String,
    /// Message body. May span several lines; `render_row` flattens
    /// it for display.
    pub text: String,
    /// Value of the `sent_at` column.
    /// NOTE(review): unit/epoch is not visible in this file —
    /// presumably Unix seconds; confirm against the broker schema.
    pub sent_at: f64,
}
92
93/// Format a single row for the mailbox pane. Kept terse: `[from]
94/// text` on one line — no timestamps, no recipient (the tab tells
95/// you the recipient class). Multi-line bodies are flattened with a
96/// space so a single message stays one row in the pane. `team` is
97/// consulted via `agent_label` so senders with a `display_name` show
98/// their human label; unknown senders (cross-project, system) fall
99/// back to the canonical id verbatim.
100pub fn render_row(row: &MessageRow, team: &crate::data::TeamSnapshot) -> String {
101    let one_line: String = row
102        .text
103        .replace('\n', " ")
104        .replace('\r', "")
105        .chars()
106        .take(180)
107        .collect();
108    let sender = crate::data::agent_label(team, &row.sender);
109    format!("[{}] {}", sender, one_line)
110}
111
112/// Lookup contract: each method returns rows newer than `after_id`
113/// for the given filter, in ascending id order. Callers fold the
114/// returned rows into a per-tab buffer and bump `after_id` to the
115/// last returned id.
116pub trait MailboxSource: Send + Sync {
117    fn inbox(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>>;
118    fn sent(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>>;
119    fn channel_feed(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>>;
120    fn wire(&self, project_id: &str, after_id: i64) -> Result<Vec<MessageRow>>;
121}
122
/// Production impl reading the broker SQLite at `<root>/state/mailbox.db`.
/// Each call opens a fresh connection — `mailbox.db` is local and
/// short-lived connections cost effectively zero.
#[derive(Debug, Clone)]
pub struct BrokerMailboxSource {
    /// Filesystem path of the broker database file. It is checked
    /// and opened on every query, never cached.
    pub db_path: PathBuf,
}
130
131impl BrokerMailboxSource {
132    pub fn new(db_path: PathBuf) -> Self {
133        Self { db_path }
134    }
135
136    fn open(&self) -> Result<Option<Connection>> {
137        if !self.db_path.is_file() {
138            return Ok(None);
139        }
140        let conn = Connection::open(&self.db_path)?;
141        Ok(Some(conn))
142    }
143}
144
145impl MailboxSource for BrokerMailboxSource {
146    fn inbox(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
147        let Some(conn) = self.open()? else {
148            return Ok(Vec::new());
149        };
150        let mut stmt = conn.prepare(
151            "SELECT id, sender, recipient, text, sent_at FROM messages
152             WHERE id > ?1 AND recipient = ?2
153             ORDER BY id ASC",
154        )?;
155        let rows = stmt
156            .query_map(params![after_id, agent_id], |r| {
157                Ok(MessageRow {
158                    id: r.get(0)?,
159                    sender: r.get(1)?,
160                    recipient: r.get(2)?,
161                    text: r.get(3)?,
162                    sent_at: r.get(4)?,
163                })
164            })?
165            .flatten()
166            .collect();
167        Ok(rows)
168    }
169
170    fn sent(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
171        let Some(conn) = self.open()? else {
172            return Ok(Vec::new());
173        };
174        // Sender-side filter — every row the focused agent emitted,
175        // irrespective of recipient class. Returns DMs, telegram
176        // replies, channel posts, and wire broadcasts in a single
177        // stream.
178        let mut stmt = conn.prepare(
179            "SELECT id, sender, recipient, text, sent_at FROM messages
180             WHERE id > ?1 AND sender = ?2
181             ORDER BY id ASC",
182        )?;
183        let rows = stmt
184            .query_map(params![after_id, agent_id], |r| {
185                Ok(MessageRow {
186                    id: r.get(0)?,
187                    sender: r.get(1)?,
188                    recipient: r.get(2)?,
189                    text: r.get(3)?,
190                    sent_at: r.get(4)?,
191                })
192            })?
193            .flatten()
194            .collect();
195        Ok(rows)
196    }
197
198    fn channel_feed(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
199        let Some(conn) = self.open()? else {
200            return Ok(Vec::new());
201        };
202        // Same shape as `teamctl tail <agent>`'s channel arm: rows
203        // whose recipient is a `channel:` URL the agent is a member
204        // of. Membership lives in `channel_members.agent_id =
205        // <project>:<agent>`.
206        let mut stmt = conn.prepare(
207            "SELECT id, sender, recipient, text, sent_at FROM messages
208             WHERE id > ?1
209               AND recipient IN (
210                   SELECT 'channel:' || cm.channel_id FROM channel_members cm
211                   WHERE cm.agent_id = ?2
212               )
213             ORDER BY id ASC",
214        )?;
215        let rows = stmt
216            .query_map(params![after_id, agent_id], |r| {
217                Ok(MessageRow {
218                    id: r.get(0)?,
219                    sender: r.get(1)?,
220                    recipient: r.get(2)?,
221                    text: r.get(3)?,
222                    sent_at: r.get(4)?,
223                })
224            })?
225            .flatten()
226            .collect();
227        Ok(rows)
228    }
229
230    fn wire(&self, project_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
231        let Some(conn) = self.open()? else {
232            return Ok(Vec::new());
233        };
234        // The project-wide `all` channel is the broadcast wire.
235        // Channel ids are `<project>:<name>`; messages address them
236        // via `channel:<channel_id>`.
237        let target = format!("channel:{project_id}:all");
238        let mut stmt = conn.prepare(
239            "SELECT id, sender, recipient, text, sent_at FROM messages
240             WHERE id > ?1 AND recipient = ?2
241             ORDER BY id ASC",
242        )?;
243        let rows = stmt
244            .query_map(params![after_id, target], |r| {
245                Ok(MessageRow {
246                    id: r.get(0)?,
247                    sender: r.get(1)?,
248                    recipient: r.get(2)?,
249                    text: r.get(3)?,
250                    sent_at: r.get(4)?,
251                })
252            })?
253            .flatten()
254            .collect();
255        Ok(rows)
256    }
257}
258
259/// Per-agent buffer state — four tabs, four `after_id` cursors.
260/// Lives on `App` so swapping the focused agent resets the cursors
261/// without trying to back-fill: the operator sees only forward
262/// motion in the tab they're watching.
263#[derive(Debug, Default, Clone)]
264pub struct MailboxBuffers {
265    pub inbox: Vec<MessageRow>,
266    pub sent: Vec<MessageRow>,
267    pub channel: Vec<MessageRow>,
268    pub wire: Vec<MessageRow>,
269    pub inbox_after: i64,
270    pub sent_after: i64,
271    pub channel_after: i64,
272    pub wire_after: i64,
273}
274
/// Upper bound on the rows kept per tab. `MailboxBuffers::extend`
/// drops the oldest rows once a tab's buffer grows past it.
const MAX_TAB_ROWS: usize = 500;
276
277impl MailboxBuffers {
278    pub fn rows(&self, tab: MailboxTab) -> &[MessageRow] {
279        match tab {
280            MailboxTab::Inbox => &self.inbox,
281            MailboxTab::Sent => &self.sent,
282            MailboxTab::Channel => &self.channel,
283            MailboxTab::Wire => &self.wire,
284        }
285    }
286
287    /// Fold a freshly-fetched batch into the appropriate tab,
288    /// trimming to the last `MAX_TAB_ROWS`. Bumps the cursor to the
289    /// last returned id when the batch is non-empty.
290    pub fn extend(&mut self, tab: MailboxTab, batch: Vec<MessageRow>) {
291        let last_id = batch.last().map(|r| r.id);
292        let (buf, after) = match tab {
293            MailboxTab::Inbox => (&mut self.inbox, &mut self.inbox_after),
294            MailboxTab::Sent => (&mut self.sent, &mut self.sent_after),
295            MailboxTab::Channel => (&mut self.channel, &mut self.channel_after),
296            MailboxTab::Wire => (&mut self.wire, &mut self.wire_after),
297        };
298        buf.extend(batch);
299        if buf.len() > MAX_TAB_ROWS {
300            let drop = buf.len() - MAX_TAB_ROWS;
301            buf.drain(..drop);
302        }
303        if let Some(id) = last_id {
304            *after = id;
305        }
306    }
307
308    /// Reset every tab's contents and cursor. Called when the
309    /// focused agent changes — the new agent's `inbox` filter would
310    /// otherwise skip historical rows that landed before our last
311    /// `inbox_after`.
312    pub fn reset(&mut self) {
313        *self = Self::default();
314    }
315}
316
317pub mod test_support {
318    //! Shared mock — public so unit tests, integration tests, and
319    //! downstream coverage can wire in a recorder without rolling
320    //! their own. Matches the shape used by `compose::test_support`
321    //! and `approvals::test_support`.
322
323    use super::*;
324    use std::sync::Mutex;
325
326    /// Test stub — returns canned rows on each call, records every
327    /// arg pair. Mailbox is the most-asserted test surface in
328    /// PR-UI-3 so the recorder lets snapshot + interaction tests
329    /// verify "is the right filter being asked the right thing."
330    #[derive(Default)]
331    pub struct MockMailboxSource {
332        pub inbox_rows: Vec<MessageRow>,
333        pub sent_rows: Vec<MessageRow>,
334        pub channel_rows: Vec<MessageRow>,
335        pub wire_rows: Vec<MessageRow>,
336        pub inbox_calls: Mutex<Vec<(String, i64)>>,
337        pub sent_calls: Mutex<Vec<(String, i64)>>,
338        pub channel_calls: Mutex<Vec<(String, i64)>>,
339        pub wire_calls: Mutex<Vec<(String, i64)>>,
340    }
341
342    impl MailboxSource for MockMailboxSource {
343        fn inbox(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
344            self.inbox_calls
345                .lock()
346                .unwrap()
347                .push((agent_id.into(), after_id));
348            Ok(self.inbox_rows.clone())
349        }
350
351        fn sent(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
352            self.sent_calls
353                .lock()
354                .unwrap()
355                .push((agent_id.into(), after_id));
356            Ok(self.sent_rows.clone())
357        }
358
359        fn channel_feed(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
360            self.channel_calls
361                .lock()
362                .unwrap()
363                .push((agent_id.into(), after_id));
364            Ok(self.channel_rows.clone())
365        }
366
367        fn wire(&self, project_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
368            self.wire_calls
369                .lock()
370                .unwrap()
371                .push((project_id.into(), after_id));
372            Ok(self.wire_rows.clone())
373        }
374    }
375}
376
#[cfg(test)]
mod tests {
    use super::test_support::*;
    use super::*;

    /// Build a `MessageRow` with a zero timestamp.
    fn row(id: i64, sender: &str, recipient: &str, text: &str) -> MessageRow {
        MessageRow {
            id,
            sender: sender.into(),
            recipient: recipient.into(),
            text: text.into(),
            sent_at: 0.0,
        }
    }

    #[test]
    fn next_cycles_inbox_sent_channel_wire_inbox() {
        let mut t = MailboxTab::Inbox;
        t = t.next();
        assert_eq!(t, MailboxTab::Sent);
        t = t.next();
        assert_eq!(t, MailboxTab::Channel);
        t = t.next();
        assert_eq!(t, MailboxTab::Wire);
        t = t.next();
        assert_eq!(t, MailboxTab::Inbox);
    }

    #[test]
    fn prev_cycles_inbox_wire_channel_sent_inbox() {
        let mut t = MailboxTab::Inbox;
        t = t.prev();
        assert_eq!(t, MailboxTab::Wire);
        t = t.prev();
        assert_eq!(t, MailboxTab::Channel);
        t = t.prev();
        assert_eq!(t, MailboxTab::Sent);
        t = t.prev();
        assert_eq!(t, MailboxTab::Inbox);
    }

    #[test]
    fn extend_appends_and_bumps_cursor() {
        let mut buf = MailboxBuffers::default();
        buf.extend(
            MailboxTab::Inbox,
            vec![row(7, "p:m", "p:dev", "hi"), row(8, "p:m", "p:dev", "yo")],
        );
        assert_eq!(buf.inbox.len(), 2);
        assert_eq!(buf.inbox_after, 8);
        // Empty batch must not move the cursor backward.
        buf.extend(MailboxTab::Inbox, vec![]);
        assert_eq!(buf.inbox_after, 8);
    }

    #[test]
    fn extend_trims_to_cap() {
        let mut buf = MailboxBuffers::default();
        let batch: Vec<MessageRow> = (1..=600).map(|i| row(i, "p:m", "p:dev", "x")).collect();
        buf.extend(MailboxTab::Wire, batch);
        assert_eq!(buf.wire.len(), MAX_TAB_ROWS);
        // Cap keeps the *latest* rows — the cursor reflects the
        // batch's actual high-water id, not the trimmed buffer's
        // first row.
        assert_eq!(buf.wire_after, 600);
        assert_eq!(buf.wire.last().unwrap().id, 600);
    }

    #[test]
    fn reset_clears_buffers_and_cursors() {
        let mut buf = MailboxBuffers::default();
        buf.extend(MailboxTab::Inbox, vec![row(3, "a", "b", "x")]);
        buf.extend(MailboxTab::Channel, vec![row(4, "a", "channel:p:all", "y")]);
        buf.reset();
        assert!(buf.inbox.is_empty());
        assert!(buf.channel.is_empty());
        assert_eq!(buf.inbox_after, 0);
        assert_eq!(buf.channel_after, 0);
    }

    /// Snapshot with no agents, so every sender renders as its raw id.
    fn empty_team() -> crate::data::TeamSnapshot {
        crate::data::TeamSnapshot::empty(std::path::PathBuf::from("/tmp"))
    }

    #[test]
    fn render_row_flattens_newlines_and_truncates() {
        let team = empty_team();
        let r = row(1, "p:m", "p:dev", "first\nsecond\nthird");
        assert_eq!(render_row(&r, &team), "[p:m] first second third");

        // CRLF collapses to a single space: `\n` becomes a space and
        // `\r` is removed.
        let r = row(1, "p:m", "p:dev", "a\r\nb");
        assert_eq!(render_row(&r, &team), "[p:m] a b");

        let long: String = "x".repeat(300);
        let r = row(1, "s", "r", &long);
        let rendered = render_row(&r, &team);
        // 4 chars ("[s] ") + exactly 180 chars of truncated body = 184.
        assert_eq!(rendered.chars().count(), 184);
    }

    #[test]
    fn render_row_uses_display_name_when_set() {
        // T-160: when the sender id has a `display_name` in the team
        // snapshot, the mailbox row renders the label, not the id.
        // Unknown senders fall through to the raw id (covered above).
        use crate::data::{AgentInfo, TeamSnapshot};
        use team_core::supervisor::AgentState;
        let agent = AgentInfo {
            id: "p:sage".into(),
            agent: "sage".into(),
            project: "p".into(),
            tmux_session: "a-p-sage".into(),
            state: AgentState::Unknown,
            unread_mail: 0,
            pending_approvals: 0,
            is_manager: true,
            display_name: Some("Sage (Visionary)".into()),
        };
        let team = TeamSnapshot {
            root: std::path::PathBuf::from("/tmp"),
            team_name: "t".into(),
            agents: vec![agent],
            channels: vec![],
        };
        let r = row(1, "p:sage", "p:hugo", "ping");
        assert_eq!(render_row(&r, &team), "[Sage (Visionary)] ping");
    }

    #[test]
    fn mock_records_calls() {
        let mock = MockMailboxSource {
            inbox_rows: vec![row(1, "p:m", "p:a", "hi")],
            ..Default::default()
        };
        let _ = mock.inbox("p:a", 0).unwrap();
        let _ = mock.sent("p:a", 2).unwrap();
        let _ = mock.channel_feed("p:a", 5).unwrap();
        let _ = mock.wire("p", 9).unwrap();
        assert_eq!(*mock.inbox_calls.lock().unwrap(), vec![("p:a".into(), 0)]);
        assert_eq!(*mock.sent_calls.lock().unwrap(), vec![("p:a".into(), 2)]);
        assert_eq!(*mock.channel_calls.lock().unwrap(), vec![("p:a".into(), 5)]);
        assert_eq!(*mock.wire_calls.lock().unwrap(), vec![("p".into(), 9)]);
    }
}