Skip to main content

dragoon_server/
db.rs

1//! SQLite schema, bootstrap, and forward migrations.
2//!
3//! Mirrors `python/src/remote_executor/server/db.py`. The schema text is
4//! identical so a Python-managed database is readable by the Rust server
5//! and vice versa — supporting the rolling Python ↔ Rust migration the
6//! plan calls for.
7
8use std::path::Path;
9
10use anyhow::{Context, Result};
11use rusqlite::Connection;
12
13pub const SCHEMA_VERSION: i64 = 3;
14
15const SCHEMA_SQL: &str = r"
16CREATE TABLE IF NOT EXISTS schema_version (
17    version INTEGER PRIMARY KEY
18);
19
20CREATE TABLE IF NOT EXISTS users (
21    id                INTEGER PRIMARY KEY AUTOINCREMENT,
22    username          TEXT NOT NULL UNIQUE,
23    password_hash     TEXT NOT NULL,
24    totp_secret_enc   TEXT NOT NULL,
25    recovery_codes_hash TEXT,
26    created_at        TEXT NOT NULL,
27    revoked_at        TEXT
28);
29
30CREATE TABLE IF NOT EXISTS user_pubkeys (
31    id           INTEGER PRIMARY KEY AUTOINCREMENT,
32    user_id      INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
33    fingerprint  TEXT NOT NULL,
34    alg          TEXT NOT NULL,
35    pubkey_blob  BLOB NOT NULL,
36    label        TEXT,
37    added_at     TEXT NOT NULL,
38    revoked_at   TEXT
39);
40CREATE UNIQUE INDEX IF NOT EXISTS idx_user_pubkeys_fp
41    ON user_pubkeys(user_id, fingerprint);
42
43CREATE TABLE IF NOT EXISTS sessions (
44    token_hash   TEXT PRIMARY KEY,
45    user_id      INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
46    fingerprint  TEXT NOT NULL,
47    created_at   TEXT NOT NULL,
48    expires_at   TEXT NOT NULL,
49    revoked_at   TEXT
50);
51CREATE INDEX IF NOT EXISTS idx_sessions_user ON sessions(user_id);
52
53CREATE TABLE IF NOT EXISTS nonces (
54    user_id     INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
55    nonce       TEXT NOT NULL,
56    expires_at  TEXT NOT NULL,
57    PRIMARY KEY (user_id, nonce)
58);
59CREATE INDEX IF NOT EXISTS idx_nonces_exp ON nonces(expires_at);
60
61CREATE TABLE IF NOT EXISTS challenges (
62    challenge   TEXT PRIMARY KEY,
63    expires_at  TEXT NOT NULL,
64    used_at     TEXT
65);
66CREATE INDEX IF NOT EXISTS idx_challenges_exp ON challenges(expires_at);
67
68CREATE TABLE IF NOT EXISTS workers (
69    id                       INTEGER PRIMARY KEY AUTOINCREMENT,
70    name                     TEXT NOT NULL UNIQUE,
71    token_hash               TEXT,
72    register_code_hash       TEXT,
73    register_code_expires    TEXT,
74    last_poll_at             TEXT,
75    current_pwd              TEXT,
76    current_task_id          TEXT,
77    status                   TEXT,
78    created_at               TEXT NOT NULL,
79    revoked_at               TEXT,
80    client_pubkey            BLOB
81);
82
83CREATE TABLE IF NOT EXISTS server_signing_key (
84    id           INTEGER PRIMARY KEY CHECK (id = 1),
85    priv_pem     BLOB NOT NULL,
86    pub_blob     BLOB NOT NULL,
87    fingerprint  TEXT NOT NULL,
88    created_at   TEXT NOT NULL
89);
90
91CREATE TABLE IF NOT EXISTS tasks (
92    task_id          TEXT PRIMARY KEY,
93    worker_name      TEXT NOT NULL,
94    submitter        TEXT NOT NULL,
95    kind             TEXT NOT NULL,
96    payload          TEXT NOT NULL,
97    collect_json     TEXT NOT NULL,
98    limits_json      TEXT NOT NULL,
99    state            TEXT NOT NULL,
100    submitted_at     TEXT NOT NULL,
101    started_at       TEXT,
102    finished_at      TEXT,
103    exit_code        INTEGER,
104    final_pwd        TEXT,
105    error            TEXT,
106    fetch_path       TEXT,
107    purged           INTEGER NOT NULL DEFAULT 0,
108    last_access_at   TEXT,
109    cancel_requested INTEGER NOT NULL DEFAULT 0,
110    worker_seq       INTEGER NOT NULL DEFAULT 0
111);
112CREATE INDEX IF NOT EXISTS idx_tasks_worker_state ON tasks(worker_name, state);
113CREATE INDEX IF NOT EXISTS idx_tasks_submitted_at ON tasks(submitted_at);
114CREATE INDEX IF NOT EXISTS idx_tasks_worker_seq ON tasks(worker_name, worker_seq);
115
116CREATE TABLE IF NOT EXISTS controller_messages (
117    id            INTEGER PRIMARY KEY AUTOINCREMENT,
118    user_id       INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
119    kind          TEXT NOT NULL,
120    task_id       TEXT,
121    worker_name   TEXT,
122    error         TEXT,
123    payload_json  TEXT NOT NULL,
124    created_at    TEXT NOT NULL,
125    read_at       TEXT
126);
127CREATE INDEX IF NOT EXISTS idx_msgs_user ON controller_messages(user_id, id);
128CREATE INDEX IF NOT EXISTS idx_msgs_unread
129    ON controller_messages(user_id, read_at) WHERE read_at IS NULL;
130
131CREATE TABLE IF NOT EXISTS artifacts (
132    id          INTEGER PRIMARY KEY AUTOINCREMENT,
133    task_id     TEXT NOT NULL REFERENCES tasks(task_id) ON DELETE CASCADE,
134    path        TEXT NOT NULL,
135    size        INTEGER NOT NULL,
136    sha256      TEXT NOT NULL,
137    blob_path   TEXT NOT NULL
138);
139CREATE INDEX IF NOT EXISTS idx_artifacts_task ON artifacts(task_id);
140
141CREATE TABLE IF NOT EXISTS audit_log (
142    id              INTEGER PRIMARY KEY AUTOINCREMENT,
143    ts              TEXT NOT NULL,
144    actor           TEXT,
145    action          TEXT NOT NULL,
146    target          TEXT,
147    key_fingerprint TEXT,
148    metadata_json   TEXT
149);
150CREATE INDEX IF NOT EXISTS idx_audit_ts ON audit_log(ts);
151";
152
153/// Open a SQLite connection at `path`, set WAL + foreign-keys + relaxed
154/// fsync (matching the Python `connect()`), and return it. Schema is NOT
155/// applied — call [`bootstrap`] afterwards.
156pub fn connect(path: impl AsRef<Path>) -> Result<Connection> {
157    let conn = Connection::open(path.as_ref())
158        .with_context(|| format!("opening sqlite at {}", path.as_ref().display()))?;
159    conn.pragma_update(None, "journal_mode", "WAL")?;
160    conn.pragma_update(None, "foreign_keys", "ON")?;
161    conn.pragma_update(None, "synchronous", "NORMAL")?;
162    Ok(conn)
163}
164
165/// Open an in-memory SQLite for tests.
166pub fn connect_in_memory() -> Result<Connection> {
167    let conn = Connection::open_in_memory()?;
168    conn.pragma_update(None, "foreign_keys", "ON")?;
169    Ok(conn)
170}
171
172/// Apply the schema (CREATE TABLE/INDEX IF NOT EXISTS) and run forward
173/// migrations from older versions. Idempotent on a v3 database.
174///
175/// Order matters: when an older schema is detected we run the column
176/// additions FIRST so that the v3 SCHEMA_SQL — which references the new
177/// columns in `CREATE INDEX` statements — can be applied as a no-op
178/// idempotent layer afterwards.
179pub fn bootstrap(conn: &Connection) -> Result<()> {
180    // Always make sure the version table exists so we can read the cursor
181    // before deciding what else to apply.
182    conn.execute_batch(
183        "CREATE TABLE IF NOT EXISTS schema_version (version INTEGER PRIMARY KEY)",
184    )?;
185    let mut current: i64 = conn
186        .query_row("SELECT version FROM schema_version", [], |r| r.get(0))
187        .unwrap_or(0);
188
189    // Forward migrations: bring an existing pre-v3 database up to v3 BEFORE
190    // we apply SCHEMA_SQL. The migrations themselves are pure ALTER TABLE.
191    if current == 1 {
192        let has_client_pubkey: bool = conn
193            .prepare("PRAGMA table_info(workers)")?
194            .query_map([], |r| r.get::<_, String>(1))?
195            .filter_map(Result::ok)
196            .any(|c| c == "client_pubkey");
197        if !has_client_pubkey {
198            conn.execute("ALTER TABLE workers ADD COLUMN client_pubkey BLOB", [])?;
199        }
200        current = 2;
201    }
202    if current == 2 {
203        let has_worker_seq: bool = conn
204            .prepare("PRAGMA table_info(tasks)")?
205            .query_map([], |r| r.get::<_, String>(1))?
206            .filter_map(Result::ok)
207            .any(|c| c == "worker_seq");
208        if !has_worker_seq {
209            conn.execute(
210                "ALTER TABLE tasks ADD COLUMN worker_seq INTEGER NOT NULL DEFAULT 0",
211                [],
212            )?;
213        }
214        current = 3;
215    }
216
217    // Now apply the v3 schema. CREATE TABLE IF NOT EXISTS is a no-op on
218    // existing tables; CREATE INDEX IF NOT EXISTS references columns that
219    // now exist either because the migrations above added them or because
220    // the table was just freshly created.
221    conn.execute_batch(SCHEMA_SQL)?;
222
223    let stamped: Option<i64> = conn
224        .query_row("SELECT version FROM schema_version", [], |r| r.get(0))
225        .ok();
226    if stamped.is_none() {
227        conn.execute(
228            "INSERT INTO schema_version (version) VALUES (?)",
229            [SCHEMA_VERSION],
230        )?;
231    } else {
232        conn.execute("UPDATE schema_version SET version=?", [SCHEMA_VERSION])?;
233    }
234
235    if current != SCHEMA_VERSION && current != 0 {
236        anyhow::bail!("unsupported schema version {current}");
237    }
238    Ok(())
239}
240
241/// Read the schema_version row (returns 0 if unset).
242pub fn schema_version(conn: &Connection) -> Result<i64> {
243    Ok(conn
244        .query_row("SELECT version FROM schema_version", [], |r| r.get(0))
245        .unwrap_or(0))
246}
247
248#[cfg(test)]
249mod tests {
250    use super::*;
251
252    fn fresh() -> Connection {
253        let c = connect_in_memory().unwrap();
254        bootstrap(&c).unwrap();
255        c
256    }
257
258    fn table_names(conn: &Connection) -> Vec<String> {
259        conn.prepare("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' ORDER BY name")
260            .unwrap()
261            .query_map([], |r| r.get::<_, String>(0))
262            .unwrap()
263            .filter_map(Result::ok)
264            .collect()
265    }
266
267    #[test]
268    fn fresh_bootstrap_creates_all_tables() {
269        let c = fresh();
270        let tables: std::collections::HashSet<String> = table_names(&c).into_iter().collect();
271        for required in &[
272            "schema_version",
273            "users",
274            "user_pubkeys",
275            "sessions",
276            "nonces",
277            "challenges",
278            "workers",
279            "tasks",
280            "artifacts",
281            "audit_log",
282            "server_signing_key",
283            "controller_messages",
284        ] {
285            assert!(
286                tables.contains(*required),
287                "expected table {required} after bootstrap"
288            );
289        }
290    }
291
292    #[test]
293    fn bootstrap_is_idempotent() {
294        let c = fresh();
295        bootstrap(&c).unwrap();
296        bootstrap(&c).unwrap();
297        assert_eq!(schema_version(&c).unwrap(), SCHEMA_VERSION);
298    }
299
300    #[test]
301    fn schema_version_after_fresh_bootstrap_is_three() {
302        let c = fresh();
303        assert_eq!(schema_version(&c).unwrap(), 3);
304    }
305
306    #[test]
307    fn migration_v1_to_v3_adds_columns_and_tables() {
308        // Build a minimal v1 schema: no client_pubkey on workers, no
309        // worker_seq on tasks, no controller_messages, no server_signing_key.
310        let c = connect_in_memory().unwrap();
311        c.execute_batch(
312            "
313            CREATE TABLE schema_version (version INTEGER PRIMARY KEY);
314            CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT NOT NULL UNIQUE,
315                password_hash TEXT NOT NULL, totp_secret_enc TEXT NOT NULL, created_at TEXT NOT NULL);
316            CREATE TABLE workers (id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE,
317                token_hash TEXT, created_at TEXT NOT NULL);
318            CREATE TABLE tasks (task_id TEXT PRIMARY KEY, worker_name TEXT NOT NULL,
319                submitter TEXT NOT NULL, kind TEXT NOT NULL, payload TEXT NOT NULL,
320                collect_json TEXT NOT NULL, limits_json TEXT NOT NULL, state TEXT NOT NULL,
321                submitted_at TEXT NOT NULL,
322                purged INTEGER NOT NULL DEFAULT 0,
323                cancel_requested INTEGER NOT NULL DEFAULT 0);
324            INSERT INTO schema_version (version) VALUES (1);
325            ",
326        )
327        .unwrap();
328
329        bootstrap(&c).unwrap();
330
331        // workers.client_pubkey must now exist
332        let cols: Vec<String> = c
333            .prepare("PRAGMA table_info(workers)")
334            .unwrap()
335            .query_map([], |r| r.get::<_, String>(1))
336            .unwrap()
337            .filter_map(Result::ok)
338            .collect();
339        assert!(cols.contains(&"client_pubkey".to_string()));
340
341        // tasks.worker_seq must now exist
342        let cols: Vec<String> = c
343            .prepare("PRAGMA table_info(tasks)")
344            .unwrap()
345            .query_map([], |r| r.get::<_, String>(1))
346            .unwrap()
347            .filter_map(Result::ok)
348            .collect();
349        assert!(cols.contains(&"worker_seq".to_string()));
350
351        // controller_messages + server_signing_key created
352        let tables: std::collections::HashSet<_> = table_names(&c).into_iter().collect();
353        assert!(tables.contains("controller_messages"));
354        assert!(tables.contains("server_signing_key"));
355
356        assert_eq!(schema_version(&c).unwrap(), 3);
357    }
358
359    #[test]
360    fn foreign_key_cascade_users_to_pubkeys() {
361        let c = fresh();
362        c.execute(
363            "INSERT INTO users (username, password_hash, totp_secret_enc, created_at)
364             VALUES (?, ?, ?, ?)",
365            rusqlite::params!["alice", "h", "s", "2026-01-01T00:00:00Z"],
366        )
367        .unwrap();
368        let uid: i64 = c.last_insert_rowid();
369        c.execute(
370            "INSERT INTO user_pubkeys (user_id, fingerprint, alg, pubkey_blob, added_at)
371             VALUES (?, ?, ?, ?, ?)",
372            rusqlite::params![uid, "SHA256:fp", "ssh-ed25519", &b"k"[..], "2026-01-01T00:00:00Z"],
373        )
374        .unwrap();
375        c.execute("DELETE FROM users WHERE id=?", [uid]).unwrap();
376        let n: i64 = c
377            .query_row("SELECT COUNT(*) FROM user_pubkeys", [], |r| r.get(0))
378            .unwrap();
379        assert_eq!(n, 0, "deleting a user must cascade-delete their pubkeys");
380    }
381
382    #[test]
383    fn workers_unique_name_constraint() {
384        let c = fresh();
385        c.execute(
386            "INSERT INTO workers (name, created_at) VALUES (?, ?)",
387            rusqlite::params!["w1", "2026-01-01T00:00:00Z"],
388        )
389        .unwrap();
390        let dup = c.execute(
391            "INSERT INTO workers (name, created_at) VALUES (?, ?)",
392            rusqlite::params!["w1", "2026-01-01T00:00:00Z"],
393        );
394        assert!(dup.is_err(), "duplicate worker name must violate UNIQUE");
395    }
396
397    #[test]
398    fn nonces_pk_user_nonce() {
399        let c = fresh();
400        c.execute(
401            "INSERT INTO users (username, password_hash, totp_secret_enc, created_at)
402             VALUES (?, ?, ?, ?)",
403            rusqlite::params!["alice", "h", "s", "2026-01-01T00:00:00Z"],
404        )
405        .unwrap();
406        let uid: i64 = c.last_insert_rowid();
407        c.execute(
408            "INSERT INTO nonces (user_id, nonce, expires_at) VALUES (?,?,?)",
409            rusqlite::params![uid, "abc", "2026-01-01T00:00:00Z"],
410        )
411        .unwrap();
412        let dup = c.execute(
413            "INSERT INTO nonces (user_id, nonce, expires_at) VALUES (?,?,?)",
414            rusqlite::params![uid, "abc", "2026-01-01T00:00:00Z"],
415        );
416        assert!(dup.is_err(), "(user_id,nonce) replay must be rejected");
417    }
418}