1use std::path::PathBuf;
27
28use anyhow::Result;
29use rusqlite::{params, Connection};
30
/// Identifies one of the four mailbox tabs.
///
/// The declaration order is the cycling order used by [`MailboxTab::next`] and
/// [`MailboxTab::prev`], and it matches the order of [`MailboxTab::ALL`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MailboxTab {
    /// Direct messages (empty hint: "(no DMs)").
    Inbox,
    /// Sent messages.
    Sent,
    /// Channel traffic.
    Channel,
    /// The "Wire" tab.
    Wire,
}

impl MailboxTab {
    /// Every tab, listed in declaration (and therefore cycling) order.
    pub const ALL: [MailboxTab; 4] = [Self::Inbox, Self::Sent, Self::Channel, Self::Wire];

    /// Returns the tab's title text.
    pub fn label(self) -> &'static str {
        match self {
            Self::Inbox => "Inbox",
            Self::Sent => "Sent",
            Self::Channel => "Channel",
            Self::Wire => "Wire",
        }
    }

    /// Returns the placeholder text for a tab that has no rows.
    pub fn empty_hint(self) -> &'static str {
        match self {
            Self::Inbox => "(no DMs)",
            Self::Sent => "(no sent messages)",
            Self::Channel => "(no channel traffic)",
            Self::Wire => "(quiet)",
        }
    }

    /// Returns the following tab, wrapping from `Wire` back to `Inbox`.
    pub fn next(self) -> Self {
        self.step(1)
    }

    /// Returns the preceding tab, wrapping from `Inbox` back to `Wire`.
    pub fn prev(self) -> Self {
        // Stepping forward by `len - 1` is one step backward modulo `len`.
        self.step(Self::ALL.len() - 1)
    }

    /// Moves `offset` positions forward through [`MailboxTab::ALL`], wrapping around.
    fn step(self, offset: usize) -> Self {
        // `ALL` mirrors the declaration order, so the discriminant doubles as
        // this tab's index into it.
        let here = self as usize;
        debug_assert_eq!(Self::ALL[here], self, "ALL must mirror declaration order");
        Self::ALL[(here + offset) % Self::ALL.len()]
    }
}
83
/// One message as read from the broker's `messages` table.
#[derive(Clone, Debug)]
pub struct MessageRow {
    /// Message id; the mailbox queries order by it and use it as a cursor.
    pub id: i64,
    /// Identifier of the sending agent.
    pub sender: String,
    /// Destination: an agent id, or a `channel:`-prefixed channel name.
    pub recipient: String,
    /// Raw message body; may contain newlines.
    pub text: String,
    /// Send timestamp.
    // NOTE(review): presumably seconds since the Unix epoch — confirm against the broker schema.
    pub sent_at: f64,
}
92
93pub fn render_row(row: &MessageRow, team: &crate::data::TeamSnapshot) -> String {
101 let one_line: String = row
102 .text
103 .replace('\n', " ")
104 .replace('\r', "")
105 .chars()
106 .take(180)
107 .collect();
108 let sender = crate::data::agent_label(team, &row.sender);
109 format!("[{}] {}", sender, one_line)
110}
111
112pub trait MailboxSource: Send + Sync {
117 fn inbox(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>>;
118 fn sent(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>>;
119 fn channel_feed(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>>;
120 fn wire(&self, project_id: &str, after_id: i64) -> Result<Vec<MessageRow>>;
121}
122
/// [`MailboxSource`] that reads messages from a SQLite database file.
#[derive(Clone, Debug)]
pub struct BrokerMailboxSource {
    /// Location of the database file; it is opened anew for every query.
    pub db_path: PathBuf,
}
130
131impl BrokerMailboxSource {
132 pub fn new(db_path: PathBuf) -> Self {
133 Self { db_path }
134 }
135
136 fn open(&self) -> Result<Option<Connection>> {
137 if !self.db_path.is_file() {
138 return Ok(None);
139 }
140 let conn = Connection::open(&self.db_path)?;
141 Ok(Some(conn))
142 }
143}
144
145impl MailboxSource for BrokerMailboxSource {
146 fn inbox(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
147 let Some(conn) = self.open()? else {
148 return Ok(Vec::new());
149 };
150 let mut stmt = conn.prepare(
151 "SELECT id, sender, recipient, text, sent_at FROM messages
152 WHERE id > ?1 AND recipient = ?2
153 ORDER BY id ASC",
154 )?;
155 let rows = stmt
156 .query_map(params![after_id, agent_id], |r| {
157 Ok(MessageRow {
158 id: r.get(0)?,
159 sender: r.get(1)?,
160 recipient: r.get(2)?,
161 text: r.get(3)?,
162 sent_at: r.get(4)?,
163 })
164 })?
165 .flatten()
166 .collect();
167 Ok(rows)
168 }
169
170 fn sent(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
171 let Some(conn) = self.open()? else {
172 return Ok(Vec::new());
173 };
174 let mut stmt = conn.prepare(
179 "SELECT id, sender, recipient, text, sent_at FROM messages
180 WHERE id > ?1 AND sender = ?2
181 ORDER BY id ASC",
182 )?;
183 let rows = stmt
184 .query_map(params![after_id, agent_id], |r| {
185 Ok(MessageRow {
186 id: r.get(0)?,
187 sender: r.get(1)?,
188 recipient: r.get(2)?,
189 text: r.get(3)?,
190 sent_at: r.get(4)?,
191 })
192 })?
193 .flatten()
194 .collect();
195 Ok(rows)
196 }
197
198 fn channel_feed(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
199 let Some(conn) = self.open()? else {
200 return Ok(Vec::new());
201 };
202 let mut stmt = conn.prepare(
207 "SELECT id, sender, recipient, text, sent_at FROM messages
208 WHERE id > ?1
209 AND recipient IN (
210 SELECT 'channel:' || cm.channel_id FROM channel_members cm
211 WHERE cm.agent_id = ?2
212 )
213 ORDER BY id ASC",
214 )?;
215 let rows = stmt
216 .query_map(params![after_id, agent_id], |r| {
217 Ok(MessageRow {
218 id: r.get(0)?,
219 sender: r.get(1)?,
220 recipient: r.get(2)?,
221 text: r.get(3)?,
222 sent_at: r.get(4)?,
223 })
224 })?
225 .flatten()
226 .collect();
227 Ok(rows)
228 }
229
230 fn wire(&self, project_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
231 let Some(conn) = self.open()? else {
232 return Ok(Vec::new());
233 };
234 let target = format!("channel:{project_id}:all");
238 let mut stmt = conn.prepare(
239 "SELECT id, sender, recipient, text, sent_at FROM messages
240 WHERE id > ?1 AND recipient = ?2
241 ORDER BY id ASC",
242 )?;
243 let rows = stmt
244 .query_map(params![after_id, target], |r| {
245 Ok(MessageRow {
246 id: r.get(0)?,
247 sender: r.get(1)?,
248 recipient: r.get(2)?,
249 text: r.get(3)?,
250 sent_at: r.get(4)?,
251 })
252 })?
253 .flatten()
254 .collect();
255 Ok(rows)
256 }
257}
258
259#[derive(Debug, Default, Clone)]
264pub struct MailboxBuffers {
265 pub inbox: Vec<MessageRow>,
266 pub sent: Vec<MessageRow>,
267 pub channel: Vec<MessageRow>,
268 pub wire: Vec<MessageRow>,
269 pub inbox_after: i64,
270 pub sent_after: i64,
271 pub channel_after: i64,
272 pub wire_after: i64,
273}
274
275const MAX_TAB_ROWS: usize = 500;
276
277impl MailboxBuffers {
278 pub fn rows(&self, tab: MailboxTab) -> &[MessageRow] {
279 match tab {
280 MailboxTab::Inbox => &self.inbox,
281 MailboxTab::Sent => &self.sent,
282 MailboxTab::Channel => &self.channel,
283 MailboxTab::Wire => &self.wire,
284 }
285 }
286
287 pub fn extend(&mut self, tab: MailboxTab, batch: Vec<MessageRow>) {
291 let last_id = batch.last().map(|r| r.id);
292 let (buf, after) = match tab {
293 MailboxTab::Inbox => (&mut self.inbox, &mut self.inbox_after),
294 MailboxTab::Sent => (&mut self.sent, &mut self.sent_after),
295 MailboxTab::Channel => (&mut self.channel, &mut self.channel_after),
296 MailboxTab::Wire => (&mut self.wire, &mut self.wire_after),
297 };
298 buf.extend(batch);
299 if buf.len() > MAX_TAB_ROWS {
300 let drop = buf.len() - MAX_TAB_ROWS;
301 buf.drain(..drop);
302 }
303 if let Some(id) = last_id {
304 *after = id;
305 }
306 }
307
308 pub fn reset(&mut self) {
313 *self = Self::default();
314 }
315}
316
317pub mod test_support {
318 use super::*;
324 use std::sync::Mutex;
325
326 #[derive(Default)]
331 pub struct MockMailboxSource {
332 pub inbox_rows: Vec<MessageRow>,
333 pub sent_rows: Vec<MessageRow>,
334 pub channel_rows: Vec<MessageRow>,
335 pub wire_rows: Vec<MessageRow>,
336 pub inbox_calls: Mutex<Vec<(String, i64)>>,
337 pub sent_calls: Mutex<Vec<(String, i64)>>,
338 pub channel_calls: Mutex<Vec<(String, i64)>>,
339 pub wire_calls: Mutex<Vec<(String, i64)>>,
340 }
341
342 impl MailboxSource for MockMailboxSource {
343 fn inbox(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
344 self.inbox_calls
345 .lock()
346 .unwrap()
347 .push((agent_id.into(), after_id));
348 Ok(self.inbox_rows.clone())
349 }
350
351 fn sent(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
352 self.sent_calls
353 .lock()
354 .unwrap()
355 .push((agent_id.into(), after_id));
356 Ok(self.sent_rows.clone())
357 }
358
359 fn channel_feed(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
360 self.channel_calls
361 .lock()
362 .unwrap()
363 .push((agent_id.into(), after_id));
364 Ok(self.channel_rows.clone())
365 }
366
367 fn wire(&self, project_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
368 self.wire_calls
369 .lock()
370 .unwrap()
371 .push((project_id.into(), after_id));
372 Ok(self.wire_rows.clone())
373 }
374 }
375}
376
#[cfg(test)]
mod tests {
    use super::test_support::*;
    use super::*;

    /// Builds a row with a zero timestamp.
    fn row(id: i64, sender: &str, recipient: &str, text: &str) -> MessageRow {
        MessageRow {
            id,
            sender: sender.to_owned(),
            recipient: recipient.to_owned(),
            text: text.to_owned(),
            sent_at: 0.0,
        }
    }

    #[test]
    fn next_cycles_inbox_sent_channel_wire_inbox() {
        let expected = [
            MailboxTab::Sent,
            MailboxTab::Channel,
            MailboxTab::Wire,
            MailboxTab::Inbox,
        ];
        let mut tab = MailboxTab::Inbox;
        for want in expected {
            tab = tab.next();
            assert_eq!(tab, want);
        }
    }

    #[test]
    fn prev_cycles_inbox_wire_channel_sent_inbox() {
        let expected = [
            MailboxTab::Wire,
            MailboxTab::Channel,
            MailboxTab::Sent,
            MailboxTab::Inbox,
        ];
        let mut tab = MailboxTab::Inbox;
        for want in expected {
            tab = tab.prev();
            assert_eq!(tab, want);
        }
    }

    #[test]
    fn extend_appends_and_bumps_cursor() {
        let mut buffers = MailboxBuffers::default();
        let first_batch = vec![row(7, "p:m", "p:dev", "hi"), row(8, "p:m", "p:dev", "yo")];
        buffers.extend(MailboxTab::Inbox, first_batch);
        assert_eq!(buffers.inbox.len(), 2);
        assert_eq!(buffers.inbox_after, 8);

        // An empty batch must leave the cursor where it was.
        buffers.extend(MailboxTab::Inbox, Vec::new());
        assert_eq!(buffers.inbox_after, 8);
    }

    #[test]
    fn extend_trims_to_cap() {
        let mut buffers = MailboxBuffers::default();
        let oversized: Vec<MessageRow> = (1..=600)
            .map(|id| row(id, "p:m", "p:dev", "x"))
            .collect();
        buffers.extend(MailboxTab::Wire, oversized);
        assert_eq!(buffers.wire.len(), MAX_TAB_ROWS);
        assert_eq!(buffers.wire_after, 600);
        // The newest row survives the trim.
        assert_eq!(buffers.wire.last().unwrap().id, 600);
    }

    #[test]
    fn reset_clears_buffers_and_cursors() {
        let mut buffers = MailboxBuffers::default();
        buffers.extend(MailboxTab::Inbox, vec![row(3, "a", "b", "x")]);
        buffers.extend(MailboxTab::Channel, vec![row(4, "a", "channel:p:all", "y")]);

        buffers.reset();

        assert!(buffers.inbox.is_empty());
        assert!(buffers.channel.is_empty());
        assert_eq!(buffers.inbox_after, 0);
        assert_eq!(buffers.channel_after, 0);
    }

    /// Team snapshot with no agents.
    fn empty_team() -> crate::data::TeamSnapshot {
        crate::data::TeamSnapshot::empty(std::path::PathBuf::from("/tmp"))
    }

    #[test]
    fn render_row_flattens_newlines_and_truncates() {
        let team = empty_team();

        let multiline = row(1, "p:m", "p:dev", "first\nsecond\nthird");
        assert_eq!(render_row(&multiline, &team), "[p:m] first second third");

        let body = "x".repeat(300);
        let oversized = row(1, "s", "r", &body);
        let line = render_row(&oversized, &team);
        assert!(line.chars().count() <= 185);
    }

    #[test]
    fn render_row_uses_display_name_when_set() {
        use crate::data::{AgentInfo, TeamSnapshot};
        use team_core::supervisor::AgentState;

        let sage = AgentInfo {
            id: "p:sage".into(),
            agent: "sage".into(),
            project: "p".into(),
            tmux_session: "a-p-sage".into(),
            state: AgentState::Unknown,
            unread_mail: 0,
            pending_approvals: 0,
            is_manager: true,
            display_name: Some("Sage (Visionary)".into()),
            rate_limit_resets_at: None,
        };
        let team = TeamSnapshot {
            root: std::path::PathBuf::from("/tmp"),
            team_name: "t".into(),
            agents: vec![sage],
            channels: vec![],
        };

        let message = row(1, "p:sage", "p:hugo", "ping");
        assert_eq!(render_row(&message, &team), "[Sage (Visionary)] ping");
    }

    #[test]
    fn mock_records_calls() {
        let mock = MockMailboxSource {
            inbox_rows: vec![row(1, "p:m", "p:a", "hi")],
            ..Default::default()
        };

        let _ = mock.inbox("p:a", 0).unwrap();
        let _ = mock.sent("p:a", 2).unwrap();
        let _ = mock.channel_feed("p:a", 5).unwrap();
        let _ = mock.wire("p", 9).unwrap();

        assert_eq!(
            *mock.inbox_calls.lock().unwrap(),
            vec![("p:a".to_string(), 0)]
        );
        assert_eq!(
            *mock.sent_calls.lock().unwrap(),
            vec![("p:a".to_string(), 2)]
        );
        assert_eq!(
            *mock.channel_calls.lock().unwrap(),
            vec![("p:a".to_string(), 5)]
        );
        assert_eq!(
            *mock.wire_calls.lock().unwrap(),
            vec![("p".to_string(), 9)]
        );
    }
}