use std::path::Path;
use anyhow::{Context, Result};
use rusqlite::Connection;
pub const SCHEMA_VERSION: i64 = 3;
const SCHEMA_SQL: &str = r"
CREATE TABLE IF NOT EXISTS schema_version (
version INTEGER PRIMARY KEY
);
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE,
password_hash TEXT NOT NULL,
totp_secret_enc TEXT NOT NULL,
recovery_codes_hash TEXT,
created_at TEXT NOT NULL,
revoked_at TEXT
);
CREATE TABLE IF NOT EXISTS user_pubkeys (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
fingerprint TEXT NOT NULL,
alg TEXT NOT NULL,
pubkey_blob BLOB NOT NULL,
label TEXT,
added_at TEXT NOT NULL,
revoked_at TEXT
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_user_pubkeys_fp
ON user_pubkeys(user_id, fingerprint);
CREATE TABLE IF NOT EXISTS sessions (
token_hash TEXT PRIMARY KEY,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
fingerprint TEXT NOT NULL,
created_at TEXT NOT NULL,
expires_at TEXT NOT NULL,
revoked_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_sessions_user ON sessions(user_id);
CREATE TABLE IF NOT EXISTS nonces (
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
nonce TEXT NOT NULL,
expires_at TEXT NOT NULL,
PRIMARY KEY (user_id, nonce)
);
CREATE INDEX IF NOT EXISTS idx_nonces_exp ON nonces(expires_at);
CREATE TABLE IF NOT EXISTS challenges (
challenge TEXT PRIMARY KEY,
expires_at TEXT NOT NULL,
used_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_challenges_exp ON challenges(expires_at);
CREATE TABLE IF NOT EXISTS workers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
token_hash TEXT,
register_code_hash TEXT,
register_code_expires TEXT,
last_poll_at TEXT,
current_pwd TEXT,
current_task_id TEXT,
status TEXT,
created_at TEXT NOT NULL,
revoked_at TEXT,
client_pubkey BLOB
);
CREATE TABLE IF NOT EXISTS server_signing_key (
id INTEGER PRIMARY KEY CHECK (id = 1),
priv_pem BLOB NOT NULL,
pub_blob BLOB NOT NULL,
fingerprint TEXT NOT NULL,
created_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS tasks (
task_id TEXT PRIMARY KEY,
worker_name TEXT NOT NULL,
submitter TEXT NOT NULL,
kind TEXT NOT NULL,
payload TEXT NOT NULL,
collect_json TEXT NOT NULL,
limits_json TEXT NOT NULL,
state TEXT NOT NULL,
submitted_at TEXT NOT NULL,
started_at TEXT,
finished_at TEXT,
exit_code INTEGER,
final_pwd TEXT,
error TEXT,
fetch_path TEXT,
purged INTEGER NOT NULL DEFAULT 0,
last_access_at TEXT,
cancel_requested INTEGER NOT NULL DEFAULT 0,
worker_seq INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_tasks_worker_state ON tasks(worker_name, state);
CREATE INDEX IF NOT EXISTS idx_tasks_submitted_at ON tasks(submitted_at);
CREATE INDEX IF NOT EXISTS idx_tasks_worker_seq ON tasks(worker_name, worker_seq);
CREATE TABLE IF NOT EXISTS controller_messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
kind TEXT NOT NULL,
task_id TEXT,
worker_name TEXT,
error TEXT,
payload_json TEXT NOT NULL,
created_at TEXT NOT NULL,
read_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_msgs_user ON controller_messages(user_id, id);
CREATE INDEX IF NOT EXISTS idx_msgs_unread
ON controller_messages(user_id, read_at) WHERE read_at IS NULL;
CREATE TABLE IF NOT EXISTS artifacts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT NOT NULL REFERENCES tasks(task_id) ON DELETE CASCADE,
path TEXT NOT NULL,
size INTEGER NOT NULL,
sha256 TEXT NOT NULL,
blob_path TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_artifacts_task ON artifacts(task_id);
CREATE TABLE IF NOT EXISTS audit_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts TEXT NOT NULL,
actor TEXT,
action TEXT NOT NULL,
target TEXT,
key_fingerprint TEXT,
metadata_json TEXT
);
CREATE INDEX IF NOT EXISTS idx_audit_ts ON audit_log(ts);
";
pub fn connect(path: impl AsRef<Path>) -> Result<Connection> {
let conn = Connection::open(path.as_ref())
.with_context(|| format!("opening sqlite at {}", path.as_ref().display()))?;
conn.pragma_update(None, "journal_mode", "WAL")?;
conn.pragma_update(None, "foreign_keys", "ON")?;
conn.pragma_update(None, "synchronous", "NORMAL")?;
Ok(conn)
}
pub fn connect_in_memory() -> Result<Connection> {
let conn = Connection::open_in_memory()?;
conn.pragma_update(None, "foreign_keys", "ON")?;
Ok(conn)
}
pub fn bootstrap(conn: &Connection) -> Result<()> {
conn.execute_batch(
"CREATE TABLE IF NOT EXISTS schema_version (version INTEGER PRIMARY KEY)",
)?;
let mut current: i64 = conn
.query_row("SELECT version FROM schema_version", [], |r| r.get(0))
.unwrap_or(0);
if current == 1 {
let has_client_pubkey: bool = conn
.prepare("PRAGMA table_info(workers)")?
.query_map([], |r| r.get::<_, String>(1))?
.filter_map(Result::ok)
.any(|c| c == "client_pubkey");
if !has_client_pubkey {
conn.execute("ALTER TABLE workers ADD COLUMN client_pubkey BLOB", [])?;
}
current = 2;
}
if current == 2 {
let has_worker_seq: bool = conn
.prepare("PRAGMA table_info(tasks)")?
.query_map([], |r| r.get::<_, String>(1))?
.filter_map(Result::ok)
.any(|c| c == "worker_seq");
if !has_worker_seq {
conn.execute(
"ALTER TABLE tasks ADD COLUMN worker_seq INTEGER NOT NULL DEFAULT 0",
[],
)?;
}
current = 3;
}
conn.execute_batch(SCHEMA_SQL)?;
let stamped: Option<i64> = conn
.query_row("SELECT version FROM schema_version", [], |r| r.get(0))
.ok();
if stamped.is_none() {
conn.execute(
"INSERT INTO schema_version (version) VALUES (?)",
[SCHEMA_VERSION],
)?;
} else {
conn.execute("UPDATE schema_version SET version=?", [SCHEMA_VERSION])?;
}
if current != SCHEMA_VERSION && current != 0 {
anyhow::bail!("unsupported schema version {current}");
}
Ok(())
}
pub fn schema_version(conn: &Connection) -> Result<i64> {
Ok(conn
.query_row("SELECT version FROM schema_version", [], |r| r.get(0))
.unwrap_or(0))
}
#[cfg(test)]
mod tests {
use super::*;
fn fresh() -> Connection {
let c = connect_in_memory().unwrap();
bootstrap(&c).unwrap();
c
}
fn table_names(conn: &Connection) -> Vec<String> {
conn.prepare("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' ORDER BY name")
.unwrap()
.query_map([], |r| r.get::<_, String>(0))
.unwrap()
.filter_map(Result::ok)
.collect()
}
#[test]
fn fresh_bootstrap_creates_all_tables() {
let c = fresh();
let tables: std::collections::HashSet<String> = table_names(&c).into_iter().collect();
for required in &[
"schema_version",
"users",
"user_pubkeys",
"sessions",
"nonces",
"challenges",
"workers",
"tasks",
"artifacts",
"audit_log",
"server_signing_key",
"controller_messages",
] {
assert!(
tables.contains(*required),
"expected table {required} after bootstrap"
);
}
}
#[test]
fn bootstrap_is_idempotent() {
let c = fresh();
bootstrap(&c).unwrap();
bootstrap(&c).unwrap();
assert_eq!(schema_version(&c).unwrap(), SCHEMA_VERSION);
}
#[test]
fn schema_version_after_fresh_bootstrap_is_three() {
let c = fresh();
assert_eq!(schema_version(&c).unwrap(), 3);
}
#[test]
fn migration_v1_to_v3_adds_columns_and_tables() {
let c = connect_in_memory().unwrap();
c.execute_batch(
"
CREATE TABLE schema_version (version INTEGER PRIMARY KEY);
CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT NOT NULL UNIQUE,
password_hash TEXT NOT NULL, totp_secret_enc TEXT NOT NULL, created_at TEXT NOT NULL);
CREATE TABLE workers (id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE,
token_hash TEXT, created_at TEXT NOT NULL);
CREATE TABLE tasks (task_id TEXT PRIMARY KEY, worker_name TEXT NOT NULL,
submitter TEXT NOT NULL, kind TEXT NOT NULL, payload TEXT NOT NULL,
collect_json TEXT NOT NULL, limits_json TEXT NOT NULL, state TEXT NOT NULL,
submitted_at TEXT NOT NULL,
purged INTEGER NOT NULL DEFAULT 0,
cancel_requested INTEGER NOT NULL DEFAULT 0);
INSERT INTO schema_version (version) VALUES (1);
",
)
.unwrap();
bootstrap(&c).unwrap();
let cols: Vec<String> = c
.prepare("PRAGMA table_info(workers)")
.unwrap()
.query_map([], |r| r.get::<_, String>(1))
.unwrap()
.filter_map(Result::ok)
.collect();
assert!(cols.contains(&"client_pubkey".to_string()));
let cols: Vec<String> = c
.prepare("PRAGMA table_info(tasks)")
.unwrap()
.query_map([], |r| r.get::<_, String>(1))
.unwrap()
.filter_map(Result::ok)
.collect();
assert!(cols.contains(&"worker_seq".to_string()));
let tables: std::collections::HashSet<_> = table_names(&c).into_iter().collect();
assert!(tables.contains("controller_messages"));
assert!(tables.contains("server_signing_key"));
assert_eq!(schema_version(&c).unwrap(), 3);
}
#[test]
fn foreign_key_cascade_users_to_pubkeys() {
let c = fresh();
c.execute(
"INSERT INTO users (username, password_hash, totp_secret_enc, created_at)
VALUES (?, ?, ?, ?)",
rusqlite::params!["alice", "h", "s", "2026-01-01T00:00:00Z"],
)
.unwrap();
let uid: i64 = c.last_insert_rowid();
c.execute(
"INSERT INTO user_pubkeys (user_id, fingerprint, alg, pubkey_blob, added_at)
VALUES (?, ?, ?, ?, ?)",
rusqlite::params![uid, "SHA256:fp", "ssh-ed25519", &b"k"[..], "2026-01-01T00:00:00Z"],
)
.unwrap();
c.execute("DELETE FROM users WHERE id=?", [uid]).unwrap();
let n: i64 = c
.query_row("SELECT COUNT(*) FROM user_pubkeys", [], |r| r.get(0))
.unwrap();
assert_eq!(n, 0, "deleting a user must cascade-delete their pubkeys");
}
#[test]
fn workers_unique_name_constraint() {
let c = fresh();
c.execute(
"INSERT INTO workers (name, created_at) VALUES (?, ?)",
rusqlite::params!["w1", "2026-01-01T00:00:00Z"],
)
.unwrap();
let dup = c.execute(
"INSERT INTO workers (name, created_at) VALUES (?, ?)",
rusqlite::params!["w1", "2026-01-01T00:00:00Z"],
);
assert!(dup.is_err(), "duplicate worker name must violate UNIQUE");
}
#[test]
fn nonces_pk_user_nonce() {
let c = fresh();
c.execute(
"INSERT INTO users (username, password_hash, totp_secret_enc, created_at)
VALUES (?, ?, ?, ?)",
rusqlite::params!["alice", "h", "s", "2026-01-01T00:00:00Z"],
)
.unwrap();
let uid: i64 = c.last_insert_rowid();
c.execute(
"INSERT INTO nonces (user_id, nonce, expires_at) VALUES (?,?,?)",
rusqlite::params![uid, "abc", "2026-01-01T00:00:00Z"],
)
.unwrap();
let dup = c.execute(
"INSERT INTO nonces (user_id, nonce, expires_at) VALUES (?,?,?)",
rusqlite::params![uid, "abc", "2026-01-01T00:00:00Z"],
);
assert!(dup.is_err(), "(user_id,nonce) replay must be rejected");
}
}