Skip to main content

teamctl_ui/
mailbox.rs

1//! Mailbox-pane data source and tab definitions.
2//!
3//! Three filter shapes, one per tab in SPEC §2's Triptych mailbox:
4//!
5//! - `Inbox` — DMs whose `recipient = '<project>:<agent>'`.
6//! - `Channel` — channel traffic for channels the focused agent is
7//!   a member of (recipient is `'channel:<channel_id>'`, filtered
8//!   through `channel_members`).
9//! - `Wire` — project-wide broadcast traffic on the `all` channel
10//!   (`recipient = 'channel:<project>:all'`).
11//!
12//! INVARIANT: every `messages.recipient` value falls into exactly
13//! one of three prefix classes — `<project>:<agent>` (DM, no scheme
14//! prefix; the channel-or-user split below depends on this absence),
15//! `channel:<channel_id>`, or `user:<handle>`. `data::mailbox_counts`
16//! relies on the same contract when it filters out channel/user rows
17//! for the per-agent unread-mail counter; if a fourth prefix class
18//! ever lands, the comment there and the queries here both need to
19//! learn it.
20
21use std::path::PathBuf;
22
23use anyhow::Result;
24use rusqlite::{params, Connection};
25
26#[derive(Debug, Clone, Copy, PartialEq, Eq)]
27pub enum MailboxTab {
28    Inbox,
29    Channel,
30    Wire,
31}
32
33impl MailboxTab {
34    pub const ALL: [MailboxTab; 3] = [MailboxTab::Inbox, MailboxTab::Channel, MailboxTab::Wire];
35
36    pub fn label(self) -> &'static str {
37        match self {
38            MailboxTab::Inbox => "Inbox",
39            MailboxTab::Channel => "Channel",
40            MailboxTab::Wire => "Wire",
41        }
42    }
43
44    pub fn empty_hint(self) -> &'static str {
45        match self {
46            MailboxTab::Inbox => "(no DMs)",
47            MailboxTab::Channel => "(no channel traffic)",
48            MailboxTab::Wire => "(quiet)",
49        }
50    }
51
52    pub fn next(self) -> Self {
53        match self {
54            MailboxTab::Inbox => MailboxTab::Channel,
55            MailboxTab::Channel => MailboxTab::Wire,
56            MailboxTab::Wire => MailboxTab::Inbox,
57        }
58    }
59
60    pub fn prev(self) -> Self {
61        match self {
62            MailboxTab::Inbox => MailboxTab::Wire,
63            MailboxTab::Channel => MailboxTab::Inbox,
64            MailboxTab::Wire => MailboxTab::Channel,
65        }
66    }
67}
68
69#[derive(Debug, Clone)]
70pub struct MessageRow {
71    pub id: i64,
72    pub sender: String,
73    pub recipient: String,
74    pub text: String,
75    pub sent_at: f64,
76}
77
78/// Format a single row for the mailbox pane. Kept terse: `[from]
79/// text` on one line — no timestamps, no recipient (the tab tells
80/// you the recipient class). Multi-line bodies are flattened with a
81/// space so a single message stays one row in the pane.
82pub fn render_row(row: &MessageRow) -> String {
83    let one_line: String = row
84        .text
85        .replace('\n', " ")
86        .replace('\r', "")
87        .chars()
88        .take(180)
89        .collect();
90    format!("[{}] {}", row.sender, one_line)
91}
92
93/// Lookup contract: each method returns rows newer than `after_id`
94/// for the given filter, in ascending id order. Callers fold the
95/// returned rows into a per-tab buffer and bump `after_id` to the
96/// last returned id.
97pub trait MailboxSource: Send + Sync {
98    fn inbox(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>>;
99    fn channel_feed(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>>;
100    fn wire(&self, project_id: &str, after_id: i64) -> Result<Vec<MessageRow>>;
101}
102
103/// Production impl reading the broker SQLite at `<root>/state/mailbox.db`.
104/// Each call opens a fresh connection — `mailbox.db` is local and
105/// short-lived connections cost effectively zero.
106#[derive(Debug, Clone)]
107pub struct BrokerMailboxSource {
108    pub db_path: PathBuf,
109}
110
111impl BrokerMailboxSource {
112    pub fn new(db_path: PathBuf) -> Self {
113        Self { db_path }
114    }
115
116    fn open(&self) -> Result<Option<Connection>> {
117        if !self.db_path.is_file() {
118            return Ok(None);
119        }
120        let conn = Connection::open(&self.db_path)?;
121        Ok(Some(conn))
122    }
123}
124
125impl MailboxSource for BrokerMailboxSource {
126    fn inbox(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
127        let Some(conn) = self.open()? else {
128            return Ok(Vec::new());
129        };
130        let mut stmt = conn.prepare(
131            "SELECT id, sender, recipient, text, sent_at FROM messages
132             WHERE id > ?1 AND recipient = ?2
133             ORDER BY id ASC",
134        )?;
135        let rows = stmt
136            .query_map(params![after_id, agent_id], |r| {
137                Ok(MessageRow {
138                    id: r.get(0)?,
139                    sender: r.get(1)?,
140                    recipient: r.get(2)?,
141                    text: r.get(3)?,
142                    sent_at: r.get(4)?,
143                })
144            })?
145            .flatten()
146            .collect();
147        Ok(rows)
148    }
149
150    fn channel_feed(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
151        let Some(conn) = self.open()? else {
152            return Ok(Vec::new());
153        };
154        // Same shape as `teamctl tail <agent>`'s channel arm: rows
155        // whose recipient is a `channel:` URL the agent is a member
156        // of. Membership lives in `channel_members.agent_id =
157        // <project>:<agent>`.
158        let mut stmt = conn.prepare(
159            "SELECT id, sender, recipient, text, sent_at FROM messages
160             WHERE id > ?1
161               AND recipient IN (
162                   SELECT 'channel:' || cm.channel_id FROM channel_members cm
163                   WHERE cm.agent_id = ?2
164               )
165             ORDER BY id ASC",
166        )?;
167        let rows = stmt
168            .query_map(params![after_id, agent_id], |r| {
169                Ok(MessageRow {
170                    id: r.get(0)?,
171                    sender: r.get(1)?,
172                    recipient: r.get(2)?,
173                    text: r.get(3)?,
174                    sent_at: r.get(4)?,
175                })
176            })?
177            .flatten()
178            .collect();
179        Ok(rows)
180    }
181
182    fn wire(&self, project_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
183        let Some(conn) = self.open()? else {
184            return Ok(Vec::new());
185        };
186        // The project-wide `all` channel is the broadcast wire.
187        // Channel ids are `<project>:<name>`; messages address them
188        // via `channel:<channel_id>`.
189        let target = format!("channel:{project_id}:all");
190        let mut stmt = conn.prepare(
191            "SELECT id, sender, recipient, text, sent_at FROM messages
192             WHERE id > ?1 AND recipient = ?2
193             ORDER BY id ASC",
194        )?;
195        let rows = stmt
196            .query_map(params![after_id, target], |r| {
197                Ok(MessageRow {
198                    id: r.get(0)?,
199                    sender: r.get(1)?,
200                    recipient: r.get(2)?,
201                    text: r.get(3)?,
202                    sent_at: r.get(4)?,
203                })
204            })?
205            .flatten()
206            .collect();
207        Ok(rows)
208    }
209}
210
211/// Per-agent buffer state — three tabs, three `after_id` cursors.
212/// Lives on `App` so swapping the focused agent resets the cursors
213/// without trying to back-fill: the operator sees only forward
214/// motion in the tab they're watching.
215#[derive(Debug, Default, Clone)]
216pub struct MailboxBuffers {
217    pub inbox: Vec<MessageRow>,
218    pub channel: Vec<MessageRow>,
219    pub wire: Vec<MessageRow>,
220    pub inbox_after: i64,
221    pub channel_after: i64,
222    pub wire_after: i64,
223}
224
225const MAX_TAB_ROWS: usize = 500;
226
227impl MailboxBuffers {
228    pub fn rows(&self, tab: MailboxTab) -> &[MessageRow] {
229        match tab {
230            MailboxTab::Inbox => &self.inbox,
231            MailboxTab::Channel => &self.channel,
232            MailboxTab::Wire => &self.wire,
233        }
234    }
235
236    /// Fold a freshly-fetched batch into the appropriate tab,
237    /// trimming to the last `MAX_TAB_ROWS`. Bumps the cursor to the
238    /// last returned id when the batch is non-empty.
239    pub fn extend(&mut self, tab: MailboxTab, batch: Vec<MessageRow>) {
240        let last_id = batch.last().map(|r| r.id);
241        let (buf, after) = match tab {
242            MailboxTab::Inbox => (&mut self.inbox, &mut self.inbox_after),
243            MailboxTab::Channel => (&mut self.channel, &mut self.channel_after),
244            MailboxTab::Wire => (&mut self.wire, &mut self.wire_after),
245        };
246        buf.extend(batch);
247        if buf.len() > MAX_TAB_ROWS {
248            let drop = buf.len() - MAX_TAB_ROWS;
249            buf.drain(..drop);
250        }
251        if let Some(id) = last_id {
252            *after = id;
253        }
254    }
255
256    /// Reset every tab's contents and cursor. Called when the
257    /// focused agent changes — the new agent's `inbox` filter would
258    /// otherwise skip historical rows that landed before our last
259    /// `inbox_after`.
260    pub fn reset(&mut self) {
261        *self = Self::default();
262    }
263}
264
265pub mod test_support {
266    //! Shared mock — public so unit tests, integration tests, and
267    //! downstream coverage can wire in a recorder without rolling
268    //! their own. Matches the shape used by `compose::test_support`
269    //! and `approvals::test_support`.
270
271    use super::*;
272    use std::sync::Mutex;
273
274    /// Test stub — returns canned rows on each call, records every
275    /// arg pair. Mailbox is the most-asserted test surface in
276    /// PR-UI-3 so the recorder lets snapshot + interaction tests
277    /// verify "is the right filter being asked the right thing."
278    #[derive(Default)]
279    pub struct MockMailboxSource {
280        pub inbox_rows: Vec<MessageRow>,
281        pub channel_rows: Vec<MessageRow>,
282        pub wire_rows: Vec<MessageRow>,
283        pub inbox_calls: Mutex<Vec<(String, i64)>>,
284        pub channel_calls: Mutex<Vec<(String, i64)>>,
285        pub wire_calls: Mutex<Vec<(String, i64)>>,
286    }
287
288    impl MailboxSource for MockMailboxSource {
289        fn inbox(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
290            self.inbox_calls
291                .lock()
292                .unwrap()
293                .push((agent_id.into(), after_id));
294            Ok(self.inbox_rows.clone())
295        }
296
297        fn channel_feed(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
298            self.channel_calls
299                .lock()
300                .unwrap()
301                .push((agent_id.into(), after_id));
302            Ok(self.channel_rows.clone())
303        }
304
305        fn wire(&self, project_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
306            self.wire_calls
307                .lock()
308                .unwrap()
309                .push((project_id.into(), after_id));
310            Ok(self.wire_rows.clone())
311        }
312    }
313}
314
315#[cfg(test)]
316mod tests {
317    use super::test_support::*;
318    use super::*;
319
320    fn row(id: i64, sender: &str, recipient: &str, text: &str) -> MessageRow {
321        MessageRow {
322            id,
323            sender: sender.into(),
324            recipient: recipient.into(),
325            text: text.into(),
326            sent_at: 0.0,
327        }
328    }
329
330    #[test]
331    fn next_cycles_inbox_channel_wire_inbox() {
332        let mut t = MailboxTab::Inbox;
333        t = t.next();
334        assert_eq!(t, MailboxTab::Channel);
335        t = t.next();
336        assert_eq!(t, MailboxTab::Wire);
337        t = t.next();
338        assert_eq!(t, MailboxTab::Inbox);
339    }
340
341    #[test]
342    fn extend_appends_and_bumps_cursor() {
343        let mut buf = MailboxBuffers::default();
344        buf.extend(
345            MailboxTab::Inbox,
346            vec![row(7, "p:m", "p:dev", "hi"), row(8, "p:m", "p:dev", "yo")],
347        );
348        assert_eq!(buf.inbox.len(), 2);
349        assert_eq!(buf.inbox_after, 8);
350        // Empty batch must not move the cursor backward.
351        buf.extend(MailboxTab::Inbox, vec![]);
352        assert_eq!(buf.inbox_after, 8);
353    }
354
355    #[test]
356    fn extend_trims_to_cap() {
357        let mut buf = MailboxBuffers::default();
358        let batch: Vec<MessageRow> = (1..=600).map(|i| row(i, "p:m", "p:dev", "x")).collect();
359        buf.extend(MailboxTab::Wire, batch);
360        assert_eq!(buf.wire.len(), MAX_TAB_ROWS);
361        // Cap keeps the *latest* rows — the cursor reflects the
362        // batch's actual high-water id, not the trimmed buffer's
363        // first row.
364        assert_eq!(buf.wire_after, 600);
365        assert_eq!(buf.wire.last().unwrap().id, 600);
366    }
367
368    #[test]
369    fn reset_clears_buffers_and_cursors() {
370        let mut buf = MailboxBuffers::default();
371        buf.extend(MailboxTab::Inbox, vec![row(3, "a", "b", "x")]);
372        buf.extend(MailboxTab::Channel, vec![row(4, "a", "channel:p:all", "y")]);
373        buf.reset();
374        assert!(buf.inbox.is_empty());
375        assert!(buf.channel.is_empty());
376        assert_eq!(buf.inbox_after, 0);
377        assert_eq!(buf.channel_after, 0);
378    }
379
380    #[test]
381    fn render_row_flattens_newlines_and_truncates() {
382        let r = row(1, "p:m", "p:dev", "first\nsecond\nthird");
383        assert_eq!(render_row(&r), "[p:m] first second third");
384
385        let long: String = "x".repeat(300);
386        let r = row(1, "s", "r", &long);
387        let rendered = render_row(&r);
388        // 5 chars ("[s] ") + at most 180 chars of body = 185.
389        assert!(rendered.chars().count() <= 185);
390    }
391
392    #[test]
393    fn mock_records_calls() {
394        let mock = MockMailboxSource {
395            inbox_rows: vec![row(1, "p:m", "p:a", "hi")],
396            ..Default::default()
397        };
398        let _ = mock.inbox("p:a", 0).unwrap();
399        let _ = mock.channel_feed("p:a", 5).unwrap();
400        let _ = mock.wire("p", 9).unwrap();
401        assert_eq!(*mock.inbox_calls.lock().unwrap(), vec![("p:a".into(), 0)]);
402        assert_eq!(*mock.channel_calls.lock().unwrap(), vec![("p:a".into(), 5)]);
403        assert_eq!(*mock.wire_calls.lock().unwrap(), vec![("p".into(), 9)]);
404    }
405}