1use std::path::Path;
9
10use anyhow::{Context, Result};
11use rusqlite::Connection;
12
13pub const SCHEMA_VERSION: i64 = 3;
14
15const SCHEMA_SQL: &str = r"
16CREATE TABLE IF NOT EXISTS schema_version (
17 version INTEGER PRIMARY KEY
18);
19
20CREATE TABLE IF NOT EXISTS users (
21 id INTEGER PRIMARY KEY AUTOINCREMENT,
22 username TEXT NOT NULL UNIQUE,
23 password_hash TEXT NOT NULL,
24 totp_secret_enc TEXT NOT NULL,
25 recovery_codes_hash TEXT,
26 created_at TEXT NOT NULL,
27 revoked_at TEXT
28);
29
30CREATE TABLE IF NOT EXISTS user_pubkeys (
31 id INTEGER PRIMARY KEY AUTOINCREMENT,
32 user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
33 fingerprint TEXT NOT NULL,
34 alg TEXT NOT NULL,
35 pubkey_blob BLOB NOT NULL,
36 label TEXT,
37 added_at TEXT NOT NULL,
38 revoked_at TEXT
39);
40CREATE UNIQUE INDEX IF NOT EXISTS idx_user_pubkeys_fp
41 ON user_pubkeys(user_id, fingerprint);
42
43CREATE TABLE IF NOT EXISTS sessions (
44 token_hash TEXT PRIMARY KEY,
45 user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
46 fingerprint TEXT NOT NULL,
47 created_at TEXT NOT NULL,
48 expires_at TEXT NOT NULL,
49 revoked_at TEXT
50);
51CREATE INDEX IF NOT EXISTS idx_sessions_user ON sessions(user_id);
52
53CREATE TABLE IF NOT EXISTS nonces (
54 user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
55 nonce TEXT NOT NULL,
56 expires_at TEXT NOT NULL,
57 PRIMARY KEY (user_id, nonce)
58);
59CREATE INDEX IF NOT EXISTS idx_nonces_exp ON nonces(expires_at);
60
61CREATE TABLE IF NOT EXISTS challenges (
62 challenge TEXT PRIMARY KEY,
63 expires_at TEXT NOT NULL,
64 used_at TEXT
65);
66CREATE INDEX IF NOT EXISTS idx_challenges_exp ON challenges(expires_at);
67
68CREATE TABLE IF NOT EXISTS workers (
69 id INTEGER PRIMARY KEY AUTOINCREMENT,
70 name TEXT NOT NULL UNIQUE,
71 token_hash TEXT,
72 register_code_hash TEXT,
73 register_code_expires TEXT,
74 last_poll_at TEXT,
75 current_pwd TEXT,
76 current_task_id TEXT,
77 status TEXT,
78 created_at TEXT NOT NULL,
79 revoked_at TEXT,
80 client_pubkey BLOB
81);
82
83CREATE TABLE IF NOT EXISTS server_signing_key (
84 id INTEGER PRIMARY KEY CHECK (id = 1),
85 priv_pem BLOB NOT NULL,
86 pub_blob BLOB NOT NULL,
87 fingerprint TEXT NOT NULL,
88 created_at TEXT NOT NULL
89);
90
91CREATE TABLE IF NOT EXISTS tasks (
92 task_id TEXT PRIMARY KEY,
93 worker_name TEXT NOT NULL,
94 submitter TEXT NOT NULL,
95 kind TEXT NOT NULL,
96 payload TEXT NOT NULL,
97 collect_json TEXT NOT NULL,
98 limits_json TEXT NOT NULL,
99 state TEXT NOT NULL,
100 submitted_at TEXT NOT NULL,
101 started_at TEXT,
102 finished_at TEXT,
103 exit_code INTEGER,
104 final_pwd TEXT,
105 error TEXT,
106 fetch_path TEXT,
107 purged INTEGER NOT NULL DEFAULT 0,
108 last_access_at TEXT,
109 cancel_requested INTEGER NOT NULL DEFAULT 0,
110 worker_seq INTEGER NOT NULL DEFAULT 0
111);
112CREATE INDEX IF NOT EXISTS idx_tasks_worker_state ON tasks(worker_name, state);
113CREATE INDEX IF NOT EXISTS idx_tasks_submitted_at ON tasks(submitted_at);
114CREATE INDEX IF NOT EXISTS idx_tasks_worker_seq ON tasks(worker_name, worker_seq);
115
116CREATE TABLE IF NOT EXISTS controller_messages (
117 id INTEGER PRIMARY KEY AUTOINCREMENT,
118 user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
119 kind TEXT NOT NULL,
120 task_id TEXT,
121 worker_name TEXT,
122 error TEXT,
123 payload_json TEXT NOT NULL,
124 created_at TEXT NOT NULL,
125 read_at TEXT
126);
127CREATE INDEX IF NOT EXISTS idx_msgs_user ON controller_messages(user_id, id);
128CREATE INDEX IF NOT EXISTS idx_msgs_unread
129 ON controller_messages(user_id, read_at) WHERE read_at IS NULL;
130
131CREATE TABLE IF NOT EXISTS artifacts (
132 id INTEGER PRIMARY KEY AUTOINCREMENT,
133 task_id TEXT NOT NULL REFERENCES tasks(task_id) ON DELETE CASCADE,
134 path TEXT NOT NULL,
135 size INTEGER NOT NULL,
136 sha256 TEXT NOT NULL,
137 blob_path TEXT NOT NULL
138);
139CREATE INDEX IF NOT EXISTS idx_artifacts_task ON artifacts(task_id);
140
141CREATE TABLE IF NOT EXISTS audit_log (
142 id INTEGER PRIMARY KEY AUTOINCREMENT,
143 ts TEXT NOT NULL,
144 actor TEXT,
145 action TEXT NOT NULL,
146 target TEXT,
147 key_fingerprint TEXT,
148 metadata_json TEXT
149);
150CREATE INDEX IF NOT EXISTS idx_audit_ts ON audit_log(ts);
151";
152
153pub fn connect(path: impl AsRef<Path>) -> Result<Connection> {
157 let conn = Connection::open(path.as_ref())
158 .with_context(|| format!("opening sqlite at {}", path.as_ref().display()))?;
159 conn.pragma_update(None, "journal_mode", "WAL")?;
160 conn.pragma_update(None, "foreign_keys", "ON")?;
161 conn.pragma_update(None, "synchronous", "NORMAL")?;
162 Ok(conn)
163}
164
165pub fn connect_in_memory() -> Result<Connection> {
167 let conn = Connection::open_in_memory()?;
168 conn.pragma_update(None, "foreign_keys", "ON")?;
169 Ok(conn)
170}
171
172pub fn bootstrap(conn: &Connection) -> Result<()> {
180 conn.execute_batch(
183 "CREATE TABLE IF NOT EXISTS schema_version (version INTEGER PRIMARY KEY)",
184 )?;
185 let mut current: i64 = conn
186 .query_row("SELECT version FROM schema_version", [], |r| r.get(0))
187 .unwrap_or(0);
188
189 if current == 1 {
192 let has_client_pubkey: bool = conn
193 .prepare("PRAGMA table_info(workers)")?
194 .query_map([], |r| r.get::<_, String>(1))?
195 .filter_map(Result::ok)
196 .any(|c| c == "client_pubkey");
197 if !has_client_pubkey {
198 conn.execute("ALTER TABLE workers ADD COLUMN client_pubkey BLOB", [])?;
199 }
200 current = 2;
201 }
202 if current == 2 {
203 let has_worker_seq: bool = conn
204 .prepare("PRAGMA table_info(tasks)")?
205 .query_map([], |r| r.get::<_, String>(1))?
206 .filter_map(Result::ok)
207 .any(|c| c == "worker_seq");
208 if !has_worker_seq {
209 conn.execute(
210 "ALTER TABLE tasks ADD COLUMN worker_seq INTEGER NOT NULL DEFAULT 0",
211 [],
212 )?;
213 }
214 current = 3;
215 }
216
217 conn.execute_batch(SCHEMA_SQL)?;
222
223 let stamped: Option<i64> = conn
224 .query_row("SELECT version FROM schema_version", [], |r| r.get(0))
225 .ok();
226 if stamped.is_none() {
227 conn.execute(
228 "INSERT INTO schema_version (version) VALUES (?)",
229 [SCHEMA_VERSION],
230 )?;
231 } else {
232 conn.execute("UPDATE schema_version SET version=?", [SCHEMA_VERSION])?;
233 }
234
235 if current != SCHEMA_VERSION && current != 0 {
236 anyhow::bail!("unsupported schema version {current}");
237 }
238 Ok(())
239}
240
241pub fn schema_version(conn: &Connection) -> Result<i64> {
243 Ok(conn
244 .query_row("SELECT version FROM schema_version", [], |r| r.get(0))
245 .unwrap_or(0))
246}
247
248#[cfg(test)]
249mod tests {
250 use super::*;
251
252 fn fresh() -> Connection {
253 let c = connect_in_memory().unwrap();
254 bootstrap(&c).unwrap();
255 c
256 }
257
258 fn table_names(conn: &Connection) -> Vec<String> {
259 conn.prepare("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' ORDER BY name")
260 .unwrap()
261 .query_map([], |r| r.get::<_, String>(0))
262 .unwrap()
263 .filter_map(Result::ok)
264 .collect()
265 }
266
267 #[test]
268 fn fresh_bootstrap_creates_all_tables() {
269 let c = fresh();
270 let tables: std::collections::HashSet<String> = table_names(&c).into_iter().collect();
271 for required in &[
272 "schema_version",
273 "users",
274 "user_pubkeys",
275 "sessions",
276 "nonces",
277 "challenges",
278 "workers",
279 "tasks",
280 "artifacts",
281 "audit_log",
282 "server_signing_key",
283 "controller_messages",
284 ] {
285 assert!(
286 tables.contains(*required),
287 "expected table {required} after bootstrap"
288 );
289 }
290 }
291
292 #[test]
293 fn bootstrap_is_idempotent() {
294 let c = fresh();
295 bootstrap(&c).unwrap();
296 bootstrap(&c).unwrap();
297 assert_eq!(schema_version(&c).unwrap(), SCHEMA_VERSION);
298 }
299
300 #[test]
301 fn schema_version_after_fresh_bootstrap_is_three() {
302 let c = fresh();
303 assert_eq!(schema_version(&c).unwrap(), 3);
304 }
305
306 #[test]
307 fn migration_v1_to_v3_adds_columns_and_tables() {
308 let c = connect_in_memory().unwrap();
311 c.execute_batch(
312 "
313 CREATE TABLE schema_version (version INTEGER PRIMARY KEY);
314 CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT NOT NULL UNIQUE,
315 password_hash TEXT NOT NULL, totp_secret_enc TEXT NOT NULL, created_at TEXT NOT NULL);
316 CREATE TABLE workers (id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE,
317 token_hash TEXT, created_at TEXT NOT NULL);
318 CREATE TABLE tasks (task_id TEXT PRIMARY KEY, worker_name TEXT NOT NULL,
319 submitter TEXT NOT NULL, kind TEXT NOT NULL, payload TEXT NOT NULL,
320 collect_json TEXT NOT NULL, limits_json TEXT NOT NULL, state TEXT NOT NULL,
321 submitted_at TEXT NOT NULL,
322 purged INTEGER NOT NULL DEFAULT 0,
323 cancel_requested INTEGER NOT NULL DEFAULT 0);
324 INSERT INTO schema_version (version) VALUES (1);
325 ",
326 )
327 .unwrap();
328
329 bootstrap(&c).unwrap();
330
331 let cols: Vec<String> = c
333 .prepare("PRAGMA table_info(workers)")
334 .unwrap()
335 .query_map([], |r| r.get::<_, String>(1))
336 .unwrap()
337 .filter_map(Result::ok)
338 .collect();
339 assert!(cols.contains(&"client_pubkey".to_string()));
340
341 let cols: Vec<String> = c
343 .prepare("PRAGMA table_info(tasks)")
344 .unwrap()
345 .query_map([], |r| r.get::<_, String>(1))
346 .unwrap()
347 .filter_map(Result::ok)
348 .collect();
349 assert!(cols.contains(&"worker_seq".to_string()));
350
351 let tables: std::collections::HashSet<_> = table_names(&c).into_iter().collect();
353 assert!(tables.contains("controller_messages"));
354 assert!(tables.contains("server_signing_key"));
355
356 assert_eq!(schema_version(&c).unwrap(), 3);
357 }
358
359 #[test]
360 fn foreign_key_cascade_users_to_pubkeys() {
361 let c = fresh();
362 c.execute(
363 "INSERT INTO users (username, password_hash, totp_secret_enc, created_at)
364 VALUES (?, ?, ?, ?)",
365 rusqlite::params!["alice", "h", "s", "2026-01-01T00:00:00Z"],
366 )
367 .unwrap();
368 let uid: i64 = c.last_insert_rowid();
369 c.execute(
370 "INSERT INTO user_pubkeys (user_id, fingerprint, alg, pubkey_blob, added_at)
371 VALUES (?, ?, ?, ?, ?)",
372 rusqlite::params![uid, "SHA256:fp", "ssh-ed25519", &b"k"[..], "2026-01-01T00:00:00Z"],
373 )
374 .unwrap();
375 c.execute("DELETE FROM users WHERE id=?", [uid]).unwrap();
376 let n: i64 = c
377 .query_row("SELECT COUNT(*) FROM user_pubkeys", [], |r| r.get(0))
378 .unwrap();
379 assert_eq!(n, 0, "deleting a user must cascade-delete their pubkeys");
380 }
381
382 #[test]
383 fn workers_unique_name_constraint() {
384 let c = fresh();
385 c.execute(
386 "INSERT INTO workers (name, created_at) VALUES (?, ?)",
387 rusqlite::params!["w1", "2026-01-01T00:00:00Z"],
388 )
389 .unwrap();
390 let dup = c.execute(
391 "INSERT INTO workers (name, created_at) VALUES (?, ?)",
392 rusqlite::params!["w1", "2026-01-01T00:00:00Z"],
393 );
394 assert!(dup.is_err(), "duplicate worker name must violate UNIQUE");
395 }
396
397 #[test]
398 fn nonces_pk_user_nonce() {
399 let c = fresh();
400 c.execute(
401 "INSERT INTO users (username, password_hash, totp_secret_enc, created_at)
402 VALUES (?, ?, ?, ?)",
403 rusqlite::params!["alice", "h", "s", "2026-01-01T00:00:00Z"],
404 )
405 .unwrap();
406 let uid: i64 = c.last_insert_rowid();
407 c.execute(
408 "INSERT INTO nonces (user_id, nonce, expires_at) VALUES (?,?,?)",
409 rusqlite::params![uid, "abc", "2026-01-01T00:00:00Z"],
410 )
411 .unwrap();
412 let dup = c.execute(
413 "INSERT INTO nonces (user_id, nonce, expires_at) VALUES (?,?,?)",
414 rusqlite::params![uid, "abc", "2026-01-01T00:00:00Z"],
415 );
416 assert!(dup.is_err(), "(user_id,nonce) replay must be rejected");
417 }
418}