1use std::path::PathBuf;
27
28use anyhow::Result;
29use rusqlite::{params, Connection};
30
/// The four mailbox tabs.
///
/// Variants are declared in the order the tabs cycle: [`MailboxTab::next`]
/// walks them top to bottom and wraps, [`MailboxTab::prev`] walks them in
/// reverse.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MailboxTab {
    /// Messages whose recipient is the agent itself (direct messages).
    Inbox,
    /// Messages whose sender is the agent.
    Sent,
    /// Messages addressed to channels the agent is a member of.
    Channel,
    /// Messages addressed to the project's `all` channel
    /// (`channel:<project>:all`).
    Wire,
}

impl MailboxTab {
    /// Every tab, listed in cycling order.
    pub const ALL: [MailboxTab; 4] = [Self::Inbox, Self::Sent, Self::Channel, Self::Wire];

    /// Short title for the tab.
    pub fn label(self) -> &'static str {
        match self {
            Self::Inbox => "Inbox",
            Self::Sent => "Sent",
            Self::Channel => "Channel",
            Self::Wire => "Wire",
        }
    }

    /// Placeholder text for a tab that currently has no rows.
    pub fn empty_hint(self) -> &'static str {
        match self {
            Self::Inbox => "(no DMs)",
            Self::Sent => "(no sent messages)",
            Self::Channel => "(no channel traffic)",
            Self::Wire => "(quiet)",
        }
    }

    /// The tab after this one; `Wire` wraps around to `Inbox`.
    pub fn next(self) -> Self {
        // Kept as an exhaustive match so adding a variant is a compile error
        // here rather than a silently skipped tab.
        match self {
            Self::Inbox => Self::Sent,
            Self::Sent => Self::Channel,
            Self::Channel => Self::Wire,
            Self::Wire => Self::Inbox,
        }
    }

    /// The tab before this one; `Inbox` wraps around to `Wire`.
    pub fn prev(self) -> Self {
        match self {
            Self::Wire => Self::Channel,
            Self::Channel => Self::Sent,
            Self::Sent => Self::Inbox,
            Self::Inbox => Self::Wire,
        }
    }
}
83
/// One message, with the same five fields the mailbox queries select from
/// the `messages` table (`id, sender, recipient, text, sent_at`).
#[derive(Clone, Debug)]
pub struct MessageRow {
    /// Row id. Queries filter on `id > after_id` and order by it, so it also
    /// serves as the paging cursor.
    pub id: i64,
    /// Identifier of the sending agent.
    pub sender: String,
    /// Addressee: an agent id, or a prefixed target such as `channel:...`
    /// or `user:...`.
    pub recipient: String,
    /// Raw message body; may span several lines.
    pub text: String,
    /// Send time as stored by the broker.
    /// NOTE(review): presumably Unix seconds — confirm against the schema.
    pub sent_at: f64,
}
92
93pub fn render_row(row: &MessageRow, team: &crate::data::TeamSnapshot, tab: MailboxTab) -> String {
110 let one_line: String = row
111 .text
112 .replace('\n', " ")
113 .replace('\r', "")
114 .chars()
115 .take(180)
116 .collect();
117 match tab {
118 MailboxTab::Sent => {
119 let recipient = crate::data::recipient_label(team, &row.recipient);
120 format!("[→{recipient}] {one_line}")
121 }
122 MailboxTab::Inbox | MailboxTab::Wire => {
123 let sender = crate::data::agent_label(team, &row.sender);
124 format!("[{sender}] {one_line}")
125 }
126 MailboxTab::Channel => {
127 let channel = crate::data::recipient_label(team, &row.recipient);
135 let sender = crate::data::agent_label(team, &row.sender);
136 format!("[{channel}] [{sender}] {one_line}")
137 }
138 }
139}
140
141pub trait MailboxSource: Send + Sync {
146 fn inbox(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>>;
147 fn sent(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>>;
148 fn channel_feed(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>>;
149 fn wire(&self, project_id: &str, after_id: i64) -> Result<Vec<MessageRow>>;
150}
151
152#[derive(Debug, Clone)]
156pub struct BrokerMailboxSource {
157 pub db_path: PathBuf,
158}
159
160impl BrokerMailboxSource {
161 pub fn new(db_path: PathBuf) -> Self {
162 Self { db_path }
163 }
164
165 fn open(&self) -> Result<Option<Connection>> {
166 if !self.db_path.is_file() {
167 return Ok(None);
168 }
169 let conn = Connection::open(&self.db_path)?;
170 Ok(Some(conn))
171 }
172}
173
174impl MailboxSource for BrokerMailboxSource {
175 fn inbox(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
176 let Some(conn) = self.open()? else {
177 return Ok(Vec::new());
178 };
179 let mut stmt = conn.prepare(
180 "SELECT id, sender, recipient, text, sent_at FROM messages
181 WHERE id > ?1 AND recipient = ?2
182 ORDER BY id ASC",
183 )?;
184 let rows = stmt
185 .query_map(params![after_id, agent_id], |r| {
186 Ok(MessageRow {
187 id: r.get(0)?,
188 sender: r.get(1)?,
189 recipient: r.get(2)?,
190 text: r.get(3)?,
191 sent_at: r.get(4)?,
192 })
193 })?
194 .flatten()
195 .collect();
196 Ok(rows)
197 }
198
199 fn sent(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
200 let Some(conn) = self.open()? else {
201 return Ok(Vec::new());
202 };
203 let mut stmt = conn.prepare(
208 "SELECT id, sender, recipient, text, sent_at FROM messages
209 WHERE id > ?1 AND sender = ?2
210 ORDER BY id ASC",
211 )?;
212 let rows = stmt
213 .query_map(params![after_id, agent_id], |r| {
214 Ok(MessageRow {
215 id: r.get(0)?,
216 sender: r.get(1)?,
217 recipient: r.get(2)?,
218 text: r.get(3)?,
219 sent_at: r.get(4)?,
220 })
221 })?
222 .flatten()
223 .collect();
224 Ok(rows)
225 }
226
227 fn channel_feed(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
228 let Some(conn) = self.open()? else {
229 return Ok(Vec::new());
230 };
231 let mut stmt = conn.prepare(
236 "SELECT id, sender, recipient, text, sent_at FROM messages
237 WHERE id > ?1
238 AND recipient IN (
239 SELECT 'channel:' || cm.channel_id FROM channel_members cm
240 WHERE cm.agent_id = ?2
241 )
242 ORDER BY id ASC",
243 )?;
244 let rows = stmt
245 .query_map(params![after_id, agent_id], |r| {
246 Ok(MessageRow {
247 id: r.get(0)?,
248 sender: r.get(1)?,
249 recipient: r.get(2)?,
250 text: r.get(3)?,
251 sent_at: r.get(4)?,
252 })
253 })?
254 .flatten()
255 .collect();
256 Ok(rows)
257 }
258
259 fn wire(&self, project_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
260 let Some(conn) = self.open()? else {
261 return Ok(Vec::new());
262 };
263 let target = format!("channel:{project_id}:all");
267 let mut stmt = conn.prepare(
268 "SELECT id, sender, recipient, text, sent_at FROM messages
269 WHERE id > ?1 AND recipient = ?2
270 ORDER BY id ASC",
271 )?;
272 let rows = stmt
273 .query_map(params![after_id, target], |r| {
274 Ok(MessageRow {
275 id: r.get(0)?,
276 sender: r.get(1)?,
277 recipient: r.get(2)?,
278 text: r.get(3)?,
279 sent_at: r.get(4)?,
280 })
281 })?
282 .flatten()
283 .collect();
284 Ok(rows)
285 }
286}
287
288#[derive(Debug, Default, Clone)]
293pub struct MailboxBuffers {
294 pub inbox: Vec<MessageRow>,
295 pub sent: Vec<MessageRow>,
296 pub channel: Vec<MessageRow>,
297 pub wire: Vec<MessageRow>,
298 pub inbox_after: i64,
299 pub sent_after: i64,
300 pub channel_after: i64,
301 pub wire_after: i64,
302}
303
304const MAX_TAB_ROWS: usize = 500;
305
306impl MailboxBuffers {
307 pub fn rows(&self, tab: MailboxTab) -> &[MessageRow] {
308 match tab {
309 MailboxTab::Inbox => &self.inbox,
310 MailboxTab::Sent => &self.sent,
311 MailboxTab::Channel => &self.channel,
312 MailboxTab::Wire => &self.wire,
313 }
314 }
315
316 pub fn extend(&mut self, tab: MailboxTab, batch: Vec<MessageRow>) {
320 let last_id = batch.last().map(|r| r.id);
321 let (buf, after) = match tab {
322 MailboxTab::Inbox => (&mut self.inbox, &mut self.inbox_after),
323 MailboxTab::Sent => (&mut self.sent, &mut self.sent_after),
324 MailboxTab::Channel => (&mut self.channel, &mut self.channel_after),
325 MailboxTab::Wire => (&mut self.wire, &mut self.wire_after),
326 };
327 buf.extend(batch);
328 if buf.len() > MAX_TAB_ROWS {
329 let drop = buf.len() - MAX_TAB_ROWS;
330 buf.drain(..drop);
331 }
332 if let Some(id) = last_id {
333 *after = id;
334 }
335 }
336
337 pub fn reset(&mut self) {
342 *self = Self::default();
343 }
344}
345
346pub mod test_support {
347 use super::*;
353 use std::sync::Mutex;
354
355 #[derive(Default)]
360 pub struct MockMailboxSource {
361 pub inbox_rows: Vec<MessageRow>,
362 pub sent_rows: Vec<MessageRow>,
363 pub channel_rows: Vec<MessageRow>,
364 pub wire_rows: Vec<MessageRow>,
365 pub inbox_calls: Mutex<Vec<(String, i64)>>,
366 pub sent_calls: Mutex<Vec<(String, i64)>>,
367 pub channel_calls: Mutex<Vec<(String, i64)>>,
368 pub wire_calls: Mutex<Vec<(String, i64)>>,
369 }
370
371 impl MailboxSource for MockMailboxSource {
372 fn inbox(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
373 self.inbox_calls
374 .lock()
375 .unwrap()
376 .push((agent_id.into(), after_id));
377 Ok(self.inbox_rows.clone())
378 }
379
380 fn sent(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
381 self.sent_calls
382 .lock()
383 .unwrap()
384 .push((agent_id.into(), after_id));
385 Ok(self.sent_rows.clone())
386 }
387
388 fn channel_feed(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
389 self.channel_calls
390 .lock()
391 .unwrap()
392 .push((agent_id.into(), after_id));
393 Ok(self.channel_rows.clone())
394 }
395
396 fn wire(&self, project_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
397 self.wire_calls
398 .lock()
399 .unwrap()
400 .push((project_id.into(), after_id));
401 Ok(self.wire_rows.clone())
402 }
403 }
404}
405
#[cfg(test)]
mod tests {
    use super::test_support::*;
    use super::*;

    /// Builds a message with the given id, endpoints and body (`sent_at` = 0).
    fn row(id: i64, sender: &str, recipient: &str, text: &str) -> MessageRow {
        MessageRow {
            id,
            sender: sender.into(),
            recipient: recipient.into(),
            text: text.into(),
            sent_at: 0.0,
        }
    }

    /// A team snapshot with no agents, rooted at `/tmp`.
    fn empty_team() -> crate::data::TeamSnapshot {
        crate::data::TeamSnapshot::empty(std::path::PathBuf::from("/tmp"))
    }

    /// A team named `t` containing exactly one agent of project `p` with a
    /// display name set.
    fn solo_team(
        id: &'static str,
        agent: &'static str,
        tmux_session: &'static str,
        display_name: &'static str,
        state: team_core::supervisor::AgentState,
        is_manager: bool,
    ) -> crate::data::TeamSnapshot {
        use crate::data::{AgentInfo, TeamSnapshot};

        let member = AgentInfo {
            id: id.into(),
            agent: agent.into(),
            project: "p".into(),
            tmux_session: tmux_session.into(),
            state,
            unread_mail: 0,
            pending_approvals: 0,
            is_manager,
            display_name: Some(display_name.into()),
            rate_limit_resets_at: None,
            reports_to: None,
        };
        TeamSnapshot {
            root: std::path::PathBuf::from("/tmp"),
            team_name: "t".into(),
            agents: vec![member],
            channels: vec![],
        }
    }

    #[test]
    fn next_cycles_inbox_sent_channel_wire_inbox() {
        let expected = [
            MailboxTab::Sent,
            MailboxTab::Channel,
            MailboxTab::Wire,
            MailboxTab::Inbox,
        ];
        let mut tab = MailboxTab::Inbox;
        for want in expected {
            tab = tab.next();
            assert_eq!(tab, want);
        }
    }

    #[test]
    fn prev_cycles_inbox_wire_channel_sent_inbox() {
        let expected = [
            MailboxTab::Wire,
            MailboxTab::Channel,
            MailboxTab::Sent,
            MailboxTab::Inbox,
        ];
        let mut tab = MailboxTab::Inbox;
        for want in expected {
            tab = tab.prev();
            assert_eq!(tab, want);
        }
    }

    #[test]
    fn extend_appends_and_bumps_cursor() {
        let mut buffers = MailboxBuffers::default();
        let first_batch = vec![row(7, "p:m", "p:dev", "hi"), row(8, "p:m", "p:dev", "yo")];
        buffers.extend(MailboxTab::Inbox, first_batch);
        assert_eq!(buffers.inbox.len(), 2);
        assert_eq!(buffers.inbox_after, 8);

        // An empty batch leaves the cursor where it was.
        buffers.extend(MailboxTab::Inbox, Vec::new());
        assert_eq!(buffers.inbox_after, 8);
    }

    #[test]
    fn extend_trims_to_cap() {
        let mut buffers = MailboxBuffers::default();
        let mut batch: Vec<MessageRow> = Vec::with_capacity(600);
        for id in 1..=600 {
            batch.push(row(id, "p:m", "p:dev", "x"));
        }
        buffers.extend(MailboxTab::Wire, batch);
        assert_eq!(buffers.wire.len(), MAX_TAB_ROWS);
        assert_eq!(buffers.wire_after, 600);
        // The newest rows are the ones that survive trimming.
        assert_eq!(buffers.wire.last().unwrap().id, 600);
    }

    #[test]
    fn reset_clears_buffers_and_cursors() {
        let mut buffers = MailboxBuffers::default();
        buffers.extend(MailboxTab::Inbox, vec![row(3, "a", "b", "x")]);
        buffers.extend(MailboxTab::Channel, vec![row(4, "a", "channel:p:all", "y")]);

        buffers.reset();

        assert!(buffers.inbox.is_empty());
        assert!(buffers.channel.is_empty());
        assert_eq!(buffers.inbox_after, 0);
        assert_eq!(buffers.channel_after, 0);
    }

    #[test]
    fn render_row_flattens_newlines_and_truncates() {
        let team = empty_team();

        let multiline = row(1, "p:m", "p:dev", "first\nsecond\nthird");
        let got = render_row(&multiline, &team, MailboxTab::Inbox);
        assert_eq!(got, "[p:m] first second third");

        let body = "x".repeat(300);
        let oversized = row(1, "s", "r", &body);
        let got = render_row(&oversized, &team, MailboxTab::Inbox);
        assert!(got.chars().count() <= 185);
    }

    #[test]
    fn render_row_uses_display_name_when_set() {
        use team_core::supervisor::AgentState;

        let team = solo_team(
            "p:sage",
            "sage",
            "a-p-sage",
            "Sage (Visionary)",
            AgentState::Unknown,
            true,
        );
        let msg = row(1, "p:sage", "p:hugo", "ping");
        let got = render_row(&msg, &team, MailboxTab::Inbox);
        assert_eq!(got, "[Sage (Visionary)] ping");
    }

    #[test]
    fn render_row_sent_tab_shows_recipient_with_arrow() {
        let team = empty_team();
        let msg = row(1, "p:me", "p:dev", "ack");
        let got = render_row(&msg, &team, MailboxTab::Sent);
        assert_eq!(got, "[→p:dev] ack");
    }

    #[test]
    fn render_row_sent_tab_resolves_recipient_display_name() {
        use team_core::supervisor::AgentState;

        let team = solo_team(
            "p:hugo",
            "hugo",
            "a-p-hugo",
            "Hugo (PM)",
            AgentState::Running,
            true,
        );
        let msg = row(1, "p:sage", "p:hugo", "ping");
        let got = render_row(&msg, &team, MailboxTab::Sent);
        assert_eq!(got, "[→Hugo (PM)] ping");
    }

    #[test]
    fn render_row_sent_tab_renders_channel_recipient_with_hash() {
        let team = empty_team();
        let msg = row(1, "p:me", "channel:teamctl:dev", "rolling 0.8.3");
        let got = render_row(&msg, &team, MailboxTab::Sent);
        assert_eq!(got, "[→#dev] rolling 0.8.3");
    }

    #[test]
    fn render_row_sent_tab_renders_user_recipient_verbatim() {
        let team = empty_team();
        let msg = row(1, "p:mgr", "user:telegram", "PR url");
        let got = render_row(&msg, &team, MailboxTab::Sent);
        assert_eq!(got, "[→user:telegram] PR url");
    }

    #[test]
    fn render_row_non_sent_tabs_still_show_sender() {
        let team = empty_team();
        let msg = row(1, "p:from", "p:me", "yo");
        for tab in [MailboxTab::Inbox, MailboxTab::Wire] {
            assert_eq!(render_row(&msg, &team, tab), "[p:from] yo");
        }
    }

    #[test]
    fn render_row_channel_tab_prefixes_channel_name_and_sender() {
        let team = empty_team();
        let msg = row(1, "p:from", "channel:teamctl:dev", "yo");
        let got = render_row(&msg, &team, MailboxTab::Channel);
        assert_eq!(got, "[#dev] [p:from] yo");
    }

    #[test]
    fn render_row_channel_tab_resolves_sender_display_name() {
        use team_core::supervisor::AgentState;

        let team = solo_team(
            "p:wren",
            "wren",
            "a-p-wren",
            "Wren (Engineer)",
            AgentState::Running,
            false,
        );
        let msg = row(1, "p:wren", "channel:p:all", "hello");
        let got = render_row(&msg, &team, MailboxTab::Channel);
        assert_eq!(got, "[#all] [Wren (Engineer)] hello");
    }

    #[test]
    fn render_row_channel_tab_handles_malformed_channel_recipient() {
        let team = empty_team();
        // Only one segment after `channel:`; rendering must still succeed.
        let msg = row(1, "p:from", "channel:malformed", "yo");
        let got = render_row(&msg, &team, MailboxTab::Channel);
        assert_eq!(got, "[#malformed] [p:from] yo");
    }

    #[test]
    fn mock_records_calls() {
        let mock = MockMailboxSource {
            inbox_rows: vec![row(1, "p:m", "p:a", "hi")],
            ..Default::default()
        };

        let _ = mock.inbox("p:a", 0).unwrap();
        let _ = mock.sent("p:a", 2).unwrap();
        let _ = mock.channel_feed("p:a", 5).unwrap();
        let _ = mock.wire("p", 9).unwrap();

        let logged = |key: &str, after_id: i64| vec![(key.to_string(), after_id)];
        assert_eq!(*mock.inbox_calls.lock().unwrap(), logged("p:a", 0));
        assert_eq!(*mock.sent_calls.lock().unwrap(), logged("p:a", 2));
        assert_eq!(*mock.channel_calls.lock().unwrap(), logged("p:a", 5));
        assert_eq!(*mock.wire_calls.lock().unwrap(), logged("p", 9));
    }
}