Skip to main content

team_core/
mailbox.rs

1//! SQLite mailbox schema shared by `team-mcp` and integration tests.
2//!
3//! The actual connection handling lives in `team-mcp`; this module defines
4//! the schema + migrations so both crates agree on the shape of the data.
5
6/// The one privileged mailbox `kind`. A `system` message is a lifecycle signal
7/// (drain, startup, rate-limit) the supervisor emits inline + real-time; only a
8/// `system:*` source may originate one (#254). If any agent or `user:*` could,
9/// a forged "session terminating" signal would be trivial.
10pub const PRIVILEGED_KIND: &str = "system";
11
12/// Is `kind` the privileged system kind? Single source of truth for the
13/// privileged-kind contract, consulted on every mailbox *write* path — the
14/// insert allowlist (`team-mcp` `store::send_dm_kind`, sender-gated) and the
15/// UPDATE guard (`team-bot` media dispatch, which refuses it outright) — so the
16/// contract has one definition rather than a `"system"` literal copied per site
17/// (#320).
18pub fn is_privileged_kind(kind: &str) -> bool {
19    kind == PRIVILEGED_KIND
20}
21
22/// Idempotent schema bootstrap. Safe to run on every connect.
23pub const SCHEMA: &str = r#"
24-- NOTE: pragmas (journal_mode=WAL, busy_timeout, foreign_keys) are set by
25-- the connection opener *before* this batch runs — concurrent openers race
26-- if we set them here.
27
28CREATE TABLE IF NOT EXISTS projects (
29    id   TEXT PRIMARY KEY,
30    name TEXT NOT NULL
31);
32
33CREATE TABLE IF NOT EXISTS agents (
34    id         TEXT PRIMARY KEY,          -- "<project>:<agent>"
35    project_id TEXT NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
36    role       TEXT NOT NULL,
37    runtime    TEXT NOT NULL,
38    is_manager INTEGER NOT NULL DEFAULT 0,
39    reports_to TEXT                        -- short name, resolved within project
40);
41
42CREATE INDEX IF NOT EXISTS agents_project_idx ON agents(project_id);
43
44CREATE TABLE IF NOT EXISTS messages (
45    id           INTEGER PRIMARY KEY AUTOINCREMENT,
46    project_id   TEXT NOT NULL,
47    sender       TEXT NOT NULL,            -- "<project>:<agent>" or "user:<handle>" or "cli"
48    recipient    TEXT NOT NULL,            -- "<project>:<agent>" or "channel:<project>:<name>"
49    text         TEXT NOT NULL,
50    thread_id    TEXT,
51    sent_at      REAL NOT NULL,
52    delivered_at REAL,
53    acked_at     REAL
54);
55
56CREATE INDEX IF NOT EXISTS messages_recipient_idx
57    ON messages(recipient, acked_at);
58CREATE INDEX IF NOT EXISTS messages_project_idx
59    ON messages(project_id, sent_at);
60
61-- Channels + subscriptions + per-agent ACLs.
62CREATE TABLE IF NOT EXISTS channels (
63    id         TEXT PRIMARY KEY,               -- "<project>:<name>"
64    project_id TEXT NOT NULL,
65    name       TEXT NOT NULL,
66    wildcard   INTEGER NOT NULL DEFAULT 0       -- 1 iff members = "*"
67);
68
69CREATE TABLE IF NOT EXISTS channel_members (
70    channel_id TEXT NOT NULL REFERENCES channels(id) ON DELETE CASCADE,
71    agent_id   TEXT NOT NULL,
72    PRIMARY KEY (channel_id, agent_id)
73);
74
75CREATE INDEX IF NOT EXISTS channel_members_agent_idx
76    ON channel_members(agent_id);
77
78CREATE TABLE IF NOT EXISTS agent_acls (
79    agent_id        TEXT PRIMARY KEY REFERENCES agents(id) ON DELETE CASCADE,
80    can_dm_json     TEXT NOT NULL DEFAULT '[]',    -- ["dev","critic"]
81    can_bcast_json  TEXT NOT NULL DEFAULT '[]'     -- ["product","all"]
82);
83
84-- Inter-project manager bridges.
85CREATE TABLE IF NOT EXISTS bridges (
86    id           INTEGER PRIMARY KEY AUTOINCREMENT,
87    from_agent   TEXT NOT NULL,             -- "<project>:<agent>", must be a manager
88    to_agent     TEXT NOT NULL,             -- "<project>:<agent>", must be a manager
89    topic        TEXT NOT NULL,
90    opened_by    TEXT NOT NULL,             -- "user:<handle>" or "cli"
91    opened_at    REAL NOT NULL,
92    expires_at   REAL NOT NULL,
93    closed_at    REAL
94);
95
96CREATE INDEX IF NOT EXISTS bridges_open_idx
97    ON bridges(expires_at, closed_at);
98
99-- Human-in-the-loop permission fabric.
100CREATE TABLE IF NOT EXISTS approvals (
101    id             INTEGER PRIMARY KEY AUTOINCREMENT,
102    project_id     TEXT NOT NULL,
103    agent_id       TEXT NOT NULL,
104    action         TEXT NOT NULL,          -- "publish", "deploy", ...
105    scope_tag      TEXT,                   -- optional narrower tag
106    summary        TEXT NOT NULL,
107    payload_json   TEXT,
108    status         TEXT NOT NULL,          -- pending | approved | denied | expired | undeliverable
109    requested_at   REAL NOT NULL,
110    decided_at     REAL,
111    decided_by     TEXT,
112    decision_note  TEXT,
113    expires_at     REAL NOT NULL,
114    delivered_at   REAL                    -- NULL until an interface adapter confirms surfacing to a human
115);
116
117CREATE INDEX IF NOT EXISTS approvals_pending_idx
118    ON approvals(status, expires_at);
119
120-- Budget ledger. Rows are appended by runtime cost parsers (the claude-code
121-- writer is `teamctl budget-record`, fired from the Stop hook). `teamctl
122-- budget` aggregates per project/day.
123CREATE TABLE IF NOT EXISTS budget (
124    id          INTEGER PRIMARY KEY AUTOINCREMENT,
125    project_id  TEXT NOT NULL,
126    agent_id    TEXT,
127    runtime     TEXT,
128    usd         REAL NOT NULL DEFAULT 0,
129    -- claude-code writer: prompt + both cache buckets (the schema has no cache
130    -- columns; cache tokens are priced into `usd` at their own rates). So this
131    -- is "all non-output tokens", not strictly Anthropic "input tokens".
132    input_tok   INTEGER NOT NULL DEFAULT 0,
133    output_tok  INTEGER NOT NULL DEFAULT 0,
134    observed_at REAL NOT NULL
135);
136
137CREATE INDEX IF NOT EXISTS budget_project_day_idx
138    ON budget(project_id, observed_at);
139
140-- Rate-limit events. Written by `teamctl rl-watch` whenever a runtime
141-- emits a rate-limit signature. Hooks (notify, webhook, run) run off these
142-- rows; the wrapper loop sleeps until `resets_at` before respawning.
143CREATE TABLE IF NOT EXISTS rate_limits (
144    id          INTEGER PRIMARY KEY AUTOINCREMENT,
145    agent_id    TEXT NOT NULL,
146    runtime     TEXT NOT NULL,
147    hit_at      REAL NOT NULL,
148    resets_at   REAL,                  -- nullable: sometimes we can't parse
149    raw_match   TEXT NOT NULL,
150    handled_at  REAL
151);
152
153CREATE INDEX IF NOT EXISTS rate_limits_agent_idx
154    ON rate_limits(agent_id, hit_at);
155"#;
156
157/// Bootstrap the schema and apply additive migrations. Idempotent — safe on
158/// every connect. Replaces direct `execute_batch(SCHEMA)` calls so that
159/// existing databases pick up new columns without a destructive reset.
160pub fn ensure(conn: &rusqlite::Connection) -> rusqlite::Result<()> {
161    conn.execute_batch(SCHEMA)?;
162    // Additive migrations. SQLite has no `ADD COLUMN IF NOT EXISTS`, so each
163    // migration tolerates the "duplicate column name" error to stay idempotent.
164    let migrations: &[&str] = &[
165        "ALTER TABLE approvals ADD COLUMN delivered_at REAL",
166        // T-086-A: discriminator + structured payload for non-text mailbox kinds
167        // (image, file, reaction). Existing text rows have NULL on both — readers
168        // treat NULL kind as 'text' for back-compat.
169        // #254/#320: `'system'` is also a recognized kind value — lifecycle/
170        // system signals (drain, startup, rate-limit) the supervisor emits. No
171        // schema change is needed: `kind` is free-form TEXT with no CHECK/enum
172        // and the db has no version mechanism, so the value's contract lives in
173        // code (see `is_privileged_kind`): originated only by a `system:*`
174        // source at the insert choke point (store::send_dm_kind), refused on
175        // every UPDATE path (team-bot media dispatch), and always-inline channel
176        // delivery (team-mcp format_channel_event), never a lazy stub.
177        "ALTER TABLE messages ADD COLUMN kind TEXT",
178        "ALTER TABLE messages ADD COLUMN structured_payload TEXT",
179        // T-086-B: Telegram message id this row pertains to. Direction-
180        // disambiguated by sender: inbound rows (sender = `user:telegram`)
181        // store the source Telegram message id so agents know what to
182        // reply to; outbound rows (sender = `<project>:<agent>`) store the
183        // id this reply threads under for `reply_parameters`. NULL on
184        // pre-T-086-B rows and on rows that aren't Telegram-bound.
185        "ALTER TABLE messages ADD COLUMN telegram_msg_id INTEGER",
186        // T-104: per-message delivery mode for lazy inbox. NULL = lazy
187        // (the channel watcher emits a stub; the agent drills in via
188        // `inbox_read`). `'immediate'` = full body delivered inline,
189        // bypassing the stub. Set by the bot when an operator prefixes a
190        // message with `/readnow `.
191        "ALTER TABLE messages ADD COLUMN delivery_mode TEXT",
192        // #299: multi-option interactive decisions. `options_json` is a
193        // JSON array of `{label,value}` the bot renders as N inline
194        // buttons; NULL means the binary Approve/Deny back-compat path
195        // (existing callers never set options). `decision_value` holds
196        // the chosen option's `value` once the operator taps; NULL for
197        // binary decisions and for Cancel (status carries those).
198        "ALTER TABLE approvals ADD COLUMN options_json TEXT",
199        "ALTER TABLE approvals ADD COLUMN decision_value TEXT",
200    ];
201    for stmt in migrations {
202        if let Err(e) = conn.execute(stmt, []) {
203            let msg = e.to_string();
204            if !msg.contains("duplicate column name") {
205                return Err(e);
206            }
207        }
208    }
209    Ok(())
210}
211
212#[cfg(test)]
213mod tests {
214    use super::*;
215
216    #[test]
217    fn privileged_kind_const_is_system() {
218        // The privileged-kind contract is pinned to the literal `'system'`
219        // (#254/#320). A rename here would silently desync the insert
220        // allowlist and the UPDATE guard, so pin the value itself.
221        assert_eq!(PRIVILEGED_KIND, "system");
222    }
223
224    #[test]
225    fn is_privileged_kind_true_only_for_system() {
226        assert!(is_privileged_kind("system"));
227    }
228
229    #[test]
230    fn is_privileged_kind_false_for_ordinary_kinds() {
231        // Every kind a non-`system:*` source legitimately writes — the
232        // media UPDATE kinds, plain text, and the empty/NULL-as-text
233        // sentinel — must be allowed through, or the guard would refuse
234        // ordinary traffic (#320).
235        for kind in ["image", "file", "media_error", "text", ""] {
236            assert!(
237                !is_privileged_kind(kind),
238                "kind {kind:?} must not be privileged"
239            );
240        }
241    }
242}