use std::path::PathBuf;
use anyhow::Result;
use rusqlite::{params, Connection};
/// One tab of the mailbox view.
///
/// The variants are fieldless, so the type is cheap to copy and compare.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MailboxTab {
    /// Direct messages received.
    Inbox,
    /// Messages sent.
    Sent,
    /// Channel traffic.
    Channel,
    /// The wire feed.
    Wire,
}
impl MailboxTab {
pub const ALL: [MailboxTab; 4] = [
MailboxTab::Inbox,
MailboxTab::Sent,
MailboxTab::Channel,
MailboxTab::Wire,
];
pub fn label(self) -> &'static str {
match self {
MailboxTab::Inbox => "Inbox",
MailboxTab::Sent => "Sent",
MailboxTab::Channel => "Channel",
MailboxTab::Wire => "Wire",
}
}
pub fn empty_hint(self) -> &'static str {
match self {
MailboxTab::Inbox => "(no DMs)",
MailboxTab::Sent => "(no sent messages)",
MailboxTab::Channel => "(no channel traffic)",
MailboxTab::Wire => "(quiet)",
}
}
pub fn next(self) -> Self {
match self {
MailboxTab::Inbox => MailboxTab::Sent,
MailboxTab::Sent => MailboxTab::Channel,
MailboxTab::Channel => MailboxTab::Wire,
MailboxTab::Wire => MailboxTab::Inbox,
}
}
pub fn prev(self) -> Self {
match self {
MailboxTab::Inbox => MailboxTab::Wire,
MailboxTab::Sent => MailboxTab::Inbox,
MailboxTab::Channel => MailboxTab::Sent,
MailboxTab::Wire => MailboxTab::Channel,
}
}
}
/// A single message as read from the `messages` table.
#[derive(Clone, Debug)]
pub struct MessageRow {
    /// Row id; mailbox queries filter on `id > cursor` and sort by it.
    pub id: i64,
    /// Identifier of the sender.
    pub sender: String,
    /// Identifier of the recipient.
    pub recipient: String,
    /// Raw message body; may span several lines.
    pub text: String,
    /// Send time as stored in the database (unit is not visible here — TODO confirm).
    pub sent_at: f64,
}
/// Formats a message as a single display line of the form `[sender] text`.
///
/// Line feeds in the body become spaces, carriage returns are removed, and the
/// body is cut to at most 180 characters. The sender is shown using whatever
/// `crate::data::agent_label` returns for it.
pub fn render_row(row: &MessageRow, team: &crate::data::TeamSnapshot) -> String {
    // One lazy pass over the characters: stops after 180 kept characters.
    let flattened: String = row
        .text
        .chars()
        .filter(|&ch| ch != '\r')
        .map(|ch| if ch == '\n' { ' ' } else { ch })
        .take(180)
        .collect();
    let who = crate::data::agent_label(team, &row.sender);
    format!("[{}] {}", who, flattened)
}
/// Read-only provider of mailbox messages.
///
/// Every method takes an `after_id` cursor. Implementors must be `Send + Sync`.
pub trait MailboxSource: Send + Sync {
    /// Fetches inbox messages for `agent_id`, given the cursor `after_id`.
    fn inbox(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>>;

    /// Fetches messages sent by `agent_id`, given the cursor `after_id`.
    fn sent(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>>;

    /// Fetches channel traffic relevant to `agent_id`, given the cursor `after_id`.
    fn channel_feed(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>>;

    /// Fetches the wire feed for `project_id`, given the cursor `after_id`.
    fn wire(&self, project_id: &str, after_id: i64) -> Result<Vec<MessageRow>>;
}
/// Mailbox source that reads from a SQLite database file.
#[derive(Clone, Debug)]
pub struct BrokerMailboxSource {
    /// Location of the database file on disk.
    pub db_path: PathBuf,
}
impl BrokerMailboxSource {
    /// Creates a source that reads from the database at `db_path`.
    ///
    /// Nothing is opened here; the file is only touched when a query runs.
    pub fn new(db_path: PathBuf) -> Self {
        Self { db_path }
    }

    /// Opens a connection to the database, if the file is present.
    ///
    /// Returns `Ok(None)` when `db_path` is not an existing regular file.
    ///
    /// # Errors
    ///
    /// Returns the error reported by `Connection::open` when the file exists
    /// but cannot be opened.
    fn open(&self) -> Result<Option<Connection>> {
        if self.db_path.is_file() {
            let conn = Connection::open(&self.db_path)?;
            Ok(Some(conn))
        } else {
            Ok(None)
        }
    }
}
/// Messages newer than `?1` addressed to recipient `?2`, oldest first.
const SQL_BY_RECIPIENT: &str = "SELECT id, sender, recipient, text, sent_at FROM messages
WHERE id > ?1 AND recipient = ?2
ORDER BY id ASC";

/// Messages newer than `?1` written by sender `?2`, oldest first.
const SQL_BY_SENDER: &str = "SELECT id, sender, recipient, text, sent_at FROM messages
WHERE id > ?1 AND sender = ?2
ORDER BY id ASC";

/// Messages newer than `?1` addressed to any `channel:<id>` that agent `?2`
/// is a member of (per `channel_members`), oldest first.
const SQL_CHANNEL_FEED: &str = "SELECT id, sender, recipient, text, sent_at FROM messages
WHERE id > ?1
AND recipient IN (
SELECT 'channel:' || cm.channel_id FROM channel_members cm
WHERE cm.agent_id = ?2
)
ORDER BY id ASC";

impl BrokerMailboxSource {
    /// Runs one mailbox query and decodes every row it returns.
    ///
    /// `sql` must select `id, sender, recipient, text, sent_at` in that order
    /// and use `?1` for the cursor and `?2` for the filter key. Returns an
    /// empty list when the database file does not exist.
    ///
    /// # Errors
    ///
    /// Fails if the database cannot be opened, the statement cannot be
    /// prepared, or any row fails to step or decode.
    fn query_rows(&self, sql: &str, after_id: i64, key: &str) -> Result<Vec<MessageRow>> {
        let Some(conn) = self.open()? else {
            return Ok(Vec::new());
        };
        let mut stmt = conn.prepare(sql)?;
        let rows = stmt
            .query_map(params![after_id, key], |r| {
                Ok(MessageRow {
                    id: r.get(0)?,
                    sender: r.get(1)?,
                    recipient: r.get(2)?,
                    text: r.get(3)?,
                    sent_at: r.get(4)?,
                })
            })?
            // Collect into `Result` rather than flattening: a row that fails
            // to decode must surface as an error, not vanish while the
            // caller's cursor moves past it.
            .collect::<std::result::Result<Vec<_>, _>>()?;
        Ok(rows)
    }
}

impl MailboxSource for BrokerMailboxSource {
    /// Messages with `id > after_id` whose recipient is `agent_id`.
    fn inbox(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
        self.query_rows(SQL_BY_RECIPIENT, after_id, agent_id)
    }

    /// Messages with `id > after_id` whose sender is `agent_id`.
    fn sent(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
        self.query_rows(SQL_BY_SENDER, after_id, agent_id)
    }

    /// Messages with `id > after_id` sent to a channel `agent_id` belongs to.
    fn channel_feed(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
        self.query_rows(SQL_CHANNEL_FEED, after_id, agent_id)
    }

    /// Messages with `id > after_id` sent to `channel:{project_id}:all`.
    fn wire(&self, project_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
        let target = format!("channel:{project_id}:all");
        self.query_rows(SQL_BY_RECIPIENT, after_id, &target)
    }
}
/// Rows loaded for each tab, each paired with a cursor field.
///
/// A default value has empty row lists and all cursors at `0`.
#[derive(Clone, Debug, Default)]
pub struct MailboxBuffers {
    /// Rows held for the inbox tab.
    pub inbox: Vec<MessageRow>,
    /// Rows held for the sent tab.
    pub sent: Vec<MessageRow>,
    /// Rows held for the channel tab.
    pub channel: Vec<MessageRow>,
    /// Rows held for the wire tab.
    pub wire: Vec<MessageRow>,
    /// Cursor for the inbox tab.
    pub inbox_after: i64,
    /// Cursor for the sent tab.
    pub sent_after: i64,
    /// Cursor for the channel tab.
    pub channel_after: i64,
    /// Cursor for the wire tab.
    pub wire_after: i64,
}
/// Upper bound on the number of rows kept per tab; older rows are discarded first.
const MAX_TAB_ROWS: usize = 500;

impl MailboxBuffers {
    /// Returns the rows currently buffered for `tab`, oldest first.
    pub fn rows(&self, tab: MailboxTab) -> &[MessageRow] {
        match tab {
            MailboxTab::Inbox => &self.inbox,
            MailboxTab::Sent => &self.sent,
            MailboxTab::Channel => &self.channel,
            MailboxTab::Wire => &self.wire,
        }
    }

    /// Appends `batch` to the buffer of `tab` and advances that tab's cursor.
    ///
    /// The buffer is trimmed from the front so it never holds more than
    /// [`MAX_TAB_ROWS`] rows. The cursor is raised to the highest id in
    /// `batch`; it is never lowered, and an empty batch leaves it unchanged.
    pub fn extend(&mut self, tab: MailboxTab, batch: Vec<MessageRow>) {
        // Take the maximum rather than the last id so an unordered batch
        // cannot leave the cursor behind rows that were already buffered.
        let newest = batch.iter().map(|r| r.id).max();
        let (buf, after) = match tab {
            MailboxTab::Inbox => (&mut self.inbox, &mut self.inbox_after),
            MailboxTab::Sent => (&mut self.sent, &mut self.sent_after),
            MailboxTab::Channel => (&mut self.channel, &mut self.channel_after),
            MailboxTab::Wire => (&mut self.wire, &mut self.wire_after),
        };
        buf.extend(batch);
        if let Some(excess) = buf.len().checked_sub(MAX_TAB_ROWS) {
            buf.drain(..excess);
        }
        if let Some(id) = newest {
            // A cursor that moved backwards would make the next poll fetch
            // rows again and show them twice.
            *after = (*after).max(id);
        }
    }

    /// Drops all buffered rows and rewinds every cursor to `0`.
    pub fn reset(&mut self) {
        *self = Self::default();
    }
}
pub mod test_support {
    use super::*;
    use std::sync::Mutex;

    /// Log of `(key, after_id)` argument pairs, one entry per call.
    type CallLog = Mutex<Vec<(String, i64)>>;

    /// In-memory [`MailboxSource`] that returns canned rows and records its calls.
    ///
    /// Each method ignores its arguments when choosing what to return: it
    /// always hands back a clone of the matching `*_rows` field.
    #[derive(Default)]
    pub struct MockMailboxSource {
        pub inbox_rows: Vec<MessageRow>,
        pub sent_rows: Vec<MessageRow>,
        pub channel_rows: Vec<MessageRow>,
        pub wire_rows: Vec<MessageRow>,
        pub inbox_calls: Mutex<Vec<(String, i64)>>,
        pub sent_calls: Mutex<Vec<(String, i64)>>,
        pub channel_calls: Mutex<Vec<(String, i64)>>,
        pub wire_calls: Mutex<Vec<(String, i64)>>,
    }

    /// Appends one call to `log`; panics if the mutex is poisoned.
    fn record(log: &CallLog, key: &str, after_id: i64) {
        let mut calls = log.lock().unwrap();
        calls.push((key.to_owned(), after_id));
    }

    impl MailboxSource for MockMailboxSource {
        fn inbox(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
            record(&self.inbox_calls, agent_id, after_id);
            Ok(self.inbox_rows.clone())
        }

        fn sent(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
            record(&self.sent_calls, agent_id, after_id);
            Ok(self.sent_rows.clone())
        }

        fn channel_feed(&self, agent_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
            record(&self.channel_calls, agent_id, after_id);
            Ok(self.channel_rows.clone())
        }

        fn wire(&self, project_id: &str, after_id: i64) -> Result<Vec<MessageRow>> {
            record(&self.wire_calls, project_id, after_id);
            Ok(self.wire_rows.clone())
        }
    }
}
#[cfg(test)]
mod tests {
    use super::test_support::*;
    use super::*;

    /// Builds a message row with a zero timestamp.
    fn row(id: i64, sender: &str, recipient: &str, text: &str) -> MessageRow {
        MessageRow {
            id,
            sender: sender.to_owned(),
            recipient: recipient.to_owned(),
            text: text.to_owned(),
            sent_at: 0.0,
        }
    }

    #[test]
    fn next_cycles_inbox_sent_channel_wire_inbox() {
        let expected = [
            MailboxTab::Sent,
            MailboxTab::Channel,
            MailboxTab::Wire,
            MailboxTab::Inbox,
        ];
        let mut tab = MailboxTab::Inbox;
        for want in expected {
            tab = tab.next();
            assert_eq!(tab, want);
        }
    }

    #[test]
    fn prev_cycles_inbox_wire_channel_sent_inbox() {
        let expected = [
            MailboxTab::Wire,
            MailboxTab::Channel,
            MailboxTab::Sent,
            MailboxTab::Inbox,
        ];
        let mut tab = MailboxTab::Inbox;
        for want in expected {
            tab = tab.prev();
            assert_eq!(tab, want);
        }
    }

    #[test]
    fn extend_appends_and_bumps_cursor() {
        let mut buffers = MailboxBuffers::default();
        let first = row(7, "p:m", "p:dev", "hi");
        let second = row(8, "p:m", "p:dev", "yo");
        buffers.extend(MailboxTab::Inbox, vec![first, second]);
        assert_eq!(buffers.inbox.len(), 2);
        assert_eq!(buffers.inbox_after, 8);

        // An empty batch must leave the cursor where it was.
        buffers.extend(MailboxTab::Inbox, Vec::new());
        assert_eq!(buffers.inbox_after, 8);
    }

    #[test]
    fn extend_trims_to_cap() {
        let mut buffers = MailboxBuffers::default();
        let mut batch: Vec<MessageRow> = Vec::with_capacity(600);
        for id in 1..=600 {
            batch.push(row(id, "p:m", "p:dev", "x"));
        }
        buffers.extend(MailboxTab::Wire, batch);
        assert_eq!(buffers.wire.len(), MAX_TAB_ROWS);
        assert_eq!(buffers.wire_after, 600);
        assert_eq!(buffers.wire.last().map(|r| r.id), Some(600));
    }

    #[test]
    fn reset_clears_buffers_and_cursors() {
        let mut buffers = MailboxBuffers::default();
        buffers.extend(MailboxTab::Inbox, vec![row(3, "a", "b", "x")]);
        buffers.extend(MailboxTab::Channel, vec![row(4, "a", "channel:p:all", "y")]);

        buffers.reset();

        assert!(buffers.inbox.is_empty());
        assert!(buffers.channel.is_empty());
        assert_eq!(buffers.inbox_after, 0);
        assert_eq!(buffers.channel_after, 0);
    }

    /// Team snapshot with no agents, rooted at `/tmp`.
    fn empty_team() -> crate::data::TeamSnapshot {
        let root = std::path::PathBuf::from("/tmp");
        crate::data::TeamSnapshot::empty(root)
    }

    #[test]
    fn render_row_flattens_newlines_and_truncates() {
        let team = empty_team();

        let multiline = row(1, "p:m", "p:dev", "first\nsecond\nthird");
        assert_eq!(render_row(&multiline, &team), "[p:m] first second third");

        let body = "x".repeat(300);
        let oversized = row(1, "s", "r", &body);
        let rendered = render_row(&oversized, &team);
        assert!(rendered.chars().count() <= 185);
    }

    #[test]
    fn render_row_uses_display_name_when_set() {
        use crate::data::{AgentInfo, TeamSnapshot};
        use team_core::supervisor::AgentState;

        let sage = AgentInfo {
            id: "p:sage".into(),
            agent: "sage".into(),
            project: "p".into(),
            tmux_session: "a-p-sage".into(),
            state: AgentState::Unknown,
            unread_mail: 0,
            pending_approvals: 0,
            is_manager: true,
            display_name: Some("Sage (Visionary)".into()),
            rate_limit_resets_at: None,
        };
        let team = TeamSnapshot {
            root: std::path::PathBuf::from("/tmp"),
            team_name: "t".into(),
            agents: vec![sage],
            channels: vec![],
        };

        let message = row(1, "p:sage", "p:hugo", "ping");
        assert_eq!(render_row(&message, &team), "[Sage (Visionary)] ping");
    }

    #[test]
    fn mock_records_calls() {
        let mock = MockMailboxSource {
            inbox_rows: vec![row(1, "p:m", "p:a", "hi")],
            ..Default::default()
        };

        mock.inbox("p:a", 0).unwrap();
        mock.sent("p:a", 2).unwrap();
        mock.channel_feed("p:a", 5).unwrap();
        mock.wire("p", 9).unwrap();

        let call = |key: &str, after_id: i64| vec![(key.to_owned(), after_id)];
        assert_eq!(*mock.inbox_calls.lock().unwrap(), call("p:a", 0));
        assert_eq!(*mock.sent_calls.lock().unwrap(), call("p:a", 2));
        assert_eq!(*mock.channel_calls.lock().unwrap(), call("p:a", 5));
        assert_eq!(*mock.wire_calls.lock().unwrap(), call("p", 9));
    }
}