1use std::path::PathBuf;
27
28use anyhow::Result;
29use rusqlite::{params, Connection};
30
31#[derive(Debug, Clone, Copy, PartialEq, Eq)]
32pub enum MailboxTab {
33 Inbox,
34 Sent,
35 Channel,
36 Wire,
37}
38
39impl MailboxTab {
40 pub const ALL: [MailboxTab; 4] = [
41 MailboxTab::Inbox,
42 MailboxTab::Sent,
43 MailboxTab::Channel,
44 MailboxTab::Wire,
45 ];
46
47 pub fn label(self) -> &'static str {
48 match self {
49 MailboxTab::Inbox => "Inbox",
50 MailboxTab::Sent => "Sent",
51 MailboxTab::Channel => "Channel",
52 MailboxTab::Wire => "Wire",
53 }
54 }
55
56 pub fn empty_hint(self) -> &'static str {
57 match self {
58 MailboxTab::Inbox => "(no DMs)",
59 MailboxTab::Sent => "(no sent messages)",
60 MailboxTab::Channel => "(no channel traffic)",
61 MailboxTab::Wire => "(quiet)",
62 }
63 }
64
65 pub fn next(self) -> Self {
66 match self {
67 MailboxTab::Inbox => MailboxTab::Sent,
68 MailboxTab::Sent => MailboxTab::Channel,
69 MailboxTab::Channel => MailboxTab::Wire,
70 MailboxTab::Wire => MailboxTab::Inbox,
71 }
72 }
73
74 pub fn prev(self) -> Self {
75 match self {
76 MailboxTab::Inbox => MailboxTab::Wire,
77 MailboxTab::Sent => MailboxTab::Inbox,
78 MailboxTab::Channel => MailboxTab::Sent,
79 MailboxTab::Wire => MailboxTab::Channel,
80 }
81 }
82}
83
84#[derive(Debug, Clone)]
85pub struct MessageRow {
86 pub id: i64,
87 pub sender: String,
88 pub recipient: String,
89 pub text: String,
90 pub sent_at: f64,
91}
92
93pub fn render_row(row: &MessageRow, team: &crate::data::TeamSnapshot, tab: MailboxTab) -> String {
110 let one_line: String = row
111 .text
112 .replace('\n', " ")
113 .replace('\r', "")
114 .chars()
115 .take(180)
116 .collect();
117 let prefix = match tab {
118 MailboxTab::Sent => {
119 let recipient = crate::data::recipient_label(team, &row.recipient);
120 format!("→{recipient}")
121 }
122 MailboxTab::Inbox | MailboxTab::Channel | MailboxTab::Wire => {
123 crate::data::agent_label(team, &row.sender).to_string()
124 }
125 };
126 format!("[{}] {}", prefix, one_line)
127}
128
129pub trait MailboxSource: Send + Sync {
134 fn inbox(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>>;
135 fn sent(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>>;
136 fn channel_feed(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>>;
137 fn wire(&self, project_id: &str, after_id: i64) -> Result<Vec<MessageRow>>;
138}
139
140#[derive(Debug, Clone)]
144pub struct BrokerMailboxSource {
145 pub db_path: PathBuf,
146}
147
148impl BrokerMailboxSource {
149 pub fn new(db_path: PathBuf) -> Self {
150 Self { db_path }
151 }
152
153 fn open(&self) -> Result<Option<Connection>> {
154 if !self.db_path.is_file() {
155 return Ok(None);
156 }
157 let conn = Connection::open(&self.db_path)?;
158 Ok(Some(conn))
159 }
160}
161
162impl MailboxSource for BrokerMailboxSource {
163 fn inbox(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
164 let Some(conn) = self.open()? else {
165 return Ok(Vec::new());
166 };
167 let mut stmt = conn.prepare(
168 "SELECT id, sender, recipient, text, sent_at FROM messages
169 WHERE id > ?1 AND recipient = ?2
170 ORDER BY id ASC",
171 )?;
172 let rows = stmt
173 .query_map(params![after_id, agent_id], |r| {
174 Ok(MessageRow {
175 id: r.get(0)?,
176 sender: r.get(1)?,
177 recipient: r.get(2)?,
178 text: r.get(3)?,
179 sent_at: r.get(4)?,
180 })
181 })?
182 .flatten()
183 .collect();
184 Ok(rows)
185 }
186
187 fn sent(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
188 let Some(conn) = self.open()? else {
189 return Ok(Vec::new());
190 };
191 let mut stmt = conn.prepare(
196 "SELECT id, sender, recipient, text, sent_at FROM messages
197 WHERE id > ?1 AND sender = ?2
198 ORDER BY id ASC",
199 )?;
200 let rows = stmt
201 .query_map(params![after_id, agent_id], |r| {
202 Ok(MessageRow {
203 id: r.get(0)?,
204 sender: r.get(1)?,
205 recipient: r.get(2)?,
206 text: r.get(3)?,
207 sent_at: r.get(4)?,
208 })
209 })?
210 .flatten()
211 .collect();
212 Ok(rows)
213 }
214
215 fn channel_feed(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
216 let Some(conn) = self.open()? else {
217 return Ok(Vec::new());
218 };
219 let mut stmt = conn.prepare(
224 "SELECT id, sender, recipient, text, sent_at FROM messages
225 WHERE id > ?1
226 AND recipient IN (
227 SELECT 'channel:' || cm.channel_id FROM channel_members cm
228 WHERE cm.agent_id = ?2
229 )
230 ORDER BY id ASC",
231 )?;
232 let rows = stmt
233 .query_map(params![after_id, agent_id], |r| {
234 Ok(MessageRow {
235 id: r.get(0)?,
236 sender: r.get(1)?,
237 recipient: r.get(2)?,
238 text: r.get(3)?,
239 sent_at: r.get(4)?,
240 })
241 })?
242 .flatten()
243 .collect();
244 Ok(rows)
245 }
246
247 fn wire(&self, project_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
248 let Some(conn) = self.open()? else {
249 return Ok(Vec::new());
250 };
251 let target = format!("channel:{project_id}:all");
255 let mut stmt = conn.prepare(
256 "SELECT id, sender, recipient, text, sent_at FROM messages
257 WHERE id > ?1 AND recipient = ?2
258 ORDER BY id ASC",
259 )?;
260 let rows = stmt
261 .query_map(params![after_id, target], |r| {
262 Ok(MessageRow {
263 id: r.get(0)?,
264 sender: r.get(1)?,
265 recipient: r.get(2)?,
266 text: r.get(3)?,
267 sent_at: r.get(4)?,
268 })
269 })?
270 .flatten()
271 .collect();
272 Ok(rows)
273 }
274}
275
276#[derive(Debug, Default, Clone)]
281pub struct MailboxBuffers {
282 pub inbox: Vec<MessageRow>,
283 pub sent: Vec<MessageRow>,
284 pub channel: Vec<MessageRow>,
285 pub wire: Vec<MessageRow>,
286 pub inbox_after: i64,
287 pub sent_after: i64,
288 pub channel_after: i64,
289 pub wire_after: i64,
290}
291
292const MAX_TAB_ROWS: usize = 500;
293
294impl MailboxBuffers {
295 pub fn rows(&self, tab: MailboxTab) -> &[MessageRow] {
296 match tab {
297 MailboxTab::Inbox => &self.inbox,
298 MailboxTab::Sent => &self.sent,
299 MailboxTab::Channel => &self.channel,
300 MailboxTab::Wire => &self.wire,
301 }
302 }
303
304 pub fn extend(&mut self, tab: MailboxTab, batch: Vec<MessageRow>) {
308 let last_id = batch.last().map(|r| r.id);
309 let (buf, after) = match tab {
310 MailboxTab::Inbox => (&mut self.inbox, &mut self.inbox_after),
311 MailboxTab::Sent => (&mut self.sent, &mut self.sent_after),
312 MailboxTab::Channel => (&mut self.channel, &mut self.channel_after),
313 MailboxTab::Wire => (&mut self.wire, &mut self.wire_after),
314 };
315 buf.extend(batch);
316 if buf.len() > MAX_TAB_ROWS {
317 let drop = buf.len() - MAX_TAB_ROWS;
318 buf.drain(..drop);
319 }
320 if let Some(id) = last_id {
321 *after = id;
322 }
323 }
324
325 pub fn reset(&mut self) {
330 *self = Self::default();
331 }
332}
333
334pub mod test_support {
335 use super::*;
341 use std::sync::Mutex;
342
343 #[derive(Default)]
348 pub struct MockMailboxSource {
349 pub inbox_rows: Vec<MessageRow>,
350 pub sent_rows: Vec<MessageRow>,
351 pub channel_rows: Vec<MessageRow>,
352 pub wire_rows: Vec<MessageRow>,
353 pub inbox_calls: Mutex<Vec<(String, i64)>>,
354 pub sent_calls: Mutex<Vec<(String, i64)>>,
355 pub channel_calls: Mutex<Vec<(String, i64)>>,
356 pub wire_calls: Mutex<Vec<(String, i64)>>,
357 }
358
359 impl MailboxSource for MockMailboxSource {
360 fn inbox(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
361 self.inbox_calls
362 .lock()
363 .unwrap()
364 .push((agent_id.into(), after_id));
365 Ok(self.inbox_rows.clone())
366 }
367
368 fn sent(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
369 self.sent_calls
370 .lock()
371 .unwrap()
372 .push((agent_id.into(), after_id));
373 Ok(self.sent_rows.clone())
374 }
375
376 fn channel_feed(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
377 self.channel_calls
378 .lock()
379 .unwrap()
380 .push((agent_id.into(), after_id));
381 Ok(self.channel_rows.clone())
382 }
383
384 fn wire(&self, project_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
385 self.wire_calls
386 .lock()
387 .unwrap()
388 .push((project_id.into(), after_id));
389 Ok(self.wire_rows.clone())
390 }
391 }
392}
393
394#[cfg(test)]
395mod tests {
396 use super::test_support::*;
397 use super::*;
398
399 fn row(id: i64, sender: &str, recipient: &str, text: &str) -> MessageRow {
400 MessageRow {
401 id,
402 sender: sender.into(),
403 recipient: recipient.into(),
404 text: text.into(),
405 sent_at: 0.0,
406 }
407 }
408
409 #[test]
410 fn next_cycles_inbox_sent_channel_wire_inbox() {
411 let mut t = MailboxTab::Inbox;
412 t = t.next();
413 assert_eq!(t, MailboxTab::Sent);
414 t = t.next();
415 assert_eq!(t, MailboxTab::Channel);
416 t = t.next();
417 assert_eq!(t, MailboxTab::Wire);
418 t = t.next();
419 assert_eq!(t, MailboxTab::Inbox);
420 }
421
422 #[test]
423 fn prev_cycles_inbox_wire_channel_sent_inbox() {
424 let mut t = MailboxTab::Inbox;
425 t = t.prev();
426 assert_eq!(t, MailboxTab::Wire);
427 t = t.prev();
428 assert_eq!(t, MailboxTab::Channel);
429 t = t.prev();
430 assert_eq!(t, MailboxTab::Sent);
431 t = t.prev();
432 assert_eq!(t, MailboxTab::Inbox);
433 }
434
435 #[test]
436 fn extend_appends_and_bumps_cursor() {
437 let mut buf = MailboxBuffers::default();
438 buf.extend(
439 MailboxTab::Inbox,
440 vec![row(7, "p:m", "p:dev", "hi"), row(8, "p:m", "p:dev", "yo")],
441 );
442 assert_eq!(buf.inbox.len(), 2);
443 assert_eq!(buf.inbox_after, 8);
444 buf.extend(MailboxTab::Inbox, vec![]);
446 assert_eq!(buf.inbox_after, 8);
447 }
448
449 #[test]
450 fn extend_trims_to_cap() {
451 let mut buf = MailboxBuffers::default();
452 let batch: Vec<MessageRow> = (1..=600).map(|i| row(i, "p:m", "p:dev", "x")).collect();
453 buf.extend(MailboxTab::Wire, batch);
454 assert_eq!(buf.wire.len(), MAX_TAB_ROWS);
455 assert_eq!(buf.wire_after, 600);
459 assert_eq!(buf.wire.last().unwrap().id, 600);
460 }
461
462 #[test]
463 fn reset_clears_buffers_and_cursors() {
464 let mut buf = MailboxBuffers::default();
465 buf.extend(MailboxTab::Inbox, vec![row(3, "a", "b", "x")]);
466 buf.extend(MailboxTab::Channel, vec![row(4, "a", "channel:p:all", "y")]);
467 buf.reset();
468 assert!(buf.inbox.is_empty());
469 assert!(buf.channel.is_empty());
470 assert_eq!(buf.inbox_after, 0);
471 assert_eq!(buf.channel_after, 0);
472 }
473
474 fn empty_team() -> crate::data::TeamSnapshot {
475 crate::data::TeamSnapshot::empty(std::path::PathBuf::from("/tmp"))
476 }
477
478 #[test]
479 fn render_row_flattens_newlines_and_truncates() {
480 let team = empty_team();
481 let r = row(1, "p:m", "p:dev", "first\nsecond\nthird");
482 assert_eq!(
483 render_row(&r, &team, MailboxTab::Inbox),
484 "[p:m] first second third"
485 );
486
487 let long: String = "x".repeat(300);
488 let r = row(1, "s", "r", &long);
489 let rendered = render_row(&r, &team, MailboxTab::Inbox);
490 assert!(rendered.chars().count() <= 185);
492 }
493
494 #[test]
495 fn render_row_uses_display_name_when_set() {
496 use crate::data::{AgentInfo, TeamSnapshot};
500 use team_core::supervisor::AgentState;
501 let agent = AgentInfo {
502 id: "p:sage".into(),
503 agent: "sage".into(),
504 project: "p".into(),
505 tmux_session: "a-p-sage".into(),
506 state: AgentState::Unknown,
507 unread_mail: 0,
508 pending_approvals: 0,
509 is_manager: true,
510 display_name: Some("Sage (Visionary)".into()),
511 rate_limit_resets_at: None,
512 reports_to: None,
513 };
514 let team = TeamSnapshot {
515 root: std::path::PathBuf::from("/tmp"),
516 team_name: "t".into(),
517 agents: vec![agent],
518 channels: vec![],
519 };
520 let r = row(1, "p:sage", "p:hugo", "ping");
521 assert_eq!(
522 render_row(&r, &team, MailboxTab::Inbox),
523 "[Sage (Visionary)] ping"
524 );
525 }
526
527 #[test]
531 fn render_row_sent_tab_shows_recipient_with_arrow() {
532 let team = empty_team();
536 let r = row(1, "p:me", "p:dev", "ack");
537 assert_eq!(render_row(&r, &team, MailboxTab::Sent), "[→p:dev] ack");
538 }
539
540 #[test]
541 fn render_row_sent_tab_resolves_recipient_display_name() {
542 use crate::data::{AgentInfo, TeamSnapshot};
546 use team_core::supervisor::AgentState;
547 let agent = AgentInfo {
548 id: "p:hugo".into(),
549 agent: "hugo".into(),
550 project: "p".into(),
551 tmux_session: "a-p-hugo".into(),
552 state: AgentState::Running,
553 unread_mail: 0,
554 pending_approvals: 0,
555 is_manager: true,
556 display_name: Some("Hugo (PM)".into()),
557 rate_limit_resets_at: None,
558 reports_to: None,
559 };
560 let team = TeamSnapshot {
561 root: std::path::PathBuf::from("/tmp"),
562 team_name: "t".into(),
563 agents: vec![agent],
564 channels: vec![],
565 };
566 let r = row(1, "p:sage", "p:hugo", "ping");
567 assert_eq!(render_row(&r, &team, MailboxTab::Sent), "[→Hugo (PM)] ping");
568 }
569
570 #[test]
571 fn render_row_sent_tab_renders_channel_recipient_with_hash() {
572 let team = empty_team();
576 let r = row(1, "p:me", "channel:teamctl:dev", "rolling 0.8.3");
577 assert_eq!(
578 render_row(&r, &team, MailboxTab::Sent),
579 "[→#dev] rolling 0.8.3"
580 );
581 }
582
583 #[test]
584 fn render_row_sent_tab_renders_user_recipient_verbatim() {
585 let team = empty_team();
590 let r = row(1, "p:mgr", "user:telegram", "PR url");
591 assert_eq!(
592 render_row(&r, &team, MailboxTab::Sent),
593 "[→user:telegram] PR url"
594 );
595 }
596
597 #[test]
598 fn render_row_non_sent_tabs_still_show_sender() {
599 let team = empty_team();
602 let r = row(1, "p:from", "p:me", "yo");
603 assert_eq!(render_row(&r, &team, MailboxTab::Inbox), "[p:from] yo");
604 assert_eq!(render_row(&r, &team, MailboxTab::Channel), "[p:from] yo");
605 assert_eq!(render_row(&r, &team, MailboxTab::Wire), "[p:from] yo");
606 }
607
608 #[test]
609 fn mock_records_calls() {
610 let mock = MockMailboxSource {
611 inbox_rows: vec![row(1, "p:m", "p:a", "hi")],
612 ..Default::default()
613 };
614 let _ = mock.inbox("p:a", 0).unwrap();
615 let _ = mock.sent("p:a", 2).unwrap();
616 let _ = mock.channel_feed("p:a", 5).unwrap();
617 let _ = mock.wire("p", 9).unwrap();
618 assert_eq!(*mock.inbox_calls.lock().unwrap(), vec![("p:a".into(), 0)]);
619 assert_eq!(*mock.sent_calls.lock().unwrap(), vec![("p:a".into(), 2)]);
620 assert_eq!(*mock.channel_calls.lock().unwrap(), vec![("p:a".into(), 5)]);
621 assert_eq!(*mock.wire_calls.lock().unwrap(), vec![("p".into(), 9)]);
622 }
623}