1use std::path::PathBuf;
22
23use anyhow::Result;
24use rusqlite::{params, Connection};
25
/// Selector for one of the three message feeds handled by this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MailboxTab {
    /// Direct messages (its empty hint reads "(no DMs)").
    Inbox,
    /// Channel traffic.
    Channel,
    /// The wire feed.
    Wire,
}

impl MailboxTab {
    /// Every tab, listed in the order that [`MailboxTab::next`] walks them.
    pub const ALL: [MailboxTab; 3] = [MailboxTab::Inbox, MailboxTab::Channel, MailboxTab::Wire];

    /// Returns the tab's display name.
    pub fn label(self) -> &'static str {
        match self {
            Self::Inbox => "Inbox",
            Self::Channel => "Channel",
            Self::Wire => "Wire",
        }
    }

    /// Returns the placeholder text for this tab (presumably shown while the
    /// tab has no rows — confirm against the rendering code).
    pub fn empty_hint(self) -> &'static str {
        match self {
            Self::Inbox => "(no DMs)",
            Self::Channel => "(no channel traffic)",
            Self::Wire => "(quiet)",
        }
    }

    /// Returns the following tab, wrapping from `Wire` back to `Inbox`.
    pub fn next(self) -> Self {
        self.step(1)
    }

    /// Returns the preceding tab, wrapping from `Inbox` back to `Wire`.
    pub fn prev(self) -> Self {
        // Moving forward by `len - 1` is the same as moving back by one.
        self.step(Self::ALL.len() - 1)
    }

    /// Moves `offset` positions forward through [`MailboxTab::ALL`], wrapping around.
    fn step(self, offset: usize) -> Self {
        let here = Self::ALL
            .iter()
            .position(|&tab| tab == self)
            .unwrap_or(0);
        Self::ALL[(here + offset) % Self::ALL.len()]
    }
}
68
/// One message as read from the `messages` table.
#[derive(Debug, Clone)]
pub struct MessageRow {
    /// Message id; the sources in this file order by it and use it as the
    /// `after_id` cursor.
    pub id: i64,
    /// Identifier of whoever sent the message.
    pub sender: String,
    /// Addressee: an agent id or a `channel:`-prefixed channel name.
    pub recipient: String,
    /// Message body; may span several lines.
    pub text: String,
    /// Send time (presumably Unix seconds — confirm against the broker schema).
    pub sent_at: f64,
}
77
78pub fn render_row(row: &MessageRow) -> String {
83 let one_line: String = row
84 .text
85 .replace('\n', " ")
86 .replace('\r', "")
87 .chars()
88 .take(180)
89 .collect();
90 format!("[{}] {}", row.sender, one_line)
91}
92
93pub trait MailboxSource: Send + Sync {
98 fn inbox(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>>;
99 fn channel_feed(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>>;
100 fn wire(&self, project_id: &str, after_id: i64) -> Result<Vec<MessageRow>>;
101}
102
103#[derive(Debug, Clone)]
107pub struct BrokerMailboxSource {
108 pub db_path: PathBuf,
109}
110
111impl BrokerMailboxSource {
112 pub fn new(db_path: PathBuf) -> Self {
113 Self { db_path }
114 }
115
116 fn open(&self) -> Result<Option<Connection>> {
117 if !self.db_path.is_file() {
118 return Ok(None);
119 }
120 let conn = Connection::open(&self.db_path)?;
121 Ok(Some(conn))
122 }
123}
124
125impl MailboxSource for BrokerMailboxSource {
126 fn inbox(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
127 let Some(conn) = self.open()? else {
128 return Ok(Vec::new());
129 };
130 let mut stmt = conn.prepare(
131 "SELECT id, sender, recipient, text, sent_at FROM messages
132 WHERE id > ?1 AND recipient = ?2
133 ORDER BY id ASC",
134 )?;
135 let rows = stmt
136 .query_map(params![after_id, agent_id], |r| {
137 Ok(MessageRow {
138 id: r.get(0)?,
139 sender: r.get(1)?,
140 recipient: r.get(2)?,
141 text: r.get(3)?,
142 sent_at: r.get(4)?,
143 })
144 })?
145 .flatten()
146 .collect();
147 Ok(rows)
148 }
149
150 fn channel_feed(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
151 let Some(conn) = self.open()? else {
152 return Ok(Vec::new());
153 };
154 let mut stmt = conn.prepare(
159 "SELECT id, sender, recipient, text, sent_at FROM messages
160 WHERE id > ?1
161 AND recipient IN (
162 SELECT 'channel:' || cm.channel_id FROM channel_members cm
163 WHERE cm.agent_id = ?2
164 )
165 ORDER BY id ASC",
166 )?;
167 let rows = stmt
168 .query_map(params![after_id, agent_id], |r| {
169 Ok(MessageRow {
170 id: r.get(0)?,
171 sender: r.get(1)?,
172 recipient: r.get(2)?,
173 text: r.get(3)?,
174 sent_at: r.get(4)?,
175 })
176 })?
177 .flatten()
178 .collect();
179 Ok(rows)
180 }
181
182 fn wire(&self, project_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
183 let Some(conn) = self.open()? else {
184 return Ok(Vec::new());
185 };
186 let target = format!("channel:{project_id}:all");
190 let mut stmt = conn.prepare(
191 "SELECT id, sender, recipient, text, sent_at FROM messages
192 WHERE id > ?1 AND recipient = ?2
193 ORDER BY id ASC",
194 )?;
195 let rows = stmt
196 .query_map(params![after_id, target], |r| {
197 Ok(MessageRow {
198 id: r.get(0)?,
199 sender: r.get(1)?,
200 recipient: r.get(2)?,
201 text: r.get(3)?,
202 sent_at: r.get(4)?,
203 })
204 })?
205 .flatten()
206 .collect();
207 Ok(rows)
208 }
209}
210
211#[derive(Debug, Default, Clone)]
216pub struct MailboxBuffers {
217 pub inbox: Vec<MessageRow>,
218 pub channel: Vec<MessageRow>,
219 pub wire: Vec<MessageRow>,
220 pub inbox_after: i64,
221 pub channel_after: i64,
222 pub wire_after: i64,
223}
224
225const MAX_TAB_ROWS: usize = 500;
226
227impl MailboxBuffers {
228 pub fn rows(&self, tab: MailboxTab) -> &[MessageRow] {
229 match tab {
230 MailboxTab::Inbox => &self.inbox,
231 MailboxTab::Channel => &self.channel,
232 MailboxTab::Wire => &self.wire,
233 }
234 }
235
236 pub fn extend(&mut self, tab: MailboxTab, batch: Vec<MessageRow>) {
240 let last_id = batch.last().map(|r| r.id);
241 let (buf, after) = match tab {
242 MailboxTab::Inbox => (&mut self.inbox, &mut self.inbox_after),
243 MailboxTab::Channel => (&mut self.channel, &mut self.channel_after),
244 MailboxTab::Wire => (&mut self.wire, &mut self.wire_after),
245 };
246 buf.extend(batch);
247 if buf.len() > MAX_TAB_ROWS {
248 let drop = buf.len() - MAX_TAB_ROWS;
249 buf.drain(..drop);
250 }
251 if let Some(id) = last_id {
252 *after = id;
253 }
254 }
255
256 pub fn reset(&mut self) {
261 *self = Self::default();
262 }
263}
264
265pub mod test_support {
266 use super::*;
272 use std::sync::Mutex;
273
274 #[derive(Default)]
279 pub struct MockMailboxSource {
280 pub inbox_rows: Vec<MessageRow>,
281 pub channel_rows: Vec<MessageRow>,
282 pub wire_rows: Vec<MessageRow>,
283 pub inbox_calls: Mutex<Vec<(String, i64)>>,
284 pub channel_calls: Mutex<Vec<(String, i64)>>,
285 pub wire_calls: Mutex<Vec<(String, i64)>>,
286 }
287
288 impl MailboxSource for MockMailboxSource {
289 fn inbox(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
290 self.inbox_calls
291 .lock()
292 .unwrap()
293 .push((agent_id.into(), after_id));
294 Ok(self.inbox_rows.clone())
295 }
296
297 fn channel_feed(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
298 self.channel_calls
299 .lock()
300 .unwrap()
301 .push((agent_id.into(), after_id));
302 Ok(self.channel_rows.clone())
303 }
304
305 fn wire(&self, project_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
306 self.wire_calls
307 .lock()
308 .unwrap()
309 .push((project_id.into(), after_id));
310 Ok(self.wire_rows.clone())
311 }
312 }
313}
314
#[cfg(test)]
mod tests {
    use super::test_support::*;
    use super::*;

    /// Builds a row with the given id, routing and text; `sent_at` is fixed at 0.
    fn row(id: i64, sender: &str, recipient: &str, text: &str) -> MessageRow {
        MessageRow {
            id,
            sender: sender.to_owned(),
            recipient: recipient.to_owned(),
            text: text.to_owned(),
            sent_at: 0.0,
        }
    }

    #[test]
    fn next_cycles_inbox_channel_wire_inbox() {
        let expected = [MailboxTab::Channel, MailboxTab::Wire, MailboxTab::Inbox];
        let mut tab = MailboxTab::Inbox;
        for want in expected {
            tab = tab.next();
            assert_eq!(tab, want);
        }
    }

    #[test]
    fn extend_appends_and_bumps_cursor() {
        let mut buffers = MailboxBuffers::default();
        let first_batch = vec![row(7, "p:m", "p:dev", "hi"), row(8, "p:m", "p:dev", "yo")];
        buffers.extend(MailboxTab::Inbox, first_batch);
        assert_eq!(buffers.inbox.len(), 2);
        assert_eq!(buffers.inbox_after, 8);

        // An empty batch must leave the cursor where it was.
        buffers.extend(MailboxTab::Inbox, Vec::new());
        assert_eq!(buffers.inbox_after, 8);
    }

    #[test]
    fn extend_trims_to_cap() {
        let mut buffers = MailboxBuffers::default();
        let oversized: Vec<MessageRow> = (1..=600)
            .map(|id| row(id, "p:m", "p:dev", "x"))
            .collect();
        buffers.extend(MailboxTab::Wire, oversized);
        assert_eq!(buffers.wire.len(), MAX_TAB_ROWS);
        assert_eq!(buffers.wire_after, 600);
        // Trimming removes the oldest rows, so the newest one survives.
        assert_eq!(buffers.wire.last().unwrap().id, 600);
    }

    #[test]
    fn reset_clears_buffers_and_cursors() {
        let mut buffers = MailboxBuffers::default();
        buffers.extend(MailboxTab::Inbox, vec![row(3, "a", "b", "x")]);
        buffers.extend(MailboxTab::Channel, vec![row(4, "a", "channel:p:all", "y")]);
        buffers.reset();
        assert!(buffers.inbox.is_empty());
        assert!(buffers.channel.is_empty());
        assert_eq!(buffers.inbox_after, 0);
        assert_eq!(buffers.channel_after, 0);
    }

    #[test]
    fn render_row_flattens_newlines_and_truncates() {
        let multiline = row(1, "p:m", "p:dev", "first\nsecond\nthird");
        assert_eq!(render_row(&multiline), "[p:m] first second third");

        let oversized = row(1, "s", "r", &"x".repeat(300));
        let rendered = render_row(&oversized);
        assert!(rendered.chars().count() <= 185);
    }

    #[test]
    fn mock_records_calls() {
        let mock = MockMailboxSource {
            inbox_rows: vec![row(1, "p:m", "p:a", "hi")],
            ..Default::default()
        };
        let _ = mock.inbox("p:a", 0).unwrap();
        let _ = mock.channel_feed("p:a", 5).unwrap();
        let _ = mock.wire("p", 9).unwrap();

        assert_eq!(
            *mock.inbox_calls.lock().unwrap(),
            vec![("p:a".to_owned(), 0_i64)]
        );
        assert_eq!(
            *mock.channel_calls.lock().unwrap(),
            vec![("p:a".to_owned(), 5_i64)]
        );
        assert_eq!(
            *mock.wire_calls.lock().unwrap(),
            vec![("p".to_owned(), 9_i64)]
        );
    }
}